bitcycle

Reputation: 7812

How to prevent duplicate values in a shared queue

A producer thread queries a data store and puts objects into a queue. Each consumer thread then pulls an object off the shared queue and makes a very long call to an external service. When the call returns, the consumer marks the object as having been completed.

My problem is that I basically have to wait until the queue is empty before the producer can add to it again, or else I risk duplicates being sent through.

[edit] Someone asked a good question over IRC and I figured I would add the answer here. The question was, "Why do your producers produce duplicates?" The answer is basically that the producer produces duplicates because we don't track a "sending" state of each object, only "sent" or "unsent".
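For concreteness, here is roughly what that missing "sending" state would look like. This is only a sketch: the SendState names and the dict standing in for our data store are made up for illustration.

from enum import Enum, auto

class SendState(Enum):
    UNSENT = auto()
    SENDING = auto()   # this is the state we don't track today
    SENT = auto()

# Toy stand-in for the data store: object ID -> state.
store = {"a": SendState.UNSENT, "b": SendState.SENT}

def claim_unsent():
    # Flip UNSENT objects to SENDING as they are claimed, so a second query
    # of the store can no longer pick them up and produce duplicates.
    claimed = []
    for obj_id, state in store.items():
        if state is SendState.UNSENT:
            store[obj_id] = SendState.SENDING
            claimed.append(obj_id)
    return claimed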

Is there a way that I can check for duplicates in the queue?

Upvotes: 3

Views: 5726

Answers (4)

ravi

Reputation: 1

import queue

class UniqueQueue(queue.Queue):
    """A Queue subclass that refuses to enqueue an item it has already seen."""

    def __init__(self):
        super().__init__()
        self.task_set = set()  # every task ever enqueued; note it is never pruned

    def put(self, task, block=True, timeout=None):
        # Note: the membership check and the add are not atomic, so two
        # producers racing on the same task could still both enqueue it.
        if task not in self.task_set:
            super().put(task, block, timeout)
            self.task_set.add(task)
        else:
            print(f"Task '{task}' already exists in the queue.")

# Example usage:
my_queue = UniqueQueue()

my_queue.put("Task1")
my_queue.put("Task2")
my_queue.put("Task1")  # This task will not be added to the queue

# Display the queue contents
while not my_queue.empty():
    print(my_queue.get())

Upvotes: 0

steveha

Reputation: 76765

It seems to me like it's not really a problem to have duplicate objects in the queue; you just want to make sure you only do the processing once per object.

EDIT: I originally suggested using a set or OrderedDict to keep track of the objects, but Python has a perfect solution: functools.lru_cache

Use @functools.lru_cache as a decorator on your worker function, and it will manage a cache for you. You can set a maximum size, and the cache will not grow beyond that size. If you use an ordinary set and don't manage it, it could grow to a very large size and slow down your workers.
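For example, a rough sketch; the function name and cache size are placeholders, and the print stands in for the real long-running service call. Note that lru_cache keys the cache on the function arguments, so whatever you pass in must be hashable.

import functools

@functools.lru_cache(maxsize=4096)   # the cache is bounded; old entries fall out
def process(object_id):
    # The long call to the external service goes here; a repeated object_id
    # that is still in the cache returns the cached result instead.
    print(f"calling the external service for {object_id}")
    return f"result for {object_id}"

process("obj-1")
process("obj-1")   # served from the cache; the service is not called again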

If you are using multiple worker processes instead of threads, you would need a solution that works across processes. Instead of a set or an lru_cache you could use a shared dict where the key is the unique ID value you use to detect duplicates, and the value is a timestamp for when the object went into the dict; then from time to time you could delete the really old entries in the dict. Here's a StackOverflow answer about shared dict objects:

multiprocessing: How do I share a dict among multiple processes?
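A rough sketch of that idea using multiprocessing.Manager; the function names and the one-hour cutoff are just illustration.

import time
from multiprocessing import Manager

manager = Manager()
seen = manager.dict()   # unique ID -> time the object was first seen

def should_process(obj_id):
    # First worker to record the ID wins; later duplicates are skipped.
    # (The check and the insert are not atomic across processes, so a rare
    # race is still possible; use a manager lock if you need a strict guarantee.)
    if obj_id in seen:
        return False
    seen[obj_id] = time.time()
    return True

def prune_seen(max_age_seconds=3600):
    # Run this from time to time so the dict does not grow without bound.
    cutoff = time.time() - max_age_seconds
    for obj_id, added in list(seen.items()):
        if added < cutoff:
            del seen[obj_id]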

And the rest of my original answer follows:

If so, I suggest you have the consumer thread(s) use a set to keep track of objects that have been seen. If an object is not in the set, add it and process it; if it is in the set, ignore it as a duplicate.

If this will be a long-running system, instead of a set, use an OrderedDict to track seen objects. Then from time to time clean out the oldest entries in the OrderedDict.
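Something along these lines; the cap of 10000 entries is arbitrary. OrderedDict keeps insertion order, so popping from the front discards the oldest entries first.

from collections import OrderedDict

MAX_SEEN = 10000       # arbitrary cap on how many IDs we remember
seen = OrderedDict()   # object ID -> None, kept in insertion (age) order

def is_duplicate(obj_id):
    if obj_id in seen:
        return True
    seen[obj_id] = None
    # Clean out the oldest entries whenever the structure gets too big.
    while len(seen) > MAX_SEEN:
        seen.popitem(last=False)   # last=False pops the oldest entry
    return False

print(is_duplicate("obj-1"))   # False: first time seen
print(is_duplicate("obj-1"))   # True: duplicate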

Upvotes: 4

rocksportrocker

Reputation: 7429

If you are talking about the classes in the Queue module: following the API, there is no way to detect whether a queue contains a given object.
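If you don't mind relying on CPython implementation details rather than the documented API, you can peek at the queue's internal deque under its mutex. A sketch:

import queue

def queue_contains(q, item):
    # Not part of the documented API: q.queue is the underlying deque in
    # CPython's queue.Queue, and q.mutex is the lock that protects it.
    with q.mutex:
        return item in q.queue

q = queue.Queue()
q.put("Task1")
print(queue_contains(q, "Task1"))   # True
print(queue_contains(q, "Task2"))   # False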

Upvotes: 1

dlawrence

Reputation: 1655

What do you mean by mark the object as having been completed? Do you leave the object in the queue and change a flag? Or do you mean you mark the object as having been completed in the data store? If the former, how does the queue ever become empty? If the latter, why not remove the object from the queue before you start processing?

Assuming you want to be able to handle cases where the processing fails without losing data, one approach would be to create a separate work queue and processing queue. Then, when a consumer pulls a job from the work queue, they move it to the processing queue and start the long running call to an external service. When that returns, it can mark the data complete and remove it from the processing queue. If you add a field for when the data was put into the processing queue, you could potentially run a periodic job that checks for processing jobs that exceed a certain time and attempts to reprocess them (updating the timestamp before restarting).
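A rough sketch of that idea; the processing "queue" is modeled here as a dict so the timestamp can live next to the payload, and the helper functions and the 10-minute timeout are made up for illustration.

import time
import queue

work_queue = queue.Queue()
processing = {}   # job ID -> (payload, time the job entered "processing")

def call_external_service(payload):
    time.sleep(1)   # stand-in for the very long external call

def mark_complete_in_data_store(job_id):
    pass            # stand-in for updating the data store

def consume():
    job_id, payload = work_queue.get()
    processing[job_id] = (payload, time.time())   # move the job into "processing"
    call_external_service(payload)
    mark_complete_in_data_store(job_id)           # mark the data complete
    del processing[job_id]                        # and drop it from "processing"

def requeue_stale(timeout_seconds=600):
    # Periodic job: anything stuck in "processing" too long gets another try.
    now = time.time()
    for job_id, (payload, started) in list(processing.items()):
        if now - started > timeout_seconds:
            processing[job_id] = (payload, now)   # update the timestamp first
            work_queue.put((job_id, payload))     # then put it back on the work queue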

Upvotes: 0
