pypsa-eur/scripts/_benchmark.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

257 lines
7.0 KiB
Python
Raw Permalink Normal View History

2023-08-30 12:32:57 +00:00
# -*- coding: utf-8 -*-
2024-02-19 15:21:48 +00:00
# SPDX-FileCopyrightText: : 2020-2024 The PyPSA-Eur Authors
2023-10-05 10:03:06 +00:00
#
# SPDX-License-Identifier: MIT
2023-08-30 12:32:57 +00:00
"""
2023-08-30 12:32:57 +00:00
"""
from __future__ import absolute_import, print_function
import logging
import os
import sys
import time
2024-01-19 09:47:58 +00:00
from memory_profiler import _get_memory, choose_backend
2023-08-30 12:32:57 +00:00
logger = logging.getLogger(__name__)
# TODO: provide alternative when multiprocessing is not available
try:
from multiprocessing import Pipe, Process
except ImportError:
2024-01-19 09:47:58 +00:00
from multiprocessing.dummy import Pipe, Process
2023-08-30 12:32:57 +00:00
2023-08-30 12:32:57 +00:00
# The memory logging facilities have been adapted from memory_profiler
class MemTimer(Process):
"""
Write memory consumption over a time interval to file until signaled to
stop on the pipe.
"""
def __init__(
self, monitor_pid, interval, pipe, filename, max_usage, backend, *args, **kw
):
self.monitor_pid = monitor_pid
self.interval = interval
self.pipe = pipe
self.filename = filename
self.max_usage = max_usage
self.backend = backend
self.timestamps = kw.pop("timestamps", True)
self.include_children = kw.pop("include_children", True)
super(MemTimer, self).__init__(*args, **kw)
def run(self):
# get baseline memory usage
cur_mem = _get_memory(
self.monitor_pid,
self.backend,
timestamps=self.timestamps,
include_children=self.include_children,
)
n_measurements = 1
mem_usage = cur_mem if self.max_usage else [cur_mem]
if self.filename is not None:
stream = open(self.filename, "w")
stream.write("MEM {0:.6f} {1:.4f}\n".format(*cur_mem))
stream.flush()
else:
stream = None
self.pipe.send(0) # we're ready
stop = False
while True:
cur_mem = _get_memory(
self.monitor_pid,
self.backend,
timestamps=self.timestamps,
include_children=self.include_children,
)
if stream is not None:
stream.write("MEM {0:.6f} {1:.4f}\n".format(*cur_mem))
stream.flush()
n_measurements += 1
if not self.max_usage:
mem_usage.append(cur_mem)
else:
mem_usage = max(cur_mem, mem_usage)
if stop:
break
stop = self.pipe.poll(self.interval)
# do one more iteration
if stream is not None:
stream.close()
self.pipe.send(mem_usage)
self.pipe.send(n_measurements)
2023-08-30 12:32:57 +00:00
class memory_logger(object):
"""
Context manager for taking and reporting memory measurements at fixed
intervals from a separate process, for the duration of a context.
Parameters
----------
filename : None|str
Name of the text file to log memory measurements, if None no log is
created (defaults to None)
interval : float
Interval between measurements (defaults to 1.)
max_usage : bool
If True, only store and report the maximum value (defaults to True)
timestamps : bool
Whether to record tuples of memory usage and timestamps; if logging to
a file timestamps are always kept (defaults to True)
include_children : bool
Whether the memory of subprocesses is to be included (default: True)
Arguments
---------
n_measurements : int
Number of measurements that have been taken
mem_usage : (float, float)|[(float, float)]
All memory measurements and timestamps (if timestamps was True) or only
the maximum memory usage and its timestamp
Note
----
The arguments are only set after all the measurements, i.e. outside of the
with statement.
Example
-------
with memory_logger(filename="memory.log", max_usage=True) as mem:
# Do a lot of long running memory intensive stuff
hard_memory_bound_stuff()
max_mem, timestamp = mem.mem_usage
"""
2023-08-30 12:32:57 +00:00
def __init__(
self,
filename=None,
interval=1.0,
max_usage=True,
timestamps=True,
include_children=True,
):
if filename is not None:
timestamps = True
self.filename = filename
self.interval = interval
self.max_usage = max_usage
self.timestamps = timestamps
self.include_children = include_children
def __enter__(self):
backend = choose_backend()
self.child_conn, self.parent_conn = Pipe() # this will store MemTimer's results
self.p = MemTimer(
os.getpid(),
self.interval,
self.child_conn,
self.filename,
backend=backend,
timestamps=self.timestamps,
max_usage=self.max_usage,
include_children=self.include_children,
)
self.p.start()
self.parent_conn.recv() # wait until memory logging in subprocess is ready
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
self.parent_conn.send(0) # finish timing
self.mem_usage = self.parent_conn.recv()
self.n_measurements = self.parent_conn.recv()
else:
self.p.terminate()
return False
2023-08-30 12:32:57 +00:00
class timer(object):
level = 0
opened = False
def __init__(self, name="", verbose=True):
self.name = name
self.verbose = verbose
def __enter__(self):
if self.verbose:
if self.opened:
sys.stdout.write("\n")
if len(self.name) > 0:
sys.stdout.write((".. " * self.level) + self.name + ": ")
sys.stdout.flush()
self.__class__.opened = True
self.__class__.level += 1
self.start = time.time()
return self
def print_usec(self, usec):
if usec < 1000:
print("%.1f usec" % usec)
else:
msec = usec / 1000
if msec < 1000:
print("%.1f msec" % msec)
else:
sec = msec / 1000
print("%.1f sec" % sec)
def __exit__(self, exc_type, exc_val, exc_tb):
if not self.opened and self.verbose:
sys.stdout.write(".. " * self.level)
if exc_type is None:
stop = time.time()
self.usec = usec = (stop - self.start) * 1e6
if self.verbose:
self.print_usec(usec)
elif self.verbose:
print("failed")
sys.stdout.flush()
self.__class__.level -= 1
if self.verbose:
self.__class__.opened = False
return False
2023-08-30 12:32:57 +00:00
class optional(object):
def __init__(self, variable, contextman):
self.variable = variable
self.contextman = contextman
def __enter__(self):
if self.variable:
return self.contextman.__enter__()
def __exit__(self, exc_type, exc_val, exc_tb):
if self.variable:
return self.contextman.__exit__(exc_type, exc_val, exc_tb)
return False