From d18867ce61c7e6d60ad8ef3ba557d8b03bbefd3b Mon Sep 17 00:00:00 2001 From: Fabian Date: Thu, 23 Jun 2022 21:27:18 +0200 Subject: [PATCH 1/3] build_renewable_profiles: use dask client instead of kwargs --- scripts/build_renewable_profiles.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/scripts/build_renewable_profiles.py b/scripts/build_renewable_profiles.py index 70cadab4..b77d79e1 100644 --- a/scripts/build_renewable_profiles.py +++ b/scripts/build_renewable_profiles.py @@ -189,6 +189,7 @@ import logging from pypsa.geo import haversine from shapely.geometry import LineString import time +from dask.distributed import Client from _helpers import configure_logging @@ -216,6 +217,7 @@ if __name__ == '__main__': if correction_factor != 1.: logger.info(f'correction_factor is set as {correction_factor}') + client = Client(n_workers=nprocesses) cutout = atlite.Cutout(snakemake.input['cutout']) regions = gpd.read_file(snakemake.input.regions).set_index('name').rename_axis('bus') @@ -266,7 +268,7 @@ if __name__ == '__main__': potential = capacity_per_sqkm * availability.sum('bus') * area func = getattr(cutout, resource.pop('method')) - resource['dask_kwargs'] = {'num_workers': nprocesses} + # resource['dask_kwargs'] = {'num_workers': nprocesses, "scheduler": "threading"} capacity_factor = correction_factor * func(capacity_factor=True, **resource) layout = capacity_factor * area * capacity_per_sqkm profile, capacities = func(matrix=availability.stack(spatial=['y','x']), From 743fdea874aac727e356ba6acf9c787a9eed4cc3 Mon Sep 17 00:00:00 2001 From: Fabian Date: Thu, 23 Jun 2022 21:39:28 +0200 Subject: [PATCH 2/3] add dask-worker-space to gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index b4734ab2..559dde47 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,7 @@ gurobi.log /data /data/links_p_nom.csv /cutouts +/dask-worker-space doc/_build From 75f9719076c39d7d6ff3653917d16b966c5a3e07 Mon Sep 17 00:00:00 2001 From: Fabian Date: Fri, 24 Jun 2022 14:07:51 +0200 Subject: [PATCH 3/3] build_renewable_profiles: use LocalCluster instance --- scripts/build_renewable_profiles.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/scripts/build_renewable_profiles.py b/scripts/build_renewable_profiles.py index b77d79e1..37e1e9de 100644 --- a/scripts/build_renewable_profiles.py +++ b/scripts/build_renewable_profiles.py @@ -189,7 +189,7 @@ import logging from pypsa.geo import haversine from shapely.geometry import LineString import time -from dask.distributed import Client +from dask.distributed import Client, LocalCluster from _helpers import configure_logging @@ -217,8 +217,9 @@ if __name__ == '__main__': if correction_factor != 1.: logger.info(f'correction_factor is set as {correction_factor}') - client = Client(n_workers=nprocesses) - + cluster = LocalCluster(n_workers=nprocesses, threads_per_worker=1) + client = Client(cluster, asynchronous=True) + cutout = atlite.Cutout(snakemake.input['cutout']) regions = gpd.read_file(snakemake.input.regions).set_index('name').rename_axis('bus') buses = regions.index @@ -268,7 +269,7 @@ if __name__ == '__main__': potential = capacity_per_sqkm * availability.sum('bus') * area func = getattr(cutout, resource.pop('method')) - # resource['dask_kwargs'] = {'num_workers': nprocesses, "scheduler": "threading"} + resource['dask_kwargs'] = {"scheduler": client} capacity_factor = correction_factor * func(capacity_factor=True, **resource) layout = capacity_factor * area * capacity_per_sqkm profile, capacities = func(matrix=availability.stack(spatial=['y','x']),