user28073171

Reputation: 11

Spatial join with Apache Sedona and Overture Maps buildings

I am new to Apache Spark and Sedona. I have a CSV file containing latitude and longitude columns, and I am trying to join them with the geometries in the Overture Maps buildings dataset. My goal is to get the data related to every lat-long pair I have in my CSV file. I am working in a Jupyter notebook on a Dataproc cluster. If there is a better way to achieve this than joining the lat-long pairs with the geometries in Overture Maps, please do let me know.

This is my code:

from sedona.spark import *
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
import os
import time

# Initialize Sedona context with custom configuration
config = (
    SedonaContext.builder()
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
    .config("fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
    .config("spark.jars.packages",
            "org.apache.sedona:sedona-spark-3.4_2.12:1.5.1,"
            "org.datasyslab:geotools-wrapper:1.5.1-28.2,"
            "uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.4")
    .config("spark.jars.repositories",
            "https://artifacts.unidata.ucar.edu/repository/unidata-all")
    .getOrCreate()
)


sedona = SedonaContext.create(config)

# Step 1: Load the buildings data with geometries from S3
DATA_LINK = "s3a://overturemaps-us-west-2/release/2024-09-18.0/"
df_building = sedona.read.format("geoparquet").load(DATA_LINK + "theme=buildings/type=building")


# Step 2: Load the CSV data with latitude and longitude columns
csv_path = "path/to/csv/file/"
csv_data = sedona.read.option("header", "true").csv(csv_path)
csv_data = csv_data.withColumn("Latitude", F.col("Latitude").cast(DoubleType())) \
                   .withColumn("Longitude", F.col("Longitude").cast(DoubleType()))


# Step 3: Convert Latitude and Longitude in the CSV data to a point geometry, and drop 'id' to avoid a duplicate column name after the join
csv_data = csv_data.withColumn("point_geometry", F.expr("ST_Point(Longitude, Latitude)")).drop("id")

# Step 4: Register DataFrames as temporary views for Sedona SQL operations
df_building.createOrReplaceTempView("buildings")
csv_data.createOrReplaceTempView("csv_data")

# Step 5: Perform the spatial join to find buildings that contain the points
query = """
SELECT csv.*, b.*
FROM csv_data AS csv
JOIN buildings AS b
ON ST_Contains(b.geometry, csv.point_geometry)
"""
joined_df = sedona.sql(query)

# Display two rows from the result to check if the join is successful
joined_df.show(2)

And I am getting the following error:

ERROR ClusterManager: Could not initialize cluster nodes=[geospatial-cluster-w-3.us-central1-f.c.quick-doodad-428221-a1.internal, geospatial-cluster-w-0.us-central1-f.c.quick-doodad-428221-a1.internal, geospatial-cluster-w-2.us-central1-f.c.quick-doodad-428221-a1.internal, geospatial-cluster-w-1.us-central1-f.c.quick-doodad-428221-a1.internal] nodeHostName=geospatial-cluster-m.us-central1-f.c.quick-doodad-428221-a1.internal nodeHostAddress=10.128.0.22 currentNodeIndex=null
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In[1], line 65
     62 joined_df = sedona.sql(query)
     64 # Display two rows from the result to check if the join is successful
---> 65 joined_df.select().show(2)
     67 # Step 6: Drop the 'point_geometry' column to avoid unsupported data type error
     68 
     69 # # Convert geometry and point_geometry to WKT format to make them compatible with CSV
   (...)
     89 
     90 # Display two rows from the result to check if the join is successful
     91 joined_df.select().show(2)

File /usr/lib/spark/python/pyspark/sql/dataframe.py:945, in DataFrame.show(self, n, truncate, vertical)
    885 def show(self, n: int = 20, truncate: Union[bool, int] = True, vertical: bool = False) -> None:
    886     """Prints the first ``n`` rows to the console.
    887 
    888     .. versionadded:: 1.3.0
   (...)
    943     name | Bob
    944     """
--> 945     print(self._show_string(n, truncate, vertical))

File /usr/lib/spark/python/pyspark/sql/dataframe.py:963, in DataFrame._show_string(self, n, truncate, vertical)
    957     raise PySparkTypeError(
    958         error_class="NOT_BOOL",
    959         message_parameters={"arg_name": "vertical", "arg_type": type(vertical).__name__},
    960     )
    962 if isinstance(truncate, bool) and truncate:
--> 963     return self._jdf.showString(n, 20, vertical)
    964 else:
    965     try:

File /opt/conda/miniconda3/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /usr/lib/spark/python/pyspark/errors/exceptions/captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
    177 def deco(*a: Any, **kw: Any) -> Any:
    178     try:
--> 179         return f(*a, **kw)
    180     except Py4JJavaError as e:
    181         converted = convert_exception(e.java_exception)

File /opt/conda/miniconda3/lib/python3.11/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o125.showString.
: java.lang.NoClassDefFoundError: org/opengis/referencing/FactoryException
    at org.apache.spark.sql.sedona_sql.strategy.join.TraitJoinQueryBase.toSpatialRDD(TraitJoinQueryBase.scala:41)
    at org.apache.spark.sql.sedona_sql.strategy.join.TraitJoinQueryBase.toSpatialRDD$(TraitJoinQueryBase.scala:40)
    at org.apache.spark.sql.sedona_sql.strategy.join.RangeJoinExec.toSpatialRDD(RangeJoinExec.scala:38)
    at org.apache.spark.sql.sedona_sql.strategy.join.TraitJoinQueryBase.toSpatialRddPair(TraitJoinQueryBase.scala:38)
    at org.apache.spark.sql.sedona_sql.strategy.join.TraitJoinQueryBase.toSpatialRddPair$(TraitJoinQueryBase.scala:34)
    at org.apache.spark.sql.sedona_sql.strategy.join.RangeJoinExec.toSpatialRddPair(RangeJoinExec.scala:38)
    at org.apache.spark.sql.sedona_sql.strategy.join.TraitJoinQueryExec.doExecute(TraitJoinQueryExec.scala:62)
    at org.apache.spark.sql.sedona_sql.strategy.join.TraitJoinQueryExec.doExecute$(TraitJoinQueryExec.scala:53)
    at org.apache.spark.sql.sedona_sql.strategy.join.RangeJoinExec.doExecute(RangeJoinExec.scala:38)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
    at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:527)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:455)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:454)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:498)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:51)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:751)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:364)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:498)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4332)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3314)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:547)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:3314)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3537)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: org.opengis.referencing.FactoryException
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
    ... 56 more

I can see two errors. The first is that the cluster manager could not initialize the cluster nodes, but when I SSH into the master node and ping the workers, the communication between them is not interrupted.
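For completeness, this is roughly how I checked from the notebook that the executors had registered with the driver (a minimal sketch; it goes through the SparkContext's internal _jsc handle, so it is only a debugging aid, not a stable API):

# Rough executor check; config is the SparkSession returned by getOrCreate() above.
# getExecutorMemoryStatus returns one entry per live executor, plus the driver.
sc = config.sparkContext
print(sc._jsc.sc().getExecutorMemoryStatus().size())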

The other error is Py4JJavaError: An error occurred while calling o125.showString. : java.lang.NoClassDefFoundError: org/opengis/referencing/FactoryException, which suggests that some jar file is missing, but I cannot figure out which one.
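In case it helps, this is how I tried to confirm which packages the running session thinks it has loaded (a sketch; it just reads back the properties set in the builder above, printing a fallback string if a property is unset):

# Read back the jar/package properties from the live session.
# config is the SparkSession created above; getConf() returns its SparkConf.
conf = config.sparkContext.getConf()
print(conf.get("spark.jars.packages", "not set"))
print(conf.get("spark.jars", "not set"))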

Upvotes: 1

Views: 61

Answers (0)
