Nachiket Kate

Reputation: 8571

Write DataFrame to Kafka in PySpark

I have a Spark DataFrame which I would like to write to Kafka. I have tried the snippet below:

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers = util.get_broker_metadata())
df = sqlContext.createDataFrame([("foo", 1), ("bar", 2), ("baz", 3)], ('k', 'v'))
for row in df.rdd.collect():
    producer.send('topic',str(row.asDict()))
    producer.flush()

This works, but the problem with this snippet is that it is not scalable: every time collect runs, the data is aggregated on the driver node, which can slow down all operations.

Since the foreach operation on a DataFrame can run in parallel on the worker nodes, I tried the approach below.

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers = util.get_broker_metadata())
df = sqlContext.createDataFrame([("foo", 1), ("bar", 2), ("baz", 3)], ('k', 'v'))
def custom_fun(row):
    producer.send('topic',str(row.asDict()))
    producer.flush()

df.foreach(custom_fun)

This doesn't work and gives a pickling error: PicklingError: Cannot pickle objects of type <type 'itertools.count'>. I am not able to understand the reason behind this error. Can anyone help me understand this error, or provide any other parallel solution?

Upvotes: 2

Views: 8911

Answers (1)

Alper t. Turker

Reputation: 35229

The error you get looks unrelated to the Kafka writes themselves. It looks like somewhere else in your code you use itertools.count (AFAIK it is not used in Spark's source at all, though it may of course come with KafkaProducer), which for some reason gets serialized with the cloudpickle module. Changing the Kafka writing code might have no impact at all. If KafkaProducer is the source of the error, you should be able to resolve this with foreachPartition, which creates the producer on the executors so it never has to be pickled on the driver:

from kafka import KafkaProducer


def send_to_kafka(rows):
    # Create the producer inside the function so it is instantiated on the
    # executor and never has to be serialized from the driver.
    producer = KafkaProducer(bootstrap_servers = util.get_broker_metadata())
    for row in rows:
        producer.send('topic', str(row.asDict()))
    # Flush once per partition rather than once per message.
    producer.flush()

df.foreachPartition(send_to_kafka)
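If you want real JSON on the wire instead of the Python dict repr, a minimal variant of the same idea (just a sketch: send_json_to_kafka is a hypothetical name, and it assumes kafka-python's value_serializer option and the same util helper as above):

import json

from kafka import KafkaProducer


def send_json_to_kafka(rows):
    # value_serializer turns each dict into UTF-8 encoded JSON bytes.
    producer = KafkaProducer(
        bootstrap_servers=util.get_broker_metadata(),
        value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    for row in rows:
        producer.send('topic', row.asDict())
    producer.flush()

df.foreachPartition(send_json_to_kafka)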

That being said:

or provide any other parallel solution?

I would recommend using the Kafka data source instead. Include the Kafka SQL package, for example:

spark.jars.packages  org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
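The same coordinate can also be set from code when the session is created (a sketch; it assumes no SparkSession has been started yet, since the package has to be resolved before the JVM launches, and the coordinate assumes Spark 2.2 built against Scala 2.11, matching the line above):

from pyspark.sql import SparkSession

# Pull in the Kafka SQL package at session creation time.
spark = (SparkSession.builder
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0")
    .getOrCreate())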

And:

from pyspark.sql.functions import to_json, col, struct

# The Kafka sink expects a column named "value" (and optionally "key"),
# so serialize each row to JSON and alias it accordingly.
(df
    .select(to_json(struct([col(c).alias(c) for c in df.columns])).alias("value"))
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("topic", topic)
    .save())
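To sanity-check the result you can read the topic back with the same package (a sketch; spark, bootstrap_servers and topic are the same placeholders as above, and batch reads from Kafka are available from Spark 2.2 onwards):

raw = (spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .load())

# Kafka values come back as binary; cast to string to inspect the JSON payloads.
raw.selectExpr("CAST(value AS STRING)").show(truncate=False)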

Upvotes: 3
