Reputation: 11
Heyo StackOverflow,
Currently trying to find an elegant way to do a specific transformation.
So I have a dataframe of actions, that looks like this:
+---------+-------+------+-----+
|timestamp|user_id|action|value|
+---------+-------+------+-----+
|      100|      1| click| null|
|      101|      2| click| null|
|      103|      1|  drag|  AAA|
|      101|      1| click| null|
|      108|      1| click| null|
|      100|      2| click| null|
|      106|      1|  drag|  BBB|
+---------+-------+------+-----+
Context:
Users can perform actions: clicks and drags. Clicks don't have a value, drags do. A drag implies there was a click but not the other way around. Let's also assume that the drag event can be recorded after or before the click event.
So I have, for each drag, a corresponding click action. What I would like to do is merge the drag and click actions into one, i.e. delete the drag action after giving its value to the click action.
To know which click corresponds to which drag, I have to take the click whose timestamp is the closest to the drag's timestamp. I also want to make sure that a drag cannot be linked to a click if their timestamp difference is over 5 (which means some drags might not be linked, and that's fine). Of course, I don't want the drag of user 1 to correspond to the click of user 2.
Here, the result would look like this:
+---------+-------+------+-----+
|timestamp|user_id|action|value|
+---------+-------+------+-----+
|      100|      1| click| null|
|      101|      2| click| null|
|      101|      1| click|  AAA|
|      108|      1| click|  BBB|
|      100|      2| click| null|
+---------+-------+------+-----+
The drag with AAA (timestamp = 103) was linked to the click that happened at 101 because it's the closest to 103. Same logic for BBB.
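To make the matching rule above concrete, here is a minimal plain-Scala sketch on in-memory rows, with no Spark involved. The `Action` case class, the `ClosestClickSketch` object and the tie-breaking rule (the earlier click wins when two clicks are equally close) are assumptions made for illustration; the question itself does not say how ties should be handled.

```scala
// Hypothetical in-memory model of one row of the actions table.
final case class Action(timestamp: Long, userId: Int, action: String, value: Option[String])

object ClosestClickSketch {
  // A drag whose nearest click is more than 5 time units away stays unlinked.
  val MaxGap = 5L

  def mergeDrags(rows: Seq[Action]): Seq[Action] = {
    val clicks = rows.filter(_.action == "click")
    val drags  = rows.filter(_.action == "drag")

    // For each drag, pick the closest click of the same user within MaxGap.
    // Assumption: on a tie, the earlier click wins.
    // If two drags pick the same click, toMap keeps the last one.
    val linked: Map[(Int, Long), String] = (for {
      d <- drags
      v <- d.value.toSeq
      best <- clicks
        .filter(c => c.userId == d.userId && math.abs(c.timestamp - d.timestamp) <= MaxGap)
        .sortBy(c => (math.abs(c.timestamp - d.timestamp), c.timestamp))
        .headOption
        .toSeq
    } yield (best.userId, best.timestamp) -> v).toMap

    // Keep only the clicks, copying over the value of the drag linked to each one.
    clicks.map(c => c.copy(value = linked.get((c.userId, c.timestamp)).orElse(c.value)))
  }
}
```

On the sample data, the drag at 103 has candidate clicks at 100, 101 and 108 (distances 3, 2 and 5), so it goes to 101; the drag at 106 only has 101 and 108 within range (distances 5 and 2), so it goes to 108.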
So I would like to perform these operations, in a smooth/efficient way. So far, I have something like this:
val window = Window partitionBy ($"user_id") orderBy $"timestamp".asc
myDF
  .withColumn("previous_value", lag("value", 1, null) over window)
  .withColumn("previous_timestamp", lag("timestamp", 1, null) over window)
  .withColumn("next_value", lead("value", 1, null) over window)
  .withColumn("next_timestamp", lead("timestamp", 1, null) over window)
  .withColumn("value",
    when(
      $"previous_value".isNotNull and
        // If there is more than 5 sec. difference, it shouldn't be joined
        $"timestamp" - $"previous_timestamp" < 5 and
        (
          $"next_timestamp".isNull or
            $"next_timestamp" - $"timestamp" > $"timestamp" - $"previous_timestamp"
        ), $"previous_value")
      .otherwise(
        when($"next_timestamp" - $"timestamp" < 5, $"next_value")
          .otherwise(null)
      )
  )
  .filter($"action" === "click")
  .drop("previous_value")
  .drop("previous_timestamp")
  .drop("next_value")
  .drop("next_timestamp")
But I feel this is rather inefficient. Is there a better way to do this (something that doesn't require creating 4 temporary columns)? Is there a way to use both the row at offset -1 and the row at offset +1 in the same expression, for example?
Thanks in advance!
Upvotes: 0
Views: 330
Reputation: 501
Here's my attempt using Spark SQL rather than the DataFrame API, but it should be possible to convert:
myDF.createOrReplaceTempView("mydf") // registerTempTable is deprecated since Spark 2.0
spark.sql("""
  with
  clicks_table as (select * from mydf where action = 'click')
  ,drags_table as (select * from mydf where action = 'drag')
  ,one_click_many_drags as (
    select
        c.timestamp as c_timestamp
      , d.timestamp as d_timestamp
      , c.user_id as c_user_id
      , d.user_id as d_user_id
      , c.action as c_action
      , d.action as d_action
      , c.value as c_value
      , d.value as d_value
    from clicks_table c
    inner join drags_table d
      on c.user_id = d.user_id
      and abs(c.timestamp - d.timestamp) <= 5 --a drag cannot be linked to a click if their timestamp difference is over 5
  )
  ,one_click_one_drag as (
    select c_timestamp as timestamp, c_user_id as user_id, c_action as action, d_value as value
    from (
      select *, row_number() over (
        partition by d_user_id, d_timestamp --for each drag timestamp with multiple possible click timestamps, we rank the click timestamps by nearness
        order by
            abs(c_timestamp - d_timestamp) asc --prefer nearest
          , c_timestamp asc --if tied, prefer the earlier click (the one for which the drag is the next value)
      ) as rn
      from one_click_many_drags
    )
    where rn = 1 --take only the best match for each drag timestamp
  )
  --now we start from the clicks_table and add in the desired drag values!
  select c.timestamp, c.user_id, c.action, m.value
  from clicks_table c
  left join one_click_one_drag m
    on c.user_id = m.user_id
    and c.timestamp = m.timestamp
""")
Tested to produce your desired output.
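For anyone who prefers the DataFrame API, the query above could be translated roughly as follows. This is an untested sketch, not the original answer's code: the object name `MergeDragsIntoClicks`, the function `mergeDrags` and the `d_`-prefixed column aliases are made up for illustration, and it assumes the four columns shown in the question with a numeric `timestamp`.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{abs, col, row_number}

object MergeDragsIntoClicks {
  def mergeDrags(actions: DataFrame): DataFrame = {
    // Clicks never carry a value of their own, so drop the column here.
    val clicks = actions.filter(col("action") === "click").drop("value")

    // Rename the drag columns so the join below has no ambiguous names.
    val drags = actions
      .filter(col("action") === "drag")
      .select(col("user_id").as("d_user_id"), col("timestamp").as("d_timestamp"), col("value"))

    // All (click, drag) pairs of the same user that are at most 5 apart.
    val gap = abs(col("timestamp") - col("d_timestamp"))
    val candidates = clicks.join(drags, col("user_id") === col("d_user_id") && gap <= 5)

    // For each drag, rank its candidate clicks: nearest first, earlier click on a tie.
    val perDrag = Window
      .partitionBy("d_user_id", "d_timestamp")
      .orderBy(gap.asc, col("timestamp").asc)

    val bestMatch = candidates
      .withColumn("rn", row_number().over(perDrag))
      .filter(col("rn") === 1)
      .select("user_id", "timestamp", "value")

    // Start from the clicks again and attach the drag value where one was matched.
    clicks
      .join(bestMatch, Seq("user_id", "timestamp"), "left")
      .select("timestamp", "user_id", "action", "value")
  }
}
```

Like the SQL version, this keeps one row per drag in the intermediate result, so a click that is the nearest match for two different drags would appear twice in the output.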
Upvotes: 1