Reputation: 565
I am trying to publish a RabbitMQ message with the following Python script:
import findspark
findspark.init("/home/spark/spark-2.2.0")
from pyspark.sql import SparkSession
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue_durable', durable=True)
message="Hello_Hbase!"
channel.basic_publish(exchange='',routing_key='queue1',body=message)
print(" [x] Sent %r" % message)
connection.close()
Next is the subscriber script. In this script I want to retrieve the message from the queue 'queue1'
and store that message somewhere else.
import findspark
findspark.init("/home/spark/spark-2.2.0")
from pyspark.sql import SparkSession
import time
import pika
import happybase
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='queue1', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
connection = happybase.Connection(host='localhost', port=9090)
table = connection.table('blogpost')
print(connection.tables())
def callback(ch, method, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count('.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)
    #return(body)
channel.basic_qos(prefetch_count=1)
body=channel.basic_consume( callback,queue='queue1')
print(body) ## here it is giving some encrypted msg, how to retrieve in original form
# here in the body,I am getting this - ctag1.587a9ab83301436195fc3f653c2f6db0
table.put('1', {'post:status': body})
print("hbase insertion done")
#channel.start_consuming()
Can someone tell me how to retrieve the message from the RabbitMQ queue in its original form?
Upvotes: 0
Views: 143
Reputation: 599490
basic_consume doesn't return the body of the message. It returns the consumer tag, which is the ctag1... string you are seeing; the message is not encrypted.
The body is accessible within the callback function, as the body parameter, which you are already using. You should do your database manipulation there too.
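A minimal sketch of what that could look like, assuming the same 'queue1' queue and 'blogpost' table as in the question. The names make_callback and run_consumer are made up for illustration, and the basic_consume call uses the pika 1.x keyword form; on pika 0.x the callback is passed first, as in your script. Note that pika invokes the callback with four arguments (channel, method, properties, body), and that the HBase write happens before the ack so a failed write leaves the message unacknowledged.

```python
def make_callback(table):
    """Build a pika message callback that stores each body in `table`.

    `table` is anything with a put(row, data) method, e.g. a happybase Table.
    (Hypothetical helper, not part of pika or happybase.)
    """
    def callback(ch, method, properties, body):
        # `body` is the payload exactly as it was published (bytes on Python 3).
        print(" [x] Received %r" % body)
        table.put(b'1', {b'post:status': body})
        print(" [x] hbase insertion done")
        # Acknowledge only after the write has gone through.
        ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback


def run_consumer():
    # Imported here so make_callback can be tried out without a broker.
    import happybase
    import pika

    # Separate names, so the HBase connection does not shadow the RabbitMQ one.
    hbase = happybase.Connection(host='localhost', port=9090)
    table = hbase.table('blogpost')

    rabbit = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = rabbit.channel()
    channel.queue_declare(queue='queue1', durable=True)
    channel.basic_qos(prefetch_count=1)

    # pika 1.x form; on pika 0.x: channel.basic_consume(callback, queue='queue1')
    channel.basic_consume(queue='queue1',
                          on_message_callback=make_callback(table))

    print(' [*] Waiting for messages. To exit press CTRL+C')
    # Blocks here and hands every delivered message to the callback.
    channel.start_consuming()
```

Calling run_consumer() starts the loop. Without start_consuming() (commented out in your script) the callback is never invoked, so nothing would be received at all.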
Upvotes: 0