Reputation: 11
I have been attempting to build a Kafka Streams application for use with Spark. I have a static dataset for testing. After running my code once through, Kafka sets the current offset such that I cannot re-process the data on a second run. Running kafka-streams-application-reset supposedly resets the offsets, yet re-running my code still results in an empty GlobalKTable. The only way I have been able to re-analyze the data is by changing the application ID in my Kafka configuration. Here is what I'm doing.
Set up the sample data in Kafka:
kafka-console-producer --broker-list localhost:9092 \
--topic testTopic \
--property "parse.key=true" \
--property "key.separator=:"
1:abcd
2:bcde
3:cdef
4:defg
5:efgh
6:fghi
7:ghij
8:hijk
9:ijkl
10:jklm
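To sanity-check that the keyed records actually landed, the topic can be read back with the console consumer; this is a sketch using the standard print.key/key.separator consumer properties:
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic testTopic \
--from-beginning \
--property "print.key=true" \
--property "key.separator=:"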
Scala code:
//Streams imports - need to update Kafka
import org.apache.kafka.common.serialization.Serdes
//import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{GlobalKTable, KStream, KTable, Materialized, Produced, KStreamBuilder}
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.state.{KeyValueIterator, QueryableStoreTypes, ReadOnlyKeyValueStore, KeyValueStore}
import org.apache.kafka.streams.state.Stores
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import java.util.{Properties}
val kafkaServer = "127.0.0.1:9092"
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStream")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer)
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass())
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass())
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
p.put(StreamsConfig.CLIENT_ID_CONFIG, "test-consumer-stream")
val config = new StreamsConfig(p)
val builder: StreamsBuilder = new StreamsBuilder()
val imkvs = Stores.inMemoryKeyValueStore("testLookup-stream")
val sBuilder = Stores.keyValueStoreBuilder(imkvs, Serdes.String, Serdes.String).withLoggingDisabled().withCachingEnabled()
val gTable: GlobalKTable[String, String] = builder.globalTable("testTopic", Materialized.as(imkvs).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()).withCachingDisabled())
val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
streams.start()
// the store is only queryable once the instance reaches RUNNING;
// querying earlier can throw InvalidStateStoreException
while(streams.state() != KafkaStreams.State.RUNNING) Thread.sleep(100)
val read: ReadOnlyKeyValueStore[String, String] = streams.store(gTable.queryableStoreName(), QueryableStoreTypes.keyValueStore[String, String]())
val hexLookup = "2"
println(read.get(hexLookup))
val iter: KeyValueIterator[String, String] = read.all()
while(iter.hasNext) {
  val next = iter.next()
  println(next.key + ": " + next.value)
}
iter.close()  // release the underlying store resources
Streams Reset command:
kafka-streams-application-reset --application-id testStream \
--bootstrap-servers localhost:9092 \
--to-earliest
1) Am I coding something wrong, or is kafka-streams-application-reset not functioning correctly?
2) I had hoped that using an inMemoryKeyValueStore would result in Kafka not keeping track of the current offset; is there a way to force a GlobalKTable not to keep the current offset? I want to always search the entire dataset.
Software Versions:
Kafka 1.1.1-1
Confluent 4.1.1-1
Spark-Scala 2.3.1
kafka-clients 1.1.0
kafka-streams 1.1.0
Upvotes: 1
Views: 1186
Reputation: 20880
If you want to restart an application from an empty internal state and re-process the data from offset 0, you have to provide the --input-topics parameter with a comma-separated list of topics.
bin/kafka-streams-application-reset.sh --application-id testApplication1 --input-topics demoTopic1
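Applied to the setup in the question (reusing the application ID and topic from there), that would look like:
kafka-streams-application-reset --application-id testStream \
--bootstrap-servers localhost:9092 \
--input-topics testTopic \
--to-earliest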
You can find more details here: https://kafka.apache.org/10/documentation/streams/developer-guide/app-reset-tool
Regarding GlobalKTable: it is essentially a materialized view on top of a stream/topic, just like any other queryable store.
Also, a GlobalKTable always applies the "auto.offset.reset" strategy "earliest", regardless of the value specified in StreamsConfig.
So it should allow you to query the entire table at any time.
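As a quick way to convince yourself of that, here is a minimal sketch (assuming the testTopic data from the question is already produced; the application ID "globalTableCheck" and store name "globalCheckStore" are made up for illustration). Even with auto.offset.reset explicitly set to "latest", the global table should still be fully populated:
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.kstream.{GlobalKTable, Materialized}
import org.apache.kafka.streams.state.{KeyValueStore, QueryableStoreTypes}

val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "globalTableCheck") // hypothetical ID
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") // ignored for the global table

val builder = new StreamsBuilder()
val table: GlobalKTable[String, String] = builder.globalTable(
  "testTopic",
  Materialized.as[String, String, KeyValueStore[Bytes, Array[Byte]]]("globalCheckStore")
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.String()))

val streams = new KafkaStreams(builder.build(), p)
streams.start()
// wait until the global store has been bootstrapped and is queryable
while(streams.state() != KafkaStreams.State.RUNNING) Thread.sleep(100)

val store = streams.store("globalCheckStore", QueryableStoreTypes.keyValueStore[String, String]())
println(store.approximateNumEntries()) // expect 10, not 0, despite "latest"
streams.close()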
Upvotes: 1