Reputation: 702
I have a thread A which permanently listens for events. When an event for a particular resource R1 arrives, it starts thread B and passes the job to B for processing. Thread A then continues to listen, while B waits for a job, receives the job from thread A and processes it. Additional events for resource R1 are also sent to thread B (placed in a queue for thread B). Events for resources R2, R3, and so on are treated the same way: a new thread is started for each unique resource, i.e. thread C for R2, thread D for R3, and so on.

Events for a particular resource come in bursts, followed by long periods of nothing. Hence thread A starts thread B, and when B is finished with the job it waits for another job; if no job arrives within a timeout, it joins. Because thread B may still be waiting after completing a job from a previous event, thread A checks whether B is alive before passing it the current job (by placing it in B's queue). If B is still alive, A just passes it the job; if it is not, A starts thread B again and then passes it the job. To ensure serialization of events for a particular resource, only one thread per resource is started (otherwise this would be trivial: just start a new thread for every event).
Now, here is the problem: there is a small but finite time during which thread B has just timed out waiting for a job and is about to join, but has not joined yet. If thread A checks whether thread B is alive during that short window, it will see that B is alive and send it a job, but B will not process it because it is no longer waiting for jobs; it is in the process of joining. Hence the job is never processed. This can be simulated by inserting a sleep statement as the last line of code in thread B.
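A stripped-down sketch of the pattern (the names and the timeout value are illustrative, not my actual code):

import queue
import threading

WORKER_TIMEOUT = 5.0              # how long thread B waits for another job

workers = {}                      # resource -> (thread, job queue); only touched by thread A

def process(job):
    print("processing", job)      # placeholder for the real work

def worker(jobs):                 # thread B (one per resource)
    while True:
        try:
            job = jobs.get(timeout=WORKER_TIMEOUT)
        except queue.Empty:
            break                 # no job arrived in time: fall through and exit
        process(job)
    # a time.sleep() here, as the last line of thread B, makes the race easy to reproduce

def dispatch(resource, job):      # called by thread A for every incoming event
    thread, jobs = workers.get(resource, (None, None))
    if thread is None or not thread.is_alive():
        jobs = queue.Queue()
        thread = threading.Thread(target=worker, args=(jobs,), daemon=True)
        workers[resource] = (thread, jobs)
        thread.start()
    jobs.put(job)                 # lost if B has just timed out but not yet exited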
How can I ensure that, when thread A checks thread B, B is not only alive but actually still waiting for a job and not in the process of joining? I have considered using a lock, but acquiring a lock also takes time, even if that time is very small.
Upvotes: 0
Views: 912
Reputation: 2423
The shared state between thread A and thread B looks like this:
from threading import Lock

lock = Lock()       # protects 'queue' and 'live'
queue = list()      # events waiting to be processed by thread B
live = False        # True while a thread B is running and accepting events
In thread A, when an event comes in:
with lock:
    if not live:
        spawn_thread_b()
        live = True
    queue.append(event)
In thread B, the worker loop looks like this:
lock.acquire()
while len(queue) > 0:
    events, queue = queue, list()  # Swap 'queue' with a new empty list
    lock.release()
    for event in events:
        process(event)
    lock.acquire()
live = False
lock.release()
With this implementation, it is possible to have a thread B that is joining while a new thread B is spawned. But it seems to me that this is unavoidable and is actually not an issue because those two threads will not interfere (thanks to the lock).
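For completeness, here is one way the fragments above could be wired together into something runnable; thread_b_main(), spawn_thread_b(), handle_event() and process() are just illustrative names:

from threading import Lock, Thread

lock = Lock()
queue = list()
live = False

def process(event):
    print("processing", event)         # stand-in for the real work

def thread_b_main():
    global queue, live
    lock.acquire()
    while len(queue) > 0:
        events, queue = queue, list()  # swap out the pending events
        lock.release()
        for event in events:
            process(event)
        lock.acquire()
    live = False                       # mark B as gone while still holding the lock
    lock.release()

def spawn_thread_b():
    Thread(target=thread_b_main, daemon=True).start()

def handle_event(event):               # called by thread A for every event
    global live
    with lock:
        if not live:
            spawn_thread_b()
            live = True
        queue.append(event)

Because 'live' is only ever flipped back to False while thread B holds the lock, thread A can never append an event that nobody will process.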
An alternative solution is to simply keep thread B alive. When no events come in, it will simply wait. Have a look at condition variables to implement the waiting for new events.
If you have a huge number of resources and expect the events to be sparsely spread over these resources, then I understand your desire to not keep the threads alive indefinitely.
If you have only a limited number of resources, then maybe keeping the threads alive is a more straightforward way to tackle your problem. Continuously managing thread lifetimes is a hassle and may even impact performance in some scenarios.
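A minimal sketch of that keep-alive alternative, using a condition variable as suggested above (process() is again a placeholder):

from threading import Condition, Thread

cond = Condition()           # protects 'pending'
pending = list()             # events waiting for thread B

def process(event):
    print("processing", event)

def thread_b_main():         # runs forever; one such thread per resource
    global pending
    while True:
        with cond:
            while not pending:
                cond.wait()              # sleep until thread A signals new work
            events, pending = pending, list()
        for event in events:             # process outside the lock
            process(event)

def handle_event(event):     # called by thread A
    with cond:
        pending.append(event)
        cond.notify()                    # wake thread B if it is waiting

Thread(target=thread_b_main, daemon=True).start()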
Upvotes: 1
Reputation: 702
I think the following approach would work and doesn't require any locks (I used a state model to get my head around it).
Thread B has three states: alive, zombie, and dead. alive has two substates, ready and processing; zombie also has two substates, draining and drained. The key to the whole thing is to realize that nothing can stop thread B from becoming a zombie, because it will simply time out independently of thread A. So there must be a mechanism for thread A to check whether thread B has become a zombie and, if so, not send it any more jobs. Here is how I think it could work: once thread B is a zombie (i.e. it has timed out waiting for a job), it will definitely join, so thread A must not send it any more jobs. When thread B times out it must update its state to zombie, check whether there are any more jobs (that is the substate zombie::draining) that thread A may have put in just after thread B timed out, churn through them, and when there are no more jobs enter substate zombie::drained and join. At that point its superstate changes to dead, i.e. the is_alive method will return False.
So thread B starts its life in the state alive::ready when thread A creates it. It then waits for a job from thread A, and when a job arrives it enters state alive::processing. It processes the job and returns to state alive::ready. That continues until eventually thread A does not send any more jobs within the timeout period; thread B then times out and enters state zombie.
Now, every time thread A receives an event, it checks the state of thread B: if B is in state dead (or has never been started), A starts it. If it is in state alive, A simply sends it the job. Finally, if it is in state zombie, A stops sending it jobs and waits (polls) until it is in state dead, which means it has completed all pending jobs. Thread A can then be certain that there is no running thread B, so it simply starts a new instance of thread B and sends it the pending job(s).
The only downside I can see in this model is that thread A must check thread B's state every time it gets an event for B, and that thread A must wait for B to complete when B is in state zombie before it can start a new instance of thread B. But the whole thing is very unlikely to occur in the first place, because the window in which thread A checks and places a job while thread B is practically simultaneously timing out is really small.
Upvotes: 0