diff --git a/rules/build_sector.smk b/rules/build_sector.smk
index c2ca80c3..86784fe8 100644
--- a/rules/build_sector.smk
+++ b/rules/build_sector.smk
@@ -859,6 +859,40 @@ rule build_existing_heating_distribution:
         "../scripts/build_existing_heating_distribution.py"
 
 
+rule time_aggregation:
+    params:
+        time_resolution=config_provider("clustering", "temporal", "resolution_sector"),
+        drop_leap_day=config_provider("enable", "drop_leap_day"),
+        solver_name=config_provider("solving", "solver", "name"),
+    input:
+        network=resources("networks/elec_s{simpl}_{clusters}_ec_l{ll}_{opts}.nc"),
+        hourly_heat_demand_total=lambda w: (
+            resources("hourly_heat_demand_total_elec_s{simpl}_{clusters}.nc")
+            if config_provider("sector", "heating")(w)
+            else None
+        ),
+        solar_thermal_total=lambda w: (
+            resources("solar_thermal_total_elec_s{simpl}_{clusters}.nc")
+            if config_provider("sector", "solar_thermal")(w)
+            else None
+        ),
+    output:
+        snapshot_weightings=resources(
+            "snapshot_weightings_elec_s{simpl}_{clusters}_ec_l{ll}_{opts}.csv"
+        ),
+    threads: 1
+    resources:
+        mem_mb=5000,
+    log:
+        logs("time_aggregation_elec_s{simpl}_{clusters}_ec_l{ll}_{opts}.log"),
+    benchmark:
+        benchmarks("time_aggregation_elec_s{simpl}_{clusters}_ec_l{ll}_{opts}")
+    conda:
+        "../envs/environment.yaml"
+    script:
+        "../scripts/time_aggregation.py"
+
+
 def input_profile_offwind(w):
     return {
         f"profile_{tech}": resources(f"profile_{tech}.nc")
@@ -870,7 +904,6 @@ def input_profile_offwind(w):
 rule prepare_sector_network:
     params:
         time_resolution=config_provider("clustering", "temporal", "resolution_sector"),
-        drop_leap_day=config_provider("enable", "drop_leap_day"),
         co2_budget=config_provider("co2_budget"),
         conventional_carriers=config_provider(
             "existing_capacities", "conventional_carriers"
@@ -891,6 +924,9 @@ rule prepare_sector_network:
         unpack(input_profile_offwind),
         **rules.cluster_gas_network.output,
         **rules.build_gas_input_locations.output,
+        snapshot_weightings=resources(
+            "snapshot_weightings_elec_s{simpl}_{clusters}_ec_l{ll}_{opts}.csv"
+        ),
         retro_cost=lambda w: (
            resources("retro_cost_elec_s{simpl}_{clusters}.csv")
            if config_provider("sector", "retrofitting", "retro_endogen")(w)
diff --git a/scripts/prepare_sector_network.py b/scripts/prepare_sector_network.py
index d35af1c9..fb6af976 100755
--- a/scripts/prepare_sector_network.py
+++ b/scripts/prepare_sector_network.py
@@ -809,33 +809,6 @@ def add_co2limit(n, options, nyears=1.0, limit=0.0):
     )
 
 
-# TODO PyPSA-Eur merge issue
-def average_every_nhours(n, offset):
-    logger.info(f"Resampling the network to {offset}")
-    m = n.copy(with_time=False)
-
-    snapshot_weightings = n.snapshot_weightings.resample(offset).sum()
-    sns = snapshot_weightings.index
-    if snakemake.params.drop_leap_day:
-        sns = sns[~((sns.month == 2) & (sns.day == 29))]
-    snapshot_weightings = snapshot_weightings.loc[sns]
-    m.set_snapshots(snapshot_weightings.index)
-    m.snapshot_weightings = snapshot_weightings
-
-    for c in n.iterate_components():
-        pnl = getattr(m, c.list_name + "_t")
-        for k, df in c.pnl.items():
-            if not df.empty:
-                if c.list_name == "stores" and k == "e_max_pu":
-                    pnl[k] = df.resample(offset).min()
-                elif c.list_name == "stores" and k == "e_min_pu":
-                    pnl[k] = df.resample(offset).max()
-                else:
-                    pnl[k] = df.resample(offset).mean()
-
-    return m
-
-
 def cycling_shift(df, steps=1):
     """
     Cyclic shift on index of pd.Series|pd.DataFrame by number of steps.
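The resampling that the deleted average_every_nhours helper performed inside prepare_sector_network.py is what the new time_aggregation rule now precomputes and stores in its snapshot_weightings_*.csv output. As a rough, standalone sketch of that CSV for a plain "24h" resolution (synthetic hourly network and a made-up output path, not the wildcard-expanded paths used by the rule):

import pandas as pd
import pypsa

# Build a synthetic hourly network; the rule instead loads the clustered
# network from resources/networks/elec_s{simpl}_{clusters}_ec_l{ll}_{opts}.nc.
n = pypsa.Network()
n.set_snapshots(pd.date_range("2013-01-01", "2013-12-31 23:00", freq="h"))

offset = "24h"  # clustering: temporal: resolution_sector: 24h
snapshot_weightings = n.snapshot_weightings.resample(offset).sum()

# Drop Feb 29 when enable: drop_leap_day is set (2013 has no leap day).
sns = snapshot_weightings.index
snapshot_weightings = snapshot_weightings.loc[~((sns.month == 2) & (sns.day == 29))]

snapshot_weightings.to_csv("snapshot_weightings.csv")  # made-up path for the sketch

The CSV keeps PyPSA's snapshot-weighting columns (objective, stores, generators) indexed by the first snapshot of each averaging window, which is the format set_temporal_aggregation reads back in below.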
@@ -3513,100 +3486,48 @@ def cluster_heat_buses(n):
         import_components_from_dataframe(n, df.loc[to_add], c.name)
 
 
-def apply_time_segmentation(
-    n, segments, solver_name="cbc", overwrite_time_dependent=True
-):
+def set_temporal_aggregation(n, resolution, snapshot_weightings):
     """
-    Aggregating time series to segments with different lengths.
-
-    Input:
-        n: pypsa Network
-        segments: (int) number of segments in which the typical period should be
-                  subdivided
-        solver_name: (str) name of solver
-        overwrite_time_dependent: (bool) overwrite time dependent data of pypsa network
-                                  with typical time series created by tsam
+    Aggregate time-varying data to the given snapshots.
     """
-    try:
-        import tsam.timeseriesaggregation as tsam
-    except ImportError:
-        raise ModuleNotFoundError(
-            "Optional dependency 'tsam' not found." "Install via 'pip install tsam'"
-        )
-
-    # get all time-dependent data
-    columns = pd.MultiIndex.from_tuples([], names=["component", "key", "asset"])
-    raw = pd.DataFrame(index=n.snapshots, columns=columns)
-    for c in n.iterate_components():
-        for attr, pnl in c.pnl.items():
-            # exclude e_min_pu which is used for SOC of EVs in the morning
-            if not pnl.empty and attr != "e_min_pu":
-                df = pnl.copy()
-                df.columns = pd.MultiIndex.from_product([[c.name], [attr], df.columns])
-                raw = pd.concat([raw, df], axis=1)
-
-    # normalise all time-dependent data
-    annual_max = raw.max().replace(0, 1)
-    raw = raw.div(annual_max, level=0)
-
-    # get representative segments
-    agg = tsam.TimeSeriesAggregation(
-        raw,
-        hoursPerPeriod=len(raw),
-        noTypicalPeriods=1,
-        noSegments=int(segments),
-        segmentation=True,
-        solver=solver_name,
-    )
-    segmented = agg.createTypicalPeriods()
-
-    weightings = segmented.index.get_level_values("Segment Duration")
-    offsets = np.insert(np.cumsum(weightings[:-1]), 0, 0)
-    timesteps = [raw.index[0] + pd.Timedelta(f"{offset}h") for offset in offsets]
-    snapshots = pd.DatetimeIndex(timesteps)
-    sn_weightings = pd.Series(
-        weightings, index=snapshots, name="weightings", dtype="float64"
-    )
-    logger.info(f"Distribution of snapshot durations:\n{weightings.value_counts()}")
-
-    n.set_snapshots(sn_weightings.index)
-    n.snapshot_weightings = n.snapshot_weightings.mul(sn_weightings, axis=0)
-
-    # overwrite time-dependent data with timeseries created by tsam
-    if overwrite_time_dependent:
-        values_t = segmented.mul(annual_max).set_index(snapshots)
-        for component, key in values_t.columns.droplevel(2).unique():
-            n.pnl(component)[key] = values_t[component, key]
-
-    return n
-
-
-def set_temporal_aggregation(n, resolution, solver_name):
-    """
-    Aggregate network temporally.
-    """
-    if not resolution:
-        return n
-
-    # representative snapshots
     if "sn" in resolution.lower():
+        # Representative snapshots are dealt with directly
         sn = int(resolution[:-2])
         logger.info("Use every %s snapshot as representative", sn)
         n.set_snapshots(n.snapshots[::sn])
         n.snapshot_weightings *= sn
+    else:
+        # Otherwise, use the provided snapshots
+        snapshot_weightings = pd.read_csv(
+            snapshot_weightings, index_col=0, parse_dates=True
+        )
 
-    # segments with package tsam
-    elif "seg" in resolution.lower():
-        segments = int(resolution[:-3])
-        logger.info("Use temporal segmentation with %s segments", segments)
-        n = apply_time_segmentation(n, segments, solver_name=solver_name)
+        n.set_snapshots(snapshot_weightings.index)
+        n.snapshot_weightings = snapshot_weightings
 
-    # temporal averaging
-    elif "h" in resolution.lower():
-        logger.info("Aggregate to frequency %s", resolution)
-        n = average_every_nhours(n, resolution)
+        # Define a series used for aggregation, mapping each hour in
+        # n.snapshots to the closest previous timestep in
+        # snapshot_weightings.index
+        aggregation_map = (
+            pd.Series(
+                snapshot_weightings.index.get_indexer(n.snapshots), index=n.snapshots
+            )
+            .replace(-1, np.nan)
+            .ffill()
+            .astype(int)
+            .map(lambda i: snapshot_weightings.index[i])
+        )
 
-    return n
+        # Aggregate all time-varying data.
+        for c in n.iterate_components():
+            for k, df in c.pnl.items():
+                if not df.empty:
+                    if c.list_name == "stores" and k == "e_max_pu":
+                        c.pnl[k] = df.groupby(aggregation_map).min()
+                    elif c.list_name == "stores" and k == "e_min_pu":
+                        c.pnl[k] = df.groupby(aggregation_map).max()
+                    else:
+                        c.pnl[k] = df.groupby(aggregation_map).mean()
 
 
 def lossy_bidirectional_links(n, carrier, efficiencies={}):
@@ -3758,9 +3679,9 @@ if __name__ == "__main__":
     if options["allam_cycle"]:
         add_allam(n, costs)
 
-    solver_name = snakemake.config["solving"]["solver"]["name"]
-    resolution = snakemake.params.time_resolution
-    n = set_temporal_aggregation(n, resolution, solver_name)
+    set_temporal_aggregation(
+        n, snakemake.params.time_resolution, snakemake.input.snapshot_weightings
+    )
 
     co2_budget = snakemake.params.co2_budget
     if isinstance(co2_budget, str) and co2_budget.startswith("cb"):
diff --git a/scripts/time_aggregation.py b/scripts/time_aggregation.py
new file mode 100644
index 00000000..3d908481
--- /dev/null
+++ b/scripts/time_aggregation.py
@@ -0,0 +1,114 @@
+# -*- coding: utf-8 -*-
+import logging
+
+import numpy as np
+import pandas as pd
+import pypsa
+import xarray as xr
+from _helpers import (
+    configure_logging,
+    set_scenario_config,
+    update_config_from_wildcards,
+)
+
+logger = logging.getLogger(__name__)
+
+if __name__ == "__main__":
+    if "snakemake" not in globals():
+        from _helpers import mock_snakemake
+
+        snakemake = mock_snakemake(
+            "time_aggregation",
+            configfiles="test/config.overnight.yaml",
+            simpl="",
+            opts="",
+            clusters="37",
+            ll="v1.0",
+            sector_opts="CO2L0-24h-T-H-B-I-A-dist1",
+            planning_horizons="2030",
+        )
+
+    configure_logging(snakemake)
+    set_scenario_config(snakemake)
+    update_config_from_wildcards(snakemake.config, snakemake.wildcards)
+
+    n = pypsa.Network(snakemake.input.network)
+    resolution = snakemake.params.time_resolution
+
+    # Representative snapshots
+    if "sn" in resolution.lower():
+        logger.info("Use representative snapshots")
+        # Output an empty csv; this is taken care of in prepare_sector_network.py
+        pd.DataFrame().to_csv(snakemake.output.snapshot_weightings)
+
+    # Plain resampling
+    elif "h" in resolution.lower():
+        offset = resolution.lower()
+        logger.info(f"Averaging every {offset}")
+        snapshot_weightings = n.snapshot_weightings.resample(offset).sum()
+        sns = snapshot_weightings.index
+        if snakemake.params.drop_leap_day:
+            sns = sns[~((sns.month == 2) & (sns.day == 29))]
+        snapshot_weightings = snapshot_weightings.loc[sns]
+        snapshot_weightings.to_csv(snakemake.output.snapshot_weightings)
+
+    # Temporal segmentation
+    elif "seg" in resolution.lower():
+        segments = int(resolution[:-3])
+        logger.info(f"Use temporal segmentation with {segments} segments")
+
+        try:
+            import tsam.timeseriesaggregation as tsam
+        except ImportError:
+            raise ModuleNotFoundError(
+                "Optional dependency 'tsam' not found. Install via 'pip install tsam'"
+            )
+
+        # Get all time-dependent data
+        dfs = [
+            pnl
+            for c in n.iterate_components()
+            for attr, pnl in c.pnl.items()
+            if not pnl.empty and attr != "e_min_pu"
+        ]
+        dfs.append(
+            xr.open_dataset(snakemake.input.hourly_heat_demand_total)
+            .to_dataframe()
+            .unstack(level=1)
+        )
+        dfs.append(
+            xr.open_dataset(snakemake.input.solar_thermal_total)
+            .to_dataframe()
+            .unstack(level=1)
+        )
+        df = pd.concat(dfs, axis=1)
+
+        # Reset columns to flat index
+        df = df.T.reset_index(drop=True).T
+
+        # Normalise all time-dependent data
+        annual_max = df.max().replace(0, 1)
+        df = df.div(annual_max, level=0)
+
+        # Get representative segments
+        agg = tsam.TimeSeriesAggregation(
+            df,
+            hoursPerPeriod=len(df),
+            noTypicalPeriods=1,
+            noSegments=segments,
+            segmentation=True,
+            solver=snakemake.params.solver_name,
+        )
+        agg = agg.createTypicalPeriods()
+
+        weightings = agg.index.get_level_values("Segment Duration")
+        offsets = np.insert(np.cumsum(weightings[:-1]), 0, 0)
+        snapshot_weightings = n.snapshot_weightings.loc[n.snapshots[offsets]].mul(
+            weightings, axis=0
+        )
+
+        logger.info(
+            f"Distribution of snapshot durations:\n{snapshot_weightings.objective.value_counts()}"
+        )
+
+        snapshot_weightings.to_csv(snakemake.output.snapshot_weightings)
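To make the groupby-based aggregation in set_temporal_aggregation above concrete, here is a small self-contained toy example with made-up snapshots and segment starts (none of these values come from the patch): every hourly snapshot is mapped to the closest preceding entry of the snapshot-weightings index, and time-varying data is then reduced within each group (mean in general, min/max for store limits).

import numpy as np
import pandas as pd

# Twelve hourly snapshots and three segments of 5, 4 and 3 hours.
snapshots = pd.date_range("2013-01-01", periods=12, freq="h")
segment_starts = snapshots[[0, 5, 9]]

# Same construction as in set_temporal_aggregation: exact matches get their
# position in segment_starts, everything else is forward-filled from the
# previous match.
aggregation_map = (
    pd.Series(segment_starts.get_indexer(snapshots), index=snapshots)
    .replace(-1, np.nan)
    .ffill()
    .astype(int)
    .map(lambda i: segment_starts[i])
)

load = pd.Series(range(12), index=snapshots, dtype=float)
print(load.groupby(aggregation_map).mean())
# 2013-01-01 00:00:00     2.0   (mean of hours 0-4)
# 2013-01-01 05:00:00     6.5   (mean of hours 5-8)
# 2013-01-01 09:00:00    10.0   (mean of hours 9-11)

In the workflow itself the segment starts are the index of the snapshot_weightings_*.csv written by the time_aggregation rule.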