Skip to content Skip to main navigation Skip to footer

Python: gevent源代码分析之用gevent threadpool实现多进程任务调度

这两天出了两个事故,一个是因为正负面的接口被被人疯狂访问而变得堵塞,导致整个动态网页解析解析的崩溃,最后redis挂掉。 还有一个事情是动态ip轮询模块的bug。。。 多事之秋呀。  这几个晚上一直尝试看gevent的源代码,收获特别的大,gevent本身的一些实现就特别的灵巧,背靠这内核的epoll调度,实现一系列的io调度非堵塞。 

大家知道gevent是协程,协程是用户态层面自己解决io堵塞的问题,他会把每次堵塞时间都注册是epoll里面。 那么,gevent是单线程的形态,那么gevent可以实现多进程么? 今天看了下threadpool的相关代码,发现光看他的名字的话,是多线程的意思 。 thread就一线程的意思。   python的线程就是个坑爹的东西,我现在的服务端架构基本是多进程加多协程的架构。 言归正传,gevent是如何实现多进程?   看了下官方的issue,有人提出用threadpool实现。  简单过了下代码,他是利用os.fork 来派生多进程的,具体的调度我就多说了,主要是在from gevent.hub import get_hub这里。 

gevent的文章, http://xiaorui.cc/?p=1530

Python: gevent源代码分析之用gevent threadpool实现多进程任务调度
Python: gevent源代码分析之用gevent threadpool实现多进程任务调度

下面是我测试的表现,效果还是可以的,计算任务跑的还算平均。 测试是在一个云主机测试的,有些片面性。 

测试的代码如下:

Python

import time
import gevent
from gevent.threadpool import ThreadPool
def sum(i):
    i = 0
    for i in xrange(100000000):
        i += i+1 * 20 / 10 * 10 /10
def test(n,m):
    m=m
    vals = []
    keys = []
    for i in xrange(m):
        vals.append(i)
        keys.append('a%s'%i)
    d = None
    for i in xrange(n):
        d = dict(zip(keys, vals))
    return d
pool = ThreadPool(20)
start = time.time()
for _ in xrange(10):
    pool.spawn(test, 1000000,100)
gevent.wait()
delay = time.time() - start
print 'Running "time.sleep(1)>" 4 times with 3 threads. Should take about 2 seconds: %.3fs' % delay 

下面是Threadpool 的部分源代码,set_size是创建进程的数目,on_fork是fork进程,kill是干掉进程,spawn是触发进程,add_thread是增加进程。 

Python

def _set_size(self, size):
    if size < 0:
        raise ValueError('Size of the pool cannot be negative: %r' % (size, ))
    if size > self._maxsize:
        raise ValueError('Size of the pool cannot be bigger than maxsize: %r &gt; %r' % (size, self._maxsize))
    if self.manager:
        self.manager.kill()
    while self._size < size:
        self._add_thread()
    delay = 0.0001
    while self._size > size:
        while self._size - size &gt; self.task_queue.unfinished_tasks:
            self.task_queue.put(None)
        if getcurrent() is self.hub:
            break
        sleep(delay)
        delay = min(delay * 2, .05)
    if self._size:
        self.fork_watcher.start(self._on_fork)
    else:
        self.fork_watcher.stop()
size = property(_get_size, _set_size)
def _init(self, maxsize):
    self._size = 0
    self._semaphore = Semaphore(1)
    self._lock = Lock()
    self.task_queue = Queue()
    self._set_maxsize(maxsize)
def _on_fork(self):
    # fork() only leaves one thread; also screws up locks;
    # let's re-create locks and threads
    pid = os.getpid()
    if pid != self.pid:
        self.pid = pid
        # Do not mix fork() and threads; since fork() only copies one thread
        # all objects referenced by other threads has refcount that will never
        # go down to 0.
        self._init(self._maxsize)
def join(self):
    delay = 0.0005
    while self.task_queue.unfinished_tasks &gt; 0:
        sleep(delay)
        delay = min(delay * 2, .05)
def kill(self):
    self.size = 0
def _adjust_step(self):
    # if there is a possibility &amp; necessity for adding a thread, do it
    while self._size < self._maxsize and self.task_queue.unfinished_tasks > self._size:
        self._add_thread()
    # while the number of threads is more than maxsize, kill one
    # we do not check what's already in task_queue - it could be all Nones
    while self._size - self._maxsize &gt; self.task_queue.unfinished_tasks:
        self.task_queue.put(None)
    if self._size:
        self.fork_watcher.start(self._on_fork)
    else:
        self.fork_watcher.stop()
def _adjust_wait(self):
    delay = 0.0001
    while True:
        self._adjust_step()
        if self._size <= self._maxsize:
            return
        sleep(delay)
        delay = min(delay * 2, .05)
def adjust(self):
    self._adjust_step()
    if not self.manager and self._size > self._maxsize:
        # might need to feed more Nones into the pool
        self.manager = Greenlet.spawn(self._adjust_wait)
def _add_thread(self):
    with self._lock:
        self._size += 1
    try:
        start_new_thread(self._worker, ())
    except:
        with self._lock:
            self._size -= 1
        raise
def spawn(self, func, *args, **kwargs):
    while True:
        semaphore = self._semaphore
        semaphore.acquire()
        if semaphore is self._semaphore:
            break
    try:
        task_queue = self.task_queue
        result = AsyncResult()
        thread_result = ThreadResult(result, hub=self.hub)
        task_queue.put((func, args, kwargs, thread_result))
        self.adjust()
        # rawlink() must be the last call
        result.rawlink(lambda *args: self._semaphore.release())
        # XXX this _semaphore.release() is competing for order with get()
        # XXX this is not good, just make ThreadResult release the semaphore before doing anything else
    except:
        semaphore.release()
        raise
    return result 

原文:http://xiaorui.cc/2015/05/29/gevent源代码分析之用gevent-threadpool实现多进程任务调度/

0 Comments

There are no comments yet

Leave a comment

Your email address will not be published.