schoon

Reputation: 3334

How do I write to Kafka using pyspark?

I am trying to write to Kafka using PySpark.
I got stuck on stage zero:

[Stage 0:>                                                          (0 + 8) / 9]

Then I get a timeout error:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Code is:

import os
import sys

os.environ['PYSPARK_SUBMIT_ARGS'] = ('--packages '
    'org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 pyspark-shell')

from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import *

def main():
    spark = SparkSession.builder.master("local") \
        .appName("Spark CSV Reader") \
        .getOrCreate()

    dirpath = os.path.abspath(sys.argv[1])
    os.chdir(dirpath)

    mySchema = StructType([
        StructField("id", IntegerType()),
        StructField("name", StringType()),
        StructField("year", IntegerType()),
        StructField("rating", DoubleType()),
        StructField("duration", IntegerType())])

    # Read the CSV files in the directory as a stream
    streamingDataFrame = spark.readStream.schema(mySchema) \
        .csv('file://' + dirpath + "/")

    # Write each row to Kafka as a JSON-encoded value
    streamingDataFrame.selectExpr("CAST(id AS STRING) AS key",
                                  "to_json(struct(*)) AS value") \
        .writeStream.format("kafka") \
        .option("topic", "topicName") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("checkpointLocation", "./chkpt") \
        .start()

I am running HDP 2.6.

Upvotes: 0

Views: 3216

Answers (1)

OneCricketeer

Reputation: 191738

As I mentioned in the comments, Spark runs on multiple machines, and it is highly unlikely that all these machines will be Kafka brokers.

Use the external address(es) for the Kafka cluster:

.option("kafka.bootstrap.servers", "<kafka-broker-1>:9092,<kafka-broker-2>:9092")\  

Upvotes: 1
