From 3730ca7828453b780f2bf1ef1c1fda4935add900 Mon Sep 17 00:00:00 2001 From: "angelinatingaeva@yandex.ru" Date: Mon, 12 Feb 2024 01:46:31 +0500 Subject: [PATCH] refactor(rabbitmq): send tasks from one queue to some workers --- rabbitmq/send_message.py | 23 +++++++++++++++++++++++ rabbitmq/worker.py | 24 ++++++++++++++++++++++++ 2 files changed, 47 insertions(+) create mode 100644 rabbitmq/send_message.py create mode 100644 rabbitmq/worker.py diff --git a/rabbitmq/send_message.py b/rabbitmq/send_message.py new file mode 100644 index 0000000..67bbc92 --- /dev/null +++ b/rabbitmq/send_message.py @@ -0,0 +1,23 @@ +import sys + +import pika + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host="localhost"), +) +channel = connection.channel() + +channel.queue_declare(queue="task_queue", durable=True) + +message = " ".join(sys.argv[1:]) or "Hello World!" +channel.basic_publish( + exchange="", + routing_key="task_queue", + body=message, + properties=pika.BasicProperties( + delivery_mode=pika.DeliveryMode.Persistent, + ), +) +print(f" [x] Sent {message}") + +connection.close() diff --git a/rabbitmq/worker.py b/rabbitmq/worker.py new file mode 100644 index 0000000..e64b1c8 --- /dev/null +++ b/rabbitmq/worker.py @@ -0,0 +1,24 @@ +import time + +import pika + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host="localhost"), +) +channel = connection.channel() + +channel.queue_declare(queue="task_queue", durable=True) +print(" [*] Waiting for messages. To exit press CTRL+C") + + +def callback(ch, method, properties, body): + print(f" [x] Received {body.decode()}") + time.sleep(body.count(b".")) + print(" [x] Done") + ch.basic_ack(delivery_tag=method.delivery_tag) + + +channel.basic_qos(prefetch_count=1) +channel.basic_consume(queue="task_queue", on_message_callback=callback) + +channel.start_consuming() \ No newline at end of file