Reputation: 353
I am trying to integrate Kafka with Apache Spark Streaming. Here is the code:
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
public class SampleSparkStraming {

    public static void main(String[] args) {
        //SparkContext context = new
        SparkConf conf = new SparkConf().setAppName("SampleAPP").setMaster("spark://localhost:4040").set("spark.ui.port", "4040");
        //SparkConf conf = new SparkConf().setAppName("SampleAPP").setMaster("local[*]");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(500000));

        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("TEST-Kafka");

        final JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
            private static final long serialVersionUID = 1L;

            public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
                final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
                    private static final long serialVersionUID = 1L;

                    public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
                        OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
                        System.out.println(o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
                    }
                });
            }
        });

        //stream.print();
    }
}
pom.xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
Versions -
kafka_2.10-0.10.2.0
spark-2.1.0
OS - Windows 7
When I try the command-line consumer from Kafka, the messages are consumed, but it does not work with the program above and I get the following exception:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/04/10 17:07:32 INFO SparkContext: Running Spark version 2.1.0
17/04/10 17:07:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/04/10 17:07:33 INFO SecurityManager: Changing view acls to: xxxxx
17/04/10 17:07:33 INFO SecurityManager: Changing modify acls to: xxxxxx
17/04/10 17:07:33 INFO SecurityManager: Changing view acls groups to:
17/04/10 17:07:33 INFO SecurityManager: Changing modify acls groups to:
17/04/10 17:07:33 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(xxxxx); groups with view permissions: Set(); users with modify permissions: Set(xxxxx); groups with modify permissions: Set()
17/04/10 17:07:33 INFO Utils: Successfully started service 'sparkDriver' on port 55878.
17/04/10 17:07:33 INFO SparkEnv: Registering MapOutputTracker
17/04/10 17:07:33 INFO SparkEnv: Registering BlockManagerMaster
17/04/10 17:07:33 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/04/10 17:07:33 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/04/10 17:07:33 INFO DiskBlockManager: Created local directory at C:\Users\xxxxxx\AppData\Local\Temp\blockmgr-38e935a6-96c1-4942-a88f-6b7c8677fba7
17/04/10 17:07:33 INFO MemoryStore: MemoryStore started with capacity 349.2 MB
17/04/10 17:07:33 INFO SparkEnv: Registering OutputCommitCoordinator
17/04/10 17:07:33 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
17/04/10 17:07:33 INFO Utils: Successfully started service 'SparkUI' on port 4041.
17/04/10 17:07:33 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.10.25.26:4041
17/04/10 17:07:33 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://localhost:4040...
17/04/10 17:07:33 INFO TransportClientFactory: Successfully created connection to localhost/127.0.0.1:4040 after 18 ms (0 ms spent in bootstraps)
17/04/10 17:07:33 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from localhost/127.0.0.1:4040 is closed
17/04/10 17:07:33 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master localhost:4040
org.apache.spark.SparkException: Exception thrown in awaitResult
at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)
at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Connection from localhost/127.0.0.1:4040 closed
at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:128)
at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:109)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:257)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893)
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:691)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:455)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more
And in the Spark console window, I get the following error:
17/04/10 17:07:33 WARN HttpParser: Illegal character 0x0 in state=START for buffer HeapByteBuffer@73f2273b[p=1,l=1292,c=16384,r=1291]={\x00<<<\x00\x00\x00\x00\x00\x05\x0c\x03_>\xF5s.bKM\x00...Ft\x00\x0b10.10.25.26>>>\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00...\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00}
17/04/10 17:07:33 WARN HttpParser: badMessage: 400 Illegal character 0x0 for HttpChannelOverHttp@1a099881{r=0,c=false,a=IDLE,uri=}
There are similar questions whose answers pointed to version conflicts, but I am unable to detect the issue here.
Upvotes: 1
Views: 1244
Reputation: 3367
If you are running the Spark application in standalone mode, you need to have the cluster up first and use its master URL in your application; otherwise, you can simply run your Spark application in local[*] mode.
As per your code, you also need to start your StreamingContext to actually get the data from Kafka.
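For example, a minimal sketch of those two changes, keeping the Kafka parameters and the foreachRDD logic from the question unchanged (the 5-second batch interval is only an illustrative value):
SparkConf conf = new SparkConf()
        .setAppName("SampleAPP")
        .setMaster("local[*]");   // or spark://<master-host>:7077 if a standalone master is running
                                  // (7077 is the default master port; 4040 is only the web UI)

JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(5000));

// ... create the Kafka direct stream and the foreachRDD logic exactly as in the question ...

ssc.start();                 // nothing is consumed until the context is started
ssc.awaitTermination();      // keep the driver running while the stream is processed
Since awaitTermination() can throw InterruptedException in Spark 2.x, main may need to declare throws Exception (or catch it).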
Upvotes: 3
Reputation: 30310
Assuming your URIs are correct, Apache Spark does not support Scala 2.12 right now. Besides, the "2.10" in kafka_2.10-0.10.2.0 means that build is meant to be deployed with Scala 2.10 anyway.
So at the very least, use Scala 2.11.8, and then link this Kafka connector as shown in the documentation:
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.1.0
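In the pom.xml format you are already using, those coordinates correspond to:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.0</version>
</dependency>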
Upvotes: 0