Bidisha Mukherjee

Reputation: 45

Kafka and Pyspark Integration

I am new to big data and I am trying to connect Kafka to Spark. Here is my producer code:

import os
import sys
import pykafka


def get_text():
    ## This block generates my required text.
    # encode() takes a codec name, not the text itself
    text_as_bytes = text.encode('utf-8')
    producer.produce(text_as_bytes)


if __name__ == "__main__":
    client = pykafka.KafkaClient("localhost:9092")
    print("topics", client.topics)
    producer = client.topics[b'imagetext'].get_producer()

    get_text()

This prints my generated text in the console consumer when I run bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imagetext --from-beginning
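A minimal sketch of the same producer using pykafka's synchronous producer, assuming a broker on localhost:9092 and an existing imagetext topic. The default producer from get_producer() is asynchronous and batches messages, while get_sync_producer() waits for each message to be delivered. The helper names make_message() and send_texts() are hypothetical, and the text-generation block from the question is not reproduced.

```python
def make_message(text):
    """Encode a text message as UTF-8 bytes, the type pykafka's produce() expects."""
    return text.encode("utf-8")


def send_texts(hosts, topic_name, texts):
    """Send each text to the topic, waiting for delivery of every message."""
    import pykafka  # imported here so make_message() works without pykafka installed

    client = pykafka.KafkaClient(hosts=hosts)
    topic = client.topics[topic_name.encode("utf-8")]
    # get_sync_producer() blocks on each produce() call until the broker
    # confirms it; leaving the with-block stops the producer.
    with topic.get_sync_producer() as producer:
        for text in texts:
            producer.produce(make_message(text))
```

It would be called as send_texts("localhost:9092", "imagetext", [text]) with the generated text.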

Now I want this text to be consumed by Spark. This is my Jupyter code:

import findspark
findspark.init()
import os
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /spark-2.1.1-bin-hadoop2.6/spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar pyspark-shell'



conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
sc = SparkContext(conf=conf)

ssc = StreamingContext(sc,5)
print('ssc =================== {}'.format(ssc))

kstream = KafkaUtils.createDirectStream(ssc, topics = ['imagetext'], 
     kafkaParams = {"metadata.broker.list": 'localhost:9092'})

print('contexts =================== {} {}'.format(sc, kstream))
lines = kstream.map(lambda x: x[1])
lines.pprint()

ssc.start()
ssc.awaitTermination()
ssc.stop(stopGraceFully = True)

But the output in Jupyter is:

-------------------------------------------
Time: 2018-02-21 15:03:25
-------------------------------------------

-------------------------------------------
Time: 2018-02-21 15:03:30
-------------------------------------------

It does not show the text that appears in my console consumer. Please help; I am unable to figure out the mistake.
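One likely factor in the empty batches: the console consumer above runs with --from-beginning, but the 0.8 direct stream starts from the latest offset by default (auto.offset.reset is "largest"), so messages produced before ssc.start() are skipped. A minimal sketch, assuming the same Spark 2.1 setup with the spark-streaming-kafka-0-8 assembly jar, that asks the stream to start from the earliest offset still on the broker. The helper names build_kafka_params() and create_text_stream() are hypothetical.

```python
def build_kafka_params(brokers, from_beginning=True):
    """Kafka parameters for the 0.8 direct stream.

    "smallest" starts from the earliest offset still on the broker
    (like --from-beginning); "largest" (the default) only reads new messages.
    """
    return {
        "metadata.broker.list": brokers,
        "auto.offset.reset": "smallest" if from_beginning else "largest",
    }


def create_text_stream(ssc, topic, brokers):
    """Create a direct stream and keep only the message values."""
    # Imported here so build_kafka_params() works without pyspark installed;
    # pyspark.streaming.kafka only exists in Spark 2.x and earlier.
    from pyspark.streaming.kafka import KafkaUtils

    kstream = KafkaUtils.createDirectStream(
        ssc, [topic], build_kafka_params(brokers))
    # Each record is a (key, value) pair; the value holds the text.
    return kstream.map(lambda record: record[1])
```

In the notebook, the kstream and lines assignments could then become lines = create_text_stream(ssc, 'imagetext', 'localhost:9092'), followed by lines.pprint() as before.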

Upvotes: 3

Views: 2158

Answers (2)

Hosein47

Reputation: 1

Just change the port in your consumer from 9092 to 2181, since 2181 is ZooKeeper. On the producer side, the client has to connect to Kafka on port 9092; on the streaming side, it has to connect to ZooKeeper on port 2181.
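For reference, a ZooKeeper address such as localhost:2181 is what the receiver-based KafkaUtils.createStream takes, whereas createDirectStream (used in the question) expects brokers in metadata.broker.list. A minimal sketch, assuming the same Spark 2.x setup with the 0-8 assembly jar; the function name create_receiver_stream() and the group id used below are hypothetical.

```python
def create_receiver_stream(ssc, zk_quorum, group_id, topic):
    """Receiver-based Kafka stream: connects through ZooKeeper, not the broker."""
    # pyspark.streaming.kafka only exists in Spark 2.x and earlier.
    from pyspark.streaming.kafka import KafkaUtils

    # The topics dict maps each topic name to its number of receiver threads.
    kstream = KafkaUtils.createStream(ssc, zk_quorum, group_id, {topic: 1})
    # Records are (key, value) pairs; keep the message text.
    return kstream.map(lambda record: record[1])
```

It would be called as create_receiver_stream(ssc, 'localhost:2181', 'spark-imagetext', 'imagetext').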

Upvotes: 0

mayank agrawal

Reputation: 2545

I found another solution. Putting get_text() in a loop works, but it is not the right fix. Your data was not being sent to Kafka continuously, so Spark Streaming should not be expected to receive it that way.

The kafka-python library lets you call get(timeout) on the future returned by send(), so the producer waits until Kafka has acknowledged the message:

producer.send(topic,data).get(timeout=10)

Since you are using pykafka, I am not sure whether this will work for you. Nevertheless, you can still try it once, without putting get_text() in a loop.
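A minimal sketch of this approach with kafka-python instead of pykafka, assuming kafka-python is installed and a broker listens on localhost:9092. send() returns a future; get(timeout=10) blocks until the broker acknowledges the record and raises an error if that does not happen within 10 seconds. The function names make_producer() and send_text() are hypothetical.

```python
def make_producer(bootstrap_servers="localhost:9092"):
    """Create a kafka-python producer that UTF-8 encodes string values."""
    from kafka import KafkaProducer  # kafka-python, not pykafka

    return KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda value: value.encode("utf-8"),
    )


def send_text(producer, topic, text, timeout=10):
    """Send one message and block until the broker acknowledges it.

    send() returns a future; get(timeout) waits for the record metadata
    and raises an error if the send does not complete within `timeout` seconds.
    """
    future = producer.send(topic, text)
    return future.get(timeout=timeout)
```

Usage would look like producer = make_producer(), then send_text(producer, 'imagetext', text), then producer.close(). Passing the producer in keeps send_text() independent of how the client is configured.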

Upvotes: 0
