Reputation: 13
I have written a Spark program (Python 3.6 and Spark 2.3.2) for a collaborative filtering recommendation system that supports two cases: user-based and item-based CF.
My train and predict programs handle both cases. The code works for user-based recommendation, but when I try to train the model for item-based CF, I get the following error:
2020-10-18 20:12:33 ERROR Executor:91 - Exception in task 0.0 in stage 23.0 (TID 196)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\spark\spark-2.3.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 238, in main
File "C:\spark\spark-2.3.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\serializers.py", line 690, in read_int
length = stream.read(4)
File "C:\Users\17372\AppData\Local\Programs\Python\Python36\lib\socket.py", line 586, in readinto
return self._sock.recv_into(b)
socket.timeout: timed out
I tried the solutions from this question: Pyspark socket timeout exception after application running for a while. They did not work.
I also found a suggestion to add "--spark.worker.timeout=120" to the spark-submit command, like this:
bin\spark-submit task3train.py train_review.json task3item.model item_based --spark.worker.timeout=120
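(As far as I understand, Spark properties are normally passed to spark-submit with --conf rather than as a bare flag, for example something like the line below, where spark.network.timeout is only my guess at the relevant property:)
bin\spark-submit --conf spark.network.timeout=600s task3train.py train_review.json task3item.model item_based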
I still see the same error. I also tried try/except blocks, but I am not sure how to do that correctly.
What do I do?
My code for Item-based CF:
if model_type == ITEM_BASED_MODEL:
    # group the original data by bidx and drop unpopular businesses
    # (rated fewer than CO_RATED_THRESHOLD times)
    # the map step produces tuples (bidx, (uidx, score)); after grouping:
    # [(5306, [(3662, 5.0), (3218, 5.0), (300, 5.0), ...]), ...]
    shrunk_bid_uids_rdd = input_lines \
        .map(lambda kv: (bus_index_dict[kv[1]], (user_index_dict[kv[0]], kv[2]))) \
        .groupByKey().mapValues(lambda uid_score: list(uid_score)) \
        .filter(lambda bid_uid_score: len(bid_uid_score[1]) >= CO_RATED_THRESHOLD) \
        .mapValues(lambda vals: [{uid_score[0]: uid_score[1]} for uid_score in vals]) \
        .mapValues(lambda val: flatMixedList(val))

    candidate_bids = shrunk_bid_uids_rdd.map(lambda bid_uids: bid_uids[0]).coalesce(2)

    # convert shrunk_bid_uids_rdd into dict form: dict(bidx: dict(uidx: score))
    # e.g. {5306: defaultdict(<class 'list'>, {3662: 5.0, 3218: 5.0, 300: 5.0, ...}), ...}
    bid_uid_dict = shrunk_bid_uids_rdd \
        .map(lambda bid_uid_score: {bid_uid_score[0]: bid_uid_score[1]}) \
        .flatMap(lambda kv_items: kv_items.items()).collectAsMap()

    # generate all possible pairs of candidate bidx
    # and compute the Pearson similarity for each pair
    candidate_pair = candidate_bids.cartesian(candidate_bids) \
        .filter(lambda id_pair: id_pair[0] < id_pair[1]) \
        .filter(lambda id_pair: existNRecords(bid_uid_dict[id_pair[0]],
                                              bid_uid_dict[id_pair[1]])) \
        .map(lambda id_pair: (id_pair,
                              computeSimilarity(bid_uid_dict[id_pair[0]],
                                                bid_uid_dict[id_pair[1]]))) \
        .filter(lambda kv: kv[1] > 0) \
        .map(lambda kv: {"b1": reversed_index_bus_dict[kv[0][0]],
                         "b2": reversed_index_bus_dict[kv[0][1]],
                         "sim": kv[1]})
Upvotes: 1
Views: 4644
Reputation: 21
Doing a repartition(1) worked for me. I was facing the same issue with very few rows (dummy data). Another alternative that worked was setting
.config('spark.default.parallelism', 1)
.config('spark.sql.shuffle.partitions', 1)
But the value 1 is only there because I was testing with dummy data.
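For reference, this is roughly where those settings go when building the session (the app name and master are just placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('test') \
    .master('local[*]') \
    .config('spark.default.parallelism', 1) \
    .config('spark.sql.shuffle.partitions', 1) \
    .getOrCreate()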
Upvotes: 1
Reputation: 151
I encountered the same error with Python 3.7 and Spark 2.4.4 running locally. No combination of Spark options helped.
I was reading rows from Parquet files which were heavily skewed. They contained a binary column with values ranging from a few bytes to more than 10 MB. The resulting dataframe contained a relatively small number of partitions despite setting a high number for spark.default.parallelism. The number of partitions remained similar to the number of Parquet files I was reading, and I kept getting a socket timeout.
I tried to set spark.sql.files.maxPartitionBytes to a small enough value, but the error was still there. The only thing that helped was a repartition after reading the data, to increase the number of partitions and to distribute the rows more evenly. Note that this is only an observation and I still cannot explain why the error went away.
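In my case that looked roughly like this (the path and partition count are placeholders; pick a count that suits your data size):

df = spark.read.parquet('path/to/skewed_parquet')
df = df.repartition(200)  # enough partitions to spread the large rows more evenly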
If data skew is also a factor here, it could be mitigated by changing your code to:
input_lines \
    .repartition(n) \
    .map(...)
Here n depends on your cluster and job characteristics, and there is a sweet spot. If n is too low you will get the socket timeout; if n is too large it will have a negative effect on performance.
Upvotes: 1