abhinavkulkarni

Reputation: 2399

Inconsistent results due to Spark's lazy evaluation

I have some simple PySpark code:

l = [
    {'userId': 'u1', 'itemId': 'a1', 'click': 1},
    {'userId': 'u1', 'itemId': 'a2', 'click': 0},
    {'userId': 'u2', 'itemId': 'b1', 'click': 1},
    {'userId': 'u2', 'itemId': 'b2', 'click': 1},
]

d = sc.parallelize(l)

Essentially, the first user clicked on one of the two items, while the second user clicked on both items.

Let's group the events by userId and process each group in a function.

def fun((user_id, events)):
    events = list(events)
    user_id = events[0]['userId']

    clicked = set()
    not_clicked = set()

    for event in events:
        item_id = event['itemId']
        if event['click']==1:
            clicked.add(item_id)
        else:
            not_clicked.add(item_id)

    ret = {'userId': user_id, 'click': 1}
    for item_id in clicked:
        ret['itemId'] = item_id
        yield ret

    ret['click'] = 0
    for item_id in not_clicked:
        ret['itemId'] = item_id
        yield ret

d1 = d\
    .map(lambda obj: (obj['userId'], obj))\
    .groupByKey()\
    .flatMap(fun)

d1.collect()

This is what I get:

[{'click': 1, 'itemId': 'a1', 'userId': 'u1'},
 {'click': 0, 'itemId': 'a2', 'userId': 'u1'},
 {'click': 1, 'itemId': 'b1', 'userId': 'u2'},
 {'click': 0, 'itemId': 'b2', 'userId': 'u2'}]

The result for user u2 is incorrect.

Can someone explain why this is happening and what is the best practice to prevent this?

Thanks.

Upvotes: 1

Views: 582

Answers (1)

zero323

Reputation: 330093

What you see has very little to do with Spark's evaluation model. Your code is simply faulty. It is pretty easy to see this when you execute it locally:

key = 'u2'

values = [
    {'click': 1, 'itemId': 'b1', 'userId': 'u2'},
    {'click': 1, 'itemId': 'b2', 'userId': 'u2'}
]

list(fun((key, values)))
[{'click': 0, 'itemId': 'b2', 'userId': 'u2'},
 {'click': 0, 'itemId': 'b2', 'userId': 'u2'}]

As you can see, this makes even less sense than what you get from Spark. The problem is that you use mutable data where it shouldn't be used. Since you modify the same dict in place, all yields return exactly the same object:

(d1, d2) = list(fun((key, values)))
d1 is d2
True
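
You can reproduce the effect with a few lines of plain Python, without Spark. This is just an illustrative sketch of a generator that yields the same mutated dict twice:

def gen():
    ret = {'n': 1}
    yield ret      # yields a reference to ret, not a copy
    ret['n'] = 2   # mutates the object that was already yielded
    yield ret

list(gen())
[{'n': 2}, {'n': 2}]

Both entries are the same object, so the mutation made after the first yield is visible in both.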

I believe the discrepancy compared to Spark is related to batched serialization: the first item is serialized in a different batch before the function exits, so the effective order is more or less like this:

import pickle
from itertools import islice, chain 

gen = fun((key, values))

# The first batch is serialized
b1 = [pickle.dumps(x) for x in list(islice(gen, 0, 1))]

# Window is adjusted and the second batch is serialized
# fun exits with StopIteration when we try to take the second
# element in the batch, so the code proceeds to ret['click'] = 0
b2 = [
    pickle.dumps(x) for x in
    # Use list to eagerly take a whole batch before pickling
    list(islice(gen, 0, 2))  
] 

[pickle.loads(x) for x in chain(*[b1, b2])]
[{'click': 1, 'itemId': 'b1', 'userId': 'u2'},
 {'click': 0, 'itemId': 'b2', 'userId': 'u2'}]

but if you want definitive confirmation you'll have to check it yourself (replace the batched serializer with one which waits for all data).
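
For example (just a sketch, assuming a PySpark version where SparkContext still accepts the batchSize argument), an unlimited batch size makes the serializer materialize the whole partition before pickling:

from pyspark import SparkContext

# Assumption: batchSize=-1 requests an unlimited batch, i.e. the whole
# partition is collected into a list and pickled in one go
sc = SparkContext("local[2]", "batch-check", batchSize=-1)

Re-running the pipeline with such a context should reproduce the local behaviour shown above, i.e. each user's records collapse to copies of the last yielded state.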

How to solve it? Just don't reuse the same dictionary. Instead, initialize a new one inside the loop:

for item_id in clicked:
    yield {'userId': user_id, 'click': 1, 'itemId': item_id}

for item_id in not_clicked:
    yield {'userId': user_id, 'click': 0, 'itemId': item_id}
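
For completeness, here is a sketch of the whole corrected function (written so that it also runs on Python 3, which no longer allows tuple parameters in the signature):

def fun(pair):
    user_id, events = pair

    clicked = set()
    not_clicked = set()

    for event in events:
        if event['click'] == 1:
            clicked.add(event['itemId'])
        else:
            not_clicked.add(event['itemId'])

    # A fresh dict per yield, so every record is an independent object
    for item_id in clicked:
        yield {'userId': user_id, 'click': 1, 'itemId': item_id}

    for item_id in not_clicked:
        yield {'userId': user_id, 'click': 0, 'itemId': item_id}

With this version, d1.collect() should report click = 1 for both of u2's items.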

Upvotes: 2
