Jae Ryu

Reputation: 31

Argo Events Kafka triggers cannot parse message headers to enable distributed tracing

TL;DR: Argo Events Kafka eventsource triggers do not currently parse the headers of consumed Kafka messages, which is needed to enable distributed tracing. I submitted a feature request (here); if you face the same problem, please upvote. I'm also curious whether anyone has figured out a workaround.

====================================

Context

A common pattern in the Argo Workflows we deploy is Kafka event-driven, asynchronous distributed workloads, e.g. service "A" → Kafka → Argo Events trigger → Workflow → service "B".

To monitor the entire system for user-centric metrics ("how long did it take & where are the bottlenecks?"), I'm looking to instrument distributed tracing from service "A" to service "B". We use Datadog as the aggregator, with dd-trace.

The pattern I've seen is manual propagation of trace context via Kafka headers: the producer injects headers into the Kafka message before emitting (similar to HTTP headers, carrying parent trace metadata), and the receiving consumer, once done processing the message, adds a child_span to the parent_span received from upstream.

An example of the above: https://newrelic.com/blog/how-to-relic/distributed-tracing-with-kafka
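For concreteness, a minimal sketch of that pattern with confluent-kafka and dd-trace-py's HTTPPropagator (the function names, topic, and service label below are illustrative, not from an existing codebase):

# Producer side: inject the active trace context into Kafka message headers.
# Consumer side: extract the parent context and open a child span under it.
from confluent_kafka import Producer
from ddtrace import tracer
from ddtrace.propagation.http import HTTPPropagator

def produce_traced(producer: Producer, topic: str, value: bytes) -> None:
    with tracer.trace("kafka.produce", service="service-a") as span:
        carrier = {}
        HTTPPropagator.inject(span.context, carrier)  # writes x-datadog-* keys
        # confluent-kafka accepts headers as a list of (str, bytes) tuples
        producer.produce(topic, value=value,
                         headers=[(k, v.encode()) for k, v in carrier.items()])
        producer.flush()

def on_message(msg) -> None:
    # msg.headers() returns a list of (str, bytes) tuples, or None
    carrier = {k: v.decode() for k, v in (msg.headers() or [])}
    parent_ctx = HTTPPropagator.extract(carrier)
    with tracer.start_span("kafka.consume", child_of=parent_ctx):
        ...  # process msg.value(); this span is a child of the producer's span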


Issue

The Argo Events Kafka event source does not parse any headers; it only passes the message body JSON for the downstream Workflow to use, at eventData.Body.
[source code]

Simplified views of my Argo Eventsource -> Trigger -> Workflow:

# eventsource/my-kafka-eventsource.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
spec:
  kafka:
    my-kafka-eventsource:
      url: <kafka-broker>:9092
      topic: <my-topic>
      version: "2.5.0"
# sensors/trigger-my-workflow.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
spec:
  dependencies:
    - name: my-kafka-eventsource-dep
      eventSourceName: my-kafka-eventsource
      eventName: my-kafka-eventsource
  triggers:
    - template:
        name: start-my-workflow
        k8s:
          operation: create
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              spec:
                entrypoint: my-sick-workflow
                arguments:
                  parameters:
                    - name: proto_message
value: needs to be overridden
                    # I would like to be able to add this
                    - name: msg_headers
value: needs to be overridden
                templates:
                  - name: my-sick-workflow
                    dag:
                      tasks:
                        - name: my-sick-workflow
                          templateRef:
                            name: my-sick-workflow
                            template: my-sick-workflow
          parameters:
            # content/body of consumed message
            - src: 
                dependencyName: my-kafka-eventsource-dep
                dataKey: body  
              dest: spec.arguments.parameters.0.value
            # I would like to do this - get msg.headers() if they exist.
            - src: 
                dependencyName: my-kafka-eventsource-dep
                dataKey: headers
              dest: spec.arguments.parameters.1.value
# templates/my-sick-workflow.yaml
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
spec:
  templates:
    - name: my-sick-workflow
      container:
        image: <image>
        command: [ "python", "/main.py" ] 
        # I want to add the 2nd arg - msg_headers - here
        args: [ "{{workflow.parameters.proto_message}}", "{{workflow.parameters.msg_headers}}" ]

# so that in my Workflow DAG step source code,
# I can access the headers of the upstream Kafka msg via
# body=sys.argv[1], headers=sys.argv[2]
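If the trigger could pass headers through, the step's /main.py might look like this sketch; the JSON-encoded second argument is hypothetical, since it is exactly what the feature request asks for:

# Hypothetical /main.py: assumes the sensor could pass Kafka headers as a
# JSON object string in argv[2] (not possible in Argo Events today).
import json
import sys

from ddtrace import tracer
from ddtrace.propagation.http import HTTPPropagator

if __name__ == "__main__":
    body = sys.argv[1]                                   # proto_message
    headers = json.loads(sys.argv[2]) if len(sys.argv) > 2 else {}  # msg_headers
    parent_ctx = HTTPPropagator.extract(headers)         # upstream trace ctx
    with tracer.start_span("my-sick-workflow", child_of=parent_ctx):
        ...  # process body as usual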

Confluent-Kafka API docs on accessing message headers: [doc]


Q's

  1. Has anyone found a workaround for passing tracing context from an upstream to a downstream service when the message travels through Kafka Producer <> Argo Events?

  2. I considered changing my Argo Workflows sensor trigger to an HTTP trigger accepting payloads: a new Kafka consumer would listen for the message that currently triggers my Argo Workflow, then forward an HTTP payload with the parent trace metadata in its headers.

    • it's an anti-pattern relative to the rest of my workflows, so I would like to avoid it if there's a simpler solution.

Upvotes: 0

Views: 519

Answers (1)

OneCricketeer

Reputation: 191738

As you pointed out, the only real workaround, without forking some part of Argo Events or implementing your own Source/Sensor, would be to use a Kafka consumer (or Kafka Connect) and call a Webhook EventSource (or another event source type that can extract the information you need).
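A minimal sketch of such a bridge, assuming a webhook EventSource reachable in-cluster (broker, topic, group id, and URL are all placeholders):

# Bridge consumer: re-publish each Kafka message to an Argo Events webhook
# EventSource, copying Kafka headers into HTTP headers so the trace context
# survives the hop.
import requests
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "<kafka-broker>:9092",
    "group.id": "argo-webhook-bridge",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["<my-topic>"])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    # Kafka headers -> HTTP headers (x-datadog-* included), visible to the
    # webhook EventSource and to dd-trace's HTTP instrumentation.
    http_headers = {k: v.decode() for k, v in (msg.headers() or [])}
    requests.post(
        "http://<webhook-eventsource-svc>:12000/<endpoint>",
        data=msg.value(),
        headers=http_headers,
    )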

Upvotes: 0
