Reputation: 3
I am new to OptaPlanner and would like to test its capabilities on an integer network flow problem.
The problem I am trying to model is relatively simple:
I started by modeling the problem facts as follows using OptaPy:
from optapy import problem_fact, planning_id
@problem_fact
class Node:
def __init__(self, id, lb, ub, cost):
self.id = id
self.lb = lb
self.ub = ub
self.cost = cost
@planning_id
def get_id(self):
return self.id
def __str__(self):
return f'Node(id={self.id},lb={self.lb},ub={self.ub},cost={self.cost})'
@problem_fact
class Edge:
def __init__(self, id, node_from, node_to, cost):
self.id = id
self.node_from = node_from
self.node_to = node_to
self.cost = cost
@planning_id
def get_id(self):
return self.id
def __str__(self):
return f'Node(id={self.id},node_from={self.node_from.id},node_to={self.node_to.id},cost={self.cost})'
I am guessing that the planning entities and variables are done analogously?
from optapy import planning_entity, planning_variable
@planning_entity
class NodeFlow:
def __init__(self, id, node, flow=None):
self.id = id
self.node = node
self.flow = flow
@planning_id
def get_id(self):
return self.id
@planning_variable(Node, ['nodeFlowRange'])
def get_flow(self):
return self.flow
def set_flow(self, new_flow):
self.flow = new_flow
@planning_entity
class EdgeFlow:
def __init__(self, id, edge, flow=None):
self.id = id
self.edge = edge
self.flow = flow
@planning_id
def get_id(self):
return self.id
@planning_variable(Node, ['edgeFlowRange'])
def get_flow(self):
return self.flow
def set_flow(self, new_flow):
self.flow = new_flow
However, now I am stuck modeling the constraints. It's not clear to me how to model, e.g., flow conservation. My code skeleton is as follows:
from optapy import constraint_provider
from optapy.types import Joiners, HardSoftScore
@constraint_provider
def define_constraints(constraint_factory):
return [
# Hard constraints
flow_conservation(constraint_factory)
]
def flow_conservation(constraint_factory):
return constraint_factory \
.for_each(Node) \
#.group_by(?) # To sum in-flows and out-flows per node?
.join(Node,
# ?
) \
.penalize('Flow conservation conflict', HardSoftScore.ONE_HARD)
Is this the correct way to model such a problem? How do I model e.g. flow conservation?
Thank you for your help!
Upvotes: 0
Views: 140
Reputation: 2013
"flow_bounds" can be modelled as a "built-in" constraint by using "@value_range_provider" on a "@planning_entity" (as described in https://www.optapy.org/docs/latest/planner-configuration/planner-configuration.html#valueRangeProviderOnPlanningEntity), but it appears there is a bug I need to fix with @value_range_provider
on @planning_entity
) (will add how to do it once a build with the fix is released).
Since "flow_conservation" only apply to non_sources/non_sinks, I recommend adding a kind field to Node
to determine if it a source or sink (or just a normal node). From your NodeFlow
and EdgeFlow
classes, flow
is declared to be a Node
,
but from your problem description, it should be an integer. I am confused
what NodeFlow
represent (is it outgoing - incoming
for a given and hence should always be 0 except for source
and sink
nodes)?
"cost_minimization" can be split into node_cost_minimization
and edge_cost_minimization
, which sums the product item.flow * item.cost
and penalize by it. Putting all the constraints together, it should look something like this:
from optapy import constraint_provider
from optapy.constraint import Joiners, ConstraintCollectors
from optapy.score import HardSoftScore
from domain import Node, Edge, NodeFlow, EdgeFlow
@constraint_provider
def flow_constraints(constraint_factory):
return [
flow_conservation(constraint_factory),
minimize_node_cost(constraint_factory),
minimize_edge_cost(constraint_factory),
outgoing_bounds(constraint_factory),
incoming_bounds(constraint_factory),
]
def outgoing_bounds(constraint_factory):
return (
constraint_factory.for_each(Node)
# Join with outgoing edge flows
.join(EdgeFlow, Joiners.equal(lambda node: node.id, lambda edge_flow: edge_flow.edge.node_from.id))
# Get total outgoing flow
.group_by(lambda node, outgoing_edge: node,
ConstraintCollectors.sum(lambda node, outgoing_edge: outgoing_edge.flow))
# Only penalize if outgoing_flow is not in bounds for node
.filter(lambda node, outgoing_flow: outgoing_flow < node.lb or outgoing_flow > node.ub)
.penalize('source_bounds', HardSoftScore.ONE_HARD, lambda node, outgoing_flow: 1000 * max(node.lb - outgoing_flow, outgoing_flow - node.ub))
)
def incoming_bounds(constraint_factory):
return (
constraint_factory.for_each(Node)
# Join with incoming edge flows
.join(EdgeFlow, Joiners.equal(lambda node: node.id,
lambda edge_flow: edge_flow.edge.node_to.id))
# Get total incoming flow
.group_by(lambda node, incoming_edge: node,
ConstraintCollectors.sum(lambda node, incoming_edge: incoming_edge.flow))
# Only penalize if outgoing_flow is not in bounds for node
.filter(lambda node, incoming_flow: incoming_flow < node.lb or incoming_flow > node.ub)
.penalize('sink_bounds', HardSoftScore.ONE_HARD, lambda node, incoming_flow: 1000 * max(node.lb - incoming_flow, incoming_flow - node.ub))
)
def flow_conservation(constraint_factory):
return (
constraint_factory.for_each(Node)
# Do not include source and sink nodes
.filter(lambda node: node.kind != 'source' and node.kind != 'sink')
# Join with incoming edge flows
.join(EdgeFlow, Joiners.equal(lambda node: node.id,
lambda edge_flow: edge_flow.edge.node_from.id))
# Get total incoming flow
.group_by(lambda node, incoming_edge: node,
ConstraintCollectors.sum(lambda node, incoming_edge: incoming_edge.flow))
# Join with outgoing edge flows
.join(EdgeFlow, Joiners.equal(lambda node, incoming_flow: node.id,
lambda edge_flow: edge_flow.edge.node_to.id))
# Get total outgoing flow
.group_by(lambda node, incoming_flow, outgoing_edge: node,
lambda node, incoming_flow, outgoing_edge: incoming_flow,
ConstraintCollectors.sum(lambda node, incoming_flow, outgoing_edge: outgoing_edge.flow))
# Only penalize if incoming flow != outgoing flow
.filter(lambda node, incoming_flow, outgoing_flow: incoming_flow != outgoing_flow)
.penalize('flow_conservation', HardSoftScore.ONE_HARD, lambda node, incoming_flow, outgoing_flow: abs(incoming_flow - outgoing_flow))
)
def minimize_node_cost(constraint_factory):
return (
constraint_factory.for_each(NodeFlow)
.group_by(ConstraintCollectors.sum(lambda node_flow: node_flow.node.cost * node_flow.flow))
.penalize('node flow cost', HardSoftScore.ONE_SOFT, lambda cost: cost)
)
def minimize_edge_cost(constraint_factory):
return (
constraint_factory.for_each(EdgeFlow)
.group_by(ConstraintCollectors.sum(lambda edge_flow: edge_flow.edge.cost * edge_flow.flow))
.penalize('edge flow cost', HardSoftScore.ONE_SOFT, lambda cost: cost)
)
The domain objects I used:
from optapy import problem_fact, planning_id
@problem_fact
class Node:
def __init__(self, id, lb, ub, cost, kind):
self.id = id
self.lb = lb
self.ub = ub
self.kind = kind
self.cost = cost
@planning_id
def get_id(self):
return self.id
def __str__(self):
return f'Node(id={self.id},lb={self.lb},ub={self.ub},cost={self.cost})'
@problem_fact
class Edge:
def __init__(self, id, node_from, node_to, lb, ub, cost):
self.id = id
self.node_from = node_from
self.node_to = node_to
self.lb = lb
self.ub = ub
self.cost = cost
@planning_id
def get_id(self):
return self.id
def __str__(self):
return f'Node(id={self.id},node_from={self.node_from.id},node_to={self.node_to.id},cost={self.cost})'
from optapy import planning_entity, planning_variable, planning_solution, planning_score, \
planning_entity_collection_property, problem_fact_collection_property, value_range_provider
from optapy.score import HardSoftScore
@planning_entity
class NodeFlow:
def __init__(self, id, node, flow=None):
self.id = id
self.node = node
self.flow = flow
@planning_id
def get_id(self):
return self.id
@planning_variable(object, ['nodeFlowRange'])
def get_flow(self):
return self.flow
def set_flow(self, new_flow):
self.flow = new_flow
def __str__(self):
return f'{self.node}: {self.flow}'
@planning_entity
class EdgeFlow:
def __init__(self, id, edge, flow=None):
self.id = id
self.edge = edge
self.flow = flow
@planning_id
def get_id(self):
return self.id
@planning_variable(int, ['edgeFlowRange'])
def get_flow(self):
return self.flow
def set_flow(self, new_flow):
self.flow = new_flow
def __str__(self):
return f'{self.edge}: {self.flow}'
@planning_solution
class NetworkFlow:
def __init__(self, nodes, edges, node_flows, edge_flows, score):
self.nodes = nodes
self.edges = edges
self.node_flows = node_flows
self.edge_flows = edge_flows
self.score = score
@problem_fact_collection_property(Node)
def get_nodes(self):
return self.nodes
@problem_fact_collection_property(Edge)
def get_edges(self):
return self.edges
@planning_entity_collection_property(NodeFlow)
def get_node_flows(self):
return self.node_flows
@planning_entity_collection_property(EdgeFlow)
def get_edge_flows(self):
return self.edge_flows
@planning_score(HardSoftScore)
def get_score(self):
return self.score
@problem_fact_collection_property(int)
@value_range_provider('edgeFlowRange')
def get_edge_flow_range(self):
if len(self.edges) == 0:
return [0]
else:
lb = min(map(lambda edge: edge.lb, self.edges))
ub = max(map(lambda edge: edge.ub, self.edges))
return [i for i in range(lb, ub + 1)]
@problem_fact_collection_property(int)
@value_range_provider('nodeFlowRange')
def get_node_flow_range(self):
if len(self.nodes) == 0:
return [0]
else:
lb = min(map(lambda node: node.lb, self.nodes))
ub = max(map(lambda node: node.ub, self.nodes))
return [i for i in range(lb, ub + 1)]
def set_score(self, score):
self.score = score
def __str__(self):
return (f'Nodes:\n' +
f'\n'.join(list(map(str, self.node_flows))) +
f'\nEdges:\n' +
f'\n'.join(list(map(str, self.edge_flows))) +
f'\nScore: {self.score}'
)
def generate_problem():
nodes = [
Node('source', 10, 10, 1, 'source'),
Node('sink', 10, 10, 1, 'sink'),
Node('a', 0, 2, 1, 'node'),
Node('b', 0, 8, 2, 'node'),
Node('c', 0, 3, 1, 'node'),
]
edges = [
Edge('source-a', nodes[0], nodes[2], 0, 2, 1),
Edge('source-b', nodes[0], nodes[3], 0, 10, 2),
Edge('b-c', nodes[3], nodes[4], 0, 3, 1),
Edge('a-sink', nodes[2], nodes[1], 0, 2, 1),
Edge('b-sink', nodes[3], nodes[1], 0, 5, 2),
Edge('c-sink', nodes[4], nodes[1], 0, 3, 1),
]
node_flows = []
edge_flows = list(map(lambda edge: EdgeFlow(edge.id, edge), edges))
return NetworkFlow(nodes, edges, node_flows, edge_flows, None)
It can be simpler (for instance, you can put the planning variable "flow" directly into Node
and Edge
and remove NodeFlow
and EdgeFlow
). Do note OptaPy is significantly slower than OptaPlanner currently, but we are working on improving its performance (hopefully in the next release).
Upvotes: 2