
本文共 25561 字,大约阅读时间需要 85 分钟。
������������
���������������������
���������Message���������������������������������������������������������������������������������������������������������������������������������������������������
���������������Message Queue������������������������������������������������������������������������������������������������������������������������������������������������������������������ MQ ��������������������������������������������������� MQ ���������������������������������������������������������������������������������������������������
���������������������
������������������������������������������������������������������������������������������������������������������ MQ ������
��������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������� MQ ������������������������������������������������������������������������������������������������������ MQ ������������������������������������������������������������MQ��������������������� MQ ��������������������������� MQ ������������������������������������������������������������������������������
RabbitMQ
RabbitMQ ������������ Erlang ��������������� AMQP ������������������
rabbitMQ���������������AMQP������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
rabbitMQ������
for Linux���������������epel��� $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm ������erlang $ yum -y install erlang ������RabbitMQ $ yum -y install rabbitmq-server���������service rabbitmq-server start/stop
for Mac:bogon:~ yuan$ brew install rabbitmqbogon:~ yuan$ export PATH=$PATH:/usr/local/sbinbogon:~ yuan$ rabbitmq-server
rabbitMQ������������
������������
������
# ######################### ��������� ##########################!/usr/bin/env pythonimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))channel = connection.channel()channel.queue_declare(queue='hello')channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')print(" [x] Sent 'Hello World!'")connection.close()
# ########################## ��������� ########################## connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume( callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()
������������
���1���no-ack ��� False������������������������������(its channel is closed, connection is closed, or TCP connection is lost)���������������������RabbitMQ������������������������������������������
- ������������������
ch.basic_ack(delivery_tag=method.delivery_tag)
- basic_comsume������
no_ack=False
������������������������������:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters( host='10.211.55.4'))channel = connection.channel()channel.queue_declare(queue='hello')def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print 'ok' ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_consume(callback, queue='hello', no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()
(2) durable ������������������
# ���������#!/usr/bin/env pythonimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))channel = connection.channel()# make message persistentchannel.queue_declare(queue='hello', durable=True)channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # make message persistent ))print(" [x] Sent 'Hello World!'")connection.close()# ���������#!/usr/bin/env python# -*- coding:utf-8 -*-import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))channel = connection.channel()# make message persistentchannel.queue_declare(queue='hello', durable=True)def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print 'ok' ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_consume(callback, queue='hello', no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()
(3) ������������������
������������������������������������������������������������������������������������1 ������������������ ������ ���������������������������1������������������ ������ ������������������
channel.basic_qos(prefetch_count=1) ������������������������������������������������
#!/usr/bin/env python# -*- coding:utf-8 -*-import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))channel = connection.channel()# make message persistentchannel.queue_declare(queue='hello')def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print 'ok' ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_qos(prefetch_count=1)channel.basic_consume(callback, queue='hello', no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()
exchange������
3.1 ������������
������������������������������������������������������������������������������������������������������������������������������������������������������������������������RabbitMQ���������������������������������������������������������������������������������������������������������������������������������������������������
exchange type = fanout
# ���������#!/usr/bin/env pythonimport pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='logs', type='fanout')message = ' '.join(sys.argv[1:]) or "info: Hello World!"channel.basic_publish(exchange='logs', routing_key='', body=message)print(" [x] Sent %r" % message)connection.close()# ���������#!/usr/bin/env pythonimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='logs', type='fanout')result = channel.queue_declare(exclusive=True)queue_name = result.method.queuechannel.queue_bind(exchange='logs', queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body): print(" [x] %r" % body)channel.basic_consume(callback, queue=queue_name, no_ack=True)channel.start_consuming()
3.2 ���������������
exchange type = direct
���������������������������������������������������������������������������������RabbitMQ���������������������������������������������������������������������������������������������������������������exchange���exchange������ ��������� ���������������������������������������������
#!/usr/bin/env pythonimport pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='direct_logs', type='direct')result = channel.queue_declare(exclusive=True)queue_name = result.method.queueseverities = sys.argv[1:]if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1)for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback, queue=queue_name, no_ack=True)channel.start_consuming()
3.3 ������������
exchange type = topic
������������������ ���������old.boy.python old.* -- ���������old.boy.python old.# -- ������
���topic���������������������������������������������������������������������������������������������exchange���exchange��������������������������� ���������������������������������������������������������������������������������
- # ������������������ 0 ��� ��� ������ ������
- * ������������������ ������ ������
���������
#!/usr/bin/env pythonimport pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='topic_logs', type='topic')result = channel.queue_declare(exclusive=True)queue_name = result.method.queuebinding_keys = sys.argv[1:]if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1)for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback, queue=queue_name, no_ack=True)channel.start_consuming()
������RabbitMQ���RPC
Callback queue ������������
������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������reply_to
���
Correlation id ������������
������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������correlation_id
������������������������������������������������correlation_id
���������������������������������������������������������
������������������������������������������������������������������������������������������RPC������������������RPC���������RPC���������������������������������������������reply_to������correlation_id��������������������������������������������� ���������������������������RPC������������������������������������������������RPC������������������������������������������������������������������reply_to������������������������������������������������������ ���������������������������������������������������������������������������������������correlation_id���������������������������������������������
������������
#!/usr/bin/env pythonimport pika# ���������������������������������localhost������������ip������connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))# ������������channel = connection.channel()# ������RPC������������channel.queue_declare(queue='rpc_queue')# ������������������def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2)# ���RPC������������������������������������def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) # ������������������������ response = fib(n) # ���������������(������)��������������������� ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag)# ������������������������������������������������������������������������channel.basic_qos(prefetch_count=1)channel.basic_consume(on_request, queue='rpc_queue')print(" [x] Awaiting RPC requests")channel.start_consuming()
���������
#!/usr/bin/env pythonimport pikaimport uuidclass FibonacciRpcClient(object): def __init__(self): ��������� ���������������������������������������������������������������������RPC������������������������ ��������� # ���������������������������������ip������ self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) # ���������������������������channel������������������������ self.channel = self.connection.channel() # ��������������������������������������������������������������������������������������������������������������������������������������������������� result = self.channel.queue_declare(exclusive=True) # ��������������������������������������������������� self.callback_queue = result.method.queue # ���������������������������������������������������������������������`on_response`���������������������������; self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) # ������������������������������������������������ def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body # ������RPC������ def call(self, n): # ��������� response self.response = None #������correlation_id self.corr_id = str(uuid.uuid4()) # ������RPC���������������RPC������������`rpc_queue`������������������������`reply_to`���`correlation_id` self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response)# ���������������fibonacci_rpc = FibonacciRpcClient()# ������RPC������print(" [x] Requesting fib(30)")response = fibonacci_rpc.call(30)print(" [.] Got %r" % response)
������
RabbitMQ������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
code
rabbitmq������������������amqp���������python���������������������pika
pip install pika -i https://pypi.douban.com/simple/
������������send.py
import pika# ������������������connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) # ���������������RabbitMQ���������channel = connection.channel() # ������channel
���������������������������������������������������������������������������������������������������������������������
������������������������������������������������������������������������������������������������rabbitmq���������������������
channel.queue_declare(queue='hello') # ���RabbitMQ���������hello������������channel.basic_publish(exchange='', # ���������������exchange������������������������ routing_key='hello', # ������������������ hello ��� body='Hello World!') # ������������connection.close() # ������ ������flush
RabbitMQ������������1GB������������������������������������������������
������������������������hello��������������������������������������� rabbitmqctl list_queues ���������
hello 1
���������������hello������ ���������������������������
������������receive.py
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost'))channel = connection.channel()
������������������������������������������������������
channel.queue_declare(queue='hello') # ��������������������� ������������������ hello ������ ������������������ ���������������������������������������������������������def callback(ch, method, properties, body): # ��������������������������������� print(" [x] Received %r" % body)channel.basic_consume(callback, queue='hello', # ���������������hello��������� no_ack=True) #������������������������������ack������������channel.start_consuming() # ������������������ ���������������������������
������������������������������
������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������task������message������������������������������������������������������������������������������������������������������������������������web���������������������������http���������������������������������������������
channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # ��������������������� ))
������������������������ ������ ������������������������������������������������
������ack
���������������������������������������������������������������������������������������������������������������������������������rabbitmq���������������������������������������������������������������������������
���������������������������rabbitmq���������ack������������������������������������������������������ack���rabbitmq���������rabbitmq��������������������������������������������������������������������������� ������rabbitmq������������ack������������������������ ���������������������������������������������������timeout������������������������������������������������������
ack���������������������������������������������������������������no_ack=True
channel.basic_consume(callback, queue='hello') # ���������ack
���ack���callback:
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) # ������ack
���������������
���������������RabbitMQ������������������������������������������������������������������������
(������������������������������������������channel.queue_declare(queue='task_queue', durable=True)
������������������������������������������������������������������
channel.basic_publish(exchange='',routing_key="task_queue",body=message,properties=pika.BasicProperties(delivery_mode = 2, # make message persistent))
������������������RabbitMQ���������������������������������������������������������������������������RabbitMQ���������������������������������������������������������������������������������������������������������publisher confirm���
���������������������
��������������������������������������������������������������������������������������������������������������������������������� ������������������������������������������������������������������������������������ack���������RabbitMQ���������������������������������������������������������������������������
channel.basic_qos(prefetch_count=1)
������RabbitMQ���������������������������������������ack������������������������������������������
������
������������������������������������������������������������������������������������������������������������������������������������
exchange
���������������������������������������������������������������������������������������������������������������������������������������������������������exchange������exchange���������������������������������������������������������������������������������������exchange������������������������������������������������������������������������������������������������������������������������������������������������������exchange���direct���topic���headers���fanout���������������������������������fanout������������������������������exchange��������� '' ���������default exchange���
channel.exchange_declare(exchange='logs', type='fanout') # ���exchange������������������������������������������������
������������
result = channel.queue_declare() # ������������������������result = channel.queue_declare(exclusive=True) # ���������������������������������������������������������������������������������queue_name = result.method.queue
������result.method.queue���������������������������������������������������������
������exchange ��� ������
channel.queue_bind(exchange='logs', queue='hello')
logs���������������������hello���������������
��������������������������������������� logs exchange
channel.basic_publish(exchange='logs', routing_key='', body=message)
������
���������������������bind������������exchange���queue���������������������������������exchange������������������������bind������������������routing_key���������
������direct exchange
���������routing key������������������������������routing key������������
channel.exchange_declare(exchange='direct_logs', type='direct')
���������������������������severity������������
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
���������������������������severity������
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
������topic exchange
���������������direct exchange ������������������routing key������������������������������.������routing key���topic exchange������������
"stock.usd.nyse" "nyse.vmw"
���direct exchange������������������������������������key���������������������routing key������������������������������������������
* ������1���������# ������0������������������
������������������������routing key������3���������������������celerity.colour.species���
Q1:*.orange.* ���������������������colour������orange���Q2:*.*.rabbit ���������������������������species���rabbit���lazy.# ���������������������������lazy���
qucik.orange.rabbit Q1 Q2������������������quick.orange.fox ������Q1���������������������lazy.pink.rabbit������������������Q2���������������������������������������������������������������#���������������������������
RPC
���������������������������������������������������������
������������������������������������������������������������������������������������������
self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)
���������������������rpc���������������������reply_to���������������������correlation_id������������������������������id������������������������������RPC���������������������������������������������������������������������������������������������������������������������������correlation_id���������������������������������������������������������������������������
���������������������������������correlation_id������������������������������������������--������������������������������������������ack������������������������������������������������������������������������������������������������������������������������������������������
channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) # ������������while self.response is None: # ��������������������������� self.connection.process_data_events() # ������������������return int(self.response)
3���������������������rpc_queue������
4���RPC������������rpc_queue���������������������������������channel.basic_consume(on_request, queue='rpc_queue') # ������ ������������# ���������������ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) # ���������������������������ch.basic_ack(delivery_tag = method.delivery_tag) # ������ack
5���������������������������������������������������correlation_id���������������������
if self.corr_id == props.correlation_id: self.response = body
发表评论
最新留言
关于作者
