Merge pull request #384 from PyPSA/dask-distributed

Enable parallel computing with new dask version
This commit is contained in:
Fabian Hofmann 2022-06-24 14:27:41 +02:00 committed by GitHub
commit 467a1eb5e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 6 additions and 2 deletions

1
.gitignore vendored
View File

@ -19,6 +19,7 @@ gurobi.log
/data
/data/links_p_nom.csv
/cutouts
/dask-worker-space
doc/_build

View File

@ -189,6 +189,7 @@ import logging
from pypsa.geo import haversine
from shapely.geometry import LineString
import time
from dask.distributed import Client, LocalCluster
from _helpers import configure_logging
@ -216,7 +217,9 @@ if __name__ == '__main__':
if correction_factor != 1.:
logger.info(f'correction_factor is set as {correction_factor}')
cluster = LocalCluster(n_workers=nprocesses, threads_per_worker=1)
client = Client(cluster, asynchronous=True)
cutout = atlite.Cutout(snakemake.input['cutout'])
regions = gpd.read_file(snakemake.input.regions).set_index('name').rename_axis('bus')
buses = regions.index
@ -266,7 +269,7 @@ if __name__ == '__main__':
potential = capacity_per_sqkm * availability.sum('bus') * area
func = getattr(cutout, resource.pop('method'))
resource['dask_kwargs'] = {'num_workers': nprocesses}
resource['dask_kwargs'] = {"scheduler": client}
capacity_factor = correction_factor * func(capacity_factor=True, **resource)
layout = capacity_factor * area * capacity_per_sqkm
profile, capacities = func(matrix=availability.stack(spatial=['y','x']),