Obiii
Obiii

Reputation: 824

Storm-kafka: set startOffsetTime to kafka.api.OffsetRequest.LatestTime in apache Flux Yaml topology

I am working on a topology using apache flux. Currently, strom fetches messages from beginning but I want it to fetch only the latest messages from kafka.

I am writing topology in YAML file.

This is how my spoutConfig looks like:

  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"

  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "172.25.33.191:2181"

  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts"
      - "blockdata"
      - ""
      - "myId"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"

      - name: "ignoreZkOffsets"
        value: true

      - name: "startOffsetTime"
        ref: "XXXXXXXXX"

Now, I am stuck. How do I set startOffsetTime to proper function to get only the latest messages from kafka?

I have tried ref:"LatestTime", but no matter what I put in there, it give me error :

java.lang.IllegalArgumentException: Can not set long field org.apache.storm.kafka.KafkaConfig.startOffsetTime to null value

Upvotes: 0

Views: 421

Answers (1)

Stig Rohde Døssing
Stig Rohde Døssing

Reputation: 3651

I believe Flux can handle calling static factory methods.

- id: "startingOffsetTime"
  className: "kafka.api.OffsetRequest"
  factory: "LatestTime"

and then use it in your SpoutConfig definition like

properties:
  - name: "startOffsetTime"
    ref: "startingOffsetTime"

I haven't tested this, but I think it should work. The ability to call static factory methods was merged a while back https://issues.apache.org/jira/browse/STORM-2796, but it seems to be missing from the documentation. I've raised an issue to update the docs https://issues.apache.org/jira/browse/STORM-3086.

In case you'd like to see an example of this feature, take a look at https://github.com/apache/storm/blob/master/flux/flux-core/src/test/resources/configs/config-methods-test.yaml#L38

Upvotes: 1

Related Questions