Rabbitmq pika on multi process and multi threaded architecture

I have the following scenario. I have a

  1. python based web server (which publishes) - (Single process and multi threaded)
  2. Scheduled job (which publishes) (Single process and multi threaded to run different jobs)
  3. Consumer from rabbitmq queue (Subscribes to rabbitmq topic) (Single process and multi threaded to consume different messages)

I am currently trying to use rabbitmq for the above stack. So for the above case I want to create single rabbitmq connection for each process and use multiple channels for supporting multi threaded tasks. The Rabbitmq documentation says its fine to use multiple channels to support multiple threads. But pika library doesn't seem to support this scenario. You can refer the following example I have tried

import pika
import threading
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))


def test_thread(a: int):
    channel = connection.channel()
    channel.exchange_declare(exchange='normal_ex', exchange_type='topic')
    channel.basic_publish(exchange='normal_ex', routing_key='test', body=str(a))


for i in range(0, 10):
    t = threading.Thread(target=test_thread, args=[i])
    t.start()


time.sleep(10)
connection.close()

When I run the above program which uses multiple channels across multiple threads I am getting the following errors

Stream connection lost: AssertionError(('_AsyncTransportBase._produce() tx buffer size underflow', -21, 1))

Upvotes: 1

Views: 2508

Answers (1)

Gabriele Santomaggio
Gabriele Santomaggio

Reputation: 22750

from https://pika.readthedocs.io/en/stable/faq.html#frequently-asked-questions

Is Pika thread safe?

Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads, with one exception: you may call the connection method add_callback_threadsafe from another thread to schedule a callback within an active pika connection.

Upvotes: 1

Related Questions