Reputation: 8572
The following code blocks indefinitely when it runs. It seems that map never returns the lazy sequence, and I am not sure why.
(ns testt.consumer
  (:require
    ;internal
    ;external
    [clojure.walk :refer [stringify-keys]]
    [clojure.pprint :as pprint])
  (:import
    [kafka.consumer ConsumerConfig Consumer KafkaStream]
    [kafka.javaapi.consumer ConsumerConnector]
    [kafka.message MessageAndMetadata]
    [java.util Properties])
  (:gen-class))

; internal
(defn hashmap-to-properties
  [h]
  (doto (Properties.)
    (.putAll (stringify-keys h))))

; external
(defn consumer-connector
  [h]
  (let [config (ConsumerConfig. (hashmap-to-properties h))]
    (Consumer/createJavaConsumerConnector config)))

(defn message-stream
  [^ConsumerConnector consumer topic thread-pool-size]
  ;this is dealing only with the first stream, needs to be fixed to support multiple streams
  (let [stream (first (.get (.createMessageStreams consumer {topic thread-pool-size}) topic))]
    (map #(.message %) (iterate (.next (.iterator ^KafkaStream stream))))))
The configuration the connector expects:
{:zookeeper.connect            "10.0.0.1:2181"
 :group.id                     "test-0"
 :thread.pool.size             "1"
 :topic                        "test_topic"
 :zookeeper.session.timeout.ms "1000"
 :zookeeper.sync.time.ms       "200"
 :auto.commit.interval.ms      "1000"
 :auto.offset.reset            "smallest"
 :auto.commit.enable           "true"}
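To make the keyword-to-property conversion concrete, here is a minimal sketch that runs without Kafka on the classpath. It repeats hashmap-to-properties from above and feeds it a two-entry subset of the map; the name props is only for illustration.

```clojure
(require '[clojure.walk :refer [stringify-keys]])
(import 'java.util.Properties)

;; Same helper as in the question: keyword keys become plain string keys.
(defn hashmap-to-properties
  [h]
  (doto (Properties.)
    (.putAll (stringify-keys h))))

;; Illustrative subset of the configuration map.
(def props
  (hashmap-to-properties {:group.id          "test-0"
                          :auto.offset.reset "smallest"}))

;; Look a value up by its dotted property name.
(println (.getProperty props "group.id"))
```

The dotted keywords work because name on a keyword such as :group.id yields the string "group.id", which is the form the Properties object is queried with.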
Upvotes: 1
Views: 1040
Reputation: 8572
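Actually, there is no need to call iterate or to drive Kafka's iterator by hand, because Clojure provides a better way of dealing with streams. In the question's code, the (.next ...) call is evaluated eagerly as an argument, so it blocks waiting for a message before map can return anything. A KafkaStream is Iterable, so you can treat it as a sequence and just do the following: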
(doseq [^kafka.message.MessageAndMetadata message stream]
  (do-some-stuff message))
This is probably the most efficient way of doing that.
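The same pattern can be tried without a broker. The sketch below is only an illustration: fake-stream is a stand-in (a java.util.ArrayList, not a KafkaStream), and do-some-stuff and seen are hypothetical names that simply record what was consumed.

```clojure
;; Stand-in for a KafkaStream (assumption: no broker available here).
;; doseq accepts any java.lang.Iterable in the same way.
(def fake-stream (java.util.ArrayList. ["m1" "m2" "m3"]))

;; Hypothetical handler: records each message it is given.
(def seen (atom []))

(defn do-some-stuff
  [message]
  (swap! seen conj message))

;; doseq walks the Iterable and calls the handler for its side effects.
(doseq [message fake-stream]
  (do-some-stuff message))

(println @seen)
```

With a real KafkaStream the loop would normally not finish on its own, since the iterator waits for further messages, so it is usually run on a dedicated thread.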
More of the code is here:
Upvotes: 2