absmiths

Reputation: 1174

Spark Streaming from Kafka topic throws offset out of range with no option to restart the stream

I have a streaming job running on Spark 2.1.1, polling Kafka 0.10. I use the Spark KafkaUtils class to create a DStream, and everything works fine until data ages out of the topic because of the retention policy. The problem appears when I stop the job to make changes: if any data has aged out of the topic in the meantime, I get an error on restart saying that my offsets are out of range.

I have done a lot of research, including reading the Spark source code, and I see many comments like the ones in SPARK-19680, basically saying that data should not be lost silently, so auto.offset.reset is ignored by Spark. My big question, though, is what can I do now? The job cannot poll the topic; it dies on startup with the offsets exception, and I don't know how to reset the offsets so that it will start again.

I have not enabled checkpoints, since I read that they are unreliable for this use. I used to have a lot of code to manage offsets, but it appears that Spark ignores requested offsets if any are committed, so I currently manage offsets like this:

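import org.apache.spark.sql.Dataset
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val stream = KafkaUtils.createDirectStream[String, T](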
    ssc,
    PreferConsistent,
    Subscribe[String, T](topics, kafkaParams))

stream.foreachRDD { (rdd, batchTime) =>
    val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    Log.debug("processing new batch...")

    val values = rdd.map(x => x.value())
    val incomingFrame: Dataset[T] = SparkUtils.sparkSession.createDataset(values)(consumer.encoder()).persist

    consumer.processDataset(incomingFrame, batchTime)
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
}
ssc.start()
ssc.awaitTermination()

As a workaround I have been changing my group IDs, but that is really lame. I know this is the intended behavior and that the situation should not arise in the first place; I just need to know how to get the stream running again. Any help would be appreciated.

Upvotes: 3

Views: 8412

Answers (4)

user2088250

Reputation: 81

This problem was solved in Structured Streaming by the option "failOnDataLoss" = "false". It is unclear why the Spark DStream API has no such option.

This is a BIG question for the Spark developers!

In our projects, we tried to work around this problem by resetting the offsets to earliest + 5 minutes ... it helps in most cases.

Upvotes: 0

anand babu

Reputation: 364

auto.offset.reset=latest/earliest is applied only when the consumer starts for the first time, that is, when the group has no committed offsets yet.

There is a Spark JIRA to resolve this issue; until then we need to live with workarounds: https://issues.apache.org/jira/browse/SPARK-19680

Upvotes: 1

vaquar khan

Reputation: 11479

Try

auto.offset.reset=latest

Or

auto.offset.reset=earliest

earliest: automatically reset the offset to the earliest offset

latest: automatically reset the offset to the latest offset

none: throw exception to the consumer if no previous offset is found for the consumer's group

anything else: throw exception to the consumer.

One more thing that affects which offsets the earliest and latest settings (smallest and largest in the old consumer) correspond to is the log retention policy. Imagine a topic with retention configured to 1 hour. You produce 10 messages, and an hour later you produce 10 more. Retention does not change the latest offset, but the earliest offset can no longer be 0: Kafka will already have removed the first 10 messages, so the earliest available offset is 10.
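
The retention example above can be sketched with a toy in-memory log. This is a simplified model, not Kafka's implementation: the `ToyLog` class and its method names are hypothetical, and real brokers delete whole log segments on a schedule rather than individual records, so actual offsets may lag behind this arithmetic.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy model of a single partition; not part of any Kafka API.
class ToyLog {
    private final Deque<Long> timestamps = new ArrayDeque<>(); // one entry per retained record
    private long earliest = 0; // offset of the oldest retained record
    private long latest = 0;   // offset the next appended record will receive

    // Append one record produced at the given time (milliseconds).
    public void append(long timestampMs) {
        timestamps.addLast(timestampMs);
        latest++;
    }

    // Drop every record that is at least retentionMs old, as time-based retention would.
    public void applyRetention(long nowMs, long retentionMs) {
        while (!timestamps.isEmpty() && nowMs - timestamps.peekFirst() >= retentionMs) {
            timestamps.removeFirst();
            earliest++;
        }
    }

    public long earliestOffset() { return earliest; }
    public long latestOffset() { return latest; }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        ToyLog log = new ToyLog();
        for (int i = 0; i < 10; i++) log.append(0L);    // first 10 messages at t = 0
        for (int i = 0; i < 10; i++) log.append(hour);  // 10 more messages one hour later
        log.applyRetention(hour, hour);                 // retention check at t = 1 hour
        // prints "earliest=10 latest=20"
        System.out.println("earliest=" + log.earliestOffset() + " latest=" + log.latestOffset());
    }
}
```

In this model a consumer group whose last committed offset was, say, 3 now points below the earliest available offset, which is the out-of-range situation described in the question.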

Upvotes: 0

NerdyNick

Reputation: 803

Here is a block of code I wrote to get around this until a real solution is introduced to spark-streaming-kafka. It resets the committed offsets for the partitions that have aged out, based on the OffsetResetStrategy you set. Pass it the same parameter map (_params) that you provide to KafkaUtils, and call it from your driver before calling KafkaUtils.create****Stream().

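// Imports needed at the top of the enclosing file:
// import java.util.*;
// import org.apache.kafka.clients.consumer.ConsumerConfig;
// import org.apache.kafka.clients.consumer.KafkaConsumer;
// import org.apache.kafka.clients.consumer.OffsetAndMetadata;
// import org.apache.kafka.clients.consumer.OffsetResetStrategy;
// import org.apache.kafka.common.PartitionInfo;
// import org.apache.kafka.common.TopicPartition;

final OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(_params.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toString().toUpperCase(Locale.ROOT));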
if(OffsetResetStrategy.EARLIEST.equals(offsetResetStrategy) || OffsetResetStrategy.LATEST.equals(offsetResetStrategy)) {
    LOG.info("Going to reset consumer offsets");
    final KafkaConsumer<K,V> consumer = new KafkaConsumer<>(_params);

    LOG.debug("Fetching current state");
    final List<TopicPartition> parts = new LinkedList<>();
    final Map<TopicPartition, OffsetAndMetadata> currentCommited = new HashMap<>();
    for(String topic: this.topics()) {
        List<PartitionInfo> info = consumer.partitionsFor(topic);
        for(PartitionInfo i: info) {
            final TopicPartition p = new TopicPartition(topic, i.partition());
            final OffsetAndMetadata m = consumer.committed(p);
            parts.add(p);
            currentCommited.put(p, m);
        }
    }
    final Map<TopicPartition, Long> begining = consumer.beginningOffsets(parts);
    final Map<TopicPartition, Long> ending = consumer.endOffsets(parts);

    LOG.debug("Finding what offsets need to be adjusted");
    final Map<TopicPartition, OffsetAndMetadata> newCommit = new HashMap<>();
    for(TopicPartition part: parts) {
        final OffsetAndMetadata m = currentCommited.get(part);
        final Long begin = begining.get(part);
        final Long end = ending.get(part);

        if(m == null || m.offset() < begin) {
            LOG.info("Adjusting partition {}-{}; OffsetAndMeta={} Begining={} End={}", part.topic(), part.partition(), m, begin, end);

            final OffsetAndMetadata newMeta;
            if(OffsetResetStrategy.EARLIEST.equals(offsetResetStrategy)) {
                newMeta = new OffsetAndMetadata(begin);
            } else if(OffsetResetStrategy.LATEST.equals(offsetResetStrategy)) {
                newMeta = new OffsetAndMetadata(end);
            } else {
                newMeta = null;
            }

            LOG.info("New offset to be {}", newMeta);
            if(newMeta != null) {
                newCommit.put(part, newMeta);
            }
        }

    }
    consumer.commitSync(newCommit);
    consumer.close();
}
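
The decision at the heart of the code above can be isolated into a pure function, which makes it testable without a broker. This is only a sketch under assumptions, not part of spark-streaming-kafka or the Kafka client: `OffsetFix`, `Reset` and `adjust` are hypothetical names, plain `Long` values stand in for `OffsetAndMetadata`, and partitions are keyed by a plain string. Unlike the code above, it also treats a committed offset past the end of the log as out of range.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the names are illustrative and not from any library.
class OffsetFix {
    enum Reset { EARLIEST, LATEST }

    // Returns only the partitions whose committed offset must be rewritten.
    // `committed` may lack a key (or map it to null) when the group never committed.
    // `beginning` and `end` are assumed to cover the same set of partitions.
    static Map<String, Long> adjust(Map<String, Long> committed,
                                    Map<String, Long> beginning,
                                    Map<String, Long> end,
                                    Reset reset) {
        Map<String, Long> fixes = new HashMap<>();
        for (Map.Entry<String, Long> e : beginning.entrySet()) {
            String partition = e.getKey();
            long begin = e.getValue();
            long last = end.get(partition);
            Long current = committed.get(partition);
            boolean outOfRange = current == null || current < begin || current > last;
            if (outOfRange) {
                fixes.put(partition, reset == Reset.EARLIEST ? begin : last);
            }
        }
        return fixes;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new HashMap<>();
        committed.put("events-0", 3L);   // aged out: below the beginning offset
        committed.put("events-1", 15L);  // still valid
        Map<String, Long> beginning = new HashMap<>();
        beginning.put("events-0", 10L);
        beginning.put("events-1", 10L);
        Map<String, Long> end = new HashMap<>();
        end.put("events-0", 20L);
        end.put("events-1", 20L);
        // prints "{events-0=10}"
        System.out.println(adjust(committed, beginning, end, Reset.EARLIEST));
    }
}
```

Only the returned entries would then be passed to a commit call such as the commitSync in the code above; partitions with a valid committed offset are left alone.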

Upvotes: 3
