RabbitMQ详解
发布日期:2021-05-13 04:45:33 浏览次数:13 分类:博客文章

本文共 25561 字,大约阅读时间需要 85 分钟。

 ������������

���������������������

���������Message���������������������������������������������������������������������������������������������������������������������������������������������������

���������������Message Queue������������������������������������������������������������������������������������������������������������������������������������������������������������������ MQ ��������������������������������������������������� MQ ���������������������������������������������������������������������������������������������������

���������������������

������������������������������������������������������������������������������������������������������������������ MQ ������

��������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������� MQ ������������������������������������������������������������������������������������������������������ MQ ������������������������������������������������������������MQ��������������������� MQ ��������������������������� MQ ������������������������������������������������������������������������������

RabbitMQ 

RabbitMQ ������������ Erlang ��������������� AMQP ������������������

rabbitMQ���������������AMQP������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������

 

rabbitMQ������

for Linux���������������epel���   $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm ������erlang   $ yum -y install erlang ������RabbitMQ   $ yum -y install rabbitmq-server���������service rabbitmq-server start/stop
for Mac:bogon:~ yuan$ brew install rabbitmqbogon:~ yuan$ export PATH=$PATH:/usr/local/sbinbogon:~ yuan$ rabbitmq-server

rabbitMQ������������

������������

������

# ######################### ��������� ##########################!/usr/bin/env pythonimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))channel = connection.channel()channel.queue_declare(queue='hello')channel.basic_publish(exchange='',                      routing_key='hello',                      body='Hello World!')print(" [x] Sent 'Hello World!'")connection.close()
# ########################## ��������� ########################## connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body):    print(" [x] Received %r" % body) channel.basic_consume( callback,                       queue='hello',                       no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()

������������

���1���no-ack ��� False������������������������������(its channel is closed, connection is closed, or TCP connection is lost)���������������������RabbitMQ������������������������������������������

  • ������������������ch.basic_ack(delivery_tag=method.delivery_tag)
  • basic_comsume������no_ack=False

������������������������������:

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='10.211.55.4'))channel = connection.channel()channel.queue_declare(queue='hello')def callback(ch, method, properties, body):    print(" [x] Received %r" % body)    import time    time.sleep(10)    print 'ok'    ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_consume(callback,                      queue='hello',                      no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()

(2)  durable  ������������������

# ���������#!/usr/bin/env pythonimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))channel = connection.channel()# make message persistentchannel.queue_declare(queue='hello', durable=True)channel.basic_publish(exchange='',                      routing_key='hello',                      body='Hello World!',                      properties=pika.BasicProperties(                          delivery_mode=2, # make message persistent                      ))print(" [x] Sent 'Hello World!'")connection.close()# ���������#!/usr/bin/env python# -*- coding:utf-8 -*-import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))channel = connection.channel()# make message persistentchannel.queue_declare(queue='hello', durable=True)def callback(ch, method, properties, body):    print(" [x] Received %r" % body)    import time    time.sleep(10)    print 'ok'    ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_consume(callback,                      queue='hello',                      no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()

(3) ������������������

������������������������������������������������������������������������������������1 ������������������ ������ ���������������������������1������������������ ������ ������������������

channel.basic_qos(prefetch_count=1) ������������������������������������������������

#!/usr/bin/env python# -*- coding:utf-8 -*-import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))channel = connection.channel()# make message persistentchannel.queue_declare(queue='hello')def callback(ch, method, properties, body):    print(" [x] Received %r" % body)    import time    time.sleep(10)    print 'ok'    ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_qos(prefetch_count=1)channel.basic_consume(callback,                      queue='hello',                      no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()

exchange������

3.1 ������������

������������������������������������������������������������������������������������������������������������������������������������������������������������������������RabbitMQ���������������������������������������������������������������������������������������������������������������������������������������������������

exchange type = fanout
# ���������#!/usr/bin/env pythonimport pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='logs',                         type='fanout')message = ' '.join(sys.argv[1:]) or "info: Hello World!"channel.basic_publish(exchange='logs',                      routing_key='',                      body=message)print(" [x] Sent %r" % message)connection.close()# ���������#!/usr/bin/env pythonimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='logs',                         type='fanout')result = channel.queue_declare(exclusive=True)queue_name = result.method.queuechannel.queue_bind(exchange='logs',                   queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):    print(" [x] %r" % body)channel.basic_consume(callback,                      queue=queue_name,                      no_ack=True)channel.start_consuming()
View Code

 3.2 ���������������

exchange type = direct

���������������������������������������������������������������������������������RabbitMQ���������������������������������������������������������������������������������������������������������������exchange���exchange������ ��������� ���������������������������������������������

#!/usr/bin/env pythonimport pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='direct_logs',                         type='direct')result = channel.queue_declare(exclusive=True)queue_name = result.method.queueseverities = sys.argv[1:]if not severities:    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])    sys.exit(1)for severity in severities:    channel.queue_bind(exchange='direct_logs',                       queue=queue_name,                       routing_key=severity)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):    print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,                      queue=queue_name,                      no_ack=True)channel.start_consuming()
View Code

 3.3 ������������

exchange type = topic
������������������              ���������old.boy.python          old.*  -- ���������old.boy.python          old.#  -- ������

���topic���������������������������������������������������������������������������������������������exchange���exchange��������������������������� ���������������������������������������������������������������������������������

  • # ������������������ 0 ��� ��� ������ ������
  • *  ������������������ ������ ������

 ���������

#!/usr/bin/env pythonimport pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='topic_logs',                         type='topic')result = channel.queue_declare(exclusive=True)queue_name = result.method.queuebinding_keys = sys.argv[1:]if not binding_keys:    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])    sys.exit(1)for binding_key in binding_keys:    channel.queue_bind(exchange='topic_logs',                       queue=queue_name,                       routing_key=binding_key)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):    print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,                      queue=queue_name,                      no_ack=True)channel.start_consuming()

 ������RabbitMQ���RPC

Callback queue ������������

������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������reply_to���

Correlation id ������������

������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������correlation_id������������������������������������������������correlation_id���������������������������������������������������������

������������������������������������������������������������������������������������������RPC������������������RPC���������RPC���������������������������������������������reply_to������correlation_id��������������������������������������������� ���������������������������RPC������������������������������������������������RPC������������������������������������������������������������������reply_to������������������������������������������������������ ���������������������������������������������������������������������������������������correlation_id���������������������������������������������

������������

#!/usr/bin/env pythonimport pika# ���������������������������������localhost������������ip������connection = pika.BlockingConnection(pika.ConnectionParameters(        host='localhost'))# ������������channel = connection.channel()# ������RPC������������channel.queue_declare(queue='rpc_queue')# ������������������def fib(n):    if n == 0:        return 0    elif n == 1:        return 1    else:        return fib(n-1) + fib(n-2)# ���RPC������������������������������������def on_request(ch, method, props, body):    n = int(body)    print(" [.] fib(%s)" % n)    # ������������������������    response = fib(n)    # ���������������(������)���������������������    ch.basic_publish(exchange='',                     routing_key=props.reply_to,                     properties=pika.BasicProperties(correlation_id = \                                                         props.correlation_id),                     body=str(response))    ch.basic_ack(delivery_tag = method.delivery_tag)# ������������������������������������������������������������������������channel.basic_qos(prefetch_count=1)channel.basic_consume(on_request, queue='rpc_queue')print(" [x] Awaiting RPC requests")channel.start_consuming()

���������

#!/usr/bin/env pythonimport pikaimport uuidclass FibonacciRpcClient(object):    def __init__(self):        ���������        ���������������������������������������������������������������������RPC������������������������                ���������                # ���������������������������������ip������        self.connection = pika.BlockingConnection(pika.ConnectionParameters(                host='localhost'))                        # ���������������������������channel������������������������        self.channel = self.connection.channel()                # ���������������������������������������������������������������������������������������������������������������������������������������������������        result = self.channel.queue_declare(exclusive=True)        # ���������������������������������������������������        self.callback_queue = result.method.queue                # ���������������������������������������������������������������������`on_response`���������������������������;         self.channel.basic_consume(self.on_response, no_ack=True,                                   queue=self.callback_queue)    # ������������������������������������������������    def on_response(self, ch, method, props, body):        if self.corr_id == props.correlation_id:            self.response = body    # ������RPC������    def call(self, n):            # ��������� response        self.response = None                #������correlation_id         self.corr_id = str(uuid.uuid4())                # ������RPC���������������RPC������������`rpc_queue`������������������������`reply_to`���`correlation_id`        self.channel.basic_publish(exchange='',                                   routing_key='rpc_queue',                                   properties=pika.BasicProperties(                                         reply_to = self.callback_queue,                                         correlation_id = self.corr_id,                                         ),                                   body=str(n))                                                   while self.response is None:            self.connection.process_data_events()        return int(self.response)# ���������������fibonacci_rpc = FibonacciRpcClient()# ������RPC������print(" [x] Requesting fib(30)")response = fibonacci_rpc.call(30)print(" [.] Got %r" % response)

������

RabbitMQ������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������

code

rabbitmq������������������amqp���������python���������������������pika

pip install pika -i https://pypi.douban.com/simple/

������������send.py

import pika# ������������������connection = pika.BlockingConnection(pika.ConnectionParameters(           'localhost'))  # ���������������RabbitMQ���������channel = connection.channel()  # ������channel

���������������������������������������������������������������������������������������������������������������������

������������������������������������������������������������������������������������������������rabbitmq���������������������

channel.queue_declare(queue='hello')  # ���RabbitMQ���������hello������������channel.basic_publish(exchange='',  # ���������������exchange������������������������                  routing_key='hello',  # ������������������ hello ���                  body='Hello World!')  # ������������connection.close()  # ������ ������flush

RabbitMQ������������1GB������������������������������������������������

������������������������hello��������������������������������������� rabbitmqctl list_queues ���������

hello 1

���������������hello������ ���������������������������

������������receive.py

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(               'localhost'))channel = connection.channel()

������������������������������������������������������

channel.queue_declare(queue='hello')  # ��������������������� ������������������ hello ������ ������������������ ���������������������������������������������������������def callback(ch, method, properties, body):  # ���������������������������������    print(" [x] Received %r" % body)channel.basic_consume(callback,                      queue='hello',  # ���������������hello���������                      no_ack=True)  #������������������������������ack������������channel.start_consuming()  # ������������������ ���������������������������

������������������������������

������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������task������message������������������������������������������������������������������������������������������������������������������������web���������������������������http���������������������������������������������

channel.basic_publish(exchange='',                  routing_key='task_queue',                  body=message,                  properties=pika.BasicProperties(                     delivery_mode = 2, # ���������������������                  ))

������������������������ ������ ������������������������������������������������

������ack

���������������������������������������������������������������������������������������������������������������������������������rabbitmq���������������������������������������������������������������������������

���������������������������rabbitmq���������ack������������������������������������������������������ack���rabbitmq���������rabbitmq��������������������������������������������������������������������������� ������rabbitmq������������ack������������������������ ���������������������������������������������������timeout������������������������������������������������������

ack���������������������������������������������������������������no_ack=True

channel.basic_consume(callback, queue='hello')  # ���������ack

���ack���callback:

def callback(ch, method, properties, body):    print " [x] Received %r" % (body,)    time.sleep( body.count('.') )    print " [x] Done"    ch.basic_ack(delivery_tag = method.delivery_tag)  # ������ack

���������������

���������������RabbitMQ������������������������������������������������������������������������

(������������������������������������������

channel.queue_declare(queue='task_queue', durable=True)

������������������������������������������������������������������

channel.basic_publish(exchange='',routing_key="task_queue",body=message,properties=pika.BasicProperties(delivery_mode = 2, # make message persistent))

������������������RabbitMQ���������������������������������������������������������������������������RabbitMQ���������������������������������������������������������������������������������������������������������publisher confirm���

���������������������

��������������������������������������������������������������������������������������������������������������������������������� ������������������������������������������������������������������������������������ack���������RabbitMQ���������������������������������������������������������������������������

channel.basic_qos(prefetch_count=1)

������RabbitMQ���������������������������������������ack������������������������������������������

������

������������������������������������������������������������������������������������������������������������������������������������

exchange

���������������������������������������������������������������������������������������������������������������������������������������������������������exchange������exchange���������������������������������������������������������������������������������������exchange������������������������������������������������������������������������������������������������������������������������������������������������������exchange���direct���topic���headers���fanout���������������������������������fanout������������������������������exchange��������� '' ���������default exchange���

channel.exchange_declare(exchange='logs', type='fanout')  # ���exchange������������������������������������������������

������������

result = channel.queue_declare()  # ������������������������result = channel.queue_declare(exclusive=True)  # ���������������������������������������������������������������������������������queue_name = result.method.queue

������result.method.queue���������������������������������������������������������

������exchange ��� ������

channel.queue_bind(exchange='logs',               queue='hello')

logs���������������������hello���������������

��������������������������������������� logs exchange

channel.basic_publish(exchange='logs',                  routing_key='',                  body=message)

������

���������������������bind������������exchange���queue���������������������������������exchange������������������������bind������������������routing_key���������

������direct exchange

���������routing key������������������������������routing key������������

channel.exchange_declare(exchange='direct_logs',                     type='direct')

���������������������������severity������������

channel.basic_publish(exchange='direct_logs',                  routing_key=severity,                  body=message)

���������������������������severity������

channel.queue_bind(exchange='direct_logs',                   queue=queue_name,                   routing_key=severity)

������topic exchange

���������������direct exchange ������������������routing key������������������������������.������routing key���topic exchange������������

"stock.usd.nyse" "nyse.vmw"

���direct exchange������������������������������������key���������������������routing key������������������������������������������

* ������1���������# ������0������������������

������������������������routing key������3���������������������celerity.colour.species���

Q1:*.orange.*  ���������������������colour������orange���Q2:*.*.rabbit  ���������������������������species���rabbit���lazy.#      ���������������������������lazy���

qucik.orange.rabbit Q1 Q2������������������quick.orange.fox ������Q1���������������������lazy.pink.rabbit������������������Q2���������������������������������������������������������������#���������������������������

RPC

���������������������������������������������������������

������������������������������������������������������������������������������������������

self.connection = pika.BlockingConnection(pika.ConnectionParameters(            host='localhost'))    self.channel = self.connection.channel()    result = self.channel.queue_declare(exclusive=True)    self.callback_queue = result.method.queue    self.channel.basic_consume(self.on_response, no_ack=True,                               queue=self.callback_queue)

���������������������rpc���������������������reply_to���������������������correlation_id������������������������������id������������������������������RPC���������������������������������������������������������������������������������������������������������������������������correlation_id���������������������������������������������������������������������������

���������������������������������correlation_id������������������������������������������--������������������������������������������ack������������������������������������������������������������������������������������������������������������������������������������������

channel.basic_publish(exchange='',                       routing_key='rpc_queue',                       properties=pika.BasicProperties(                             reply_to = self.callback_queue,                             correlation_id = self.corr_id,                             ),                       body=str(n))  # ������������while self.response is None:  # ���������������������������    self.connection.process_data_events()  # ������������������return int(self.response)

3���������������������rpc_queue������

4���RPC������������rpc_queue���������������������������������

channel.basic_consume(on_request, queue='rpc_queue')  # ������ ������������# ���������������ch.basic_publish(exchange='',                 routing_key=props.reply_to,                 properties=pika.BasicProperties(correlation_id = \                                                     props.correlation_id),                 body=str(response))  # ���������������������������ch.basic_ack(delivery_tag = method.delivery_tag)  # ������ack

5���������������������������������������������������correlation_id���������������������

if self.corr_id == props.correlation_id:        self.response = body

 

 

上一篇:第三方支付(支付宝)
下一篇:单例模式4种方式

发表评论

最新留言

很好
[***.229.124.182]2025年04月17日 04时01分42秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章