Rabbids

Reputation: 176

networkx finding shortest path in edge label order

I work for an airport, and my task is to calculate the taxiing routes for airplanes on taxiways, ensuring they follow the prescribed road sequence. Here, I'll give an example and would like to ask if there are any high-performance algorithms for calculating the shortest path while adhering to the specified route sequence, using Python.

import networkx as nx
import matplotlib.pyplot as plt

G = nx.Graph()
G.add_edge("A1", "A2", weight=1, name="A")
G.add_edge("A2", "A3", weight=1, name="A")
G.add_edge("A3", "A4", weight=1, name="A")
G.add_edge("A4", "A5", weight=1, name="A")
G.add_edge("A5", "A6", weight=1, name="A")

G.add_edge("A2", "B1", weight=1, name="B")
G.add_edge("B1", "B2", weight=1, name="B")
G.add_edge("B2", "B3", weight=1, name="B")

G.add_edge("A6", "C1", weight=1, name="C")
G.add_edge("C1", "C2", weight=1, name="C")
G.add_edge("A6", "C3", weight=1, name="C")
G.add_edge("C3", "C4", weight=1, name="C")
G.add_edge("C4", "C2", weight=1, name="C")

G.add_edge("B2", "D1", weight=1, name="D")
G.add_edge("D1", "D2", weight=1, name="D")
G.add_edge("D2", "D3", weight=1, name="D")
G.add_edge("D3", "C2", weight=1, name="D")

G.add_edge("D1", "E1", weight=1, name="E")
G.add_edge("D2", "E1", weight=1, name="E")
G.add_edge("E1", "E2", weight=1, name="E")

# draw
pos = nx.spring_layout(G, seed=22)
nx.draw_networkx_nodes(G, pos, node_size=300)
nx.draw_networkx_edges(G, pos, width=1)
nx.draw_networkx_labels(G, pos, font_size=10, font_family="sans-serif")
edge_labels = nx.get_edge_attributes(G, "name")
nx.draw_networkx_edge_labels(G, pos, edge_labels)
ax = plt.gca()
ax.margins(0.0001)
plt.axis("off")
plt.tight_layout()
plt.show()


For example:

If source = "B3" and target = "E2", and the route sequence order = ["B", "A", "C", "D", "E"], then the calculated result should be: ["B3", "B2", "B1", "A2", "A3", "A4", "A5", "A6", "C1", "C2", "D3", "D2", "E1", "E2"].

If source = "A1" and target = "B3", and the route sequence order = ["A", "B", "D", "E", "D", "B"], then the calculated result should be: ["A1", "A2", "B1", "B2", "D1", "D2", "E1", "D1", "B2", "B3"].
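To make "adhering to the route sequence" concrete, here is a minimal sketch of a checker that collapses the edge labels along a path (ignoring repetitions) and compares them with the requested order. The helper names `route_sequence` and `follows_order` are illustrative, not part of networkx. The adjacency is assumed to be a mapping where `adj[u][v]` is the edge attribute dict; a plain dict of dicts works, and a networkx Graph `G` can be passed as-is because `G[u][v]` has the same shape.

```python
from itertools import groupby


def route_sequence(adj, path, label="name"):
    """Collapse the edge labels along `path` into the sequence of routes used."""
    labels = (adj[u][v][label] for u, v in zip(path, path[1:]))
    # groupby merges consecutive identical labels: A, B, B -> A, B
    return [name for name, _ in groupby(labels)]


def follows_order(adj, path, order, label="name"):
    """True if `path` uses exactly the routes in `order`, in that order."""
    return route_sequence(adj, path, label) == list(order)


# Tiny hand-built adjacency (a fragment of the graph above, for illustration).
a, b = {"name": "A"}, {"name": "B"}
adj = {
    "A1": {"A2": a},
    "A2": {"A1": a, "B1": b},
    "B1": {"A2": b, "B2": b},
    "B2": {"B1": b},
}
print(route_sequence(adj, ["A1", "A2", "B1", "B2"]))
print(follows_order(adj, ["A1", "A2", "B1", "B2"], ["A", "B"]))
```

A checker like this is handy for validating the output of any search routine independently of how the path was found.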

I have written a function based on my own ideas, and it is currently working as expected. However, I think its performance can still be optimized. The current approach involves sequentially searching for feasible paths in the specified order and then calculating the weight of each feasible path to determine the optimal one.

import networkx as nx

def sequential_shortest_path(G, start_node, end_node, order, order_label, weight_label):
    # Initialize the pathfinding paths index
    pindex = 0
    # Initialize the pathfinding paths
    paths = {
        pindex: {
            "prev": "",
            "node": start_node,
            "npath": [start_node],
            "visited": []
        }
    }
    # Find the path in stages according to the sequence order
    for k in range(1, len(order)):
        # Current road name
        now_way = order[k-1]
        # Target road name
        target_way = order[k]
        # The paths to the target road has been reached
        reach_paths = {}
        # Infinite loop pathfinding until the paths reaches the target road
        while True:
            for i in list(paths.keys()):
                prev = paths[i]["prev"]
                node = paths[i]["node"]
                neighbors = [neighbor for neighbor in G.neighbors(node) if neighbor != prev and neighbor not in paths[i]["visited"] and (G[node][neighbor][order_label] == now_way or G[node][neighbor][order_label] == target_way)]
                if len(neighbors) == 0:
                    del paths[i]
                elif len(neighbors) == 1:
                    neighbor = neighbors[0]
                    if G[node][neighbor][order_label] == target_way:
                        reach_paths[i] = {
                            "prev": node,
                            "node": neighbor,
                            "npath": paths[i]["npath"].copy() + [neighbor],
                            "visited": []
                        }
                        del paths[i]
                    else:
                        paths[i]["prev"] = node
                        paths[i]["node"] = neighbor
                        paths[i]["npath"].append(neighbor)
                        paths[i]["visited"].append(neighbor)
                elif len(neighbors) > 1:
                    for neighbor in neighbors:
                        pindex += 1
                        if G[node][neighbor][order_label] == target_way:
                            reach_paths[pindex] = {
                                "prev": node,
                                "node": neighbor,
                                "npath": paths[i]["npath"].copy() + [neighbor],
                                "visited": []
                            }
                        else:
                            paths[pindex] = {
                                "prev": node,
                                "node": neighbor,
                                "npath": paths[i]["npath"].copy() + [neighbor],
                                "visited": paths[i]["visited"].copy() + [neighbor]
                            }
                    del paths[i]
            if len(paths) == 0:
                break
        paths.update(reach_paths)
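    # Find the last section of the road to the end node.
    # Set target_way explicitly so it is defined even when `order` has a single road.
    target_way = order[-1]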
    reach_paths = {}
    while True:
        for i in list(paths.keys()):
            prev = paths[i]["prev"]
            node = paths[i]["node"]
            if node == end_node:
                reach_paths[i] = {
                    "prev": prev,
                    "node": node,
                    "npath": paths[i]["npath"].copy()
                }
                del paths[i]
                continue
            neighbors = [neighbor for neighbor in G.neighbors(node) if neighbor != prev and neighbor not in paths[i]["visited"] and G[node][neighbor][order_label] == target_way]
            if len(neighbors) == 0:
                del paths[i]
            elif len(neighbors) == 1:
                neighbor = neighbors[0]
                paths[i]["prev"] = node
                paths[i]["node"] = neighbor
                paths[i]["npath"].append(neighbor)
                paths[i]["visited"].append(neighbor)
            elif len(neighbors) > 1:
                for neighbor in neighbors:
                    pindex += 1
                    paths[pindex] = {
                        "prev": node,
                        "node": neighbor,
                        "npath": paths[i]["npath"].copy() + [neighbor],
                        "visited": paths[i]["visited"].copy() + [neighbor],
                    }
                del paths[i]
        if len(paths) == 0:
            break
    
    # Best path
    best_path = None
    # Weight of the best path
    best_weight = None
    # Calculate the best path among the feasible paths
    for i, path in reach_paths.items():
        weight = nx.path_weight(G, path["npath"], weight_label)
        if best_weight is None or weight < best_weight:
            best_weight = weight
            best_path = path["npath"]
    
    return best_path, best_weight


Upvotes: 2

Views: 86

Answers (1)

m.raynal

Reputation: 3123

Edit: new answer here, I've left the old one (which did not answer the problem) below.

First, let's describe the problem.

We'll denote by route(v1, v2) the "route" that corresponds to an edge (v1, v2). In your example, we'd have route(C2, D3) = D and route(B1, B2) = B.
Your goal is to find the shortest path from a to b under an extra constraint: the sequence of edge routes along the path must match (modulo repetitions) a route sequence R given as input.

We'll tweak the Dijkstra algorithm a bit so it can take this constraint into account.
We'll use a heap that stores triplets (u, i, c) such that u is a vertex of G, i is a value in 0..len(R) (inclusive) representing how many routes of R the current path has already "validated", and c is the current total cost of the path. I'll detail below what order the heap should use to store its elements.

Initialize a priority queue (heap) pq
pq.push((a, 0, 0))  # we start with the vertex a, route sequence index at 0, total cost at 0
repeat {
  if pq is empty then {                   # final case: no path
    return "NO PATH EXISTS"
  }

  u, i, c = pq.pop()
  if u == b and i == length(R) then { 
    return "TOTAL COST IS c"              #final case: shortest path found
  }

  for all vertices v that are neighbor of u do {
    if i > 0 and route(v, u) == R[i-1] then {
      # case: we're continuing on the "current" route
      pq.push((v, i, c + cost(u, v)))
    }
    if i < length(R) and route(v, u) == R[i] then {
      # case: we're moving on to the next route
      pq.push((v, i+1, c + cost(u, v)))
    }
    # other cases: invalid route sequence, we don't consider those
  }
}

And that's it.
The priority queue pq works as follows: its elements are triplets (u, i, c), each representing a path from a to u that passes through the first i routes, with total cost c. It is ordered according to the total cost c (minimize), and in case of equality according to i (maximize).
This means, for example, that (u, i, c) < (v, j, c+1); (u, i+1, c) < (v, i, c); and (u, i, c) < (v, i+1, c+1).
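One way to get this ordering with Python's `heapq` (a min-heap on tuples) is to push `(c, -i, u)`: the cost is compared first, and negating i makes the larger index win on ties. This is only a sketch; the `heap_entry` helper is a name made up for the example, and putting `u` last assumes the node labels are mutually comparable (strings are).

```python
import heapq


def heap_entry(u, i, c):
    """Key for the triplet (u, i, c): lowest cost first, then highest index i."""
    return (c, -i, u)


heap = []
for u, i, c in [("v", 2, 5), ("u", 3, 5), ("w", 1, 4), ("x", 4, 6)]:
    heapq.heappush(heap, heap_entry(u, i, c))

# Pop everything to see the resulting order of the nodes.
popped = [heapq.heappop(heap) for _ in range(len(heap))]
print([u for _, _, u in popped])
```

Here "w" comes out first (cost 4), then "u" before "v" (same cost 5, but "u" has the higher index), and "x" last.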

Don't hesitate to ask questions in the comments if there's a part you don't understand. Also, keep in mind that this is an oversimplification of the real implementation you'll have to do. You'll need to (at least) keep track of the pairs (u, i) that have been visited (popped from pq), and you'll probably need to store the current path in the queue as well so you can return the entire path after the execution of the algorithm, and not only its total cost.
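As a starting point, here is a hedged Python sketch of the pseudocode above with the two additions just mentioned (a set of settled states and parent pointers to rebuild the path). It also makes one assumption that is not in the pseudocode: the previous node is part of the state so that an immediate U-turn is forbidden, mirroring the `neighbor != prev` test in the question's code. The function name `constrained_shortest_path` is illustrative. `adj[u][v]` is assumed to be the edge attribute dict; a plain dict of dicts is used here, and a networkx Graph `G` should be usable in its place since `G[u]` maps neighbors to attribute dicts.

```python
import heapq
from itertools import count


def constrained_shortest_path(adj, source, target, order,
                              label="name", weight="weight"):
    """Return (path, cost), or (None, None) if no path follows `order`."""
    n = len(order)
    tie = count()                      # tie-breaker: the heap never compares states
    start = (source, None, 0)          # state = (node, previous node, routes validated)
    heap = [(0, 0, next(tie), start)]  # entries are (cost, -i, tie, state)
    best = {start: 0}
    parent = {start: None}
    done = set()                       # states already popped with their final cost
    while heap:
        cost, _, _, state = heapq.heappop(heap)
        if state in done:
            continue
        done.add(state)
        node, prev, i = state
        if node == target and i == n:
            path = []
            while state is not None:   # walk the parent pointers back to the source
                path.append(state[0])
                state = parent[state]
            return path[::-1], cost
        for nxt, attrs in adj[node].items():
            if nxt == prev:            # assumption: no immediate U-turn
                continue
            steps = []
            if i > 0 and attrs[label] == order[i - 1]:
                steps.append(i)        # continue on the current route
            if i < n and attrs[label] == order[i]:
                steps.append(i + 1)    # move on to the next route
            for j in steps:
                new_state, new_cost = (nxt, node, j), cost + attrs[weight]
                if new_cost < best.get(new_state, float("inf")):
                    best[new_state] = new_cost
                    parent[new_state] = state
                    heapq.heappush(heap, (new_cost, -j, next(tie), new_state))
    return None, None


# The question's graph, rebuilt as a dict of dicts (all weights are 1).
EDGES = {
    "A": [("A1", "A2"), ("A2", "A3"), ("A3", "A4"), ("A4", "A5"), ("A5", "A6")],
    "B": [("A2", "B1"), ("B1", "B2"), ("B2", "B3")],
    "C": [("A6", "C1"), ("C1", "C2"), ("A6", "C3"), ("C3", "C4"), ("C4", "C2")],
    "D": [("B2", "D1"), ("D1", "D2"), ("D2", "D3"), ("D3", "C2")],
    "E": [("D1", "E1"), ("D2", "E1"), ("E1", "E2")],
}
adj = {}
for name, pairs in EDGES.items():
    for u, v in pairs:
        attrs = {"weight": 1, "name": name}
        adj.setdefault(u, {})[v] = attrs
        adj.setdefault(v, {})[u] = attrs

path, cost = constrained_shortest_path(adj, "B3", "E2", ["B", "A", "C", "D", "E"])
print(path, cost)
```

On the first example from the question this reproduces the expected path with a cost of 13. On the second example several paths tie at cost 9, so the exact path returned may differ from the one listed in the question while still following the route order.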

The search explores at most (length(R) + 1) states per vertex, so the complexity of the algorithm is roughly length(R) times that of a vanilla Dijkstra algorithm. With a small graph (hundreds of nodes) and short route sequences it should run almost instantly.

Old, invalid answer:
The shortest path from a to b while adhering to the route sequence u1, u2, ..., un is the concatenation of the shortest paths (a, u1), (u1, u2), ... , (un, b).
That property is actually the one we use most often to prove that shortest path algorithms are indeed correct, we say that the shortest path problem has an "optimal substructure" which allows dynamic programming.

Considering the small number of nodes and edges in your graph (probably in the hundreds), and the positive weights (I imagine a function of time and/or distance), there is no need to look for a specialized algorithm: a vanilla Dijkstra will do the trick just fine. You should be able to compute a shortest path with routing constraints in Python in a matter of milliseconds without optimizing anything.

It could get more complicated if you need to integrate the "planes not colliding with each other" part of the problem into your algorithm.

Upvotes: 0
