Reputation: 824
I am working on a topology using Apache Storm Flux. Currently, Storm fetches messages from the beginning of the topic, but I want it to fetch only the latest messages from Kafka.
I am writing the topology in a YAML file.
This is what my spoutConfig looks like:
  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"
  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"
  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "172.25.33.191:2181"
  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts"
      - "blockdata"
      - ""
      - "myId"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"
      - name: "ignoreZkOffsets"
        value: true
      - name: "startOffsetTime"
        ref: "XXXXXXXXX"
Now I am stuck. How do I set startOffsetTime so that the spout reads only the latest messages from Kafka?
I have tried ref: "LatestTime", but no matter what I put there, it gives me this error:
java.lang.IllegalArgumentException: Can not set long field org.apache.storm.kafka.KafkaConfig.startOffsetTime to null value
Upvotes: 0
Views: 421
Reputation: 3651
I believe Flux can handle calling static factory methods.
  - id: "startingOffsetTime"
    className: "kafka.api.OffsetRequest"
    factory: "LatestTime"
and then use it in your SpoutConfig definition like
    properties:
      - name: "startOffsetTime"
        ref: "startingOffsetTime"
I haven't tested this, but I think it should work. The ability to call static factory methods was merged a while back (https://issues.apache.org/jira/browse/STORM-2796), but it seems to be missing from the documentation. I've raised an issue to update the docs: https://issues.apache.org/jira/browse/STORM-3086.
In case you'd like to see an example of this feature, take a look at https://github.com/apache/storm/blob/master/flux/flux-core/src/test/resources/configs/config-methods-test.yaml#L38
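Putting the two pieces together, the components section might look roughly like the sketch below. It is untested, like the snippets above. The top-level `components:` key is assumed from the usual Flux file layout, since the question omits it. The factory component is placed before `spoutConfig` on the assumption that Flux only resolves a `ref` to a component declared earlier in the file.

```yaml
# Sketch only: combines the question's spoutConfig with the factory-based
# component suggested above. Not verified against a running topology.
components:
  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"

  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "172.25.33.191:2181"

  # Calls the static method kafka.api.OffsetRequest.LatestTime(),
  # which returns a long that can be assigned to startOffsetTime.
  - id: "startingOffsetTime"
    className: "kafka.api.OffsetRequest"
    factory: "LatestTime"

  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts"
      - "blockdata"
      - ""
      - "myId"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"
      # Ignore offsets already stored in ZooKeeper so startOffsetTime is used.
      - name: "ignoreZkOffsets"
        value: true
      - name: "startOffsetTime"
        ref: "startingOffsetTime"
```

The original error is consistent with a `ref` that does not match any declared component id: the lookup yields null, and null cannot be assigned to the primitive long field startOffsetTime.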
Upvotes: 1