ankit

Reputation: 295

Batching and queueing in a real-time webserver

I need a webserver that routes incoming requests to back-end workers in batches, flushing a batch every 0.5 seconds or as soon as it holds 50 HTTP requests, whichever happens first. What would be a good way to implement this in Python/Tornado or any other language?

My idea is to publish the incoming requests to a RabbitMQ queue and then somehow batch them together before sending them to the back-end servers. What I can't figure out is how to pick multiple requests from the RabbitMQ queue. Could someone point me in the right direction or suggest an alternate approach?

Upvotes: 10

Views: 2025

Answers (1)

CasualDemon

Reputation: 6160

I would suggest using a simple Python micro web framework such as Bottle. The request handler would hand each request to a background process via a queue, which lets the HTTP connection finish right away.

The background process would then have a continuous loop that would check your conditions (time and number), and do the job once the condition is met.
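Since the question mentions Tornado, which runs on the asyncio event loop in current versions, the loop described above could also be sketched with an `asyncio.Queue` instead of a separate process. This is only a rough sketch under that assumption: `collect_batch` and `demo` are made-up names for illustration, and the timer is assumed to start when the first request of a batch arrives.

```python
import asyncio

BATCH_SIZE = 50      # from the question: flush at 50 requests...
BATCH_TIMEOUT = 0.5  # ...or after 0.5 seconds, whichever comes first


async def collect_batch(request_queue, max_items=BATCH_SIZE, max_wait=BATCH_TIMEOUT):
    """Return one batch of at most max_items gathered within max_wait seconds.

    Assumption: the clock starts when the first item arrives, so an idle
    server never produces empty batches.
    """
    loop = asyncio.get_running_loop()
    items = [await request_queue.get()]      # wait for the first request
    deadline = loop.time() + max_wait
    while len(items) < max_items:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break                            # time condition met
        try:
            items.append(await asyncio.wait_for(request_queue.get(), remaining))
        except asyncio.TimeoutError:
            break                            # nothing more arrived in time
    return items


async def demo():
    """Feed 120 fake requests and record the size of each batch."""
    q = asyncio.Queue()
    for i in range(120):
        q.put_nowait(i)
    sizes = []
    while not q.empty():
        sizes.append(len(await collect_batch(q)))
    return sizes


if __name__ == "__main__":
    print(asyncio.run(demo()))  # [50, 50, 20]
```

The first two batches fill up on the count condition, and the last one is released by the timeout with only 20 items, so both conditions from the question are exercised.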

Edit:

Here is an example webserver that batches the items before sending them to whatever queuing system you want to use. (RabbitMQ always seemed overcomplicated to me with Python; I have used Celery and other simpler queuing systems before.) That way the backend simply grabs a single 'item' from the queue, and that item contains the whole batch of requests.

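import bottle
import threading
import queue  # named 'Queue' on Python 2
import time

BATCH_SIZE = 50       # flush once this many requests are gathered...
BATCH_TIMEOUT = 0.5   # ...or once this many seconds have passed

app = bottle.Bottle()

app.queue = queue.Queue()


def send_to_rabbitMQ(items):
    """Custom code to send to rabbitMQ system"""
    print("%d items gathered, sending to rabbitMQ" % len(items))


def batcher(request_queue):
    """Background thread that gathers incoming requests"""
    while True:
        batcher_loop(request_queue)


def batcher_loop(request_queue):
    """Gather one batch: wait for the first item, then keep collecting
    until 50 items arrive or 0.5 seconds pass, whichever comes first,
    then call the function 'send_to_rabbitMQ'"""
    items = [request_queue.get()]  # block until the first request arrives
    deadline = time.monotonic() + BATCH_TIMEOUT
    while len(items) < BATCH_SIZE:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # time condition met
        try:
            items.append(request_queue.get(timeout=remaining))
        except queue.Empty:
            break  # nothing more arrived before the deadline

    send_to_rabbitMQ(items)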


@app.route("/add_request", method=["PUT", "POST"])
def add_request():
    """Simple bottle request that grabs JSON and puts it in the queue"""
    request = bottle.request.json['request']
    app.queue.put(request)


if __name__ == '__main__':
    t = threading.Thread(target=batcher, args=(app.queue, ))
    t.daemon = True  # Make sure the background thread quits when the program ends
    t.start()

    bottle.run(app)

Code used to test it:

import requests
import json

for i in range(101):
    req = requests.post("http://localhost:8080/add_request",
                        data=json.dumps({"request": 1}),
                        headers={"Content-type": "application/json"})

Upvotes: 1
