Édipo Féderle
Édipo Féderle

Reputation: 4257

clj-kafka - consumer empty

I am trying usgin library clj-kafka.

Here my code

  (use [clj-kafka.producer]
       [clj-kafka.zk]
       [clj-kafka.consumer.zk]
       [clj-kafka.core]))

(brokers {"zookeeper.connect" "localhost:2181"})

(def p (producer {"metadata.broker.list" "localhost:9092"
                  "serializer.class" "kafka.serializer.DefaultEncoder"
                  "partitioner.class" "kafka.producer.DefaultPartitioner"}))

(send-message p (message "test" (.getBytes "this is my message")))

(def config {"zookeeper.connect" "localhost:2181"
             "group.id" "clj-kafka.consumer"
             "auto.offset.reset" "smallest"
             "auto.commit.enable" "false"})

(with-resource [c (consumer config)]
  shutdown
  (take 2 (messages c "test"))) ;; return ()

I start zookepper-server and kafka itself with

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

The config/zookepper.properties:

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0

and config/server.properties:

broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
zocket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

The 'problem' is when I execute:

(with-resource [c (consumer config)]
  shutdown
  (take 2 (messages c "test"))) ;; return empty ()

Any idea here?

Thanks in advance

Upvotes: 2

Views: 248

Answers (1)

leeor
leeor

Reputation: 17801

See this github issue. Seems the documentation isn't great. You have to force the realization of the sequence (which is lazy) with doall. try this:

(with-resource [c (consumer config)]
  shutdown
  (doall (take 2 (messages c "test"))))

Upvotes: 1

Related Questions