datawookie


Joining a DStream and RDD with checkpointing

I've been battling to perform a join between a DStream and an RDD. To set the scene:

RDD

I'm reading the RDD in from a CSV file, splitting the records and producing a pair RDD.

sku_prices = sc.textFile("sku-catalog.csv")\
    .map(lambda line: line.split(","))\
    .map(lambda fields: (fields[0], float(fields[1])))
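The same parsing chain can be checked outside Spark with plain Python list comprehensions; the sample lines below are taken from the `collect()` output further down, and the two-column layout (sku, then price) is assumed from that output:

```python
# Simulate the two map() steps on sample CSV lines (values taken from the
# sku_prices.collect() output; the sku,price column order is assumed).
lines = ["0003003001,19.25", "0001017002,2.25"]

split = [line.split(",") for line in lines]                  # .map(line.split(","))
pairs = [(fields[0], float(fields[1])) for fields in split]  # .map(-> (sku, price))

print(pairs)  # [('0003003001', 19.25), ('0001017002', 2.25)]
```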

This is the output from sku_prices.collect():

[('0003003001', 19.25),
 ('0001017002', 2.25),
 ('0001017003', 3.5),
 ('0003013001', 18.75),
 ('0004017002', 16.5),
 ('0002008001', 2.25),
 ('0004002001', 10.75),
 ('0005020002', 10.5),
 ('0001004002', 3.5),
 ('0002016003', 14.25)]

DStream

I'm reading the DStream from Kafka.

orders = kstream.map(lambda n: n[1]).map(lambda n: json.loads(n))

items = orders.map(lambda order: order['items'])\
              .flatMap(lambda items: [(i['sku'], i['count']) for i in items])\
              .reduceByKey(lambda x, y: x + y)
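To illustrate what the `flatMap()` and `reduceByKey()` steps do within a batch, here is the same aggregation in plain Python. The message payloads are hypothetical; the `items` structure (a list of objects with `sku` and `count` keys) is assumed from the DStream code above:

```python
import json
from collections import defaultdict

# Hypothetical Kafka message values; the 'items' layout is assumed
# from the orders/items transformations above.
messages = [
    json.dumps({"items": [{"sku": "0004002001", "count": 2},
                          {"sku": "0002016003", "count": 1}]}),
    json.dumps({"items": [{"sku": "0004002001", "count": 1}]}),
]

orders = [json.loads(m) for m in messages]   # the orders DStream, per batch
counts = defaultdict(int)
for order in orders:                         # flatMap over items, then
    for i in order["items"]:                 # reduceByKey(lambda x, y: x + y)
        counts[i["sku"]] += i["count"]

print(sorted(counts.items()))
# [('0002016003', 1), ('0004002001', 3)]
```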

When I run pprint() on items I get output that looks like this:

-------------------------------------------
Time: 2018-09-03 06:57:20
-------------------------------------------
('0004002001', 3)
('0002016003', 1)
('0003013001', 1)

Join

Now I want to join the items DStream to the sku_prices RDD. I know that I can't make that join directly, but my reading suggests that I can use the transform() method on the DStream to do the job. So this is what I have:

items.transform(lambda rdd: rdd.join(sku_prices)).pprint()

I'm expecting to get a DStream that looks something like this:

-------------------------------------------
Time: 2018-09-03 06:57:20
-------------------------------------------
('0004002001', (3, 10.75))
('0002016003', (1, 14.25))
('0003013001', (1, 18.75))

The Spark documentation suggests that this should work and it does: that result is precisely what I get! :)

Checkpointing

However, I also want to perform a stateful operation, so I need to introduce checkpointing.

ssc.checkpoint("checkpoint")

Simply adding checkpointing results in this error on the transform():

It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation.

The answer on this thread suggests that checkpointing and external RDDs don't mix. Is there a way around this? Is it possible to join a DStream and an RDD when the StreamingContext has checkpointing enabled?
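One workaround I've seen suggested (unverified) is to sidestep the RDD-to-RDD join entirely: collect the small sku_prices RDD to the driver as a dict, ship it as a broadcast variable, and do a map-side lookup inside transform(). The plain-Python sketch below demonstrates that the lookup reproduces the join output shown earlier; the Spark equivalent is indicated in the comments:

```python
# Map-side join sketch: look each (sku, count) pair up in a local price
# dict instead of calling rdd.join(). In Spark this would look something
# like (untested with checkpointing enabled):
#   prices = sc.broadcast(dict(sku_prices.collect()))
#   items.transform(lambda rdd: rdd.map(
#       lambda kv: (kv[0], (kv[1], prices.value[kv[0]])))).pprint()
prices = {"0004002001": 10.75, "0002016003": 14.25, "0003013001": 18.75}
batch = [("0004002001", 3), ("0002016003", 1), ("0003013001", 1)]

joined = [(sku, (count, prices[sku])) for sku, count in batch]
print(joined)
# [('0004002001', (3, 10.75)), ('0002016003', (1, 14.25)), ('0003013001', (1, 18.75))]
```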

Thanks, Andrew.

Upvotes: 3

Views: 153

Answers (0)