Celery 不能用 root 用户启动问题

celery 中增加如下代码

1
2
from celery import platforms
platforms.C_FORCE_ROOT = True

config.py 中的一些配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Backend
CELERY_RESULT_BACKEND = 'amqp://rabbit'

# Broker 可以配置为 HaProxy 监控的端口
BROKER_URL = 'amqp://rabbit'

# Broker 支持设置为 list,可以同时设置多个 rabbit
BROKER_URL = [
    'amqp://rabbit1',
    'amqp://rabbit2',
    'amqp://rabbit3'
]
# 每个子线程 ( 协程 ) 最多执行 40 个任务,防止内存泄漏导致进程僵死
CELERYD_MAX_TASKS_PER_CHILD = 40 

# 不保存结果(如果结果不太重要的话直接选择不保存结果,
# 否则会随着任务的继续占用过多的空间)
CELERY_IGNORE_RESULT = True 

# celery 更新到 4.0 后会出现非认证中间包的报错,修改包类型为 
# pickle,并压缩以及添加 pickle 包的认证 
CELERY_ACCEPT_CONTENT = ['pickle'] 
CELERY_TASK_SERIALIZER = 'pickle' 
CELERY_MESSAGE_COMPRESSION = 'gzip'

celery.py 中的一些配置

1
2
3
4
5
6
7
8
# 在 include 中增加 task 的放置位置
app = Celery('proj', include=
   [
       'proj.task_a_file',
       'proj.task_b_file',
       'proj.task_c_file'
   ]
)

celery 的启动

在 celery 启动中有多个参数,常用的启动参数如下:

  • autoscal= max,min 最大最小 worker 数量,期间 celery 会根据 payload 进行自动调整
  • -P gevent, eventlet 等,可以选择不同的池来启动服务
  • -l log 等级 debug, info, warning, error
  • –logfile= celery worker 日志的位置

例如:

1
celery worker -A proj --autoscal=1500,100 -P gevent -l info --logfile=/your/log/file

修改 Task 中的默认值并增加 handler 的处理

常用的 handler 有 on_failure, on_retry, on_success 等,分别处理任务失败,重试成功后的状态

1
2
3
4
5
6
class BaseTask(Task):
    default_retry_delay = 1
    max_retries = 3

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # 增加对失败任务的记录以及相应处理