Reputation: 4920
In my PySpark code I have a DataFrame populated with data coming from a sensor; each row has a timestamp, an event_id and an event_value. Each sensor event is composed of measurements defined by an id and a value. The only guarantee I have is that all the "phases" related to a single event lie between two EV_SEP rows (the rows themselves are unsorted). Inside each event "block" there is an event label, which is the value associated with the EV_CODE row.
+-------------------------+------------+-------------+
| timestamp | event_id | event_value |
+-------------------------+------------+-------------+
| 2017-01-01 00:00:12.540 | EV_SEP | ----- |
+-------------------------+------------+-------------+
| 2017-01-01 00:00:14.201 | EV_2 | 10 |
+-------------------------+------------+-------------+
| 2017-01-01 00:00:13.331 | EV_1 | 11 |
+-------------------------+------------+-------------+
| 2017-01-01 00:00:15.203 | EV_CODE | ABC |
+-------------------------+------------+-------------+
| 2017-01-01 00:00:16.670 | EV_SEP | ----- |
+-------------------------+------------+-------------+
I would like to create a new column containing that label, so that every row in the block is associated with its label:
+-------------------------+----------+-------------+------------+
| timestamp | event_id | event_value | event_code |
+-------------------------+----------+-------------+------------+
| 2017-01-01 00:00:12.540 | EV_SEP | ----- | ABC |
+-------------------------+----------+-------------+------------+
| 2017-01-01 00:00:14.201 | EV_2 | 10 | ABC |
+-------------------------+----------+-------------+------------+
| 2017-01-01 00:00:13.331 | EV_1 | 11 | ABC |
+-------------------------+----------+-------------+------------+
| 2017-01-01 00:00:15.203 | EV_CODE | ABC | ABC |
+-------------------------+----------+-------------+------------+
| 2017-01-01 00:00:16.670 | EV_SEP | ----- | ABC |
+-------------------------+----------+-------------+------------+
With pandas I can easily get the indexes of the EV_SEP rows, split the table into blocks, take the EV_CODE from each block and create an event_code column with that value. A possible solution would be:
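Something along these lines, as a rough sketch of those pandas steps (assuming the frame is pulled to the driver with toPandas(), that each EV_SEP row opens a new block, and that every block holds exactly one EV_CODE row):

# df is the Spark DataFrame described above; bring it to pandas and sort by time
pdf = df.toPandas().sort_values('timestamp').reset_index(drop=True)

# every EV_SEP row opens a new block, so a cumulative sum over the
# EV_SEP indicator yields a per-block id
pdf['block'] = (pdf['event_id'] == 'EV_SEP').cumsum()

# take the (assumed unique) EV_CODE value of each block and spread it
# over all rows of that block
codes = pdf[pdf['event_id'] == 'EV_CODE'].set_index('block')['event_value']
pdf['event_code'] = pdf['block'].map(codes)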
Is there any better way to solve this problem?
Upvotes: 2
Views: 5163
Reputation: 4719
from pyspark.sql import functions as f
from pyspark.sql import Window
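The sample data below can be reproduced with something along these lines (a minimal sketch, assuming all three columns are plain strings and that EV_SEP rows carry null event values):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
    ('2017-01-01 00:00:12.540', 'EV_SEP', None),
    ('2017-01-01 00:00:14.201', 'EV_2', '10'),
    ('2017-01-01 00:00:13.331', 'EV_1', '11'),
    ('2017-01-01 00:00:15.203', 'EV_CODE', 'ABC'),
    ('2017-01-01 00:00:16.670', 'EV_SEP', None),
    ('2017-01-01 00:00:20.201', 'EV_2', '10'),
    ('2017-01-01 00:00:24.203', 'EV_CODE', 'DEF'),
    ('2017-01-01 00:00:31.670', 'EV_SEP', None),
]
df = spark.createDataFrame(data, ['timestamp', 'event_id', 'event_value'])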
Sample data:
df.show()
+-----------------------+--------+-----------+
|timestamp |event_id|event_value|
+-----------------------+--------+-----------+
|2017-01-01 00:00:12.540|EV_SEP |null |
|2017-01-01 00:00:14.201|EV_2 |10 |
|2017-01-01 00:00:13.331|EV_1 |11 |
|2017-01-01 00:00:15.203|EV_CODE |ABC |
|2017-01-01 00:00:16.670|EV_SEP |null |
|2017-01-01 00:00:20.201|EV_2 |10 |
|2017-01-01 00:00:24.203|EV_CODE |DEF |
|2017-01-01 00:00:31.670|EV_SEP |null |
+-----------------------+--------+-----------+
Add index:
# number each EV_SEP row in timestamp order
df_idx = df.filter(df['event_id'] == 'EV_SEP') \
    .withColumn('idx', f.row_number().over(Window.partitionBy().orderBy(df['timestamp'])))
# all other rows get a placeholder index of 0
df_block = df.filter(df['event_id'] != 'EV_SEP').withColumn('idx', f.lit(0))
'Spread' the index: a running maximum over timestamp order carries each EV_SEP row's index forward to the rows that follow it, so every row of a block ends up with the same idx:
df = df_idx.union(df_block).withColumn('idx', f.max('idx').over(
Window.partitionBy().orderBy('timestamp').rowsBetween(Window.unboundedPreceding, Window.currentRow))).cache()
Add EV_CODE:
# each block has exactly one EV_CODE row; join its value back on the block index
df_code = df.filter(df['event_id'] == 'EV_CODE').withColumnRenamed('event_value', 'event_code')
df = df.join(df_code, on=[df['idx'] == df_code['idx']]) \
    .select(df['timestamp'], df['event_id'], df['event_value'], df_code['event_code'])
Finally:
+-----------------------+--------+-----------+----------+
|timestamp |event_id|event_value|event_code|
+-----------------------+--------+-----------+----------+
|2017-01-01 00:00:12.540|EV_SEP |null |ABC |
|2017-01-01 00:00:13.331|EV_1 |11 |ABC |
|2017-01-01 00:00:14.201|EV_2 |10 |ABC |
|2017-01-01 00:00:15.203|EV_CODE |ABC |ABC |
|2017-01-01 00:00:16.670|EV_SEP |null |DEF |
|2017-01-01 00:00:20.201|EV_2 |10 |DEF |
|2017-01-01 00:00:24.203|EV_CODE |DEF |DEF |
+-----------------------+--------+-----------+----------+
Upvotes: 3
Reputation: 478
Creating a new Hadoop InputFormat would be a more computationally efficient way to accomplish your goal here (although it is arguably the same or more gymnastics in terms of code). You can specify alternative Hadoop input formats using sc.hadoopFile in the Python API, passing the format's class name, but you must take care of the conversion from the Java types to Python. The converters available in PySpark are relatively few, but this reference proposes using the Avro converter as an example. You might also simply find it convenient to let your custom Hadoop input format output text, which you then additionally parse in Python, to avoid the issue of implementing a converter.
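For illustration, a call from the Python side might look like the sketch below; com.example.EventBlockInputFormat and the HDFS path are made-up placeholders for whatever custom format and data location you actually use, and the key/value classes assume the format emits text like TextInputFormat does:

# hypothetical: the custom input format (built in Java/Scala and shipped on the
# Spark classpath) emits one whole EV_SEP-delimited block as a single Text value
rdd = sc.hadoopFile(
    'hdfs:///data/sensor/events.log',
    inputFormatClass='com.example.EventBlockInputFormat',
    keyClass='org.apache.hadoop.io.LongWritable',
    valueClass='org.apache.hadoop.io.Text')

# each value is the raw text of one event block, parsed further in Python
blocks = rdd.values().map(lambda block: block.splitlines())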
Once you have that in place, you would create a special input format (in Java or Scala, using the Hadoop APIs) that treats the special sequences of rows containing EV_SEP as record delimiters instead of the newline character. You could do this quite simply by collecting rows as they are read into an accumulator (a simple ArrayList would do as a proof of concept) and then emitting the accumulated list of records when you find two EV_SEP rows in a row.
I would point out that using TextInputFormat as a basis for such a design might be tempting, but that input format splits files arbitrarily at newline characters, so you would need to implement custom logic to properly support splitting the files. Alternatively, you can avoid the problem by simply not implementing file splitting, which is a simple modification to the input format (declaring its files non-splittable).
If you do need to split files, the basic idea is to start each split at the nearest sequence of two consecutive EV_SEP rows. Detecting these sequences around the edge case of a file split would be a challenge. I would suggest establishing the largest byte width of a row and reading sliding-window chunks of an appropriate width (basically 2x the size of the rows) backwards from your starting point, then matching against those windows with a precompiled Java regex and Matcher. This is similar to how SequenceFiles find their sync marks, but using a regex to detect the sequence instead of strict equality.
As a side note, given some of the other background you mention here, I would be concerned that sorting the DataFrame by timestamp could mix together the rows of events that happen in the same time period but live in different files.
Upvotes: 0