Reputation: 2611
My stream was working fine until, for some reason, it was killed with the pkill command. After restarting, it no longer works: it is not receiving any records.
My stream consumer, which I configure with 4 stream threads:
// stream Consumer Handler
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> userStream = builder.stream("usertopic",Consumed.with(Serdes.String(), Serdes.String()));
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-userstream");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ALL my bootstrap servers");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "500");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
//consumer_timeout_ms
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 2000);
props.put("state.dir", "/tmp/kafka/stat");
userStream.peek((key,value)->System.out.println("key :"+key+" value :"+value));
/* pass stream to my logic commented */
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        logger.error("Thread Name :" + t.getName() + " Error while processing:", e);
    }
});
kafkaStreams.cleanUp();
kafkaStreams.start();
Stream starting logs:
:43,593 INFO [streams-userstream-76ccdde0-115e-489e-894f-ec88aa9e3e2d-StreamThread-50] o.a.k.c.c.KafkaConsumer: [Consumer clientId=streams-userstream-76ccdde0-115e-489e-894f-ec88aa9e3e2d-StreamThread-50-restore-consumer, groupId=] Unsubscribed all topics or patterns and assigned partitions
2019-09-09T16:06:43,593 INFO [streams-userstream-76ccdde0-115e-489e-894f-ec88aa9e3e2d-StreamThread-50] o.a.k.s.p.i.StreamThread: stream-thread [streams-userstream-76ccdde0-115e-489e-894f-ec88aa9e3e2d-StreamThread-50] State transition from PARTITIONS_ASSIGNED to RUNNING
2019-09-09T16:06:43,593 INFO [streams-userstream-76ccdde0-115e-489e-894f-ec88aa9e3e2d-StreamThread-50] o.a.k.s.KafkaStreams: stream-client [streams-userstream-76ccdde0-115e-489e-894f-ec88aa9e3e2d] State transition from REBALANCING to RUNNING
2019-09-09T16:11:39,386 INFO [kafka-producer-network-thread | b2b] o.a.k.c.Metadata: Cluster ID: O8kYaORIQQ2vie_DUA_xvA
2019-09-09T16:11:41,148 INFO [AsyncResolver-bootstrap-executor-0] c.n.d.s.r.a.ConfigClusterResolver: Resolving eureka endpoints via configuration
2019-09-09T16:11:41,624 INFO [kafka-producer-network-thread | streams-userstream-76ccdde0-115e-489e-894f-ec88aa9e3e2d-StreamThread-1-producer] o.a.k.c.Metadata: Cluster ID: O8kYaORIQQ2vie_DUA_xvA
2019-09-09T16:11:41,672 INFO [kafka-producer-network-thread | streams-userstream-76ccdde0-115e-489e-894f-ec88aa9e3e2d-StreamThread-2-producer] o.a.k.c.Metadata: Cluster ID: O8kYaORIQQ2vie_DUA_xvA
I deleted my application ID's directory under /tmp/kafka/stat, but the stream is still not receiving anything.
However, the Kafka console consumer below does receive the data when I test:
/kafka-console-consumer.sh --bootstrap-server serverip:9237 --from-beginning --topic usertopic
Upvotes: 1
Views: 960
Reputation: 832
I think you might be running another consumer instance in the same group. For testing purposes, you can change the stream's group name (the application ID):
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-userstream");
change to
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-userstream-test");
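As a minimal sketch of that test-only change, the helper below copies the existing settings and swaps in a new application ID. `TestStreamProps` and `withTestApplicationId` are hypothetical names, not part of Kafka, and the plain string keys are the values behind `StreamsConfig.APPLICATION_ID_CONFIG` and `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`. It also sets the offset reset to "earliest" on the assumption that you want to see records already in the topic: a new group has no committed offsets, so with "latest" only records produced after startup would appear.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: builds a test copy of the stream settings.
class TestStreamProps {

    // Returns a copy of base with the application ID suffixed; base is left untouched.
    static Properties withTestApplicationId(Properties base, String suffix) {
        Properties copy = new Properties();
        copy.putAll(base);
        // "application.id" also serves as the consumer group ID of the stream.
        copy.put("application.id", base.getProperty("application.id") + suffix);
        // Assumption: existing records should be visible during the test.
        // A new group has no committed offsets, so this setting decides the start position.
        copy.put("auto.offset.reset", "earliest");
        return copy;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "streams-userstream");
        base.put("auto.offset.reset", "latest");

        Properties test = withTestApplicationId(base, "-test");
        System.out.println(test.getProperty("application.id"));
    }
}
```

The returned Properties can be passed to the KafkaStreams constructor in place of the original props.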
If this works, then the cause is likely one of the following:
1. Another instance is running with the same group name
2. Your stream was not shut down gracefully
3. Your state directory was not cleaned
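For the second point, one common approach is to close the stream from a JVM shutdown hook. The sketch below uses only the standard library: `GracefulShutdown` and `closeOnShutdown` are hypothetical names, and the `AutoCloseable` stands in for your `kafkaStreams` object (to my knowledge `KafkaStreams` implements `AutoCloseable` in the 2.x clients, but check your version). Note that pkill sends SIGTERM by default, which does run shutdown hooks, while `pkill -9` (SIGKILL) skips them.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper, not part of Kafka: closes a resource when the JVM shuts down.
class GracefulShutdown {

    // Registers a shutdown hook that closes the resource and returns the hook thread.
    static Thread closeOnShutdown(AutoCloseable resource) {
        Thread hook = new Thread(() -> {
            try {
                resource.close();
            } catch (Exception e) {
                System.err.println("Error while closing: " + e);
            }
        }, "streams-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        AtomicBoolean closed = new AtomicBoolean(false);
        // Stand-in for the real KafkaStreams instance (assumption for this sketch).
        AutoCloseable fakeStreams = () -> closed.set(true);
        closeOnShutdown(fakeStreams);
        System.out.println("Shutdown hook registered");
    }
}
```

In the question's code this would amount to registering the hook before kafkaStreams.start(), so that a normal exit or SIGTERM lets the instance leave the consumer group cleanly.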
Upvotes: 2