Akhil

Reputation: 1254

Add trace and span IDs to Flink jobs

I have a requirement to add trace and span IDs to Flink jobs running in a cluster. The request flows as follows:

User --> Rest API -> Kafka-topic-1 --> FlinkJob-1 --> Kafka-topic-2 --> FlinkJob-2 --> Consumer --> DB

I'm using Spring Boot to create my REST APIs and Spring Cloud Sleuth to add trace and span IDs to the generated logs. The IDs are added when the REST API is invoked and when the message is put on Kafka-topic-1, but I can't figure out how to add them when FlinkJob-1 and FlinkJob-2 consume messages, since those jobs run outside the Spring context.

One option is to put the trace and span IDs into the Kafka message headers and have a Kafka consumer/producer interceptor extract and log them. I tried this, but my interceptors are not invoked, because the Flink APIs use Flink's version of the Kafka client.
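On the producing side, an interceptor is not strictly needed: with Flink's universal Kafka connector, the job itself can write headers in a `KafkaSerializationSchema`. Below is a minimal sketch, assuming FlinkJob-1 keeps the IDs next to the payload and writes them back as headers when publishing to Kafka-topic-2, so FlinkJob-2 receives the same IDs. The header names `X-B3-TraceId`/`X-B3-SpanId`, the `TracedMessage` class and the class name are hypothetical; use whatever headers your Sleuth-instrumented producer actually writes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

// Sketch: write the tracing IDs as Kafka headers when FlinkJob-1 publishes to Kafka-topic-2.
public class TracedMessageSerializationSchema
        implements KafkaSerializationSchema<TracedMessageSerializationSchema.TracedMessage> {

    // ASSUMPTION: hypothetical header names; match what your producer (Sleuth) writes.
    static final String TRACE_ID_HEADER = "X-B3-TraceId";
    static final String SPAN_ID_HEADER = "X-B3-SpanId";

    /** Hypothetical payload type carrying the IDs through the job (Flink POJO). */
    public static class TracedMessage {
        public String traceId;
        public String spanId;
        public String payload;

        public TracedMessage() {}

        public TracedMessage(String traceId, String spanId, String payload) {
            this.traceId = traceId;
            this.spanId = spanId;
            this.payload = payload;
        }
    }

    private final String topic;

    public TracedMessageSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(TracedMessage element, Long timestamp) {
        List<Header> headers = new ArrayList<>();
        addHeader(headers, TRACE_ID_HEADER, element.traceId);
        addHeader(headers, SPAN_ID_HEADER, element.spanId);
        byte[] value = element.payload == null
                ? null
                : element.payload.getBytes(StandardCharsets.UTF_8);
        // No partition and no key: Kafka's default partitioner chooses the partition.
        return new ProducerRecord<byte[], byte[]>(topic, null, null, value, headers);
    }

    // Add a header only when the ID is present.
    private static void addHeader(List<Header> headers, String name, String id) {
        if (id != null) {
            headers.add(new RecordHeader(name, id.getBytes(StandardCharsets.UTF_8)));
        }
    }
}
```

The schema would be passed to the sink, for example `new FlinkKafkaProducer<>("Kafka-topic-2", schema, properties, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)`; keeping the IDs as plain fields on the element means they survive any operators between source and sink.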

I also couldn't get my custom KafkaDeserializationSchema invoked:

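import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyDeserializationSchema implements KafkaDeserializationSchema<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyDeserializationSchema.class);

    @Override
    public TypeInformation<String> getProducedType() {
        System.out.println("************** Invoked 1");
        LOGGER.debug("************** Invoked 1");
        return null; // gives Flink no type information for the produced stream
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        System.out.println("************** Invoked 2");
        LOGGER.debug("************** Invoked 2");
        return true; // true tells the consumer to stop after this element
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        System.out.println("************** Invoked 3");
        LOGGER.debug("************** Invoked 3");
        return record.toString(); // the record's description, not the message value
    }
}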

Can someone please suggest how to achieve this?

Upvotes: 0

Views: 1205

Answers (2)

redhatvicky

Reputation: 1930

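Since you are producing a plain String, deserialize only needs to convert the message value bytes to a String. Note also that isEndOfStream should return false for an unbounded Kafka stream, and getProducedType should return real type information instead of null. Something like the code below: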

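import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class MyDeserializationSchema implements KafkaDeserializationSchema<String> {
    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // a Kafka topic is unbounded: never signal end of stream
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Decode the message value (not the whole record) as UTF-8
        return new String(record.value(), StandardCharsets.UTF_8);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO; // tell Flink the stream carries Strings
    }
}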

Upvotes: 0

MIkCode

Reputation: 2835

You can use a KafkaDeserializationSchema to get the headers as well.

To give access to the key, value and metadata of a Kafka message, KafkaDeserializationSchema declares the method T deserialize(ConsumerRecord<byte[], byte[]> record); the record's headers() are available there alongside key() and value().

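import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// DCEvents is the answerer's own event class (not shown here)
public class Bla implements KafkaDeserializationSchema<DCEvents> {
    @Override
    public boolean isEndOfStream(DCEvents dcEvents) {
        return false;
    }

    @Override
    public DCEvents deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        // consumerRecord.key(), value() and headers() are all available here,
        // e.g. consumerRecord.headers().lastHeader(name) to read a tracing header
        return null; // TODO: build a DCEvents from the record
    }

    @Override
    public TypeInformation<DCEvents> getProducedType() {
        return TypeInformation.of(DCEvents.class);
    }
}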
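Applied to the trace/span question, a minimal sketch of reading the IDs from the headers inside deserialize and carrying them next to the payload, so later operators can log them. The header names `X-B3-TraceId`/`X-B3-SpanId`, the `TracedMessage` class and the class name are hypothetical; check which headers your producer actually sends.

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

// Sketch: extract tracing IDs from Kafka headers while deserializing.
public class TracedMessageDeserializationSchema
        implements KafkaDeserializationSchema<TracedMessageDeserializationSchema.TracedMessage> {

    // ASSUMPTION: hypothetical header names; match what your producer writes.
    static final String TRACE_ID_HEADER = "X-B3-TraceId";
    static final String SPAN_ID_HEADER = "X-B3-SpanId";

    /** Hypothetical payload type carrying the IDs through the job (Flink POJO). */
    public static class TracedMessage {
        public String traceId;
        public String spanId;
        public String payload;

        public TracedMessage() {}

        public TracedMessage(String traceId, String spanId, String payload) {
            this.traceId = traceId;
            this.spanId = spanId;
            this.payload = payload;
        }
    }

    @Override
    public boolean isEndOfStream(TracedMessage nextElement) {
        return false; // unbounded Kafka stream
    }

    @Override
    public TracedMessage deserialize(ConsumerRecord<byte[], byte[]> record) {
        String payload = record.value() == null
                ? null
                : new String(record.value(), StandardCharsets.UTF_8);
        return new TracedMessage(
                headerAsString(record, TRACE_ID_HEADER),
                headerAsString(record, SPAN_ID_HEADER),
                payload);
    }

    @Override
    public TypeInformation<TracedMessage> getProducedType() {
        return TypeInformation.of(TracedMessage.class);
    }

    // Returns the last header with this name as UTF-8 text, or null if absent.
    private static String headerAsString(ConsumerRecord<byte[], byte[]> record, String name) {
        Header header = record.headers().lastHeader(name);
        return header == null || header.value() == null
                ? null
                : new String(header.value(), StandardCharsets.UTF_8);
    }
}
```

Downstream operators could then call `org.slf4j.MDC.put("traceId", msg.traceId)` (and likewise for the span ID) before logging, so a log pattern that reads those MDC keys prints them; the key names are an assumption about your logging configuration.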

Upvotes: 1
