From 8dc3e60f45de26154d997524c3487d939fcbbcca Mon Sep 17 00:00:00 2001 From: "angelinatingaeva@yandex.ru" Date: Mon, 12 Feb 2024 04:32:00 +0500 Subject: [PATCH] refactor(rabbitmq) create exchange and queues with tutorial, and it works --- rabbitmq/log_direct.py | 21 ++++++++++++++ rabbitmq/receive_log_direct.py | 52 ++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+) create mode 100644 rabbitmq/log_direct.py create mode 100644 rabbitmq/receive_log_direct.py diff --git a/rabbitmq/log_direct.py b/rabbitmq/log_direct.py new file mode 100644 index 0000000..94cd05a --- /dev/null +++ b/rabbitmq/log_direct.py @@ -0,0 +1,21 @@ +import sys + +import pika + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host="localhost"), +) +channel = connection.channel() + +channel.exchange_declare(exchange="direct_logs", exchange_type="direct") + +severity = sys.argv[1] if len(sys.argv) > 2 else "info" +message = " ".join(sys.argv[2:]) or "Hello World!" +channel.basic_publish( + exchange="direct_logs", + routing_key=severity, + body=message, +) +print(f" [x] Sent {severity}:{message}") + +connection.close() \ No newline at end of file diff --git a/rabbitmq/receive_log_direct.py b/rabbitmq/receive_log_direct.py new file mode 100644 index 0000000..9a0c607 --- /dev/null +++ b/rabbitmq/receive_log_direct.py @@ -0,0 +1,52 @@ +import os +import sys + +import pika + + +def main(): + connection = pika.BlockingConnection( + pika.ConnectionParameters(host="localhost"), + ) + channel = connection.channel() + + channel.exchange_declare(exchange="direct_logs", exchange_type="direct") + + result = channel.queue_declare(queue="", exclusive=True) + queue_name = result.method.queue + + severities = sys.argv[1:] + if not severities: + sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) + sys.exit(1) + + for severity in severities: + channel.queue_bind( + exchange="direct_logs", + queue=queue_name, + routing_key=severity, + ) + + print(" [*] Waiting for logs. To exit press CTRL+C") + + def callback(ch, method, properties, body): + print(f" [x] {method.routing_key}:{body.decode()}") + + channel.basic_consume( + queue=queue_name, + on_message_callback=callback, + auto_ack=True, + ) + + channel.start_consuming() + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("Interrupted") + try: + sys.exit(0) + except SystemExit: + os._exit(0) \ No newline at end of file