Introduction

When running Celery with autoscaling, I found that the default autoscaling algorithm only considers task load and does no memory control, so the service consumed too much memory and the server went down. This post implements a custom AutoScaler to solve the problem.

Project configuration

First, configure the project so that Celery uses the custom AutoScaler:

CELERYD_AUTOSCALER = 'proj.tasks.CustomAutoScale'
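`CELERYD_AUTOSCALER` is the old uppercase setting name; on Celery 4+ with lowercase settings, the equivalent key is `worker_autoscaler`. A minimal sketch of the same configuration in that style (assumes the app object is created in `proj`):

```python
from celery import Celery

app = Celery('proj')
# worker_autoscaler is the Celery 4+ lowercase equivalent of CELERYD_AUTOSCALER
app.conf.worker_autoscaler = 'proj.tasks.CustomAutoScale'
```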

Distinguishing workers by worker name

Just add a name via the -n option in the Celery startup arguments:

celery worker -A proj -l info -P eventlet --autoscale 10,2 -n test_worker

Implementing CustomAutoScale

The main work in implementing your own AutoScaler is overriding the maybe_scale method, or directly overriding _maybe_scale. This example extends the original process-count-only check with a memory check; the method keeps memory usage between 60% and 85%.

import psutil
from celery.utils.log import get_logger
from celery.worker.autoscale import Autoscaler

logger = get_logger(__name__)


class CustomAutoScale(Autoscaler):
    def _maybe_scale(self, req=None):
        worker_name = self.worker.hostname
        # System-wide memory usage as a percentage (0-100).
        memory_percent = psutil.virtual_memory().percent
        procs = self.processes
        if memory_percent < 60.0:
            # Plenty of memory: grow toward demand, capped at max_concurrency.
            cur = min(self.qty, self.max_concurrency)
            if cur > procs:
                self.scale_up(cur - procs)
                logger.debug("[worker_name: {}][memory_percent: {}][current: {}][scale up: {}]"
                             .format(worker_name, memory_percent, self.processes, cur - procs))
                return True
            # Demand dropped: shrink toward it, but never below min_concurrency.
            cur = max(self.qty, self.min_concurrency)
            if cur < procs:
                self.scale_down(procs - cur)
                logger.debug("[worker_name: {}][memory_percent: {}][current: {}][scale down: {}]"
                             .format(worker_name, memory_percent, self.processes, procs - cur))
                return True
        elif memory_percent < 85.0:
            # Target band: hold the current process count steady.
            logger.debug("[worker_name: {}][memory_percent: {}][current: {}][scale: {}]"
                         .format(worker_name, memory_percent, self.processes, 0))
            return True
        elif memory_percent < 90.0:
            # Slightly over budget: shed one process.
            self.scale_down(1)
            logger.debug("[worker_name: {}][memory_percent: {}][current: {}][scale down: {}]"
                         .format(worker_name, memory_percent, self.processes, 1))
            return True
        else:
            # Critical memory pressure: shed half the headroom above min_concurrency.
            cur = procs - self.min_concurrency
            down_process = max(int(cur / 2), 1)
            self.scale_down(down_process)
            logger.debug("[worker_name: {}][memory_percent: {}][current: {}][scale down: {}]"
                         .format(worker_name, memory_percent, self.processes, down_process))
            return True
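The threshold logic above can also be factored into a pure helper so it can be unit-tested without a running worker. A sketch of that idea (`decide_scale` is a hypothetical name; the arguments mirror the attributes the class reads: `qty`, `processes`, `min_concurrency`, `max_concurrency`):

```python
def decide_scale(memory_percent, procs, qty, min_concurrency, max_concurrency):
    """Return the signed change in process count for a given memory load.

    Positive = scale up, negative = scale down, 0 = hold steady.
    Mirrors the thresholds above: follow demand below 60%, hold at
    60-85%, shed one process below 90%, shed half the headroom above.
    """
    if memory_percent < 60.0:
        target = min(qty, max_concurrency)
        if target > procs:
            return target - procs
        target = max(qty, min_concurrency)
        if target < procs:
            return target - procs
        return 0
    elif memory_percent < 85.0:
        return 0
    elif memory_percent < 90.0:
        return -1
    else:
        return -max((procs - min_concurrency) // 2, 1)
```

With this split, `_maybe_scale` only has to read the system state, call the helper, and apply the returned delta with `scale_up`/`scale_down`.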

Summary

With the approach above you can flexibly control the concurrency of a Celery worker; if needed, metrics such as load average and disk I/O can be added for finer-grained control.
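As a hint of what such an extension could look like, the 1-minute load average from `os.getloadavg()` (Unix only) can be compared against the CPU count as an additional shed signal. A minimal sketch with illustrative thresholds (`system_overloaded` is a hypothetical helper, not part of the post's code):

```python
import os


def system_overloaded(memory_percent, cpu_count=None):
    """Return True when memory usage or the 1-minute load average
    suggests the worker should shed processes. Thresholds are illustrative."""
    if memory_percent >= 85.0:  # same upper band as the autoscaler above
        return True
    cpu_count = cpu_count or os.cpu_count() or 1
    load1, _, _ = os.getloadavg()  # Unix only
    return load1 > cpu_count
```

A `_maybe_scale` override could call this instead of checking memory alone, and scale down whenever it returns True.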