shazeline

Reputation: 503

Flink Kafka producer throws exceptions when publishing a keyed stream

I am running into issues writing a keyed stream from sink subtasks to an output Kafka topic.

The job is of the form: source -> filter -> keyby(id) -> flatmap -> sink

The exceptions come from the Kafka producer and cause checkpointing to time out:

The job gets into a crash loop with the above exceptions, occasionally recovering briefly before crashing again. I believe the problem is that I'm using the keys to determine the output partitions, which causes each of the P sink subtasks to fan out writes to N output partitions. Ideally, each subtask would only write to a single partition.

The job has the following constraints/properties:

1: Once a key has been written to an output Kafka topic partition, it must always be written to the same Kafka partition in the future.

2: The sink subtask parallelism will initially equal the number of output partitions.

3: I should be able to increase the parallelism in the future without violating #1.

4: I will never add new partitions to the output Kafka topic.

If parallelism == partitions, then I believe the FlinkFixedPartitioner would be a fine solution. However, I don't think it would respect the original key->partition mapping if I later increased the parallelism, since it chooses the output partition from the sink subtask index rather than from the key.
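For reference, the selection logic in FlinkFixedPartitioner amounts to roughly the following (a simplified sketch of the connector's partitioner, not the exact source):

```java
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Simplified view of FlinkFixedPartitioner: the target partition depends only on
// which sink subtask is writing, not on the record's key.
public class FixedPartitionerSketch<T> extends FlinkKafkaPartitioner<T> {

    private int parallelInstanceId;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Subtask i always writes to partitions[i % partitions.length], so the partition
        // a given key ends up in shifts whenever the parallelism changes.
        return partitions[parallelInstanceId % partitions.length];
    }
}
```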

Is there a technique I could use here to satisfy these constraints? Possibly a tweak to the Kafka producer's settings, another method for partitioning the keyed stream, or something else?

Upvotes: 1

Views: 741

Answers (1)

Chris Gerken

Reputation: 16392

You are assuming that the partitioning logic used by Flink is the same as the partitioning logic used by Kafka. It's entirely possible (and it's what I suspect is happening) that, given four keys A, B, C and D, Flink would send A and B to one sink instance while C and D go to another sink instance. Kafka is then probably using a different partitioning logic that sends A and C to one partition while B and D are written to another.

Flink doesn't seem to want to expose the key group for a given key, but if your sink parallelism is the same as the number of Kafka partitions, then you should be able to use the task_id of the sink instance in custom Kafka partitioning logic. It's a bit brute force, but it should do what you want.
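A minimal sketch of that brute-force idea, assuming the sink parallelism equals the number of output partitions (the class name and the fail-fast check are illustrative, not from the original post):

```java
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Hypothetical partitioner that pins sink subtask i to output partition i.
// It only upholds a stable key->partition mapping while the sink parallelism
// equals the number of output partitions, so it fails fast if they diverge.
public class SubtaskPinnedPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private int subtaskIndex;
    private int parallelInstances;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        this.subtaskIndex = parallelInstanceId;
        this.parallelInstances = parallelInstances;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (partitions.length != parallelInstances) {
            // The whole scheme relies on a 1:1 subtask->partition mapping.
            throw new IllegalStateException("Sink parallelism (" + parallelInstances
                    + ") must equal the number of output partitions (" + partitions.length + ")");
        }
        // Each producer subtask writes to exactly one partition.
        return partitions[subtaskIndex];
    }
}
```

An instance of this would be passed to the FlinkKafkaProducer constructor that accepts a FlinkKafkaPartitioner; the exact constructor signature varies by connector version.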

As I think about it some more, you can also write a custom partitioner for Flink that uses the same logic as a custom partitioner for your Kafka topic. That would handle scaling out to more instances of the sink.
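And a hedged sketch of that second idea: one shared key-to-partition function used both for routing between Flink subtasks and for choosing the Kafka partition. The class names, the KeySelector-based key extraction, and the partition count constant are all illustrative assumptions:

```java
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Illustrative shared routing logic: a single deterministic key->partition function
// used both to route records between Flink subtasks and to pick the Kafka partition.
public final class KeyHashRouting {

    // Number of partitions in the output topic; per constraint #4 it never changes.
    public static final int NUM_OUTPUT_PARTITIONS = 8; // illustrative value

    /** The one place the key->partition mapping is defined. */
    public static int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), NUM_OUTPUT_PARTITIONS);
    }

    /** Flink-side partitioner, usable with DataStream#partitionCustom before the sink. */
    public static class FlinkSide implements Partitioner<String> {
        @Override
        public int partition(String key, int numPartitions) {
            // Wrap the fixed Kafka mapping into however many subtasks currently exist.
            return partitionFor(key) % numPartitions;
        }
    }

    /** Kafka-side partitioner for the producer: the same function, applied to the record's key. */
    public static class KafkaSide<T> extends FlinkKafkaPartitioner<T> {

        private final KeySelector<T, String> keySelector;

        public KafkaSide(KeySelector<T, String> keySelector) {
            this.keySelector = keySelector;
        }

        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            try {
                // The partition depends only on the key, never on which subtask is writing.
                return partitions[partitionFor(keySelector.getKey(record)) % partitions.length];
            } catch (Exception e) {
                throw new RuntimeException("Could not extract the key from the record", e);
            }
        }
    }
}
```

Because the Kafka-side choice depends only on the key and the fixed partition count, the key->partition mapping survives a later increase in sink parallelism; the matching Flink-side partitioner just keeps each subtask writing to a narrow set of partitions. Upstream this would be wired with something like stream.partitionCustom(new KeyHashRouting.FlinkSide(), keySelector), though that routing is separate from any keyBy the flatmap still needs.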

Upvotes: 1
