Jakub Macina

Reputation: 992

Spark Streaming updateStateByKey with tuple as a value

Is it possible to use the updateStateByKey() function with a tuple as a value? I am using PySpark and my input is (word, (count, tweet_id)), meaning word is the key and the tuple (count, tweet_id) is the value. The task of updateStateByKey is, for each word, to sum its counts and build a list of all tweet_ids of the tweets containing that word.

I implemented the following update function, but I get the error list index out of range for new_values at index 1:

def updateFunc(new_values, last_sum):
  count = 0
  tweets_id = []
  if last_sum:
    count = last_sum[0]
    tweets_id = last_sum[1]
  # the error is raised on this line; note also that list.extend() returns
  # None, so the second element of the returned tuple would always be None
  return sum(new_values[0]) + count, tweets_id.extend(new_values[1])

And calling the method:

running_counts.updateStateByKey(updateFunc)
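
For context on why index 1 fails: updateStateByKey calls the update function with a list of all values that arrived for the key in the current batch, not with a single (counts, ids) pair. A minimal sketch of the shapes involved, in plain Python with a hypothetical numeric tweet id:

# one occurrence of a word in the batch yields a one-element list
new_values = [(1, 840012345)]
new_values[0]   # -> (1, 840012345), the whole first (count, tweet_id) tuple
new_values[1]   # -> IndexError: list index out of range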

Upvotes: 0

Views: 2003

Answers (1)

Jakub Macina

Reputation: 992

I've found the solution. The problem was with checkpointing, which persists the current state to disk so it can be recovered after a failure. Because I had changed the definition of my state, the checkpoint still held the old state, which was not a tuple. I therefore deleted the checkpoint directory from disk and implemented the final solution as:

def updateFunc(new_values, last_sum):
  # new_values is the list of (count, tweet_id) tuples that arrived for this
  # word in the current batch; last_sum is the previous state, a tuple of
  # (total_count, list_of_tweet_ids), or None the first time the word is seen
  count = 0
  counts = [field[0] for field in new_values]
  ids = [field[1] for field in new_values]
  if last_sum:
    count = last_sum[0]
    new_ids = last_sum[1] + ids
  else:
    new_ids = ids
  return sum(counts) + count, new_ids
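
For completeness, a minimal sketch of how this state function could be wired into a streaming job; the socket source, host/port, and line format ("tweet_id word1 word2 ...") are assumptions for illustration, not part of the original question:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="WordTweetCounts")
ssc = StreamingContext(sc, 10)    # 10-second batches
ssc.checkpoint("checkpoint")      # required by updateStateByKey; delete this
                                  # directory after changing the state's shape

lines = ssc.socketTextStream("localhost", 9999)
# assumed line format: "tweet_id word1 word2 ..." -> emit (word, (1, tweet_id))
pairs = lines.flatMap(lambda line: [(word, (1, line.split()[0]))
                                    for word in line.split()[1:]])

running_counts = pairs.updateStateByKey(updateFunc)
running_counts.pprint()

ssc.start()
ssc.awaitTermination()

Note that updateStateByKey requires checkpointing to be enabled, which is exactly why the stale checkpoint described above broke the job once the shape of the state changed.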

Finally, the answer to my question is: yes, the state can be a tuple (or any other data type) that stores more than one value.

Upvotes: 2
