Reputation: 155
I'm working on isochrones building them for some schools just to show accessibility. Here is my code:
# importing the required libraries
import osmnx as ox
import pandas as pd
import warnings
import networkx as nx
import geopandas as gpd
from shapely.geometry import Point, Polygon
import matplotlib.cm as cm
import matplotlib.colors as colors
import folium
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=PendingDeprecationWarning)
ox.config(use_cache=True, log_console=False)
# reading the data
sec_data = pd.read_csv('H:/WORK/Upwork/Project 7 - Python School Data Analysis/Analysis By Country/Tanzania/CSV Files/secondary.csv')
sec_schools = gpd.GeoDataFrame(
sec_data, geometry=gpd.points_from_xy(sec_data.longitude, sec_data.latitude)
)
sec_schools.crs = "EPSG:4326"
# we'll test it using the first 20 records
sec_schools = sec_schools.head(20)
# function to get isochrones
def get_isochrone(lon, lat, walk_times=[15, 30], speed=4.5, name=None, point_index=None):
loc = (lat, lon)
G = ox.graph_from_point(loc, simplify=True, network_type="walk")
gdf_nodes = ox.graph_to_gdfs(G, edges=False)
center_node = ox.distance.nearest_nodes(G, lon, lat)
meters_per_minute = speed * 1000 / 60 # km per hour to m per minute
for u, v, k, data in G.edges(data=True, keys=True):
data["time"] = data["length"] / meters_per_minute
polys = []
for walk_time in walk_times:
subgraph = nx.ego_graph(G, center_node, radius=walk_time, distance="time")
node_points = [
Point(data["x"], data["y"]) for node, data in subgraph.nodes(data=True)
]
polys.append(gpd.GeoSeries(node_points).unary_union.convex_hull)
info = {}
if name:
info["name"] = [name for t in walk_times]
if point_index:
info["point_index"] = [point_index for t in walk_times]
return {**{"geometry": polys, "time": walk_times}, **info}
# walk time list of minutes
WT = [30, 45, 60]
# build geopandas data frame of isochrone polygons for each school
isochrones = pd.concat(
[
gpd.GeoDataFrame(
get_isochrone(
r["geometry"].x,
r["geometry"].y,
name=r["school name"],
point_index=i,
walk_times=WT,
),
crs=sec_schools.crs,
)
for i, r in sec_schools.iterrows()
]
)
# merge overlapping polygons
# https://gis.stackexchange.com/questions/334459/how-to-dissolve-overlapping-polygons-using-geopandas
mergedpolys = gpd.GeoDataFrame(
geometry=isochrones.groupby("time")["geometry"]
.agg(lambda g: g.unary_union)
.apply(lambda g: [g] if isinstance(g, Polygon) else g.geoms)
.explode(),
crs=isochrones.crs,
)
# visualize merged polygons
m = None
for i, wt in enumerate(WT[::-1]):
m = mergedpolys.loc[[wt]].explore(
m=m,
color=colors.to_hex(cm.get_cmap("tab20b", len(WT))(i)),
name=wt,
height=300,
width=500,
)
m = sec_schools.head(SCHOOLS).explore(
m=m, marker_kwds={"radius": 3, "color": "red"}, name="schools"
)
folium.LayerControl().add_to(m)
When I run the above code, I get an error, "ValueError: Found no graph nodes within the requested polygon". I have a strong reason to believe that this error could be dependent on the place. How can I go about this? I will appreciate any help. I was thinking of try...catch to catch the error, but I don't know where to place in the code, or any other solution I need to do. Here is the GitHub Link to the first 20 schools. Please note that these data is available to the public from their resource page.
In addition also, how would I make travel times over 30 minutes to plot on the map? Like in the above code, there are three travel times, but only polygons for 30 mins travel time are returned, while the rest are not. If there is a way to go past this I will truly appreciate the help.
Upvotes: 0
Views: 1386
Reputation: 1
In the code, dist
is the distance so, in my case, I just increased it from the 550 to 1000 and the error disappeared. My understanding is that a graph with complete nodes depends on the distance, because the distance tells the length of the area the graph should cover; and, if it does not contain valid points that could make nodes, then we see this error.
#import osmnx library
import osmnx as ox
def return_road_type_smnx(driver_type:str,lat,lon):
# Download road network as a Graph object
# The network will be centred at your coordinate
# and includes all nodes in the vicinity of 2 km
G = ox.graph_from_point((lat,lon), dist=550, network_type='all')
# print(lat,lon)
# plot if you wish
# ox.plot_graph(G)
# Convert Graph to graph data frame
gdf = ox.graph_to_gdfs(G, nodes=False, fill_edge_geometry=True)
# convert to GeoJSON
gdf.to_json()
# print(gdf.highway)
#get the road type
roads = gdf.highway
# get the road name
# road_names=gdf.name.unique()
# print road names
# print(road_names[1])
# calculate and attach distance
# calculate and attach distance
roads_with_distances = [(road, ox.distance.nearest_nodes(G, lon,
lat,return_dist=False)) for road in roads]
# print(roads_with_distances)
# sort by distance
roads_with_distances = sorted(roads_with_distances, key=lambda x: x[1])
# Select closest road
closest_road = roads_with_distances
# print(closest_road)
# closest_road_type = roads_with_distances[0][0][4]
# print the closest road type
# print(closest_road[0][0])
# initialise speed limit
speed_limit = 0
# set speed limit based on road type
if driver_type == 'pud':
if closest_road[0][0] == 'primary':
speed_limit = 90
elif closest_road[0][0] == 'secondary':
speed_limit = 60
elif closest_road[0][0] == 'tertiary':
speed_limit = 60
elif closest_road[0][0] == 'residential':
speed_limit = 30
elif closest_road[0][0] == 'unclassified':
speed_limit = 0
elif closest_road[0][0] == 'service':
speed_limit = 40
elif closest_road[0][0] == 'trunk':
speed_limit = 90
elif closest_road[0][0] == 'primary_link':
speed_limit = 90
elif closest_road[0][0] == 'trunk_link':
speed_limit = 90
else:
print('invalid road type')
elif driver_type=='prd':
if closest_road[0][0] == 'primary':
speed_limit = 100
elif closest_road[0][0] == 'secondary':
speed_limit = 80
elif closest_road[0][0] == 'tertiary':
speed_limit = 60
elif closest_road[0][0] == 'residential':
speed_limit = 30
elif closest_road[0][0] == 'unclassified':
speed_limit = 0
elif closest_road[0][0] == 'service':
speed_limit = 40
elif closest_road[0][0] == 'trunk':
speed_limit = 100
elif closest_road[0][0] == 'primary_link':
speed_limit = 100
elif closest_road[0][0] == 'trunk_link':
speed_limit = 100
else:
# print the closest road type
print('Not Known to us',closest_road[0][0])
else:
print(driver_type+'-Driver type not known')
print(speed_limit)
return speed_limit
Upvotes: 0
Reputation: 31236
I see three sub-questions
ox.graph_from_point()
takes time. This is where time is taken. Having implemented caching using a pickle file so same results are not continuously requestedox.graph_from_point()
has a dist parameter. This defaults to 1000m which is not sufficient for walking for 60 minutes at 4.5km/h. Have amended to define a distance that will be sufficientimport osmnx as ox
import pandas as pd
import warnings
import networkx as nx
import geopandas as gpd
from shapely.geometry import Point, Polygon
import shapely
import matplotlib.cm as cm
import matplotlib.colors as colors
import folium
from pathlib import Path
# getting isochrones is expensive, cache
f = Path.cwd().joinpath("pkl_cache")
if not f.is_dir():
f.mkdir()
f = f.joinpath("isochrones.pkl")
if f.exists():
isochrones = pd.read_pickle(f)
else:
isochrones = gpd.GeoDataFrame(columns=["point_index", "name"])
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=PendingDeprecationWarning)
ox.config(use_cache=True, log_console=False)
# reading the data
# sec_data = pd.read_csv('H:/WORK/Upwork/Project 7 - Python School Data Analysis/Analysis By Country/Tanzania/CSV Files/secondary.csv')
# sec_schools = gpd.GeoDataFrame(
# sec_data, geometry=gpd.points_from_xy(sec_data.longitude, sec_data.latitude)
# )
sec_schools = gpd.GeoDataFrame(
pd.read_csv(
"https://raw.githubusercontent.com/Evanskip31/isochrones-polygons/master/first_20_secondary_schools.csv",
index_col=0,
).assign(geometry=lambda d: d["geometry"].apply(shapely.wkt.loads)),
crs="epsg:4326",
)
sec_schools.crs = "EPSG:4326"
# we'll test it using the first few records
SCHOOLS = 20
sec_schools = sec_schools.head(SCHOOLS)
# function to get isochrones
def get_isochrone(
lon, lat, walk_times=[15, 30], speed=4.5, name=None, point_index=None
):
loc = (lat, lon)
# take distance walked into account...
G = ox.graph_from_point(
loc,
simplify=True,
network_type="walk",
dist=(max(walk_times) / 60) * (speed + 1) * 1000,
)
gdf_nodes = ox.graph_to_gdfs(G, edges=False)
center_node = ox.distance.nearest_nodes(G, lon, lat)
meters_per_minute = speed * 1000 / 60 # km per hour to m per minute
for u, v, k, data in G.edges(data=True, keys=True):
data["time"] = data["length"] / meters_per_minute
polys = []
for walk_time in walk_times:
subgraph = nx.ego_graph(G, center_node, radius=walk_time, distance="time")
node_points = [
Point(data["x"], data["y"]) for node, data in subgraph.nodes(data=True)
]
polys.append(gpd.GeoSeries(node_points).unary_union.convex_hull)
info = {}
if name:
info["name"] = [name for t in walk_times]
if point_index is not None:
info["point_index"] = [point_index for t in walk_times]
return {**{"geometry": polys, "time": walk_times}, **info}
# walk time list of minutes
WT = [30, 45, 60]
# build geopandas data frame of isochrone polygons for each school
isochrones = pd.concat(
[isochrones]
+ [
gpd.GeoDataFrame(
get_isochrone(
r["geometry"].x,
r["geometry"].y,
name=r["school name"],
point_index=i,
walk_times=WT,
),
crs=sec_schools.crs,
)
for i, r in sec_schools.loc[
~sec_schools.index.isin(isochrones["point_index"].tolist())
].iterrows()
]
)
# save to act as a cache
isochrones.to_pickle(f)
# merge overlapping polygons
# https://gis.stackexchange.com/questions/334459/how-to-dissolve-overlapping-polygons-using-geopandas
mergedpolys = gpd.GeoDataFrame(
geometry=isochrones.groupby("time")["geometry"]
.agg(lambda g: g.unary_union)
.apply(lambda g: [g] if isinstance(g, Polygon) else g.geoms)
.explode(),
crs=isochrones.crs,
)
# visualize merged polygons
m = None
for i, wt in enumerate(WT[::-1]):
m = mergedpolys.loc[[wt]].explore(
m=m,
color=colors.to_hex(cm.get_cmap("tab20b", len(WT))(i)),
name=wt,
height=300,
width=500,
)
m = sec_schools.head(SCHOOLS).explore(
m=m, marker_kwds={"radius": 3, "color": "red"}, name="schools"
)
folium.LayerControl().add_to(m)
m
Upvotes: 1