Python: gevent源代码分析之用gevent threadpool实现多进程任务调度
May 29, 2015
这两天出了两个事故,一个是因为正负面的接口被被人疯狂访问而变得堵塞,导致整个动态网页解析解析的崩溃,最后redis挂掉。 还有一个事情是动态ip轮询模块的bug。。。 多事之秋呀。 这几个晚上一直尝试看gevent的源代码,收获特别的大,gevent本身的一些实现就特别的灵巧,背靠这内核的epoll调度,实现一系列的io调度非堵塞。
大家知道gevent是协程,协程是用户态层面自己解决io堵塞的问题,他会把每次堵塞时间都注册是epoll里面。 那么,gevent是单线程的形态,那么gevent可以实现多进程么? 今天看了下threadpool的相关代码,发现光看他的名字的话,是多线程的意思 。 thread就一线程的意思。 python的线程就是个坑爹的东西,我现在的服务端架构基本是多进程加多协程的架构。 言归正传,gevent是如何实现多进程? 看了下官方的issue,有人提出用threadpool实现。 简单过了下代码,他是利用os.fork 来派生多进程的,具体的调度我就多说了,主要是在from gevent.hub import get_hub这里。
gevent的文章, http://xiaorui.cc/?p=1530
下面是我测试的表现,效果还是可以的,计算任务跑的还算平均。 测试是在一个云主机测试的,有些片面性。
测试的代码如下:
Python
import time import gevent from gevent.threadpool import ThreadPool def sum(i): i = 0 for i in xrange(100000000): i += i+1 * 20 / 10 * 10 /10 def test(n,m): m=m vals = [] keys = [] for i in xrange(m): vals.append(i) keys.append('a%s'%i) d = None for i in xrange(n): d = dict(zip(keys, vals)) return d pool = ThreadPool(20) start = time.time() for _ in xrange(10): pool.spawn(test, 1000000,100) gevent.wait() delay = time.time() - start print 'Running "time.sleep(1)>" 4 times with 3 threads. Should take about 2 seconds: %.3fs' % delay
下面是Threadpool 的部分源代码,set_size是创建进程的数目,on_fork是fork进程,kill是干掉进程,spawn是触发进程,add_thread是增加进程。
Python
def _set_size(self, size): if size < 0: raise ValueError('Size of the pool cannot be negative: %r' % (size, )) if size > self._maxsize: raise ValueError('Size of the pool cannot be bigger than maxsize: %r > %r' % (size, self._maxsize)) if self.manager: self.manager.kill() while self._size < size: self._add_thread() delay = 0.0001 while self._size > size: while self._size - size > self.task_queue.unfinished_tasks: self.task_queue.put(None) if getcurrent() is self.hub: break sleep(delay) delay = min(delay * 2, .05) if self._size: self.fork_watcher.start(self._on_fork) else: self.fork_watcher.stop() size = property(_get_size, _set_size) def _init(self, maxsize): self._size = 0 self._semaphore = Semaphore(1) self._lock = Lock() self.task_queue = Queue() self._set_maxsize(maxsize) def _on_fork(self): # fork() only leaves one thread; also screws up locks; # let's re-create locks and threads pid = os.getpid() if pid != self.pid: self.pid = pid # Do not mix fork() and threads; since fork() only copies one thread # all objects referenced by other threads has refcount that will never # go down to 0. self._init(self._maxsize) def join(self): delay = 0.0005 while self.task_queue.unfinished_tasks > 0: sleep(delay) delay = min(delay * 2, .05) def kill(self): self.size = 0 def _adjust_step(self): # if there is a possibility & necessity for adding a thread, do it while self._size < self._maxsize and self.task_queue.unfinished_tasks > self._size: self._add_thread() # while the number of threads is more than maxsize, kill one # we do not check what's already in task_queue - it could be all Nones while self._size - self._maxsize > self.task_queue.unfinished_tasks: self.task_queue.put(None) if self._size: self.fork_watcher.start(self._on_fork) else: self.fork_watcher.stop() def _adjust_wait(self): delay = 0.0001 while True: self._adjust_step() if self._size <= self._maxsize: return sleep(delay) delay = min(delay * 2, .05) def adjust(self): self._adjust_step() if not self.manager and self._size > self._maxsize: # might need to feed more Nones into the pool self.manager = Greenlet.spawn(self._adjust_wait) def _add_thread(self): with self._lock: self._size += 1 try: start_new_thread(self._worker, ()) except: with self._lock: self._size -= 1 raise def spawn(self, func, *args, **kwargs): while True: semaphore = self._semaphore semaphore.acquire() if semaphore is self._semaphore: break try: task_queue = self.task_queue result = AsyncResult() thread_result = ThreadResult(result, hub=self.hub) task_queue.put((func, args, kwargs, thread_result)) self.adjust() # rawlink() must be the last call result.rawlink(lambda *args: self._semaphore.release()) # XXX this _semaphore.release() is competing for order with get() # XXX this is not good, just make ThreadResult release the semaphore before doing anything else except: semaphore.release() raise return result
原文:http://xiaorui.cc/2015/05/29/gevent源代码分析之用gevent-threadpool实现多进程任务调度/
0 Comments