refactor(rabbitmq) create exchange and queues with tutorial, and it works
This commit is contained in:
		
							
								
								
									
										21
									
								
								rabbitmq/log_direct.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										21
									
								
								rabbitmq/log_direct.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,21 @@
 | 
			
		||||
import sys
 | 
			
		||||
 | 
			
		||||
import pika
 | 
			
		||||
 | 
			
		||||
connection = pika.BlockingConnection(
 | 
			
		||||
    pika.ConnectionParameters(host="localhost"),
 | 
			
		||||
)
 | 
			
		||||
channel = connection.channel()
 | 
			
		||||
 | 
			
		||||
channel.exchange_declare(exchange="direct_logs", exchange_type="direct")
 | 
			
		||||
 | 
			
		||||
severity = sys.argv[1] if len(sys.argv) > 2 else "info"
 | 
			
		||||
message = " ".join(sys.argv[2:]) or "Hello World!"
 | 
			
		||||
channel.basic_publish(
 | 
			
		||||
    exchange="direct_logs",
 | 
			
		||||
    routing_key=severity,
 | 
			
		||||
    body=message,
 | 
			
		||||
)
 | 
			
		||||
print(f" [x] Sent {severity}:{message}")
 | 
			
		||||
 | 
			
		||||
connection.close()
 | 
			
		||||
							
								
								
									
										52
									
								
								rabbitmq/receive_log_direct.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										52
									
								
								rabbitmq/receive_log_direct.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,52 @@
 | 
			
		||||
import os
 | 
			
		||||
import sys
 | 
			
		||||
 | 
			
		||||
import pika
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def main():
 | 
			
		||||
    connection = pika.BlockingConnection(
 | 
			
		||||
        pika.ConnectionParameters(host="localhost"),
 | 
			
		||||
    )
 | 
			
		||||
    channel = connection.channel()
 | 
			
		||||
 | 
			
		||||
    channel.exchange_declare(exchange="direct_logs", exchange_type="direct")
 | 
			
		||||
 | 
			
		||||
    result = channel.queue_declare(queue="", exclusive=True)
 | 
			
		||||
    queue_name = result.method.queue
 | 
			
		||||
 | 
			
		||||
    severities = sys.argv[1:]
 | 
			
		||||
    if not severities:
 | 
			
		||||
        sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
 | 
			
		||||
        sys.exit(1)
 | 
			
		||||
 | 
			
		||||
    for severity in severities:
 | 
			
		||||
        channel.queue_bind(
 | 
			
		||||
            exchange="direct_logs",
 | 
			
		||||
            queue=queue_name,
 | 
			
		||||
            routing_key=severity,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    print(" [*] Waiting for logs. To exit press CTRL+C")
 | 
			
		||||
 | 
			
		||||
    def callback(ch, method, properties, body):
 | 
			
		||||
        print(f" [x] {method.routing_key}:{body.decode()}")
 | 
			
		||||
 | 
			
		||||
    channel.basic_consume(
 | 
			
		||||
        queue=queue_name,
 | 
			
		||||
        on_message_callback=callback,
 | 
			
		||||
        auto_ack=True,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    channel.start_consuming()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    try:
 | 
			
		||||
        main()
 | 
			
		||||
    except KeyboardInterrupt:
 | 
			
		||||
        print("Interrupted")
 | 
			
		||||
        try:
 | 
			
		||||
            sys.exit(0)
 | 
			
		||||
        except SystemExit:
 | 
			
		||||
            os._exit(0)
 | 
			
		||||
		Reference in New Issue
	
	Block a user