import logging
import time
import linopy
import networkx as nx
import pandas as pd
from networkx.drawing.nx_agraph import graphviz_layout
from syfop.node_base import NodeInputBase, NodeOutputBase
from syfop.units import default_units, interval_length_h, ureg
from syfop.util import DEFAULT_NUM_TIME_STEPS, timeseries_variable
[docs]
class SolverError(Exception):
"""Raised when the solver fails to find an optimal solution."""
...
[docs]
class Network:
"""A network is a directed acyclic graph of nodes, which represents the flow of commodities
between nodes. Nodes are represented by objects of classes defined in :py:mod:`syfop.node`.
The connections between nodes are defined by the parameter ``inputs`` when creating node
objects.
The network is used to optimize the sizes of the nodes, such that the total costs are
minimized.
Attributes of this class are not supposed to be modified outside of this class - use the
methods instead.
Attributes
----------
nodes : list of subclasses of syfop.node.NodeBase
List of nodes to be included in the network. All nodes used as input nodes need to be
included in this list.
time_coords : pandas.DatetimeIndex
time coordinates for the optimization problem
nodes_dict : dict
dictionary of nodes with node names as keys
model : linopy.model.Model
optimization model for the network
"""
def __init__(
self,
nodes,
time_coords=None,
time_coords_freq="h",
time_coords_num=DEFAULT_NUM_TIME_STEPS,
time_coords_year=2020,
total_cost_unit=ureg.EUR,
solver_dir=None,
units={},
):
"""
Parameters
----------
nodes : list of subclasses of syfop.node.NodeBase
List of nodes to be included in the network. All nodes used as input nodes need to be
included in this list otherwise a ``ValueError`` is raised.
time_coords : pandas.DatetimeIndex
time coordinates used for all time series in the network, typically hourly time steps
for a year. If ``None``, ``time_coords`` are generated using the parameters
``time_coords_freq``, ``time_coords_num`` and ``time_coords_year``.
time_coords_freq : str
used only if ``time_coords`` is ``None``, frequency of the time coordinates
time_coords_num : int
used only if ``time_coords`` is ``None``, number of time stamps generated. Note that
the default value might be wrong in case of a leap year.
time_coords_year : int
used only if ``time_coords`` is ``None``, year used for generating time stamps (first
hour of this year will be used for the first time stamp)
total_cost_unit : pint.Unit
unit of the objective function, needs to be a currency
solver_dir : str
Path where temporary files for the lp file, see :py:class:`linopy.model.Model`. This is
used as workaround on the `VSC <https://vsc.ac.at>`__, because the default temp folder
is on a partition with very limited space and deleting the files after the optimization
does not work (always?).
units : dict
A mapping from commodity to unit, e.g. ``{"electricity": ureg.MW}``. This overwrites
the default units in :py:mod:`syfop.units.default_units`. Required for commodities,
which are not defined in :py:mod:`syfop.units.default_units`.
"""
if len(nodes) == 0:
raise ValueError("empty network not allowed, provide a non empty list of nodes!")
all_input_nodes = {input_node for node in nodes for input_node in node.inputs}
if not (all_input_nodes <= set(nodes)):
raise ValueError(
"nodes used as input node, but missing in list of nodes passed to "
f"Network(): {', '.join(node.name for node in (all_input_nodes - set(nodes)))}"
)
# check if names of nodes are unique
# TODO we might run into troubles if constraints or variables are not unique, e.g. because
# the name contains underscores
node_names = [node.name for node in nodes]
if len(set(node_names)) != len(nodes):
raise ValueError(f"node names are not unique: {', '.join(node_names)}")
# minor code duplication with util.const_time_series(), but should not matter too much
if time_coords is None:
time_coords = pd.date_range(
str(time_coords_year),
freq=time_coords_freq,
periods=time_coords_num,
)
self.time_coords = time_coords
self.nodes = nodes
self.nodes_dict = {node.name: node for node in nodes}
self.total_cost_unit = total_cost_unit
self._set_units(units)
self._check_consistent_time_coords(nodes, time_coords)
self._check_all_nodes_connected(nodes)
self.model = self._generate_optimization_model(nodes, solver_dir)
def _set_units(self, units):
self.units = default_units.copy()
self.units.update(units)
# if units is never modified later on, this should be okay to allow nodes to access units
for node in self.nodes:
node.units = self.units
def _check_all_nodes_connected(self, nodes):
"""Check if graph of node forms a connected network."""
graph = self._create_graph(nodes)
components = list(nx.weakly_connected_components(graph))
if len(components) > 1:
raise ValueError(
"network is not connected, there are multiple components: "
f"{', '.join(str(component) for component in components)}"
)
def _check_consistent_time_coords(self, nodes, time_coords):
# all time series need to be defined on the same coordinates otherwise vector comparison
# will lead to empty constraints
for field in ("input_flows", "output_flows", "input_profile", "output_profile"):
for node in nodes:
if not hasattr(node, field) or getattr(node, field) is None:
# don't need to check flows created later than this check, because these are
# filled with flows using self.time_coords
continue
flows = getattr(node, field)
if isinstance(flows, dict):
flows = flows.values()
field_title = "an item in"
else:
flows = [flows]
field_title = "an"
for flow in flows:
if len(flow.time) != len(time_coords):
raise ValueError(
f"inconsistent time_coords: node {node.name} has {field_title} {field} "
f"with length {len(flow.time)}, but the network has time_coords "
f"with length {len(time_coords)}"
)
if (flow.time != time_coords).any():
raise ValueError(
f"inconsistent time_coords: node {node.name} has {field_title} {field} "
"with time_coords different from the Network's time_coords"
)
def _add_leaf_flow_variables(self, model, nodes):
# Add a variable for nodes, which do not have input flows or output flows. This variable
# should be simply the sum of output flows (for the input flow variable) or sum of input
# flows (for the output flow variable).
# In most cases this is not really needed but nice to have to see the values in the
# solution and output_flow comes in handy in to be in the size constraints.
# When input_flow_costs is set on nodes which do not have any other inputs set (fixed,
# profile or other nodes), this is used to determine the total input_flow_costs. Also it
# makes the constraints easier because we never have empty sums.
for node in nodes:
if len(node.input_flows) == 0:
node.input_flows[""] = timeseries_variable(
model, self.time_coords, f"input_flow_{node.name}"
)
if len(node.output_flows) == 0:
node.output_flows[""] = timeseries_variable(
model, self.time_coords, f"output_flow_{node.name}"
)
def _set_missing_input_commodities(self, nodes):
# set input commodities for input nodes, i.e. NodeScaleableInput and NodeFixedInput:
# Input nodes have only one input commodity, which is the same as the output commodity,
# therefore it does not need to be set explicitly by the user, but can be infered from the
# node(s) it is connected to.
# We need to disallow multiple output_commodities for input nodes anyhow, because there is
# no convert factor.
for node in nodes:
if node.input_commodities is None:
assert (
len(set(node.output_commodities)) == 1
), f"unexpected number of output_commodities for node '{node.name}'"
# all other node types should have input_commmodities set at this point
assert isinstance(
node, NodeInputBase
), f" unexpected type for node '{node.name}': {type(node)}"
node.input_commodities = [node.output_commodities[0]]
def _set_output_connections(self, nodes):
for node in nodes:
node.outputs = []
for node in nodes:
for input_ in node.inputs:
input_.outputs.append(node)
# output connections are not known when Node objects are created, so we add
# it to the Node objects here, except for nodes which are NodeOutputeBase
for node in nodes:
if node.output_flows is None:
node.output_flows = {
output.name: output.input_flows[node.name] for output in node.outputs
}
# TODO this has quadratic performance and is very ugly, but having dicts
# everywhere also not nice, maybe switch to some adjacency matrix thingy, where
# one can select all input edges or output edges by choosing a column or a row?
node.output_commodities = [
output.input_commodities[output.inputs.index(node)]
for output in node.outputs
if output.input_commodities is not None # FIXME this line is stupid
]
if len(node.output_commodities) == 0:
# here we have no ouputs of a Node, so there is only the leaf output flow
node.output_commodities = [node.size_commodity]
if (
len(set(node.output_commodities)) > 1
and hasattr(node, "storage")
and node.storage is not None
):
# we have one storage per node, so it is not clear what should be stored
raise ValueError("storage not supported for multiple output commodities")
def _generate_optimization_model(self, nodes, solver_dir):
"""Create the linopy optimization model for the network."""
model = linopy.Model(solver_dir=solver_dir)
for node in nodes:
node.create_variables(model, self.time_coords)
self._set_output_connections(nodes)
self._set_missing_input_commodities(nodes)
self._add_leaf_flow_variables(model, nodes)
for node in nodes:
node.create_constraints(model, self.time_coords)
model.add_objective(self.total_costs())
return model
[docs]
def add_variables(self, *args, **kwargs):
"""Add custom variables to the linopy optimization model. See
:py:meth:`linopy.model.Model.add_variables()` for a documentation of the parameters.
This method must be called before :py:class:`Network.optimize()` is called.
"""
return self.model.add_variables(*args, **kwargs)
[docs]
def add_constraints(self, *args, **kwargs):
"""Add custom constraints to the linopy optimization model. See
:py:meth:`linopy.model.Model.add_constraints()` for a documentation of the parameters.
To create the constraint, variables can be accessed via ``Network.model.variables`` or by
node attributes, e.g. in ``Network.nodes_dict['wind'].size`` for a node with name *wind*.
This method must be called before :py:meth:`Network.optimize()` is called.
"""
return self.model.add_constraints(*args, **kwargs)
def _check_storage_level_zero(self):
"""This is a basic plausibility check. A storage which is never full or never empty
could be replaced by a smaller storage, which would be advantageous if costs are
positive. So something is fishy if this check fails.
"""
for node in self.nodes:
if hasattr(node, "storage") and node.storage is not None and node.storage.costs > 0:
storage_level = self.model.solution[f"storage_level_{node.name}"]
storage_size = self.model.solution[f"size_storage_{node.name}"]
# XXX not sure what to use as epsilon here
assert (
storage_level.min() < 1e-12
), f"storage for {node.name} never empty (min value: {storage_level.min()}"
assert (
storage_level.max() > storage_size - 1e-12
), f"storage for {node.name} never full (max value: {storage_level.max()}"
[docs]
def optimize(self, solver_name="highs", **kwargs):
"""Optimize all node sizes: minimize total costs (sum of all (scaled) node costs) with
subject to all constraints induced by the network.
Parameters
----------
solver_name : str, optional
all solvers supported by `linopy
<https://linopy.readthedocs.io/en/latest/prerequisites.html#install-a-solver>`__
**kwargs
additional parameters passed to the solver via :py:meth:`linopy.model.Model.solve()`.
"""
t0 = time.time()
io_api = "direct" if solver_name in ("gurobi", "highs") else "lp"
self.model.solve(solver_name=solver_name, io_api=io_api, **kwargs)
if linopy.constants.SolverStatus[self.model.status] != linopy.constants.SolverStatus.ok:
raise SolverError(
f"unable to solve optimization, solver status={self.model.status}, "
f"termination condition={self.model.termination_condition}"
)
self._check_storage_level_zero()
logging.info("Solving time: %s", time.time() - t0)
[docs]
def total_costs(self):
"""Return the total costs of the network as a linopy expression: this is the sum of all
node costs (size of the node times the cost per unit) and input flow costs (sum of all
input flows of the complete time span times the cost per unit).
This is used as objective in the optimization problem.
The result of the solved optimization can be found in the attribute
``Network.model.objective.value``.
"""
technology_costs = sum(
node.size * node.costs_magnitude(self.total_cost_unit)
for node in self.nodes
if node.has_costs()
)
storage_costs = sum(
node.storage.size * node.storage_cost_magnitude(self.total_cost_unit)
for node in self.nodes
if hasattr(node, "storage") and node.storage is not None
)
costs = technology_costs
# add costs for input_flows (e.g. fuel costs) if defined
for node in self.nodes:
if not hasattr(node, "input_flow_costs") or not node.input_flow_costs:
continue
input_flows = list(node.input_flows.values())
assert len(input_flows) == 1, "only one input_flow is supported"
input_flow = input_flows[0]
# this is just a check: atm we support only Node, so input_flows should be plain linopy
# variables without units, but in case of a NodeScalableInput or a NodeFixInput we
# would need to strip units here and use the magnitude.
assert isinstance(input_flows[0], linopy.Variable), "unexpected input_flow type"
interval_length_h_ = interval_length_h(self.time_coords)
input_flow_unit = self.units[node.input_commodities[0]] # something like MW
input_flow_costs_mag = node.input_flow_costs.to(
self.total_cost_unit / (input_flow_unit * ureg.h)
).magnitude
# something like: EUR/MWh * x h * MW
costs = costs + input_flow_costs_mag * interval_length_h_ * input_flow.sum()
costs = costs + storage_costs
return costs
@staticmethod
def _create_graph(nodes):
"""Create a nx.DiGraph object to plot the network."""
graph = nx.DiGraph()
for node in nodes:
if isinstance(node, NodeInputBase):
color = "#c72321"
elif isinstance(node, NodeOutputBase):
color = "#f0c220"
else:
color = "#000000"
graph.add_node(node.name, color=color)
if hasattr(node, "storage") and node.storage is not None:
# XXX hopefully this name is unique
graph.add_node(f"{node.name}_storage", color="#0d8085")
graph.add_edge(f"{node.name}_storage", node.name)
graph.add_edge(node.name, f"{node.name}_storage")
for input_ in node.inputs:
graph.add_edge(input_.name, node.name)
return graph
[docs]
def draw(self, mode="netgraph"):
"""Draw a graphic representation of the network of all nodes and edges.
Parameters
----------
mode : str
choice of plotting library used to draw the graph, one of: netgraph, graphviz
"""
graph = self._create_graph(self.nodes)
# this requires python >=3.7, otherwise order of values() is not guaranteed
node_color = {node: node_attrs["color"] for node, node_attrs in graph.nodes(data=True)}
if mode == "graphviz":
nx.draw(
graph,
pos=graphviz_layout(graph, prog="dot"),
node_color=node_color.values(),
# node_size=5000,
with_labels=True,
)
elif mode == "netgraph":
# netgraph is used only here, let's keep it an optional dependency
import netgraph
# TODO we could do some fancy stuff here to make plotting nicer:
# shape = input_proportions
# color = output node
# scalable or fixed size?
# display results?
# display commodities / units?
# TODO allow custom args to be passed to netgraph?
netgraph.Graph(
graph,
node_labels=True,
node_layout="dot",
node_label_offset=0.09,
node_color=node_color,
arrows=True,
edge_width=1.2,
node_edge_width=0.0,
)
else:
raise ValueError(f"invalid draw mode: {mode} (valid modes: netgraph, graphviz)")