diff --git a/scripts/_helpers.py b/scripts/_helpers.py
new file mode 100644
index 00000000..77a694df
--- /dev/null
+++ b/scripts/_helpers.py
@@ -0,0 +1,288 @@
+import pandas as pd
+from pathlib import Path
+
+
+def configure_logging(snakemake, skip_handlers=False):
+    """
+    Configure the basic behaviour of the logging module.
+
+    Note: Must only be called once from the __main__ section of a script.
+
+    The setup includes printing log messages to STDERR and to a log file
+    defined by either (in priority order): snakemake.log.python,
+    snakemake.log[0] or "logs/{rulename}.log". Additional keywords from
+    logging.basicConfig are accepted via the snakemake configuration file
+    under snakemake.config['logging'].
+
+    Parameters
+    ----------
+    snakemake : snakemake object
+        Your snakemake object containing a snakemake.config and snakemake.log.
+    skip_handlers : True | False (default)
+        Do (not) skip the default handlers created for redirecting output to
+        STDERR and a file.
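+
+    Example
+    -------
+    A minimal sketch of the intended call site; the global ``snakemake``
+    object is injected by Snakemake when the script runs as part of the
+    workflow:
+
+    >>> if __name__ == "__main__":
+    ...     configure_logging(snakemake)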
+ """ + + import logging + + kwargs = snakemake.config.get('logging', dict()) + kwargs.setdefault("level", "INFO") + + if skip_handlers is False: + fallback_path = Path(__file__).parent.joinpath( + '..', 'logs', f"{snakemake.rule}.log") + logfile = snakemake.log.get('python', snakemake.log[0] if snakemake.log + else fallback_path) + kwargs.update( + {'handlers': [ + # Prefer the 'python' log, otherwise take the first log for each + # Snakemake rule + logging.FileHandler(logfile), + logging.StreamHandler() + ] + }) + logging.basicConfig(**kwargs) + + +def pdbcast(v, h): + return pd.DataFrame(v.values.reshape((-1, 1)) * h.values, + index=v.index, columns=h.index) + + +def load_network(fn, tech_costs, config, combine_hydro_ps=True): + import pypsa + from add_electricity import update_transmission_costs, load_costs + + opts = config['plotting'] + + n = pypsa.Network(fn) + + n.loads["carrier"] = n.loads.bus.map(n.buses.carrier) + " load" + n.stores["carrier"] = n.stores.bus.map(n.buses.carrier) + + n.links["carrier"] = ( + n.links.bus0.map( + n.buses.carrier) + + "-" + + n.links.bus1.map( + n.buses.carrier)) + n.lines["carrier"] = "AC line" + n.transformers["carrier"] = "AC transformer" + + n.lines['s_nom'] = n.lines['s_nom_min'] + n.links['p_nom'] = n.links['p_nom_min'] + + if combine_hydro_ps: + n.storage_units.loc[n.storage_units.carrier.isin( + {'PHS', 'hydro'}), 'carrier'] = 'hydro+PHS' + + # #if the carrier was not set on the heat storage units + # bus_carrier = n.storage_units.bus.map(n.buses.carrier) + # n.storage_units.loc[bus_carrier == "heat","carrier"] = "water tanks" + + Nyears = n.snapshot_weightings.sum() / 8760. + costs = load_costs( + Nyears, + tech_costs, + config['costs'], + config['electricity']) + update_transmission_costs(n, costs) + + return n + + +def aggregate_p_nom(n): + return pd.concat([ + n.generators.groupby("carrier").p_nom_opt.sum(), + n.storage_units.groupby("carrier").p_nom_opt.sum(), + n.links.groupby("carrier").p_nom_opt.sum(), + n.loads_t.p.groupby(n.loads.carrier, axis=1).sum().mean() + ]) + + +def aggregate_p(n): + return pd.concat([ + n.generators_t.p.sum().groupby(n.generators.carrier).sum(), + n.storage_units_t.p.sum().groupby(n.storage_units.carrier).sum(), + n.stores_t.p.sum().groupby(n.stores.carrier).sum(), + -n.loads_t.p.sum().groupby(n.loads.carrier).sum() + ]) + + +def aggregate_e_nom(n): + return pd.concat( + [ + (n.storage_units["p_nom_opt"] * + n.storage_units["max_hours"]).groupby( + n.storage_units["carrier"]).sum(), + n.stores["e_nom_opt"].groupby( + n.stores.carrier).sum()]) + + +def aggregate_p_curtailed(n): + return pd.concat( + [ + ((n.generators_t.p_max_pu.sum().multiply( + n.generators.p_nom_opt) - + n.generators_t.p.sum()) .groupby( + n.generators.carrier).sum()), + ((n.storage_units_t.inflow.sum() - + n.storage_units_t.p.sum()) .groupby( + n.storage_units.carrier).sum())]) + + +def aggregate_costs(n, flatten=False, opts=None, existing_only=False): + from six import iterkeys, itervalues + + components = dict(Link=("p_nom", "p0"), + Generator=("p_nom", "p"), + StorageUnit=("p_nom", "p"), + Store=("e_nom", "p"), + Line=("s_nom", None), + Transformer=("s_nom", None)) + + costs = {} + for c, (p_nom, p_attr) in zip( + n.iterate_components(iterkeys(components), skip_empty=False), + itervalues(components) + ): + if not existing_only: + p_nom += "_opt" + costs[(c.list_name, 'capital')] = (c.df[p_nom] * \ + c.df.capital_cost).groupby(c.df.carrier).sum() + if p_attr is not None: + p = c.pnl[p_attr].sum() + if c.name == 'StorageUnit': + p = 
+
+
+def aggregate_costs(n, flatten=False, opts=None, existing_only=False):
+    from six import iterkeys, itervalues
+
+    components = dict(Link=("p_nom", "p0"),
+                      Generator=("p_nom", "p"),
+                      StorageUnit=("p_nom", "p"),
+                      Store=("e_nom", "p"),
+                      Line=("s_nom", None),
+                      Transformer=("s_nom", None))
+
+    costs = {}
+    for c, (p_nom, p_attr) in zip(
+            n.iterate_components(iterkeys(components), skip_empty=False),
+            itervalues(components)):
+        if not existing_only:
+            p_nom += "_opt"
+        costs[(c.list_name, 'capital')] = (
+            (c.df[p_nom] * c.df.capital_cost).groupby(c.df.carrier).sum())
+        if p_attr is not None:
+            p = c.pnl[p_attr].sum()
+            if c.name == 'StorageUnit':
+                p = p.loc[p > 0]
+            costs[(c.list_name, 'marginal')] = (
+                (p * c.df.marginal_cost).groupby(c.df.carrier).sum())
+    costs = pd.concat(costs)
+
+    if flatten:
+        assert opts is not None
+        conv_techs = opts['conv_techs']
+
+        costs = costs.reset_index(level=0, drop=True)
+        costs = costs['capital'].add(
+            costs['marginal'].rename({t: t + ' marginal' for t in conv_techs}),
+            fill_value=0.)
+
+    return costs
+
+
+def progress_retrieve(url, file):
+    import urllib.request
+    from progressbar import ProgressBar
+
+    pbar = ProgressBar(0, 100)
+
+    def dlProgress(count, blockSize, totalSize):
+        pbar.update(int(count * blockSize * 100 / totalSize))
+
+    urllib.request.urlretrieve(url, file, reporthook=dlProgress)
+
+
+def mock_snakemake(rulename, **wildcards):
+    """
+    This function is expected to be executed from the 'scripts'-directory of
+    the snakemake project. It returns a snakemake.script.Snakemake object,
+    based on the Snakefile.
+
+    If a rule has wildcards, you have to specify them in **wildcards.
+
+    Parameters
+    ----------
+    rulename: str
+        name of the rule for which the snakemake object should be generated
+    **wildcards:
+        keyword arguments fixing the wildcards. Only necessary if wildcards
+        are needed.
+    """
+    import snakemake as sm
+    import os
+    from pypsa.descriptors import Dict
+    from snakemake.script import Snakemake
+
+    script_dir = Path(__file__).parent.resolve()
+    assert Path.cwd().resolve() == script_dir, \
+        f'mock_snakemake has to be run from the repository scripts directory {script_dir}'
+    os.chdir(script_dir.parent)
+    for p in sm.SNAKEFILE_CHOICES:
+        if os.path.exists(p):
+            snakefile = p
+            break
+    workflow = sm.Workflow(snakefile)
+    workflow.include(snakefile)
+    workflow.global_resources = {}
+    rule = workflow.get_rule(rulename)
+    dag = sm.dag.DAG(workflow, rules=[rule])
+    wc = Dict(wildcards)
+    job = sm.jobs.Job(rule, dag, wc)
+
+    def make_accessible(*ios):
+        # resolve paths to absolute ones so the mocked object remains valid
+        # after changing back to the scripts directory
+        for io in ios:
+            for i in range(len(io)):
+                io[i] = os.path.abspath(io[i])
+
+    make_accessible(job.input, job.output, job.log)
+    snakemake = Snakemake(job.input, job.output, job.params, job.wildcards,
+                          job.threads, job.resources, job.log,
+                          job.dag.workflow.config, job.rule.name, None)
+    # create log and output directories if they do not exist
+    for path in list(snakemake.log) + list(snakemake.output):
+        Path(path).parent.mkdir(parents=True, exist_ok=True)
+
+    os.chdir(script_dir)
+    return snakemake
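+
+# Example of interactive use (illustrative only; the rule name and
+# wildcards below are hypothetical):
+#
+#     from _helpers import mock_snakemake
+#     snakemake = mock_snakemake('solve_network', simpl='', clusters='45')
+#     configure_logging(snakemake)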