Reputation: 875
Is there a way to isolate pipelines between jobs or threads?
My Redis wrapper has 2 methods:
def queue_redis_message_on_pipeline(message, pipeline):
    pipe = pipeline or get_websocket_redis_client().pipeline()
    ...
    pipe.publish(message)

def execute_redis_pipeline(pipeline):
    pipe = pipeline or get_websocket_redis_client().pipeline()
    pipe.execute()
Both methods end up sharing the same pipeline singleton. Async workers use that shared pipeline concurrently, so one worker may either (a) flush the pipeline prematurely, discarding commands another worker has queued, or (b) keep piling commands onto it until the connection hits an out-of-memory error. Any suggestions?
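A minimal sketch of the interleaving I'm worried about, assuming two threads share one pipeline (the client setup, channels, and worker functions here are just for illustration, not my real code):

import threading
import redis

# Hypothetical stand-in for get_websocket_redis_client(); assumes a local Redis.
client = redis.Redis()
shared_pipe = client.pipeline()  # one pipeline shared by every worker

def worker_a():
    shared_pipe.publish('channel_a', 'message from A')
    # If another worker calls execute() right here, A's command is flushed early.

def worker_b():
    shared_pipe.publish('channel_b', 'message from B')
    shared_pipe.execute()  # flushes everything queued so far, including A's commands

threading.Thread(target=worker_a).start()
threading.Thread(target=worker_b).start()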
Upvotes: 0
Views: 26
Reputation: 255
Create a separate pipeline for each thread or worker using threading.local():
import threading

thread_local = threading.local()

def get_thread_local_pipeline():
    # Lazily create one pipeline per thread; each thread only ever sees its own.
    if not hasattr(thread_local, 'pipeline'):
        thread_local.pipeline = get_websocket_redis_client().pipeline()
    return thread_local.pipeline

def queue_redis_message_on_pipeline(message):
    pipe = get_thread_local_pipeline()
    # ...
    pipe.publish(message)

def execute_redis_pipeline():
    pipe = get_thread_local_pipeline()
    pipe.execute()
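A quick usage sketch (the worker count and messages are just for illustration): each thread queues and flushes on its own pipeline, so workers can no longer flush each other's commands or grow one shared queue without bound.

import threading

def worker(message):
    # This thread lazily gets its own pipeline on first use, so the commands
    # it queues and executes never mix with another worker's.
    queue_redis_message_on_pipeline(message)
    execute_redis_pipeline()

threads = [threading.Thread(target=worker, args=(f'msg-{i}',)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()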
Upvotes: 0