Reputation: 9939
First batch: I read data from 100 flat files into an array and send the records to a Kafka producer one by one, each as a byte array.
Second batch: I consume from a Kafka consumer and insert the messages into a NoSQL database.
I use OffsetNewest in the config of the Shopify sarama Go package for Kafka.
I can produce messages to Kafka, but when consuming I get only the first message. Given that I set OffsetNewest in the sarama config, how can I get all the data?
Upvotes: 0
Views: 2256
Reputation: 19
It is difficult to say anything definite without code or a more in-depth explanation of how Kafka is configured (i.e. topics, partitions, ...), but a few quick checks come to mind:
Assuming you start consuming with OffsetNewest before you start producing, one thing that may be happening is that you are not consuming from all partitions of that topic. According to the sarama docs, you have to consume each partition explicitly by creating PartitionConsumers. From the example at https://godoc.org/github.com/Shopify/sarama#Consumer:
partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
if err != nil {
    panic(err)
}
...
consumed := 0
ConsumerLoop:
for {
    select {
    case msg := <-partitionConsumer.Messages():
        log.Printf("Consumed message offset %d\n", msg.Offset)
        consumed++
    case <-signals:
        break ConsumerLoop
    }
}
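To make the "consume every partition" point concrete, here is a minimal sketch of the fan-in pattern. It uses only the standard library so it runs without a broker: `Message`, `fakePartition` and `mergePartitions` are hypothetical stand-ins, not sarama APIs. With sarama you would, as far as I know, get the partition IDs from `consumer.Partitions("my_topic")` and feed the merge with the `Messages()` channel of one `ConsumePartition` call per partition.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a hypothetical stand-in for sarama's consumer message type.
type Message struct {
	Partition int32
	Offset    int64
	Value     []byte
}

// mergePartitions fans all per-partition channels into a single channel.
// The returned channel is closed once every input channel has been closed.
func mergePartitions(inputs []<-chan Message) <-chan Message {
	out := make(chan Message)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan Message) {
			defer wg.Done()
			for msg := range in {
				out <- msg
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fakePartition simulates one partition that already holds n messages.
func fakePartition(id int32, n int) <-chan Message {
	ch := make(chan Message, n)
	for i := 0; i < n; i++ {
		ch <- Message{Partition: id, Offset: int64(i), Value: []byte(fmt.Sprintf("p%d-m%d", id, i))}
	}
	close(ch)
	return ch
}

func main() {
	// Three partitions holding 8 messages in total; a consumer that reads
	// only partition 0 would see just 3 of them.
	inputs := []<-chan Message{fakePartition(0, 3), fakePartition(1, 2), fakePartition(2, 3)}
	consumed := 0
	for msg := range mergePartitions(inputs) {
		fmt.Printf("partition %d offset %d: %s\n", msg.Partition, msg.Offset, msg.Value)
		consumed++
	}
	fmt.Println("consumed:", consumed)
}
```

Ordering is only preserved within a partition here, which matches how Kafka itself orders messages.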
If, on the other hand, you are in fact starting the consumer after all the events have been produced, then the offset to start reading from is not OffsetNewest but OffsetOldest: OffsetNewest only delivers messages produced after the consumer starts.
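The difference between the two starting offsets can be shown with a small simulation. This is only a sketch with no Kafka involved: `resolveStart` and `readFrom` are hypothetical helpers, and the partition is modelled as a plain slice. The two constants carry the same sentinel values as `sarama.OffsetNewest` and `sarama.OffsetOldest`.

```go
package main

import "fmt"

// Same sentinel values as sarama.OffsetNewest and sarama.OffsetOldest.
const (
	OffsetNewest int64 = -1
	OffsetOldest int64 = -2
)

// resolveStart is a hypothetical helper that maps a requested offset to the
// index in the partition log where reading begins.
func resolveStart(logLen int, offset int64) int {
	switch offset {
	case OffsetNewest:
		return logLen // the next message to be produced
	case OffsetOldest:
		return 0 // in real Kafka: the oldest offset still retained
	default:
		return int(offset)
	}
}

// readFrom returns the already-produced messages that a consumer starting
// at the given offset would receive.
func readFrom(partitionLog []string, offset int64) []string {
	start := resolveStart(len(partitionLog), offset)
	if start < 0 || start > len(partitionLog) {
		return nil
	}
	return partitionLog[start:]
}

func main() {
	// Messages produced before the consumer was started.
	partitionLog := []string{"file-1", "file-2", "file-3"}

	fmt.Println(len(readFrom(partitionLog, OffsetNewest))) // prints 0
	fmt.Println(len(readFrom(partitionLog, OffsetOldest))) // prints 3
}
```

So if the producer batch finishes before the consumer batch starts, passing OffsetOldest to ConsumePartition is what would let the consumer see the existing messages.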
I'm sorry I can't give you a more useful answer, but if you paste some code or give more details we may be able to help a bit more.
Upvotes: 2