Fabian
Fabian

Reputation: 3

Solving integer network flow problem using OptaPlanner

I am new to OptaPlanner and would like to test its capabilities on an integer network flow problem.

The problem I am trying to model is relatively simple:

I started by modeling the problem facts as follows using OptaPy:

from optapy import problem_fact, planning_id

@problem_fact
class Node:
    def __init__(self, id, lb, ub, cost):
        self.id = id
        self.lb = lb
        self.ub = ub
        self.cost = cost

    @planning_id
    def get_id(self):
        return self.id

    def __str__(self):
        return f'Node(id={self.id},lb={self.lb},ub={self.ub},cost={self.cost})'

@problem_fact
class Edge:
    def __init__(self, id, node_from, node_to, cost):
        self.id = id
        self.node_from = node_from
        self.node_to = node_to
        self.cost = cost

    @planning_id
    def get_id(self):
        return self.id

    def __str__(self):
        return f'Node(id={self.id},node_from={self.node_from.id},node_to={self.node_to.id},cost={self.cost})'

I am guessing that the planning entities and variables are done analogously?

from optapy import planning_entity, planning_variable

@planning_entity
class NodeFlow:
    def __init__(self, id, node, flow=None):
        self.id = id
        self.node = node
        self.flow = flow

    @planning_id
    def get_id(self):
        return self.id

    @planning_variable(Node, ['nodeFlowRange'])
    def get_flow(self):
        return self.flow

    def set_flow(self, new_flow):
        self.flow = new_flow

@planning_entity
class EdgeFlow:
    def __init__(self, id, edge, flow=None):
        self.id = id
        self.edge = edge
        self.flow = flow

    @planning_id
    def get_id(self):
        return self.id

    @planning_variable(Node, ['edgeFlowRange'])
    def get_flow(self):
        return self.flow

    def set_flow(self, new_flow):
        self.flow = new_flow

However, now I am stuck modeling the constraints. It's not clear to me how to model, e.g., flow conservation. My code skeleton is as follows:

from optapy import constraint_provider
from optapy.types import Joiners, HardSoftScore

@constraint_provider
def define_constraints(constraint_factory):
    return [
        # Hard constraints
        flow_conservation(constraint_factory)
    ]

def flow_conservation(constraint_factory):
    return constraint_factory \
        .for_each(Node) \
        #.group_by(?) # To sum in-flows and out-flows per node?
        .join(Node,
              # ?
              ) \
        .penalize('Flow conservation conflict', HardSoftScore.ONE_HARD)

Is this the correct way to model such a problem? How do I model e.g. flow conservation?

Thank you for your help!

Upvotes: 0

Views: 140

Answers (1)

Christopher Chianelli
Christopher Chianelli

Reputation: 2013

"flow_bounds" can be modelled as a "built-in" constraint by using "@value_range_provider" on a "@planning_entity" (as described in https://www.optapy.org/docs/latest/planner-configuration/planner-configuration.html#valueRangeProviderOnPlanningEntity), but it appears there is a bug I need to fix with @value_range_provider on @planning_entity) (will add how to do it once a build with the fix is released).

Since "flow_conservation" only apply to non_sources/non_sinks, I recommend adding a kind field to Node to determine if it a source or sink (or just a normal node). From your NodeFlow and EdgeFlow classes, flow is declared to be a Node, but from your problem description, it should be an integer. I am confused what NodeFlow represent (is it outgoing - incoming for a given and hence should always be 0 except for source and sink nodes)?

"cost_minimization" can be split into node_cost_minimization and edge_cost_minimization, which sums the product item.flow * item.cost and penalize by it. Putting all the constraints together, it should look something like this:

from optapy import constraint_provider
from optapy.constraint import Joiners, ConstraintCollectors
from optapy.score import HardSoftScore
from domain import Node, Edge, NodeFlow, EdgeFlow

@constraint_provider
def flow_constraints(constraint_factory):
    return [
        flow_conservation(constraint_factory),
        minimize_node_cost(constraint_factory),
        minimize_edge_cost(constraint_factory),
        outgoing_bounds(constraint_factory),
        incoming_bounds(constraint_factory),
    ]


def outgoing_bounds(constraint_factory):
    return (
        constraint_factory.for_each(Node)
             # Join with outgoing edge flows
             .join(EdgeFlow, Joiners.equal(lambda node: node.id, lambda edge_flow: edge_flow.edge.node_from.id))
             # Get total outgoing flow
             .group_by(lambda node, outgoing_edge: node,
                       ConstraintCollectors.sum(lambda node, outgoing_edge: outgoing_edge.flow))
             # Only penalize if outgoing_flow is not in bounds for node
             .filter(lambda node, outgoing_flow: outgoing_flow < node.lb or outgoing_flow > node.ub)
             .penalize('source_bounds', HardSoftScore.ONE_HARD, lambda node, outgoing_flow: 1000 * max(node.lb - outgoing_flow, outgoing_flow - node.ub))
    )


def incoming_bounds(constraint_factory):
    return (
        constraint_factory.for_each(Node)
             # Join with incoming edge flows
             .join(EdgeFlow, Joiners.equal(lambda node: node.id,
                                           lambda edge_flow: edge_flow.edge.node_to.id))
             # Get total incoming flow
             .group_by(lambda node, incoming_edge: node,
                       ConstraintCollectors.sum(lambda node, incoming_edge: incoming_edge.flow))
             # Only penalize if outgoing_flow is not in bounds for node
             .filter(lambda node, incoming_flow: incoming_flow < node.lb or incoming_flow > node.ub)
             .penalize('sink_bounds', HardSoftScore.ONE_HARD, lambda node, incoming_flow: 1000 * max(node.lb - incoming_flow, incoming_flow - node.ub))
    )


def flow_conservation(constraint_factory):
    return (
        constraint_factory.for_each(Node)
             # Do not include source and sink nodes
             .filter(lambda node: node.kind != 'source' and node.kind != 'sink')
             # Join with incoming edge flows
             .join(EdgeFlow, Joiners.equal(lambda node: node.id,
                                           lambda edge_flow: edge_flow.edge.node_from.id))
             # Get total incoming flow
             .group_by(lambda node, incoming_edge: node,
                       ConstraintCollectors.sum(lambda node, incoming_edge: incoming_edge.flow))
             # Join with outgoing edge flows
             .join(EdgeFlow, Joiners.equal(lambda node, incoming_flow: node.id,
                                           lambda edge_flow: edge_flow.edge.node_to.id))
             # Get total outgoing flow
             .group_by(lambda node, incoming_flow, outgoing_edge: node,
                       lambda node, incoming_flow, outgoing_edge: incoming_flow,
                       ConstraintCollectors.sum(lambda node, incoming_flow, outgoing_edge: outgoing_edge.flow))
             # Only penalize if incoming flow != outgoing flow
             .filter(lambda node, incoming_flow, outgoing_flow: incoming_flow != outgoing_flow)
             .penalize('flow_conservation', HardSoftScore.ONE_HARD, lambda node, incoming_flow, outgoing_flow: abs(incoming_flow - outgoing_flow))
    )


def minimize_node_cost(constraint_factory):
    return (
        constraint_factory.for_each(NodeFlow)
            .group_by(ConstraintCollectors.sum(lambda node_flow: node_flow.node.cost * node_flow.flow))
            .penalize('node flow cost', HardSoftScore.ONE_SOFT, lambda cost: cost)
    )


def minimize_edge_cost(constraint_factory):
    return (
        constraint_factory.for_each(EdgeFlow)
            .group_by(ConstraintCollectors.sum(lambda edge_flow: edge_flow.edge.cost * edge_flow.flow))
            .penalize('edge flow cost', HardSoftScore.ONE_SOFT, lambda cost: cost)
    )

The domain objects I used:

from optapy import problem_fact, planning_id


@problem_fact
class Node:
    def __init__(self, id, lb, ub, cost, kind):
        self.id = id
        self.lb = lb
        self.ub = ub
        self.kind = kind
        self.cost = cost

    @planning_id
    def get_id(self):
        return self.id

    def __str__(self):
        return f'Node(id={self.id},lb={self.lb},ub={self.ub},cost={self.cost})'


@problem_fact
class Edge:
    def __init__(self, id, node_from, node_to, lb, ub, cost):
        self.id = id
        self.node_from = node_from
        self.node_to = node_to
        self.lb = lb
        self.ub = ub
        self.cost = cost

    @planning_id
    def get_id(self):
        return self.id

    def __str__(self):
        return f'Node(id={self.id},node_from={self.node_from.id},node_to={self.node_to.id},cost={self.cost})'


from optapy import planning_entity, planning_variable, planning_solution, planning_score, \
planning_entity_collection_property, problem_fact_collection_property, value_range_provider

from optapy.score import HardSoftScore


@planning_entity
class NodeFlow:
    def __init__(self, id, node, flow=None):
        self.id = id
        self.node = node
        self.flow = flow

    @planning_id
    def get_id(self):
        return self.id

    @planning_variable(object, ['nodeFlowRange'])
    def get_flow(self):
        return self.flow

    def set_flow(self, new_flow):
        self.flow = new_flow

    def __str__(self):
        return f'{self.node}: {self.flow}'



@planning_entity
class EdgeFlow:
    def __init__(self, id, edge, flow=None):
        self.id = id
        self.edge = edge
        self.flow = flow

    @planning_id
    def get_id(self):
        return self.id

    @planning_variable(int, ['edgeFlowRange'])
    def get_flow(self):
        return self.flow

    def set_flow(self, new_flow):
        self.flow = new_flow

    def __str__(self):
        return f'{self.edge}: {self.flow}'



@planning_solution
class NetworkFlow:
    def __init__(self, nodes, edges, node_flows, edge_flows, score):
        self.nodes = nodes
        self.edges = edges
        self.node_flows = node_flows
        self.edge_flows = edge_flows
        self.score = score

    @problem_fact_collection_property(Node)
    def get_nodes(self):
        return self.nodes

    @problem_fact_collection_property(Edge)
    def get_edges(self):
        return self.edges

    @planning_entity_collection_property(NodeFlow)
    def get_node_flows(self):
        return self.node_flows

    @planning_entity_collection_property(EdgeFlow)
    def get_edge_flows(self):
        return self.edge_flows

    @planning_score(HardSoftScore)
    def get_score(self):
        return self.score

    @problem_fact_collection_property(int)
    @value_range_provider('edgeFlowRange')
    def get_edge_flow_range(self):
        if len(self.edges) == 0:
            return [0]
        else:
            lb = min(map(lambda edge: edge.lb, self.edges))
            ub = max(map(lambda edge: edge.ub, self.edges))
            return [i for i in range(lb, ub + 1)]

    @problem_fact_collection_property(int)
    @value_range_provider('nodeFlowRange')
    def get_node_flow_range(self):
        if len(self.nodes) == 0:
            return [0]
        else:
            lb = min(map(lambda node: node.lb, self.nodes))
            ub = max(map(lambda node: node.ub, self.nodes))
            return [i for i in range(lb, ub + 1)]

    def set_score(self, score):
        self.score = score

    def __str__(self):
        return (f'Nodes:\n' +
                f'\n'.join(list(map(str, self.node_flows))) +
                f'\nEdges:\n' +
                f'\n'.join(list(map(str, self.edge_flows))) +
                f'\nScore: {self.score}'
        )


def generate_problem():
    nodes = [
        Node('source', 10, 10, 1, 'source'),
        Node('sink', 10, 10, 1, 'sink'),
        Node('a', 0, 2, 1, 'node'),
        Node('b', 0, 8, 2, 'node'),
        Node('c', 0, 3, 1, 'node'),
    ]
    edges = [
        Edge('source-a', nodes[0], nodes[2], 0, 2, 1),
        Edge('source-b', nodes[0], nodes[3], 0, 10, 2),
        Edge('b-c', nodes[3], nodes[4], 0, 3, 1),
        Edge('a-sink', nodes[2], nodes[1], 0, 2, 1),
        Edge('b-sink', nodes[3], nodes[1], 0, 5, 2),
        Edge('c-sink', nodes[4], nodes[1], 0, 3, 1),
    ]
    node_flows = []
    edge_flows = list(map(lambda edge: EdgeFlow(edge.id, edge), edges))
    return NetworkFlow(nodes, edges, node_flows, edge_flows, None)

It can be simpler (for instance, you can put the planning variable "flow" directly into Node and Edge and remove NodeFlow and EdgeFlow). Do note OptaPy is significantly slower than OptaPlanner currently, but we are working on improving its performance (hopefully in the next release).

Upvotes: 2

Related Questions