dmcontador

Optimizing pivoting and filling

They gave me a table storing sensor readings with a schema [TimeStamp, SensorKey, SensorValue].

TimeStamp             Id           Value
2019-01-01 00:00:47   1            66.6
2019-01-01 00:00:47   2            0.66
2019-01-01 00:00:57   1            66.7
2019-01-01 00:00:57   2            0.68
2019-01-01 00:00:57   3            166.6
2019-01-01 00:01:07   3            146.6

Note that the table only stores changes to the sensor readings, with limited precision and sampling rate, and that a reading is repeated once per hour after the last change if the value stays the same.
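
The snippets below assume a DataFrame df0 with columns TS, Id and Value; a minimal loading sketch could look like this (the Parquet path and the renames are just placeholders, not part of the actual setup):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sensor-pivot").getOrCreate()

# Hypothetical source; the renames map the stored schema
# [TimeStamp, SensorKey, SensorValue] onto the column names used below.
df0 = (
    spark.read.parquet("/data/sensor_readings")
    .withColumnRenamed("TimeStamp", "TS")
    .withColumnRenamed("SensorKey", "Id")
    .withColumnRenamed("SensorValue", "Value")
)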

Their queries amount to checking the value of sensor A (and B, and C, and D...) whenever the value of sensor Z meets some condition. And they want to use Python and Spark.

So to compare the values of different sensors, I get the rows for those sensor keys and pivot the results to a schema [TimeStamp, ValueOfA, ..., ValueOfZ].

from pyspark.sql import functions as F
df1 = df0.groupBy("TS").pivot("Id", listOfIds).agg(F.last("Value"))

TimeStamp             Sensor1      Sensor2     Sensor3
2019-01-01 00:00:47   66.6         0.66        Null
2019-01-01 00:00:57   66.7         0.68        166.6
2019-01-01 00:01:07   Null         Null        146.6
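
For completeness, listOfIds isn't defined in the snippet above; one way to build it, together with renaming the pivoted columns to the SensorN names shown in the table (the rename is my assumption to match those headings), is:

# Collect the distinct sensor ids once, so pivot() does not have to
# compute them itself in an extra pass.
listOfIds = [row["Id"] for row in df0.select("Id").distinct().collect()]

# The pivoted columns are named after the ids ("1", "2", ...); rename
# them to Sensor1, Sensor2, ... to match the table above.
for sensor_id in listOfIds:
    df1 = df1.withColumnRenamed(str(sensor_id), f"Sensor{sensor_id}")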

Then I fill the gaps, always forward; if I don't have older data to fill the first rows, I discard them.

from pyspark.sql import Window

# 360 rows = 1 hour at the 0.1 Hz sampling rate (one reading every 10 s).
window1hour = Window.orderBy('TS').rowsBetween(-360, 0)

df2 = df1
for sid in sensorIds:
    # Forward-fill each sensor column with its last non-null value within
    # the past hour, then drop rows that are still null for that sensor.
    df2 = df2\
        .withColumn(sid, F.last(F.column(sid), ignorenulls=True).over(window1hour))\
        .filter(F.column(sid).isNotNull())
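
Applied to the sample above, this drops the first timestamp (Sensor3 has no earlier value to fill from) and fills the remaining gaps forward:

TimeStamp             Sensor1      Sensor2     Sensor3
2019-01-01 00:00:57   66.7         0.68        166.6
2019-01-01 00:01:07   66.7         0.68        146.6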

The comparisons, column by column, are trivial now.
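
For example, if the condition on "sensor Z" were a simple threshold on Sensor3 (the threshold and the chosen columns here are made up for illustration):

# Rows where Sensor3 exceeds some threshold, together with the
# concurrent values of the other sensors.
result = df2.filter(F.column("Sensor3") > 150.0) \
            .select("TS", "Sensor1", "Sensor2", "Sensor3")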

But compared to doing the same thing in pandas it is so much slower that it feels like I'm doing something wrong, at least for small queries.

What's happening? And what will happen when it's a large query?

About small and large: I have thousands of different sensors and about a billion records per year, so the data definitely fits on one server but not in RAM. In fact, they will start with only one server for the data, maybe a second one for a second Spark instance (both multiprocessor and with lots of memory), and hopefully they will invest in more hardware if they see returns. They will start by running the small, day-by-day queries, and they want those to be fast. But later they will want to run queries over several years, and those must not explode.

Ideas/doubts: Is the preprocessing done in a single thread? Should I establish the parallelization myself, or should I let Spark handle it? Should I break the year-spanning queries into many day-spanning ones (but then why would I want Spark at all)? Should I solve the small queries in pandas and the large ones in Spark (and can I set the threshold beforehand)?

What other improvements can I apply?

Answers (1)

Travis Hegner

It's not uncommon for "small" data to be faster in tools other than Spark. Spark has fairly significant overhead for its parallel functionality (granted, these overheads are very small compared with the old map-reduce paradigm).

Where Spark shines is its ability to scale linearly for "large" data by adding servers. It's at that point that the overhead becomes worth it, as Spark will automatically break the work up among all of the available executors.

I believe letting Spark handle the parallelization is ideal, if only for simplicity's sake. Whether or not to implement the "small" queries in another framework depends entirely on whether you want to maintain two code paths, and on whether your customer is comfortable with their speed.
