barath

Reputation: 842

Got wrong record for spark-executor-<groupid> <topic> 0 even after seeking to offset <number>

My Spark job throws an exception as below:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.AssertionError: assertion failed: Got wrong record for spark-executor-test-local-npp_consumer_grp_3 <topic> 0 even after seeking to offset 29599
    at scala.Predef$.assert(Predef.scala:170)

I've disabled auto-commit (enable.auto.commit=false) and use the Kafka API to commit the offsets myself:

((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges.get());
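For reference, this is roughly the direct-stream pattern I follow (a minimal Scala sketch, not my exact code; ssc, topics, the broker address and the group id are placeholders):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

// Placeholder Kafka parameters; auto-commit is disabled as described above.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "npp_consumer_grp_3",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,                                          // existing StreamingContext (placeholder)
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  // Capture this batch's offset ranges before any shuffle/repartition.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd ...
  // Commit back to Kafka only after processing succeeds.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}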

What could be the reason for such an error? Is it caused by an issue on the Kafka consumer side, or by my Spark-Kafka consumer program?

After looking into the CachedKafkaConsumer source code, I think this should be due to a consecutive buffer miss (my buffer size is the default, receive.buffer.bytes = 65536), but I don't see the buffer-miss message (Buffer miss for $groupId $topic $partition $offset) in my logs.

So I'm wondering whether it's due to the buffer size?

I tried increasing receive.buffer.bytes to 655360, yet my Spark-Kafka consumer failed with the same error. Could this error be caused by my Kafka source sending a huge amount of data?

Upvotes: 0

Views: 2828

Answers (3)

rodolk

Reputation: 5907

I was having this issue too and ran into this link: http://apache-spark-user-list.1001560.n3.nabble.com/quot-Got-wrong-record-after-seeking-to-offset-quot-issue-td30609.html

This issue was solved in version 2.4.0: https://issues.apache.org/jira/browse/SPARK-17147

I was consuming messages from a compacted topic (compressed) and using version 2.3.0 of spark-streaming-kafka-0-10_2, which cannot deal with compression.

By going to version 2.4.0 of spark-streaming-kafka-0-10_2 I was able to solve it: org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0

I also needed to configure: spark.streaming.kafka.allowNonConsecutiveOffsets=true

My submit command looks like:

spark-submit --class com.streamtest.Main --master spark://myparkhost:7077 --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0,org.apache.spark:spark-streaming_2.11:2.3.0,org.apache.spark:spark-core_2.11:2.3.0 --conf spark.streaming.kafka.allowNonConsecutiveOffsets=true /work/streamapp/build/libs/streamapp.jar
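If you prefer to set it in code instead of on the command line, a minimal Scala sketch would be (the app name and batch interval below are just placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Same effect as passing --conf spark.streaming.kafka.allowNonConsecutiveOffsets=true
val conf = new SparkConf()
  .setAppName("streamtest")                                        // placeholder app name
  .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")

val ssc = new StreamingContext(conf, Seconds(10))                  // placeholder batch interval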

Upvotes: 0

WoodSuper

Reputation: 11

I have the same problem and found the following source code in the class CachedKafkaConsumer from spark-streaming. This is obviously due to the fact that the offset returned by the consumer poll and the offset to which the consumer seeks are not equal.

I reproduced this problem and found that the offsets of one topic-and-partition are discontinuous in Kafka (a small sketch for checking this outside Spark follows the quoted code):

def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
  logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
  if (offset != nextOffset) {
    logInfo(s"Initial fetch for $groupId $topic $partition $offset")
    seek(offset)
    poll(timeout)
  }

  if (!buffer.hasNext()) { poll(timeout) }
  assert(buffer.hasNext(),
    s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
  var record = buffer.next()

  if (record.offset != offset) {
    logInfo(s"Buffer miss for $groupId $topic $partition $offset")
    seek(offset)
    poll(timeout)
    assert(buffer.hasNext(),
      s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
    record = buffer.next()
    assert(record.offset == offset,
      s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
  }

  nextOffset = offset + 1
  record
}
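To check the discontinuity outside Spark, here is a minimal Scala sketch with a plain KafkaConsumer that seeks to the failing offset and prints the raw offsets that actually come back (the broker address, topic, partition and offset are placeholders taken from the question):

import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")   // placeholder broker
props.put("group.id", "offset-gap-check")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("enable.auto.commit", "false")

val consumer = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("my-topic", 0)          // placeholder topic/partition
consumer.assign(Collections.singletonList(tp))
consumer.seek(tp, 29599L)                           // offset from the assertion message

// If the printed offsets jump (e.g. 29599, 29601, ...), the partition has gaps
// and the assumption record.offset == offset in the code above breaks.
consumer.poll(5000L).asScala.take(20).foreach(r => println(s"offset=${r.offset()}"))
consumer.close()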

Upvotes: 1

Tomas Lach

Reputation: 41

I had the same problem here when I was reading from a topic filled by a transactional producer. The issue was caused by transaction markers (commit/abort), which spark-streaming-kafka cannot read. When you run SimpleConsumerShell with the --print-offsets option on this topic, you should see "gaps" between the offsets.

The only solution I see for now is to disable the transactional producer, because support for this in spark-streaming-kafka is not yet implemented.
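To illustrate where the gaps come from, here is a minimal Scala sketch of a transactional producer (the broker, topic and transactional.id are placeholders): every committed transaction appends a control marker to the partition, and that marker occupies an offset consumers never see.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")   // placeholder broker
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("transactional.id", "demo-tx")           // removing this (and the tx calls below) avoids the gaps

val producer = new KafkaProducer[String, String](props)
producer.initTransactions()
for (i <- 1 to 3) {
  producer.beginTransaction()
  producer.send(new ProducerRecord("my-topic", s"key-$i", s"value-$i"))
  producer.commitTransaction()                     // writes a commit marker -> offset gap
}
producer.close()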

Upvotes: 0
