Reputation: 3334
I am trying to write to Kafka using PySpark.
I got stuck on stage zero:
[Stage 0:> (0 + 8) / 9]
Then I get a timeout error:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
My code is:
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Pull in the Kafka sink package before the JVM starts
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 pyspark-shell'
)

def main():
    spark = SparkSession.builder \
        .master("local") \
        .appName("Spark CSV Reader") \
        .getOrCreate()
    dirpath = os.path.abspath(sys.argv[1])
    os.chdir(dirpath)

    mySchema = StructType([
        StructField("id", IntegerType()),
        StructField("name", StringType()),
        StructField("year", IntegerType()),
        StructField("rating", DoubleType()),
        StructField("duration", IntegerType())])

    # Stream CSV files from the input directory
    streamingDataFrame = spark.readStream \
        .schema(mySchema) \
        .csv('file://' + dirpath + "/")

    # The Kafka sink expects string (or binary) key and value columns
    query = streamingDataFrame \
        .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
        .writeStream.format("kafka") \
        .option("topic", "topicName") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("checkpointLocation", "./chkpt") \
        .start()
    query.awaitTermination()

if __name__ == "__main__":
    main()
I am running HDP 2.6.
Upvotes: 0
Views: 3216
Reputation: 191738
As I mentioned in the comments, Spark executors run on multiple machines, and it is highly unlikely that all of those machines are also Kafka brokers, so localhost:9092 will not resolve to a broker from every executor.
Use the external address(es) of the Kafka brokers instead:
.option("kafka.bootstrap.servers", "<kafka-broker-1>:9092,<kafka-broker-2>:9092")\
Upvotes: 1