jimfawkes

Reputation: 375

How can I perform a calculation on a rolling window over a partition in polars?

I have a dataset containing GPS coordinates of a few planes. I would like to calculate the bearing of each plane at every point in time.

The dataset has, among others, these columns:

  1. event_uid
  2. plane_no
  3. timestamp
  4. gps_lat
  5. gps_lon

I can calculate the bearing for a single plane_no by doing something as follows:

(
    lf
    .sort("timestamp")
    .with_row_index("new_index")
    .with_columns(
        pl.struct(pl.col("gps_lat"), pl.col("gps_lon"))
        .rolling("new_index", period="2i")
        .shift(-1)
        .alias("latlon")
    )
    .with_columns(
        pl.col("latlon")
        .map_elements(
            lambda val: calculate_bearing(
                val[0]["gps_lat"], val[0]["gps_lon"],
                val[1]["gps_lat"], val[1]["gps_lon"],
            ),
            return_dtype=pl.Float64,
        )
        .alias("bearing")
    )
    .drop(["latlon", "new_index"])
)

In order to work on my entire dataset, though, I need to run this on every plane_no as a partition, and I can't combine rolling with over in polars. Is there an alternative? Any improvement ideas or overall comments are also welcome.

Upvotes: 0

Views: 62

Answers (2)

Dean MacGregor

Reputation: 18691

General answer about simultaneous rolling and group_by

If you want to do a rolling aggregation and a group_by at the same time, use the DataFrame method rolling, which has a group_by argument. It looks like this:

(
    df
    .sort("timestamp")
    .with_row_index()
    .rolling("index", period="3i", group_by="plane_no", offset="-1i")
    .agg(pl.col("gps_lat"))
)
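
To make the windowing concrete, here is that pattern run on a toy frame (values made up for illustration). With period="3i" and offset="-1i", each window spans (index - 1, index + 2], so every row collects itself plus the next two rows that fall inside the same plane_no group:

```python
import polars as pl

# Toy data (values assumed for illustration): two planes, timestamps sorted.
df = pl.DataFrame({
    "plane_no": ["A", "A", "A", "B", "B"],
    "timestamp": [1, 2, 3, 4, 5],
    "gps_lat": [1.0, 2.0, 3.0, 4.0, 5.0],
})

out = (
    df
    .sort("timestamp")
    .with_row_index()
    .rolling("index", period="3i", group_by="plane_no", offset="-1i")
    .agg(pl.col("gps_lat"))
)
# Each row's gps_lat list holds the current row plus up to the next two
# rows within the same plane_no.
print(out)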

Specific answer to whole problem

You don't need to do rolling here since the only thing you care about is the next value. You'd want to use rolling when your index value (usually time) is paramount to which group a row is in. For instance, if you wanted the average value of something within each 15m block then rolling would be a good fit.
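
As a sketch of that case (column names and values are made up), a 15-minute rolling mean per plane could look like:

```python
import polars as pl
from datetime import datetime

# Hypothetical readings: rolling 15-minute mean of speed per plane.
df = pl.DataFrame({
    "plane_no": ["A", "A", "A", "A"],
    "timestamp": [
        datetime(2024, 1, 1, 0, 0),
        datetime(2024, 1, 1, 0, 5),
        datetime(2024, 1, 1, 0, 20),
        datetime(2024, 1, 1, 0, 25),
    ],
    "speed": [100.0, 110.0, 120.0, 130.0],
}).sort("timestamp")

# Default window is (t - 15m, t], i.e. the current row plus anything in
# the preceding 15 minutes within the same plane_no.
out = df.rolling("timestamp", period="15m", group_by="plane_no").agg(
    pl.col("speed").mean().alias("speed_15m_avg")
)
```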

For your case you just need shift; rolling is overkill, especially since you have to construct an index for it to use.

Additionally, I think you'd benefit from converting your calculate_bearing formula into a polars expression to avoid the map_elements call.

You can find the formula for bearing here. Its inputs are expected to be in radians, so you also need a degrees-to-radians conversion, which you can find in many places, but here's one.
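
The question's calculate_bearing isn't shown, but for reference, a plain-Python version of that formula (the exact name and signature are assumptions matching how the question calls it) might look like:

```python
import math

def calculate_bearing(lat1, lon1, lat2, lon2):
    """Initial bearing in degrees (0-360) from point 1 to point 2.

    Inputs are in degrees; the trigonometry is done in radians.
    """
    lat1, lon1, lat2, lon2 = map(math.radians, (lat1, lon1, lat2, lon2))
    y = math.sin(lon2 - lon1) * math.cos(lat2)
    x = (
        math.cos(lat1) * math.sin(lat2)
        - math.sin(lat1) * math.cos(lat2) * math.cos(lon2 - lon1)
    )
    return (math.degrees(math.atan2(y, x)) + 360) % 360
```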

All together you can do

def pl_calculate_bearing(
    lat: pl.Expr | str,
    lon: pl.Expr | str,
    timestamp: None | str | pl.Expr = None,
) -> pl.Expr:
    from math import pi

    if isinstance(lat, str):
        lat = pl.col(lat)
    if isinstance(lon, str):
        lon = pl.col(lon)
    if timestamp is not None:
        lat = lat.sort_by(timestamp)
        lon = lon.sort_by(timestamp)

    # degrees to radians
    lat = (lat * pi) / 180
    lon = (lon * pi) / 180

    lat_next = lat.shift(-1)
    lon_next = lon.shift(-1)

    y = (lon_next - lon).sin() * lat_next.cos()
    x = lat.cos() * lat_next.sin() - lat.sin() * lat_next.cos() * (lon_next - lon).cos()

    return (pl.arctan2(y, x) * 180 / pi + 360) % 360

This has a few benefits. Mostly, it doesn't have to call back into Python for every row of your df; it keeps the calculation in polars, which should be much faster. It also saves you from bunching everything into a struct or pre-sorting your df, since the shifting and sorting happen inside the function. Note that the third argument, timestamp, is optional: if you trust that your df is already properly sorted, you can leave it out.

With that defined getting the bearing is reduced to:

df.with_columns(
    bearing=pl_calculate_bearing("gps_lat", "gps_lon", "timestamp").over("plane_no")
)

Upvotes: 1

bajun65537

Reputation: 585

I think you are almost there! There is no need for extra indexing, as .shift(-1).over("plane_no") will get you the next row.

What I would suggest you do is:

  • sort by plane_no and timestamp, and shift the coordinates forward by one row within each plane_no
  • calculate the bearing with your formula (as you did) and drop the helper columns

Say your dataframe is as follows:

df = pl.DataFrame({
    "event_uid": [1, 2, 3, 4, 5, 6],
    "plane_no": ["A", "A", "A", "B", "B", "B"],
    "timestamp": [1, 2, 3, 1, 2, 3],
    "gps_lat": [37.7749, 37.7750, 37.7751, 40.7128, 40.7130, 40.7132],
    "gps_lon": [-122.4194, -122.4195, -122.4196, -74.0060, -74.0062, -74.0064]
})
event_uid  plane_no  timestamp  gps_lat  gps_lon
1          A         1          37.7749  -122.4194
2          A         2          37.7750  -122.4195
3          A         3          37.7751  -122.4196
4          B         1          40.7128  -74.0060
5          B         2          40.7130  -74.0062
6          B         3          40.7132  -74.0064

So in the first step, we sort by plane_no and timestamp, and shift the coordinates.

df = df.sort(["plane_no", "timestamp"]).with_columns([
    pl.col("gps_lat").shift(-1).over("plane_no").alias("gps_lat_next"),
    pl.col("gps_lon").shift(-1).over("plane_no").alias("gps_lon_next")
])
event_uid  plane_no  timestamp  gps_lat  gps_lon    gps_lat_next  gps_lon_next
1          A         1          37.7749  -122.4194  37.7750       -122.4195
2          A         2          37.7750  -122.4195  37.7751       -122.4196
3          A         3          37.7751  -122.4196  null          null
4          B         1          40.7128  -74.0060   40.7130       -74.0062
5          B         2          40.7130  -74.0062   40.7132       -74.0064
6          B         3          40.7132  -74.0064   null          null

Then, in the next step, we apply the bearing formula and drop the helper columns.

df = (
    df.sort(["plane_no", "timestamp"])
    .with_columns(
        pl.col("gps_lat").shift(-1).over("plane_no").alias("gps_lat_next"),
        pl.col("gps_lon").shift(-1).over("plane_no").alias("gps_lon_next"),
    )
    .with_columns(
        pl.struct(["gps_lat", "gps_lon", "gps_lat_next", "gps_lon_next"])
        .map_elements(
            lambda row: calculate_bearing(
                row["gps_lat"], row["gps_lon"],
                row["gps_lat_next"], row["gps_lon_next"],
            ),
            return_dtype=pl.Float64,
        )
        .alias("bearing")
    )
    .drop(["gps_lat_next", "gps_lon_next"])
)
event_uid  plane_no  timestamp  gps_lat  gps_lon    bearing
1          A         1          37.7749  -122.4194  321.676379
2          A         2          37.7750  -122.4195  321.676416
3          A         3          37.7751  -122.4196  null
4          B         1          40.7128  -74.0060   322.838393
5          B         2          40.7130  -74.0062   322.838476
6          B         3          40.7132  -74.0064   null

Upvotes: 2
