# -*- coding: utf-8 -*- # SPDX-FileCopyrightText: : 2017-2023 The PyPSA-Eur Authors # # SPDX-License-Identifier: MIT import contextlib import logging import os import re import urllib from functools import partial from pathlib import Path import pandas as pd import pytz import yaml from snakemake.utils import update_config from tqdm import tqdm logger = logging.getLogger(__name__) REGION_COLS = ["geometry", "name", "x", "y", "country"] def get_run_path(fn, dir, rdir, shared_resources): """ Dynamically provide paths based on shared resources and filename. Use this function for snakemake rule inputs or outputs that should be optionally shared across runs or created individually for each run. Parameters ---------- fn : str The filename for the path to be generated. dir : str The base directory. rdir : str Relative directory for non-shared resources. shared_resources : str, list, or bool Specifies which resources should be shared. - If string or list, assumed to be superset of wildcards for sharing. - If "base", special handling for shared "base" resources. - If boolean, directly specifies if the resource is shared. Returns ------- str Full path where the resource should be stored. Notes ----- Special case for "base" allows no wildcards other than "technology" and excludes filenames starting with "networks/elec" or "add_electricity". """ pattern = r"\{([^{}]+)\}" existing_wildcards = list(re.findall(pattern, fn)) if shared_resources == "base": # special case for shared "base" resources no_relevant_wildcards = not len(set(existing_wildcards) - {"technology"}) no_elec_rule = not fn.startswith("networks/elec") and not fn.startswith( "add_electricity" ) is_shared = no_relevant_wildcards and no_elec_rule elif isinstance(shared_resources, (str, list)): if isinstance(shared_resources, str): shared_resources = [shared_resources] is_shared = set(existing_wildcards).issubset(shared_resources) else: is_shared = shared_resources if is_shared: return f"{dir}{fn}" else: return f"{dir}{rdir}{fn}" def path_provider(dir, rdir, shared_resources): """ Returns a partial function that dynamically provides paths based on shared resources and the filename. Returns ------- partial function A partial function that takes a filename as input and returns the path to the file based on the shared_resources parameter. """ return partial(get_run_path, dir=dir, rdir=rdir, shared_resources=shared_resources) # Define a context manager to temporarily mute print statements @contextlib.contextmanager def mute_print(): with open(os.devnull, "w") as devnull: with contextlib.redirect_stdout(devnull): yield def set_scenario_config(snakemake): scenario = snakemake.config["run"].get("scenarios", {}) if scenario.get("enable") and "run" in snakemake.wildcards.keys(): try: with open(scenario["file"], "r") as f: scenario_config = yaml.safe_load(f) except FileNotFoundError: # fallback for mock_snakemake script_dir = Path(__file__).parent.resolve() root_dir = script_dir.parent with open(root_dir / scenario["file"], "r") as f: scenario_config = yaml.safe_load(f) update_config(snakemake.config, scenario_config[snakemake.wildcards.run]) def configure_logging(snakemake, skip_handlers=False): """ Configure the basic behaviour for the logging module. Note: Must only be called once from the __main__ section of a script. The setup includes printing log messages to STDERR and to a log file defined by either (in priority order): snakemake.log.python, snakemake.log[0] or "logs/{rulename}.log". Additional keywords from logging.basicConfig are accepted via the snakemake configuration file under snakemake.config.logging. Parameters ---------- snakemake : snakemake object Your snakemake object containing a snakemake.config and snakemake.log. skip_handlers : True | False (default) Do (not) skip the default handlers created for redirecting output to STDERR and file. """ import logging kwargs = snakemake.config.get("logging", dict()).copy() kwargs.setdefault("level", "INFO") if skip_handlers is False: fallback_path = Path(__file__).parent.joinpath( "..", "logs", f"{snakemake.rule}.log" ) logfile = snakemake.log.get( "python", snakemake.log[0] if snakemake.log else fallback_path ) kwargs.update( { "handlers": [ # Prefer the 'python' log, otherwise take the first log for each # Snakemake rule logging.FileHandler(logfile), logging.StreamHandler(), ] } ) logging.basicConfig(**kwargs) def update_p_nom_max(n): # if extendable carriers (solar/onwind/...) have capacity >= 0, # e.g. existing assets from the OPSD project are included to the network, # the installed capacity might exceed the expansion limit. # Hence, we update the assumptions. n.generators.p_nom_max = n.generators[["p_nom_min", "p_nom_max"]].max(1) def aggregate_p_nom(n): return pd.concat( [ n.generators.groupby("carrier").p_nom_opt.sum(), n.storage_units.groupby("carrier").p_nom_opt.sum(), n.links.groupby("carrier").p_nom_opt.sum(), n.loads_t.p.groupby(n.loads.carrier, axis=1).sum().mean(), ] ) def aggregate_p(n): return pd.concat( [ n.generators_t.p.sum().groupby(n.generators.carrier).sum(), n.storage_units_t.p.sum().groupby(n.storage_units.carrier).sum(), n.stores_t.p.sum().groupby(n.stores.carrier).sum(), -n.loads_t.p.sum().groupby(n.loads.carrier).sum(), ] ) def aggregate_e_nom(n): return pd.concat( [ (n.storage_units["p_nom_opt"] * n.storage_units["max_hours"]) .groupby(n.storage_units["carrier"]) .sum(), n.stores["e_nom_opt"].groupby(n.stores.carrier).sum(), ] ) def aggregate_p_curtailed(n): return pd.concat( [ ( ( n.generators_t.p_max_pu.sum().multiply(n.generators.p_nom_opt) - n.generators_t.p.sum() ) .groupby(n.generators.carrier) .sum() ), ( (n.storage_units_t.inflow.sum() - n.storage_units_t.p.sum()) .groupby(n.storage_units.carrier) .sum() ), ] ) def aggregate_costs(n, flatten=False, opts=None, existing_only=False): components = dict( Link=("p_nom", "p0"), Generator=("p_nom", "p"), StorageUnit=("p_nom", "p"), Store=("e_nom", "p"), Line=("s_nom", None), Transformer=("s_nom", None), ) costs = {} for c, (p_nom, p_attr) in zip( n.iterate_components(components.keys(), skip_empty=False), components.values() ): if c.df.empty: continue if not existing_only: p_nom += "_opt" costs[(c.list_name, "capital")] = ( (c.df[p_nom] * c.df.capital_cost).groupby(c.df.carrier).sum() ) if p_attr is not None: p = c.pnl[p_attr].sum() if c.name == "StorageUnit": p = p.loc[p > 0] costs[(c.list_name, "marginal")] = ( (p * c.df.marginal_cost).groupby(c.df.carrier).sum() ) costs = pd.concat(costs) if flatten: assert opts is not None conv_techs = opts["conv_techs"] costs = costs.reset_index(level=0, drop=True) costs = costs["capital"].add( costs["marginal"].rename({t: t + " marginal" for t in conv_techs}), fill_value=0.0, ) return costs def progress_retrieve(url, file, disable=False): if disable: urllib.request.urlretrieve(url, file) else: with tqdm(unit="B", unit_scale=True, unit_divisor=1024, miniters=1) as t: def update_to(b=1, bsize=1, tsize=None): if tsize is not None: t.total = tsize t.update(b * bsize - t.n) urllib.request.urlretrieve(url, file, reporthook=update_to) def mock_snakemake(rulename, configfiles=[], **wildcards): """ This function is expected to be executed from the 'scripts'-directory of ' the snakemake project. It returns a snakemake.script.Snakemake object, based on the Snakefile. If a rule has wildcards, you have to specify them in **wildcards. Parameters ---------- rulename: str name of the rule for which the snakemake object should be generated configfiles: list, str list of configfiles to be used to update the config **wildcards: keyword arguments fixing the wildcards. Only necessary if wildcards are needed. """ import os import snakemake as sm from packaging.version import Version, parse from pypsa.descriptors import Dict from snakemake.script import Snakemake script_dir = Path(__file__).parent.resolve() root_dir = script_dir.parent user_in_script_dir = Path.cwd().resolve() == script_dir if user_in_script_dir: os.chdir(root_dir) elif Path.cwd().resolve() != root_dir: raise RuntimeError( "mock_snakemake has to be run from the repository root" f" {root_dir} or scripts directory {script_dir}" ) try: for p in sm.SNAKEFILE_CHOICES: if os.path.exists(p): snakefile = p break kwargs = ( dict(rerun_triggers=[]) if parse(sm.__version__) > Version("7.7.0") else {} ) if isinstance(configfiles, str): configfiles = [configfiles] workflow = sm.Workflow(snakefile, overwrite_configfiles=configfiles, **kwargs) workflow.include(snakefile) if configfiles: for f in configfiles: if not os.path.exists(f): raise FileNotFoundError(f"Config file {f} does not exist.") workflow.configfile(f) workflow.global_resources = {} rule = workflow.get_rule(rulename) dag = sm.dag.DAG(workflow, rules=[rule]) wc = Dict(wildcards) job = sm.jobs.Job(rule, dag, wc) def make_accessable(*ios): for io in ios: for i in range(len(io)): io[i] = os.path.abspath(io[i]) make_accessable(job.input, job.output, job.log) snakemake = Snakemake( job.input, job.output, job.params, job.wildcards, job.threads, job.resources, job.log, job.dag.workflow.config, job.rule.name, None, ) # create log and output dir if not existent for path in list(snakemake.log) + list(snakemake.output): Path(path).parent.mkdir(parents=True, exist_ok=True) finally: if user_in_script_dir: os.chdir(script_dir) return snakemake def generate_periodic_profiles(dt_index, nodes, weekly_profile, localize=None): """ Give a 24*7 long list of weekly hourly profiles, generate this for each country for the period dt_index, taking account of time zones and summer time. """ weekly_profile = pd.Series(weekly_profile, range(24 * 7)) week_df = pd.DataFrame(index=dt_index, columns=nodes) for node in nodes: timezone = pytz.timezone(pytz.country_timezones[node[:2]][0]) tz_dt_index = dt_index.tz_convert(timezone) week_df[node] = [24 * dt.weekday() + dt.hour for dt in tz_dt_index] week_df[node] = week_df[node].map(weekly_profile) week_df = week_df.tz_localize(localize) return week_df def parse(l): if len(l) == 1: return yaml.safe_load(l[0]) else: return {l.pop(0): parse(l)} def update_config_with_sector_opts(config, sector_opts): from snakemake.utils import update_config for o in sector_opts.split("-"): if o.startswith("CF+"): l = o.split("+")[1:] update_config(config, parse(l))