pypsa-eur/scripts/time_aggregation.py

# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: : 2017-2024 The PyPSA-Eur Authors
#
# SPDX-License-Identifier: MIT
"""
Defines the time aggregation to be used for sector-coupled network.
Relevant Settings
-----------------
.. code:: yaml
clustering:
temporal:
resolution_sector:
enable:
drop_leap_day:
Inputs
------
- ``networks/base_s_{clusters}_elec_l{ll}_{opts}.nc``: the network whose
2024-05-20 09:52:23 +00:00
snapshots are to be aggregated
- ``resources/hourly_heat_demand_total_base_s_{clusters}.nc``: the total
2024-05-20 09:52:23 +00:00
hourly heat demand
- ``resources/solar_thermal_total_base_s_{clusters}.nc``: the total
2024-05-20 09:52:23 +00:00
hourly solar thermal generation
2024-05-15 11:34:20 +00:00
Outputs
-------
- ``snapshot_weightings_base_s_{clusters}_elec_l{ll}_{opts}.csv``
2024-05-15 11:34:20 +00:00
Description
-----------
2024-05-20 09:52:23 +00:00
Computes a time aggregation scheme for the given network, in the form of a CSV
file with the snapshot weightings, indexed by the new subset of snapshots. This
rule only computes said aggregation scheme; aggregation of time-varying network
data is done in ``prepare_sector_network.py``.
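
The output CSV follows PyPSA's usual snapshot weighting layout: one row per
retained snapshot with the columns ``objective``, ``stores`` and
``generators``.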
"""

import logging

import numpy as np
import pandas as pd
import pypsa
import tsam.timeseriesaggregation as tsam
import xarray as xr
from _helpers import (
    configure_logging,
    set_scenario_config,
    update_config_from_wildcards,
)

logger = logging.getLogger(__name__)

if __name__ == "__main__":
    if "snakemake" not in globals():
        from _helpers import mock_snakemake

        snakemake = mock_snakemake(
            "time_aggregation",
            configfiles="test/config.overnight.yaml",
            opts="",
            clusters="37",
            ll="v1.0",
            sector_opts="Co2L0-24h-T-H-B-I-A-dist1",
            planning_horizons="2030",
        )

    configure_logging(snakemake)
    set_scenario_config(snakemake)
    update_config_from_wildcards(snakemake.config, snakemake.wildcards)

    n = pypsa.Network(snakemake.input.network)
    resolution = snakemake.params.time_resolution
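
    # As the branches below show, `resolution` may be falsy (no aggregation),
    # contain "sn" (representative snapshots, handled downstream in
    # prepare_sector_network.py), end in "h" for plain resampling
    # (e.g. "24h"), or end in "seg" for tsam-based segmentation
    # (e.g. "4380seg").
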
    # Representative snapshots
    if not resolution or (isinstance(resolution, str) and "sn" in resolution.lower()):
        logger.info("Using representative snapshots or no aggregation at all")
        # Output an empty csv; this is taken care of in prepare_sector_network.py
        pd.DataFrame().to_csv(snakemake.output.snapshot_weightings)
    # Plain resampling
    elif isinstance(resolution, str) and "h" in resolution.lower():
        offset = resolution.lower()
        logger.info(f"Averaging every {offset}")
        snapshot_weightings = n.snapshot_weightings.resample(offset).sum()
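        # Summing the weightings over each resampling window preserves the
        # total weight: with e.g. "3h", three hourly snapshots of weight 1.0
        # collapse into a single snapshot of weight 3.0.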
        sns = snapshot_weightings.index
        if snakemake.params.drop_leap_day:
            sns = sns[~((sns.month == 2) & (sns.day == 29))]
        snapshot_weightings = snapshot_weightings.loc[sns]
        snapshot_weightings.to_csv(snakemake.output.snapshot_weightings)
    # Temporal segmentation
    elif isinstance(resolution, str) and "seg" in resolution.lower():
        segments = int(resolution[:-3])  # e.g. "4380seg" -> 4380 segments
        logger.info(f"Using temporal segmentation with {segments} segments")

        # Get all time-dependent data
        dfs = [
            pnl
            for c in n.iterate_components()
            for attr, pnl in c.pnl.items()
            if not pnl.empty and attr != "e_min_pu"
        ]
        if snakemake.input.hourly_heat_demand_total:
            dfs.append(
                xr.open_dataset(snakemake.input.hourly_heat_demand_total)
                .to_dataframe()
                .unstack(level=1)
            )
        if snakemake.input.solar_thermal_total:
            dfs.append(
                xr.open_dataset(snakemake.input.solar_thermal_total)
                .to_dataframe()
                .unstack(level=1)
            )

        df = pd.concat(dfs, axis=1)
        # Reset columns to a flat index
        df = df.T.reset_index(drop=True).T

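        # Normalising each series by its annual maximum (zeros replaced by 1
        # to avoid division by zero) puts all series on a comparable scale,
        # so no single large-magnitude series dominates the segmentation.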
        # Normalise all time-dependent data
        annual_max = df.max().replace(0, 1)
        df = df.div(annual_max, level=0)

        # Get representative segments
        agg = tsam.TimeSeriesAggregation(
            df,
            hoursPerPeriod=len(df),
            noTypicalPeriods=1,
            noSegments=segments,
            segmentation=True,
            solver=snakemake.params.solver_name,
        )
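        # With hoursPerPeriod=len(df) and noTypicalPeriods=1, the whole
        # horizon is treated as a single typical period, so tsam only
        # segments it, i.e. merges adjacent snapshots into variable-length
        # segments.
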
        segmented = agg.createTypicalPeriods()
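
        # "Segment Duration" holds each segment's length in time steps (hours
        # here); the cumulative sum gives each segment's starting hour, which
        # doubles as a positional index into the hourly snapshots.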
        weightings = segmented.index.get_level_values("Segment Duration")
        offsets = np.insert(np.cumsum(weightings[:-1]), 0, 0)
        snapshot_weightings = n.snapshot_weightings.loc[n.snapshots[offsets]].mul(
            weightings, axis=0
        )

        logger.info(
            f"Distribution of snapshot durations:\n{snapshot_weightings.objective.value_counts()}"
        )
        snapshot_weightings.to_csv(snakemake.output.snapshot_weightings)