pypsa-eur/rules/common.smk

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

149 lines
4.8 KiB
Plaintext
Raw Permalink Normal View History

2024-02-19 15:21:48 +00:00
# SPDX-FileCopyrightText: : 2023-2024 The PyPSA-Eur Authors
2023-03-08 18:22:00 +00:00
#
# SPDX-License-Identifier: MIT
2023-08-15 13:02:41 +00:00
import copy
from functools import partial, lru_cache
2023-08-15 13:02:41 +00:00
import os, sys, glob
2024-02-12 12:54:28 +00:00
path = workflow.source_path("../scripts/_helpers.py")
sys.path.insert(0, os.path.dirname(path))
from _helpers import validate_checksum, update_config_from_wildcards
from snakemake.utils import update_config
2023-03-08 18:22:29 +00:00
2023-08-15 13:02:41 +00:00
def get_config(config, keys, default=None):
    """
    Walk ``config`` along ``keys`` and return the value found at the end.

    At every step a list is indexed with the key, while any other container
    is queried through ``.get(key, default)``. As soon as a step produces a
    value that compares equal to ``default``, the walk stops and ``default``
    itself is returned. With an empty ``keys`` sequence, ``config`` is
    returned unchanged.
    """
    node = config
    for step in keys:
        node = node[step] if isinstance(node, list) else node.get(step, default)
        # A missing key yields `default`; stop here instead of descending
        # into it on the next iteration.
        if node == default:
            return default
    return node
def merge_configs(base_config, scenario_config):
    """
    Return a new config made of ``base_config`` overlaid with ``scenario_config``.

    The base config is deep-copied first, so the update is applied to the
    copy and ``base_config`` itself is left untouched.
    """
    result = copy.deepcopy(base_config)
    update_config(result, scenario_config)
    return result
@lru_cache
def scenario_config(scenario_name):
    """
    Build the full config for ``scenario_name``.

    The overrides stored under ``scenarios[scenario_name]`` are merged on top
    of the global ``config``. Results are memoized per scenario name, so
    repeated calls return the same dictionary object.
    """
    overrides = scenarios[scenario_name]
    return merge_configs(config, overrides)
def static_getter(wildcards, keys, default):
    """
    Look up ``keys`` in the global config after applying wildcard overrides.

    The global ``config`` is passed to ``update_config_from_wildcards`` with
    ``inplace=False`` and the lookup is performed on the returned config.
    """
    resolved = update_config_from_wildcards(config, wildcards, inplace=False)
    return get_config(resolved, keys, default)
def dynamic_getter(wildcards, keys, default):
    """
    Look up ``keys`` in the config of the scenario named by the ``run`` wildcard.

    Without a ``run`` wildcard the lookup falls back to the global ``config``.
    Otherwise the scenario config is built, wildcard overrides are applied to
    it (``inplace=False``), and the lookup runs on the result.

    Raises:
    - ValueError: if the ``run`` wildcard names a scenario that is not in
      ``scenarios``.
    """
    has_run = "run" in wildcards.keys()
    if not has_run:
        return get_config(config, keys, default)

    scenario_name = wildcards.run
    if scenario_name not in scenarios:
        raise ValueError(
            f"Scenario {scenario_name} not found in file {config['run']['scenarios']['file']}."
        )

    return get_config(
        update_config_from_wildcards(
            scenario_config(scenario_name), wildcards, inplace=False
        ),
        keys,
        default,
    )
2023-08-15 13:02:41 +00:00
def config_provider(*keys, default=None):
    """
    Return a callable that resolves a config value from wildcards.

    The returned callable takes the wildcards as its only positional
    argument. If ``config["run"]["scenarios"]["enable"]`` is truthy it is
    backed by ``dynamic_getter``, otherwise by ``static_getter``.

    Usage in Snakemake rules would look something like:
    params:
        my_param=config_provider("key1", "key2", default="some_default_value")
    """
    scenarios_enabled = config["run"].get("scenarios", {}).get("enable", False)
    getter = dynamic_getter if scenarios_enabled else static_getter
    # Freeze the key path and default; only the wildcards remain to be supplied.
    return partial(getter, keys=keys, default=default)
2023-08-15 13:02:41 +00:00
2023-03-08 18:22:29 +00:00
def solver_threads(w):
    """
    Return the thread count configured for the selected solver option set.

    The option set is named by ``solving -> solver -> options`` and looked up
    in ``solving -> solver_options``. The first truthy value among its
    ``threads`` and ``Threads`` entries is returned; if neither is set, the
    result is 4.
    """
    all_option_sets = config_provider("solving", "solver_options")(w)
    selected_name = config_provider("solving", "solver", "options")(w)
    selected = all_option_sets[selected_name]
    for key in ("threads", "Threads"):
        configured = selected.get(key)
        if configured:
            return configured
    return 4
2023-03-08 18:22:00 +00:00
def memory(w):
    """
    Compute an integer memory figure from the ``opts`` and ``clusters`` wildcards.

    Starting from a factor of 3.0, the first ``<N>h`` token in the
    dash-separated ``w.opts`` divides the factor by N, and the first ``<N>seg``
    token multiplies it by N / 8760. The factor then scales a base amount
    that depends on ``w.clusters`` (a fixed amount for ``"all"``, otherwise
    linear in the cluster count).

    NOTE(review): the unit of the result (presumably MB) is not visible
    here; confirm against the rules that consume it.
    """
    tokens = w.opts.split("-")

    def first_number(pattern):
        # Number captured by the first token matching `pattern`, or None.
        for token in tokens:
            hit = re.match(pattern, token, re.IGNORECASE)
            if hit is not None:
                return int(hit.group(1))
        return None

    scale = 3.0

    hours = first_number(r"^(\d+)h$")
    if hours is not None:
        scale /= hours

    segments = first_number(r"^(\d+)seg$")
    if segments is not None:
        scale *= segments / 8760

    if w.clusters == "all":
        return int(scale * (18000 + 180 * 4000))
    return int(scale * (10000 + 195 * int(w.clusters)))
def input_custom_extra_functionality(w):
    """
    Resolve the configured custom extra-functionality path.

    Reads ``solving -> options -> custom_extra_functionality`` (default
    ``False``). A truthy value is joined onto the directory containing the
    workflow's Snakefile; otherwise an empty list is returned.
    """
    configured = config_provider(
        "solving", "options", "custom_extra_functionality", default=False
    )(w)
    if not configured:
        return []
    snakefile_dir = os.path.dirname(workflow.snakefile)
    return os.path.join(snakefile_dir, configured)
2024-09-11 14:31:19 +00:00
def has_internet_access(url: str = "https://www.zenodo.org", timeout: int = 3) -> bool:
    """
    Checks if internet connection is available by sending a HEAD request
    to a reliable server (Zenodo by default).

    Parameters:
    - url (str): The URL to check for internet connection. Default is
      "https://www.zenodo.org".
    - timeout (int | float): The maximum time (in seconds) the request should wait.

    Returns:
    - bool: True if the request ends with status code 200 (after following
      redirects); False on any other status, on a connection error or on a
      timeout.
    """
    try:
        # HEAD avoids downloading the response body.
        reply = requests.head(url, timeout=timeout, allow_redirects=True)
    except (requests.ConnectionError, requests.Timeout):
        # No network / DNS failure, or the server did not answer in time.
        return False
    return reply.status_code == 200
def solved_previous_horizon(w):
    """
    Build the path pattern of the post-network for the preceding planning horizon.

    The configured ``scenario -> planning_horizons`` list is searched for
    ``int(w.planning_horizons)`` and the entry just before it is used. The
    remaining wildcards stay as ``{...}`` placeholders in the returned string.

    NOTE(review): for the first horizon the index ``i - 1`` is -1, which
    selects the last entry of the list; presumably this function is only
    meant to be used for later horizons - confirm against the calling rules.
    """
    horizons = config_provider("scenario", "planning_horizons")(w)
    position = horizons.index(int(w.planning_horizons))
    previous = str(horizons[position - 1])
    prefix = RESULTS + "postnetworks/base_s_{clusters}_l{ll}_{opts}_{sector_opts}_"
    return prefix + previous + ".nc"