From cb5681df18ae6941391be8be5d10e3d83af341b8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jonas=20H=C3=B6rsch?=
Date: Sat, 10 Feb 2018 17:19:46 +0100
Subject: [PATCH] Fix cluster use and memory requirements

---
 Snakefile                |  26 +++++---
 cluster.yaml             |  12 ++--
 config.yaml              |   7 +-
 scripts/solve_network.py | 134 +++++++++++++++++++++++++++++++++++++--
 snakemake_cluster        |   2 +-
 5 files changed, 158 insertions(+), 23 deletions(-)

diff --git a/Snakefile b/Snakefile
index bba6cbf2..e8161026 100644
--- a/Snakefile
+++ b/Snakefile
@@ -9,16 +9,13 @@ wildcard_constraints:
     sectors="[+a-zA-Z0-9]+",
     opts="[-+a-zA-Z0-9]+"
 
-rule all:
-    input: "results/summaries/costs2-summary.csv"
+# rule all:
+#     input: "results/summaries/costs2-summary.csv"
 
 rule solve_all_elec_networks:
     input:
         expand("results/networks/elec_s{simpl}_{clusters}_lv{lv}_{opts}.nc",
-               simpl='',
-               clusters=config['scenario']['clusters'],
-               lv='1.5',
-               opts=config['scenario']['opts'])
+               **config['scenario'])
 
 rule prepare_links_p_nom:
     output: 'data/links_p_nom.csv'
@@ -100,7 +97,7 @@ rule simplify_network:
         regions_offshore="resources/regions_offshore_{network}_s{simpl}.geojson"
     benchmark: "benchmarks/simplify_network/{network}_s{simpl}"
     threads: 1
-    resources: mem_mb=1000
+    resources: mem_mb=3000
     script: "scripts/simplify_network.py"
 
 rule cluster_network:
@@ -114,7 +111,7 @@ rule cluster_network:
         regions_offshore="resources/regions_offshore_{network}_s{simpl}_{clusters}.geojson"
     benchmark: "benchmarks/cluster_network/{network}_s{simpl}_{clusters}"
     threads: 1
-    resources: mem_mb=1000
+    resources: mem_mb=3000
     script: "scripts/cluster_network.py"
 
 rule add_sectors:
@@ -132,18 +129,27 @@ rule prepare_network:
     output: 'networks/{network}_s{simpl}_{clusters}_lv{lv}_{opts}.nc'
     threads: 1
     resources: mem_mb=1000
+    benchmark: "benchmarks/prepare_network/{network}_s{simpl}_{clusters}_lv{lv}_{opts}"
     script: "scripts/prepare_network.py"
 
+def partition(w):
+    return 'vres' if int(w.clusters) >= 256 else 'x-men'
+
 rule solve_network:
     input: "networks/{network}_s{simpl}_{clusters}_lv{lv}_{opts}.nc"
     output: "results/networks/{network}_s{simpl}_{clusters}_lv{lv}_{opts}.nc"
     shadow: "shallow"
+    params: partition=partition
     log:
         gurobi="logs/{network}_s{simpl}_{clusters}_lv{lv}_{opts}_gurobi.log",
-        python="logs/{network}_s{simpl}_{clusters}_lv{lv}_{opts}_python.log"
+        python="logs/{network}_s{simpl}_{clusters}_lv{lv}_{opts}_python.log",
+        memory="logs/{network}_s{simpl}_{clusters}_lv{lv}_{opts}_memory.log"
     benchmark: "benchmarks/solve_network/{network}_s{simpl}_{clusters}_lv{lv}_{opts}"
     threads: 4
-    resources: mem_mb=lambda w: 100000 * int(w.clusters) // 362
+    resources:
+        mem_mb=lambda w: 4890+310 * int(w.clusters), # without 5000 too small for 256
+        x_men=lambda w: 1 if partition(w) == 'x-men' else 0,
+        vres=lambda w: 1 if partition(w) == 'vres' else 0
     script: "scripts/solve_network.py"
 
 rule plot_network:
diff --git a/cluster.yaml b/cluster.yaml
index 023e266e..1c1cccef 100644
--- a/cluster.yaml
+++ b/cluster.yaml
@@ -1,5 +1,9 @@
 __default__:
-  partition: x-men
-  name: "pypsa-za.{rule}.{wildcards}"
-  output: "logs/cluster/{rule}.{wildcards}.out"
-  error: "logs/cluster/{rule}.{wildcards}.err"
+    partition: x-men
+    name: "pypsa-za.{rule}.{wildcards}"
+    output: "logs/cluster/{rule}.{wildcards}.out"
+    error: "logs/cluster/{rule}.{wildcards}.err"
+solve_network:
+    partition: "{params.partition}"
+    output: "logs/cluster/{rule}.{wildcards}.out"
+    error: "logs/cluster/{rule}.{wildcards}.err"
diff --git a/config.yaml b/config.yaml
index 3c737600..e08502c9 100644
--- a/config.yaml
+++ b/config.yaml
@@ -3,8 +3,9 @@ logging_level: INFO
 
 scenario:
   sectors: [E] # ,E+EV,E+BEV,E+BEV+V2G] # [ E+EV, E+BEV, E+BEV+V2G ]
-  lv: [1., 1.125, 1.25, 1.5, 2.0, 3.0]
-  clusters: [45, 64, 90, 128, 181, 256, 362] # (2**np.r_[5.5:9:.5]).astype(int)
+  simpl: ['']
+  lv: [1.0, 1.125, 1.25, 1.5, 2.0, 3.0]
+  clusters: [45, 64, 90, 128, 181] #, 256] #, 362] # (2**np.r_[5.5:9:.5]).astype(int)
   opts: [Co2L-3H] #, LC-FL, LC-T, Ep-T, Co2L-T]
 
 countries: ['AL', 'AT', 'BA', 'BE', 'BG', 'CH', 'CZ', 'DE', 'DK', 'EE', 'ES', 'FI', 'FR', 'GB', 'GR', 'HR', 'HU', 'IE', 'IT', 'LT', 'LU', 'LV', 'ME', 'MK', 'NL', 'NO', 'PL', 'PT', 'RO', 'RS', 'SE', 'SI', 'SK']
@@ -120,7 +121,7 @@ solving:
     load_shedding: true
     noisy_costs: true
     min_iterations: 4
-    max_iterations: 8
+    max_iterations: 6
     # max_iterations: 1
     # nhours: 10
   solver:
diff --git a/scripts/solve_network.py b/scripts/solve_network.py
index 9dc29aba..df95f3a8 100644
--- a/scripts/solve_network.py
+++ b/scripts/solve_network.py
@@ -2,9 +2,124 @@
 import numpy as np
 import pandas as pd
 import logging
 logger = logging.getLogger(__name__)
+import gc
+import os
+# TODO: provide alternative when multiprocessing is not available
+try:
+    from multiprocessing import Process, Pipe
+except ImportError:
+    from multiprocessing.dummy import Process, Pipe
 
 import pypsa
+from pypsa.descriptors import free_output_series_dataframes
+from memory_profiler import _get_memory, choose_backend
+import numpy as np
+import pandas as pd
+import logging
+logger = logging.getLogger(__name__)
+import gc
+import os
+# TODO: provide alternative when multiprocessing is not available
+try:
+    from multiprocessing import Process, Pipe
+except ImportError:
+    from multiprocessing.dummy import Process, Pipe
+
+import pypsa
+from pypsa.descriptors import free_output_series_dataframes
+from memory_profiler import _get_memory, choose_backend
+
+# The memory logging facilities have been adapted from memory_profiler
+class MemTimer(Process):
+    """
+    Write memory consumption over a time interval to file until signaled to
+    stop on the pipe
+    """
+
+    def __init__(self, monitor_pid, interval, pipe, filename, max_usage, backend, *args, **kw):
+        self.monitor_pid = monitor_pid
+        self.pipe = pipe
+        self.interval = interval
+        self.backend = backend
+        self.n_measurements = 1
+        self.stream = open(filename, 'w') if filename is not None else None
+        self.max_usage = max_usage
+
+        self.timestamps = kw.pop("timestamps", False)
+        self.include_children = kw.pop("include_children", False)
+
+        # get baseline memory usage
+        self.mem_usage = [
+            _get_memory(self.monitor_pid, self.backend, timestamps=self.timestamps,
+                        include_children=self.include_children)]
+        if self.stream is not None:
+            self.stream.write("MEM {0:.6f} {1:.4f}\n".format(*self.mem_usage[0]))
+            self.stream.flush()
+
+        super(MemTimer, self).__init__(*args, **kw)
+
+    def run(self):
+        self.pipe.send(0)  # we're ready
+        stop = False
+        while True:
+            cur_mem = _get_memory(
+                self.monitor_pid, self.backend, timestamps=self.timestamps,
+                include_children=self.include_children,)
+
+            if self.stream is not None:
+                self.stream.write("MEM {0:.6f} {1:.4f}\n".format(*cur_mem))
+                self.stream.flush()
+
+            if not self.max_usage:
+                self.mem_usage.append(cur_mem)
+            else:
+                self.mem_usage[0] = max(cur_mem, self.mem_usage[0])
+
+            self.n_measurements += 1
+            if stop:
+                break
+            stop = self.pipe.poll(self.interval)
+            # do one more iteration
+
+        if self.stream is not None:
+            self.stream.close()
+
+        self.pipe.send(self.mem_usage)
+        self.pipe.send(self.n_measurements)
+
+class log_memory(object):
+    def __init__(self, filename=None, interval=1., max_usage=False,
+                 timestamps=False, include_children=True):
+        if filename is not None:
+            timestamps = True
+
+        self.filename = filename
+        self.interval = interval
+        self.max_usage = max_usage
+        self.timestamps = timestamps
+        self.include_children = include_children
+
+    def __enter__(self):
+        backend = choose_backend()
+
+        self.child_conn, self.parent_conn = Pipe()  # this will store MemTimer's results
+        self.p = MemTimer(os.getpid(), self.interval, self.child_conn, self.filename,
+                          backend=backend, timestamps=self.timestamps, max_usage=self.max_usage,
+                          include_children=self.include_children)
+        self.p.start()
+        self.parent_conn.recv()  # wait until memory logging in subprocess is ready
+
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if exc_type is None:
+            self.parent_conn.send(0)  # finish timing
+
+        self.mem_usage = self.parent_conn.recv()
+        self.n_measurements = self.parent_conn.recv()
+
+        return False
 
 def patch_pyomo_tmpdir(tmpdir):
     # PYOMO should write its lp files into tmp here
@@ -104,6 +219,8 @@ def solve_network(n):
     solver_name = solver_options.pop('name')
 
     def run_lopf(n, allow_warning_status=False, fix_zero_lines=False, fix_ext_lines=False):
+        free_output_series_dataframes(n)
+
         if not hasattr(n, 'opt') or not isinstance(n.opt, pypsa.opf.PersistentSolver):
             pypsa.opf.network_lopf_build_model(n, formulation=solve_opts['formulation'])
             add_opts_constraints(n)
@@ -124,11 +241,15 @@ def solve_network(n):
                 lines_s_nom=n.lines.loc[n.lines.s_nom_extendable, 's_nom_opt'],
                 links_p_nom=n.links.loc[n.links.p_nom_extendable, 'p_nom_opt'])
 
+        # Firing up solve will increase memory consumption tremendously, so
+        # make sure we freed everything we can
+        gc.collect()
         status, termination_condition = \
             pypsa.opf.network_lopf_solve(n,
                                          solver_options=solver_options,
                                          formulation=solve_opts['formulation'],
-                                         free_memory={'pypsa'})
+                                         #free_memory={'pypsa'}
+                                         )
 
         assert status == "ok" or allow_warning_status and status == 'warning', \
             ("network_lopf did abort with status={} "
@@ -245,9 +366,12 @@ if __name__ == "__main__":
     logging.basicConfig(filename=snakemake.log.python,
                         level=snakemake.config['logging_level'])
 
-    n = pypsa.Network(snakemake.input[0])
+    with log_memory(filename=getattr(snakemake.log, 'memory', None), interval=30., max_usage=True) as mem:
+        n = pypsa.Network(snakemake.input[0])
 
-    n = prepare_network(n)
-    n = solve_network(n)
+        n = prepare_network(n)
+        n = solve_network(n)
 
-    n.export_to_netcdf(snakemake.output[0])
+        n.export_to_netcdf(snakemake.output[0])
+
+    logger.info("Maximum memory usage: {}".format(mem.mem_usage[0]))
diff --git a/snakemake_cluster b/snakemake_cluster
index 1851bc87..5900bb50 100755
--- a/snakemake_cluster
+++ b/snakemake_cluster
@@ -1,4 +1,4 @@
 #!/bin/bash
 
-snakemake --cluster-config cluster.yaml --cluster "sbatch --parsable -J '{cluster.name}' -p {cluster.partition} -n 1 --cpus-per-task {threads} -o '{cluster.output}' --mem {resources.mem_mb}" "$@"
+snakemake --latency-wait 100 --cluster-config cluster.yaml --cluster "sbatch --parsable -J '{cluster.name}' -p {cluster.partition} -n 1 -t '6-0' --cpus-per-task {threads} -o '{cluster.output}' --mem {resources.mem_mb}" "$@"
 
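
As an illustration of the memory logging added above: the patch wraps the whole solve in the new log_memory context manager and reports mem.mem_usage[0] at the end. A minimal stand-alone sketch of the same pattern follows; the import path and the expensive_step placeholder are assumptions for illustration only, and memory_profiler must be installed.

    # Minimal usage sketch of the log_memory context manager defined in
    # scripts/solve_network.py above (hypothetical import path).
    import time

    from solve_network import log_memory  # assumption: module is importable

    def expensive_step():
        # stand-in for pypsa.Network(...), prepare_network and solve_network
        data = [0.0] * (10 ** 7)
        time.sleep(2)
        return len(data)

    with log_memory(filename="memory.log", interval=1., max_usage=True) as mem:
        expensive_step()

    # With max_usage=True, mem.mem_usage[0] holds the peak reading; since a
    # filename is given, timestamps are enabled and each reading is a
    # (MiB, timestamp) tuple, as written to memory.log by the MemTimer process.
    print("Maximum memory usage: {}".format(mem.mem_usage[0]))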
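
The solve_network rule now requests memory with a linear fit in the number of clusters instead of the previous proportional guess, and partition(w) sends jobs with 256 clusters or more to the vres partition via cluster.yaml and {params.partition}. A quick sanity check of both expressions (restated from the Snakefile, evaluated for the cluster sizes in config.yaml):

    # Restates the mem_mb fit and the partition routing from the Snakefile.
    def mem_mb(clusters):
        return 4890 + 310 * clusters

    def partition(clusters):
        return 'vres' if clusters >= 256 else 'x-men'

    for c in [45, 64, 90, 128, 181, 256, 362]:
        print("{:>3} clusters -> {:>6} MB on {}".format(c, mem_mb(c), partition(c)))
    # 181 clusters -> 61000 MB on x-men; 256 clusters -> 84250 MB on vres

Note that the x_men and vres counters declared under resources only limit how many solve jobs run at once if a global budget is passed to snakemake (e.g. --resources x_men=2 vres=1); otherwise they simply tag the jobs, while the partition itself is taken from cluster.yaml through {params.partition}.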