baumannalexj

Reputation: 875

Is there a way to isolate redis-py pipelines so they are not prematurely flushed by async workers?

Is there a way to isolate pipelines between jobs or threads?

My Redis wrapper has 2 methods:

def queue_redis_message_on_pipeline(message, pipeline):
    pipe = pipeline or get_websocket_redis_client().pipeline()
    ...
    pipe.publish(message)

def execute_redis_pipeline(pipeline):
    pipe = pipeline or get_websocket_redis_client().pipeline()
    pipe.execute()

Both methods end up using the same pipeline singleton. Several async workers share that pipeline, so a worker may either (A) flush the pipeline prematurely, or (B) keep adding to the pipeline until it causes an out-of-memory error on the network connection. Any suggestions?

Upvotes: 0

Views: 26

Answers (1)

ITM_Coder

Reputation: 255

Create a separate pipeline for each thread or worker by storing it in threading.local(), so that one worker's execute() cannot flush commands queued by another:

import threading

thread_local = threading.local()

def get_thread_local_pipeline():
    if not hasattr(thread_local, 'pipeline'):
        thread_local.pipeline = get_websocket_redis_client().pipeline()
    return thread_local.pipeline

def queue_redis_message_on_pipeline(message):
    pipe = get_thread_local_pipeline()
    #...
    pipe.publish(message)

def execute_redis_pipeline():
    pipe = get_thread_local_pipeline()
    pipe.execute()
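
To check that the thread-local approach really isolates the buffers, here is a minimal sketch that runs without a Redis server. FakePipeline, the "demo-channel" name, and the worker function are hypothetical stand-ins invented for this illustration; FakePipeline only mimics the buffering behaviour of a pipeline (queue commands, then hand them over on execute()) and is not the redis-py API.

```python
import threading


class FakePipeline:
    """Hypothetical stand-in for a pipeline: buffers commands until execute()."""

    def __init__(self):
        self.commands = []

    def publish(self, channel, message):
        # Queue the command locally instead of sending it.
        self.commands.append(("PUBLISH", channel, message))
        return self

    def execute(self):
        # Return everything queued so far and start with an empty buffer.
        sent, self.commands = self.commands, []
        return sent


_thread_local = threading.local()


def get_thread_local_pipeline():
    # Each thread sees its own attributes on _thread_local,
    # so each thread lazily creates its own pipeline.
    if not hasattr(_thread_local, "pipeline"):
        _thread_local.pipeline = FakePipeline()
    return _thread_local.pipeline


def worker(name, results):
    pipe = get_thread_local_pipeline()
    for i in range(3):
        pipe.publish("demo-channel", f"{name}-{i}")
    # Only this thread's three messages should be flushed here.
    results[name] = pipe.execute()


results = {}
threads = [
    threading.Thread(target=worker, args=(f"w{n}", results)) for n in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

for name in sorted(results):
    print(name, [message for _, _, message in results[name]])
```

One caveat: threading.local() separates OS threads only. If the "async workers" are asyncio tasks running on a single thread, they would still share one pipeline; in that case creating a fresh pipeline per job, or keeping it in a contextvars.ContextVar, is probably the closer fit.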

Upvotes: 0
