pypsa-eur/rules/common.smk
Fabian Neumann 013b705ee4
Clustering: build renewable profiles and add all assets after clustering (#1201)
* Cluster first: build renewable profiles and add all assets after clustering

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* correction: pass landfall_lengths through functions

* assign landfall_lenghts correctly

* remove parameter add_land_use_constraint

* fix network_dict

* calculate distance to shoreline, remove underwater_fraction

* adjust simplification parameter to exclude Crete from offshore wind connections

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* remove unused geth2015 hydro capacities

* removing remaining traces of {simpl} wildcard

* add release notes and update workflow graphics

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: lisazeyen <lisa.zeyen@web.de>
2024-09-13 15:37:01 +02:00

149 lines
4.8 KiB
Plaintext

# SPDX-FileCopyrightText: : 2023-2024 The PyPSA-Eur Authors
#
# SPDX-License-Identifier: MIT
import copy
from functools import partial, lru_cache
import os, sys, glob
path = workflow.source_path("../scripts/_helpers.py")
sys.path.insert(0, os.path.dirname(path))
from _helpers import validate_checksum, update_config_from_wildcards
from snakemake.utils import update_config
def get_config(config, keys, default=None):
"""Retrieve a nested value from a dictionary using a tuple of keys."""
value = config
for key in keys:
if isinstance(value, list):
value = value[key]
else:
value = value.get(key, default)
if value == default:
return default
return value
def merge_configs(base_config, scenario_config):
"""Merge base config with a specific scenario without modifying the original."""
merged = copy.deepcopy(base_config)
update_config(merged, scenario_config)
return merged
@lru_cache
def scenario_config(scenario_name):
"""Retrieve a scenario config based on the overrides from the scenario file."""
return merge_configs(config, scenarios[scenario_name])
def static_getter(wildcards, keys, default):
"""Getter function for static config values."""
config_with_wildcards = update_config_from_wildcards(
config, wildcards, inplace=False
)
return get_config(config_with_wildcards, keys, default)
def dynamic_getter(wildcards, keys, default):
"""Getter function for dynamic config values based on scenario."""
if "run" not in wildcards.keys():
return get_config(config, keys, default)
scenario_name = wildcards.run
if scenario_name not in scenarios:
raise ValueError(
f"Scenario {scenario_name} not found in file {config['run']['scenarios']['file']}."
)
config_with_scenario = scenario_config(scenario_name)
config_with_wildcards = update_config_from_wildcards(
config_with_scenario, wildcards, inplace=False
)
return get_config(config_with_wildcards, keys, default)
def config_provider(*keys, default=None):
"""Dynamically provide config values based on 'run' -> 'name'.
Usage in Snakemake rules would look something like:
params:
my_param=config_provider("key1", "key2", default="some_default_value")
"""
# Using functools.partial to freeze certain arguments in our getter functions.
if config["run"].get("scenarios", {}).get("enable", False):
return partial(dynamic_getter, keys=keys, default=default)
else:
return partial(static_getter, keys=keys, default=default)
def solver_threads(w):
solver_options = config_provider("solving", "solver_options")(w)
option_set = config_provider("solving", "solver", "options")(w)
solver_option_set = solver_options[option_set]
threads = solver_option_set.get("threads") or solver_option_set.get("Threads") or 4
return threads
def memory(w):
factor = 3.0
for o in w.opts.split("-"):
m = re.match(r"^(\d+)h$", o, re.IGNORECASE)
if m is not None:
factor /= int(m.group(1))
break
for o in w.opts.split("-"):
m = re.match(r"^(\d+)seg$", o, re.IGNORECASE)
if m is not None:
factor *= int(m.group(1)) / 8760
break
if w.clusters == "all":
return int(factor * (18000 + 180 * 4000))
else:
return int(factor * (10000 + 195 * int(w.clusters)))
def input_custom_extra_functionality(w):
path = config_provider(
"solving", "options", "custom_extra_functionality", default=False
)(w)
if path:
return os.path.join(os.path.dirname(workflow.snakefile), path)
return []
def has_internet_access(url: str = "https://www.zenodo.org", timeout: int = 3) -> bool:
"""
Checks if internet connection is available by sending a HEAD request
to a reliable server like Google.
Parameters:
- url (str): The URL to check for internet connection. Default is Google.
- timeout (int | float): The maximum time (in seconds) the request should wait.
Returns:
- bool: True if the internet is available, otherwise False.
"""
try:
# Send a HEAD request to avoid fetching full response
response = requests.head(url, timeout=timeout, allow_redirects=True)
return response.status_code == 200
except requests.ConnectionError: # (e.g., no internet, DNS issues)
return False
except requests.Timeout: # (e.g., slow or no network)
return False
def solved_previous_horizon(w):
planning_horizons = config_provider("scenario", "planning_horizons")(w)
i = planning_horizons.index(int(w.planning_horizons))
planning_horizon_p = str(planning_horizons[i - 1])
return (
RESULTS
+ "postnetworks/base_s_{clusters}_l{ll}_{opts}_{sector_opts}_"
+ planning_horizon_p
+ ".nc"
)