Reputation: 992
Is it possible to use the updateStateByKey()
function with a tuple as a value? I am using PySpark and my input is (word, (count, tweet_id)),
which means word
is the key and the tuple (count, tweet_id)
is the value. The task of updateStateByKey
is, for each word, to sum its counts and build a list of all tweet_ids of the tweets that contain the word.
I implemented the following update function; however, I get the error list index out of range for new_values
with index 1:
    def updateFunc(new_values, last_sum):
        count = 0
        tweets_id = []
        if last_sum:
            count = last_sum[0]
            tweets_id = last_sum[1]
        return sum(new_values[0]) + count, tweets_id.extend(new_values[1])
And calling the method:

    running_counts.updateStateByKey(updateFunc)
Upvotes: 0
Views: 2003
Reputation: 992
I've found the solution. The problem was with checkpointing, which persists the current state to disk in case of a failure. It caused problems because when I changed the definition of my state, the checkpoint on disk still held the old state, which was not a tuple. Therefore, I deleted the checkpoint from disk and implemented the final solution as:
    def updateFunc(new_values, last_sum):
        count = 0
        counts = [field[0] for field in new_values]
        ids = [field[1] for field in new_values]
        if last_sum:
            count = last_sum[0]
            new_ids = last_sum[1] + ids
        else:
            new_ids = ids
        return sum(counts) + count, new_ids
So the answer to my question is: yes, the state can be a tuple (or any other data type), which lets it store multiple values.
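To sanity-check the update function outside of Spark, you can simulate how updateStateByKey invokes it: once per key per batch, passing the list of new values and the previous state (or None on the first batch). This is a plain-Python sketch; the fold_batches helper below is hypothetical and not part of the Spark API.

```python
def updateFunc(new_values, last_sum):
    # new_values: list of (count, tweet_id) pairs seen for this key in the batch
    # last_sum:   previous state (total_count, [tweet_ids]), or None
    count = 0
    counts = [field[0] for field in new_values]
    ids = [field[1] for field in new_values]
    if last_sum:
        count = last_sum[0]
        new_ids = last_sum[1] + ids
    else:
        new_ids = ids
    return sum(counts) + count, new_ids

def fold_batches(batches):
    # Hypothetical helper: groups each batch by key and applies updateFunc,
    # mimicking what updateStateByKey does on a DStream across batches.
    state = {}
    for batch in batches:
        grouped = {}
        for word, value in batch:
            grouped.setdefault(word, []).append(value)
        for word, values in grouped.items():
            state[word] = updateFunc(values, state.get(word))
    return state

batches = [
    [("spark", (1, "t1")), ("spark", (2, "t2"))],
    [("spark", (1, "t3")), ("flink", (1, "t4"))],
]
print(fold_batches(batches))
# {'spark': (4, ['t1', 't2', 't3']), 'flink': (1, ['t4'])}
```

Running the original buggy version through the same harness reproduces the IndexError immediately, since new_values[0] is a (count, tweet_id) pair rather than a list of counts.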
Upvotes: 2