[pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
This commit is contained in:
pre-commit-ci[bot] 2023-09-11 21:03:58 +00:00
parent 823df52309
commit 3eed341044
5 changed files with 88 additions and 56 deletions

View File

@ -479,8 +479,8 @@ rule prepare_network:
gaslimit=config["electricity"].get("gaslimit"),
max_hours=config["electricity"]["max_hours"],
costs=config["costs"],
snapshot_opts=config.get("snapshot_opts",{}),
autarky=config["electricity"].get("autarky",{}),
snapshot_opts=config.get("snapshot_opts", {}),
autarky=config["electricity"].get("autarky", {}),
input:
RESOURCES + "networks/elec_s{simpl}_{clusters}_ec.nc",
tech_costs=COSTS,

View File

@ -718,7 +718,7 @@ rule prepare_sector_network:
countries=config["countries"],
emissions_scope=config["energy"]["emissions"],
eurostat_report_year=config["energy"]["eurostat_report_year"],
snapshot_opts=config.get("snapshot_opts",{}),
snapshot_opts=config.get("snapshot_opts", {}),
RDIR=RDIR,
input:
**build_retro_cost_output,

View File

@ -6,8 +6,8 @@
import contextlib
import logging
import os
import urllib
import re
import urllib
from pathlib import Path
import pandas as pd
@ -25,6 +25,7 @@ REGION_COLS = ["geometry", "name", "x", "y", "country"]
def get_opt(opts, expr, flags=None):
"""
Return the first option matching the regular expression.
The regular expression is case-insensitive by default.
"""
if flags is None:
@ -35,6 +36,7 @@ def get_opt(opts, expr, flags=None):
return match.group(0)
return None
# Define a context manager to temporarily mute print statements
@contextlib.contextmanager
def mute_print():

View File

@ -71,6 +71,7 @@ idx = pd.IndexSlice
logger = logging.getLogger(__name__)
def find_opt(opts, expr):
"""
Return if available the float after the expression.
@ -84,6 +85,7 @@ def find_opt(opts, expr):
return True, None
return False, None
def add_co2limit(n, co2limit, Nyears=1.0):
n.add(
"GlobalConstraint",
@ -308,45 +310,57 @@ if __name__ == "__main__":
)
set_line_s_max_pu(n, snakemake.params.lines["s_max_pu"])
# temporal averaging
nhours_opts_config = snakemake.params.snapshot_opts.get("average_every_nhours",{})
nhours_enable_config = nhours_opts_config.get("enable",None)
nhours_config = str(nhours_opts_config.get("hour",None)) + "h"
nhours_opts_config = snakemake.params.snapshot_opts.get("average_every_nhours", {})
nhours_enable_config = nhours_opts_config.get("enable", None)
nhours_config = str(nhours_opts_config.get("hour", None)) + "h"
nhours_wildcard = get_opt(opts, r"^\d+h$")
if nhours_wildcard is not None or (nhours_enable_config and nhours_config is not None):
if nhours_wildcard is not None or (
nhours_enable_config and nhours_config is not None
):
nhours = nhours_wildcard or nhours_config
n = average_every_nhours(n, nhours)
# segments with package tsam
time_seg_opts_config = snakemake.params.snapshot_opts.get("time_segmentation",{})
time_seg_enable_config = nhours_opts_config.get("enable",None)
time_seg_config = str(nhours_opts_config.get("hour",None)) + "seg"
time_seg_opts_config = snakemake.params.snapshot_opts.get("time_segmentation", {})
time_seg_enable_config = nhours_opts_config.get("enable", None)
time_seg_config = str(nhours_opts_config.get("hour", None)) + "seg"
time_seg_wildcard = get_opt(opts, r"^\d+seg$")
if time_seg_wildcard is not None or (time_seg_enable_config and time_seg_config is not None):
if time_seg_wildcard is not None or (
time_seg_enable_config and time_seg_config is not None
):
time_seg = time_seg_wildcard or time_seg_config
solver_name = snakemake.config["solving"]["solver"]["name"]
n = apply_time_segmentation(n, time_seg, solver_name)
Co2L_config = snakemake.params.co2limit_enable and isinstance(snakemake.params.co2limit,float)
Co2L_config = snakemake.params.co2limit_enable and isinstance(
snakemake.params.co2limit, float
)
Co2L_wildcard, co2limit_wildcard = find_opt(opts, "Co2L")
if Co2L_wildcard or Co2L_config:
if co2limit_wildcard is not None: # TODO: what if you wat to determine the factor through the wildcard?
if (
co2limit_wildcard is not None
): # TODO: what if you wat to determine the factor through the wildcard?
co2limit = co2limit_wildcard * snakemake.params.co2base
add_co2limit(n, co2limit, Nyears)
logger.info("Setting CO2 limit according to wildcard value.")
else:
else:
add_co2limit(n, snakemake.params.co2limit, Nyears)
logger.info("Setting CO2 limit according to config value.")
CH4L_config = snakemake.params.gaslimit_enable and isinstance(snakemake.params.gaslimit,float)
CH4L_config = snakemake.params.gaslimit_enable and isinstance(
snakemake.params.gaslimit, float
)
CH4L_wildcard, gaslimit_wildcard = find_opt(opts, "CH4L")
if CH4L_wildcard or CH4L_config:
if gaslimit_wildcard is not None: # TODO: what if you wat to determine the factor through the wildcard?
if (
gaslimit_wildcard is not None
): # TODO: what if you wat to determine the factor through the wildcard?
gaslimit = gaslimit_wildcard * 1e6
add_gaslimit(n, gaslimit, Nyears)
logger.info("Setting gas usage limit according to wildcard value.")
else:
else:
add_gaslimit(n, snakemake.params.gaslimit, Nyears)
logger.info("Setting gas usage limit according to config value.")
@ -369,7 +383,7 @@ if __name__ == "__main__":
sel = c.df.carrier.str.contains(carrier)
c.df.loc[sel, attr] *= factor
Ept_config = snakemake.params.costs.get("enable",{}).get("monthly_prices", False)
Ept_config = snakemake.params.costs.get("enable", {}).get("monthly_prices", False)
for o in opts:
if "Ept" in o or Ept_config:
logger.info(
@ -378,13 +392,13 @@ if __name__ == "__main__":
add_dynamic_emission_prices(n)
Ept_config = True
Ep_config = snakemake.params.costs.get("enable",{}).get("emission_prices", False)
Ep_config = snakemake.params.costs.get("enable", {}).get("emission_prices", False)
Ep_wildcard, co2_wildcard = find_opt(opts, "Ep")
if (Ep_wildcard or Ep_config) and not Ept_config:
if co2_wildcard is not None:
logger.info("Setting emission prices according to wildcard value.")
add_emission_prices(n, dict(co2=co2_wildcard))
else:
else:
logger.info("Setting emission prices according to config value.")
add_emission_prices(n, snakemake.params.costs["emission_prices"])

View File

@ -17,7 +17,7 @@ import numpy as np
import pandas as pd
import pypsa
import xarray as xr
from _helpers import generate_periodic_profiles, update_config_with_sector_opts, get_opt
from _helpers import generate_periodic_profiles, get_opt, update_config_with_sector_opts
from add_electricity import calculate_annuity, sanitize_carriers
from build_energy_totals import build_co2_totals, build_eea_co2, build_eurostat_co2
from networkx.algorithms import complement
@ -161,11 +161,11 @@ spatial = SimpleNamespace()
def emission_sectors_from_opts(opts):
sectors = ["electricity"]
if "T" in opts or opts_config.get("land_transport",False):
if "T" in opts or opts_config.get("land_transport", False):
sectors += ["rail non-elec", "road non-elec"]
if "H" in opts or opts_config.get("heating",False):
if "H" in opts or opts_config.get("heating", False):
sectors += ["residential non-elec", "services non-elec"]
if "I" in opts or opts_config.get("industry",False):
if "I" in opts or opts_config.get("industry", False):
sectors += [
"industrial non-elec",
"industrial processes",
@ -175,9 +175,13 @@ def emission_sectors_from_opts(opts):
"international navigation",
]
heat_and_industry = opts_config.get("industry",False) and opts_config.get("heating",False)
heat_and_industry = opts_config.get("industry", False) and opts_config.get(
"heating", False
)
if ("I" in opts and "H" in opts and"A" in opts) or (heat_and_industry and opts_config.get("agriculture_machinery",False)):
if ("I" in opts and "H" in opts and "A" in opts) or (
heat_and_industry and opts_config.get("agriculture_machinery", False)
):
sectors += ["agriculture"]
return sectors
@ -3260,21 +3264,25 @@ def set_temporal_aggregation(n, opts, solver_name):
Aggregate network temporally.
"""
# temporal averaging
nhours_opts_config = snakemake.params.snapshot_opts.get("average_every_nhours",{})
nhours_enable_config = nhours_opts_config.get("enable",None)
nhours_config = str(nhours_opts_config.get("hour",None)) + "H"
nhours_opts_config = snakemake.params.snapshot_opts.get("average_every_nhours", {})
nhours_enable_config = nhours_opts_config.get("enable", None)
nhours_config = str(nhours_opts_config.get("hour", None)) + "H"
nhours_wildcard = get_opt(opts, r"^\d+h$")
if nhours_wildcard is not None or (nhours_enable_config and nhours_config is not None):
if nhours_wildcard is not None or (
nhours_enable_config and nhours_config is not None
):
nhours = nhours_wildcard or nhours_config
n = average_every_nhours(n, nhours)
return n
# representative snapshots
snapshots_opts_config = snakemake.params.snapshot_opts.get("set_snapshots",{})
snapshots_enable_config = snapshots_opts_config.get("enable",None)
snapshots_config = snapshots_opts_config.get("hour",None)
snapshots_opts_config = snakemake.params.snapshot_opts.get("set_snapshots", {})
snapshots_enable_config = snapshots_opts_config.get("enable", None)
snapshots_config = snapshots_opts_config.get("hour", None)
snapshots_wildcard = get_opt(opts, r"(^\d+)sn$")
if snapshots_wildcard is not None or (snapshots_enable_config and snapshots_config is not None):
if snapshots_wildcard is not None or (
snapshots_enable_config and snapshots_config is not None
):
sn = int(snapshots_wildcard[:-2]) or snapshots_config
logger.info(f"Use every {sn} snapshot as representative")
n.set_snapshots(n.snapshots[::sn])
@ -3282,11 +3290,13 @@ def set_temporal_aggregation(n, opts, solver_name):
return n
# segments with package tsam
time_seg_opts_config = snakemake.params.snapshot_opts.get("time_segmentation",{})
time_seg_enable_config = nhours_opts_config.get("enable",None)
time_seg_config = nhours_opts_config.get("hour",None)
time_seg_opts_config = snakemake.params.snapshot_opts.get("time_segmentation", {})
time_seg_enable_config = nhours_opts_config.get("enable", None)
time_seg_config = nhours_opts_config.get("hour", None)
time_seg_wildcard = get_opt(opts, r"^(\d+)seg$")
if time_seg_wildcard is not None or (time_seg_enable_config and time_seg_config is not None):
if time_seg_wildcard is not None or (
time_seg_enable_config and time_seg_config is not None
):
segments = int(time_seg_wildcard[:-3]) or time_seg_config
logger.info(f"Use temporal segmentation with {segments} segments")
n = apply_time_segmentation(n, segments, solver_name=solver_name)
@ -3320,7 +3330,9 @@ if __name__ == "__main__":
opts_config = snakemake.params.enable_sector
heat_and_industry = opts_config.get("industry",False) and opts_config.get("heating",False)
heat_and_industry = opts_config.get("industry", False) and opts_config.get(
"heating", False
)
investment_year = int(snakemake.wildcards.planning_horizons[-4:])
@ -3359,53 +3371,57 @@ if __name__ == "__main__":
# TODO merge with opts cost adjustment below
for o in opts:
if o[:4] == "wave": # TODO: add config wildcard options or depreciated?
if o[:4] == "wave": # TODO: add config wildcard options or depreciated?
wave_cost_factor = float(o[4:].replace("p", ".").replace("m", "-"))
logger.info(
f"Including wave generators with cost factor of {wave_cost_factor}"
)
add_wave(n, wave_cost_factor)
if o[:4] == "dist": # TODO: add config wildcard options
if o[:4] == "dist": # TODO: add config wildcard options
options["electricity_distribution_grid"] = True
options["electricity_distribution_grid_cost_factor"] = float(
o[4:].replace("p", ".").replace("m", "-")
)
for o in opts:
if o == "biomasstransport" or opts_config.get("biomass_transport",False):
if o == "biomasstransport" or opts_config.get("biomass_transport", False):
options["biomass_transport"] = True
break
if "nodistrict" in opts or opts_config.get("no_heat_district",False):
if "nodistrict" in opts or opts_config.get("no_heat_district", False):
options["district_heating"]["progress"] = 0.0
if "T" in opts or opts_config.get("land_transport",False):
if "T" in opts or opts_config.get("land_transport", False):
add_land_transport(n, costs)
if "H" in opts or opts_config.get("heating",False):
if "H" in opts or opts_config.get("heating", False):
add_heat(n, costs)
if "B" in opts or opts_config.get("biomass",False):
if "B" in opts or opts_config.get("biomass", False):
add_biomass(n, costs)
if options["ammonia"]:
add_ammonia(n, costs)
if "I" in opts or opts_config.get("industry",False):
if "I" in opts or opts_config.get("industry", False):
add_industry(n, costs)
if ("I" in opts and "H" in opts) or (heat_and_industry and opts_config.get("waste_heat",False)):
if ("I" in opts and "H" in opts) or (
heat_and_industry and opts_config.get("waste_heat", False)
):
add_waste_heat(n)
if ("I" in opts and "H" in opts and"A" in opts) or (heat_and_industry and opts_config.get("agriculture_machinery",False)): # requires H and I
if ("I" in opts and "H" in opts and "A" in opts) or (
heat_and_industry and opts_config.get("agriculture_machinery", False)
): # requires H and I
add_agriculture(n, costs)
if options["dac"]:
add_dac(n, costs)
if "decentral" in opts or opts_config.get("decentral",False):
if "decentral" in opts or opts_config.get("decentral", False):
decentral(n)
if "noH2network" in opts or opts_config.get("noH2network",False):
if "noH2network" in opts or opts_config.get("noH2network", False):
remove_h2_network(n)
if options["co2network"]:
@ -3420,7 +3436,7 @@ if __name__ == "__main__":
limit_type = "config"
limit = get(snakemake.params.co2_budget, investment_year)
for o in opts:
if "cb" not in o or opts_config.get("carbon_budget",False) is False:
if "cb" not in o or opts_config.get("carbon_budget", False) is False:
continue
limit_type = "carbon budget"
fn = "results/" + snakemake.params.RDIR + "csvs/carbon_budget_distribution.csv"
@ -3440,7 +3456,7 @@ if __name__ == "__main__":
limit = co2_cap.loc[investment_year]
break
for o in opts:
if "Co2L" not in o or opts_config.get("co2limit_sector",False) is False:
if "Co2L" not in o or opts_config.get("co2limit_sector", False) is False:
continue
limit_type = "wildcard"
limit = o[o.find("Co2L") + 4 :]
@ -3449,7 +3465,7 @@ if __name__ == "__main__":
logger.info(f"Add CO2 limit from {limit_type}")
add_co2limit(n, nyears, limit)
for o in opts: # TODO: add config wildcard options or depreciated?
for o in opts: # TODO: add config wildcard options or depreciated?
if not o[:10] == "linemaxext":
continue
maxext = float(o[10:]) * 1e3