0%

celery基础笔记

简介

Celery 是一款基于 python 的异步任务处理框架,用以实现分布式任务队列。
官方文档

组件

  • worker (任务执行者),用来执行具体任务,可在多台服务器部署实现扩展,项目中我们使用 python 进行开发
  • broker (中间人),用来实现任务调度、worker 管理等功能;支持 RabbitMQ、Redis、Zookeeper 等中间件,项目中我们使用 redis
  • backend 用来存储任务结果,项目中我们使用 redis
  • application (应用),用来实例化 celery
  • tasks (任务),用来构建 application

实例化Celery

实例化一个 celery 应用,代码文件test_celery.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 最简化构建一个 celery 应用,指定了 broker 和 backend
from celery import Celery
# 定义 broker 和 backend,分别为任务中间人和结果保存路径
BROKER = "redis://:@127.0.0.1:6379/3"
BACKEND = "redis://:@127.0.0.1:6379/4"

app = Celery("tasks",broker=BROKER,backend=BACKEND,)

# 定义一个任务,名字为 add
@app.task
def add(x, y):
c = x + y
print('计算结果为: %d ' % c)
return c

@app.task(bind=True,max_retries=3) # 最大重试 3 次
def test_retry(self):
print('执行 Celery 重试')
raise self.retry(countdown=1) # 1 秒后执行重试

@app.task(bind=True)
def test_fail(self):
print('执行 Celery 失败')
raise RuntimeError('测试 celery 失败')

以上代码,我们构建了一个 celery 应用,分别指定 broker 使用本地 redis 第三个库,backend 使用本地 redis 的第四个库;包含了一个名字为 add 的 task

现在,我们启动一个 celery worker

  • -A; 指定使用的 app
  • -l; 指定日志级别
  • -f; 指定日志文件
  • -D; 后台运行
  • –pidfile; 指定 pid 文件路径
  • -c; 启动子进程个数,默认为 cpu 核数
  • worker; 启动 worker
    1
    $ celery -A test_celery worker -l info

注意:worker 启动会一直占用终端,可以使用 -D 参数放到后台

每个 worker 进程会一个进程池,包含多个子进程,默认为 cpu 核数,可以用 -c 指定,正常情况下,每台服务器只要启动一个 worker 即可。

发送一个celery任务

当前目录下编辑脚本 test_sender.py,内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 脚本用来发送 celery 任务

from test_celery import *

# 最简洁推送一个任务,不支持任何选项
add.delay(3,6)
# 推送一个任务,第一个参数 (2,5) 为任务参数,必须为 tuple 或 list,
# 如果任务只需要一个参数,必须添加逗号进行转换,格式 (var1,)
# countdown=10,10 秒后开始执行
add.apply_async((2,5), countdown=10)
# 参数的其他写法,
add.apply_async(kwargs={'x':4, 'y':8})
add.s(5,6).apply_async()

# 任务失败重试
test_retry.apply_async()

执行这个脚本

1
$ python test_sender.py

可以在 celery worker 终端里看到结果

任务组

提交一组任务执行

在 test_sender.py 中追加如下代码

1
2
3
4
5
# 任务组
from celery import group
numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
res = group(add.subtask(i) for i in numbers).apply_async()
print(res.get())

任务链

在 test_sender.py 中追加如下代码

1
2
3
4
5
6
7
8
9
10
# 使用 link,将任务结果作为第一个参数传递到下一个任务
add.apply_async((2, 3), link=add.s(16))

# 同样,前一个任务结果作为下一个任务的第一个参数
from celery import chain
res = chain(add.s(2, 2), add.s(4), add.s(8))()
print(res.get())

# 使用管道符
(add.s(2, 2) | add.s(4) | add.s(8))().get()

如上都是推送任务到 celery 的方法,大家可以选择自己喜欢的方式进行推送

执行脚本,查看推送及执行情况

1
$ python3 test_celery.py

重写 celery 基类

重写 celery 基类,可以针对任务执行的过程的各个状态进行特殊处理

修改 test_celery.py,在 初始化 celery app 后,定义 MyTask,内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from celery import Task

class MyTask(Task):
abstract = True

# 任务返回结果后执行
def after_return(self, *args, **kwargs):
print('任务返回结果: {0!r}'.format(self.request))

# 任务执行失败是调用
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('任务执行失败')

# 任务重试时调用
def on_retry(self, exc, task_id, args, kwargs, einfo):
print('任务正在重试')

# 任务成功时调用
def on_success(self, retval, task_id, args, kwargs):
print('任务执行成功')

任务中使用自定义基类,修改 test_celery.py 中 add 任务如下

1
2
3
4
5
6
# 定义一个任务,名字为 add
@app.task(bind=True,base=MyTask)
def add(self, x, y):
c = x + y
print('计算结果为: %d ' % c)
return c

现在推送 add 任务,celery 执行中将触发 MyTask 基类中的方法

celery Event

监控 celery 相关事件,可以定制一些 worker 进程报警,任务失败报警等功能
新建文件 test_event.py,内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from celery import Celery
import json
import time
from test_celery import app

def my_monitor(app):
state = app.events.State()

# 处理事件
def announce_failed_tasks(event):
state.event(event)
# task name 仅与-received 事件一起发送,这里我们使用 state 跟踪该事件。
task = state.tasks.get(event['uuid'])

print('任务失败: %s[%s] %s' % (
task.name, task.uuid, task.info(), ))
# 处理事件
def announce_tasks_callback(event):
print('任务状态:%s - %s - %s - %s' %
(event.get('hostname'), event.get('type'), event.get('uuid'),
time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(event.get('timestamp')))))

# 处理 worker 事件
def announce_worker(event):
if event.get('type') == 'worker-heartbeat':
print('心跳检测')
elif event.get('type') == 'worker-offline':
print('worker 下线:%s - %s - %s ' %
(event.get('hostname'), event.get('type'),
time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(event.get('timestamp')))))
elif event.get('type') == 'worker-online':
print('worker 上线:%s - %s - %s ' %
(event.get('hostname'), event.get('type'),
time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(event.get('timestamp')))))
else: pass

with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
# 将 task 事件交给对应的处理函数
'task-failed': announce_failed_tasks,
'task-succeeded': announce_tasks_callback,
'task-received': announce_tasks_callback,
'task-revoked':announce_tasks_callback,
'task-retried': announce_tasks_callback,

# 将 worker 事件交给对应的处理函数
'worker-online': announce_worker,
'worker-offline': announce_worker,
})
recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
my_monitor(app)

修改文件 test_celery.py 配置项

1
2
3
4
5
# 在初始化 celery 后,配置两个参数,允许发送任务事件
app = Celery("tasks",broker=BROKER,backend=BACKEND,)
# 添加下面两行
app.conf.task_send_sent_event = True
app.conf.worker_send_task_events = True

重启 worker

1
2
# 重启 worker 让程序可以发送 worker 事件
$ celery -A test_celery worker -l info -n aa

启动 Event 监控程序

1
$ python3 test_event.py

推送任务到 celery

1
$ python3 test_sender.py

再次重启 worker
我们在监控程序终端可以看到相关输出

定时任务

通过 celery 可以指定任务周期性完成,或者类似 crontab 之类的定时任务

celery 默认使用 utc 时区,所以我们需要修改到北京时区

修改 test_celery.py 文件

1
2
# 在 app 初始化完毕,进行 app 时区项配置
app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'

在文件 test_celery.py 追加下面代码块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from celery.schedules import crontab
from datetime import timedelta
app.conf.timezone = 'Asia/Shanghai'
app.conf.beat_schedule = {
'task_every_30_seconds': { # 定时任务名称
'task': 'test_celery.add', # 调用 add 任务
'schedule': timedelta(seconds=5), # 每隔 5 秒执行一次
'args': (23, 56) # 作为参数传递进去
},
'task_every_min': {
'task': 'test_celery.add',
'schedule': crontab(minute='*'), # 每分钟执行一次
'args': (24, 57)
}
}

我们通过设置 beat_schedule 参数,添加了两个任务调度,第一个名字 task_every_30_seconds,调用了 test_celery.add 任务,每隔 5 秒执行一次; 第二个名字 task_every_min,调用 test_celery.add 任务,每分钟执行一次。

crontab 表达式语法
| 语法 | 含义 |
| —————————– | ————————– |
| crontab(minute=0, hour=0) | 0:00 执行, |
| crontab(minute=0, hour=’*/3’) | 每隔 3 小时执行一次 |
| crontab(day_of_week=’sunday’) | 星期日,每分钟执行一次 |
| crontab(day_of_month=’2’) | 每月第二天执行,每分钟一次 |

这个功能需要开启一下

1
2
3
# 关键词 beat, -l 指定日志级别为 debug
# 进程同样会一直占用终端,方便我们查看输出信息,正式环境使用 --detach 放置到后台
$ celery -A test_celery beat -l debug

开启这个进程,程序会根据定义的时间周期将任务推送到任务池中,所以,必须开启相应的 celery worker 才能保证任务正常执行

开启任务后,会在当前目录下生成 celerybeat-schedule.db 文件

开启 worker

1
2
# 启动方式与普通 worker 一样
$ celery -A test_celery worker -l info

任务队列

开启任务队列,可以将任务推送到指定的 worker 中,在分布式任务中,指定主机执行特定任务会使用到

指定队列名称

1
2
3
4
5
# 在两个终端开启两个 celery workers,指定不同队列 -Q queue_name1,queue_name2 
# 启动 celery worker 时指定队列, 参数 -Q 指定队列名称
# 一个 worker 启用多个队列,使用逗号分隔
$ celery -A test_celery worker -Q queue1 -l info
$ celery -A test_celery worker -Q queue2 -l info

启动两个 worker,分别指定 queue1 queue2 队列

推送任务到指定队列

修改 test_sender.py 文件内容

1
2
3
4
5
6
7
8
9
10
11
from test_celery import *

add.apply_async(
(10,30), # 任务参数
queue='queue1' # 任务队列
)

add.apply_async(
(11,31), # 任务参数
queue='queue2' # 任务队列
)

执行该脚本,大家可以在 worker 终端查看相关信息

在分布式环境中,队列主要用来将任务转发给不同的 celery worker 主机,实现不同环境的调用

celery 监控

1
2
# 安装 celery 监控插件
$ pip install flower

启动 3 个 worker,每个 worker 启动 4 个子进程

1
$ celery multi start 3 -A test_celery -l info -c 4 --pidfile=tmp/celery_%n.pid -f logs/celery.log

启动 flower

1
2
# -A 指定应用, --port 指定端口
$ celery flower -A test_celery --port=8080

添加任务

1
2
3
4
5
# 修改文件 test_sender.py
from test_celery import *
# 添加 100 个任务到 celery 中
for i in range(1,100):
add.apply_async((10,30))
1
$ python test_sender.py

打开 web 服务,我们可以看到修改数据

celery 命令

查看 celery 相关信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
$ celery -A test_celery report 

software -> celery:4.3.0 (rhubarb) kombu:4.6.3 py:3.5.2
billiard:3.6.0.0 redis:3.2.1
platform -> system:Linux arch:64bit, ELF
kernel version:4.15.0-48-generic imp:CPython
loader -> celery.loaders.app.AppLoader
settings -> transport:redis results:redis://127.0.0.1:6379/4

beat_schedule: {
'task_every_30_seconds': { 'args': (23, 56),
'schedule': datetime.timedelta(0, 5),
'task': 'test_celery.add'},
'task_every_min': { 'args': (24, 57),
'schedule': <crontab: * * * * * (m/h/d/dM/MY)>,
'task': 'test_celery.add'}}
broker_url: 'redis://127.0.0.1:6379/3'
result_backend: 'redis://127.0.0.1:6379/4'
timezone: 'Asia/Shanghai'

查看活动队列

1
2
3
4
5
$ celery -A test_celery inspect active_queues      
-> celery2@bogon: OK
* {'name': 'celery', 'exchange': {'name': 'celery', 'type': 'direct', 'arguments': None, 'durable': True, 'passive': False, 'auto_delete': False, 'delivery_mode': None, 'no_declare': False}, 'routing_key': 'celery', 'queue_arguments': None, 'binding_arguments': None, 'consumer_arguments': None, 'durable': True, 'exclusive': False, 'auto_delete': False, 'no_ack': False, 'alias': None, 'bindings': [], 'no_declare': None, 'expires': None, 'message_ttl': None, 'max_length': None, 'max_length_bytes': None, 'max_priority': None}
-> celery1@bogon: OK
* {'name': 'celery', 'exchange': {'name': 'celery', 'type': 'direct', 'arguments': None, 'durable': True, 'passive': False, 'auto_delete': False, 'delivery_mode': None, 'no_declare': False}, 'routing_key': 'celery', 'queue_arguments': None, 'binding_arguments': None, 'consumer_arguments': None, 'durable': True, 'exclusive': False, 'auto_delete': False, 'no_ack': False, 'alias': None, 'bindings': [], 'no_declare': None, 'expires': None, 'message_ttl': None, 'max_length': None, 'max_length_bytes': None, 'max_priority': None}

检查状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
$ celery -A test_celery inspect stats
-> celery@bogon: OK
{
"broker": {
"alternates": [],
"connect_timeout": 4,
"failover_strategy": "round-robin",
"heartbeat": 120.0,
"hostname": "127.0.0.1",
"insist": false,
"login_method": null,
"port": 6379,
"ssl": false,
"transport": "redis",
"transport_options": {},
"uri_prefix": null,
"userid": null,
"virtual_host": "3"
},
"clock": "2",
"pid": 98737,
............

检查报告

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
$ celery -A test_celery inspect report 
-> celery@aa: OK

software -> celery:4.3.0 (rhubarb) kombu:4.5.0 py:3.7.3
billiard:3.6.0.0 redis:3.2.1
platform -> system:Linux arch:64bit, ELF
kernel version:3.10.0-957.el7.x86_64 imp:CPython
loader -> celery.loaders.app.AppLoader
settings -> transport:redis results:redis://127.0.0.1:6379/4

broker_url: 'redis://127.0.0.1:6379/3'
result_backend: 'redis://127.0.0.1:6379/4'
task_send_sent_event: True
worker_send_task_events: True
timezone: 'Asia/Shanghai'
beat_schedule: {
'task_every_30_seconds': { 'args': (23, 56),
'schedule': datetime.timedelta(seconds=5),
'task': 'test_celery.add'},
'task_every_min': { 'args': (24, 57),
'schedule': <crontab: * * * * * (m/h/d/dM/MY)>,
'task': 'test_celery.add'}}
include:
('celery.app.builtins', 'test_celery')

其他

清除队列中的任务

1
2
3
4
5
6
7
8
$ celery -A test_celery purge 
WARNING: This will remove all tasks from queue: celery.
There is no undo for this operation!

(to skip this prompt use the -f option)

Are you sure you want to delete all tasks (yes/NO)? yes
Purged 10 messages from 1 known task queue.

发送 Ping

1
2
3
$ celery -A test_celery inspect ping      
-> celery@a2: OK
pong

关闭 worker 进程

1
$ celery -A test_celery control shutdown

程序查看 celery 信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from test_celery import *

# 获取所有节点
i = app.control.inspect()

# 获取所有活跃节点
print('\33[31mCelery 获取节点: %s\33[0m' % i.active())
# 获取所有获取队列
print('\33[32mCelery 活跃队列:%s\33[0m' % i.active_queues())
# 发生 ping 包
print('\33[33mCelery 状态监测:%s \33[0m' % i.ping())

# 添加一个延时任务
add.apply_async((2,3), countdown=5)
# 查看计划任务,(并非 cron 任务)
print('\33[31mCelery 延时任务:%s\33[0m' % i.scheduled())
# 查看状态
print('\33[32mCelery 状态:%s\33[0m' % i.stats())
# 关闭所有节点
app.control.broadcast('shutdown')
1
$ python test_report.py

动态加载 celery pool 个数

–autoscale=10,2 其中 10 为最大个数,2 为最小个数,用逗号分隔

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ celery -A test_celery worker --autoscale=10,2  -n au
-------------- celery@user-166146.weave.local v4.3.0 (rhubarb)
---- **** -----
--- * *** * -- Linux-4.15.0-48-generic-x86_64-with-Ubuntu-16.04-xenial 2019-08-01 15:17:54
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7f8f64f1ac18
- ** ---------- .> transport: redis://127.0.0.1:6379/3
- ** ---------- .> results: redis://127.0.0.1:6379/4
- *** --- * --- .> concurrency: {min=2, max=10} (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery

查看 celery worker 集群中存活的节点

1
$ celery -A test_celery status

通过 redis 检查队列

获取所有可用队列

1
2
3
4
5
6
# redis-cli -h HOST -p PORT -n DATABASE_NUMBER keys \*
$ redis-cli -n 3 keys \*
1) "_kombu.binding.celery"
2) "_kombu.binding.celeryev"
3) "unacked_mutex"
4) "celery"

查看队列任务数

1
2
3
4
# redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
$ redis-cli -n 3 llen celery
(integer) 10
# 队列中包含 10 个任务

代码

文件 test_celery.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# 最简化构建一个 celery 应用,指定了 broker 和 backend
from celery import Celery
# 定义 broker 和 backend,分别为任务中间人和结果保存路径
BROKER = "redis://:@127.0.0.1:6379/3"
BACKEND = "redis://:@127.0.0.1:6379/4"

app = Celery("tasks",broker=BROKER,backend=BACKEND,)
app.conf.task_send_sent_event = True
app.conf.worker_send_task_events = True

from celery.schedules import crontab
from datetime import timedelta
app.conf.timezone = 'Asia/Shanghai'
app.conf.beat_schedule = {
'task_every_30_seconds': {
'task': 'test_celery.add', # 调用 add 任务
'schedule': timedelta(seconds=5), # 每隔 5 秒执行一次
'args': (23, 56) # 作为参数传递进去
},
'task_every_min': {
'task': 'test_celery.add',
'schedule': crontab(minute='*'), # 每分钟执行一次
'args': (24, 57)
}
}
from celery import Task
class MyTask(Task):
abstract = True

# 任务返回结果后执行
def after_return(self, *args, **kwargs):
print('任务返回结果: {0!r}'.format(self.request))

# 任务执行失败是调用
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('任务执行失败')


# 任务重试时调用
def on_retry(self, exc, task_id, args, kwargs, einfo):
print('任务正在重试')


# 任务成功时调用
def on_success(self, retval, task_id, args, kwargs):
print('任务执行成功')

from celery.result import AsyncResult
@app.task
def error_handler(uuid):
result = AsyncResult(uuid)
print('任务 %s 执行失败raised exception: ' % uuid)

# 定义一个任务,名字为 add
@app.task(bind=True,base=MyTask)
def add(self, x, y):
c = x + y
print('计算结果为: %d ' % c)
return c

@app.task(bind=True,max_retries=3) # 最大重试 3 次
def test_retry(self):
print('执行 Celery 重试')
raise self.retry(countdown=1) # 1 秒后执行重试

@app.task(bind=True)
def test_fail(self):
print('执行 Celery 失败')
raise RuntimeError('测试 celery 失败')

文件 test_event.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from celery import Celery
import json
import time
from test_celery import app

# 设置运行 tasks 发送事件

def my_monitor(app):
state = app.events.State()

# 处理事件
def announce_failed_tasks(event):
state.event(event)
# task name 仅与-received 事件一起发送,使用 state 跟踪该事件。
task = state.tasks.get(event['uuid'])

print('任务失败: %s[%s] %s' % (
task.name, task.uuid, task.info(), ))
# 处理事件
def announce_tasks_callback(event):
print('任务状态:%s - %s - %s - %s' %
(event.get('hostname'), event.get('type'), event.get('uuid'),
time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(event.get('timestamp')))))

# 处理 worker 事件
def announce_worker(event):
if event.get('type') == 'worker-heartbeat':
print('心跳检测')
elif event.get('type') == 'worker-offline':
print('worker 下线:%s - %s - %s ' %
(event.get('hostname'), event.get('type'),
time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(event.get('timestamp')))))
elif event.get('type') == 'worker-online':
print('worker 上线:%s - %s - %s ' %
(event.get('hostname'), event.get('type'),
time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(event.get('timestamp')))))
else: pass

with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
# 将 task 事件交给对应的处理函数
'task-failed': announce_failed_tasks,
'task-succeeded': announce_tasks_callback,
'task-received': announce_tasks_callback,
'task-revoked':announce_tasks_callback,
'task-retried': announce_tasks_callback,

# 将 worker 事件交给对应的处理函数
'worker-online': announce_worker,
'worker-offline': announce_worker,
})
recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
my_monitor(app)

文件 test_sender.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 脚本用来发送 celery 任务

from test_celery import *

# 最简洁推送一个任务,不支持任何选项
add.delay(3,6)
# 推送一个任务,第一个参数 (2,5) 为任务参数,必须为 tuple 或 list,
# 如果任务只需要一个参数,必须添加逗号进行转换,格式 (var1,)
# countdown=10,10 秒后开始执行
add.apply_async((2,5), countdown=10)
# 参数的其他写法,
add.apply_async(kwargs={'x':4, 'y':8})
add.s(5,6).apply_async()

# 任务失败重试
test_retry.apply_async()

# 失败回调
test_fail.apply_async(link_error=error_handler.s())

# 任务组
from celery import group
numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
res = group(add.subtask(i) for i in numbers).apply_async()
print(res.get()) # # 如果没有 celery worker,脚本阻塞在这里

# 任务链
# 使用 link,将任务结果作为第一个参数传递到下一个任务
add.apply_async((2, 3), link=add.s(16))

# 同样,前一个任务结果作为下一个任务的第一个参数
from celery import chain
res = chain(add.s(2, 2), add.s(4), add.s(8))()
print(res.get())

# 使用管道符
(add.s(2, 2) | add.s(4) | add.s(8))().get()