Reputation: 468
I'm building my first application using a microservice architecture. I'll be working mostly in Python using Flask.
I'm considering implementing an event/message bus to coordinate actions between services. A few services that I intend to implement are: Auth, Users, Posts, and Chat. The application has two entities ('User' and 'Group') that are used by almost every service. I have a separate database for each service, and each database has its own users and groups tables to manage the user/group data specific to that service. Now, when I think about an event like creating a new user, each service will need to create a new entry in its users table, which is why I'm considering using an event bus.
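To make the fan-out concrete, here is a minimal sketch with an in-memory stand-in for the bus. Everything in it is hypothetical and only for illustration: the InMemoryBus class, the "user.created" event name, and the posts_users / chat_users dicts that stand in for each service's users table. A real setup would put a broker such as Redis or RabbitMQ in place of the class.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class InMemoryBus:
    """Toy stand-in for a message broker (hypothetical); delivers events synchronously."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, event_name: str, handler: Callable[[dict], None]) -> None:
        # Register a handler to be called for every event with this name
        self._handlers[event_name].append(handler)

    def publish(self, event_name: str, payload: dict) -> None:
        # Deliver the payload to every subscribed handler, in registration order
        for handler in self._handlers[event_name]:
            handler(payload)


# Each service keeps its own "users table" (a dict here, a database in practice).
posts_users: Dict[str, dict] = {}
chat_users: Dict[str, dict] = {}

bus = InMemoryBus()
bus.subscribe("user.created", lambda user: posts_users.update({user["id"]: user}))
bus.subscribe("user.created", lambda user: chat_users.update({user["id"]: user}))

# The Auth service creates the user and announces it once;
# every subscribed service adds its own copy.
bus.publish("user.created", {"id": "42", "name": "alice"})
```

After the publish call, both posts_users and chat_users hold an entry for user "42", without the Auth service knowing which services are listening.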
I read this post which discusses CQRS and using HTTP (REST) for external communication between services, while using an event bus for internal communication. Services process (HTTP) requests, and emit events about data changes (e.g. the creation of a new User by the Auth service). Other services consume the events which may trigger other processes (and more events).
Where I'm hung up is how to actually implement (in Python) a service that listens both for HTTP requests and for new events on a set of subscribed channels. I get that you need to use a tool like Redis or RabbitMQ, but is it possible to handle both types of requests in the same process, or do you need to run two servers (one for REST requests and the other for event handling)?
Also, if you have any comments on the general approach/architecture described above, I'm all ears.
Upvotes: 5
Views: 4959
Reputation: 468
After doing some more research and building a prototype, I found that a single server can listen for both HTTP requests and events from a message broker. However, it requires running two separate processes (one web server process to listen for HTTP, and one event process to listen to the message broker).
Here's the architecture I developed for my prototype:
The core modules (represented by the folder icon) are the meat of a service: all of the code that actually changes data. The HTTP Server and the Event Worker both call methods from the core modules. Neither the HTTP Server nor the Event Worker produces events; only the core modules do.
Here's a file structure:
Project
|-Foo
| |- foo.py
| |- web.py
| |- worker.py
| |- revent.py
|-Bar
| |- bar.py
| |- web.py
| |- worker.py
| |- revent.py
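The web.py files are simple Flask apps:
# Bar/web.py
from flask import Flask, request
from bar import Bar

app = Flask(__name__)

@app.route('/bar')
def bar():
    # bar_action is an instance method that takes a "foo" argument;
    # reading it from the query string here is an assumption
    return Bar().bar_action(request.args.get("foo", ""))

if __name__ == "__main__":
    app.run(port=5001, debug=True)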
For both the event worker and the core modules, I used a module revent.py (redis + event) that I created. It consists of three classes: Worker, Event, and Producer.
Under the hood, this module uses Redis streams. I'll paste the code for revent.py below.
But first, here is a sample bar.py, which is called by the HTTP server and the worker to do work, and which emits events about the work it's doing to the "bar" stream in Redis.
# Bar/bar.py
from revent import Producer

class Bar():
    # Producer that publishes this module's events to the "bar" stream
    ep = Producer("bar", host="localhost", port=6379, db=0)

    @ep.event("update")
    def bar_action(self, foo, **kwargs):
        print("BAR ACTION")
        # ep.send_event("update", {"test": str(True)})
        return "BAR ACTION"

if __name__ == '__main__':
    Bar().bar_action("test", test="True")
Finally, here's a sample worker, Foo/worker.py, that listens for events on the "bar" stream.
# Foo/worker.py
from revent import Worker

worker = Worker()

@worker.on('bar', "update")
def test(foo, test="False"):
    # Event fields arrive from Redis as strings, so compare against "True"
    # (bool("False") would be truthy)
    if test != "True":
        print('test')
    else:
        print('tested')

if __name__ == "__main__":
    worker.listen(host='127.0.0.1', port=6379, db=0)
As promised, here's the code for the revent.py module I built. It would probably be worth publishing a more fully developed version of this to PyPI, but for now I am just using a symlink to keep my two copies in sync.
# revent.py
import redis
from datetime import datetime
import functools

class Worker:
    # streams = {
    #     "bar": {
    #         "update": Foo.foo_action
    #     },
    # }

    def __init__(self):
        self._events = {}

    def on(self, stream, action, **options):
        """
        Wrapper to register a function to an event
        """
        def decorator(func):
            self.register_event(stream, action, func, **options)
            return func
        return decorator

    def register_event(self, stream, action, func, **options):
        """
        Map an event to a function
        """
        if stream in self._events:
            self._events[stream][action] = func
        else:
            self._events[stream] = {action: func}

    def listen(self, host, port, db):
        """
        Main event loop
        Establish redis connection from passed parameters
        Wait for events from the specified streams
        Dispatch to appropriate event handler
        """
        self._r = redis.Redis(host=host, port=port, db=db)
        # One key per subscribed stream; "$" means "only events added from now on"
        last_ids = {stream: "$" for stream in self._events}
        while True:
            # Block until at least one subscribed stream has new entries
            reply = self._r.xread(last_ids, count=None, block=0)
            for stream, entries in reply:
                for entry in entries:
                    # Resume after this entry on the next read so nothing is skipped
                    last_ids[stream.decode('utf-8')] = entry[0]
                    # Call function that is mapped to this event
                    self._dispatch([[stream, [entry]]])

    def _dispatch(self, event):
        """
        Call a function given an event
        If the event has been registered, the registered function will be called with the passed params.
        """
        e = Event(event=event)
        func = self._events.get(e.stream, {}).get(e.action)
        if func is not None:
            print(f"{datetime.now()} - Stream: {e.stream} - {e.event_id}: {e.action} {e.data}")
            return func(**e.data)

class Event():
    """
    Abstraction for an event
    """
    def __init__(self, stream="", action="", data=None, event=None):
        self.stream = stream
        self.action = action
        # Avoid sharing one mutable default dict between instances
        self.data = data if data is not None else {}
        self.event_id = None
        if event:
            self.parse_event(event)

    def parse_event(self, event):
        # event = [[b'bar', [(b'1594764770578-0', {b'action': b'update', b'test': b'True'})]]]
        self.stream = event[0][0].decode('utf-8')
        self.event_id = event[0][1][0][0].decode('utf-8')
        self.data = event[0][1][0][1]
        self.action = self.data.pop(b'action').decode('utf-8')
        # Redis returns bytes; decode keys and values to str
        self.data = {k.decode('utf-8'): v.decode('utf-8') for k, v in self.data.items()}

    def publish(self, r):
        body = {
            "action": self.action
        }
        body.update(self.data)
        r.xadd(self.stream, body)

class Producer:
    """
    Abstraction for a service (module) that publishes events about itself
    Manages stream information and can publish events
    """
    def __init__(self, stream_name, host, port, db):
        self.stream = stream_name
        # Use the connection parameters that were passed in
        self._r = redis.Redis(host=host, port=port, db=db)

    def send_event(self, action, data):
        e = Event(stream=self.stream, action=action, data=data)
        e.publish(self._r)

    def event(self, action, data=None):
        def decorator(func):
            @functools.wraps(func)
            def wrapped(*args, **kwargs):
                result = func(*args, **kwargs)
                # Names of the positional parameters, skipping the first one (self)
                code = func.__code__
                arg_keys = code.co_varnames[1:code.co_argcount]
                # Add the positional arguments to the event payload by name
                for key, value in zip(arg_keys, args[1:]):
                    kwargs[key] = value
                self.send_event(action, kwargs)
                return result
            return wrapped
        return decorator
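One detail worth spelling out: Event.parse_event reads only the first entry of the first stream, while an XREAD reply can carry several entries across several streams. The sketch below shows one way to flatten a whole reply. The helper name flatten_reply is hypothetical (it is not part of revent.py), and it assumes the reply has the bytes-based shape shown in the parse_event comment, i.e. a client created without decode_responses.

```python
from typing import Dict, List, Tuple


def flatten_reply(reply) -> List[Tuple[str, str, str, Dict[str, str]]]:
    """Return one (stream, event_id, action, data) tuple per entry in the reply.

    Hypothetical helper; expects the shape
    [[b'bar', [(b'1594764770578-0', {b'action': b'update', b'test': b'True'})]]]
    """
    events = []
    for stream, entries in reply:
        for event_id, fields in entries:
            # Decode every key and value from bytes to str
            data = {k.decode("utf-8"): v.decode("utf-8") for k, v in fields.items()}
            # The action travels as an ordinary field; split it out of the payload
            action = data.pop("action", "")
            events.append((stream.decode("utf-8"), event_id.decode("utf-8"), action, data))
    return events


# Sample reply with two entries on the same stream
sample = [[b'bar', [
    (b'1594764770578-0', {b'action': b'update', b'test': b'True'}),
    (b'1594764770579-0', {b'action': b'update', b'test': b'False'}),
]]]
events = flatten_reply(sample)
```

Each tuple can then be dispatched on its own, so a burst of events that arrives in one reply is handled entry by entry.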
Putting it all together: the foo.py and bar.py modules do the actual work of the Foo and Bar services respectively. Their methods are called by the HTTP server and the event worker to handle requests/events. In doing their work, these two modules emit events about their state changes so that other interested services can act accordingly. The HTTP server is just a normal web app using e.g. Flask. The event worker is similar in concept to a web server, except that it listens for events in Redis instead of HTTP requests. Both of these processes (the web server and the event worker) need to run separately. So, if you're developing locally, you need to run them in different terminal windows or use a container/process orchestrator.
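For local development, the two processes can also be started from one small script instead of two terminal windows. This is only a sketch under assumptions: run_together and SERVICE_COMMANDS are hypothetical names, and the commands assume the script is run from inside a service folder such as Foo/, where web.py and worker.py live.

```python
import subprocess
import sys
from typing import List, Sequence

# Hypothetical default: the web server and the event worker of one service
SERVICE_COMMANDS = [
    [sys.executable, "web.py"],
    [sys.executable, "worker.py"],
]


def run_together(commands: Sequence[Sequence[str]]) -> List[int]:
    """Start every command as its own process and wait for all of them to exit.

    Returns the exit codes in the same order as the commands.
    """
    processes = [subprocess.Popen(list(cmd)) for cmd in commands]
    try:
        return [p.wait() for p in processes]
    except KeyboardInterrupt:
        # Ctrl+C: stop whatever is still running, then collect exit codes
        for p in processes:
            p.terminate()
        return [p.wait() for p in processes]
```

Calling run_together(SERVICE_COMMANDS) from the Foo folder would start both processes and block until they exit. A process manager or container orchestrator does the same job with restarts and logging on top.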
That was a lot. I hope it helps someone, let me know in the comments if you have questions.
I uploaded revent.py to PyPI as a package called redisevents. I'll add more documentation on how to use/extend it later this week.
Upvotes: 7