Daniel Chepenko

Reputation: 2268

Python multithreading with shared variable

I'm trying to parallelise my job, but I'm new to multithreading, so I'm confused about the concrete implementation.

I have a socket listener that saves data to a buffer. When the buffer reaches its capacity, I need to save its data to a database. On one thread I want to run the socket listener, while on a parallel task I want to check the buffer's status.

BufferQueue is just an extension of a Python list, with a method that allows checking whether the list has reached the specified size.

SocketManager is a streaming data provider for the STREAM_URL I'm listening to. It uses a callback function to handle messages.

But as I use callbacks to retrieve data, I'm not sure that using a shared variable is the right and optimal decision here:

buffer = BufferQueue(buffer_size=10000)

def start_listening_to_socket(client):
    s = SocketManager(client)
    s.start_socket(cb_new)
    s.start()

def cb_new(message):
    print("New message")
    global buffer
    for m in message:
        buffer.append(m)  # save data to the shared buffer

def is_buffer_ready():
    global buffer
    print("Buffer state")
    if buffer.ready():
        pass  # save buffer data to db

I'd appreciate it if you could help me with this case.

Upvotes: 3

Views: 12209

Answers (2)

abarnert

Reputation: 365895

I think all you’re looking for is the queue module.

A queue.Queue is a self-synchronized queue designed specifically for passing objects between threads.

By default, calling get on a queue will block until an object is available, which is what you usually want to do—the point of using threads for concurrency in a network app is that your threads all look like normal synchronous code, but spend most of their time waiting on a socket, a file, a queue, or whatever when they have nothing to do. But you can check without blocking by using block=False, or put a timeout on the wait.
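
For example, a consumer loop that checks for work without blocking forever might look like this (a minimal sketch; handle() is a hypothetical placeholder for your own processing):

import queue

q = queue.Queue()

def consume():
    while True:
        try:
            item = q.get(timeout=1.0)   # wait at most one second for a message
        except queue.Empty:
            continue                    # nothing arrived yet; loop and keep waiting
        handle(item)                    # handle() stands in for your real processing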

You can also specify a maxsize when you construct the queue. Then, by default, put will block until the queue isn’t too full to accept the new object. But, again, you can use block or timeout to try and fail if it’s too full.
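
Applied to your callback, that could look something like this (a sketch that assumes you swap BufferQueue for a bounded queue.Queue; the 5-second timeout and the "drop on full" choice are arbitrary):

import queue

buf = queue.Queue(maxsize=10000)        # bounded queue instead of BufferQueue

def cb_new(message):
    print("New message")
    for m in message:
        try:
            buf.put(m, timeout=5)       # wait up to 5 seconds for free space
        except queue.Full:
            print("Consumer is falling behind")   # decide whether to drop or retry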

All synchronization is taken care of internally inside get and put, so you don’t need a Lock to guarantee thread safety or a Condition to signal waiters.

A queue can even take care of shutdown for you. The producer can just put a special value that tells the consumer to quit when it sees it on a get.

For graceful shutdown where the producer then needs to wait until the consumer has finished, you can use the optional task_done method after the consumer has finished processing each queued object, and have the producer block on the join method. But if you don't need this, or you have another way to wait for shutdown (e.g., joining the consumer thread), you can skip this part.
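
Putting the sentinel and the task_done/join pieces together (a minimal sketch; save_to_db() is a hypothetical placeholder for your database write):

import queue
import threading

q = queue.Queue()
SENTINEL = object()                     # unique value meaning "no more data"

def consumer():
    while True:
        item = q.get()
        if item is SENTINEL:
            q.task_done()
            break
        save_to_db(item)                # placeholder for your real DB write
        q.task_done()                   # tell the queue this item is fully processed

t = threading.Thread(target=consumer)
t.start()

# ... producer thread put()s items here ...

q.put(SENTINEL)                         # ask the consumer to finish
q.join()                                # blocks until every item has been task_done()'d
t.join()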

Upvotes: 4

Chen A.

Reputation: 11318

Multithreading gives you shared state between threads (variables live in the same process). Instead of using globals, just pass the buffer as an argument to your functions and read from / write to it.

You still need to control access to the buffer resource, so both threads are not reading/writing at the same time. You can achieve that using Lock from the threading module:

import threading

lock = threading.Lock()

def cb_new(buffer_, lock_, message):
    print("New message")
    with lock_:                    # hold the lock while writing to the buffer
        for m in message:
            buffer_.add(m)         # save data to buffer

def is_buffer_ready(buffer_, lock_):
    print("Buffer state")
    with lock_:                    # hold the lock while reading the buffer
        if buffer_.ready():
            pass                   # save buffer data to db

Note that if you are working with multiprocessing instead of threads, this solution won't work.
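
If you do switch to processes later, the usual replacement is a process-safe queue rather than a lock around a shared list; here is a minimal sketch (the worker function and the None sentinel are just illustrative):

import multiprocessing

def worker(q):
    while True:
        item = q.get()                  # blocks until the parent process puts something
        if item is None:                # None is used as a stop sentinel here
            break
        print("processing", item)

if __name__ == "__main__":
    q = multiprocessing.Queue()         # safe to share between processes
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    q.put("some data")
    q.put(None)                         # tell the worker to exit
    p.join()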

By the way, as @abarnert commented, there are better mechanisms to check whether the buffer is ready (has data to read / has free space to write) than calling a function that checks it. Check out select.select(), which blocks until the buffer is actually ready.

When working with select, you put the calls inside a while True loop, and then check whether the buffer is ready for reading. You can start this function in a thread, passing a flag variable and the buffer. If you want to stop the thread, change the flag you passed to False. For the buffer object, use queue.Queue() or a similar data structure.

import select

def read_select(flag, buff):
    while flag:                            # keep looping while the flag is truthy
        r, _, _ = select.select([buff], [], [])
        if r:
            data = buff.read(BUFFSIZE)     # BUFFSIZE: max amount to read at once
            # process data

P.S. select also works with sockets. You can pass a socket object instead of a buffer, and it will check whether the socket's buffer is ready for reading.
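
For instance, with a plain TCP socket (a minimal sketch; the host, port, 1-second timeout, and 4096-byte read size are just placeholders):

import select
import socket

sock = socket.create_connection(("example.com", 80))     # placeholder host/port
sock.setblocking(False)

while True:
    readable, _, _ = select.select([sock], [], [], 1.0)   # wait up to 1 second
    if readable:
        data = sock.recv(4096)
        if not data:                                      # empty read: peer closed the connection
            break
        print("got", len(data), "bytes")                  # process data here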

Upvotes: 2
