Reputation: 437
How does one send Kafka headers with the Table API in Flink? I used the SQL syntax to create a sink, but I'm not sure how to include custom headers.
Upvotes: 0
Views: 1130
Reputation: 43667
What's currently possible is described in the docs. The only writable metadata fields are the timestamp and the headers, where the headers are exposed as a map from strings to raw bytes.
For an example, see testKafkaSourceSinkWithMetadata from the Flink sources.
Excerpting from that example:
CREATE TABLE kafka (
...,
`headers` MAP<STRING, BYTES> METADATA
) WITH (
'connector' = 'kafka',
...
)
INSERT INTO kafka
VALUES
(..., MAP['k1', X'C0FFEE', 'k2', X'BABE01']),
  (..., CAST(NULL AS MAP<STRING, BYTES>)),
(..., MAP['k1', X'102030', 'k2', X'203040'])
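Since the question is about the Table API, here is a minimal sketch of running the same DDL and DML from Java through TableEnvironment.executeSql. The topic name, bootstrap servers, payload column, and format are placeholder assumptions, not taken from the test above:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaHeadersSketch {
    public static void main(String[] args) {
        // Create a streaming TableEnvironment.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Declare the headers metadata column on the sink table.
        tEnv.executeSql(
                "CREATE TABLE kafka (" +
                "  `payload` STRING," +
                "  `headers` MAP<STRING, BYTES> METADATA" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'my-topic'," +                          // placeholder
                "  'properties.bootstrap.servers' = 'localhost:9092'," + // placeholder
                "  'format' = 'json'" +                               // placeholder
                ")");

        // Write a row with a custom header 'k1' -> 0xC0FFEE.
        tEnv.executeSql(
                "INSERT INTO kafka VALUES ('hello', MAP['k1', X'C0FFEE'])");
    }
}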
Upvotes: 1