Reputation: 9434
I'm able to stream Twitter data into my Kafka topic via a producer. When I consume the topic with the default Kafka console consumer, I can see the tweets as well.
But when I try to use Spark Streaming to consume and process the data further, I can't find any resources to refer to. This is what my consumer looks like:
from pyspark.sql import SparkSession
import time
spark = SparkSession.builder.appName('LinkitTest').getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "tweets") \
.option("startingOffsets", "earliest") \
.load()
#df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print(df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))
query = df.writeStream.format("console").start()
import time
time.sleep(10) # sleep 10 seconds
query.stop()
Even when I run it with spark-submit, I see the tweets in the topic, but the values aren't readable:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 kafka_consumer.py
I'm unable to figure out how to at least print the column values (the tweets, in this case) from the dataframe I have. Any help would be appreciated.
UPDATE
I was able to print the values on the console, but as you can see, it's not readable. How can I convert this to a readable string?
query = df.select(col("value"))\
.writeStream\
.format("console")\
.start()
Upvotes: 0
Views: 1554
Reputation: 191681
Instead of
print(df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))
You want
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show()
But that only works for batch dataframes, not streaming ones. For a streaming dataframe, you need to cast the column before writing:
from pyspark.sql.functions import col

df.select(col("value").cast("string")) \
    .writeStream \
    .format("console") \
    .start()
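To illustrate why the cast matters: Kafka hands Spark the message value as raw bytes, and a binary column is printed as hex bytes rather than text. The plain-Python sketch below mimics that with a made-up tweet payload (the `raw_value` contents and both helper names are assumptions for illustration, not part of Spark). Decoding the bytes as UTF-8 is roughly what casting the value to a string does for you.

```python
import json

# Hypothetical tweet payload, serialized the way a producer might send it to Kafka.
raw_value = json.dumps({"user": "alice", "text": "hello kafka"}).encode("utf-8")


def as_console_hex(value: bytes) -> str:
    """Render bytes as space-separated hex, similar to how a binary column is displayed."""
    return "[" + " ".join(f"{b:02X}" for b in value) + "]"


def as_string(value: bytes) -> str:
    """Decode the bytes as UTF-8 text (the plain-Python analogue of the string cast)."""
    return value.decode("utf-8")


unreadable = as_console_hex(raw_value)  # hex dump, hard to read
readable = as_string(raw_value)         # the original JSON text
tweet = json.loads(readable)            # parse further once it is a string

print(unreadable)
print(readable)
print(tweet["text"])
```

Once the value is a string, it can be parsed further (for example as JSON) in the same way.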
As for getting Twitter data into Kafka via a producer: you don't need Spark for this. You can use tweepy and kafka-python directly.
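A minimal sketch of the kafka-python side, assuming tweets arrive as Python dicts from whatever tweepy stream you set up (the tweepy wiring is omitted because it depends on your tweepy version and credentials). The helpers `serialize_tweet`, `publish`, and `make_producer` are hypothetical names for illustration; the `tweets` topic and `localhost:9092` come from the question.

```python
import json
from typing import Any, Iterable


def serialize_tweet(tweet: dict) -> bytes:
    """Encode a tweet dict as UTF-8 JSON bytes, which is what Kafka stores as the value."""
    return json.dumps(tweet, ensure_ascii=False).encode("utf-8")


def publish(tweets: Iterable[dict], producer: Any, topic: str = "tweets") -> int:
    """Send each tweet to the topic and return how many were sent.

    `producer` is expected to behave like kafka-python's KafkaProducer,
    i.e. to provide send(topic, value=...) and flush().
    """
    sent = 0
    for tweet in tweets:
        producer.send(topic, value=serialize_tweet(tweet))
        sent += 1
    producer.flush()  # block until buffered messages are delivered
    return sent


def make_producer(bootstrap_servers: str = "localhost:9092"):
    """Create a real producer; kafka-python is imported lazily so the helpers above work without it."""
    from kafka import KafkaProducer
    return KafkaProducer(bootstrap_servers=bootstrap_servers)
```

In use, something like `publish(tweet_dicts, make_producer())` would push the tweets into the topic, where the Spark job above can read them and cast the value back to a string.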
Upvotes: 1