andy
andy

Reputation: 565

How rabbitmq internally stores message and how to retrieve back in original form?

I am trying to publish rabbitmq message with following python script--

import findspark
findspark.init("/home/spark/spark-2.2.0")
from pyspark.sql import SparkSession
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue_durable', durable=True)
message="Hello_Hbase!"
channel.basic_publish(exchange='',routing_key='queue1',body=message)
print(" [x] Sent %r" % message)
connection.close()

next, this is the subscriber script. In this script I want to retrieve the message from routing_queue='queue1' and want to store that message somewhere else..

import findspark
findspark.init("/home/spark/spark-2.2.0")
from pyspark.sql import SparkSession

import time
import pika
import happybase
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='queue1', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

connection = happybase.Connection(host='localhost', port=9090)
table = connection.table('blogpost')
print(connection.tables())

    def callback(ch, method, body):
        print(" [x] Received %r" % body)
        time.sleep(body.count('.'))
        print(" [x] Done")
        ch.basic_ack(delivery_tag=method.delivery_tag)
        #return(body)

channel.basic_qos(prefetch_count=1)
body=channel.basic_consume( callback,queue='queue1') 
print(body) ## here it is giving some encrypted msg, how to retrieve in original form
# here in the body,I am getting this - ctag1.587a9ab83301436195fc3f653c2f6db0 
table.put('1', {'post:status': body})
print("hbase insertion done")
#channel.start_consuming()

Can someone let me know how to retrieve msg from rabbitmq queue in its original form?

Upvotes: 0

Views: 143

Answers (1)

Daniel Roseman
Daniel Roseman

Reputation: 599490

basic_consume doesn't return the body of the message.

That is accessible within the callback function, as the body parameter, which you are already correctly using. You should do your database manipulation there too.

Upvotes: 0

Related Questions