Karthik Reddy

Reputation: 91

Spark structured streaming query always starts with auto.offset.reset=earliest even though auto.offset.reset=latest is set

I have a weird issue with trying to read data from Kafka using Spark structured streaming. My use case is to be able to read from a topic from the largest/latest offset available.

My read configs:

val data = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "some xyz server")
  .option("subscribe", "sampletopic")
  .option("auto.offset.reset", "latest")
  .option("startingOffsets", "latest")
  .option("kafkaConsumer.pollTimeoutMs", 20000)
  .option("failOnDataLoss", "false")
  .option("maxOffsetsPerTrigger", 20000)
  .load()

My write configs:

data
    .writeStream
    .outputMode("append")
    .queryName("test") 
    .format("parquet")
    .option("checkpointLocation", "s3://somecheckpointdir")
    .start("s3://outpath").awaitTermination()

Versions used:

I have done my research online and read [the Kafka documentation](https://kafka.apache.org/0100/documentation.html).

I am using the new consumer APIs and, as the documentation suggests, I just need to set auto.offset.reset to "latest" or startingOffsets to "latest" to ensure that my Spark job starts consuming from the latest offset available per partition in Kafka.

I am also aware that the setting auto.offset.reset only kicks in when a new query is started for the first time, and not on a restart of the application, in which case it will continue reading from the last saved offset.

I am using S3 for checkpointing my offsets, and I see them being generated under s3://somecheckpointdir.

The issue I am facing is that, on the very first startup of the application, the Spark job always reads from the earliest offset even though the "latest" option is specified in the code, and the Spark logs show `auto.offset.reset = earliest` being used. I have not seen posts related to this particular issue.

I would like to know if I am missing something here and if someone has seen this behavior before. Any help/direction will indeed be useful. Thank you.

Upvotes: 7

Views: 14063

Answers (4)

Giorgos Myrianthous

Reputation: 39950

For Structured Streaming, you can set startingOffsets to earliest so that you consume from the earliest available offset every time. The following will do the trick:

.option("startingOffsets", "earliest")

However, note that this is effective only for newly created queries:

startingOffsets

The start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. Note: For batch queries, latest (either implicitly or by using -1 in json) is not allowed. For streaming queries, this only applies when a new query is started, and that resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.
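Beyond the earliest/latest keywords, the JSON form described above lets you pin each partition to an explicit offset. A minimal sketch, reusing the question's topic name; the partition numbers and the offset 100 are placeholders for illustration:

```scala
val data = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "some xyz server")
  .option("subscribe", "sampletopic")
  // partition 0 starts at offset 100; partition 1 uses -2, i.e. earliest
  .option("startingOffsets", """{"sampletopic":{"0":100,"1":-2}}""")
  .load()
```

Every partition of every subscribed topic must appear in the JSON, otherwise the query fails at start.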


Alternatively, you might also choose to change the consumer group every time:

.option("kafka.group.id", "newGroupID")
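One way to get a fresh group on every run is to generate the id, e.g. with a UUID. A sketch only; note that `kafka.group.id` is honoured by the Kafka source starting with Spark 3.0, and the prefix shown here is a placeholder:

```scala
import java.util.UUID

// A unique consumer group per start, so Kafka holds no committed
// offsets for it and the startingOffsets option takes effect.
.option("kafka.group.id", s"myapp-${UUID.randomUUID()}")
```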

Upvotes: 0

Charu Khatwani

Reputation: 29

You cannot set auto.offset.reset in Spark Structured Streaming, as per the documentation. To start from the latest offset, you just need to set the source option startingOffsets to specify where to start (earliest or latest). Structured Streaming manages which offsets are consumed internally, rather than relying on the Kafka consumer to do it. This ensures that no data is missed when new topics/partitions are dynamically subscribed.

It clearly says that following fields can't be set and the Kafka source or sink will throw an exception:

  1. group.id
  2. auto.offset.reset
  3. key.deserializer
  4. value.deserializer
  5. key.serializer
  6. value.serializer
  7. enable.auto.commit
  8. interceptor.classes

Upvotes: 2

Karthik Reddy

Reputation: 91

Update: So I have done some testing on a local Kafka instance with a controlled set of messages going into Kafka. I see that the expected behavior works fine when the property startingOffsets is set to earliest or latest.

But the logs always show the property being picked up as earliest (auto.offset.reset=earliest), even though I am not setting it, which is a little misleading.

Thank you.

Upvotes: 2

zsxwing

Reputation: 20836

  1. All Kafka configurations should be set with the kafka. prefix. Hence the correct option key would be kafka.auto.offset.reset.
  2. You should never set auto.offset.reset. Instead, "set the source option startingOffsets to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that startingOffsets only applies when a new streaming query is started, and that resuming will always pick up from where the query left off." [1]

[1] http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-specific-configurations

Upvotes: 4
