Building a Resilient Task Queue System 📨 with RabbitMQ, Flask & Python 🐍
Apr 22, 2025
626 views
Written by Prashant Basnet
👋 Welcome to my Signature, a space between logic and curiosity.
I’m a Software Development Engineer who loves turning ideas into systems that work beautifully.
This space captures the process: the bugs, breakthroughs, and “aha” moments that keep me building.
🐇 What is RabbitMQ?
RabbitMQ is a message broker: producers hand it messages, it queues them, and consumers pick them up when they are ready. In real-world systems, services fail, crash, restart, or scale dynamically, so you can't afford tight coupling between them; a broker in the middle absorbs those failures.
📥 What is a Message Queue?
A message queue is a buffer that holds tasks from a producer until a consumer is ready to process them, letting the two sides run and scale independently.
✅ Great for:
- Decoupling services from one another
- Smoothing out traffic spikes
- Offloading slow work to background workers
It’s used in:
- Order processing, email and notification delivery, and other asynchronous pipelines
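Before bringing in RabbitMQ, the core idea can be sketched in-process with Python's standard `queue.Queue` (a toy stand-in for a real broker, just to show the decoupling):

```python
import queue
import threading

# A toy stand-in for RabbitMQ: the producer and consumer only share the queue,
# never call each other directly, and can run at different speeds.
tasks = queue.Queue()
results = []

def producer():
    for n in range(3):
        tasks.put({"a": n, "b": n + 1})  # enqueue and move on; no waiting

def consumer():
    for _ in range(3):
        data = tasks.get()               # blocks until a task is available
        results.append(data["a"] + data["b"])
        tasks.task_done()

t = threading.Thread(target=consumer)
t.start()
producer()
t.join()
print(results)  # [1, 3, 5]
```

The producer never waits for the consumer; if the consumer is slow, work simply accumulates in the queue. RabbitMQ gives you the same property across processes and machines.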
🐳 Running RabbitMQ with Docker
We used Docker to spin up RabbitMQ locally:
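The original command wasn't preserved here; a typical invocation, assuming the official `rabbitmq:3-management` image, looks like:

```shell
# Run RabbitMQ in the background with the management UI enabled
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management
```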
This exposes port 5672 for AMQP traffic and port 15672 for the management UI (http://localhost:15672, default login guest / guest).
🧰 What is Pika?
Pika is a pure-Python client library for RabbitMQ that implements the AMQP 0-9-1 protocol.
With Pika, you can open connections and channels, declare queues, publish messages, and consume them with callbacks.
We installed it using:
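Installation is a single pip command:

```shell
pip install pika
```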
⚙️ Environment Setup
We used a Python virtual environment to manage dependencies:
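A typical setup (directory and package names assumed, not taken from the original) looks like:

```shell
# Create and activate an isolated environment, then install the dependencies
python3 -m venv venv
source venv/bin/activate
pip install flask pika
```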
💻 We Built Two Services
🧑🍳 Producer – Flask App
📄 Code:
```python
import json
import uuid

import pika
from flask import Flask, jsonify, request

app = Flask(__name__)

# Assumed values; the original constants were not shown.
RABBITMQ_HOST = 'localhost'
REQUEST_QUEUE = 'request_queue'
REPLY_QUEUE = 'reply_queue'

@app.route('/submit', methods=['POST'])
def submit_task():
    data = request.json
    correlation_id = str(uuid.uuid4())
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
    channel = connection.channel()
    channel.queue_declare(queue=REQUEST_QUEUE)
    channel.queue_declare(queue=REPLY_QUEUE)
    channel.basic_publish(
        exchange='',
        routing_key=REQUEST_QUEUE,
        body=json.dumps(data),
        properties=pika.BasicProperties(
            reply_to=REPLY_QUEUE,
            correlation_id=correlation_id
        )
    )
    connection.close()
    return jsonify({"message": "Task submitted", "correlation_id": correlation_id}), 200

@app.route('/result/<correlation_id>', methods=['GET'])
def get_result(correlation_id):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
    channel = connection.channel()
    channel.queue_declare(queue=REPLY_QUEUE)
    method_frame, properties, body = channel.basic_get(REPLY_QUEUE, auto_ack=False)
    # Bound the scan to the messages present when we started: a requeued
    # non-matching message could otherwise be fetched again immediately,
    # making this loop spin forever.
    seen = 0
    max_scan = method_frame.message_count + 1 if method_frame else 0
    while method_frame and seen < max_scan:
        seen += 1
        if properties.correlation_id == correlation_id:
            channel.basic_ack(method_frame.delivery_tag)
            connection.close()
            return jsonify({"correlation_id": correlation_id, "result": json.loads(body)}), 200
        channel.basic_nack(method_frame.delivery_tag, requeue=True)
        method_frame, properties, body = channel.basic_get(REPLY_QUEUE, auto_ack=False)
    # Put back anything we were still holding before giving up.
    if method_frame:
        channel.basic_nack(method_frame.delivery_tag, requeue=True)
    connection.close()
    return jsonify({"message": "Result not ready"}), 404
```
👷 Consumer – Worker Script
📄 Code:
```python
import json

import pika

# Assumed values; they must match the producer's settings.
RABBITMQ_HOST = 'localhost'
REQUEST_QUEUE = 'request_queue'
REPLY_QUEUE = 'reply_queue'

def callback(ch, method, properties, body):
    print("Received task:", body)
    data = json.loads(body)
    a = data.get('a', 0)
    b = data.get('b', 0)
    c = data.get('c', 0)
    result = a + b + c
    response = json.dumps({"result": result})
    # Send the answer back on the reply queue, tagged with the same
    # correlation_id so the producer can match it to the request.
    ch.basic_publish(
        exchange='',
        routing_key=REPLY_QUEUE,
        body=response,
        properties=pika.BasicProperties(
            correlation_id=properties.correlation_id
        )
    )
    # Ack only after the work is done, so a crash mid-task requeues it.
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
channel = connection.channel()
channel.queue_declare(queue=REQUEST_QUEUE)
channel.queue_declare(queue=REPLY_QUEUE)
channel.basic_consume(queue=REQUEST_QUEUE, on_message_callback=callback)
print("Waiting for messages...")
channel.start_consuming()
```
🤕 What Happens if Services Crash?
❌ If the Producer Crashes:
The Flask API stops accepting new submissions, but nothing already queued is affected: RabbitMQ holds the pending tasks and the consumer keeps processing them. Once the producer restarts, it can publish again.
❌ If the Consumer Crashes:
Tasks simply pile up in the request queue until a worker comes back. Because the worker only acks a message after processing it, any message it was holding when it crashed is requeued by RabbitMQ and handed to the next available consumer. Nothing is lost.
🧪 Example Usage
```shell
curl -X POST http://localhost:5050/submit \
  -H "Content-Type: application/json" \
  -d '{"a": 5, "b": 10, "c": 2}'
```
Result: a JSON acknowledgement such as {"message": "Task submitted", "correlation_id": "<a generated UUID>"}.
Even when the consumer is down, you can still produce tasks and they will sit safely in the queue:
```shell
curl -X POST http://localhost:5050/submit \
  -H "Content-Type: application/json" \
  -d '{"a": 1, "b": 2, "c": 3}'

curl -X POST http://localhost:5050/submit \
  -H "Content-Type: application/json" \
  -d '{"a": 10, "b": 5, "c": 6}'
```
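Once a worker has processed a task, the result can be fetched from the producer's /result endpoint using the correlation_id returned by /submit (shown here with a placeholder id):

```shell
# Substitute the correlation_id that /submit returned
curl http://localhost:5050/result/<correlation_id>
```

If the reply hasn't arrived yet, the endpoint responds with "Result not ready" and a 404, so clients can simply poll until it succeeds.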