Reputation: 304
It is well known that broadcast variables are better suited for distributing small lookup data.
Suppose we run PySpark code (via spark-submit) from the master node in YARN client mode, so the application driver will always be created on the master node. We read a file from a local path on the master node.
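For concreteness, a submit command matching that setup might look like the following (my_app.py is a placeholder script name):
spark-submit --master yarn --deploy-mode client my_app.py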
with open('/tmp/myfile.txt', 'r') as f:
    lookup = {}
    for line in f:
        record = parse(line)  # parse() uses re and returns a dict
        lookup[record['name']] = record['age']
Then we create a broadcast variable and use it:
lookupBC = sc.broadcast(lookup)
output = sc.textFile('/path/to/hdfs/')\
    .map(lambda e: (lookupBC.value.get(e, e), 1))\
    .collect()
In this case the broadcast variable is created on the driver (the master node), and Spark copies it to every node in the cluster on which an executor runs, keeping it in memory there. The file is therefore read once, on the driver, and its contents are then distributed to the executors.
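As an aside, once the lookup is no longer needed, the broadcast can be released explicitly through PySpark's Broadcast API:
lookupBC.unpersist()  # drop the cached copies on the executors (re-sent lazily if used again)
lookupBC.destroy()    # release all data and metadata; the broadcast cannot be used afterwards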
What will happen if we use the addFile option instead?
from pyspark import SparkFiles

sc.addFile('/tmp/myfile.txt')
with open(SparkFiles.get('myfile.txt')) as f:  # get() takes the file name, not the full path
    lookup = {}
    for line in f:
        record = parse(line)  # parse() uses re and returns a dict
        lookup[record['name']] = record['age']
output = sc.textFile('/path/to/hdfs/')\
    .map(lambda e: (lookup.get(e, e), 1))\
    .collect()
Spark will copy the file '/tmp/myfile.txt' to each node on which an executor is created.
In the executor logs I see info about broadcast variables, even though I do not use any in my code:
18/03/21 15:36:27 INFO util.Utils: Fetching spark://172.25.235.201:36478/files/myfile.txt to /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/spark-f3d19076-0642-4db8-961d-99daae0dfaff/fetchFileTemp230224632617642846.tmp
18/03/21 15:36:27 INFO util.Utils: Copying /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/spark-f3d19076-0642-4db8-961d-99daae0dfaff/-17884647971521635771454_cache to /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/container_1520754626920_6227_01_000002/./myfile.txt
18/03/21 15:36:28 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
18/03/21 15:36:28 INFO client.TransportClientFactory: Successfully created connection to strt01we.ebb.er.com/172.25.235.216:43791 after 4 ms (0 ms spent in bootstraps)
18/03/21 15:36:28 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 6.3 KB, free 366.3 MB)
18/03/21 15:36:28 INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 551 ms
Upvotes: 1
Views: 3339
Reputation: 479
Broadcast variables appear to be kept in memory until they are destroyed explicitly. In contrast, sc.addFile creates a copy on disk for each executor. So I would guess the file is loaded into memory every time it is opened via SparkFiles.get(). Had SparkFiles.get() (and the subsequent open/parse step) been called inside .map(), the file would have been re-read for every entry in the RDD. Finally, to answer your questions:
How many times will the file be read? Once per executor on a particular node, or once per task?
It depends on where .get is called, as explained above; a sketch of the once-per-task variant follows.
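To make that concrete, here is a sketch (reusing the hypothetical parse() helper and file from the question) that reads the file once per task by building the lookup inside mapPartitions, rather than once per record inside map:
from pyspark import SparkFiles

sc.addFile('/tmp/myfile.txt')

def lookup_partition(records):
    # The file is opened and parsed once per partition (i.e. once per task),
    # not once per record.
    lookup = {}
    with open(SparkFiles.get('myfile.txt')) as f:
        for line in f:
            rec = parse(line)  # hypothetical parse() helper from the question
            lookup[rec['name']] = rec['age']
    for e in records:
        yield (lookup.get(e, e), 1)

output = sc.textFile('/path/to/hdfs/').mapPartitions(lookup_partition).collect()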
What will the steps be, i.e. how will the code be processed on the executor?
I didn't understand this part.
Which is better to use, addFile or a broadcast variable?
These are different use cases. For example, consider a case where we have a 1 GB SQLite DB dump. Spark can connect to such a DB file via JDBC; it doesn't really need to load the whole object into memory (see the sketch below).
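A sketch of that idea, using Python's built-in sqlite3 module rather than JDBC for simplicity; the file name, table, and column names here are made up for illustration:
import sqlite3
from pyspark import SparkFiles

sc.addFile('/tmp/people.db')  # hypothetical 1 GB SQLite dump

def enrich_partition(names):
    # One connection per task; rows are fetched on demand, so the
    # whole database is never pulled into executor memory at once.
    conn = sqlite3.connect(SparkFiles.get('people.db'))
    cur = conn.cursor()
    for name in names:
        cur.execute('SELECT age FROM people WHERE name = ?', (name,))
        row = cur.fetchone()
        yield (name, row[0] if row else None)
    conn.close()

output = sc.textFile('/path/to/hdfs/').mapPartitions(enrich_partition).collect()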
Will Spark do any optimizations based on the PySpark code and create implicit broadcast variables?
Not sure, but I don't think so.
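For what it's worth, the DataFrame API (unlike the RDD code above) does broadcast automatically in one well-known case: joins where one side is smaller than spark.sql.autoBroadcastJoinThreshold. A minimal sketch, assuming a SparkSession named spark and the same illustrative paths:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Tables smaller than this threshold may be broadcast automatically in joins.
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', 10 * 1024 * 1024)

small_df = spark.read.csv('/tmp/myfile.txt')   # small lookup side
big_df = spark.read.text('/path/to/hdfs/')     # large side
# Spark may plan this as a broadcast hash join without any explicit hint.
joined = big_df.join(small_df, big_df.value == small_df._c0)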
Upvotes: 1