Reputation: 21
I have 100 terabytes of data in Parquet format. The data has many columns, including latitude and longitude columns. I would like to use Apache Sedona to convert these columns into a point-type (geometry) column, so that I can then use other functions such as finding the points in a polygon. I am using a Databricks notebook with PySpark 3.2.
There are ways to do this with SQL and Scala, but I cannot find anything in the Apache Sedona documentation on how to do it in PySpark. Can anyone please show me how?
The way I am reading my data is like this:
rawDf = sedona.read.format("parquet").load("s3://PATH_TO_MY_PARQUET_DATA")
rawDf.createOrReplaceTempView("rawdf")
I have tried the following code, but it gives me the error that st_makePoint is not defined:
df = rawDf.withColumn("geometry", st_makePoint(col("longitude"), col("latitude")))
Any help would be greatly appreciated.
Upvotes: 1
Views: 387
Reputation: 21
Finally, after lots of trial and error and searching, I have found the answer to my question. We can add the point column to the data like this:
from pyspark.sql import SparkSession
from sedona.spark import SedonaContext

spark = SparkSession.builder.getOrCreate()
# Registers the Sedona SQL functions (ST_Point, ST_Contains, ...) on the session
SedonaContext.create(spark)

df = spark.read.parquet("PATH_TO_YOUR_DATA")
df.createOrReplaceTempView("df")

df_with_geom = spark.sql("SELECT *, ST_Point(longitude, latitude) AS geometry FROM df")
df_with_geom.createOrReplaceTempView("df_with_geom")
After running this code, df_with_geom will have an additional "geometry" column of geometry type that contains the points.
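As an alternative to the SQL string, the same point column can be added through the DataFrame API. This is a minimal sketch, assuming a recent Sedona version (1.4+) where the Python wrappers in sedona.sql are available; otherwise expr() with the SQL function registered by SedonaContext.create(spark) works as well:

from pyspark.sql.functions import col, expr
from sedona.sql.st_constructors import ST_Point

# Sedona Python DataFrame API (Sedona 1.4+)
df_with_geom = df.withColumn("geometry", ST_Point(col("longitude"), col("latitude")))

# Equivalent, using the registered SQL function
df_with_geom = df.withColumn("geometry", expr("ST_Point(longitude, latitude)"))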
Now assume these are points within the US and we want to find which points fall within which states. This is a spatial join, which can be done with the following steps:
First, we need polygons for the states:
# Data source for the state boundary polygons; download it with wget:
wget https://raw.githubusercontent.com/DataOceanLab/CPTS-415-Project-Examples/main/boundary-each-state.tsv
states_wkt = (spark.read
    .option("delimiter", "\t")
    .option("header", "false")
    .csv("PATH/boundary-each-state.tsv")
    .toDF("state_name", "state_bound"))
states_wkt.show()
states_wkt.printSchema()
states = states_wkt.selectExpr("state_name", "ST_GeomFromWKT(state_bound) as state_bound")
states.show()
states.printSchema()
states.createOrReplaceTempView("states")
The output of the show() and printSchema() calls will look like this (first for states_wkt, then for states):
+-------------+--------------------+
| state_name| state_bound|
+-------------+--------------------+
| Alaska|POLYGON((-141.020...|
| Alabama|POLYGON((-88.1955...|
| Arkansas|POLYGON((-94.0416...|
| Arizona|POLYGON((-112.598...|
| California|POLYGON((-124.400...|
+-------------+--------------------+
only showing top 20 rows
root
|-- state_name: string (nullable = true)
|-- state_bound: string (nullable = true)
+-------------+--------------------+
| state_name| state_bound|
+-------------+--------------------+
| Alaska|POLYGON ((-141.02...|
| Alabama|POLYGON ((-88.195...|
| Arkansas|POLYGON ((-94.041...|
| Arizona|POLYGON ((-112.59...|
| California|POLYGON ((-124.40...|
+-------------+--------------------+
only showing top 20 rows
root
|-- state_name: string (nullable = true)
|-- state_bound: geometry (nullable = true)
Finally, by running the following code, we can attach the state to each point:
result = spark.sql("SELECT * FROM states s, df_with_geom a WHERE ST_Contains(s.state_bound, a.geometry)")
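If only the state name needs to be attached to each point, the join can project just that column. A minimal sketch; the explicit JOIN ... ON form is equivalent to the WHERE form above:

result = spark.sql("""
    SELECT a.*, s.state_name
    FROM df_with_geom a
    JOIN states s
      ON ST_Contains(s.state_bound, a.geometry)
""")
result.show(5)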
Upvotes: 1