Mojo713

Reputation: 75

How to upsert LineString in ArcGIS python via CSV (or other suggestion?)

I have been working on an ETL process to push/upsert my data from PostgreSQL into a lines feature layer in ArcGIS Online, to be included on a WebMap. I have had no problem using essentially this exact format/code with a points layer. I read in one forum post that the gis.content.analyze method only works with points... but the post is eluding me now that I want to reference it, of course (it was not official from Esri).

I am building this in MWAA (AWS's managed Apache Airflow solution), which makes it tricky to install system libraries; it took me quite a bit of time just to get the arcgis PyPI package working, since it relies on underlying system libraries.

I understand that there are other upsert options such as SQLite, shapefile, FileGDB, and GeoJSON. I have installed Fiona and have the following driver support (in theory; I have to install some of the underlying system packages in a Docker-style "startup.sh" file, which can be a bit finicky and tricky with permissions):

INFO - {'DXF': 'rw', 'CSV': 'raw', 'OpenFileGDB': 'raw', 'ESRIJSON': 'r', 'ESRI Shapefile': 'raw', 'FlatGeobuf': 'raw', 'GeoJSON': 'raw', 'GeoJSONSeq': 'raw', 'GPKG': 'raw', 'GML': 'rw', 'OGR_GMT': 'rw', 'GPX': 'rw', 'MapInfo File': 'raw', 'DGN': 'raw', 'S57': 'r', 'SQLite': 'raw', 'TopoJSON': 'r'}
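That dict comes straight from Fiona; for reference, it can be checked at runtime like this:

import fiona

# 'r' = read, 'a' = append, 'w' = write; 'raw' means all three are supported.
print(fiona.supported_drivers)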

When trying SQLite, I run into an issue at gis.content.add: the API response is 400 with an "incorrect filetype" message, even though the item loads under Content in the portal and is marked as SQLite. I do think I'm doing something wrong, since I can't manually publish a feature layer from the SQLite file either.

When trying ESRI Shapefile, I run into issues with the column names being truncated (shapefile field names cannot be longer than 10 characters).
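If I went the shapefile route, I could presumably work around the truncation by renaming columns to 10-character-safe aliases before export; a minimal sketch (these column names are made up):

# Hypothetical mapping from long PostgreSQL column names to <=10-char aliases;
# the real names would come from my mat view / feature layer schema.
SHP_COLUMN_MAP = {
    "measurement_timestamp": "meas_ts",
    "equipment_identifier": "equip_id",
}

shp_gdf = gdf.rename(columns=SHP_COLUMN_MAP)
shp_gdf.to_file("/tmp/lines_data.shp", driver="ESRI Shapefile")

The catch is that the target layer's field names would then also need to match those short aliases for the upsert mapping to line up.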

When trying GeoJSON, I noticed that when I build a feature layer from a GeoJSON file, string fields are automagically given a length limit of ~2 billion characters. If I instead define the feature layer up front, with explicit dtypes and lengths, can I get around this and use the code below?
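As a sanity check, the dtypes and lengths the layer already defines can be read off the service; a tiny snippet, assuming feature_layer is the target hosted layer (retrieved as in the code below):

# Inspect the field names, types, and lengths the hosted layer already defines.
for field in feature_layer.properties.fields:
    print(field.name, field.type, getattr(field, "length", None))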

My code is as follows:

from arcgis.gis import GIS
from arcgis.features import FeatureLayerCollection
import geopandas as gpd
import numpy as np
import time
import pandas as pd
from datetime import datetime
import tempfile
import helpers

def esri_con():
    username, password = helpers.get_esri_secrets()
    return GIS(url="https://www.arcgis.com", username=username, password=password)

def get_esri_layer(layer_id):
    username, password = helpers.get_esri_secrets()
    gis = GIS(url="https://www.arcgis.com", username=username, password=password)
    feature_layer = FeatureLayerCollection.fromitem(gis.content.get(layer_id)).layers[0]
    return feature_layer

def get_layer_count(input_layer):
    return input_layer.query(return_count_only=True)

def db_query():
    full_query = f"""
    SELECT * FROM <mat_view>;
    """
    # some additional WHERE logic to manipulate lookback window etc.

    gdf = gpd.GeoDataFrame.from_postgis(full_query, con=con, geom_col="line_geometry")

    columns_to_keep = [
        <all necessary columns within feature layer>
        "line_geometry",
    ]

    gdf = gdf[columns_to_keep]

    <logic to cast timestamps to pd.to_datetime>
    <logic to cast string to a few columns> 
    
    gdf.replace({np.nan: None}, inplace=True)
    gdf = gdf.where(pd.notnull(gdf), None)

    return gdf

def upsert_records_via_csv(logger, gis, layer_id, gdf):
    if gdf.empty:
        logger.info("No lines data to process. Exiting with success...")
        return True

    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as temp_file:
        gdf.to_csv(temp_file.name, index=False)

        filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}_lines_data_upload.csv"
        item_properties = {
            "title": filename,
            "type": "CSV",
            "tags": "automated, upsert",
        }

        logger.info("Uploading lines CSV file to ArcGIS...")
        csv_item = gis.content.add(
            item_properties, data=temp_file.name, folder="<my_folder>"
        )
        logger.info(f"Lines CSV file uploaded to ArcGIS with ID {csv_item.id}")

        logger.info("Analyzing CSV file...")
        source_info = gis.content.analyze(
            item=csv_item.id, file_type="csv", location_type="coordinates"
        )

        feature_layer = gis.content.get(layer_id).layers[0]
        try:
            success, return_msg = feature_layer.append(
                item_id=csv_item.id,
                upload_format="csv",
                source_info=source_info["publishParameters"],
                upsert=True,
                update_geometry=True,
                upsert_matching_field="<unique identifier>",
                return_messages=True,
            )
        except Exception as e:
            logger.error("Exception during upsert operation.")
            logger.exception(e)
            return False
        if success:
            logger.info(f"Successfully upserted {(len(gdf))} features.")
            csv_item.delete()
        else:
            logger.info(f"Failed to add all {len(gdf)} features")
            logger.error(f"Error msg: {return_msg}")

    return success

if __name__ == "__main__":
    logger = helpers.setup_logger("esri_line_sync_process")

    sync_gdf = db_query()
    count_of_db_records = len(sync_gdf)
    logger.info(f"Query returned {count_of_db_records} line records to be upserted.")
    if sync_gdf.empty:
        logger.info("No data to process. Exiting with success...")
        exit(0)

    gis = esri_con()
    sync_layer_id = "<line_feature_laye_idr>"
    feature_layer = get_esri_layer(sync_layer_id)
    esri_layer_initial_count = get_layer_count(feature_layer)
    logger.info(f"Initial lines layer count: {esri_layer_initial_count}")

    logger.info("Starting lines upsert process...")
    success = upsert_records_via_csv(logger, gis, sync_layer_id, sync_gdf)
    if success:
        esri_layer_after_upsert_count = get_layer_count(feature_layer)
        logger.info(
            f"Lines upsert process completed. Final layer count: {esri_layer_after_upsert_count}"
        )
    else:
        logger.error("Lines upsert process failed.")
        exit(1)

EDIT: I should note that line_geometry is pulled from PostgreSQL like this (a fragment of a larger CASE expression):

round(earth_distance(ll_to_earth(w.latitude::double precision, w.longitude::double precision),
      ll_to_earth(base.bh_latitude::double precision, base.bh_longitude::double precision)) * 3.28084::double precision) < 35000::double precision
THEN (
    SELECT st_makeline(
        st_setsrid(st_makepoint(w.longitude::double precision, w.latitude::double precision), 4326),
        st_setsrid(st_makepoint(base.bh_longitude::double precision, base.bh_latitude::double precision), 4326)
    ) AS st_makeline
)
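In other words, each row's line_geometry lands in the GeoDataFrame as a plain two-point shapely LineString in EPSG:4326, conceptually like this (coordinate names are illustrative):

from shapely.geometry import LineString

# Illustrative equivalent of the st_makeline output for one row.
line_geometry = LineString([(w_longitude, w_latitude), (bh_longitude, bh_latitude)])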

I am using the with tempfile.NamedTemporaryFile... context block because it plays nicely with my Airflow workers and the /tmp dir.

When I execute this code, it runs successfully, but any row it updates essentially has its geometry wiped; I assume the geometry update is being silently rejected because the service thinks I'm pushing point geometry to a line layer.

When I dig deeper into source_info['publishParameters'], I see 'geometryType': 'esriGeometryPoint', which suggests it is assuming a points update; I would expect esriGeometryPolyline for this value (I would think).

I thought perhaps I needed to change the location_type argument in gis.content.analyze(), but none of the available options (coordinates | address | lookup | none) make sense for line geometry.
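One thing I considered, though I have not confirmed it works (CSV coordinate publishing appears to be point-oriented), is overriding the inferred geometry type in the analyze output before handing it to append:

publish_params = source_info["publishParameters"]
# Untested: override the point geometry ArcGIS inferred from the lat/lon columns.
publish_params["geometryType"] = "esriGeometryPolyline"

success, return_msg = feature_layer.append(
    item_id=csv_item.id,
    upload_format="csv",
    source_info=publish_params,
    upsert=True,
    update_geometry=True,
    upsert_matching_field="<unique identifier>",
    return_messages=True,
)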

I've scoured the internet, between the documentation and various forum posts, for an example of how to do this, but am failing to find guidance or best practices for accomplishing this task.

Right now, I think my paths forward are...

  1. Install the underlying GDAL dependencies in order to use gdf.to_file(file, driver='FileGDB', layer='lines_data'). I'm currently working through this dependency fun in a managed environment.
  2. Change the column names of my feature layer to be shorter than 10 characters in order to use the shapefile method.
  3. Test whether I can use upsert with GeoJSON on a predefined, prepopulated feature layer generated from ArcGIS Pro.
  4. In chunks, pull every unique identifier out of my feature layer into an array, sort it against my gdf to determine what needs to be inserted, updated, and deleted, and then process those edits in chunks. This seems like the most straightforward but least efficient and most painful way (rough sketch after this list)...
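For option 4, a rough sketch of what I have in mind, assuming the layer has a standard OBJECTID field and using my <unique identifier> placeholder as the match field:

def manual_upsert(feature_layer, gdf, id_field="<unique identifier>", chunk_size=500):
    # Map the unique IDs already in the hosted layer to their OBJECTIDs.
    existing = feature_layer.query(
        where="1=1", out_fields=f"OBJECTID,{id_field}", return_geometry=False
    )
    oid_by_key = {f.attributes[id_field]: f.attributes["OBJECTID"] for f in existing.features}

    def to_feature(row, oid=None):
        # Esri JSON polyline: a single path of [x, y] pairs in WGS84.
        attrs = {id_field: row[id_field]}  # plus the other layer fields
        if oid is not None:
            attrs["OBJECTID"] = oid  # updates are matched on the object ID
        return {
            "attributes": attrs,
            "geometry": {
                "paths": [[list(c) for c in row["line_geometry"].coords]],
                "spatialReference": {"wkid": 4326},
            },
        }

    adds, updates = [], []
    for _, row in gdf.iterrows():
        oid = oid_by_key.get(row[id_field])
        (updates if oid is not None else adds).append(to_feature(row, oid))

    # Apply edits in chunks to stay under the service's payload limits.
    # (Deletes would be diffed and passed the same way via deletes=...)
    for i in range(0, len(adds), chunk_size):
        feature_layer.edit_features(adds=adds[i : i + chunk_size])
    for i in range(0, len(updates), chunk_size):
        feature_layer.edit_features(updates=updates[i : i + chunk_size])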

Hoping someone has faced something similar and perhaps I'm overlooking a much simpler solution.

Upvotes: 0

Views: 56

Answers (1)

Mojo713

Reputation: 75

I was able to get GeoJSON to work, without needing a Fiona/GDAL driver, by using gdf.to_json():

def upsert_records_via_geojson(logger, gis, layer_id, gdf):
    if gdf.empty:
        logger.info("No lines data to process. Exiting with success...")
        return True

    geojson_str = gdf.to_json(na='null', show_bbox=False, drop_id=False)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".geojson") as temp_file:
        temp_file.write(geojson_str.encode('utf-8'))
        temp_file.flush()  # ensure the bytes are on disk before gis.content.add reads the path

        filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}_lines_data_upload.geojson"
        item_properties = {
            "title": filename,
            "type": "GeoJson",
            "tags": "automated, upsert",
        }

        logger.info("Uploading lines GeoJSON file to ArcGIS...")
        geojson_item = gis.content.add(
            item_properties, data=temp_file.name, folder="<my folder>"
        )
        logger.info(f"Lines GeoJSON file uploaded to ArcGIS with ID {geojson_item.id}")

        feature_layer = gis.content.get(layer_id).layers[0]
        try:
            success, return_msg = feature_layer.append(
                item_id=geojson_item.id,
                upload_format="geojson",
                upsert=True,
                update_geometry=True,
                upsert_matching_field="<unique field>",
                return_messages=True,
            )
        except Exception as e:
            logger.error("Exception during upsert operation.")
            logger.exception(e)
            return False
        if success:
            logger.info(f"Successfully upserted {(len(gdf))} features.")
            geojson_item.delete()
        else:
            logger.info(f"Failed to add all {len(gdf)} features")
            logger.error(f"Error msg: {return_msg}")

    return success

Upvotes: 1
