Reputation: 3588
I was trying to use KStreamBuilder to move data from one topic to another. I tried the following code, which throws an exception:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class StreamsInTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KStreamBuilder builder = new KStreamBuilder();
        System.out.println("KStreamBuilder initialized!!");
        builder.stream("nil_PF1_P1").to("nil_RF1_P1_1");
        System.out.println("Streaming prepared!!");
        KafkaStreams streams = new KafkaStreams(builder, props);
        System.out.println("KafkaStreams Initialised!!");
        streams.start();
        System.out.println("Streams started!!");
        Thread.sleep(30000L);
        streams.close();
        System.out.println("Streams closed!!");
    }
}
Output:
KStreamBuilder initialized!!
Streaming prepared!!
log4j:WARN No appenders could be found for logger (org.apache.kafka.streams.StreamsConfig).
log4j:WARN Please initialize the log4j system properly.
KafkaStreams Initialised!!
Streams started!!
Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Streams closed!!
Then I tried consuming the data:
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nil_RF1_P1_1 --from-beginning
Any idea? Do I need any additional configuration? I am using a Kafka 0.10.0.0 cluster and client.
Dependencies used:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.0</version>
</dependency>
Upvotes: 1
Views: 1672
Reputation: 15067
Looking at what you shared in your question, the problem seems to be that you are not writing (= producing) any data into the input topic "nil_PF1_P1".
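For example, here is a minimal producer sketch that writes a few records into the input topic (topic name and bootstrap server taken from your question; the keys and values are just placeholders). A 0.10 Java producer also attaches a valid CreateTime timestamp to each record, which matters for the timestamp error discussed below:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestDataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            // The 0.10 producer sets a valid timestamp on every record it sends.
            producer.send(new ProducerRecord<String, String>("nil_PF1_P1", "key-" + i, "value-" + i));
        }
        producer.close();
    }
}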
Also: You are immediately closing the Kafka Streams instance in your code:
streams.start();
System.out.println("Streams started!!");
//Thread.sleep(1000L);
streams.close();
This won't give the application enough time to actually perform any processing. Typically, you'd only call streams.start() in your main method above, and register a shutdown hook in your Java application that would call streams.close() when being triggered.

For testing/development purposes, you can of course also call streams.close() from within main(), but then I'd increase the sleep time between start and close (e.g. try 30 seconds instead of 1 second) -- but of course you also need to make sure you are actually writing some data to the application's input topic during that time window.
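A minimal sketch of the shutdown-hook pattern, reusing the builder and props from your code (the streams variable needs to be final so the hook can reference it):

final KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

// Close the Streams instance cleanly when the JVM shuts down (e.g. on Ctrl-C).
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    @Override
    public void run() {
        streams.close();
    }
}));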
Edit: The cause of the java.lang.IllegalArgumentException: Invalid timestamp -1 error is most likely that you have been writing the data to the input topic with a non-0.10 producer. Details are explained at http://docs.confluent.io/current/streams/faq.html#invalid-timestamp-exception.
Upvotes: 2
Reputation: 62330
Kafka Streams was first released with version 0.10 and thus requires all records that are written into a topic to have an associated timestamp (an additional field next to key and value that was introduced in v0.10). For Streams, this timestamp must not be negative (even though the broker does not check this and allows inserting data with a negative timestamp).

Thus, it can happen that topics written with an older Java producer (i.e., a pre-0.10 producer) contain records with a missing timestamp field. It is also possible that you consume "old" topics, i.e., topics written to a 0.9 broker before you later upgraded your broker to 0.10 -- all those messages will not have a timestamp set. For compatibility reasons, the KafkaConsumer (v0.10) sets a missing timestamp to the value -1.

In Kafka Streams, internally, timestamps from input messages are "forwarded" to output messages; thus, if you consume messages without a timestamp, Kafka Streams tries to write messages with timestamp -1 into the output topics, resulting in the above error. (Kafka Streams uses the 0.10 Java producer, which checks whether the timestamp is valid and throws the above exception for negative timestamp values.)
To avoid this problem, you need to change the timestamp extractor that is used, via the Streams configuration parameter timestamp.extractor (see http://docs.confluent.io/3.0.0/streams/developer-guide.html#optional-configuration-parameters). Depending on your semantics, you can either use WallclockTimestampExtractor or provide a customized extractor.
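As a sketch (assuming the 0.10.0.0 API), switching to the wall-clock extractor is a one-line addition to the props from your question:

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

// Use the current wall-clock time instead of the (possibly missing) record timestamp.
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
          WallclockTimestampExtractor.class.getName());

Alternatively, a hypothetical custom extractor could fall back to wall-clock time only when the embedded timestamp is missing:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Hypothetical extractor: use the embedded timestamp when present, wall-clock time otherwise.
public class FallbackTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        long ts = record.timestamp();
        return ts >= 0 ? ts : System.currentTimeMillis();
    }
}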
Upvotes: 1