Reputation: 6842
I am trying to write a simple "Hello World" application using Spark Streaming and RabbitMQ, in which Spark Streaming reads a message from RabbitMQ via the RabbitMQReceiver and prints it to the console. However, I am not able to print the string read from RabbitMQ. Instead, the Spark Streaming code prints the following:
Value Received BlockRDD[1] at ReceiverInputDStream at RabbitMQInputDStream.scala:33
Value Received BlockRDD[2] at ReceiverInputDStream at RabbitMQInputDStream.scala:33
The message is sent to RabbitMQ with the simple code below:
package helloWorld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
    private final static String QUEUE_NAME = "hello1";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World! is a code. Hi Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}
I am trying to read the messages via Spark Streaming as shown below:
package rabbitmq.example;

import java.util.*;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import com.stratio.receiver.RabbitMQUtils;

public class RabbitMqEx {
    public static void main(String[] args) {
        System.out.println("Creating Spark Configuration");
        SparkConf conf = new SparkConf();
        conf.setAppName("RabbitMq Receiver Example");
        conf.setMaster("local[2]");

        System.out.println("Retreiving Streaming Context from Spark Conf");
        JavaStreamingContext streamCtx = new JavaStreamingContext(conf,
                Durations.seconds(2));

        Map<String, String> rabbitMqConParams = new HashMap<String, String>();
        rabbitMqConParams.put("host", "localhost");
        rabbitMqConParams.put("queueName", "hello1");

        System.out.println("Trying to connect to RabbitMq");
        JavaReceiverInputDStream<String> receiverStream = RabbitMQUtils.createJavaStream(streamCtx, rabbitMqConParams);
        receiverStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> arg0) throws Exception {
                System.out.println("Value Received " + arg0.toString());
                return null;
            }
        });

        streamCtx.start();
        streamCtx.awaitTermination();
    }
}
The console output contains only messages like the following:
Creating Spark Configuration
Retreiving Streaming Context from Spark Conf
Trying to connect to RabbitMq
Value Received BlockRDD[1] at ReceiverInputDStream at RabbitMQInputDStream.scala:33
Value Received BlockRDD[2] at ReceiverInputDStream at RabbitMQInputDStream.scala:33
In the logs I see the following:-
15/11/18 13:20:45 INFO SparkContext: Running Spark version 1.5.2
15/11/18 13:20:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/11/18 13:20:45 WARN Utils: Your hostname, jabong1143 resolves to a loopback address: 127.0.1.1; using 192.168.1.3 instead (on interface wlan0)
15/11/18 13:20:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/11/18 13:20:45 INFO SecurityManager: Changing view acls to: jabong
15/11/18 13:20:45 INFO SecurityManager: Changing modify acls to: jabong
15/11/18 13:20:45 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jabong); users with modify permissions: Set(jabong)
15/11/18 13:20:46 INFO Slf4jLogger: Slf4jLogger started
15/11/18 13:20:46 INFO Remoting: Starting remoting
15/11/18 13:20:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.3:42978]
15/11/18 13:20:46 INFO Utils: Successfully started service 'sparkDriver' on port 42978.
15/11/18 13:20:46 INFO SparkEnv: Registering MapOutputTracker
15/11/18 13:20:46 INFO SparkEnv: Registering BlockManagerMaster
15/11/18 13:20:46 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-9309b35f-a506-49dc-91ab-5c340cd3bdd1
15/11/18 13:20:46 INFO MemoryStore: MemoryStore started with capacity 947.7 MB
15/11/18 13:20:46 INFO HttpFileServer: HTTP File server directory is /tmp/spark-736f4b9c-764c-4b85-9b37-1cece102c95a/httpd-29196fa0-eb3f-4b7d-97ad-35c5325b09e5
15/11/18 13:20:46 INFO HttpServer: Starting HTTP Server
15/11/18 13:20:46 INFO Utils: Successfully started service 'HTTP file server' on port 37150.
15/11/18 13:20:46 INFO SparkEnv: Registering OutputCommitCoordinator
15/11/18 13:20:52 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/11/18 13:20:52 INFO SparkUI: Started SparkUI at http://192.168.1.3:4040
15/11/18 13:20:52 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
15/11/18 13:20:52 INFO Executor: Starting executor ID driver on host localhost
15/11/18 13:20:52 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 47306.
15/11/18 13:20:52 INFO NettyBlockTransferService: Server created on 47306
15/11/18 13:20:52 INFO BlockManagerMaster: Trying to register BlockManager
15/11/18 13:20:52 INFO BlockManagerMasterEndpoint: Registering block manager localhost:47306 with 947.7 MB RAM, BlockManagerId(driver, localhost, 47306)
15/11/18 13:20:52 INFO BlockManagerMaster: Registered BlockManager
Trying to connect to RabbitMq
15/11/18 13:20:53 INFO ReceiverTracker: Starting 1 receivers
15/11/18 13:20:53 INFO ReceiverTracker: ReceiverTracker started
15/11/18 13:20:53 INFO ForEachDStream: metadataCleanupDelay = -1
15/11/18 13:20:53 INFO RabbitMQInputDStream: metadataCleanupDelay = -1
15/11/18 13:20:53 INFO RabbitMQInputDStream: Slide time = 2000 ms
15/11/18 13:20:53 INFO RabbitMQInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/11/18 13:20:53 INFO RabbitMQInputDStream: Checkpoint interval = null
15/11/18 13:20:53 INFO RabbitMQInputDStream: Remember duration = 2000 ms
15/11/18 13:20:53 INFO RabbitMQInputDStream: Initialized and validated com.stratio.receiver.RabbitMQInputDStream@5d00adc2
15/11/18 13:20:53 INFO ForEachDStream: Slide time = 2000 ms
15/11/18 13:20:53 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/11/18 13:20:53 INFO ForEachDStream: Checkpoint interval = null
15/11/18 13:20:53 INFO ForEachDStream: Remember duration = 2000 ms
15/11/18 13:20:53 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@4c132773
15/11/18 13:20:53 INFO RecurringTimer: Started timer for JobGenerator at time 1447833054000
15/11/18 13:20:53 INFO JobGenerator: Started JobGenerator at 1447833054000 ms
15/11/18 13:20:53 INFO JobScheduler: Started JobScheduler
15/11/18 13:20:53 INFO StreamingContext: StreamingContext started
15/11/18 13:20:53 INFO DAGScheduler: Got job 0 (start at RabbitMqEx.java:38) with 1 output partitions
15/11/18 13:20:53 INFO DAGScheduler: Final stage: ResultStage 0(start at RabbitMqEx.java:38)
15/11/18 13:20:53 INFO ReceiverTracker: Receiver 0 started
15/11/18 13:20:53 INFO DAGScheduler: Parents of final stage: List()
15/11/18 13:20:53 INFO DAGScheduler: Missing parents: List()
15/11/18 13:20:53 INFO DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:556), which has no missing parents
15/11/18 13:20:53 INFO MemoryStore: ensureFreeSpace(46496) called with curMem=0, maxMem=993735475
15/11/18 13:20:53 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 45.4 KB, free 947.7 MB)
15/11/18 13:20:53 INFO MemoryStore: ensureFreeSpace(15206) called with curMem=46496, maxMem=993735475
15/11/18 13:20:53 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 14.8 KB, free 947.6 MB)
15/11/18 13:20:53 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:47306 (size: 14.8 KB, free: 947.7 MB)
15/11/18 13:20:53 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:861
15/11/18 13:20:53 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:556)
15/11/18 13:20:53 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/11/18 13:20:53 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, NODE_LOCAL, 2729 bytes)
15/11/18 13:20:53 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/11/18 13:20:53 INFO RecurringTimer: Started timer for BlockGenerator at time 1447833053800
15/11/18 13:20:53 INFO BlockGenerator: Started BlockGenerator
15/11/18 13:20:53 INFO BlockGenerator: Started block pushing thread
15/11/18 13:20:53 INFO ReceiverTracker: Registered receiver for stream 0 from 192.168.1.3:42978
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Starting receiver
15/11/18 13:20:53 INFO RabbitMQReceiver: Rabbit host addresses are :localhost
15/11/18 13:20:53 INFO RabbitMQReceiver: Address localhost
15/11/18 13:20:53 INFO RabbitMQReceiver: creating new connection and channel
15/11/18 13:20:53 INFO RabbitMQReceiver: No virtual host configured
15/11/18 13:20:53 INFO RabbitMQReceiver: created new connection and channel
15/11/18 13:20:53 INFO RabbitMQReceiver: onStart, Connecting..
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Called receiver onStart
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Waiting for receiver to be stopped
15/11/18 13:20:53 INFO RabbitMQReceiver: declaring direct queue
15/11/18 13:20:53 ERROR RabbitMQReceiver: Got this unknown exception: java.io.IOException
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at com.stratio.receiver.RabbitMQReceiver.getQueueName(RabbitMQInputDStream.scala:126)
at com.stratio.receiver.RabbitMQReceiver.com$stratio$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:86)
at com.stratio.receiver.RabbitMQReceiver$$anon$1.run(RabbitMQInputDStream.scala:69)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 5 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554)
at java.lang.Thread.run(Thread.java:745)
15/11/18 13:20:53 INFO RabbitMQReceiver: it has been stopped
15/11/18 13:20:53 ERROR RabbitMQReceiver: error on close channel, ignoring
15/11/18 13:20:53 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Trying to connect again
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Trying to connect again:
15/11/18 13:20:53 INFO RabbitMQReceiver: onStop, doing nothing.. relaxing...
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Called receiver onStop
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Deregistering receiver 0
15/11/18 13:20:53 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Trying to connect again
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Stopped receiver 0
15/11/18 13:20:54 INFO JobScheduler: Added jobs for time 1447833054000 ms
15/11/18 13:20:54 INFO JobScheduler: Starting job streaming job 1447833054000 ms.0 from job set of time 1447833054000 ms
Value Received BlockRDD[1] at ReceiverInputDStream at RabbitMQInputDStream.scala:33
15/11/18 13:20:54 INFO JobScheduler: Finished job streaming job 1447833054000 ms.0 from job set of time 1447833054000 ms
15/11/18 13:20:54 INFO JobScheduler: Total delay: 0.031 s for time 1447833054000 ms (execution: 0.007 s)
15/11/18 13:20:54 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/11/18 13:20:54 INFO InputInfoTracker: remove old batch metadata:
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Starting receiver again
15/11/18 13:20:55 INFO ReceiverTracker: Registered receiver for stream 0 from 192.168.1.3:42978
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Starting receiver
15/11/18 13:20:55 INFO RabbitMQReceiver: Rabbit host addresses are :localhost
15/11/18 13:20:55 INFO RabbitMQReceiver: Address localhost
15/11/18 13:20:55 INFO RabbitMQReceiver: creating new connection and channel
15/11/18 13:20:55 INFO RabbitMQReceiver: No virtual host configured
15/11/18 13:20:55 INFO RabbitMQReceiver: created new connection and channel
15/11/18 13:20:55 INFO RabbitMQReceiver: onStart, Connecting..
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Called receiver onStart
15/11/18 13:20:55 INFO RabbitMQReceiver: declaring direct queue
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Receiver started again
15/11/18 13:20:55 ERROR RabbitMQReceiver: Got this unknown exception: java.io.IOException
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at com.stratio.receiver.RabbitMQReceiver.getQueueName(RabbitMQInputDStream.scala:126)
at com.stratio.receiver.RabbitMQReceiver.com$stratio$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:86)
at com.stratio.receiver.RabbitMQReceiver$$anon$1.run(RabbitMQInputDStream.scala:69)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 5 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554)
at java.lang.Thread.run(Thread.java:745)
15/11/18 13:20:55 INFO RabbitMQReceiver: it has been stopped
15/11/18 13:20:55 ERROR RabbitMQReceiver: error on close channel, ignoring
15/11/18 13:20:55 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Trying to connect again
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Trying to connect again:
15/11/18 13:20:55 INFO RabbitMQReceiver: onStop, doing nothing.. relaxing...
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Called receiver onStop
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Deregistering receiver 0
15/11/18 13:20:55 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Trying to connect again
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Stopped receiver 0
15/11/18 13:20:56 INFO JobScheduler: Added jobs for time 1447833056000 ms
15/11/18 13:20:56 INFO JobScheduler: Starting job streaming job 1447833056000 ms.0 from job set of time 1447833056000 ms
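The `Caused by` lines in the log above carry the actual diagnosis: in an "inequivalent arg" reply, `received` is the value sent by the client that just tried to declare the queue (here the receiver), and `current` is the value the existing queue already has. As a reading aid, here is a minimal sketch that pulls those fields out of the reply-text. The class and method names are made up for illustration and are not part of the RabbitMQ client; it uses only the Java standard library.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Reading aid only (hypothetical helper, not RabbitMQ API):
// extracts the fields of an "inequivalent arg" reply-text.
class ReplyTextReader {

    // Shape taken from the log: inequivalent arg 'durable' for queue 'hello1'
    // in vhost '/': received 'true' but current is 'false'
    private static final Pattern INEQUIVALENT_ARG = Pattern.compile(
        "inequivalent arg '([^']+)' for queue '([^']+)' in vhost '([^']+)': "
        + "received '([^']+)' but current is '([^']+)'");

    // Returns a one-line explanation, or null if the text has a different shape.
    static String explain(String replyText) {
        Matcher m = INEQUIVALENT_ARG.matcher(replyText);
        if (!m.find()) {
            return null;
        }
        String arg = m.group(1);
        return "queue '" + m.group(2) + "' exists with " + arg + "=" + m.group(5)
            + ", but this client declared it with " + arg + "=" + m.group(4);
    }

    public static void main(String[] args) {
        String replyText = "PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' "
            + "in vhost '/': received 'true' but current is 'false'";
        System.out.println(explain(replyText));
    }
}
```

Read this way, the log says that `hello1` already exists as a non-durable queue while the receiver declares it as durable.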
Running list_queues shows the following:
sudo rabbitmqctl list_queues
Listing queues ...
hello1 2
I also printed the value of arg0.count(). It reports 0. It seems Spark Streaming is not able to read messages from RabbitMQ.
However, I can read from the queue using a simple Java receiver as mentioned here.
Environment
Can someone tell me what is going wrong?
Upvotes: 3
Views: 13185
Reputation: 6842
This issue has been answered here by one of the authors of rabbitmq-receiver. Quoting the answer below:
Hi,
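You have declared the queue as non-durable. The receiver expects to read from a durable queue. Try declaring it this way:
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
I can confirm that this declaration works. Note that durable is the second argument of queueDeclare(queue, durable, exclusive, autoDelete, arguments), so this is a durable declaration rather than a passive one. If a non-durable hello1 queue already exists, delete it first (for example with channel.queueDelete(QUEUE_NAME)); otherwise the durable declaration fails with the same PRECONDITION_FAILED error.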
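The 406 error comes from the broker comparing a new declaration against the properties of a queue that already exists. The toy model below sketches that check in plain Java. It is not RabbitMQ code: `ToyBroker`, its methods, and the returned strings are invented for illustration. It compares only the three boolean flags, and it assumes the receiver declares the queue with exclusive and autoDelete set to false (the log only confirms that it sends durable as true).

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of queue declaration; NOT RabbitMQ code.
class ToyBroker {

    // Simplified queue properties: only the three boolean flags of queueDeclare.
    static final class Props {
        final boolean durable;
        final boolean exclusive;
        final boolean autoDelete;

        Props(boolean durable, boolean exclusive, boolean autoDelete) {
            this.durable = durable;
            this.exclusive = exclusive;
            this.autoDelete = autoDelete;
        }

        boolean sameAs(Props other) {
            return durable == other.durable
                && exclusive == other.exclusive
                && autoDelete == other.autoDelete;
        }
    }

    private final Map<String, Props> queues = new HashMap<>();

    // Same argument order as queueDeclare(queue, durable, exclusive, autoDelete, arguments),
    // without the arguments map. Creates the queue if absent; otherwise the flags must match.
    String declare(String queue, boolean durable, boolean exclusive, boolean autoDelete) {
        Props requested = new Props(durable, exclusive, autoDelete);
        Props current = queues.get(queue);
        if (current == null) {
            queues.put(queue, requested);
            return "declare-ok";
        }
        return current.sameAs(requested) ? "declare-ok" : "PRECONDITION_FAILED";
    }

    void delete(String queue) {
        queues.remove(queue);
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        // Sender from the question creates a non-durable queue.
        System.out.println(broker.declare("hello1", false, false, false));
        // Receiver then asks for a durable queue with the same name.
        System.out.println(broker.declare("hello1", true, false, false));
        // Fix: drop the old queue, then both sides declare it as durable.
        broker.delete("hello1");
        System.out.println(broker.declare("hello1", true, false, false));
        System.out.println(broker.declare("hello1", true, false, false));
    }
}
```

Once the receiver connects, keep in mind that `arg0.toString()` in the question's code prints a description of the RDD, not its elements. To print the received strings, iterate over `arg0.collect()` inside the `foreachRDD` callback.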
Upvotes: 9