Reputation: 589
I'm trying to do stream processing and CEP on a Kafka message stream. For this I picked Apache Ignite to realise a prototype first. However I cannot connect to the queue:
Use kafka_2.11-0.10.1.0 apache-ignite-fabric-1.8.0-bin
bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Kafka works properly, I tested it with a consumer. Then I start ignite, then I run following in a spring boot commandline app.
KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();
Ignition.setClientMode(true);
Ignite ignite = Ignition.start();
Properties settings = new Properties();
// Set a few key parameters
settings.put("bootstrap.servers", "localhost:9092");
settings.put("group.id", "test");
settings.put("zookeeper.connect", "localhost:2181");
settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Create an instance of StreamsConfig from the Properties instance
kafka.consumer.ConsumerConfig config = new ConsumerConfig(settings);
IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache");
try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache")) {
// allow overwriting cache data
stmr.allowOverwrite(true);
kafkaStreamer.setIgnite(ignite);
kafkaStreamer.setStreamer(stmr);
// set the topic
kafkaStreamer.setTopic("test");
// set the number of threads to process Kafka streams
kafkaStreamer.setThreads(1);
// set Kafka consumer configurations
kafkaStreamer.setConsumerConfig(config);
// set decoders
StringDecoder keyDecoder = new StringDecoder(null);
StringDecoder valueDecoder = new StringDecoder(null);
kafkaStreamer.setKeyDecoder(keyDecoder);
kafkaStreamer.setValueDecoder(valueDecoder);
kafkaStreamer.start();
} finally {
kafkaStreamer.stop();
}
When the application starts I get
2017-02-23 10:25:23.409 WARN 1388 --- [ main] kafka.utils.VerifiableProperties : Property bootstrap.servers is not valid 2017-02-23 10:25:23.410 INFO 1388 --- [ main] kafka.utils.VerifiableProperties : Property group.id is overridden to test 2017-02-23 10:25:23.410 WARN 1388 --- [ main] kafka.utils.VerifiableProperties : Property key.deserializer is not valid 2017-02-23 10:25:23.411 WARN 1388 --- [ main] kafka.utils.VerifiableProperties : Property key.serializer is not valid 2017-02-23 10:25:23.411 WARN 1388 --- [ main] kafka.utils.VerifiableProperties : Property value.deserializer is not valid 2017-02-23 10:25:23.411 WARN 1388 --- [ main] kafka.utils.VerifiableProperties : Property value.serializer is not valid 2017-02-23 10:25:23.411 INFO 1388 --- [ main] kafka.utils.VerifiableProperties : Property zookeeper.connect is overridden to localhost:2181
Then
2017-02-23 10:25:24.057 WARN 1388 --- [r-finder-thread] kafka.client.ClientUtils$ : Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,user.local,9092)] failed
java.nio.channels.ClosedChannelException: null at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[kafka_2.11-0.10.0.1.jar:na] at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) ~[kafka_2.11-0.10.0.1.jar:na] at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79) ~[kafka_2.11-0.10.0.1.jar:na] at kafka.producer.SyncProducer.send(SyncProducer.scala:124) ~[kafka_2.11-0.10.0.1.jar:na] at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) [kafka_2.11-0.10.0.1.jar:na] at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) [kafka_2.11-0.10.0.1.jar:na] at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) [kafka_2.11-0.10.0.1.jar:na] at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [kafka_2.11-0.10.0.1.jar:na]
And reading from the queue doesn't work. Does anyone have an idea how to fix this?
Edit: If I comment the contents of the finally block then following error comes
[2m2017-02-27 16:42:27.780[0;39m [31mERROR[0;39m [35m29946[0;39m [2m---[0;39m [2m[pool-3-thread-1][0;39m [36m [0;39m [2m:[0;39m Message is ignored due to an error [msg=MessageAndMetadata(test,0,Message(magic = 1, attributes = 0, CreateTime = -1, crc = 2558126716, key = java.nio.HeapByteBuffer[pos=0 lim=1 cap=79], payload = java.nio.HeapByteBuffer[pos=0 lim=74 cap=74]),15941704,kafka.serializer.StringDecoder@74a96647,kafka.serializer.StringDecoder@42849d34,-1,CreateTime)]
java.lang.IllegalStateException: Data streamer has been closed. at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:401) ~[ignite-core-1.8.0.jar:1.8.0] at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:613) ~[ignite-core-1.8.0.jar:1.8.0] at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:667) ~[ignite-core-1.8.0.jar:1.8.0] at org.apache.ignite.stream.kafka.KafkaStreamer$1.run(KafkaStreamer.java:180) ~[ignite-kafka-1.8.0.jar:1.8.0] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_111] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_111] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
Thanks!
Upvotes: 1
Views: 933
Reputation: 8390
I think this happens because KafkaStreamer
is getting closed right after it's started (kafkaStreamer.stop()
call in finally
block). kafkaStreamer.start()
is not synchronous, it just spins out threads to consume from Kafka and exits.
Upvotes: 1