
本文共 11557 字,大约阅读时间需要 38 分钟。
Celery���������?
Celery ������������ Python ������������������������������������������������������������������������������,���������������������������������������������������������������
Celery ���������������������������������������������������
������������������������������������������������������������������������ Celery ���������������������������������������������������������������������
1.������������
������������������������ Celery ��������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������� rabbitmq ������������������������������������������������������������������������
Celery ������������������������������������ Python ������������������������������������������������������������������������������������������ Python ������������������������ Celery ���������������������
������ Celery ������������������������������������������
1.1 Brokers
brokers ������������������������������������������������������������������Celery ���������������������������������������brokers ���������������������������������/���������������������(������)
��������� brokers ��� rabbitmq���redis���Zookeeper ���
1.2 Result Stores / backend
������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������ Result Stores ���
��������� backend ��� redis���Memcached ���������������������������������
1.3 Workers
������ Celery ���������������������������������/������������������������������������������������������������������
1.4 Tasks
��������������������������������������������������������������������������������������������������������������������� workers ���������������
������������������������������������������������������������������������
��������������� redis ������ celery ��� broker ��� backend���
(������ brokers ��� backend ������)
������ Celery ��� redis ������ python ��� redis ������:
apt-get install redis-serverpip install redispip install celery
������������������������������ celery ��� 4.0 ������������������������ python ��� redis ������������ 2.10.4 ��������������������������� redis ������ timeout ������������
������������������������������task:
#tasks.pyfrom celery import Celeryapp = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0') #���������celery���backend���broker@app.task #��������������������� celery taskdef add(x, y): return x + y
OK���������������broker ���������������backend ���������������task ������������������������������������ worker ��������������������� tasks.py ������������������������
celery -A tasks worker --loglevel=info
������������������ tasks ��������������������� worker ���������������������������broker���������������������worker������������������������������
������������������������������������������������������������������������������������������������������������ task ������������
#trigger.pyfrom tasks import addresult = add.delay(4, 4) #������������ add(4, 4)������������������ celery ��������������� delay ������������while not result.ready(): time.sleep(1)print 'task done: {0}'.format(result.get())
���������������
delay ������������������ AsyncResult ���������������������������������������������������������������������result.ready()
��� true������������ result.get()
������������������
������������������������ celery ���������������������
2. ������������
��������������������������������������������������������� Celery ��������������������������������������������������������������������������������������������������������������������� Celery ������������������������
���������������������task:
@app.task #��������������������� celery taskdef add(x, y): return x + y
������������������app.task
������������������������������������������������������ celery task ������������������������������������������������������������������������������ task ������������������������
������������������������������������������������ task ������������������������������������������������������������ add ������ task ������������������������������ self ������������ task ���������������������������������
������������������������������������ task ��������������������������� task ������������ add ���������������������������������
2.1 ������������������������������������
������������������������������������������������������������������������ task ��� on_failure���on_success
������������
# tasks.pyclass MyTask(Task): def on_success(self, retval, task_id, args, kwargs): print 'task done: {0}'.format(retval) return super(MyTask, self).on_success(retval, task_id, args, kwargs) def on_failure(self, exc, task_id, args, kwargs, einfo): print 'task fail, reason: {0}'.format(exc) return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)@app.task(base=MyTask)def add(x, y): return x + y
������ ������������������ worker���
celery -A tasks worker --loglevel=info
������������������������
������������tasks:
@app.task #��������������������� celery taskdef add(x, y): raise KeyError return x + y
������������ worker������������ trigger.py:
������������������������������������������������������������������������������ on_failure���on_success
2.2 ���������������������������
# tasks.pyfrom celery.utils.log import get_task_loggerlogger = get_task_logger(__name__)@app.task(bind=True)def add(self, x, y): logger.info(self.request.__dict__) return x + y
���������������������
������������������������������������������������������������������������������������������������������������������������������������������������������������������
������ celery.task.request ���������������������������
2.3 ������������������
��������������������������������������������������������������� Celery ���������������������������������������
������ | ������ |
---|---|
PENDING | ��������������� |
STARTED | ��������������� |
SUCCESS | ������������������ |
FAILURE | ������������������ |
RETRY | ������������������ |
REVOKED | ������������ |
������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
# tasks.pyfrom celery import Celeryimport time@app.task(bind=True)def test_mes(self): for i in xrange(1, 11): time.sleep(0.1) self.update_state(state="PROGRESS", meta={'p': i*10}) return 'finish'
��������� trigger.py ������������
# trigger.pyfrom task import add,test_mesimport sysdef pm(body): res = body.get('result') if body.get('status') == 'PROGRESS': sys.stdout.write('\r������������: {0}%'.format(res.get('p'))) sys.stdout.flush() else: print '\r' print resr = test_mes.delay()print r.get(on_message=pm, propagate=False)
2.4 ������/���������������
Celery ��������������������������������������������������������������������������������������������������������������������������� beat ������������
������ Celery ������������ celery_config.py:
# celery_config.pyfrom datetime import timedeltafrom celery.schedules import crontabCELERYBEAT_SCHEDULE = { 'ptask': { 'task': 'tasks.period_task', 'schedule': timedelta(seconds=5), },}CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
��������� schedule
��������������������������������������������� datetime.timedelta ������ crontab ���������������������������������������������������������������������
��������������������������� datetime ��������������������������������������������������������� utc ������������������������������������
CELERY_TIMEZONE = 'Asia/Shanghai'
��������� tasks.py ���������������������������������������
# tasks.pyapp = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')app.config_from_object('celery_config')@app.task(bind=True)def period_task(self): print 'period task done: {0}'.format(self.request.id)
������������������ worker������������������ beat���
celery -A task beat
2.5 ������������
���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
������������
@app.taskdef update_page_info(url): page = fetch_page.delay(url).get() info = parse_page.delay(url, page).get() store_page_info.delay(url, info)@app.taskdef fetch_page(url): return myhttplib.get(url)@app.taskdef parse_page(url, page): return myparser.parse_document(page)@app.taskdef store_page_info(url, info): return PageInfo.objects.create(url, info)
������������1
def update_page_info(url): # fetch_page -> parse_page -> store_page chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url) chain()@app.task()def fetch_page(url): return myhttplib.get(url)@app.task()def parse_page(page): return myparser.parse_document(page)@app.task(ignore_result=True)def store_page_info(info, url): PageInfo.objects.create(url=url, info=info)
������������2
fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)])
������������������������������������������������������������������������������������ ( ������������������������������������������ si() ������ s(immutable=True) ��������������� )���
��������� s()
��������� celery.signature()
������������������������signature ���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
2.6 ������������
������������������������������������������������������������������������������������ add.delay(2, 2)
������������������������������������������������ apply_async
������������������������������ delay
������ apply_async
��������������������������������������������� apply_async
������������������������������������������������ callbacks/errbacks ���������������������������������������������������������������������������������������������
2.7 ������ AsyncResult
AsyncResult ������������������������������������������������������������������ tornado ������ Future ��������������������������������������������������������������������������� js ��������������������������� Promise ������������������ Celery 4.0 ������������������ promise ������������������������ gevent ��������������������������� js promise ������������������
import gevent.monkeymonkey.patch_all()import timefrom celery import Celeryapp = Celery(broker='amqp://', backend='rpc')@app.taskdef add(x, y): return x + ydef on_result_ready(result): print('Received result for id %r: %r' % (result.id, result.result,))add.delay(2, 2).then(on_result_ready)
��������������������� promise ������������������������ backend ��� RPC (amqp) ��� Redis ������ ��������������������������������� gevent ������������������������������������������������ ��������������������������������������������������������������������������������� tornado��� twisted ������
delay
��� apply_async
��������������� AsyncResult ������������������������������������ task id ������������������ task ��� AsyncResult: AsyncResult(task_id=xxx)
������ AsyncResult ���������������������������
������ Celery ��������������������������������������������������������������������������� Celery ������������������������������������������������
发表评论
最新留言
关于作者
