分布式队列Celery
发布日期:2021-05-13 04:45:37 浏览次数:13 分类:博客文章

本文共 11557 字,大约阅读时间需要 38 分钟。

Celery���������?

Celery ������������ Python ������������������������������������������������������������������������������,���������������������������������������������������������������

Celery ���������������������������������������������������

������������������������������������������������������������������������ Celery ���������������������������������������������������������������������

1.������������

������������������������ Celery ��������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������� rabbitmq ������������������������������������������������������������������������

Celery ������������������������������������ Python ������������������������������������������������������������������������������������������ Python ������������������������ Celery ���������������������

������ Celery ������������������������������������������

1.1 Brokers

brokers ������������������������������������������������������������������Celery ���������������������������������������brokers ���������������������������������/���������������������(������)

��������� brokers ��� rabbitmq���redis���Zookeeper ���

1.2 Result Stores / backend

������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������ Result Stores ���

��������� backend ��� redis���Memcached ���������������������������������

1.3 Workers

������ Celery ���������������������������������/������������������������������������������������������������������

1.4 Tasks

��������������������������������������������������������������������������������������������������������������������� workers ���������������

������������������������������������������������������������������������

��������������� redis ������ celery ��� broker ��� backend���

(������ brokers ��� backend ������)

������ Celery ��� redis ������ python ��� redis ������:

apt-get install redis-serverpip install redispip install celery

������������������������������ celery ��� 4.0 ������������������������ python ��� redis ������������ 2.10.4 ��������������������������� redis ������ timeout ������������

������������������������������task:

#tasks.pyfrom celery import Celeryapp = Celery('tasks',  backend='redis://localhost:6379/0', broker='redis://localhost:6379/0') #���������celery���backend���broker@app.task  #��������������������� celery taskdef add(x, y):    return x + y

OK���������������broker ���������������backend ���������������task ������������������������������������ worker ��������������������� tasks.py ������������������������

celery -A tasks worker --loglevel=info

������������������ tasks ��������������������� worker ���������������������������broker���������������������worker������������������������������

������������������������������������������������������������������������������������������������������������ task ������������

#trigger.pyfrom tasks import addresult = add.delay(4, 4) #������������ add(4, 4)������������������ celery ��������������� delay ������������while not result.ready():    time.sleep(1)print 'task done: {0}'.format(result.get())

���������������

delay ������������������ AsyncResult ���������������������������������������������������������������������result.ready() ��� true������������ result.get() ������������������

������������������������ celery ���������������������

2. ������������

��������������������������������������������������������� Celery ��������������������������������������������������������������������������������������������������������������������� Celery ������������������������

���������������������task:

@app.task  #��������������������� celery taskdef add(x, y):    return x + y

������������������app.task������������������������������������������������������ celery task ������������������������������������������������������������������������������ task ������������������������

������������������������������������������������ task ������������������������������������������������������������ add ������ task ������������������������������ self ������������ task ���������������������������������

������������������������������������ task ��������������������������� task ������������ add ���������������������������������

2.1 ������������������������������������

������������������������������������������������������������������������ task ��� on_failure���on_success ������������

# tasks.pyclass MyTask(Task):    def on_success(self, retval, task_id, args, kwargs):        print 'task done: {0}'.format(retval)        return super(MyTask, self).on_success(retval, task_id, args, kwargs)        def on_failure(self, exc, task_id, args, kwargs, einfo):        print 'task fail, reason: {0}'.format(exc)        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)@app.task(base=MyTask)def add(x, y):    return x + y

������ ������������������ worker���

celery -A tasks worker --loglevel=info

������������������������

������������tasks:

@app.task  #��������������������� celery taskdef add(x, y):    raise KeyError    return x + y

������������ worker������������ trigger.py:

 

������������������������������������������������������������������������������ on_failure���on_success

2.2 ���������������������������

# tasks.pyfrom celery.utils.log import get_task_loggerlogger = get_task_logger(__name__)@app.task(bind=True)def add(self, x, y):    logger.info(self.request.__dict__)    return x + y

���������������������

 

������������������������������������������������������������������������������������������������������������������������������������������������������������������

������ celery.task.request ��������������������������� 

2.3 ������������������

��������������������������������������������������������������� Celery ���������������������������������������

 

������ ������
PENDING ���������������
STARTED ���������������
SUCCESS ������������������
FAILURE ������������������
RETRY ������������������
REVOKED ������������

������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������

# tasks.pyfrom celery import Celeryimport time@app.task(bind=True)def test_mes(self):    for i in xrange(1, 11):        time.sleep(0.1)        self.update_state(state="PROGRESS", meta={'p': i*10})    return 'finish'

��������� trigger.py ������������

# trigger.pyfrom task import add,test_mesimport sysdef pm(body):    res = body.get('result')    if body.get('status') == 'PROGRESS':        sys.stdout.write('\r������������: {0}%'.format(res.get('p')))        sys.stdout.flush()    else:        print '\r'        print resr = test_mes.delay()print r.get(on_message=pm, propagate=False)

2.4 ������/���������������

Celery ��������������������������������������������������������������������������������������������������������������������������� beat ������������

������ Celery ������������ celery_config.py:

# celery_config.pyfrom datetime import timedeltafrom celery.schedules import crontabCELERYBEAT_SCHEDULE = {    'ptask': {        'task': 'tasks.period_task',        'schedule': timedelta(seconds=5),    },}CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

��������� schedule ��������������������������������������������� datetime.timedelta ������ crontab ���������������������������������������������������������������������

��������������������������� datetime ��������������������������������������������������������� utc ������������������������������������

CELERY_TIMEZONE = 'Asia/Shanghai'

��������� tasks.py ���������������������������������������

# tasks.pyapp = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')app.config_from_object('celery_config')@app.task(bind=True)def period_task(self):    print 'period task done: {0}'.format(self.request.id)

������������������ worker������������������ beat���

celery -A task beat

 

2.5 ������������

���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������

������������

@app.taskdef update_page_info(url):    page = fetch_page.delay(url).get()    info = parse_page.delay(url, page).get()    store_page_info.delay(url, info)@app.taskdef fetch_page(url):    return myhttplib.get(url)@app.taskdef parse_page(url, page):    return myparser.parse_document(page)@app.taskdef store_page_info(url, info):    return PageInfo.objects.create(url, info)

������������1

def update_page_info(url):    # fetch_page -> parse_page -> store_page    chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)    chain()@app.task()def fetch_page(url):    return myhttplib.get(url)@app.task()def parse_page(page):    return myparser.parse_document(page)@app.task(ignore_result=True)def store_page_info(info, url):    PageInfo.objects.create(url=url, info=info)

������������2

fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)])

������������������������������������������������������������������������������������ ( ������������������������������������������ si() ������ s(immutable=True) ��������������� )���

��������� s() ��������� celery.signature() ������������������������signature ���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������

2.6 ������������

������������������������������������������������������������������������������������ add.delay(2, 2) ������������������������������������������������ apply_async ������������������������������ delay ������ apply_async ��������������������������������������������� apply_async ������������������������������������������������ callbacks/errbacks ���������������������������������������������������������������������������������������������

2.7 ������ AsyncResult

AsyncResult ������������������������������������������������������������������ tornado ������ Future ��������������������������������������������������������������������������� js ��������������������������� Promise ������������������ Celery 4.0 ������������������ promise ������������������������ gevent ��������������������������� js promise ������������������

import gevent.monkeymonkey.patch_all()import timefrom celery import Celeryapp = Celery(broker='amqp://', backend='rpc')@app.taskdef add(x, y):    return x + ydef on_result_ready(result):    print('Received result for id %r: %r' % (result.id, result.result,))add.delay(2, 2).then(on_result_ready)

��������������������� promise ������������������������ backend ��� RPC (amqp) ��� Redis ������ ��������������������������������� gevent ������������������������������������������������ ��������������������������������������������������������������������������������� tornado��� twisted ������

delay ��� apply_async ��������������� AsyncResult ������������������������������������ task id ������������������ task ��� AsyncResult: AsyncResult(task_id=xxx)

������ AsyncResult ���������������������������

������ Celery ��������������������������������������������������������������������������� Celery ������������������������������������������������

 

 

上一篇:nginx基本配置
下一篇:Tornado 目录

发表评论

最新留言

逛到本站,mark一下
[***.202.152.39]2025年04月29日 15时56分04秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章