Akhil Nadh PC

Reputation: 614

Understanding implementation of parallel programming via threading


Scenario

  1. A sensor sends data continuously at 100-millisecond intervals (the interval needs to be configurable).
  2. Thread 1 continuously reads the data from the sensor and writes it to a shared queue.
  3. This continues until a keyboard interrupt occurs.
  4. Thread 2 locks the queue (which may momentarily block Thread 1).
  5. It reads all the data from the queue into a temporary structure.
  6. It releases the queue.
  7. It processes the data. This is a computational task; while it runs, Thread 1 should keep filling the buffer with sensor data.

I have read about threading and the GIL. Step 7 cannot afford to lose any data sent by the sensor while the computational process() runs on Thread 2.

How can this be implemented in Python?

This is what I started with:

from queue import Queue
from threading import Thread
q = Queue(maxsize=10)
def fun1():
    
    fun2Thread = Thread(target=fun2)
    fun2Thread.start()
    while True:
      try:
        q.put(1)  
      except KeyboardInterrupt:
        print("Key Interrupt")
    fun2Thread.join()
def fun2():
    print(q.get())
def read():
    fun1Thread = Thread(target=fun1)
    fun1Thread.start()
    fun1Thread.join()
read()

The issue I'm facing is that the terminal gets stuck after printing 1. Can someone please guide me on how to implement this scenario?

Upvotes: 0

Views: 60

Answers (1)

Adon Bilivit

Reputation: 26976

Here's an example that may help.

We have a main program (driver), a client and a server. The main program manages queue construction and the starting and ending of the subprocesses.

The client sends a range of values via a queue to the server. When the range is exhausted, it puts a sentinel value (-1) on the queue to tell the server to terminate. There's a delay (sleep) before each value is enqueued, for demonstration purposes.

Try running it once without any interrupt and note how everything terminates nicely. Then run again and interrupt (Ctrl-C) and again note a clean termination.

from multiprocessing import Queue, Process
from signal import signal, SIGINT, SIG_IGN
from time import sleep


def client(q, default):
    signal(SIGINT, default)
    try:
        for i in range(10):
            sleep(0.5)
            q.put(i)
    except KeyboardInterrupt:
        pass
    finally:
        q.put(-1)


def server(q):
    while (v := q.get()) != -1:
        print(v)


def main():
    q = Queue()
    default = signal(SIGINT, SIG_IGN)
    (server_p := Process(target=server, args=(q,))).start()
    (client_p := Process(target=client, args=(q, default))).start()
    client_p.join()
    server_p.join()


if __name__ == '__main__':
    main()
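
If the server should take everything that is currently queued in one go (steps 4 to 6 in the question) rather than one value at a time, one possible variation is sketched below. It is only a sketch: `drain` is a hypothetical helper, not part of `multiprocessing`, and it assumes the same -1 sentinel as above. The lock mentioned in the question is not needed here because the queue does its own synchronisation.

```python
from queue import Empty


def drain(q, sentinel=-1):
    """Wait for one item, then collect everything else already queued.

    Returns (batch, finished); finished is True once the sentinel was seen.
    """
    batch = []
    item = q.get()  # blocks until the client has sent something
    while item != sentinel:
        batch.append(item)
        try:
            item = q.get_nowait()
        except Empty:
            return batch, False  # queue is empty for now; more may follow
    return batch, True


def server(q):
    finished = False
    while not finished:
        batch, finished = drain(q)
        if batch:
            print(batch)  # stand-in for the real computation
```

While `server` is busy with a batch, the client keeps enqueueing, and those values are picked up by the next call to `drain`.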

EDIT:

Edited to ensure that the server process continues to drain the queue if the client is terminated due to a KeyboardInterrupt (SIGINT).
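
For comparison, the scenario in the question can also be sketched with threads only. In the question's code the terminal appears stuck because fun2 takes a single item and returns, so fun1 blocks in q.put() once the 10-slot queue is full, and KeyboardInterrupt is raised only in the main thread, so the except clause inside fun1 never sees it. The following is a minimal sketch under assumptions, not a definitive implementation: `producer`, `consumer`, `drain` and `run` are hypothetical names, `read_sensor` and `process` are callables you would supply, and `duration` exists only so the demo can stop without Ctrl-C.

```python
import threading
import time
from collections import deque


def producer(buf, lock, stop, read_sensor, interval):
    """Thread 1: append one reading per interval until asked to stop."""
    while not stop.is_set():
        reading = read_sensor()
        with lock:
            buf.append(reading)
        stop.wait(interval)  # sleeps, but returns early once stop is set


def drain(buf, lock):
    """Steps 4-6: lock, copy everything to a temporary list, release."""
    with lock:
        batch = list(buf)
        buf.clear()
    return batch


def consumer(buf, lock, producer_done, process, interval):
    """Thread 2: drain the buffer, then process the batch outside the lock."""
    while not producer_done.is_set():
        batch = drain(buf, lock)
        if batch:
            process(batch)
        else:
            producer_done.wait(interval)  # nothing queued yet; idle briefly
    batch = drain(buf, lock)  # final drain after the producer has exited
    if batch:
        process(batch)


def run(read_sensor, process, interval=0.1, duration=None):
    """Run both threads until Ctrl-C, or for `duration` seconds if given."""
    buf, lock = deque(), threading.Lock()
    stop, producer_done = threading.Event(), threading.Event()
    t1 = threading.Thread(target=producer,
                          args=(buf, lock, stop, read_sensor, interval))
    t2 = threading.Thread(target=consumer,
                          args=(buf, lock, producer_done, process, interval))
    t1.start()
    t2.start()
    try:
        # The main thread only waits, so KeyboardInterrupt is raised here.
        if duration is None:
            while True:
                time.sleep(1)
        else:
            time.sleep(duration)
    except KeyboardInterrupt:
        pass
    finally:
        stop.set()
        t1.join()            # producer finishes its last append
        producer_done.set()
        t2.join()            # consumer performs the final drain
```

The buffer is unbounded, so the producer never blocks on a full queue, and the lock is held only while the batch is copied. A CPU-bound process() written in pure Python still shares the GIL with the producer, so this assumes some timing jitter on the 100 ms interval is acceptable; the multiprocessing version above avoids that by running the computation in a separate process.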

Upvotes: 1
