I have a Flask web-server that generates server-sent events (SSE), which should be received by all connected web-clients.
In "Version 1" below this works. All web-clients receive the events, and update accordingly.
In "Version 2" below, which is a refactoring of Version 1, this no longer works as expected: not every client receives every event.
As far as I can make out, the server is always generating the events, and normally at least one client receives each one.
My initial test hosted the web-server on a Raspberry Pi 3, with the web-clients on the Pi, on Windows and OSX using a variety of browsers.
To eliminate any possible network issues I repeated the same test with the web-server and 3 instances of Chrome all hosted on the same OSX laptop. This gave the same results: Version 1 "OK", Version 2 "NOT OK".
The client that successfully receives seemingly varies randomly from event to event: so far I can't discern a pattern.
Both Version 1 and Version 2 have a structure change_objects containing "things that should be tracked for changes".
In Version 1 change_objects is a dict of dicts.
In Version 2 I refactored change_objects to be a list of instances of the class Reporter, or subclasses of Reporter.
The changes to the "things" are triggered based on web-services received elsewhere in the code.
Version 1:

    from copy import deepcopy

    import gevent
    from flask import Response

    # app (the Flask instance) and walks are defined elsewhere in the code

    def check_walk(walk_new, walk_old):
        if walk_new != walk_old:
            print("walk change", walk_old, walk_new)
            return True, walk_new
        else:
            return False, walk_old

    def walk_event(walk):
        silliness = walk['silliness']
        data = '{{"type": "walk_change", "silliness": {}}}'.format(silliness)
        return "data: {}\n\n".format(data)

    change_objects = {
        "walk1": {
            "object": walks[0],
            "checker": check_walk,
            "event": walk_event,
        },
        # ... more things to be tracked ...
    }

    def event_stream(change_objects):
        # "copies" is local, so every connected client keeps its own old state
        copies = {}
        for key, value in change_objects.items():
            copies[key] = {"obj_old": deepcopy(value["object"])}  # ensure a true copy, not a reference!
        while True:
            gevent.sleep(0.5)
            for key, value in change_objects.items():
                obj_new = deepcopy(value["object"])  # use same version in check and yield functions
                obj_changed, copies[key]["obj_old"] = value["checker"](obj_new, copies[key]["obj_old"])
                if obj_changed:
                    yield value["event"](obj_new)

    @app.route('/server_events')
    def sse_request():
        return Response(
            event_stream(change_objects),
            mimetype='text/event-stream')
Version 2:

    import json
    from copy import deepcopy

    import gevent
    from flask import Response

    # app (the Flask instance) and walks are defined elsewhere in the code

    class Reporter:
        def __init__(self, reportee, name):
            self._setup(reportee, name)

        def _setup(self, reportee, name):
            self.old = self.truecopy(reportee)  # snapshot of the last reported state
            self.new = reportee                 # live reference to the tracked object
            self.name = "{}_change".format(name)

        def truecopy(self, orig):
            return deepcopy(orig)

        def changed(self):
            if self.new != self.old:
                self.old = self.truecopy(self.new)
                return True
            else:
                return False

        def sse_event(self):
            data = self.new.copy()
            data['type'] = self.name
            data = json.dumps(data)
            return "data: {}\n\n".format(data)

    class WalkReporter(Reporter):
        # as we are only interested in changes to attribute "silliness" (not other attributes) --> override superclass sse_event
        def sse_event(self):
            silliness = self.new['silliness']
            data = '{{"type": "walk_change", "silliness": {}}}'.format(silliness)
            return "data: {}\n\n".format(data)

    change_objects = [
        WalkReporter(name="walk1", reportee=walks[0]),
        # ... more objects to be tracked ...
    ]

    def event_stream(change_objects):
        while True:
            gevent.sleep(0.5)
            for obj in change_objects:
                if obj.changed():
                    yield obj.sse_event()

    @app.route('/server_events')
    def sse_request():
        return Response(
            event_stream(change_objects),
            mimetype='text/event-stream')
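As a side note on the event format used in both versions, here is a minimal sketch of building the SSE frame with json.dumps instead of str.format. The helper name sse_frame is hypothetical and not part of the code above; the point is only that json.dumps keeps the payload valid JSON even if a value such as silliness turns out to be a string.

```python
import json

def sse_frame(payload):
    """Hypothetical helper: wrap a dict as a single SSE "data:" frame."""
    # An SSE message is terminated by a blank line, hence the two newlines.
    return "data: {}\n\n".format(json.dumps(payload))

frame = sse_frame({"type": "walk_change", "silliness": 7})
text_frame = sse_frame({"type": "walk_change", "silliness": "high"})
```

The second call shows the case that hand-built JSON would get wrong: the string value is quoted automatically.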
Full disclosure: This question is a follow-on to the question "Refactor a (multi)generator python function", which focussed on refactoring the event_stream() function when tracking changes to multiple "things".
However, the problem here is clearly outside the scope of the original question, hence a new one.
Upvotes: 0
The refactored "Version 2" code in the question suffers from a concurrency / timing problem.
sse_request() is called once for each web-client (3 instances in the test case), so we have 3 generators looping in event_stream().
These loops run "more or less" in parallel, which in practice means their checks interleave in an unpredictable order.
However, the list change_objects is shared, so the first web-client that spots a change updates the "old" copy in the shared WalkReporter instance to the latest state, and may do so before the other clients have checked. In other words, the first successful web-client effectively hides the change from the other web-clients.
Version 1 did not have this problem because each call to event_stream() built its own local copies dict, so every client kept its own "old" state.
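The effect can be reproduced without Flask or gevent. The following is a minimal sketch, assuming a cut-down Reporter (no name, no sse_event) and a hypothetical tracked dict called walk; three clients polling in turn stand in for the three event_stream() loops.

```python
from copy import deepcopy

class Reporter:
    """Cut-down version of the question's Reporter: remembers the last state it saw."""
    def __init__(self, reportee):
        self.old = deepcopy(reportee)
        self.new = reportee

    def changed(self):
        if self.new != self.old:
            self.old = deepcopy(self.new)  # side effect: marks the change as "seen"
            return True
        return False

walk = {"silliness": 1}      # hypothetical tracked object
shared = Reporter(walk)      # one instance shared by all clients, as in Version 2

walk["silliness"] = 2        # a change arrives

# Three clients poll one after another, in whatever order the scheduler picks.
results = [shared.changed() for _client in range(3)]
print(results)  # [True, False, False]
```

Only the first caller sees the change, because changed() both tests for a difference and resets the shared "old" state.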
This is easily fixed by giving each web-client its own copy of change_objects, i.e. change_objects is moved into sse_request() as shown below.
    @app.route('/server_events')
    def sse_request():
        # built per request, so each web-client gets its own reporters
        change_objects = [
            WalkReporter(name="walk1", reportee=walks[0]),
            # ... more objects to be tracked ...
        ]
        return Response(
            event_stream(change_objects),
            mimetype='text/event-stream')
With this minor change, each call to sse_request() has its own reporters and can spot the changes itself, and thus all the web-clients receive the SSE events as expected.
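To check the per-client behaviour in isolation, here is a minimal sketch under the same simplifications (a cut-down Reporter, no Flask or gevent). The factory make_change_objects is a hypothetical name for "build a fresh list of reporters", which is what the body of sse_request() does for each request.

```python
from copy import deepcopy

class Reporter:
    """Cut-down Reporter: keeps its own snapshot of the last state it reported."""
    def __init__(self, reportee):
        self.old = deepcopy(reportee)
        self.new = reportee

    def changed(self):
        if self.new != self.old:
            self.old = deepcopy(self.new)
            return True
        return False

def make_change_objects(tracked):
    """Hypothetical factory: a fresh list of reporters for one client."""
    return [Reporter(obj) for obj in tracked]

walks = [{"silliness": 1}]   # stand-in for the question's walks list

# Each simulated client gets its own reporters, as each request now does.
clients = [make_change_objects(walks) for _ in range(3)]

walks[0]["silliness"] = 2    # a change arrives

first_poll = [[r.changed() for r in reporters] for reporters in clients]
second_poll = [[r.changed() for r in reporters] for reporters in clients]
print(first_poll)   # [[True], [True], [True]]
print(second_poll)  # [[False], [False], [False]]
```

Every client reports the change exactly once, since resetting one client's "old" snapshot no longer affects the others.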
Upvotes: 1