Python: django中定期执行任务
June 2, 2015
最近需要考虑如何在django环境中跑定时任务. 这个在 stackoverflow也有对应的 讨论, 方法也有不少, 这边简单尝试和总结下.
假设我们现在的定期任务就是睡眠 n秒, 然后往数据库里面写一条记录, 记录这个任务的起始以及结束时间, 并且我们 不关心该任务的返回结果. 项目名称为
from django.db import models from django.utils import timezone # Create your models here. class SimpleTask ( models . Model ): task_name = models . CharField ( max_length = 200 ) start_time = models . DateTimeField ( 'task begin time' , default = timezone . now ) finish_time = models . DateTimeField ( 'task end time' , blank = True , null = True )
Table of Contents
最朴素的方法
考虑一个最最朴素的实现: 起一个简单的daemon程序, 每隔一定时间进行任务处理; 或者是写一个简单的程序, 利用
简单的改进
自己重新写一个程序有时候代价还是挺大的, 尤其是业务逻辑十分复杂的时候, 所以可以考虑复用已有的代码. 如果考虑
- 实现一个api, 定期请求
- 扩展manage.py
方法一没什么好说的, 但是需要考虑访问控制; 方法二的话也很简单, 可以参考 自定义commands. 无论用哪种方式, 我们可以先实现对应的任务处理逻辑:
def run_simple_task ( task_name ): task = SimpleTask ( task_name = task_name ) task . save () ## do a lot lot of stuff...... seconds = random . randint ( 5 , 15 ) time . sleep ( seconds ) task . finish_time = timezone . now () task . save () return seconds
针对方法一, 实现对应的view:
from django.http import HttpResponse from .models import run_simple_task def do_task ( request , task_name ): seconds = run_simple_task ( task_name ) return HttpResponse ( "I have done task in %d sec: %s >" % ( seconds , task_name ))
添加对应的route:
urlpatterns = [ ## add one url ( r'^task/(?P<task_name>w+)' , views . do_task , name = 'do_task' ), ]
我们手动访问试一下:
针对方法二, 扩展
from django.core.management.base import BaseCommand , CommandError from mma_cron.models import run_simple_task class Command ( BaseCommand ): help = 'Run the simple command' def add_arguments ( self , parser ): pass def handle ( self , * args , ** options ): seconds = run_simple_task ( 'run from manage.py' ) self . stdout . write ( "task done>" )
再手动试一下:
至于定期可以通过设定cron来实现.
插件
上面的实现比较简便, 也很有效. django的插件也有对应的实现, 这里着重介绍一下 django-cron. 它类似使用了上面提到的方法二, 在此之上增加了 任务执行检查, 日志记录等功能. 我们也来看一下;)
安装的话推荐直接把源码下的
python manage.py migrate django_cron
该操作是创建对应的数据库, 之后执行定时任务时会把 时间和 相关结果保存到数据库中. 接下来我们需要在
from django.utils import timezone from django_cron import CronJobBase , Schedule from .models import run_simple_task class SimpleTaskCronJob ( CronJobBase ): RUN_EVERY_MINS = 1 schedule = Schedule ( run_every_mins = RUN_EVERY_MINS ) code = 'mma_cron.cron.simple_task_cron_job' def do ( self ): seconds = run_simple_task ( 'task from django-cron' ) msg = "I have done task in %s sec: %s >" return ( "I have done task in %s sec: %s >" % ( seconds , task_name ))
可以看到里面定义了该定时任务的 处理间隔与相应动作. 最后我们还需要在
CRON_CLASSES = [ "mma_cron.cron.SimpleTaskCronJob>" , ]
大功告成, 接下来我们只需在执行
值得一提的是如果注册了多个任务, 这些任务默认是串行执行的(考虑到安全性). 如果想要并行执行需要改一些设置, 详见文档.
Celery
上述几种方法都比较简单, 但是我们还有其他的方式;) 比如很多人都提到的
Celery. 给我的感觉它很像 gearman和 sidekiq, 类似一个任务分发和处理框架. 利用它的 Periodic Tasks, 可以实现定期发布任务让worker取执行任务. 我们也来简单看看.
简单尝试
我们先不管django, 先感性的了解下Celery. Celery的 任务分发和 结果存储需要依赖外部组件, 可选的有 RabbitMQ, Redis, 数据库等等. 简单起见, 这里选择使用 redis( RabbitMQ部署起来还是稍微复杂了点).
首先根据官方的 教程, 创建个分发 加法和 乘法任务的系统. 先来看看 worker的代码:
from __future__ import absolute_import from proj.celery import app @app.task def add ( x , y ): return x + y @app.task def mul ( x , y ): return x * y @app.task def xsum ( numbers ): return sum ( numbers )
再来看看 celery的设置代码:
from __future__ import absolute_import from celery import Celery from datetime import timedelta app = Celery ( 'proj' , broker = 'redis://localhost' , backend = 'redis://localhost' , include = [ 'proj.tasks' ]) ## 这段代码可以先忽略;) app . conf . update ( CELERY_TASK_RESULT_EXPIRES = 3600 , CELERYBEAT_SCHEDULE = { 'add-every-30-seconds' : { 'task' : 'proj.tasks.add' , 'schedule' : timedelta ( seconds = 30 ), 'args' : ( 16 , 32 ), }, }, ) if __name__ == '__main__' : app . start ()
我们先启动 redis, 然后通过执行
>>> import proj.tasks >>> result = proj.tasks.add.delay( 2,2) >>> result.get() 4
这样就创建一个任务, 我们在celery的控制台(worker进程)可以看到这样的输出:
Received task: proj.tasks.add[ a24a9792-a7c6-4f64-994d-ca6903b3182c] Task proj.tasks.add[ a24a9792-a7c6-4f64-994d-ca6903b3182c] succeeded in 0.0018904690005s: 4
很直观~ 我们同时也可以看看它在redis里面是什么个样子:
127.0.0.1:6379> keys * 1) "celery-task-meta-a24a9792-a7c6-4f64-994d-ca6903b3182c>" 2) "_kombu.binding.celery.pidbox>" 3) "_kombu.binding.celery>" 4) "_kombu.binding.celeryev>" 127.0.0.1:6379> TYPE _kombu.binding.celery set
现在我们是手动来创建任务, 我们可以启动一个定时生产任务的生产者
结合django
理解了celery, 再结合django就相对比较简单了. 推荐阅读一下官方的 文档. 结合我们的SimpleTask的例子, 先设定下celery的任务:
from __future__ import absolute_import import os from celery import Celery from datetime import timedelta os . environ . setdefault ( 'DJANGO_SETTINGS_MODULE' , 'mmtest.settings' ) from django.conf import settings app = Celery ( 'mmtest' , broker = 'redis://localhost' ) app . config_from_object ( 'django.conf:settings' ) ## 自动发现任务: APP/tasks.py app . autodiscover_tasks ( lambda : settings . INSTALLED_APPS ) ## 设置定时任务参数 app . conf . update ( CELERYBEAT_SCHEDULE = { 'do-task-every-30-seconds' : { 'task' : 'mma_cron.tasks.do_task' , 'schedule' : timedelta ( seconds = 30 ), }, }, ) @app.task ( bind = True ) def debug_task ( self ): print ( 'Request: {0!r}' . format ( self . request ))
接下来我们需要实现一下我们的worker:
from __future__ import absolute_import from celery import shared_task from .cron import run_simple_task @shared_task def do_task (): run_simple_task ( 'run by celery' )
不需要其他的工作, 我们就完成了我们的目的. 接下来就是分别启动 worker和 beat了. 这也实现了我们的定时任务的目的(就我们这个例子有点大材小用了).
选择
就我个人观点, 我比较倾向于改进后的朴素方法. 原因就是业务比较简单, 比较轻量级.
整个过程还是学到了不少东西. 继续学习python~
原文:http://blog.aka-cool.net/blog/2015/05/14/cron-job-in-django/
0 Comments