Houston
Houston

Reputation: 437

Headers with Flink Table Api Kafka Sink

How does one send kafka headers with the table api in Flink? I used the sql syntax to create a sink but not sure how to include custom headers.

Upvotes: 0

Views: 1130

Answers (1)

David Anderson
David Anderson

Reputation: 43667

What's currently possible is described here in the docs. The only writable metadata fields are the timestamp and the headers, where the headers are exposed as a map of strings to raw bytes.

For an example, see testKafkaSourceSinkWithMetadata from the Flink sources.

Excerpting from that example:

CREATE TABLE kafka (
  ...,
  `headers` MAP<STRING, BYTES> METADATA
) WITH (
  'connector' = 'kafka',
  ...
)


INSERT INTO kafka
VALUES
  (..., MAP['k1', X'C0FFEE', 'k2', X'BABE01']),
  (..., CAST(NULL AS MAP<STRING, BYTES>),
  (..., MAP['k1', X'102030', 'k2', X'203040'])

Upvotes: 1

Related Questions