Kulasangar

Reputation: 9434

How to process the dataframe which was read from Kafka Topic using Spark Streaming

I'm able to stream Twitter data into my Kafka topic via a producer. When I consume it through the default Kafka console consumer, I'm able to see the tweets as well.


But when I try to use Spark Streaming to consume this and process it further, I'm unable to find resources to refer to. This is what my consumer looks like:

from pyspark.sql import SparkSession
import time

spark = SparkSession.builder.appName('LinkitTest').getOrCreate()

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "tweets") \
  .option("startingOffsets", "earliest") \
  .load()

#df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

print(df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))

query = df.writeStream.format("console").start()
time.sleep(10) # sleep 10 seconds
query.stop()

Even when I do spark-submit, I can see the tweets in the topic, but the values aren't readable:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 kafka_consumer.py


I'm unable to figure out how to at least print the column values (the tweets, in this case) from the dataframe I have. Any help would be appreciated.

UPDATE

I was able to print the values to the console, but as the output shows, they aren't readable. How can I convert them to a readable string?

from pyspark.sql.functions import col

query = df.select(col("value"))\
  .writeStream\
  .format("console")\
  .start()


Upvotes: 0

Views: 1554

Answers (1)

OneCricketeer

Reputation: 191681

Instead of

print(df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))

You want

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show()

But that works for batch dataframes, not streaming ones. For streaming dataframes, you need to cast before writing:

from pyspark.sql.functions import col

df.select(col("value").cast("string"))\
  .writeStream\
  .format("console")\
  .start()

Twitter data into Kafka via a producer

You don't need Spark for this. You can use tweepy and kafka-python directly.
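
For example, a rough sketch using tweepy's StreamingClient (Twitter API v2) together with kafka-python; the bearer token and the filter rule are placeholders you would replace with your own:

from kafka import KafkaProducer
import tweepy

producer = KafkaProducer(bootstrap_servers="localhost:9092")

class TweetProducer(tweepy.StreamingClient):
    def on_tweet(self, tweet):
        # Forward each tweet's text to the Kafka topic as UTF-8 bytes
        producer.send("tweets", tweet.text.encode("utf-8"))

stream = TweetProducer("YOUR_BEARER_TOKEN")    # placeholder credential
stream.add_rules(tweepy.StreamRule("python"))  # placeholder filter rule
stream.filter()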

Upvotes: 1
