Fix cluster use and memory requirements
parent 6a613ea483
commit cb5681df18

Snakefile (26 lines changed)

@@ -9,16 +9,13 @@ wildcard_constraints:
     sectors="[+a-zA-Z0-9]+",
     opts="[-+a-zA-Z0-9]+"
 
-rule all:
-    input: "results/summaries/costs2-summary.csv"
+# rule all:
+#     input: "results/summaries/costs2-summary.csv"
 
 rule solve_all_elec_networks:
     input:
         expand("results/networks/elec_s{simpl}_{clusters}_lv{lv}_{opts}.nc",
-               simpl='',
-               clusters=config['scenario']['clusters'],
-               lv='1.5',
-               opts=config['scenario']['opts'])
+               **config['scenario'])
 
 rule prepare_links_p_nom:
     output: 'data/links_p_nom.csv'
@@ -100,7 +97,7 @@ rule simplify_network:
         regions_offshore="resources/regions_offshore_{network}_s{simpl}.geojson"
     benchmark: "benchmarks/simplify_network/{network}_s{simpl}"
     threads: 1
-    resources: mem_mb=1000
+    resources: mem_mb=3000
     script: "scripts/simplify_network.py"
 
 rule cluster_network:
@@ -114,7 +111,7 @@ rule cluster_network:
        regions_offshore="resources/regions_offshore_{network}_s{simpl}_{clusters}.geojson"
    benchmark: "benchmarks/cluster_network/{network}_s{simpl}_{clusters}"
    threads: 1
-   resources: mem_mb=1000
+   resources: mem_mb=3000
    script: "scripts/cluster_network.py"

rule add_sectors:
@@ -132,18 +129,27 @@ rule prepare_network:
     output: 'networks/{network}_s{simpl}_{clusters}_lv{lv}_{opts}.nc'
     threads: 1
     resources: mem_mb=1000
     benchmark: "benchmarks/prepare_network/{network}_s{simpl}_{clusters}_lv{lv}_{opts}"
     script: "scripts/prepare_network.py"
 
+def partition(w):
+    return 'vres' if int(w.clusters) >= 256 else 'x-men'
+
 rule solve_network:
     input: "networks/{network}_s{simpl}_{clusters}_lv{lv}_{opts}.nc"
     output: "results/networks/{network}_s{simpl}_{clusters}_lv{lv}_{opts}.nc"
     shadow: "shallow"
+    params: partition=partition
     log:
         gurobi="logs/{network}_s{simpl}_{clusters}_lv{lv}_{opts}_gurobi.log",
-        python="logs/{network}_s{simpl}_{clusters}_lv{lv}_{opts}_python.log"
+        python="logs/{network}_s{simpl}_{clusters}_lv{lv}_{opts}_python.log",
+        memory="logs/{network}_s{simpl}_{clusters}_lv{lv}_{opts}_memory.log"
     benchmark: "benchmarks/solve_network/{network}_s{simpl}_{clusters}_lv{lv}_{opts}"
     threads: 4
-    resources: mem_mb=lambda w: 100000 * int(w.clusters) // 362
+    resources:
+        mem_mb=lambda w: 4890+310 * int(w.clusters), # without 5000 too small for 256
+        x_men=lambda w: 1 if partition(w) == 'x-men' else 0,
+        vres=lambda w: 1 if partition(w) == 'vres' else 0
     script: "scripts/solve_network.py"
 
 rule plot_network:
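
Note: for a sense of scale, the old and new solve_network memory rules compare as follows. A quick sketch (formulas copied from the hunk above; cluster counts taken from the scenario config):

    # old vs. new per-job memory estimate (MB) for solve_network
    old = lambda c: 100000 * c // 362
    new = lambda c: 4890 + 310 * c
    for c in [45, 90, 181, 256, 362]:
        print("{:3d} clusters: old {:6d} MB -> new {:6d} MB".format(c, old(c), new(c)))
    #  45 clusters: old  12430 MB -> new  18840 MB
    # 256 clusters: old  70718 MB -> new  84250 MB
    # The affine form raises the estimate across the board; per the inline
    # comment, the ~5 GB base term was needed for the 256-cluster run.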

cluster.yaml (12 lines changed)

@@ -1,5 +1,9 @@
 __default__:
-    partition: x-men
-    name: "pypsa-za.{rule}.{wildcards}"
-    output: "logs/cluster/{rule}.{wildcards}.out"
-    error: "logs/cluster/{rule}.{wildcards}.err"
+    partition: x-men
+    name: "pypsa-za.{rule}.{wildcards}"
+    output: "logs/cluster/{rule}.{wildcards}.out"
+    error: "logs/cluster/{rule}.{wildcards}.err"
+solve_network:
+    partition: "{params.partition}"
+    output: "logs/cluster/{rule}.{wildcards}.out"
+    error: "logs/cluster/{rule}.{wildcards}.err"
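
Note: taken together with the Snakefile hunk above, the partition routing works like this. A sketch (the Wildcards class is an illustrative stand-in for snakemake's wildcards object):

    # partition() as defined in the Snakefile: networks with >= 256 clusters
    # go to the 'vres' partition, everything else to 'x-men'
    def partition(w):
        return 'vres' if int(w.clusters) >= 256 else 'x-men'

    class Wildcards:  # stand-in for snakemake's wildcards (illustrative)
        def __init__(self, clusters):
            self.clusters = clusters

    print(partition(Wildcards('181')))  # -> x-men
    print(partition(Wildcards('256')))  # -> vres
    # snakemake exposes the result as {params.partition}, which the
    # solve_network entry above substitutes into sbatch's -p flag.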

@@ -3,8 +3,9 @@ logging_level: INFO
 
 scenario:
   sectors: [E] # ,E+EV,E+BEV,E+BEV+V2G] # [ E+EV, E+BEV, E+BEV+V2G ]
-  lv: [1., 1.125, 1.25, 1.5, 2.0, 3.0]
-  clusters: [45, 64, 90, 128, 181, 256, 362] # (2**np.r_[5.5:9:.5]).astype(int)
+  simpl: ['']
+  lv: [1.0, 1.125, 1.25, 1.5, 2.0, 3.0]
+  clusters: [45, 64, 90, 128, 181] #, 256] #, 362] # (2**np.r_[5.5:9:.5]).astype(int)
   opts: [Co2L-3H] #, LC-FL, LC-T, Ep-T, Co2L-T]
 
 countries: ['AL', 'AT', 'BA', 'BE', 'BG', 'CH', 'CZ', 'DE', 'DK', 'EE', 'ES', 'FI', 'FR', 'GB', 'GR', 'HR', 'HU', 'IE', 'IT', 'LT', 'LU', 'LV', 'ME', 'MK', 'NL', 'NO', 'PL', 'PT', 'RO', 'RS', 'SE', 'SI', 'SK']
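
Note: with simpl added to the scenario dict, the expand(..., **config['scenario']) call from the Snakefile hunk unrolls into one target per combination. A reduced sketch with illustrative values (the real dict also carries sectors and the full lists):

    from snakemake.io import expand  # the same helper snakefiles use

    scenario = dict(simpl=[''], clusters=[45, 64], lv=[1.5], opts=['Co2L-3H'])
    print(expand("results/networks/elec_s{simpl}_{clusters}_lv{lv}_{opts}.nc",
                 **scenario))
    # ['results/networks/elec_s_45_lv1.5_Co2L-3H.nc',
    #  'results/networks/elec_s_64_lv1.5_Co2L-3H.nc']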
@@ -120,7 +121,7 @@ solving:
     load_shedding: true
     noisy_costs: true
     min_iterations: 4
-    max_iterations: 8
+    max_iterations: 6
     # max_iterations: 1
     # nhours: 10
   solver:

@@ -2,9 +2,124 @@ import numpy as np
 import pandas as pd
 import logging
 logger = logging.getLogger(__name__)
 import gc
 import os
 
+# TODO: provide alternative when multiprocessing is not available
+try:
+    from multiprocessing import Process, Pipe
+except ImportError:
+    from multiprocessing.dummy import Process, Pipe
+
 import pypsa
 from pypsa.descriptors import free_output_series_dataframes
+from memory_profiler import _get_memory, choose_backend
+
+# The memory logging facilities have been adapted from memory_profiler
+class MemTimer(Process):
+    """
+    Write memory consumption over a time interval to file until signaled to
+    stop on the pipe
+    """
+
+    def __init__(self, monitor_pid, interval, pipe, filename, max_usage, backend, *args, **kw):
+        self.monitor_pid = monitor_pid
+        self.pipe = pipe
+        self.interval = interval
+        self.backend = backend
+        self.n_measurements = 1
+        self.stream = open(filename, 'w') if filename is not None else None
+        self.max_usage = max_usage
+
+        self.timestamps = kw.pop("timestamps", False)
+        self.include_children = kw.pop("include_children", False)
+
+        # get baseline memory usage
+        self.mem_usage = [
+            _get_memory(self.monitor_pid, self.backend, timestamps=self.timestamps,
+                        include_children=self.include_children)]
+        if self.stream is not None:
+            self.stream.write("MEM {0:.6f} {1:.4f}\n".format(*self.mem_usage[0]))
+            self.stream.flush()
+
+        super(MemTimer, self).__init__(*args, **kw)
+
+    def run(self):
+        self.pipe.send(0)  # we're ready
+        stop = False
+        while True:
+            cur_mem = _get_memory(
+                self.monitor_pid, self.backend, timestamps=self.timestamps,
+                include_children=self.include_children,)
+
+            if self.stream is not None:
+                self.stream.write("MEM {0:.6f} {1:.4f}\n".format(*cur_mem))
+                self.stream.flush()
+
+            if not self.max_usage:
+                self.mem_usage.append(cur_mem)
+            else:
+                self.mem_usage[0] = max(cur_mem, self.mem_usage[0])
+
+            self.n_measurements += 1
+            if stop:
+                break
+            stop = self.pipe.poll(self.interval)
+            # do one more iteration
+
+        if self.stream is not None:
+            self.stream.close()
+
+        self.pipe.send(self.mem_usage)
+        self.pipe.send(self.n_measurements)
+
+class log_memory(object):
+    def __init__(self, filename=None, interval=1., max_usage=False,
+                 timestamps=False, include_children=True):
+        if filename is not None:
+            timestamps = True
+
+        self.filename = filename
+        self.interval = interval
+        self.max_usage = max_usage
+        self.timestamps = timestamps
+        self.include_children = include_children
+
+    def __enter__(self):
+        backend = choose_backend()
+
+        self.child_conn, self.parent_conn = Pipe()  # this will store MemTimer's results
+        self.p = MemTimer(os.getpid(), self.interval, self.child_conn, self.filename,
+                          backend=backend, timestamps=self.timestamps, max_usage=self.max_usage,
+                          include_children=self.include_children)
+        self.p.start()
+        self.parent_conn.recv()  # wait until memory logging in subprocess is ready
+
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if exc_type is None:
+            self.parent_conn.send(0)  # finish timing
+
+            self.mem_usage = self.parent_conn.recv()
+            self.n_measurements = self.parent_conn.recv()
+
+        return False
 
 def patch_pyomo_tmpdir(tmpdir):
     # PYOMO should write its lp files into tmp here
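
Note: the two classes above combine into a context manager. A minimal standalone sketch of the intended use (values illustrative; with filename=None, timestamps stay off and each sample is a plain MiB float):

    # sample this process (and its children) every 0.2 s, keep only the peak
    with log_memory(interval=0.2, max_usage=True) as mem:
        blobs = [bytearray(10 ** 7) for _ in range(50)]  # ~500 MB of work
    print("peak memory (MiB): {:.1f}".format(mem.mem_usage[0]))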
@@ -104,6 +219,8 @@ def solve_network(n):
     solver_name = solver_options.pop('name')
 
     def run_lopf(n, allow_warning_status=False, fix_zero_lines=False, fix_ext_lines=False):
+        free_output_series_dataframes(n)
+
         if not hasattr(n, 'opt') or not isinstance(n.opt, pypsa.opf.PersistentSolver):
             pypsa.opf.network_lopf_build_model(n, formulation=solve_opts['formulation'])
             add_opts_constraints(n)
@@ -124,11 +241,15 @@ def solve_network(n):
             lines_s_nom=n.lines.loc[n.lines.s_nom_extendable, 's_nom_opt'],
             links_p_nom=n.links.loc[n.links.p_nom_extendable, 'p_nom_opt'])
 
+        # Firing up solve will increase memory consumption tremendously, so
+        # make sure we freed everything we can
+        gc.collect()
         status, termination_condition = \
             pypsa.opf.network_lopf_solve(n,
                                          solver_options=solver_options,
                                          formulation=solve_opts['formulation'],
-                                         free_memory={'pypsa'})
+                                         #free_memory={'pypsa'}
+                                         )
 
         assert status == "ok" or allow_warning_status and status == 'warning', \
             ("network_lopf did abort with status={} "
@@ -245,9 +366,12 @@ if __name__ == "__main__":
     logging.basicConfig(filename=snakemake.log.python,
                         level=snakemake.config['logging_level'])
 
-    n = pypsa.Network(snakemake.input[0])
+    with log_memory(filename=getattr(snakemake.log, 'memory', None), interval=30., max_usage=True) as mem:
+        n = pypsa.Network(snakemake.input[0])
 
-    n = prepare_network(n)
-    n = solve_network(n)
+        n = prepare_network(n)
+        n = solve_network(n)
 
-    n.export_to_netcdf(snakemake.output[0])
+        n.export_to_netcdf(snakemake.output[0])
+
+    logger.info("Maximum memory usage: {}".format(mem.mem_usage[0]))
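
Note: the memory log written through snakemake.log.memory is plain text with one "MEM <MiB> <unix-time>" sample every 30 s, and max_usage=True keeps the running peak in mem.mem_usage[0]. A sketch for inspecting such a log afterwards (path illustrative):

    import pandas as pd

    # columns: the literal "MEM" tag, memory sample in MiB, unix timestamp
    log = pd.read_csv("logs/elec_s_128_lv1.5_Co2L-3H_memory.log", sep=" ",
                      names=["tag", "mem_mib", "timestamp"])
    print("peak: {:.0f} MiB over {} samples".format(log.mem_mib.max(), len(log)))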
@@ -1,4 +1,4 @@
 #!/bin/bash
 
-snakemake --cluster-config cluster.yaml --cluster "sbatch --parsable -J '{cluster.name}' -p {cluster.partition} -n 1 --cpus-per-task {threads} -o '{cluster.output}' --mem {resources.mem_mb}" "$@"
+snakemake --latency-wait 100 --cluster-config cluster.yaml --cluster "sbatch --parsable -J '{cluster.name}' -p {cluster.partition} -n 1 -t '6-0' --cpus-per-task {threads} -o '{cluster.output}' --mem {resources.mem_mb}" "$@"