build_temperatur_profiles: parallelize

This commit is contained in:
Fabian Neumann 2022-12-28 12:20:34 +01:00
parent 99343b1221
commit 097d054f06
2 changed files with 26 additions and 24 deletions

View File

@ -177,19 +177,14 @@ rule build_heat_demands:
rule build_temperature_profiles: rule build_temperature_profiles:
input: input:
pop_layout_total="resources/pop_layout_total.nc", pop_layout="resources/pop_layout_{scope}.nc",
pop_layout_urban="resources/pop_layout_urban.nc",
pop_layout_rural="resources/pop_layout_rural.nc",
regions_onshore=pypsaeur("resources/regions_onshore_elec_s{simpl}_{clusters}.geojson") regions_onshore=pypsaeur("resources/regions_onshore_elec_s{simpl}_{clusters}.geojson")
output: output:
temp_soil_total="resources/temp_soil_total_elec_s{simpl}_{clusters}.nc", temp_soil="resources/temp_soil_{scope}_elec_s{simpl}_{clusters}.nc",
temp_soil_rural="resources/temp_soil_rural_elec_s{simpl}_{clusters}.nc", temp_air="resources/temp_air_{scope}_elec_s{simpl}_{clusters}.nc",
temp_soil_urban="resources/temp_soil_urban_elec_s{simpl}_{clusters}.nc",
temp_air_total="resources/temp_air_total_elec_s{simpl}_{clusters}.nc",
temp_air_rural="resources/temp_air_rural_elec_s{simpl}_{clusters}.nc",
temp_air_urban="resources/temp_air_urban_elec_s{simpl}_{clusters}.nc"
resources: mem_mb=20000 resources: mem_mb=20000
benchmark: "benchmarks/build_temperature_profiles/s{simpl}_{clusters}" threads: 8
benchmark: "benchmarks/build_temperature_profiles/{scope}_s{simpl}_{clusters}"
script: "scripts/build_temperature_profiles.py" script: "scripts/build_temperature_profiles.py"

View File

@ -5,6 +5,7 @@ import atlite
import pandas as pd import pandas as pd
import xarray as xr import xarray as xr
import numpy as np import numpy as np
from dask.distributed import Client, LocalCluster
if __name__ == '__main__': if __name__ == '__main__':
if 'snakemake' not in globals(): if 'snakemake' not in globals():
@ -15,6 +16,10 @@ if __name__ == '__main__':
clusters=48, clusters=48,
) )
nprocesses = int(snakemake.threads)
cluster = LocalCluster(n_workers=nprocesses, threads_per_worker=1)
client = Client(cluster, asynchronous=True)
time = pd.date_range(freq='h', **snakemake.config['snapshots']) time = pd.date_range(freq='h', **snakemake.config['snapshots'])
cutout_config = snakemake.config['atlite']['cutout'] cutout_config = snakemake.config['atlite']['cutout']
cutout = atlite.Cutout(cutout_config).sel(time=time) cutout = atlite.Cutout(cutout_config).sel(time=time)
@ -24,23 +29,25 @@ if __name__ == '__main__':
I = cutout.indicatormatrix(clustered_regions) I = cutout.indicatormatrix(clustered_regions)
for area in ["total", "rural", "urban"]: pop_layout = xr.open_dataarray(snakemake.input.pop_layout)
pop_layout = xr.open_dataarray(snakemake.input[f'pop_layout_{area}']) stacked_pop = pop_layout.stack(spatial=('y', 'x'))
M = I.T.dot(np.diag(I.dot(stacked_pop)))
stacked_pop = pop_layout.stack(spatial=('y', 'x')) nonzero_sum = M.sum(axis=0, keepdims=True)
M = I.T.dot(np.diag(I.dot(stacked_pop))) nonzero_sum[nonzero_sum == 0.] = 1.
M_tilde = M / nonzero_sum
nonzero_sum = M.sum(axis=0, keepdims=True) temp_air = cutout.temperature(
nonzero_sum[nonzero_sum == 0.] = 1. matrix=M_tilde.T, index=clustered_regions.index,
M_tilde = M / nonzero_sum dask_kwargs=dict(scheduler=client),
show_progress=False)
temp_air = cutout.temperature( temp_air.to_netcdf(snakemake.output.temp_air)
matrix=M_tilde.T, index=clustered_regions.index)
temp_air.to_netcdf(snakemake.output[f"temp_air_{area}"]) temp_soil = cutout.soil_temperature(
matrix=M_tilde.T, index=clustered_regions.index,
dask_kwargs=dict(scheduler=client),
show_progress=False)
temp_soil = cutout.soil_temperature( temp_soil.to_netcdf(snakemake.output.temp_soil)
matrix=M_tilde.T, index=clustered_regions.index)
temp_soil.to_netcdf(snakemake.output[f"temp_soil_{area}"])