# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: : 2017-2023 The PyPSA-Eur Authors
#
# SPDX-License-Identifier: MIT
import contextlib
import hashlib
import logging
import os
import re
import urllib.request
from pathlib import Path

import pandas as pd
import pytz
import requests
import yaml
from pypsa.components import component_attrs, components
from pypsa.descriptors import Dict
from tqdm import tqdm

logger = logging.getLogger(__name__)

REGION_COLS = ["geometry", "name", "x", "y", "country"]


def get_opt(opts, expr, flags=None):
    """
    Return the first option matching the regular expression.

    The regular expression is case-insensitive by default.
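
    Examples
    --------
    A minimal illustrative call (the option strings are made up); note that
    only the matched portion of the option is returned:

    >>> get_opt(["Co2L0.5", "EQ0.9"], "Co2L")
    'Co2L'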
"""
if flags is None:
flags = re.IGNORECASE
for o in opts:
match = re.match(expr, o, flags=flags)
if match:
return match.group(0)
return None


def find_opt(opts, expr):
    """
    Return whether the expression occurs in any option and, if available, the
    float that follows it.
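
    Examples
    --------
    A minimal illustrative call (the option string is made up):

    >>> find_opt(["Co2L0.5"], "Co2L")
    (True, 0.5)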
"""
for o in opts:
if expr in o:
m = re.findall("[0-9]*\.?[0-9]+$", o)
if len(m) > 0:
return True, float(m[0])
else:
return True, None
return False, None


# Define a context manager to temporarily mute print statements
@contextlib.contextmanager
def mute_print():
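    """
    Temporarily redirect stdout to os.devnull, silencing print statements.

    Examples
    --------
    >>> with mute_print():
    ...     print("this is swallowed")
    """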
    with open(os.devnull, "w") as devnull:
        with contextlib.redirect_stdout(devnull):
            yield


def configure_logging(snakemake, skip_handlers=False):
    """
    Configure the basic behaviour for the logging module.

    Note: Must only be called once from the __main__ section of a script.

    The setup includes printing log messages to STDERR and to a log file
    defined by either (in priority order): snakemake.log.python,
    snakemake.log[0] or "logs/{rulename}.log".

    Additional keywords from logging.basicConfig are accepted via the
    snakemake configuration file under snakemake.config.logging.

    Parameters
    ----------
    snakemake : snakemake object
        Your snakemake object containing a snakemake.config and snakemake.log.
    skip_handlers : True | False (default)
        Do (not) skip the default handlers created for redirecting output to
        STDERR and file.
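
    Examples
    --------
    Typically called once at the top of a script's ``__main__`` section;
    "some_rule" below is a placeholder rule name:

    >>> if "snakemake" not in globals():
    ...     snakemake = mock_snakemake("some_rule")
    >>> configure_logging(snakemake)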
"""
import logging
import sys
kwargs = snakemake.config.get("logging", dict()).copy()
kwargs.setdefault("level", "INFO")
if skip_handlers is False:
fallback_path = Path(__file__).parent.joinpath(
"..", "logs", f"{snakemake.rule}.log"
)
logfile = snakemake.log.get(
"python", snakemake.log[0] if snakemake.log else fallback_path
)
kwargs.update(
{
"handlers": [
# Prefer the 'python' log, otherwise take the first log for each
# Snakemake rule
logging.FileHandler(logfile),
logging.StreamHandler(),
]
}
)
logging.basicConfig(**kwargs)
# Setup a function to handle uncaught exceptions and include them with their stacktrace into logfiles
def handle_exception(exc_type, exc_value, exc_traceback):
# Log the exception
logger = logging.getLogger()
logger.error(
"Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback)
)
sys.excepthook = handle_exception


def update_p_nom_max(n):
    # If extendable carriers (solar/onwind/...) have capacity >= 0,
    # e.g. existing assets from the OPSD project are included in the network,
    # the installed capacity might exceed the expansion limit.
    # Hence, we update the assumptions.
    n.generators.p_nom_max = n.generators[["p_nom_min", "p_nom_max"]].max(axis=1)


def aggregate_p_nom(n):
    """Aggregate optimised nominal capacities per carrier (plus mean load per carrier)."""
    return pd.concat(
        [
            n.generators.groupby("carrier").p_nom_opt.sum(),
            n.storage_units.groupby("carrier").p_nom_opt.sum(),
            n.links.groupby("carrier").p_nom_opt.sum(),
            n.loads_t.p.groupby(n.loads.carrier, axis=1).sum().mean(),
        ]
    )


def aggregate_p(n):
    """Aggregate dispatched energy per carrier (loads counted negatively)."""
    return pd.concat(
        [
            n.generators_t.p.sum().groupby(n.generators.carrier).sum(),
            n.storage_units_t.p.sum().groupby(n.storage_units.carrier).sum(),
            n.stores_t.p.sum().groupby(n.stores.carrier).sum(),
            -n.loads_t.p.sum().groupby(n.loads.carrier).sum(),
        ]
    )


def aggregate_e_nom(n):
    """Aggregate optimised energy capacities of storage units and stores per carrier."""
    return pd.concat(
        [
            (n.storage_units["p_nom_opt"] * n.storage_units["max_hours"])
            .groupby(n.storage_units["carrier"])
            .sum(),
            n.stores["e_nom_opt"].groupby(n.stores.carrier).sum(),
        ]
    )


def aggregate_p_curtailed(n):
    """Aggregate curtailed energy of generators and storage units per carrier."""
    return pd.concat(
        [
            (
                (
                    n.generators_t.p_max_pu.sum().multiply(n.generators.p_nom_opt)
                    - n.generators_t.p.sum()
                )
                .groupby(n.generators.carrier)
                .sum()
            ),
            (
                (n.storage_units_t.inflow.sum() - n.storage_units_t.p.sum())
                .groupby(n.storage_units.carrier)
                .sum()
            ),
        ]
    )


def aggregate_costs(n, flatten=False, opts=None, existing_only=False):
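    """
    Aggregate capital and marginal costs by component type and carrier.

    Examples
    --------
    A sketch of the flattened variant; the ``opts`` value shown is a made-up
    minimal example:

    >>> costs = aggregate_costs(n, flatten=True, opts={"conv_techs": ["OCGT"]})
    """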
    components = dict(
        Link=("p_nom", "p0"),
        Generator=("p_nom", "p"),
        StorageUnit=("p_nom", "p"),
        Store=("e_nom", "p"),
        Line=("s_nom", None),
        Transformer=("s_nom", None),
    )

    costs = {}
    for c, (p_nom, p_attr) in zip(
        n.iterate_components(components.keys(), skip_empty=False), components.values()
    ):
        if c.df.empty:
            continue
        if not existing_only:
            p_nom += "_opt"
        costs[(c.list_name, "capital")] = (
            (c.df[p_nom] * c.df.capital_cost).groupby(c.df.carrier).sum()
        )
        if p_attr is not None:
            p = c.pnl[p_attr].sum()
            if c.name == "StorageUnit":
                p = p.loc[p > 0]
            costs[(c.list_name, "marginal")] = (
                (p * c.df.marginal_cost).groupby(c.df.carrier).sum()
            )
    costs = pd.concat(costs)

    if flatten:
        assert opts is not None
        conv_techs = opts["conv_techs"]

        costs = costs.reset_index(level=0, drop=True)
        costs = costs["capital"].add(
            costs["marginal"].rename({t: t + " marginal" for t in conv_techs}),
            fill_value=0.0,
        )

    return costs


def progress_retrieve(url, file, disable=False):
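    """
    Download ``url`` to ``file``, showing a tqdm progress bar unless disabled.

    Examples
    --------
    The URL below is a placeholder:

    >>> progress_retrieve("https://example.com/data.zip", "data.zip")
    """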
    if disable:
        urllib.request.urlretrieve(url, file)
    else:
        with tqdm(unit="B", unit_scale=True, unit_divisor=1024, miniters=1) as t:

            def update_to(b=1, bsize=1, tsize=None):
                if tsize is not None:
                    t.total = tsize
                t.update(b * bsize - t.n)

            urllib.request.urlretrieve(url, file, reporthook=update_to)


def mock_snakemake(
    rulename,
    root_dir=None,
    configfiles=None,
    submodule_dir="workflow/submodules/pypsa-eur",
    **wildcards,
):
    """
    This function is expected to be executed from the 'scripts'-directory of
    the snakemake project. It returns a snakemake.script.Snakemake object,
    based on the Snakefile.

    If a rule has wildcards, you have to specify them in **wildcards.

    Parameters
    ----------
    rulename: str
        name of the rule for which the snakemake object should be generated
    root_dir: str/path-like
        path to the root directory of the snakemake project
    configfiles: list, str
        list of configfiles to be used to update the config
    submodule_dir: str, Path
        in case PyPSA-Eur is used as a submodule, submodule_dir is
        the path of pypsa-eur relative to the project directory.
    **wildcards:
        keyword arguments fixing the wildcards. Only necessary if wildcards are
        needed.
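
    Examples
    --------
    The rule name and wildcard values below are illustrative:

    >>> snakemake = mock_snakemake("solve_network", simpl="", clusters="37")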
"""
import os
import snakemake as sm
from packaging.version import Version, parse
from pypsa.descriptors import Dict
from snakemake.script import Snakemake
script_dir = Path(__file__).parent.resolve()
if root_dir is None:
root_dir = script_dir.parent
else:
root_dir = Path(root_dir).resolve()
user_in_script_dir = Path.cwd().resolve() == script_dir
if str(submodule_dir) in __file__:
# the submodule_dir path is only need to locate the project dir
os.chdir(Path(__file__[: __file__.find(str(submodule_dir))]))
elif user_in_script_dir:
os.chdir(root_dir)
elif Path.cwd().resolve() != root_dir:
raise RuntimeError(
"mock_snakemake has to be run from the repository root"
f" {root_dir} or scripts directory {script_dir}"
)
try:
for p in sm.SNAKEFILE_CHOICES:
if os.path.exists(p):
snakefile = p
break
kwargs = (
dict(rerun_triggers=[]) if parse(sm.__version__) > Version("7.7.0") else {}
)
if isinstance(configfiles, str):
configfiles = [configfiles]
workflow = sm.Workflow(snakefile, overwrite_configfiles=configfiles, **kwargs)
workflow.include(snakefile)
if configfiles:
for f in configfiles:
if not os.path.exists(f):
raise FileNotFoundError(f"Config file {f} does not exist.")
workflow.configfile(f)
workflow.global_resources = {}
rule = workflow.get_rule(rulename)
dag = sm.dag.DAG(workflow, rules=[rule])
wc = Dict(wildcards)
job = sm.jobs.Job(rule, dag, wc)
def make_accessable(*ios):
for io in ios:
for i in range(len(io)):
io[i] = os.path.abspath(io[i])
make_accessable(job.input, job.output, job.log)
snakemake = Snakemake(
job.input,
job.output,
job.params,
job.wildcards,
job.threads,
job.resources,
job.log,
job.dag.workflow.config,
job.rule.name,
None,
)
# create log and output dir if not existent
for path in list(snakemake.log) + list(snakemake.output):
Path(path).parent.mkdir(parents=True, exist_ok=True)
finally:
if user_in_script_dir:
os.chdir(script_dir)
return snakemake


def generate_periodic_profiles(dt_index, nodes, weekly_profile, localize=None):
    """
    Given a 24*7-long list of weekly hourly profiles, generate this for each
    country for the period dt_index, taking account of time zones and summer
    time.
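
    Examples
    --------
    A minimal sketch, assuming node names start with a two-letter country code:

    >>> dt_index = pd.date_range("2013-01-01", periods=48, freq="h", tz="UTC")
    >>> profiles = generate_periodic_profiles(dt_index, ["DE0 0"], range(24 * 7))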
"""
weekly_profile = pd.Series(weekly_profile, range(24 * 7))
week_df = pd.DataFrame(index=dt_index, columns=nodes)
for node in nodes:
timezone = pytz.timezone(pytz.country_timezones[node[:2]][0])
tz_dt_index = dt_index.tz_convert(timezone)
week_df[node] = [24 * dt.weekday() + dt.hour for dt in tz_dt_index]
week_df[node] = week_df[node].map(weekly_profile)
week_df = week_df.tz_localize(localize)
return week_df


def parse(infix):
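    """
    Recursively parse a chained option string into a nested dict.

    Examples
    --------
    >>> parse(["sector", "co2_budget", "0.5"])
    {'sector': {'co2_budget': 0.5}}
    """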
    if len(infix) == 1:
        return yaml.safe_load(infix[0])
    return {infix.pop(0): parse(infix)}


def update_config_with_sector_opts(config, sector_opts):
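    """
    Apply "CF+..." tokens from the sector options to the config in place.

    Examples
    --------
    The option string below is illustrative; it would set
    ``config["sector"]["co2_budget"] = 0.5``:

    >>> update_config_with_sector_opts(config, "CF+sector+co2_budget+0.5")
    """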
    from snakemake.utils import update_config

    for o in sector_opts.split("-"):
        if o.startswith("CF+"):
            infix = o.split("+")[1:]
            update_config(config, parse(infix))


def get_checksum_from_zenodo(file_url):
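    """
    Look up a file's checksum via the Zenodo records API; returns None if the
    file is not found in the record.

    Examples
    --------
    The record ID and file name below are placeholders:

    >>> get_checksum_from_zenodo("https://zenodo.org/record/12345/files/data.csv")
    'md5:...'
    """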
    parts = file_url.split("/")
    record_id = parts[parts.index("record") + 1]
    filename = parts[-1]

    response = requests.get(f"https://zenodo.org/api/records/{record_id}", timeout=30)
    response.raise_for_status()
    data = response.json()

    for file in data["files"]:
        if file["key"] == filename:
            return file["checksum"]
    return None


def validate_checksum(file_path, zenodo_url=None, checksum=None):
    """
    Validate file checksum against provided or Zenodo-retrieved checksum.

    Calculates the hash of a file using 64 KB chunks. Compares it against a
    given checksum or one from a Zenodo URL.

    Parameters
    ----------
    file_path : str
        Path to the file for checksum validation.
    zenodo_url : str, optional
        URL of the file on Zenodo to fetch the checksum.
    checksum : str, optional
        Checksum (format 'hash_type:checksum_value') for validation.

    Raises
    ------
    AssertionError
        If the checksum does not match, or if neither `checksum` nor
        `zenodo_url` is provided.

    Examples
    --------
    >>> validate_checksum("/path/to/file", checksum="md5:abc123...")
    >>> validate_checksum(
    ...     "/path/to/file",
    ...     zenodo_url="https://zenodo.org/record/12345/files/example.txt",
    ... )

    If the checksum is invalid, an AssertionError will be raised.
    """
    assert checksum or zenodo_url, "Either checksum or zenodo_url must be provided"
    if zenodo_url:
        checksum = get_checksum_from_zenodo(zenodo_url)
    hash_type, checksum = checksum.split(":")
    hasher = hashlib.new(hash_type)
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):  # 64 KB chunks
            hasher.update(chunk)
    calculated_checksum = hasher.hexdigest()
    assert (
        calculated_checksum == checksum
    ), "Checksum is invalid. This may be due to an incomplete download. Delete the file and re-execute the rule."