# -*- coding: utf-8 -*- # SPDX-FileCopyrightText: : 2017-2024 The PyPSA-Eur Authors # # SPDX-License-Identifier: MIT """ Solves optimal operation and capacity for a network with the option to iteratively optimize while updating line reactances. This script is used for optimizing the electrical network as well as the sector coupled network. Description ----------- Total annual system costs are minimised with PyPSA. The full formulation of the linear optimal power flow (plus investment planning is provided in the `documentation of PyPSA `_. The optimization is based on the :func:`network.optimize` function. Additionally, some extra constraints specified in :mod:`solve_network` are added. .. note:: The rules ``solve_elec_networks`` and ``solve_sector_networks`` run the workflow for all scenarios in the configuration file (``scenario:``) based on the rule :mod:`solve_network`. """ import importlib import logging import os import re import sys from functools import reduce import numpy as np import pandas as pd import pypsa import xarray as xr import yaml from _benchmark import memory_logger from _helpers import ( configure_logging, set_scenario_config, update_config_from_wildcards, ) from pypsa.descriptors import get_activity_mask from pypsa.descriptors import get_switchable_as_dense as get_as_dense logger = logging.getLogger(__name__) pypsa.pf.logger.setLevel(logging.WARNING) def add_land_use_constraint(n, planning_horizons, config): if "m" in snakemake.wildcards.clusters: _add_land_use_constraint_m(n, planning_horizons, config) else: _add_land_use_constraint(n) def add_land_use_constraint_perfect(n): """ Add global constraints for tech capacity limit. """ logger.info("Add land-use constraint for perfect foresight") def compress_series(s): def process_group(group): if group.nunique() == 1: return pd.Series(group.iloc[0], index=[None]) else: return group return s.groupby(level=[0, 1]).apply(process_group) def new_index_name(t): # Convert all elements to string and filter out None values parts = [str(x) for x in t if x is not None] # Join with space, but use a dash for the last item if not None return " ".join(parts[:2]) + (f"-{parts[-1]}" if len(parts) > 2 else "") def check_p_min_p_max(p_nom_max): p_nom_min = n.generators[ext_i].groupby(grouper).sum().p_nom_min p_nom_min = p_nom_min.reindex(p_nom_max.index) check = ( p_nom_min.groupby(level=[0, 1]).sum() > p_nom_max.groupby(level=[0, 1]).min() ) if check.sum(): logger.warning( f"summed p_min_pu values at node larger than technical potential {check[check].index}" ) grouper = [n.generators.carrier, n.generators.bus, n.generators.build_year] ext_i = n.generators.p_nom_extendable # get technical limit per node and investment period p_nom_max = n.generators[ext_i].groupby(grouper).min().p_nom_max # drop carriers without tech limit p_nom_max = p_nom_max[~p_nom_max.isin([np.inf, np.nan])] # carrier carriers = p_nom_max.index.get_level_values(0).unique() gen_i = n.generators[(n.generators.carrier.isin(carriers)) & (ext_i)].index n.generators.loc[gen_i, "p_nom_min"] = 0 # check minimum capacities check_p_min_p_max(p_nom_max) # drop multi entries in case p_nom_max stays constant in different periods # p_nom_max = compress_series(p_nom_max) # adjust name to fit syntax of nominal constraint per bus df = p_nom_max.reset_index() df["name"] = df.apply( lambda row: f"nom_max_{row['carrier']}" + (f"_{row['build_year']}" if row["build_year"] is not None else ""), axis=1, ) for name in df.name.unique(): df_carrier = df[df.name == name] bus = df_carrier.bus n.buses.loc[bus, name] = df_carrier.p_nom_max.values return n def _add_land_use_constraint(n): # warning: this will miss existing offwind which is not classed AC-DC and has carrier 'offwind' for carrier in [ "solar", "solar rooftop", "solar-hsat", "onwind", "offwind-ac", "offwind-dc", "offwind-float", ]: extendable_i = (n.generators.carrier == carrier) & n.generators.p_nom_extendable n.generators.loc[extendable_i, "p_nom_min"] = 0 ext_i = (n.generators.carrier == carrier) & ~n.generators.p_nom_extendable existing = ( n.generators.loc[ext_i, "p_nom"] .groupby(n.generators.bus.map(n.buses.location)) .sum() ) existing.index += " " + carrier + "-" + snakemake.wildcards.planning_horizons n.generators.loc[existing.index, "p_nom_max"] -= existing # check if existing capacities are larger than technical potential existing_large = n.generators[ n.generators["p_nom_min"] > n.generators["p_nom_max"] ].index if len(existing_large): logger.warning( f"Existing capacities larger than technical potential for {existing_large},\ adjust technical potential to existing capacities" ) n.generators.loc[existing_large, "p_nom_max"] = n.generators.loc[ existing_large, "p_nom_min" ] n.generators.p_nom_max.clip(lower=0, inplace=True) def _add_land_use_constraint_m(n, planning_horizons, config): # if generators clustering is lower than network clustering, land_use accounting is at generators clusters grouping_years = config["existing_capacities"]["grouping_years_power"] current_horizon = snakemake.wildcards.planning_horizons for carrier in [ "solar", "solar rooftop", "solar-hsat", "onwind", "offwind-ac", "offwind-dc", ]: extendable_i = (n.generators.carrier == carrier) & n.generators.p_nom_extendable n.generators.loc[extendable_i, "p_nom_min"] = 0 existing = n.generators.loc[n.generators.carrier == carrier, "p_nom"] ind = list( {i.split(sep=" ")[0] + " " + i.split(sep=" ")[1] for i in existing.index} ) previous_years = [ str(y) for y in set(planning_horizons + grouping_years) if y < int(snakemake.wildcards.planning_horizons) ] for p_year in previous_years: ind2 = [ i for i in ind if i + " " + carrier + "-" + p_year in existing.index ] sel_current = [i + " " + carrier + "-" + current_horizon for i in ind2] sel_p_year = [i + " " + carrier + "-" + p_year for i in ind2] n.generators.loc[sel_current, "p_nom_max"] -= existing.loc[ sel_p_year ].rename(lambda x: x[:-4] + current_horizon) # check if existing capacities are larger than technical potential existing_large = n.generators[ n.generators["p_nom_min"] > n.generators["p_nom_max"] ].index if len(existing_large): logger.warning( f"Existing capacities larger than technical potential for {existing_large},\ adjust technical potential to existing capacities" ) n.generators.loc[existing_large, "p_nom_max"] = n.generators.loc[ existing_large, "p_nom_min" ] n.generators.p_nom_max.clip(lower=0, inplace=True) def add_solar_potential_constraints(n, config): """ Add constraint to make sure the sum capacity of all solar technologies (fixed, tracking, ets. ) is below the region potential. Example: ES1 0: total solar potential is 10 GW, meaning: solar potential : 10 GW solar-hsat potential : 8 GW (solar with single axis tracking is assumed to have higher land use) The constraint ensures that: solar_p_nom + solar_hsat_p_nom * 1.13 <= 10 GW """ land_use_factors = { "solar-hsat": config["renewable"]["solar"]["capacity_per_sqkm"] / config["renewable"]["solar-hsat"]["capacity_per_sqkm"], } rename = {"Generator-ext": "Generator"} solar_carriers = ["solar", "solar-hsat"] solar = n.generators[ n.generators.carrier.isin(solar_carriers) & n.generators.p_nom_extendable ].index solar_today = n.generators[ (n.generators.carrier == "solar") & (n.generators.p_nom_extendable) ].index solar_hsat = n.generators[(n.generators.carrier == "solar-hsat")].index if solar.empty: return land_use = pd.DataFrame(1, index=solar, columns=["land_use_factor"]) for carrier, factor in land_use_factors.items(): land_use = land_use.apply( lambda x: (x * factor) if carrier in x.name else x, axis=1 ) if "m" in snakemake.wildcards.clusters: location = pd.Series( [" ".join(i.split(" ")[:2]) for i in n.generators.index], index=n.generators.index, ) ggrouper = pd.Series( n.generators.loc[solar].index.rename("bus").map(location), index=n.generators.loc[solar].index, ).to_xarray() rhs = ( n.generators.loc[solar_today, "p_nom_max"] .groupby(n.generators.loc[solar_today].index.rename("bus").map(location)) .sum() - n.generators.loc[solar_hsat, "p_nom_opt"] .groupby(n.generators.loc[solar_hsat].index.rename("bus").map(location)) .sum() * land_use_factors["solar-hsat"] ).clip(lower=0) else: location = pd.Series(n.buses.index, index=n.buses.index) ggrouper = n.generators.loc[solar].bus rhs = ( n.generators.loc[solar_today, "p_nom_max"] .groupby(n.generators.loc[solar_today].bus.map(location)) .sum() - n.generators.loc[solar_hsat, "p_nom_opt"] .groupby(n.generators.loc[solar_hsat].bus.map(location)) .sum() * land_use_factors["solar-hsat"] ).clip(lower=0) lhs = ( (n.model["Generator-p_nom"].rename(rename).loc[solar] * land_use.squeeze()) .groupby(ggrouper) .sum() ) logger.info("Adding solar potential constraint.") n.model.add_constraints(lhs <= rhs, name="solar_potential") def add_co2_sequestration_limit(n, limit=200): """ Add a global constraint on the amount of Mt CO2 that can be sequestered. """ if not n.investment_periods.empty: periods = n.investment_periods names = pd.Index([f"co2_sequestration_limit-{period}" for period in periods]) else: periods = [np.nan] names = pd.Index(["co2_sequestration_limit"]) n.madd( "GlobalConstraint", names, sense=">=", constant=-limit * 1e6, type="operational_limit", carrier_attribute="co2 sequestered", investment_period=periods, ) def add_carbon_constraint(n, snapshots): glcs = n.global_constraints.query('type == "co2_atmosphere"') if glcs.empty: return for name, glc in glcs.iterrows(): carattr = glc.carrier_attribute emissions = n.carriers.query(f"{carattr} != 0")[carattr] if emissions.empty: continue # stores n.stores["carrier"] = n.stores.bus.map(n.buses.carrier) stores = n.stores.query("carrier in @emissions.index and not e_cyclic") if not stores.empty: last = n.snapshot_weightings.reset_index().groupby("period").last() last_i = last.set_index([last.index, last.timestep]).index final_e = n.model["Store-e"].loc[last_i, stores.index] time_valid = int(glc.loc["investment_period"]) time_i = pd.IndexSlice[time_valid, :] lhs = final_e.loc[time_i, :] - final_e.shift(snapshot=1).loc[time_i, :] rhs = glc.constant n.model.add_constraints(lhs <= rhs, name=f"GlobalConstraint-{name}") def add_carbon_budget_constraint(n, snapshots): glcs = n.global_constraints.query('type == "Co2Budget"') if glcs.empty: return for name, glc in glcs.iterrows(): carattr = glc.carrier_attribute emissions = n.carriers.query(f"{carattr} != 0")[carattr] if emissions.empty: continue # stores n.stores["carrier"] = n.stores.bus.map(n.buses.carrier) stores = n.stores.query("carrier in @emissions.index and not e_cyclic") if not stores.empty: last = n.snapshot_weightings.reset_index().groupby("period").last() last_i = last.set_index([last.index, last.timestep]).index final_e = n.model["Store-e"].loc[last_i, stores.index] time_valid = int(glc.loc["investment_period"]) time_i = pd.IndexSlice[time_valid, :] weighting = n.investment_period_weightings.loc[time_valid, "years"] lhs = final_e.loc[time_i, :] * weighting rhs = glc.constant n.model.add_constraints(lhs <= rhs, name=f"GlobalConstraint-{name}") def add_max_growth(n): """ Add maximum growth rates for different carriers. """ opts = snakemake.params["sector"]["limit_max_growth"] # take maximum yearly difference between investment periods since historic growth is per year factor = n.investment_period_weightings.years.max() * opts["factor"] for carrier in opts["max_growth"].keys(): max_per_period = opts["max_growth"][carrier] * factor logger.info( f"set maximum growth rate per investment period of {carrier} to {max_per_period} GW." ) n.carriers.loc[carrier, "max_growth"] = max_per_period * 1e3 for carrier in opts["max_relative_growth"].keys(): max_r_per_period = opts["max_relative_growth"][carrier] logger.info( f"set maximum relative growth per investment period of {carrier} to {max_r_per_period}." ) n.carriers.loc[carrier, "max_relative_growth"] = max_r_per_period return n def add_retrofit_gas_boiler_constraint(n, snapshots): """ Allow retrofitting of existing gas boilers to H2 boilers. """ c = "Link" logger.info("Add constraint for retrofitting gas boilers to H2 boilers.") # existing gas boilers mask = n.links.carrier.str.contains("gas boiler") & ~n.links.p_nom_extendable gas_i = n.links[mask].index mask = n.links.carrier.str.contains("retrofitted H2 boiler") h2_i = n.links[mask].index n.links.loc[gas_i, "p_nom_extendable"] = True p_nom = n.links.loc[gas_i, "p_nom"] n.links.loc[gas_i, "p_nom"] = 0 # heat profile cols = n.loads_t.p_set.columns[ n.loads_t.p_set.columns.str.contains("heat") & ~n.loads_t.p_set.columns.str.contains("industry") & ~n.loads_t.p_set.columns.str.contains("agriculture") ] profile = n.loads_t.p_set[cols].div( n.loads_t.p_set[cols].groupby(level=0).max(), level=0 ) # to deal if max value is zero profile.fillna(0, inplace=True) profile.rename(columns=n.loads.bus.to_dict(), inplace=True) profile = profile.reindex(columns=n.links.loc[gas_i, "bus1"]) profile.columns = gas_i rhs = profile.mul(p_nom) dispatch = n.model["Link-p"] active = get_activity_mask(n, c, snapshots, gas_i) rhs = rhs[active] p_gas = dispatch.sel(Link=gas_i) p_h2 = dispatch.sel(Link=h2_i) lhs = p_gas + p_h2 n.model.add_constraints(lhs == rhs, name="gas_retrofit") def prepare_network( n, solve_opts=None, config=None, foresight=None, planning_horizons=None, co2_sequestration_potential=None, ): if "clip_p_max_pu" in solve_opts: for df in ( n.generators_t.p_max_pu, n.generators_t.p_min_pu, n.links_t.p_max_pu, n.links_t.p_min_pu, n.storage_units_t.inflow, ): df.where(df > solve_opts["clip_p_max_pu"], other=0.0, inplace=True) if load_shedding := solve_opts.get("load_shedding"): # intersect between macroeconomic and surveybased willingness to pay # http://journal.frontiersin.org/article/10.3389/fenrg.2015.00055/full # TODO: retrieve color and nice name from config n.add("Carrier", "load", color="#dd2e23", nice_name="Load shedding") buses_i = n.buses.index if not np.isscalar(load_shedding): # TODO: do not scale via sign attribute (use Eur/MWh instead of Eur/kWh) load_shedding = 1e2 # Eur/kWh n.madd( "Generator", buses_i, " load", bus=buses_i, carrier="load", sign=1e-3, # Adjust sign to measure p and p_nom in kW instead of MW marginal_cost=load_shedding, # Eur/kWh p_nom=1e9, # kW ) if solve_opts.get("noisy_costs"): for t in n.iterate_components(): # if 'capital_cost' in t.df: # t.df['capital_cost'] += 1e1 + 2.*(np.random.random(len(t.df)) - 0.5) if "marginal_cost" in t.df: t.df["marginal_cost"] += 1e-2 + 2e-3 * ( np.random.random(len(t.df)) - 0.5 ) for t in n.iterate_components(["Line", "Link"]): t.df["capital_cost"] += ( 1e-1 + 2e-2 * (np.random.random(len(t.df)) - 0.5) ) * t.df["length"] if solve_opts.get("nhours"): nhours = solve_opts["nhours"] n.set_snapshots(n.snapshots[:nhours]) n.snapshot_weightings[:] = 8760.0 / nhours if foresight == "myopic": add_land_use_constraint(n, planning_horizons, config) if foresight == "perfect": n = add_land_use_constraint_perfect(n) if snakemake.params["sector"]["limit_max_growth"]["enable"]: n = add_max_growth(n) if n.stores.carrier.eq("co2 sequestered").any(): limit = co2_sequestration_potential add_co2_sequestration_limit(n, limit=limit) return n def add_CCL_constraints(n, config): """ Add CCL (country & carrier limit) constraint to the network. Add minimum and maximum levels of generator nominal capacity per carrier for individual countries. Opts and path for agg_p_nom_minmax.csv must be defined in config.yaml. Default file is available at data/agg_p_nom_minmax.csv. Parameters ---------- n : pypsa.Network config : dict Example ------- scenario: opts: [Co2L-CCL-24h] electricity: agg_p_nom_limits: data/agg_p_nom_minmax.csv """ agg_p_nom_minmax = pd.read_csv( config["solving"]["agg_p_nom_limits"]["file"], index_col=[0, 1], header=[0, 1] )[snakemake.wildcards.planning_horizons] logger.info("Adding generation capacity constraints per carrier and country") p_nom = n.model["Generator-p_nom"] gens = n.generators.query("p_nom_extendable").rename_axis(index="Generator-ext") if config["solving"]["agg_p_nom_limits"]["agg_offwind"]: rename_offwind = { "offwind-ac": "offwind-all", "offwind-dc": "offwind-all", "offwind": "offwind-all", } gens = gens.replace(rename_offwind) grouper = pd.concat([gens.bus.map(n.buses.country), gens.carrier], axis=1) lhs = p_nom.groupby(grouper).sum().rename(bus="country") if config["solving"]["agg_p_nom_limits"]["include_existing"]: gens_cst = n.generators.query("~p_nom_extendable").rename_axis( index="Generator-cst" ) gens_cst = gens_cst[ (gens_cst["build_year"] + gens_cst["lifetime"]) >= int(snakemake.wildcards.planning_horizons) ] if config["solving"]["agg_p_nom_limits"]["agg_offwind"]: gens_cst = gens_cst.replace(rename_offwind) rhs_cst = ( pd.concat( [gens_cst.bus.map(n.buses.country), gens_cst[["carrier", "p_nom"]]], axis=1, ) .groupby(["bus", "carrier"]) .sum() ) rhs_cst.index = rhs_cst.index.rename({"bus": "country"}) rhs_min = agg_p_nom_minmax["min"].dropna() idx_min = rhs_min.index.join(rhs_cst.index, how="left") rhs_min = rhs_min.reindex(idx_min).fillna(0) rhs = (rhs_min - rhs_cst.reindex(idx_min).fillna(0).p_nom).dropna() rhs[rhs < 0] = 0 minimum = xr.DataArray(rhs).rename(dim_0="group") else: minimum = xr.DataArray(agg_p_nom_minmax["min"].dropna()).rename(dim_0="group") index = minimum.indexes["group"].intersection(lhs.indexes["group"]) if not index.empty: n.model.add_constraints( lhs.sel(group=index) >= minimum.loc[index], name="agg_p_nom_min" ) if config["solving"]["agg_p_nom_limits"]["include_existing"]: rhs_max = agg_p_nom_minmax["max"].dropna() idx_max = rhs_max.index.join(rhs_cst.index, how="left") rhs_max = rhs_max.reindex(idx_max).fillna(0) rhs = (rhs_max - rhs_cst.reindex(idx_max).fillna(0).p_nom).dropna() rhs[rhs < 0] = 0 maximum = xr.DataArray(rhs).rename(dim_0="group") else: maximum = xr.DataArray(agg_p_nom_minmax["max"].dropna()).rename(dim_0="group") index = maximum.indexes["group"].intersection(lhs.indexes["group"]) if not index.empty: n.model.add_constraints( lhs.sel(group=index) <= maximum.loc[index], name="agg_p_nom_max" ) def add_EQ_constraints(n, o, scaling=1e-1): """ Add equity constraints to the network. Currently this is only implemented for the electricity sector only. Opts must be specified in the config.yaml. Parameters ---------- n : pypsa.Network o : str Example ------- scenario: opts: [Co2L-EQ0.7-24h] Require each country or node to on average produce a minimal share of its total electricity consumption itself. Example: EQ0.7c demands each country to produce on average at least 70% of its consumption; EQ0.7 demands each node to produce on average at least 70% of its consumption. """ # TODO: Generalize to cover myopic and other sectors? float_regex = "[0-9]*\.?[0-9]+" level = float(re.findall(float_regex, o)[0]) if o[-1] == "c": ggrouper = n.generators.bus.map(n.buses.country) lgrouper = n.loads.bus.map(n.buses.country) sgrouper = n.storage_units.bus.map(n.buses.country) else: ggrouper = n.generators.bus lgrouper = n.loads.bus sgrouper = n.storage_units.bus load = ( n.snapshot_weightings.generators @ n.loads_t.p_set.groupby(lgrouper, axis=1).sum() ) inflow = ( n.snapshot_weightings.stores @ n.storage_units_t.inflow.groupby(sgrouper, axis=1).sum() ) inflow = inflow.reindex(load.index).fillna(0.0) rhs = scaling * (level * load - inflow) p = n.model["Generator-p"] lhs_gen = ( (p * (n.snapshot_weightings.generators * scaling)) .groupby(ggrouper.to_xarray()) .sum() .sum("snapshot") ) # TODO: double check that this is really needed, why do have to subtract the spillage if not n.storage_units_t.inflow.empty: spillage = n.model["StorageUnit-spill"] lhs_spill = ( (spillage * (-n.snapshot_weightings.stores * scaling)) .groupby(sgrouper.to_xarray()) .sum() .sum("snapshot") ) lhs = lhs_gen + lhs_spill else: lhs = lhs_gen n.model.add_constraints(lhs >= rhs, name="equity_min") def add_BAU_constraints(n, config): """ Add a per-carrier minimal overall capacity. BAU_mincapacities and opts must be adjusted in the config.yaml. Parameters ---------- n : pypsa.Network config : dict Example ------- scenario: opts: [Co2L-BAU-24h] electricity: BAU_mincapacities: solar: 0 onwind: 0 OCGT: 100000 offwind-ac: 0 offwind-dc: 0 Which sets minimum expansion across all nodes e.g. in Europe to 100GW. OCGT bus 1 + OCGT bus 2 + ... > 100000 """ mincaps = pd.Series(config["electricity"]["BAU_mincapacities"]) p_nom = n.model["Generator-p_nom"] ext_i = n.generators.query("p_nom_extendable") ext_carrier_i = xr.DataArray(ext_i.carrier.rename_axis("Generator-ext")) lhs = p_nom.groupby(ext_carrier_i).sum() index = mincaps.index.intersection(lhs.indexes["carrier"]) rhs = mincaps[lhs.indexes["carrier"]].rename_axis("carrier") n.model.add_constraints(lhs >= rhs, name="bau_mincaps") # TODO: think about removing or make per country def add_SAFE_constraints(n, config): """ Add a capacity reserve margin of a certain fraction above the peak demand. Renewable generators and storage do not contribute. Ignores network. Parameters ---------- n : pypsa.Network config : dict Example ------- config.yaml requires to specify opts: scenario: opts: [Co2L-SAFE-24h] electricity: SAFE_reservemargin: 0.1 Which sets a reserve margin of 10% above the peak demand. """ peakdemand = n.loads_t.p_set.sum(axis=1).max() margin = 1.0 + config["electricity"]["SAFE_reservemargin"] reserve_margin = peakdemand * margin conventional_carriers = config["electricity"]["conventional_carriers"] # noqa: F841 ext_gens_i = n.generators.query( "carrier in @conventional_carriers & p_nom_extendable" ).index p_nom = n.model["Generator-p_nom"].loc[ext_gens_i] lhs = p_nom.sum() exist_conv_caps = n.generators.query( "~p_nom_extendable & carrier in @conventional_carriers" ).p_nom.sum() rhs = reserve_margin - exist_conv_caps n.model.add_constraints(lhs >= rhs, name="safe_mintotalcap") def add_operational_reserve_margin(n, sns, config): """ Build reserve margin constraints based on the formulation given in https://genxproject.github.io/GenX/dev/core/#Reserves. Parameters ---------- n : pypsa.Network sns: pd.DatetimeIndex config : dict Example: -------- config.yaml requires to specify operational_reserve: operational_reserve: # like https://genxproject.github.io/GenX/dev/core/#Reserves activate: true epsilon_load: 0.02 # percentage of load at each snapshot epsilon_vres: 0.02 # percentage of VRES at each snapshot contingency: 400000 # MW """ reserve_config = config["electricity"]["operational_reserve"] EPSILON_LOAD = reserve_config["epsilon_load"] EPSILON_VRES = reserve_config["epsilon_vres"] CONTINGENCY = reserve_config["contingency"] # Reserve Variables n.model.add_variables( 0, np.inf, coords=[sns, n.generators.index], name="Generator-r" ) reserve = n.model["Generator-r"] summed_reserve = reserve.sum("Generator") # Share of extendable renewable capacities ext_i = n.generators.query("p_nom_extendable").index vres_i = n.generators_t.p_max_pu.columns if not ext_i.empty and not vres_i.empty: capacity_factor = n.generators_t.p_max_pu[vres_i.intersection(ext_i)] p_nom_vres = ( n.model["Generator-p_nom"] .loc[vres_i.intersection(ext_i)] .rename({"Generator-ext": "Generator"}) ) lhs = summed_reserve + ( p_nom_vres * (-EPSILON_VRES * xr.DataArray(capacity_factor)) ).sum("Generator") # Total demand per t demand = get_as_dense(n, "Load", "p_set").sum(axis=1) # VRES potential of non extendable generators capacity_factor = n.generators_t.p_max_pu[vres_i.difference(ext_i)] renewable_capacity = n.generators.p_nom[vres_i.difference(ext_i)] potential = (capacity_factor * renewable_capacity).sum(axis=1) # Right-hand-side rhs = EPSILON_LOAD * demand + EPSILON_VRES * potential + CONTINGENCY n.model.add_constraints(lhs >= rhs, name="reserve_margin") # additional constraint that capacity is not exceeded gen_i = n.generators.index ext_i = n.generators.query("p_nom_extendable").index fix_i = n.generators.query("not p_nom_extendable").index dispatch = n.model["Generator-p"] reserve = n.model["Generator-r"] capacity_variable = n.model["Generator-p_nom"].rename( {"Generator-ext": "Generator"} ) capacity_fixed = n.generators.p_nom[fix_i] p_max_pu = get_as_dense(n, "Generator", "p_max_pu") lhs = dispatch + reserve - capacity_variable * xr.DataArray(p_max_pu[ext_i]) rhs = (p_max_pu[fix_i] * capacity_fixed).reindex(columns=gen_i, fill_value=0) n.model.add_constraints(lhs <= rhs, name="Generator-p-reserve-upper") def add_battery_constraints(n): """ Add constraint ensuring that charger = discharger, i.e. 1 * charger_size - efficiency * discharger_size = 0 """ if not n.links.p_nom_extendable.any(): return discharger_bool = n.links.index.str.contains("battery discharger") charger_bool = n.links.index.str.contains("battery charger") dischargers_ext = n.links[discharger_bool].query("p_nom_extendable").index chargers_ext = n.links[charger_bool].query("p_nom_extendable").index eff = n.links.efficiency[dischargers_ext].values lhs = ( n.model["Link-p_nom"].loc[chargers_ext] - n.model["Link-p_nom"].loc[dischargers_ext] * eff ) n.model.add_constraints(lhs == 0, name="Link-charger_ratio") def add_lossy_bidirectional_link_constraints(n): if not n.links.p_nom_extendable.any() or "reversed" not in n.links.columns: return n.links["reversed"] = n.links.reversed.fillna(0).astype(bool) carriers = n.links.loc[n.links.reversed, "carrier"].unique() # noqa: F841 forward_i = n.links.query( "carrier in @carriers and ~reversed and p_nom_extendable" ).index def get_backward_i(forward_i): return pd.Index( [ ( re.sub(r"-(\d{4})$", r"-reversed-\1", s) if re.search(r"-\d{4}$", s) else s + "-reversed" ) for s in forward_i ] ) backward_i = get_backward_i(forward_i) lhs = n.model["Link-p_nom"].loc[backward_i] rhs = n.model["Link-p_nom"].loc[forward_i] n.model.add_constraints(lhs == rhs, name="Link-bidirectional_sync") def add_chp_constraints(n): electric = ( n.links.index.str.contains("urban central") & n.links.index.str.contains("CHP") & n.links.index.str.contains("electric") ) heat = ( n.links.index.str.contains("urban central") & n.links.index.str.contains("CHP") & n.links.index.str.contains("heat") ) electric_ext = n.links[electric].query("p_nom_extendable").index heat_ext = n.links[heat].query("p_nom_extendable").index electric_fix = n.links[electric].query("~p_nom_extendable").index heat_fix = n.links[heat].query("~p_nom_extendable").index p = n.model["Link-p"] # dimension: [time, link] # output ratio between heat and electricity and top_iso_fuel_line for extendable if not electric_ext.empty: p_nom = n.model["Link-p_nom"] lhs = ( p_nom.loc[electric_ext] * (n.links.p_nom_ratio * n.links.efficiency)[electric_ext].values - p_nom.loc[heat_ext] * n.links.efficiency[heat_ext].values ) n.model.add_constraints(lhs == 0, name="chplink-fix_p_nom_ratio") rename = {"Link-ext": "Link"} lhs = ( p.loc[:, electric_ext] + p.loc[:, heat_ext] - p_nom.rename(rename).loc[electric_ext] ) n.model.add_constraints(lhs <= 0, name="chplink-top_iso_fuel_line_ext") # top_iso_fuel_line for fixed if not electric_fix.empty: lhs = p.loc[:, electric_fix] + p.loc[:, heat_fix] rhs = n.links.p_nom[electric_fix] n.model.add_constraints(lhs <= rhs, name="chplink-top_iso_fuel_line_fix") # back-pressure if not electric.empty: lhs = ( p.loc[:, heat] * (n.links.efficiency[heat] * n.links.c_b[electric].values) - p.loc[:, electric] * n.links.efficiency[electric] ) n.model.add_constraints(lhs <= rhs, name="chplink-backpressure") def add_pipe_retrofit_constraint(n): """ Add constraint for retrofitting existing CH4 pipelines to H2 pipelines. """ if "reversed" not in n.links.columns: n.links["reversed"] = False gas_pipes_i = n.links.query( "carrier == 'gas pipeline' and p_nom_extendable and ~reversed" ).index h2_retrofitted_i = n.links.query( "carrier == 'H2 pipeline retrofitted' and p_nom_extendable and ~reversed" ).index if h2_retrofitted_i.empty or gas_pipes_i.empty: return p_nom = n.model["Link-p_nom"] CH4_per_H2 = 1 / n.config["sector"]["H2_retrofit_capacity_per_CH4"] lhs = p_nom.loc[gas_pipes_i] + CH4_per_H2 * p_nom.loc[h2_retrofitted_i] rhs = n.links.p_nom[gas_pipes_i].rename_axis("Link-ext") n.model.add_constraints(lhs == rhs, name="Link-pipe_retrofit") def add_co2_atmosphere_constraint(n, snapshots): glcs = n.global_constraints[n.global_constraints.type == "co2_atmosphere"] if glcs.empty: return for name, glc in glcs.iterrows(): carattr = glc.carrier_attribute emissions = n.carriers.query(f"{carattr} != 0")[carattr] if emissions.empty: continue # stores n.stores["carrier"] = n.stores.bus.map(n.buses.carrier) stores = n.stores.query("carrier in @emissions.index and not e_cyclic") if not stores.empty: last_i = snapshots[-1] lhs = n.model["Store-e"].loc[last_i, stores.index] rhs = glc.constant n.model.add_constraints(lhs <= rhs, name=f"GlobalConstraint-{name}") def extra_functionality(n, snapshots): """ Collects supplementary constraints which will be passed to ``pypsa.optimization.optimize``. If you want to enforce additional custom constraints, this is a good location to add them. The arguments ``opts`` and ``snakemake.config`` are expected to be attached to the network. """ config = n.config constraints = config["solving"].get("constraints", {}) if constraints["BAU"] and n.generators.p_nom_extendable.any(): add_BAU_constraints(n, config) if constraints["SAFE"] and n.generators.p_nom_extendable.any(): add_SAFE_constraints(n, config) if constraints["CCL"] and n.generators.p_nom_extendable.any(): add_CCL_constraints(n, config) reserve = config["electricity"].get("operational_reserve", {}) if reserve.get("activate"): add_operational_reserve_margin(n, snapshots, config) if EQ_o := constraints["EQ"]: add_EQ_constraints(n, EQ_o.replace("EQ", "")) if {"solar-hsat", "solar"}.issubset(config["renewable"].keys()): add_solar_potential_constraints(n, config) add_battery_constraints(n) add_lossy_bidirectional_link_constraints(n) add_pipe_retrofit_constraint(n) if n._multi_invest: add_carbon_constraint(n, snapshots) add_carbon_budget_constraint(n, snapshots) add_retrofit_gas_boiler_constraint(n, snapshots) else: add_co2_atmosphere_constraint(n, snapshots) if snakemake.params.custom_extra_functionality: source_path = snakemake.params.custom_extra_functionality assert os.path.exists(source_path), f"{source_path} does not exist" sys.path.append(os.path.dirname(source_path)) module_name = os.path.splitext(os.path.basename(source_path))[0] module = importlib.import_module(module_name) custom_extra_functionality = getattr(module, module_name) custom_extra_functionality(n, snapshots, snakemake) def solve_network(n, config, solving, **kwargs): set_of_options = solving["solver"]["options"] cf_solving = solving["options"] kwargs["multi_investment_periods"] = config["foresight"] == "perfect" kwargs["solver_options"] = ( solving["solver_options"][set_of_options] if set_of_options else {} ) kwargs["solver_name"] = solving["solver"]["name"] kwargs["extra_functionality"] = extra_functionality kwargs["transmission_losses"] = cf_solving.get("transmission_losses", False) kwargs["linearized_unit_commitment"] = cf_solving.get( "linearized_unit_commitment", False ) kwargs["assign_all_duals"] = cf_solving.get("assign_all_duals", False) kwargs["io_api"] = cf_solving.get("io_api", None) if kwargs["solver_name"] == "gurobi": logging.getLogger("gurobipy").setLevel(logging.CRITICAL) rolling_horizon = cf_solving.pop("rolling_horizon", False) skip_iterations = cf_solving.pop("skip_iterations", False) if not n.lines.s_nom_extendable.any(): skip_iterations = True logger.info("No expandable lines found. Skipping iterative solving.") # add to network for extra_functionality n.config = config if rolling_horizon and snakemake.rule == "solve_operations_network": kwargs["horizon"] = cf_solving.get("horizon", 365) kwargs["overlap"] = cf_solving.get("overlap", 0) n.optimize.optimize_with_rolling_horizon(**kwargs) status, condition = "", "" elif skip_iterations: status, condition = n.optimize(**kwargs) else: kwargs["track_iterations"] = cf_solving["track_iterations"] kwargs["min_iterations"] = cf_solving["min_iterations"] kwargs["max_iterations"] = cf_solving["max_iterations"] if cf_solving["post_discretization"].pop("enable"): logger.info("Add post-discretization parameters.") kwargs.update(cf_solving["post_discretization"]) status, condition = n.optimize.optimize_transmission_expansion_iteratively( **kwargs ) if status != "ok" and not rolling_horizon: logger.warning( f"Solving status '{status}' with termination condition '{condition}'" ) if "infeasible" in condition: labels = n.model.compute_infeasibilities() logger.info(f"Labels:\n{labels}") n.model.print_infeasibilities() raise RuntimeError("Solving status 'infeasible'") return n if __name__ == "__main__": if "snakemake" not in globals(): from _helpers import mock_snakemake snakemake = mock_snakemake( "solve_sector_network", configfiles="../config/test/config.perfect.yaml", simpl="", opts="", clusters="37", ll="v1.0", sector_opts="CO2L0-1H-T-H-B-I-A-dist1", planning_horizons="2030", ) configure_logging(snakemake) set_scenario_config(snakemake) update_config_from_wildcards(snakemake.config, snakemake.wildcards) solve_opts = snakemake.params.solving["options"] np.random.seed(solve_opts.get("seed", 123)) n = pypsa.Network(snakemake.input.network) n = prepare_network( n, solve_opts, config=snakemake.config, foresight=snakemake.params.foresight, planning_horizons=snakemake.params.planning_horizons, co2_sequestration_potential=snakemake.params["co2_sequestration_potential"], ) with memory_logger( filename=getattr(snakemake.log, "memory", None), interval=30.0 ) as mem: n = solve_network( n, config=snakemake.config, solving=snakemake.params.solving, log_fn=snakemake.log.solver, ) logger.info(f"Maximum memory usage: {mem.mem_usage}") n.meta = dict(snakemake.config, **dict(wildcards=dict(snakemake.wildcards))) n.export_to_netcdf(snakemake.output.network) with open(snakemake.output.config, "w") as file: yaml.dump( n.meta, file, default_flow_style=False, allow_unicode=True, sort_keys=False, )