# -*- coding: utf-8 -*- import contextlib import logging import os import sys from pathlib import Path import pandas as pd import pytz import yaml from pypsa.components import component_attrs, components from pypsa.descriptors import Dict from snakemake.utils import update_config logger = logging.getLogger(__name__) # Define a context manager to temporarily mute print statements @contextlib.contextmanager def mute_print(): with open(os.devnull, "w") as devnull: with contextlib.redirect_stdout(devnull): yield def override_component_attrs(directory): """Tell PyPSA that links can have multiple outputs by overriding the component_attrs. This can be done for as many buses as you need with format busi for i = 2,3,4,5,.... See https://pypsa.org/doc/components.html#link-with-multiple-outputs-or-inputs Parameters ---------- directory : string Folder where component attributes to override are stored analogous to ``pypsa/component_attrs``, e.g. `links.csv`. Returns ------- Dictionary of overridden component attributes. """ attrs = Dict({k: v.copy() for k, v in component_attrs.items()}) for component, list_name in components.list_name.items(): fn = f"{directory}/{list_name}.csv" if os.path.isfile(fn): overrides = pd.read_csv(fn, index_col=0, na_values="n/a") attrs[component] = overrides.combine_first(attrs[component]) return attrs # from pypsa-eur/_helpers.py def mock_snakemake(rulename, **wildcards): """ This function is expected to be executed from the 'scripts'-directory of ' the snakemake project. It returns a snakemake.script.Snakemake object, based on the Snakefile. If a rule has wildcards, you have to specify them in **wildcards. Parameters ---------- rulename: str name of the rule for which the snakemake object should be generated **wildcards: keyword arguments fixing the wildcards. Only necessary if wildcards are needed. """ import os import snakemake as sm from packaging.version import Version, parse from pypsa.descriptors import Dict from snakemake.script import Snakemake script_dir = Path(__file__).parent.resolve() assert ( Path.cwd().resolve() == script_dir ), f"mock_snakemake has to be run from the repository scripts directory {script_dir}" os.chdir(script_dir.parent) for p in sm.SNAKEFILE_CHOICES: if os.path.exists(p): snakefile = p break kwargs = dict(rerun_triggers=[]) if parse(sm.__version__) > Version("7.7.0") else {} workflow = sm.Workflow(snakefile, overwrite_configfiles=[], **kwargs) workflow.include(snakefile) workflow.global_resources = {} rule = workflow.get_rule(rulename) dag = sm.dag.DAG(workflow, rules=[rule]) wc = Dict(wildcards) job = sm.jobs.Job(rule, dag, wc) def make_accessable(*ios): for io in ios: for i in range(len(io)): io[i] = os.path.abspath(io[i]) make_accessable(job.input, job.output, job.log) snakemake = Snakemake( job.input, job.output, job.params, job.wildcards, job.threads, job.resources, job.log, job.dag.workflow.config, job.rule.name, None, ) # create log and output dir if not existent for path in list(snakemake.log) + list(snakemake.output): Path(path).parent.mkdir(parents=True, exist_ok=True) os.chdir(script_dir) return snakemake # from pypsa-eur/_helpers.py def progress_retrieve(url, file): import urllib from progressbar import ProgressBar pbar = ProgressBar(0, 100) def dlProgress(count, blockSize, totalSize): pbar.update(int(count * blockSize * 100 / totalSize)) urllib.request.urlretrieve(url, file, reporthook=dlProgress) def generate_periodic_profiles(dt_index, nodes, weekly_profile, localize=None): """ Give a 24*7 long list of weekly hourly profiles, generate this for each country for the period dt_index, taking account of time zones and summer time. """ weekly_profile = pd.Series(weekly_profile, range(24 * 7)) week_df = pd.DataFrame(index=dt_index, columns=nodes) for node in nodes: timezone = pytz.timezone(pytz.country_timezones[node[:2]][0]) tz_dt_index = dt_index.tz_convert(timezone) week_df[node] = [24 * dt.weekday() + dt.hour for dt in tz_dt_index] week_df[node] = week_df[node].map(weekly_profile) week_df = week_df.tz_localize(localize) return week_df def parse(l): if len(l) == 1: return yaml.safe_load(l[0]) else: return {l.pop(0): parse(l)} def update_config_with_sector_opts(config, sector_opts): for o in sector_opts.split("-"): if o.startswith("CF+"): l = o.split("+")[1:] update_config(config, parse(l))