Reputation: 393
I have the following scenario. I have a
I am currently trying to use RabbitMQ for the above stack. For this case I want to create a single RabbitMQ connection per process and use multiple channels to support multi-threaded tasks. The RabbitMQ documentation says it is fine to use multiple channels to support multiple threads, but the pika library doesn't seem to support this scenario. Here is the example I tried:
import pika
import threading
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
def test_thread(a: int):
    channel = connection.channel()
    channel.exchange_declare(exchange='normal_ex', exchange_type='topic')
    channel.basic_publish(exchange='normal_ex', routing_key='test', body=str(a))
for i in range(0, 10):
    t = threading.Thread(target=test_thread, args=[i])
    t.start()
time.sleep(10)
connection.close()
When I run the above program, which uses multiple channels across multiple threads, I get the following error:
Stream connection lost: AssertionError(('_AsyncTransportBase._produce() tx buffer size underflow', -21, 1))
Upvotes: 1
Views: 2508
Reputation: 22750
From the Pika FAQ (https://pika.readthedocs.io/en/stable/faq.html#frequently-asked-questions):
Is Pika thread safe?
Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads, with one exception: you may call the connection method add_callback_threadsafe from another thread to schedule a callback within an active pika connection.
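As a rough sketch of what the FAQ's advice could look like for the example in the question, each thread below opens its own connection, publishes through it, and closes it. The helper names `open_connection`, `publish_in_thread`, and `run_publishers`, and the injectable `connect` parameter, are illustrative assumptions and not part of Pika; the exchange name and routing key are taken from the question.

```python
import threading


def open_connection(host="localhost"):
    """Open a new blocking connection (hypothetical helper).

    pika is imported here so the module can be loaded without it installed.
    """
    import pika
    return pika.BlockingConnection(pika.ConnectionParameters(host=host))


def publish_in_thread(a, connect=open_connection):
    # The connection is created inside the thread that uses it,
    # so it is never shared with any other thread.
    connection = connect()
    try:
        channel = connection.channel()
        channel.exchange_declare(exchange='normal_ex', exchange_type='topic')
        channel.basic_publish(exchange='normal_ex', routing_key='test', body=str(a))
    finally:
        # Close the connection in the same thread that opened it.
        connection.close()


def run_publishers(n=10, connect=open_connection):
    threads = [
        threading.Thread(target=publish_in_thread, args=(i, connect))
        for i in range(n)
    ]
    for t in threads:
        t.start()
    # Wait for the threads to finish instead of sleeping for a fixed time.
    for t in threads:
        t.join()
```

With a broker running on localhost, calling `run_publishers()` should publish ten messages without any connection crossing a thread boundary. The trade-off is one TCP connection per thread, so for long-lived worker threads it may be preferable to keep each thread's connection open for the thread's lifetime and not reconnect for every message.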
Upvotes: 1