简介 Celery 是一款基于 python 的异步任务处理框架,用以实现分布式任务队列。官方文档
组件
worker (任务执行者),用来执行具体任务,可在多台服务器部署实现扩展,项目中我们使用 python 进行开发
broker (中间人),用来实现任务调度、worker 管理等功能;支持 RabbitMQ、Redis、Zookeeper 等中间件,项目中我们使用 redis
backend 用来存储任务结果,项目中我们使用 redis
application (应用),用来实例化 celery
tasks (任务),用来构建 application
实例化Celery 实例化一个 celery 应用,代码文件test_celery.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 # 最简化构建一个 celery 应用,指定了 broker 和 backend from celery import Celery # 定义 broker 和 backend,分别为任务中间人和结果保存路径 BROKER = "redis://:@127.0.0.1:6379/3" BACKEND = "redis://:@127.0.0.1:6379/4" app = Celery("tasks",broker=BROKER,backend=BACKEND,) # 定义一个任务,名字为 add @app.task def add(x, y): c = x + y print('计算结果为: %d ' % c) return c @app.task(bind=True,max_retries=3) # 最大重试 3 次 def test_retry(self): print('执行 Celery 重试') raise self.retry(countdown=1) # 1 秒后执行重试 @app.task(bind=True) def test_fail(self): print('执行 Celery 失败') raise RuntimeError('测试 celery 失败')
以上代码,我们构建了一个 celery 应用,分别指定 broker 使用本地 redis 第三个库,backend 使用本地 redis 的第四个库;包含了一个名字为 add 的 task
现在,我们启动一个 celery worker
-A; 指定使用的 app
-l; 指定日志级别
-f; 指定日志文件
-D; 后台运行
–pidfile; 指定 pid 文件路径
-c; 启动子进程个数,默认为 cpu 核数
worker; 启动 worker1 $ celery -A test_celery worker -l info
注意:worker 启动会一直占用终端,可以使用 -D 参数放到后台
每个 worker 进程会一个进程池,包含多个子进程,默认为 cpu 核数,可以用 -c 指定,正常情况下,每台服务器只要启动一个 worker 即可。
发送一个celery任务 当前目录下编辑脚本 test_sender.py,内容如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 # 脚本用来发送 celery 任务 from test_celery import * # 最简洁推送一个任务,不支持任何选项 add.delay(3,6) # 推送一个任务,第一个参数 (2,5) 为任务参数,必须为 tuple 或 list, # 如果任务只需要一个参数,必须添加逗号进行转换,格式 (var1,) # countdown=10,10 秒后开始执行 add.apply_async((2,5), countdown=10) # 参数的其他写法, add.apply_async(kwargs={'x':4, 'y':8}) add.s(5,6).apply_async() # 任务失败重试 test_retry.apply_async()
执行这个脚本
可以在 celery worker 终端里看到结果
任务组 提交一组任务执行
在 test_sender.py 中追加如下代码
1 2 3 4 5 # 任务组 from celery import group numbers = [(2, 2), (4, 4), (8, 8), (16, 16)] res = group(add.subtask(i) for i in numbers).apply_async() print(res.get())
任务链 在 test_sender.py 中追加如下代码
1 2 3 4 5 6 7 8 9 10 # 使用 link,将任务结果作为第一个参数传递到下一个任务 add.apply_async((2, 3), link=add.s(16)) # 同样,前一个任务结果作为下一个任务的第一个参数 from celery import chain res = chain(add.s(2, 2), add.s(4), add.s(8))() print(res.get()) # 使用管道符 (add.s(2, 2) | add.s(4) | add.s(8))().get()
如上都是推送任务到 celery 的方法,大家可以选择自己喜欢的方式进行推送
执行脚本,查看推送及执行情况
1 $ python3 test_celery.py
重写 celery 基类 重写 celery 基类,可以针对任务执行的过程的各个状态进行特殊处理
修改 test_celery.py,在 初始化 celery app 后,定义 MyTask,内容如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from celery import Task class MyTask(Task): abstract = True # 任务返回结果后执行 def after_return(self, *args, **kwargs): print('任务返回结果: {0!r}'.format(self.request)) # 任务执行失败是调用 def on_failure(self, exc, task_id, args, kwargs, einfo): print('任务执行失败') # 任务重试时调用 def on_retry(self, exc, task_id, args, kwargs, einfo): print('任务正在重试') # 任务成功时调用 def on_success(self, retval, task_id, args, kwargs): print('任务执行成功')
任务中使用自定义基类,修改 test_celery.py 中 add 任务如下
1 2 3 4 5 6 # 定义一个任务,名字为 add @app.task(bind=True,base=MyTask) def add(self, x, y): c = x + y print('计算结果为: %d ' % c) return c
现在推送 add 任务,celery 执行中将触发 MyTask 基类中的方法
celery Event 监控 celery 相关事件,可以定制一些 worker 进程报警,任务失败报警等功能 新建文件 test_event.py,内容如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 from celery import Celery import json import time from test_celery import app def my_monitor(app): state = app.events.State() # 处理事件 def announce_failed_tasks(event): state.event(event) # task name 仅与-received 事件一起发送,这里我们使用 state 跟踪该事件。 task = state.tasks.get(event['uuid']) print('任务失败: %s[%s] %s' % ( task.name, task.uuid, task.info(), )) # 处理事件 def announce_tasks_callback(event): print('任务状态:%s - %s - %s - %s' % (event.get('hostname'), event.get('type'), event.get('uuid'), time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(event.get('timestamp'))))) # 处理 worker 事件 def announce_worker(event): if event.get('type') == 'worker-heartbeat': print('心跳检测') elif event.get('type') == 'worker-offline': print('worker 下线:%s - %s - %s ' % (event.get('hostname'), event.get('type'), time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(event.get('timestamp'))))) elif event.get('type') == 'worker-online': print('worker 上线:%s - %s - %s ' % (event.get('hostname'), event.get('type'), time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(event.get('timestamp'))))) else: pass with app.connection() as connection: recv = app.events.Receiver(connection, handlers={ # 将 task 事件交给对应的处理函数 'task-failed': announce_failed_tasks, 'task-succeeded': announce_tasks_callback, 'task-received': announce_tasks_callback, 'task-revoked':announce_tasks_callback, 'task-retried': announce_tasks_callback, # 将 worker 事件交给对应的处理函数 'worker-online': announce_worker, 'worker-offline': announce_worker, }) recv.capture(limit=None, timeout=None, wakeup=True) if __name__ == '__main__': my_monitor(app)
修改文件 test_celery.py 配置项
1 2 3 4 5 # 在初始化 celery 后,配置两个参数,允许发送任务事件 app = Celery("tasks",broker=BROKER,backend=BACKEND,) # 添加下面两行 app.conf.task_send_sent_event = True app.conf.worker_send_task_events = True
重启 worker
1 2 # 重启 worker 让程序可以发送 worker 事件 $ celery -A test_celery worker -l info -n aa
启动 Event 监控程序
推送任务到 celery
1 $ python3 test_sender.py
再次重启 worker 我们在监控程序终端可以看到相关输出
定时任务 通过 celery 可以指定任务周期性完成,或者类似 crontab 之类的定时任务
celery 默认使用 utc 时区,所以我们需要修改到北京时区
修改 test_celery.py 文件
1 2 # 在 app 初始化完毕,进行 app 时区项配置 app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'
在文件 test_celery.py 追加下面代码块
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from celery.schedules import crontab from datetime import timedelta app.conf.timezone = 'Asia/Shanghai' app.conf.beat_schedule = { 'task_every_30_seconds': { # 定时任务名称 'task': 'test_celery.add', # 调用 add 任务 'schedule': timedelta(seconds=5), # 每隔 5 秒执行一次 'args': (23, 56) # 作为参数传递进去 }, 'task_every_min': { 'task': 'test_celery.add', 'schedule': crontab(minute='*'), # 每分钟执行一次 'args': (24, 57) } }
我们通过设置 beat_schedule 参数,添加了两个任务调度,第一个名字 task_every_30_seconds,调用了 test_celery.add 任务,每隔 5 秒执行一次; 第二个名字 task_every_min,调用 test_celery.add 任务,每分钟执行一次。
crontab 表达式语法 | 语法 | 含义 | | —————————– | ————————– | | crontab(minute=0, hour=0) | 0:00 执行, | | crontab(minute=0, hour=’*/3’) | 每隔 3 小时执行一次 | | crontab(day_of_week=’sunday’) | 星期日,每分钟执行一次 | | crontab(day_of_month=’2’) | 每月第二天执行,每分钟一次 |
这个功能需要开启一下
1 2 3 # 关键词 beat, -l 指定日志级别为 debug # 进程同样会一直占用终端,方便我们查看输出信息,正式环境使用 --detach 放置到后台 $ celery -A test_celery beat -l debug
开启这个进程,程序会根据定义的时间周期将任务推送到任务池中,所以,必须开启相应的 celery worker 才能保证任务正常执行
开启任务后,会在当前目录下生成 celerybeat-schedule.db 文件
开启 worker
1 2 # 启动方式与普通 worker 一样 $ celery -A test_celery worker -l info
任务队列 开启任务队列,可以将任务推送到指定的 worker 中,在分布式任务中,指定主机执行特定任务会使用到
指定队列名称 1 2 3 4 5 # 在两个终端开启两个 celery workers,指定不同队列 -Q queue_name1,queue_name2 # 启动 celery worker 时指定队列, 参数 -Q 指定队列名称 # 一个 worker 启用多个队列,使用逗号分隔 $ celery -A test_celery worker -Q queue1 -l info $ celery -A test_celery worker -Q queue2 -l info
启动两个 worker,分别指定 queue1 queue2 队列
推送任务到指定队列 修改 test_sender.py 文件内容
1 2 3 4 5 6 7 8 9 10 11 from test_celery import * add.apply_async( (10,30), # 任务参数 queue='queue1' # 任务队列 ) add.apply_async( (11,31), # 任务参数 queue='queue2' # 任务队列 )
执行该脚本,大家可以在 worker 终端查看相关信息
在分布式环境中,队列主要用来将任务转发给不同的 celery worker 主机,实现不同环境的调用
celery 监控 1 2 # 安装 celery 监控插件 $ pip install flower
启动 3 个 worker,每个 worker 启动 4 个子进程
1 $ celery multi start 3 -A test_celery -l info -c 4 --pidfile=tmp/celery_%n.pid -f logs/celery.log
启动 flower
1 2 # -A 指定应用, --port 指定端口 $ celery flower -A test_celery --port=8080
添加任务
1 2 3 4 5 # 修改文件 test_sender.py from test_celery import * # 添加 100 个任务到 celery 中 for i in range(1,100): add.apply_async((10,30))
打开 web 服务,我们可以看到修改数据
celery 命令 查看 celery 相关信息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 $ celery -A test_celery report software -> celery:4.3.0 (rhubarb) kombu:4.6.3 py:3.5.2 billiard:3.6.0.0 redis:3.2.1 platform -> system:Linux arch:64bit, ELF kernel version:4.15.0-48-generic imp:CPython loader -> celery.loaders.app.AppLoader settings -> transport:redis results:redis://127.0.0.1:6379/4 beat_schedule: { 'task_every_30_seconds': { 'args': (23, 56), 'schedule': datetime.timedelta(0, 5), 'task': 'test_celery.add'}, 'task_every_min': { 'args': (24, 57), 'schedule': <crontab: * * * * * (m/h/d/dM/MY)>, 'task': 'test_celery.add'}} broker_url: 'redis://127.0.0.1:6379/3' result_backend: 'redis://127.0.0.1:6379/4' timezone: 'Asia/Shanghai'
查看活动队列 1 2 3 4 5 $ celery -A test_celery inspect active_queues -> celery2@bogon: OK * {'name': 'celery', 'exchange': {'name': 'celery', 'type': 'direct', 'arguments': None, 'durable': True, 'passive': False, 'auto_delete': False, 'delivery_mode': None, 'no_declare': False}, 'routing_key': 'celery', 'queue_arguments': None, 'binding_arguments': None, 'consumer_arguments': None, 'durable': True, 'exclusive': False, 'auto_delete': False, 'no_ack': False, 'alias': None, 'bindings': [], 'no_declare': None, 'expires': None, 'message_ttl': None, 'max_length': None, 'max_length_bytes': None, 'max_priority': None} -> celery1@bogon: OK * {'name': 'celery', 'exchange': {'name': 'celery', 'type': 'direct', 'arguments': None, 'durable': True, 'passive': False, 'auto_delete': False, 'delivery_mode': None, 'no_declare': False}, 'routing_key': 'celery', 'queue_arguments': None, 'binding_arguments': None, 'consumer_arguments': None, 'durable': True, 'exclusive': False, 'auto_delete': False, 'no_ack': False, 'alias': None, 'bindings': [], 'no_declare': None, 'expires': None, 'message_ttl': None, 'max_length': None, 'max_length_bytes': None, 'max_priority': None}
检查状态 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 $ celery -A test_celery inspect stats -> celery@bogon: OK { "broker": { "alternates": [], "connect_timeout": 4, "failover_strategy": "round-robin", "heartbeat": 120.0, "hostname": "127.0.0.1", "insist": false, "login_method": null, "port": 6379, "ssl": false, "transport": "redis", "transport_options": {}, "uri_prefix": null, "userid": null, "virtual_host": "3" }, "clock": "2", "pid": 98737, ............
检查报告 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 $ celery -A test_celery inspect report -> celery@aa: OK software -> celery:4.3.0 (rhubarb) kombu:4.5.0 py:3.7.3 billiard:3.6.0.0 redis:3.2.1 platform -> system:Linux arch:64bit, ELF kernel version:3.10.0-957.el7.x86_64 imp:CPython loader -> celery.loaders.app.AppLoader settings -> transport:redis results:redis://127.0.0.1:6379/4 broker_url: 'redis://127.0.0.1:6379/3' result_backend: 'redis://127.0.0.1:6379/4' task_send_sent_event: True worker_send_task_events: True timezone: 'Asia/Shanghai' beat_schedule: { 'task_every_30_seconds': { 'args': (23, 56), 'schedule': datetime.timedelta(seconds=5), 'task': 'test_celery.add'}, 'task_every_min': { 'args': (24, 57), 'schedule': <crontab: * * * * * (m/h/d/dM/MY)>, 'task': 'test_celery.add'}} include: ('celery.app.builtins', 'test_celery')
其他 清除队列中的任务
1 2 3 4 5 6 7 8 $ celery -A test_celery purge WARNING: This will remove all tasks from queue: celery. There is no undo for this operation! (to skip this prompt use the -f option) Are you sure you want to delete all tasks (yes/NO)? yes Purged 10 messages from 1 known task queue.
发送 Ping
1 2 3 $ celery -A test_celery inspect ping -> celery@a2: OK pong
关闭 worker 进程
1 $ celery -A test_celery control shutdown
程序查看 celery 信息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from test_celery import * # 获取所有节点 i = app.control.inspect() # 获取所有活跃节点 print('\33[31mCelery 获取节点: %s\33[0m' % i.active()) # 获取所有获取队列 print('\33[32mCelery 活跃队列:%s\33[0m' % i.active_queues()) # 发生 ping 包 print('\33[33mCelery 状态监测:%s \33[0m' % i.ping()) # 添加一个延时任务 add.apply_async((2,3), countdown=5) # 查看计划任务,(并非 cron 任务) print('\33[31mCelery 延时任务:%s\33[0m' % i.scheduled()) # 查看状态 print('\33[32mCelery 状态:%s\33[0m' % i.stats()) # 关闭所有节点 app.control.broadcast('shutdown')
动态加载 celery pool 个数 –autoscale=10,2 其中 10 为最大个数,2 为最小个数,用逗号分隔
1 2 3 4 5 6 7 8 9 10 11 12 13 14 $ celery -A test_celery worker --autoscale=10,2 -n au -------------- celery@user-166146.weave.local v4.3.0 (rhubarb) ---- **** ----- --- * *** * -- Linux-4.15.0-48-generic-x86_64-with-Ubuntu-16.04-xenial 2019-08-01 15:17:54 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: tasks:0x7f8f64f1ac18 - ** ---------- .> transport: redis://127.0.0.1:6379/3 - ** ---------- .> results: redis://127.0.0.1:6379/4 - *** --- * --- .> concurrency: {min=2, max=10} (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery
查看 celery worker 集群中存活的节点
1 $ celery -A test_celery status
通过 redis 检查队列 获取所有可用队列
1 2 3 4 5 6 # redis-cli -h HOST -p PORT -n DATABASE_NUMBER keys \* $ redis-cli -n 3 keys \* 1) "_kombu.binding.celery" 2) "_kombu.binding.celeryev" 3) "unacked_mutex" 4) "celery"
查看队列任务数
1 2 3 4 # redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME $ redis-cli -n 3 llen celery (integer) 10 # 队列中包含 10 个任务
代码 文件 test_celery.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 # 最简化构建一个 celery 应用,指定了 broker 和 backend from celery import Celery # 定义 broker 和 backend,分别为任务中间人和结果保存路径 BROKER = "redis://:@127.0.0.1:6379/3" BACKEND = "redis://:@127.0.0.1:6379/4" app = Celery("tasks",broker=BROKER,backend=BACKEND,) app.conf.task_send_sent_event = True app.conf.worker_send_task_events = True from celery.schedules import crontab from datetime import timedelta app.conf.timezone = 'Asia/Shanghai' app.conf.beat_schedule = { 'task_every_30_seconds': { 'task': 'test_celery.add', # 调用 add 任务 'schedule': timedelta(seconds=5), # 每隔 5 秒执行一次 'args': (23, 56) # 作为参数传递进去 }, 'task_every_min': { 'task': 'test_celery.add', 'schedule': crontab(minute='*'), # 每分钟执行一次 'args': (24, 57) } } from celery import Task class MyTask(Task): abstract = True # 任务返回结果后执行 def after_return(self, *args, **kwargs): print('任务返回结果: {0!r}'.format(self.request)) # 任务执行失败是调用 def on_failure(self, exc, task_id, args, kwargs, einfo): print('任务执行失败') # 任务重试时调用 def on_retry(self, exc, task_id, args, kwargs, einfo): print('任务正在重试') # 任务成功时调用 def on_success(self, retval, task_id, args, kwargs): print('任务执行成功') from celery.result import AsyncResult @app.task def error_handler(uuid): result = AsyncResult(uuid) print('任务 %s 执行失败raised exception: ' % uuid) # 定义一个任务,名字为 add @app.task(bind=True,base=MyTask) def add(self, x, y): c = x + y print('计算结果为: %d ' % c) return c @app.task(bind=True,max_retries=3) # 最大重试 3 次 def test_retry(self): print('执行 Celery 重试') raise self.retry(countdown=1) # 1 秒后执行重试 @app.task(bind=True) def test_fail(self): print('执行 Celery 失败') raise RuntimeError('测试 celery 失败')
文件 test_event.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 from celery import Celery import json import time from test_celery import app # 设置运行 tasks 发送事件 def my_monitor(app): state = app.events.State() # 处理事件 def announce_failed_tasks(event): state.event(event) # task name 仅与-received 事件一起发送,使用 state 跟踪该事件。 task = state.tasks.get(event['uuid']) print('任务失败: %s[%s] %s' % ( task.name, task.uuid, task.info(), )) # 处理事件 def announce_tasks_callback(event): print('任务状态:%s - %s - %s - %s' % (event.get('hostname'), event.get('type'), event.get('uuid'), time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(event.get('timestamp'))))) # 处理 worker 事件 def announce_worker(event): if event.get('type') == 'worker-heartbeat': print('心跳检测') elif event.get('type') == 'worker-offline': print('worker 下线:%s - %s - %s ' % (event.get('hostname'), event.get('type'), time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(event.get('timestamp'))))) elif event.get('type') == 'worker-online': print('worker 上线:%s - %s - %s ' % (event.get('hostname'), event.get('type'), time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(event.get('timestamp'))))) else: pass with app.connection() as connection: recv = app.events.Receiver(connection, handlers={ # 将 task 事件交给对应的处理函数 'task-failed': announce_failed_tasks, 'task-succeeded': announce_tasks_callback, 'task-received': announce_tasks_callback, 'task-revoked':announce_tasks_callback, 'task-retried': announce_tasks_callback, # 将 worker 事件交给对应的处理函数 'worker-online': announce_worker, 'worker-offline': announce_worker, }) recv.capture(limit=None, timeout=None, wakeup=True) if __name__ == '__main__': my_monitor(app)
文件 test_sender.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 from test_celery import * add.delay(3 ,6 ) add.apply_async((2 ,5 ), countdown=10 ) add.apply_async(kwargs={'x' :4 , 'y' :8 }) add.s(5 ,6 ).apply_async() test_retry.apply_async() test_fail.apply_async(link_error=error_handler.s()) from celery import groupnumbers = [(2 , 2 ), (4 , 4 ), (8 , 8 ), (16 , 16 )] res = group(add.subtask(i) for i in numbers).apply_async() print(res.get()) add.apply_async((2 , 3 ), link=add.s(16 )) from celery import chainres = chain(add.s(2 , 2 ), add.s(4 ), add.s(8 ))() print(res.get()) (add.s(2 , 2 ) | add.s(4 ) | add.s(8 ))().get()