From d37ffddd4fd64c1479026cf860ab207c48b18075 Mon Sep 17 00:00:00 2001 From: Fabian Neumann Date: Wed, 28 Dec 2022 12:21:46 +0100 Subject: [PATCH] build_solar_thermal_profiles: parallelize --- Snakefile | 11 +++----- scripts/build_solar_thermal_profiles.py | 34 +++++++++++-------------- 2 files changed, 19 insertions(+), 26 deletions(-) diff --git a/Snakefile b/Snakefile index 7ce29b4f..f22546ba 100644 --- a/Snakefile +++ b/Snakefile @@ -207,16 +207,13 @@ rule build_cop_profiles: rule build_solar_thermal_profiles: input: - pop_layout_total="resources/pop_layout_total.nc", - pop_layout_urban="resources/pop_layout_urban.nc", - pop_layout_rural="resources/pop_layout_rural.nc", + pop_layout="resources/pop_layout_{scope}.nc", regions_onshore=pypsaeur("resources/regions_onshore_elec_s{simpl}_{clusters}.geojson") output: - solar_thermal_total="resources/solar_thermal_total_elec_s{simpl}_{clusters}.nc", - solar_thermal_urban="resources/solar_thermal_urban_elec_s{simpl}_{clusters}.nc", - solar_thermal_rural="resources/solar_thermal_rural_elec_s{simpl}_{clusters}.nc" + solar_thermal="resources/solar_thermal_{scope}_elec_s{simpl}_{clusters}.nc", resources: mem_mb=20000 - benchmark: "benchmarks/build_solar_thermal_profiles/s{simpl}_{clusters}" + threads: 16 + benchmark: "benchmarks/build_solar_thermal_profiles/{scope}_s{simpl}_{clusters}" script: "scripts/build_solar_thermal_profiles.py" diff --git a/scripts/build_solar_thermal_profiles.py b/scripts/build_solar_thermal_profiles.py index f6d05859..0fb8b6e6 100644 --- a/scripts/build_solar_thermal_profiles.py +++ b/scripts/build_solar_thermal_profiles.py @@ -5,6 +5,7 @@ import atlite import pandas as pd import xarray as xr import numpy as np +from dask.distributed import Client, LocalCluster if __name__ == '__main__': if 'snakemake' not in globals(): @@ -15,14 +16,9 @@ if __name__ == '__main__': clusters=48, ) - if 'snakemake' not in globals(): - from vresutils import Dict - import yaml - snakemake = Dict() - with open('config.yaml') as f: - snakemake.config = yaml.safe_load(f) - snakemake.input = Dict() - snakemake.output = Dict() + nprocesses = int(snakemake.threads) + cluster = LocalCluster(n_workers=nprocesses, threads_per_worker=1) + client = Client(cluster, asynchronous=True) config = snakemake.config['solar_thermal'] @@ -35,18 +31,18 @@ if __name__ == '__main__': I = cutout.indicatormatrix(clustered_regions) - for area in ["total", "rural", "urban"]: + pop_layout = xr.open_dataarray(snakemake.input.pop_layout) - pop_layout = xr.open_dataarray(snakemake.input[f'pop_layout_{area}']) + stacked_pop = pop_layout.stack(spatial=('y', 'x')) + M = I.T.dot(np.diag(I.dot(stacked_pop))) - stacked_pop = pop_layout.stack(spatial=('y', 'x')) - M = I.T.dot(np.diag(I.dot(stacked_pop))) + nonzero_sum = M.sum(axis=0, keepdims=True) + nonzero_sum[nonzero_sum == 0.] = 1. + M_tilde = M / nonzero_sum - nonzero_sum = M.sum(axis=0, keepdims=True) - nonzero_sum[nonzero_sum == 0.] = 1. - M_tilde = M / nonzero_sum + solar_thermal = cutout.solar_thermal(**config, matrix=M_tilde.T, + index=clustered_regions.index, + dask_kwargs=dict(scheduler=client), + show_progress=False) - solar_thermal = cutout.solar_thermal(**config, matrix=M_tilde.T, - index=clustered_regions.index) - - solar_thermal.to_netcdf(snakemake.output[f"solar_thermal_{area}"]) + solar_thermal.to_netcdf(snakemake.output.solar_thermal)