# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: : 2017-2023 The PyPSA-Eur Authors
#
# SPDX-License-Identifier: MIT
"""
Solves optimal operation and capacity for a network with the option to
iteratively optimize while updating line reactances.

This script is used for optimizing the electrical network as well as the
sector coupled network.

Description
-----------

Total annual system costs are minimised with PyPSA. The full formulation of
the linear optimal power flow (plus investment planning) is provided in the
`documentation of PyPSA <https://pypsa.readthedocs.io>`_.

The optimization is based on the :func:`network.optimize` function.
Additionally, some extra constraints specified in :mod:`solve_network` are
added.

.. note::

    The rules ``solve_elec_networks`` and ``solve_sector_networks`` run
    the workflow for all scenarios in the configuration file (``scenario:``)
    based on the rule :mod:`solve_network`.
"""
import logging
import re

import numpy as np
import pandas as pd
import pypsa
import xarray as xr
from _benchmark import memory_logger
from _helpers import configure_logging, update_config_with_sector_opts
from pypsa.descriptors import get_activity_mask
from pypsa.descriptors import get_switchable_as_dense as get_as_dense

logger = logging.getLogger(__name__)
pypsa.pf.logger.setLevel(logging.WARNING)


def add_land_use_constraint(n, planning_horizons, config):
    if "m" in snakemake.wildcards.clusters:
        _add_land_use_constraint_m(n, planning_horizons, config)
    else:
        _add_land_use_constraint(n)


def add_land_use_constraint_perfect(n):
    """
    Add global constraints for tech capacity limit.
    """
    logger.info("Add land-use constraint for perfect foresight")

    def compress_series(s):
        def process_group(group):
            if group.nunique() == 1:
                return pd.Series(group.iloc[0], index=[None])
            else:
                return group

        return s.groupby(level=[0, 1]).apply(process_group)

    def new_index_name(t):
        # Convert all elements to string and filter out None values
        parts = [str(x) for x in t if x is not None]
        # Join with space, but use a dash for the last item if not None
        return " ".join(parts[:2]) + (f"-{parts[-1]}" if len(parts) > 2 else "")

    def check_p_min_p_max(p_nom_max):
        p_nom_min = n.generators[ext_i].groupby(grouper).sum().p_nom_min
        p_nom_min = p_nom_min.reindex(p_nom_max.index)
        check = (
            p_nom_min.groupby(level=[0, 1]).sum()
            > p_nom_max.groupby(level=[0, 1]).min()
        )
        if check.sum():
            logger.warning(
                f"summed p_min_pu values at node larger than technical potential {check[check].index}"
            )

    grouper = [n.generators.carrier, n.generators.bus, n.generators.build_year]
    ext_i = n.generators.p_nom_extendable
    # get technical limit per node and investment period
    p_nom_max = n.generators[ext_i].groupby(grouper).min().p_nom_max
    # drop carriers without tech limit
    p_nom_max = p_nom_max[~p_nom_max.isin([np.inf, np.nan])]
    # carrier
    carriers = p_nom_max.index.get_level_values(0).unique()
    gen_i = n.generators[(n.generators.carrier.isin(carriers)) & (ext_i)].index
    n.generators.loc[gen_i, "p_nom_min"] = 0
    # check minimum capacities
    check_p_min_p_max(p_nom_max)
    # drop multi entries in case p_nom_max stays constant in different periods
    # p_nom_max = compress_series(p_nom_max)
    # adjust name to fit syntax of nominal constraint per bus
    df = p_nom_max.reset_index()
    df["name"] = df.apply(
        lambda row: f"nom_max_{row['carrier']}"
        + (f"_{row['build_year']}" if row["build_year"] is not None else ""),
        axis=1,
    )

    for name in df.name.unique():
        df_carrier = df[df.name == name]
        bus = df_carrier.bus
        n.buses.loc[bus, name] = df_carrier.p_nom_max.values

    return n
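
# Illustration (not executed): for perfect foresight, the function above writes
# per-bus columns such as ``nom_max_solar_2030`` into ``n.buses``; PyPSA's
# nominal constraints per bus then cap the summed extendable capacity of that
# carrier and build year at each bus. A hypothetical lookup:
#
#     n.buses.loc["DE0 0", "nom_max_solar_2030"]  # -> e.g. 5000.0 (MW)
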
def _add_land_use_constraint(n):
    # warning: this will miss existing offwind which is not classed AC-DC
    # and has carrier 'offwind'

    for carrier in ["solar", "onwind", "offwind-ac", "offwind-dc"]:
        extendable_i = (
            n.generators.carrier == carrier
        ) & n.generators.p_nom_extendable
        n.generators.loc[extendable_i, "p_nom_min"] = 0

        ext_i = (n.generators.carrier == carrier) & ~n.generators.p_nom_extendable
        existing = (
            n.generators.loc[ext_i, "p_nom"]
            .groupby(n.generators.bus.map(n.buses.location))
            .sum()
        )
        existing.index += " " + carrier + "-" + snakemake.wildcards.planning_horizons
        n.generators.loc[existing.index, "p_nom_max"] -= existing

    # check if existing capacities are larger than technical potential
    existing_large = n.generators[
        n.generators["p_nom_min"] > n.generators["p_nom_max"]
    ].index
    if len(existing_large):
        logger.warning(
            f"Existing capacities larger than technical potential for {existing_large}, "
            "adjusting technical potential to existing capacities"
        )
        n.generators.loc[existing_large, "p_nom_max"] = n.generators.loc[
            existing_large, "p_nom_min"
        ]

    n.generators.p_nom_max.clip(lower=0, inplace=True)


def _add_land_use_constraint_m(n, planning_horizons, config):
    # if the generator clustering is coarser than the network clustering,
    # land-use accounting is done at the generator clusters

    grouping_years = config["existing_capacities"]["grouping_years"]
    current_horizon = snakemake.wildcards.planning_horizons

    for carrier in ["solar", "onwind", "offwind-ac", "offwind-dc"]:
        existing = n.generators.loc[n.generators.carrier == carrier, "p_nom"]
        ind = list(
            {i.split(sep=" ")[0] + " " + i.split(sep=" ")[1] for i in existing.index}
        )

        previous_years = [
            str(y)
            for y in planning_horizons + grouping_years
            if y < int(snakemake.wildcards.planning_horizons)
        ]

        for p_year in previous_years:
            ind2 = [
                i for i in ind if i + " " + carrier + "-" + p_year in existing.index
            ]
            sel_current = [i + " " + carrier + "-" + current_horizon for i in ind2]
            sel_p_year = [i + " " + carrier + "-" + p_year for i in ind2]
            n.generators.loc[sel_current, "p_nom_max"] -= existing.loc[
                sel_p_year
            ].rename(lambda x: x[:-4] + current_horizon)

    n.generators.p_nom_max.clip(lower=0, inplace=True)
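
# Illustration (not executed): both helpers above rely on the PyPSA-Eur
# generator naming scheme "<bus> <carrier>-<year>". A hypothetical entry:
#
#     "DE0 0 solar-2030"  # bus "DE0 0", carrier "solar", horizon 2030
#
# Subtracting existing capacity from ``p_nom_max`` therefore works by rewriting
# the year suffix (the last four characters) to the current planning horizon.
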
""" n.carriers.loc["co2 stored", "co2_absorptions"] = -1 n.carriers.co2_absorptions = n.carriers.co2_absorptions.fillna(0) limit = limit * 1e6 for o in opts: if "seq" not in o: continue limit = float(o[o.find("seq") + 3 :]) * 1e6 break if not n.investment_periods.empty: periods = n.investment_periods names = pd.Index([f"co2_sequestration_limit-{period}" for period in periods]) else: periods = [np.nan] names = pd.Index(["co2_sequestration_limit"]) n.madd( "GlobalConstraint", names, sense="<=", constant=limit, type="primary_energy", carrier_attribute="co2_absorptions", investment_period=periods, ) def add_carbon_constraint(n, snapshots): glcs = n.global_constraints.query('type == "co2_limit"') if glcs.empty: return for name, glc in glcs.iterrows(): rhs = glc.constant carattr = glc.carrier_attribute emissions = n.carriers.query(f"{carattr} != 0")[carattr] if emissions.empty: continue # stores n.stores["carrier"] = n.stores.bus.map(n.buses.carrier) stores = n.stores.query("carrier in @emissions.index and not e_cyclic") time_valid = int(glc.loc["investment_period"]) if not stores.empty: last = n.snapshot_weightings.reset_index().groupby("period").last() last_i = last.set_index([last.index, last.timestep]).index final_e = n.model["Store-e"].loc[last_i, stores.index] time_i = pd.IndexSlice[time_valid, :] lhs = final_e.loc[time_i, :] - final_e.shift(snapshot=1).loc[time_i, :] n.model.add_constraints(lhs <= rhs, name=f"GlobalConstraint-{name}") def add_carbon_budget_constraint(n, snapshots): glcs = n.global_constraints.query('type == "Co2Budget"') if glcs.empty: return for name, glc in glcs.iterrows(): rhs = glc.constant carattr = glc.carrier_attribute emissions = n.carriers.query(f"{carattr} != 0")[carattr] if emissions.empty: continue # stores n.stores["carrier"] = n.stores.bus.map(n.buses.carrier) stores = n.stores.query("carrier in @emissions.index and not e_cyclic") time_valid = int(glc.loc["investment_period"]) weighting = n.investment_period_weightings.loc[time_valid, "years"] if not stores.empty: last = n.snapshot_weightings.reset_index().groupby("period").last() last_i = last.set_index([last.index, last.timestep]).index final_e = n.model["Store-e"].loc[last_i, stores.index] time_i = pd.IndexSlice[time_valid, :] lhs = final_e.loc[time_i, :] * weighting n.model.add_constraints(lhs <= rhs, name=f"GlobalConstraint-{name}") def add_max_growth(n, config): """ Add maximum growth rates for different carriers. """ opts = snakemake.params["sector"]["limit_max_growth"] # take maximum yearly difference between investment periods since historic growth is per year factor = n.investment_period_weightings.years.max() * opts["factor"] for carrier in opts["max_growth"].keys(): max_per_period = opts["max_growth"][carrier] * factor logger.info( f"set maximum growth rate per investment period of {carrier} to {max_per_period} GW." ) n.carriers.loc[carrier, "max_growth"] = max_per_period * 1e3 for carrier in opts["max_relative_growth"].keys(): max_r_per_period = opts["max_relative_growth"][carrier] logger.info( f"set maximum relative growth per investment period of {carrier} to {max_r_per_period}." ) n.carriers.loc[carrier, "max_relative_growth"] = max_r_per_period return n def add_retrofit_gas_boiler_constraint(n, snapshots): """ Allow retrofitting of existing gas boilers to H2 boilers. 
""" c = "Link" logger.info("Add constraint for retrofitting gas boilers to H2 boilers.") # existing gas boilers mask = n.links.carrier.str.contains("gas boiler") & ~n.links.p_nom_extendable gas_i = n.links[mask].index mask = n.links.carrier.str.contains("retrofitted H2 boiler") h2_i = n.links[mask].index n.links.loc[gas_i, "p_nom_extendable"] = True p_nom = n.links.loc[gas_i, "p_nom"] n.links.loc[gas_i, "p_nom"] = 0 # heat profile cols = n.loads_t.p_set.columns[ n.loads_t.p_set.columns.str.contains("heat") & ~n.loads_t.p_set.columns.str.contains("industry") & ~n.loads_t.p_set.columns.str.contains("agriculture") ] profile = n.loads_t.p_set[cols].div( n.loads_t.p_set[cols].groupby(level=0).max(), level=0 ) # to deal if max value is zero profile.fillna(0, inplace=True) profile.rename(columns=n.loads.bus.to_dict(), inplace=True) profile = profile.reindex(columns=n.links.loc[gas_i, "bus1"]) profile.columns = gas_i rhs = profile.mul(p_nom) dispatch = n.model["Link-p"] active = get_activity_mask(n, c, snapshots, gas_i) rhs = rhs[active] p_gas = dispatch.sel(Link=gas_i) p_h2 = dispatch.sel(Link=h2_i) lhs = p_gas + p_h2 n.model.add_constraints(lhs == rhs, name="gas_retrofit") def prepare_network( n, solve_opts=None, config=None, foresight=None, planning_horizons=None, co2_sequestration_potential=None, ): if "clip_p_max_pu" in solve_opts: for df in ( n.generators_t.p_max_pu, n.generators_t.p_min_pu, n.storage_units_t.inflow, ): df.where(df > solve_opts["clip_p_max_pu"], other=0.0, inplace=True) load_shedding = solve_opts.get("load_shedding") if load_shedding: # intersect between macroeconomic and surveybased willingness to pay # http://journal.frontiersin.org/article/10.3389/fenrg.2015.00055/full # TODO: retrieve color and nice name from config n.add("Carrier", "load", color="#dd2e23", nice_name="Load shedding") buses_i = n.buses.query("carrier == 'AC'").index if not np.isscalar(load_shedding): # TODO: do not scale via sign attribute (use Eur/MWh instead of Eur/kWh) load_shedding = 1e2 # Eur/kWh n.madd( "Generator", buses_i, " load", bus=buses_i, carrier="load", sign=1e-3, # Adjust sign to measure p and p_nom in kW instead of MW marginal_cost=load_shedding, # Eur/kWh p_nom=1e9, # kW ) if solve_opts.get("noisy_costs"): for t in n.iterate_components(): # if 'capital_cost' in t.df: # t.df['capital_cost'] += 1e1 + 2.*(np.random.random(len(t.df)) - 0.5) if "marginal_cost" in t.df: t.df["marginal_cost"] += 1e-2 + 2e-3 * ( np.random.random(len(t.df)) - 0.5 ) for t in n.iterate_components(["Line", "Link"]): t.df["capital_cost"] += ( 1e-1 + 2e-2 * (np.random.random(len(t.df)) - 0.5) ) * t.df["length"] if solve_opts.get("nhours"): nhours = solve_opts["nhours"] n.set_snapshots(n.snapshots[:nhours]) n.snapshot_weightings[:] = 8760.0 / nhours if foresight == "myopic": add_land_use_constraint(n, planning_horizons, config) if foresight == "perfect": n = add_land_use_constraint_perfect(n) if snakemake.params["sector"]["limit_max_growth"]["enable"]: n = add_max_growth(n, config) if n.stores.carrier.eq("co2 stored").any(): limit = co2_sequestration_potential add_co2_sequestration_limit(n, config, limit=limit) return n def add_CCL_constraints(n, config): """ Add CCL (country & carrier limit) constraint to the network. Add minimum and maximum levels of generator nominal capacity per carrier for individual countries. Opts and path for agg_p_nom_minmax.csv must be defined in config.yaml. Default file is available at data/agg_p_nom_minmax.csv. 
def add_CCL_constraints(n, config):
    """
    Add CCL (country & carrier limit) constraint to the network.

    Add minimum and maximum levels of generator nominal capacity per carrier
    for individual countries. Opts and path for agg_p_nom_minmax.csv must be
    defined in config.yaml. Default file is available at
    data/agg_p_nom_minmax.csv.

    Parameters
    ----------
    n : pypsa.Network
    config : dict

    Example
    -------
    scenario:
        opts: [Co2L-CCL-24H]
    electricity:
        agg_p_nom_limits: data/agg_p_nom_minmax.csv
    """
    agg_p_nom_minmax = pd.read_csv(
        config["electricity"]["agg_p_nom_limits"], index_col=[0, 1]
    )
    logger.info("Adding generation capacity constraints per carrier and country")
    p_nom = n.model["Generator-p_nom"]

    gens = n.generators.query("p_nom_extendable").rename_axis(index="Generator-ext")
    grouper = pd.concat([gens.bus.map(n.buses.country), gens.carrier])
    lhs = p_nom.groupby(grouper).sum().rename(bus="country")

    minimum = xr.DataArray(agg_p_nom_minmax["min"].dropna()).rename(dim_0="group")
    index = minimum.indexes["group"].intersection(lhs.indexes["group"])
    if not index.empty:
        n.model.add_constraints(
            lhs.sel(group=index) >= minimum.loc[index], name="agg_p_nom_min"
        )

    maximum = xr.DataArray(agg_p_nom_minmax["max"].dropna()).rename(dim_0="group")
    index = maximum.indexes["group"].intersection(lhs.indexes["group"])
    if not index.empty:
        n.model.add_constraints(
            lhs.sel(group=index) <= maximum.loc[index], name="agg_p_nom_max"
        )


def add_EQ_constraints(n, o, scaling=1e-1):
    """
    Add equity constraints to the network.

    Currently this is only implemented for the electricity sector. Opts must
    be specified in the config.yaml.

    Parameters
    ----------
    n : pypsa.Network
    o : str

    Example
    -------
    scenario:
        opts: [Co2L-EQ0.7-24H]

    Require each country or node to produce on average a minimal share of its
    total electricity consumption itself. Example: EQ0.7c demands each country
    to produce on average at least 70% of its consumption; EQ0.7 demands each
    node to produce on average at least 70% of its consumption.
    """
    # TODO: Generalize to cover myopic and other sectors?
    float_regex = r"[0-9]*\.?[0-9]+"
    level = float(re.findall(float_regex, o)[0])
    if o[-1] == "c":
        ggrouper = n.generators.bus.map(n.buses.country)
        lgrouper = n.loads.bus.map(n.buses.country)
        sgrouper = n.storage_units.bus.map(n.buses.country)
    else:
        ggrouper = n.generators.bus
        lgrouper = n.loads.bus
        sgrouper = n.storage_units.bus
    load = (
        n.snapshot_weightings.generators
        @ n.loads_t.p_set.groupby(lgrouper, axis=1).sum()
    )
    inflow = (
        n.snapshot_weightings.stores
        @ n.storage_units_t.inflow.groupby(sgrouper, axis=1).sum()
    )
    inflow = inflow.reindex(load.index).fillna(0.0)
    rhs = scaling * (level * load - inflow)
    p = n.model["Generator-p"]
    lhs_gen = (
        (p * (n.snapshot_weightings.generators * scaling))
        .groupby(ggrouper.to_xarray())
        .sum()
        .sum("snapshot")
    )
    # TODO: double check that this is really needed, why do we have to
    # subtract the spillage
    if not n.storage_units_t.inflow.empty:
        spillage = n.model["StorageUnit-spill"]
        lhs_spill = (
            (spillage * (-n.snapshot_weightings.stores * scaling))
            .groupby(sgrouper.to_xarray())
            .sum()
            .sum("snapshot")
        )
        lhs = lhs_gen + lhs_spill
    else:
        lhs = lhs_gen
    n.model.add_constraints(lhs >= rhs, name="equity_min")
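
# Illustration (not executed): how the EQ wildcard above is parsed. These are
# worked examples of the regex and suffix handling, not extra functionality:
#
#     "EQ0.7c" -> level = 0.7, trailing "c" -> grouped per country
#     "EQ0.7"  -> level = 0.7, no "c"       -> grouped per node (bus)
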
def add_BAU_constraints(n, config):
    """
    Add a per-carrier minimal overall capacity.

    BAU_mincapacities and opts must be adjusted in the config.yaml.

    Parameters
    ----------
    n : pypsa.Network
    config : dict

    Example
    -------
    scenario:
        opts: [Co2L-BAU-24H]
    electricity:
        BAU_mincapacities:
            solar: 0
            onwind: 0
            OCGT: 100000
            offwind-ac: 0
            offwind-dc: 0

    This sets a minimum total expansion across all nodes, e.g. 100 GW for
    OCGT in Europe: OCGT bus 1 + OCGT bus 2 + ... >= 100000
    """
    mincaps = pd.Series(config["electricity"]["BAU_mincapacities"])
    p_nom = n.model["Generator-p_nom"]
    ext_i = n.generators.query("p_nom_extendable")
    ext_carrier_i = xr.DataArray(ext_i.carrier.rename_axis("Generator-ext"))
    lhs = p_nom.groupby(ext_carrier_i).sum()
    index = mincaps.index.intersection(lhs.indexes["carrier"])
    rhs = mincaps[index].rename_axis("carrier")
    n.model.add_constraints(lhs >= rhs, name="bau_mincaps")


# TODO: think about removing or make per country
def add_SAFE_constraints(n, config):
    """
    Add a capacity reserve margin of a certain fraction above the peak demand.

    Renewable generators and storage do not contribute. Ignores network.

    Parameters
    ----------
    n : pypsa.Network
    config : dict

    Example
    -------
    config.yaml requires to specify opts:

    scenario:
        opts: [Co2L-SAFE-24H]
    electricity:
        SAFE_reservemargin: 0.1

    Which sets a reserve margin of 10% above the peak demand.
    """
    peakdemand = n.loads_t.p_set.sum(axis=1).max()
    margin = 1.0 + config["electricity"]["SAFE_reservemargin"]
    reserve_margin = peakdemand * margin
    conventional_carriers = config["electricity"]["conventional_carriers"]
    ext_gens_i = n.generators.query(
        "carrier in @conventional_carriers & p_nom_extendable"
    ).index
    p_nom = n.model["Generator-p_nom"].loc[ext_gens_i]
    lhs = p_nom.sum()
    exist_conv_caps = n.generators.query(
        "~p_nom_extendable & carrier in @conventional_carriers"
    ).p_nom.sum()
    rhs = reserve_margin - exist_conv_caps
    n.model.add_constraints(lhs >= rhs, name="safe_mintotalcap")
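
# Illustration (not executed): the arithmetic behind the SAFE constraint
# above, with hypothetical numbers. Suppose peak demand is 100 GW and the
# configured reserve margin is 0.1:
#
#     reserve_margin = 100 GW * (1 + 0.1) = 110 GW
#     sum(extendable conventional p_nom) >= 110 GW - existing conventional p_nom
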
def add_operational_reserve_margin(n, sns, config):
    """
    Build reserve margin constraints based on the formulation given in
    https://genxproject.github.io/GenX/dev/core/#Reserves.

    Parameters
    ----------
    n : pypsa.Network
    sns : pd.DatetimeIndex
    config : dict

    Example
    -------
    config.yaml requires to specify operational_reserve:

    operational_reserve: # like https://genxproject.github.io/GenX/dev/core/#Reserves
        activate: true
        epsilon_load: 0.02 # percentage of load at each snapshot
        epsilon_vres: 0.02 # percentage of VRES at each snapshot
        contingency: 400000 # MW
    """
    reserve_config = config["electricity"]["operational_reserve"]
    EPSILON_LOAD = reserve_config["epsilon_load"]
    EPSILON_VRES = reserve_config["epsilon_vres"]
    CONTINGENCY = reserve_config["contingency"]

    # Reserve Variables
    n.model.add_variables(
        0, np.inf, coords=[sns, n.generators.index], name="Generator-r"
    )
    reserve = n.model["Generator-r"]
    summed_reserve = reserve.sum("Generator")

    # Share of extendable renewable capacities
    ext_i = n.generators.query("p_nom_extendable").index
    vres_i = n.generators_t.p_max_pu.columns
    # start from the reserve alone so lhs is defined even without extendable VRES
    lhs = summed_reserve
    if not ext_i.empty and not vres_i.empty:
        capacity_factor = n.generators_t.p_max_pu[vres_i.intersection(ext_i)]
        p_nom_vres = (
            n.model["Generator-p_nom"]
            .loc[vres_i.intersection(ext_i)]
            .rename({"Generator-ext": "Generator"})
        )
        lhs = summed_reserve + (p_nom_vres * (-EPSILON_VRES * capacity_factor)).sum(
            "Generator"
        )

    # Total demand per t
    demand = get_as_dense(n, "Load", "p_set").sum(axis=1)

    # VRES potential of non-extendable generators
    capacity_factor = n.generators_t.p_max_pu[vres_i.difference(ext_i)]
    renewable_capacity = n.generators.p_nom[vres_i.difference(ext_i)]
    potential = (capacity_factor * renewable_capacity).sum(axis=1)

    # Right-hand side
    rhs = EPSILON_LOAD * demand + EPSILON_VRES * potential + CONTINGENCY

    n.model.add_constraints(lhs >= rhs, name="reserve_margin")

    # additional constraint that capacity is not exceeded
    gen_i = n.generators.index
    ext_i = n.generators.query("p_nom_extendable").index
    fix_i = n.generators.query("not p_nom_extendable").index

    dispatch = n.model["Generator-p"]
    reserve = n.model["Generator-r"]

    capacity_variable = n.model["Generator-p_nom"].rename(
        {"Generator-ext": "Generator"}
    )
    capacity_fixed = n.generators.p_nom[fix_i]

    p_max_pu = get_as_dense(n, "Generator", "p_max_pu")

    lhs = dispatch + reserve - capacity_variable * p_max_pu[ext_i]

    rhs = (p_max_pu[fix_i] * capacity_fixed).reindex(columns=gen_i, fill_value=0)

    n.model.add_constraints(lhs <= rhs, name="Generator-p-reserve-upper")


def add_battery_constraints(n):
    """
    Add constraint ensuring that charger = discharger, i.e.
    1 * charger_size - efficiency * discharger_size = 0
    """
    if not n.links.p_nom_extendable.any():
        return

    discharger_bool = n.links.index.str.contains("battery discharger")
    charger_bool = n.links.index.str.contains("battery charger")

    dischargers_ext = n.links[discharger_bool].query("p_nom_extendable").index
    chargers_ext = n.links[charger_bool].query("p_nom_extendable").index

    eff = n.links.efficiency[dischargers_ext].values
    lhs = (
        n.model["Link-p_nom"].loc[chargers_ext]
        - n.model["Link-p_nom"].loc[dischargers_ext] * eff
    )

    n.model.add_constraints(lhs == 0, name="Link-charger_ratio")


def add_chp_constraints(n):
    electric = (
        n.links.index.str.contains("urban central")
        & n.links.index.str.contains("CHP")
        & n.links.index.str.contains("electric")
    )
    heat = (
        n.links.index.str.contains("urban central")
        & n.links.index.str.contains("CHP")
        & n.links.index.str.contains("heat")
    )

    electric_ext = n.links[electric].query("p_nom_extendable").index
    heat_ext = n.links[heat].query("p_nom_extendable").index

    electric_fix = n.links[electric].query("~p_nom_extendable").index
    heat_fix = n.links[heat].query("~p_nom_extendable").index

    p = n.model["Link-p"]  # dimension: [time, link]

    # output ratio between heat and electricity and top_iso_fuel_line for extendable
    if not electric_ext.empty:
        p_nom = n.model["Link-p_nom"]

        lhs = (
            p_nom.loc[electric_ext]
            * (n.links.p_nom_ratio * n.links.efficiency)[electric_ext].values
            - p_nom.loc[heat_ext] * n.links.efficiency[heat_ext].values
        )
        n.model.add_constraints(lhs == 0, name="chplink-fix_p_nom_ratio")

        rename = {"Link-ext": "Link"}
        lhs = (
            p.loc[:, electric_ext]
            + p.loc[:, heat_ext]
            - p_nom.rename(rename).loc[electric_ext]
        )
        n.model.add_constraints(lhs <= 0, name="chplink-top_iso_fuel_line_ext")

    # top_iso_fuel_line for fixed
    if not electric_fix.empty:
        lhs = p.loc[:, electric_fix] + p.loc[:, heat_fix]
        rhs = n.links.p_nom[electric_fix]
        n.model.add_constraints(lhs <= rhs, name="chplink-top_iso_fuel_line_fix")

    # back-pressure
    if not electric.empty:
        lhs = (
            p.loc[:, heat] * (n.links.efficiency[heat] * n.links.c_b[electric].values)
            - p.loc[:, electric] * n.links.efficiency[electric]
        )
        # back-pressure requires heat output to stay below the limit set by
        # electricity output, i.e. lhs <= 0 (not the rhs of the fixed case)
        n.model.add_constraints(lhs <= 0, name="chplink-backpressure")


def add_pipe_retrofit_constraint(n):
    """
    Add constraint for retrofitting existing CH4 pipelines to H2 pipelines.
    """
    gas_pipes_i = n.links.query("carrier == 'gas pipeline' and p_nom_extendable").index
    h2_retrofitted_i = n.links.query(
        "carrier == 'H2 pipeline retrofitted' and p_nom_extendable"
    ).index

    if h2_retrofitted_i.empty or gas_pipes_i.empty:
        return

    p_nom = n.model["Link-p_nom"]

    CH4_per_H2 = 1 / n.config["sector"]["H2_retrofit_capacity_per_CH4"]
    lhs = p_nom.loc[gas_pipes_i] + CH4_per_H2 * p_nom.loc[h2_retrofitted_i]
    rhs = n.links.p_nom[gas_pipes_i].rename_axis("Link-ext")

    n.model.add_constraints(lhs == rhs, name="Link-pipe_retrofit")
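
# Illustration (not executed): the retrofit arithmetic above with a
# hypothetical config value. If H2_retrofit_capacity_per_CH4 = 0.6, then
# CH4_per_H2 = 1 / 0.6, so each MW of retrofitted H2 pipeline gives up
# roughly 1.67 MW of the original gas pipeline capacity:
#
#     gas_p_nom + (1 / 0.6) * h2_retrofit_p_nom == original_gas_p_nom
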
def extra_functionality(n, snapshots):
    """
    Collects supplementary constraints which will be passed to
    ``pypsa.optimization.optimize``.

    If you want to enforce additional custom constraints, this is a good
    location to add them. The arguments ``opts`` and ``snakemake.config``
    are expected to be attached to the network.
    """
    opts = n.opts
    config = n.config
    if "BAU" in opts and n.generators.p_nom_extendable.any():
        add_BAU_constraints(n, config)
    if "SAFE" in opts and n.generators.p_nom_extendable.any():
        add_SAFE_constraints(n, config)
    if "CCL" in opts and n.generators.p_nom_extendable.any():
        add_CCL_constraints(n, config)
    reserve = config["electricity"].get("operational_reserve", {})
    if reserve.get("activate"):
        add_operational_reserve_margin(n, snapshots, config)
    for o in opts:
        if "EQ" in o:
            add_EQ_constraints(n, o)
    add_battery_constraints(n)
    add_pipe_retrofit_constraint(n)
    if n._multi_invest:
        add_carbon_constraint(n, snapshots)
        add_carbon_budget_constraint(n, snapshots)
        add_retrofit_gas_boiler_constraint(n, snapshots)
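
# Illustration (not executed): a hypothetical ``solving`` block of the config
# consumed by solve_network() below; the keys mirror the lookups in the code,
# the solver name and values are examples only.
#
#     solving:
#       solver:
#         name: gurobi
#         options: gurobi-default
#       solver_options:
#         gurobi-default:
#           threads: 4
#           method: 2
#       options:
#         transmission_losses: 0
#         linearized_unit_commitment: true
#         skip_iterations: true
#         track_iterations: false
#         min_iterations: 4
#         max_iterations: 6
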
def solve_network(n, config, solving, opts="", **kwargs):
    set_of_options = solving["solver"]["options"]
    cf_solving = solving["options"]

    kwargs["multi_investment_periods"] = config["foresight"] == "perfect"
    kwargs["solver_options"] = (
        solving["solver_options"][set_of_options] if set_of_options else {}
    )
    kwargs["solver_name"] = solving["solver"]["name"]
    kwargs["extra_functionality"] = extra_functionality
    kwargs["transmission_losses"] = cf_solving.get("transmission_losses", False)
    kwargs["linearized_unit_commitment"] = cf_solving.get(
        "linearized_unit_commitment", False
    )
    kwargs["assign_all_duals"] = cf_solving.get("assign_all_duals", False)

    rolling_horizon = cf_solving.pop("rolling_horizon", False)
    skip_iterations = cf_solving.pop("skip_iterations", False)
    if not n.lines.s_nom_extendable.any():
        skip_iterations = True
        logger.info("No expandable lines found. Skipping iterative solving.")

    # add to network for extra_functionality
    n.config = config
    n.opts = opts

    if rolling_horizon:
        kwargs["horizon"] = cf_solving.get("horizon", 365)
        kwargs["overlap"] = cf_solving.get("overlap", 0)
        n.optimize.optimize_with_rolling_horizon(**kwargs)
        status, condition = "", ""
    elif skip_iterations:
        status, condition = n.optimize(**kwargs)
    else:
        kwargs["track_iterations"] = cf_solving.get("track_iterations", False)
        kwargs["min_iterations"] = cf_solving.get("min_iterations", 4)
        kwargs["max_iterations"] = cf_solving.get("max_iterations", 6)
        status, condition = n.optimize.optimize_transmission_expansion_iteratively(
            **kwargs
        )

    if status != "ok" and not rolling_horizon:
        logger.warning(
            f"Solving status '{status}' with termination condition '{condition}'"
        )
    if "infeasible" in condition:
        raise RuntimeError("Solving status 'infeasible'")

    return n


# %%
if __name__ == "__main__":
    if "snakemake" not in globals():
        from _helpers import mock_snakemake

        snakemake = mock_snakemake(
            "solve_sector_network_perfect",
            configfiles="../config/test/config.perfect.yaml",
            simpl="",
            opts="",
            clusters="5",
            ll="v1.5",
            sector_opts="8760H-T-H-B-I-A-solar+p3-dist1",
            planning_horizons="2030",
        )
    configure_logging(snakemake)
    if "sector_opts" in snakemake.wildcards.keys():
        update_config_with_sector_opts(
            snakemake.config, snakemake.wildcards.sector_opts
        )

    opts = snakemake.wildcards.opts
    if "sector_opts" in snakemake.wildcards.keys():
        opts += "-" + snakemake.wildcards.sector_opts
    opts = [o for o in opts.split("-") if o != ""]
    solve_opts = snakemake.params.solving["options"]

    np.random.seed(solve_opts.get("seed", 123))

    n = pypsa.Network(snakemake.input.network)

    n = prepare_network(
        n,
        solve_opts,
        config=snakemake.config,
        foresight=snakemake.params.foresight,
        planning_horizons=snakemake.params.planning_horizons,
        co2_sequestration_potential=snakemake.params["co2_sequestration_potential"],
    )

    with memory_logger(
        filename=getattr(snakemake.log, "memory", None), interval=30.0
    ) as mem:
        n = solve_network(
            n,
            config=snakemake.config,
            solving=snakemake.params.solving,
            opts=opts,
            log_fn=snakemake.log.solver,
        )

    logger.info(f"Maximum memory usage: {mem.mem_usage}")

    n.meta = dict(snakemake.config, **dict(wildcards=dict(snakemake.wildcards)))
    n.export_to_netcdf(snakemake.output[0])
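
# Illustration (not executed): a minimal sketch of inspecting the solved and
# exported network, assuming standard PyPSA attributes; the file name is
# hypothetical.
#
#     import pypsa
#     n = pypsa.Network("results/networks/elec_solved.nc")
#     print(n.objective)             # objective value (total system cost)
#     print(n.generators.p_nom_opt)  # optimised generator capacities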