pypsa-eur/scripts/_helpers.py
2023-01-23 21:27:20 +00:00

348 lines
11 KiB
Python

# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: : 2017-2022 The PyPSA-Eur Authors
#
# SPDX-License-Identifier: MIT
from pathlib import Path
import pandas as pd
REGION_COLS = ["geometry", "name", "x", "y", "country"]
def configure_logging(snakemake, skip_handlers=False):
"""
Configure the basic behaviour for the logging module.
Note: Must only be called once from the __main__ section of a script.
The setup includes printing log messages to STDERR and to a log file defined
by either (in priority order): snakemake.log.python, snakemake.log[0] or "logs/{rulename}.log".
Additional keywords from logging.basicConfig are accepted via the snakemake configuration
file under snakemake.config.logging.
Parameters
----------
snakemake : snakemake object
Your snakemake object containing a snakemake.config and snakemake.log.
skip_handlers : True | False (default)
Do (not) skip the default handlers created for redirecting output to STDERR and file.
"""
import logging
kwargs = snakemake.config.get("logging", dict()).copy()
kwargs.setdefault("level", "INFO")
if skip_handlers is False:
fallback_path = Path(__file__).parent.joinpath(
"..", "logs", f"{snakemake.rule}.log"
)
logfile = snakemake.log.get(
"python", snakemake.log[0] if snakemake.log else fallback_path
)
kwargs.update(
{
"handlers": [
# Prefer the 'python' log, otherwise take the first log for each
# Snakemake rule
logging.FileHandler(logfile),
logging.StreamHandler(),
]
}
)
logging.basicConfig(**kwargs)
def load_network(import_name=None, custom_components=None):
"""
Helper for importing a pypsa.Network with additional custom components.
Parameters
----------
import_name : str
As in pypsa.Network(import_name)
custom_components : dict
Dictionary listing custom components.
For using ``snakemake.config['override_components']``
in ``config.yaml`` define:
.. code:: yaml
override_components:
ShadowPrice:
component: ["shadow_prices","Shadow price for a global constraint.",np.nan]
attributes:
name: ["string","n/a","n/a","Unique name","Input (required)"]
value: ["float","n/a",0.,"shadow value","Output"]
Returns
-------
pypsa.Network
"""
import pypsa
from pypsa.descriptors import Dict
override_components = None
override_component_attrs = None
if custom_components is not None:
override_components = pypsa.components.components.copy()
override_component_attrs = Dict(
{k: v.copy() for k, v in pypsa.components.component_attrs.items()}
)
for k, v in custom_components.items():
override_components.loc[k] = v["component"]
override_component_attrs[k] = pd.DataFrame(
columns=["type", "unit", "default", "description", "status"]
)
for attr, val in v["attributes"].items():
override_component_attrs[k].loc[attr] = val
return pypsa.Network(
import_name=import_name,
override_components=override_components,
override_component_attrs=override_component_attrs,
)
def pdbcast(v, h):
return pd.DataFrame(
v.values.reshape((-1, 1)) * h.values, index=v.index, columns=h.index
)
def load_network_for_plots(fn, tech_costs, config, combine_hydro_ps=True):
import pypsa
from add_electricity import load_costs, update_transmission_costs
n = pypsa.Network(fn)
n.loads["carrier"] = n.loads.bus.map(n.buses.carrier) + " load"
n.stores["carrier"] = n.stores.bus.map(n.buses.carrier)
n.links["carrier"] = (
n.links.bus0.map(n.buses.carrier) + "-" + n.links.bus1.map(n.buses.carrier)
)
n.lines["carrier"] = "AC line"
n.transformers["carrier"] = "AC transformer"
n.lines["s_nom"] = n.lines["s_nom_min"]
n.links["p_nom"] = n.links["p_nom_min"]
if combine_hydro_ps:
n.storage_units.loc[
n.storage_units.carrier.isin({"PHS", "hydro"}), "carrier"
] = "hydro+PHS"
# if the carrier was not set on the heat storage units
# bus_carrier = n.storage_units.bus.map(n.buses.carrier)
# n.storage_units.loc[bus_carrier == "heat","carrier"] = "water tanks"
Nyears = n.snapshot_weightings.objective.sum() / 8760.0
costs = load_costs(tech_costs, config["costs"], config["electricity"], Nyears)
update_transmission_costs(n, costs)
return n
def update_p_nom_max(n):
# if extendable carriers (solar/onwind/...) have capacity >= 0,
# e.g. existing assets from the OPSD project are included to the network,
# the installed capacity might exceed the expansion limit.
# Hence, we update the assumptions.
n.generators.p_nom_max = n.generators[["p_nom_min", "p_nom_max"]].max(1)
def aggregate_p_nom(n):
return pd.concat(
[
n.generators.groupby("carrier").p_nom_opt.sum(),
n.storage_units.groupby("carrier").p_nom_opt.sum(),
n.links.groupby("carrier").p_nom_opt.sum(),
n.loads_t.p.groupby(n.loads.carrier, axis=1).sum().mean(),
]
)
def aggregate_p(n):
return pd.concat(
[
n.generators_t.p.sum().groupby(n.generators.carrier).sum(),
n.storage_units_t.p.sum().groupby(n.storage_units.carrier).sum(),
n.stores_t.p.sum().groupby(n.stores.carrier).sum(),
-n.loads_t.p.sum().groupby(n.loads.carrier).sum(),
]
)
def aggregate_e_nom(n):
return pd.concat(
[
(n.storage_units["p_nom_opt"] * n.storage_units["max_hours"])
.groupby(n.storage_units["carrier"])
.sum(),
n.stores["e_nom_opt"].groupby(n.stores.carrier).sum(),
]
)
def aggregate_p_curtailed(n):
return pd.concat(
[
(
(
n.generators_t.p_max_pu.sum().multiply(n.generators.p_nom_opt)
- n.generators_t.p.sum()
)
.groupby(n.generators.carrier)
.sum()
),
(
(n.storage_units_t.inflow.sum() - n.storage_units_t.p.sum())
.groupby(n.storage_units.carrier)
.sum()
),
]
)
def aggregate_costs(n, flatten=False, opts=None, existing_only=False):
components = dict(
Link=("p_nom", "p0"),
Generator=("p_nom", "p"),
StorageUnit=("p_nom", "p"),
Store=("e_nom", "p"),
Line=("s_nom", None),
Transformer=("s_nom", None),
)
costs = {}
for c, (p_nom, p_attr) in zip(
n.iterate_components(components.keys(), skip_empty=False), components.values()
):
if c.df.empty:
continue
if not existing_only:
p_nom += "_opt"
costs[(c.list_name, "capital")] = (
(c.df[p_nom] * c.df.capital_cost).groupby(c.df.carrier).sum()
)
if p_attr is not None:
p = c.pnl[p_attr].sum()
if c.name == "StorageUnit":
p = p.loc[p > 0]
costs[(c.list_name, "marginal")] = (
(p * c.df.marginal_cost).groupby(c.df.carrier).sum()
)
costs = pd.concat(costs)
if flatten:
assert opts is not None
conv_techs = opts["conv_techs"]
costs = costs.reset_index(level=0, drop=True)
costs = costs["capital"].add(
costs["marginal"].rename({t: t + " marginal" for t in conv_techs}),
fill_value=0.0,
)
return costs
def progress_retrieve(url, file):
import urllib
from progressbar import ProgressBar
pbar = ProgressBar(0, 100)
def dlProgress(count, blockSize, totalSize):
pbar.update(int(count * blockSize * 100 / totalSize))
urllib.request.urlretrieve(url, file, reporthook=dlProgress)
def get_aggregation_strategies(aggregation_strategies):
# default aggregation strategies that cannot be defined in .yaml format must be specified within
# the function, otherwise (when defaults are passed in the function's definition) they get lost
# when custom values are specified in the config.
import numpy as np
from pypsa.networkclustering import _make_consense
bus_strategies = dict(country=_make_consense("Bus", "country"))
bus_strategies.update(aggregation_strategies.get("buses", {}))
generator_strategies = {"build_year": lambda x: 0, "lifetime": lambda x: np.inf}
generator_strategies.update(aggregation_strategies.get("generators", {}))
return bus_strategies, generator_strategies
def mock_snakemake(rulename, **wildcards):
"""
This function is expected to be executed from the 'scripts'-directory of '
the snakemake project. It returns a snakemake.script.Snakemake object,
based on the Snakefile.
If a rule has wildcards, you have to specify them in **wildcards.
Parameters
----------
rulename: str
name of the rule for which the snakemake object should be generated
**wildcards:
keyword arguments fixing the wildcards. Only necessary if wildcards are
needed.
"""
import os
import snakemake as sm
from packaging.version import Version, parse
from pypsa.descriptors import Dict
from snakemake.script import Snakemake
script_dir = Path(__file__).parent.resolve()
assert (
Path.cwd().resolve() == script_dir
), f"mock_snakemake has to be run from the repository scripts directory {script_dir}"
os.chdir(script_dir.parent)
for p in sm.SNAKEFILE_CHOICES:
if os.path.exists(p):
snakefile = p
break
kwargs = dict(rerun_triggers=[]) if parse(sm.__version__) > Version("7.7.0") else {}
workflow = sm.Workflow(snakefile, overwrite_configfiles=[], **kwargs)
workflow.include(snakefile)
workflow.global_resources = {}
rule = workflow.get_rule(rulename)
dag = sm.dag.DAG(workflow, rules=[rule])
wc = Dict(wildcards)
job = sm.jobs.Job(rule, dag, wc)
def make_accessable(*ios):
for io in ios:
for i in range(len(io)):
io[i] = os.path.abspath(io[i])
make_accessable(job.input, job.output, job.log)
snakemake = Snakemake(
job.input,
job.output,
job.params,
job.wildcards,
job.threads,
job.resources,
job.log,
job.dag.workflow.config,
job.rule.name,
None,
)
# create log and output dir if not existent
for path in list(snakemake.log) + list(snakemake.output):
Path(path).parent.mkdir(parents=True, exist_ok=True)
os.chdir(script_dir)
return snakemake