Dipas


PySpark addFile option: what happens on the worker in the executor?

It is well known that broadcast variables are the better way to distribute small lookup data.

Suppose we run PySpark code (via spark-submit) from the master node in YARN client mode, so the application driver is always created on the master node. We read a file from a local path on the master node:

with open('/tmp/myfile.txt', 'r') as f:
    lookup = {}
    for line in f:
        line = parse(line) # parse() is our own helper: it uses re and returns a dict
        lookup[line['name']] = line['age']
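The question does not show parse(); it only says it uses re and returns a dict. A minimal sketch, assuming (hypothetically) that each line of /tmp/myfile.txt looks like "name=Alice age=30". The line format and the load_lookup helper are illustrative assumptions, not the original code.

```python
import re

# Hypothetical line format "name=<word> age=<int>"; the real format is not given.
LINE_RE = re.compile(r"name=(?P<name>\S+)\s+age=(?P<age>\d+)")


def parse(line):
    """Return {'name': ..., 'age': ...} for a matching line, else None."""
    m = LINE_RE.search(line)
    if m is None:
        return None
    return {"name": m.group("name"), "age": int(m.group("age"))}


def load_lookup(path):
    """Build the name -> age dict that is later broadcast or captured."""
    lookup = {}
    with open(path) as f:
        for raw in f:
            rec = parse(raw)
            if rec is not None:  # skip blank or malformed lines
                lookup[rec["name"]] = rec["age"]
    return lookup
```

Returning None for non-matching lines keeps a stray blank line from raising a TypeError in the loop.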

Then we create a broadcast variable and use it:

lookupBC = sc.broadcast(lookup)

output = sc.textFile('/path/to/hdfs/')\
    .map(lambda e: (lookupBC.value.get(e, e), 1))\
    .collect()
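A slightly fuller sketch of the broadcast approach, with the per-record logic pulled into a plain function and the broadcast released explicitly once the job is done. tag_record and count_with_broadcast are hypothetical names, and sc is assumed to be an existing SparkContext; Broadcast.destroy() is part of the PySpark API since 2.0.

```python
def tag_record(record, lookup):
    # Replace the record with its looked-up value when present, else keep it.
    return (lookup.get(record, record), 1)


def count_with_broadcast(sc, lookup, hdfs_path):
    # The dict is shipped to each executor once; tasks read it via .value.
    lookup_bc = sc.broadcast(lookup)
    try:
        return (sc.textFile(hdfs_path)
                  .map(lambda e: tag_record(e, lookup_bc.value))
                  .collect())
    finally:
        # Release the executor and driver copies; lookup_bc is unusable afterwards.
        lookup_bc.destroy()
```

Keeping tag_record free of Spark makes the per-record logic testable locally, e.g. tag_record("Alice", {"Alice": 30}) gives (30, 1).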

In this case the broadcast variable is created on the driver (the master node), and Spark copies it to every node in the cluster that runs an executor, where it is kept in memory. Thus the file is read only once and then distributed to the executors.

What happens if we use the addFile option instead?

from pyspark import SparkFiles

sc.addFile('/tmp/myfile.txt')

# SparkFiles.get() takes the file name passed to addFile, not the original path
with open(SparkFiles.get('myfile.txt')) as f:
    lookup = {}
    for line in f:
        line = parse(line) # parse() is our own helper: it uses re and returns a dict
        lookup[line['name']] = line['age']

output = sc.textFile('/path/to/hdfs/')\
    .map(lambda e: (lookup.get(e, e), 1))\
    .collect()
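Note that in the addFile snippet the with open(...) block runs on the driver, so the executors never open their own copy of the file; lookup reaches them inside the serialized lambda. To have each executor read its local copy, the read has to happen inside a task. A minimal sketch using mapPartitions, so the file is opened once per partition (that is, once per task) rather than once per record. load_local_lookup and tag_partition are hypothetical names, and the "name,age" line format is an assumption standing in for parse().

```python
def load_local_lookup(path):
    # Assumed format "name,age" per line (stand-in for the question's parse()).
    lookup = {}
    with open(path) as f:
        for line in f:
            parts = line.strip().split(",")
            if len(parts) == 2:
                lookup[parts[0]] = parts[1]
    return lookup


def tag_partition(records, path=None):
    # Runs on the executor: resolve the local copy shipped by sc.addFile
    # and read it once for the whole partition.
    if path is None:
        from pyspark import SparkFiles
        path = SparkFiles.get("myfile.txt")
    lookup = load_local_lookup(path)
    for e in records:
        yield (lookup.get(e, e), 1)

# Usage on the cluster (sc is an existing SparkContext):
#   sc.addFile('/tmp/myfile.txt')
#   output = sc.textFile('/path/to/hdfs/').mapPartitions(tag_partition).collect()
```

The optional path parameter only exists so the function can be exercised locally without a SparkContext.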

Spark will copy the file '/tmp/myfile.txt' to each node where an executor is created. Then:

  1. How many times will the file be read? Once per executor on a particular node, or once per task?
  2. What are the steps; how will the code be processed on the executor?
  3. Which is better to use: addFile or a broadcast variable?
  4. Will Spark do any optimizations based on the PySpark code and create implicit broadcast variables?

In the executor logs I see entries about broadcast variables, even though I do not use any in my code:

18/03/21 15:36:27 INFO util.Utils: Fetching spark://172.25.235.201:36478/files/myfile.txt to /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/spark-f3d19076-0642-4db8-961d-99daae0dfaff/fetchFileTemp230224632617642846.tmp
18/03/21 15:36:27 INFO util.Utils: Copying /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/spark-f3d19076-0642-4db8-961d-99daae0dfaff/-17884647971521635771454_cache to /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/container_1520754626920_6227_01_000002/./myfile.txt
18/03/21 15:36:28 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
18/03/21 15:36:28 INFO client.TransportClientFactory: Successfully created connection to strt01we.ebb.er.com/172.25.235.216:43791 after 4 ms (0 ms spent in bootstraps)
18/03/21 15:36:28 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 6.3 KB, free 366.3 MB)
18/03/21 15:36:28 INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 551 ms


Answers (1)

Kushagra Verma


Broadcast variables appear to be kept in memory until they are destroyed explicitly. In contrast, sc.addFile seems to create a copy on disk (for each executor). SparkFiles.get() itself only returns the local path of that copy, so I would guess the file is loaded into memory every time your code opens and reads it.

  • So, in your example above, the file would be read only once.
  • But if you instead called SparkFiles.get() and opened the file inside .map(), it would be re-read for every record in the RDD.
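The difference between the two bullets can be made concrete without a cluster. This is a local simulation, not Spark code: partitions are modeled as plain lists, and a hypothetical OpenCounter class counts how often the lookup file would be read under each pattern.

```python
class OpenCounter:
    """Counts calls to a stand-in for 'open and parse the lookup file'."""

    def __init__(self, lookup):
        self.lookup = lookup
        self.opens = 0

    def load(self):
        self.opens += 1  # one simulated file read
        return dict(self.lookup)


def per_record(partitions, src):
    # Mirrors opening the file inside .map(): one read per record.
    return [(src.load().get(e, e), 1) for part in partitions for e in part]


def per_partition(partitions, src):
    # Mirrors mapPartitions: one read per partition (i.e. per task).
    out = []
    for part in partitions:
        lookup = src.load()
        out.extend((lookup.get(e, e), 1) for e in part)
    return out


partitions = [["Alice", "Bob", "x"], ["Bob", "y"]]
a = OpenCounter({"Alice": 30, "Bob": 41})
b = OpenCounter({"Alice": 30, "Bob": 41})
assert per_record(partitions, a) == per_partition(partitions, b)
print(a.opens, b.opens)  # 5 reads vs 2 reads
```

Both patterns produce identical output; only the number of reads differs, and it scales with records in one case and with partitions in the other.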

Finally, to answer your questions:

How many times will the file be read? Once per executor on a particular node, or once per task?

It depends on where .get() is called (and the file is read), as explained above.

What are the steps; how will the code be processed on the executor?

I didn't understand this part.

Which is better to use: addFile or a broadcast variable?

These serve different use cases. For example, consider a 1 GB SQLite DB dump. Spark can connect to it via JDBC, so it doesn't need to load the whole object into memory.
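A minimal sketch of that use case in Python, assuming a hypothetical SQLite file lookup.db with a table people(name TEXT, age INTEGER). Python's standard sqlite3 module is used here in place of JDBC; each partition opens the executor-local copy shipped by sc.addFile and queries it row by row, so the database is never loaded into memory as a whole. query_partition is a hypothetical name.

```python
import sqlite3


def query_partition(names, db_path=None):
    # Runs once per partition on an executor.
    if db_path is None:
        from pyspark import SparkFiles
        db_path = SparkFiles.get("lookup.db")  # local copy from sc.addFile
    conn = sqlite3.connect(db_path)
    try:
        for name in names:
            row = conn.execute(
                "SELECT age FROM people WHERE name = ?", (name,)
            ).fetchone()
            yield (name, row[0] if row else None)
    finally:
        conn.close()

# Usage (sc is an existing SparkContext):
#   sc.addFile('/tmp/lookup.db')
#   result = sc.textFile('/path/to/hdfs/').mapPartitions(query_partition).collect()
```

Opening one connection per partition rather than per record keeps the connection cost proportional to the number of tasks.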

Will Spark do any optimizations based on the PySpark code and create implicit broadcast variables?

Not sure, but I don't think so.
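A supplementary note that may explain the log lines in the question: Spark's DAG scheduler serializes each stage's tasks (the RDD together with the function, including objects it captures, such as lookup) and ships that task binary to executors as a broadcast, so TorrentBroadcast entries can appear even when the code never calls sc.broadcast. PySpark additionally wraps a pickled function in a broadcast once it exceeds a size threshold (1 MB in older releases; the exact threshold depends on the version). A rough, local way to gauge how much a captured dict adds is to measure its pickled size. This sketch uses the standard pickle module as an approximation of PySpark's cloudpickle-based serialization, and payload_size is a hypothetical helper.

```python
import pickle


def payload_size(obj):
    # Approximate bytes an object adds to a serialized closure that captures it
    # (plain pickle here; PySpark actually uses cloudpickle).
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


small = {"Alice": 30, "Bob": 41}
large = {f"name{i}": i for i in range(100_000)}

# Compare against the 1 MB threshold used by older PySpark releases.
print(payload_size(small), payload_size(large), payload_size(large) > (1 << 20))
```

If the captured data is large, creating the broadcast explicitly makes its lifetime visible and lets you destroy it when done.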
