I have been working on an ETL process to push/upsert data from PostgreSQL into a lines feature layer in ArcGIS Online, to be included on a WebMap. I have had no problem using essentially this exact format/code with a points layer. I read in one forum post that the gis.content.analyze method will only work with points... but the post is eluding me now that I want to reference it, of course (it was not official from Esri).
I am building this in MWAA (AWS's managed Apache Airflow solution), which makes it tricky to install system libraries; it took me quite a bit of time just to get the arcgis PyPI package working, since it relies on underlying system libraries.
I understand that there are other options for upsert, such as SQLite, shapefile, FileGDB, and GeoJSON. I have installed Fiona and have the following driver support (in theory; I have to install some of the underlying system packages in a Docker-like startup.sh file, which can be a bit finicky and tricky with the permissions):
INFO - {'DXF': 'rw', 'CSV': 'raw', 'OpenFileGDB': 'raw', 'ESRIJSON': 'r', 'ESRI Shapefile': 'raw', 'FlatGeobuf': 'raw', 'GeoJSON': 'raw', 'GeoJSONSeq': 'raw', 'GPKG': 'raw', 'GML': 'rw', 'OGR_GMT': 'rw', 'GPX': 'rw', 'MapInfo File': 'raw', 'DGN': 'raw', 'S57': 'r', 'SQLite': 'raw', 'TopoJSON': 'r'}
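(For reference, that line is just me logging Fiona's driver table, something like:)

import fiona

# Each driver maps to its supported modes: 'r' = read, 'a' = append, 'w' = write.
logger.info(fiona.supported_drivers)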
When trying SQLite, I run into an issue where, when I get to gis.content.add, my API response is 400 with "incorrect filetype", despite the item loading under Content in the portal and being marked as SQLite. I do think I'm doing something wrong, as I can't manually publish a feature layer from the SQLite file either.
When trying ESRI Shapefile, I run into issues with the column names being truncated (I believe shapefile field names cannot be longer than 10 characters).
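If I stayed on shapefile, I could at least control the truncation with an explicit rename map before writing (a sketch with made-up column names, not my real schema):

# Hypothetical rename map so the <=10-char DBF field names are chosen
# deliberately instead of being auto-truncated by the driver.
SHAPEFILE_NAMES = {
    "measurement_timestamp": "meas_ts",
    "line_segment_identifier": "seg_id",
}
gdf.rename(columns=SHAPEFILE_NAMES).to_file("lines_data.shp", driver="ESRI Shapefile")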
When trying GeoJSON, I noticed that when I build a feature layer from a GeoJSON, there is automagically a character limit of ~2B on string fields. If I define the feature layer beforehand in terms of dtypes and lengths, can I get around this and use the code below perhaps?
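(If pre-defining the schema does help, I assume it would look something like this via the layer's manager; the field name and length are made up, and I haven't verified this changes what the GeoJSON append does:)

# Hypothetical: pre-create a string field with an explicit length on the
# target layer so the append doesn't fall back to the ~2B default.
feature_layer = gis.content.get("<line_feature_layer_id>").layers[0]
feature_layer.manager.add_to_definition({
    "fields": [{
        "name": "status",  # example field, not my real schema
        "type": "esriFieldTypeString",
        "alias": "Status",
        "length": 256,
        "nullable": True,
    }]
})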
My code is as follows:
from arcgis.gis import GIS
from arcgis.features import FeatureLayerCollection
import geopandas as gpd
import numpy as np
import time
import pandas as pd
from datetime import datetime
import tempfile
import helpers
def esri_con():
    username, password = helpers.get_esri_secrets()
    return GIS(url="https://www.arcgis.com", username=username, password=password)

def get_esri_layer(layer_id):
    username, password = helpers.get_esri_secrets()
    gis = GIS(url="https://www.arcgis.com", username=username, password=password)
    feature_layer = FeatureLayerCollection.fromitem(gis.content.get(layer_id)).layers[0]
    return feature_layer

def get_layer_count(input_layer):
    return input_layer.query(return_count_only=True)
def db_query():
    full_query = f"""
        SELECT * FROM <mat_view>;
    """
    # some additional WHERE logic to manipulate lookback window etc.
    gdf = gpd.GeoDataFrame.from_postgis(full_query, con=con, geom_col="line_geometry")
    columns_to_keep = [
        <all necessary columns within feature layer>
        "line_geometry",
    ]
    gdf = gdf[columns_to_keep]
    <logic to cast timestamps to pd.to_datetime>
    <logic to cast string to a few columns>
    gdf.replace({np.nan: None}, inplace=True)
    gdf = gdf.where(pd.notnull(gdf), None)
    return gdf
def upsert_records_via_csv(logger, gis, layer_id, gdf):
    if gdf.empty:
        logger.info("No lines data to process. Exiting with success...")
        return True
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as temp_file:
        gdf.to_csv(temp_file.name, index=False)
    filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}_lines_data_upload.csv"
    item_properties = {
        "title": filename,
        "type": "CSV",
        "tags": "automated, upsert",
    }
    logger.info("Uploading lines CSV file to ArcGIS...")
    csv_item = gis.content.add(
        item_properties, data=temp_file.name, folder="<my_folder>"
    )
    logger.info(f"Lines CSV file uploaded to ArcGIS with ID {csv_item.id}")
    logger.info("Analyzing CSV file...")
    source_info = gis.content.analyze(
        item=csv_item.id, file_type="csv", location_type="coordinates"
    )
    feature_layer = gis.content.get(layer_id).layers[0]
    try:
        success, return_msg = feature_layer.append(
            item_id=csv_item.id,
            upload_format="csv",
            source_info=source_info["publishParameters"],
            upsert=True,
            update_geometry=True,
            upsert_matching_field="<unique identifier>",
            return_messages=True,
        )
    except Exception as e:
        logger.error("Exception during upsert operation.")
        logger.exception(e)
        return False
    if success:
        logger.info(f"Successfully upserted {len(gdf)} features.")
        csv_item.delete()
    else:
        logger.info(f"Failed to add all {len(gdf)} features")
        logger.error(f"Error msg: {return_msg}")
    return success
if __name__ == "__main__":
    logger = helpers.setup_logger("esri_line_sync_process")
    sync_gdf = db_query()
    count_of_db_records = len(sync_gdf)
    logger.info(f"Query returned {count_of_db_records} line records to be upserted.")
    if sync_gdf.empty:
        logger.info("No data to process. Exiting with success...")
        exit(0)
    gis = esri_con()
    sync_layer_id = "<line_feature_layer_id>"
    feature_layer = get_esri_layer(sync_layer_id)
    esri_layer_initial_count = get_layer_count(feature_layer)
    logger.info(f"Initial lines layer count: {esri_layer_initial_count}")
    logger.info("Starting lines upsert process...")
    success = upsert_records_via_csv(logger, gis, sync_layer_id, sync_gdf)
    if success:
        esri_layer_after_upsert_count = get_layer_count(feature_layer)
        logger.info(
            f"Lines upsert process completed. Final layer count: {esri_layer_after_upsert_count}"
        )
    else:
        logger.error("Lines upsert process failed.")
        exit(1)
EDIT: I should note that line_geometry is pulled like this from Postgres (a fragment of a larger CASE expression):
round(
    earth_distance(
        ll_to_earth(w.latitude::double precision, w.longitude::double precision),
        ll_to_earth(base.bh_latitude::double precision, base.bh_longitude::double precision)
    ) * 3.28084::double precision
) < 35000::double precision THEN (
    SELECT st_makeline(
        st_setsrid(st_makepoint(w.longitude::double precision, w.latitude::double precision), 4326),
        st_setsrid(st_makepoint(base.bh_longitude::double precision, base.bh_latitude::double precision), 4326)
    ) AS st_makeline
)
I am using the context block of with tempfile.NamedTemporaryFile... because it plays nicely with my Airflow workers and the /tmp dir.
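(One thing I should probably add: delete=False leaves the file behind after the block, so the worker's /tmp needs an explicit cleanup; a minimal stdlib-only sketch:)

import os

# delete=False keeps the file on disk after the 'with' block exits, so
# remove it explicitly once the upload has gone through.
with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as temp_file:
    gdf.to_csv(temp_file.name, index=False)
try:
    csv_item = gis.content.add(item_properties, data=temp_file.name, folder="<my_folder>")
finally:
    os.unlink(temp_file.name)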
When I execute this code, it runs successfully, but any row it updates essentially gets its geometry wiped; I assume the geometry update is being silently rejected because it thinks I am pushing points to a line layer.
When I dig deeper into source_info['publishParameters'], I see 'geometryType': 'esriGeometryPoint', which seems as if it is assuming a points update; I would expect to see esriGeometryPolyline for this value (I would think). As far as I can tell, CSV publishing in ArcGIS Online only derives point locations from coordinate or address fields, which would explain it.
I thought perhaps I needed to change the location_type argument in gis.content.analyze(), but none of the available values seem to make sense for lines: coordinates | address | lookup | none.
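If the CSV route can represent lines at all, one long-shot tweak (untested; I have no idea whether append() honors it) would be to override what analyze() inferred before handing it to append, continuing from the script above:

# Untested: force the geometry type that analyze() inferred, before the append.
publish_params = source_info["publishParameters"]
publish_params["geometryType"] = "esriGeometryPolyline"
success, return_msg = feature_layer.append(
    item_id=csv_item.id,
    upload_format="csv",
    source_info=publish_params,
    upsert=True,
    update_geometry=True,
    upsert_matching_field="<unique identifier>",
    return_messages=True,
)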
I've scoured the internet, between the documentation and various forum posts, for an example of how to do this, but I am failing to find guidance or best practices on how to accomplish this task.
Right now, I think my path forward is gdf.to_file(file, driver='FileGDB', layer='lines_data'); I'm currently working through that dependency fun in a managed environment. Hoping someone has faced something similar and perhaps I'm overlooking a much simpler solution.
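One thing I noticed in my Fiona driver dump above: OpenFileGDB reports 'raw', and newer GDAL builds (3.6+) can write file geodatabases natively, which might dodge the ESRI FileGDB SDK dependency entirely (untested in MWAA):

# Untested sketch: write the GDB via GDAL's built-in OpenFileGDB driver
# (write support requires GDAL >= 3.6) rather than the SDK-backed 'FileGDB'.
gdf.to_file("lines_data.gdb", driver="OpenFileGDB", layer="lines_data")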
I was able to get GeoJSON to work, without needing any extra driver, by using gdf.to_json():
def upsert_records_via_geojson(logger, gis, layer_id, gdf):
    if gdf.empty:
        logger.info("No lines data to process. Exiting with success...")
        return True
    geojson_str = gdf.to_json(na='null', show_bbox=False, drop_id=False)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".geojson") as temp_file:
        temp_file.write(geojson_str.encode('utf-8'))
    filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}_lines_data_upload.geojson"
    item_properties = {
        "title": filename,
        "type": "GeoJson",
        "tags": "automated, upsert",
    }
    logger.info("Uploading lines GeoJSON file to ArcGIS...")
    geojson_item = gis.content.add(
        item_properties, data=temp_file.name, folder="<my folder>"
    )
    logger.info(f"Lines GeoJSON file uploaded to ArcGIS with ID {geojson_item.id}")
    feature_layer = gis.content.get(layer_id).layers[0]
    try:
        success, return_msg = feature_layer.append(
            item_id=geojson_item.id,
            upload_format="geojson",
            upsert=True,
            update_geometry=True,
            upsert_matching_field="<unique field>",
            return_messages=True,
        )
    except Exception as e:
        logger.error("Exception during upsert operation.")
        logger.exception(e)
        return False
    if success:
        logger.info(f"Successfully upserted {len(gdf)} features.")
        geojson_item.delete()
    else:
        logger.info(f"Failed to add all {len(gdf)} features")
        logger.error(f"Error msg: {return_msg}")
    return success
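Wired into the same driver block as the CSV version (placeholder layer id as before):

# Same orchestration as the CSV flow, pointed at the GeoJSON upsert.
gis = esri_con()
sync_gdf = db_query()
success = upsert_records_via_geojson(logger, gis, "<line_feature_layer_id>", sync_gdf)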