sid802

Reputation: 365

PySpark - running processes

In my Spark Streaming (Spark 2.1.0) application, I need to build a graph from a file, and initializing that graph takes between 5 and 10 seconds.

So I tried to initialize it once per executor so it would be built only once.

After running the application, I noticed that it's initialized much more than once per executor, every time with a different process id (every process has its own logger).

Doesn't every executor have its own JVM and its own process? Or is that only relevant when I develop in JVM languages like Scala/Java? Do executors in PySpark spawn new processes for new tasks?

And if they do, how can I make sure that my graph object will really be initialized only once?

The way I initialize my object:

from kafka import KafkaProducer  # kafka-python client

class MySingletons(object):
    kafka_producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    tagger = _init_tagger()  # This returns an object with a graph inside of it

    @classmethod
    def handle_batch(cls, records_batch):
        analyzed = cls.tagger.tag(records_batch)
        return analyzed

And then I call it inside the driver:

def handle_partition(records: Sequence):
    records_lst = list(records)
    if len(records_lst) > 0:
        MySingletons.handle_batch(records_lst)


def handle_rdd(rdd: RDD):
    rdd_values = rdd.map(lambda x: x[1])
    rdd_values.foreachPartition(handle_partition)


ssc.union(*streams).filter(lambda x: x[1] is not None).foreachRDD(handle_rdd)
ssc.start()
ssc.awaitTermination()

Thanks :)

Upvotes: 2

Views: 1273

Answers (1)

zero323

Reputation: 330063

In short, there is a significant difference between JVM languages and Python:

  • On the JVM, executors use separate threads for tasks.
  • In Python, executors use separate processes for tasks.

The lifetime of a Python object is limited to its particular child process, and each process will have its own "singletons".
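You can see this with plain Python, outside Spark. This is a minimal sketch (the dict standing in for your graph is hypothetical): module-level state lives per process, so every PySpark worker process gets its own copy of the "singleton".

```python
import os

# Module-level cache: one copy of this exists per *process*.
# Each PySpark worker process would therefore build its own tagger.
_tagger = None

def get_tagger():
    """Build the expensive object lazily, at most once per process."""
    global _tagger
    if _tagger is None:
        # stand-in for the expensive graph construction
        _tagger = {"pid": os.getpid(), "graph": "expensive-to-build"}
    return _tagger
```

Within a single process, repeated calls return the same object; a different worker process would see a different `pid` in its copy.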

Furthermore, Python workers can be killed if spark.python.worker.reuse is false, or if they are idle for an extended period of time.

Finally, dynamic allocation can complicate things even further.

Depending on the details of the logic, there are quite a few possible workarounds, but they are typically quite involved and not universal.

Upvotes: 2
