# Source code for snek5000.output.base

"""Base class for ``sim.output``.

"""

import inspect
import logging
import os
import pkgutil
import shutil
import stat
import textwrap
import warnings
from importlib import resources
from itertools import chain
from pathlib import Path
from socket import gethostname

import yaml
from inflection import underscore

from fluiddyn.io import stdout_redirected
from fluiddyn.util import mpi
from fluidsim_core.output import OutputCore
from fluidsim_core.params import iter_complete_params
from snek5000 import __version__, get_snek_resource, logger
from snek5000.make import _Nek5000Make
from snek5000.params import _save_par_file
from snek5000.solvers import get_solver_package, is_package
from snek5000.util import docstring_params
from snek5000.util.files import bisect_nek_files_by_time
from snek5000.util.smake import append_debug_flags, set_compiler_verbosity

from . import _make_path_session


class MissingConfigFilter:
    """Logging filter which lets the "Missing a configuration file" message
    pass only the first time it is seen by this filter instance.

    """

    def filter(self, record):
        """Decide whether ``record`` should be emitted.

        Parameters
        ----------
        record:
            Object with a ``msg`` attribute (a log record).

        Returns
        -------
        bool
            ``False`` for the second and later records whose message starts
            with ``"Missing a configuration file"``, ``True`` otherwise.

        """
        text = record.msg
        # ``msg`` is not necessarily a string, hence the ``hasattr`` check
        is_missing_config_message = hasattr(text, "startswith") and text.startswith(
            "Missing a configuration file"
        )
        if not is_missing_config_message:
            return True

        if getattr(self, "emitted", False):
            return False

        # Remember that the message went through once
        self.emitted = True
        return True
# Module-level filter instance, attached to the package logger at import time
missing_config_filter = MissingConfigFilter()
logger.addFilter(missing_config_filter)
class Output(OutputCore):
    """Container and methods for getting paths of and copying case files.

    Some important methods:

    - :meth:`snek5000.output.base.Output.get_path_solver_package` points to the
      directory containing the case files.
    - :meth:`snek5000.output.base.Output.get_paths` populates a list of files
      to copy.

    and important attributes:

    .. autoattribute:: name_solver
        :noindex:

        Initialized from ``sim.info.solver.short_name`` used to discover source
        code files, such as usr, box, par files. The value of ``name_solver``
        is also used to identify the entrypoint pointing to the solver module.
        Have a look at the :ref:`packaging tutorial <packaging>`.

    .. autoattribute:: package

        Initialized using :func:`snek5000.solvers.get_solver_package` and
        :attr:`name_solver`, points to the path to the package to discover
        source code files.

    .. autoattribute:: path_run
        :noindex:

        Path to the generated simulation directory.

    .. autoattribute:: path_session

        Path to subdirectory under :attr:`path_run` which would contain the
        field files upon execution. This path would be written into the
        `SESSION.NAME` file.

    """

    _config_filename = "config_simul.yml"

    @property
    def excludes(self):
        """Prefixes and suffixes of files which should be excluded from being
        copied."""
        return {
            "prefix": "__",
            "suffix": (".vimrc", ".tar.gz", ".o", ".py", ".usr.f", ".par.cfg"),
        }

    @property
    def makefile_usr_sources(self):
        """
        Sources for inclusion to makefile_usr.inc
        Dict[directory]  -> list of source files
        """
        return {
            # "source_directory": [
            #     (src1a.f, src1b.f)
            #     (src2a.f, src2b.f, src2c.f), ...
            # ]
        }

    @property
    def makefile_usr_obj(self):
        """Object files to be included in compilation. Should be exported as
        USR environment variable.

        The object name is derived from the first file of every tuple of
        sources listed in :attr:`makefile_usr_sources`.

        """
        return [
            sources[0].replace(".f", ".o")
            for sources in chain.from_iterable(self.makefile_usr_sources.values())
        ]

    @property
    def fortran_inc_flags(self):
        """Generator of ``-I<dir>`` flags, one for every directory (key) in
        :attr:`makefile_usr_sources`."""
        return (f"-I{inc_dir}" for inc_dir in self.makefile_usr_sources)

    @classmethod
    def _set_info_solver_classes(cls, classes):
        """Set the the classes for info_solver.classes.Output"""
        children = {
            "PrintStdout": ("snek5000.output.print_stdout", "PrintStdOut"),
            "PhysFields": ("snek5000.output.phys_fields", "PhysFields"),
            "HistoryPoints": ("snek5000.output.history_points", "HistoryPoints"),
            "RemainingClockTime": (
                "snek5000.output.remaining_clock_time",
                "RemainingClockTime",
            ),
        }
        for child_name, (module_name, class_name) in children.items():
            classes._set_child(
                child_name,
                dict(module_name=module_name, class_name=class_name),
            )

    @classmethod
    def _complete_info_solver(cls, info_solver):
        """Complete the info_solver instance with child class details (module
        and class names).

        """
        classes = info_solver.classes.Output._set_child("classes")
        cls._set_info_solver_classes(classes)
        # iteratively call _complete_info_solver of the above classes
        info_solver.classes.Output.complete_with_classes()

    @staticmethod
    def _complete_params_with_default(params, info_solver):
        """This static method is used to complete the *params* container."""
        # Bare minimum
        attribs = {
            "HAS_TO_SAVE": True,
            "sub_directory": "",
            "session_id": 0,
        }
        params._set_child("output", attribs=attribs)
        params.output._set_doc(
            textwrap.dedent(
                """
    - ``HAS_TO_SAVE``: bool (default: True)

      If False, nothing new is saved in the directory of the simulation.

    - ``sub_directory``: str (default: "")

      A name of a sub-directory (relative to $FLUIDDYN_PATH_SCRATCH) wherein
      the directory of the simulation (``path_run``) is saved.

    - ``session_id``: int (default: 0)

      Determines the sub-directory, ``path_session`` in which the field files
      would be generated during runtime. The session directory takes the form
      `session_{session_id}`.

    .. note::

        In short, the field files would be generated under
        ``$FLUIDDYN_PATH_SCRATCH/<path_run>/<path_session>``

"""
            )
        )

        dict_classes = info_solver.classes.Output.import_classes()
        iter_complete_params(params, info_solver, dict_classes.values())

    @classmethod
    def get_path_solver_package(cls):
        """Get the path towards the solver package."""
        return Path(inspect.getmodule(cls).__file__).parent

    @classmethod
    def find_configfile(cls, host=None) -> Path:
        """Get path of the Snakemake configuration file for the current machine.
        All configuration files are stored under ``etc`` sub-package.

        Parameters
        ----------
        host: str
            Override hostname detection and specify it instead

        Returns
        -------
        Path
            The first existing custom configuration file, otherwise the
            default configuration file (a warning is logged in that case).

        """
        if not host:
            host = os.getenv(
                "SNIC_RESOURCE", os.getenv("GITHUB_WORKFLOW", gethostname())
            )

        path_solver_package = cls.get_path_solver_package()
        xdg_config = Path(
            os.path.expandvars(os.getenv("XDG_CONFIG_HOME", "$HOME/.config"))
        )

        configfile_root = path_solver_package / "etc" / f"{host}.yml"
        configfile_xdg_config_host = xdg_config / f"snek5000/{host}.yml"
        configfile_xdg_config = xdg_config / "snek5000.yml"
        configfile_default = Path(get_snek_resource("default_configfile.yml"))

        # Listed in order of decreasing priority
        custom_configfiles = (
            configfile_xdg_config_host,
            configfile_xdg_config,
            configfile_root,
        )

        for configfile in custom_configfiles:
            if configfile.exists():
                break
        else:
            # No custom file exists: fall back to the default one
            configfile = configfile_default
            logger.warning(
                (
                    "Missing a configuration file describing compilers and "
                    "flags. Create one at either of the following paths to "
                    "avoid future warnings:\n"
                )
                + "\n".join(map(str, custom_configfiles))
                + "\nThe command `snek-generate-config` could be used to create "
                "a user config file for you."
                f"\nUsing default configuration for now:\n{configfile}"
            )

        return configfile

    @classmethod
    def update_snakemake_config(
        cls, config, name_solver, /, verbosity=0, env_sensitive=None, **kwargs
    ):
        """Update snakemake config in-place with name of the solver / case,
        path to configfile and compiler flags

        Parameters
        ----------
        config: dict
            Snakemake configuration
        name_solver: str
            Short name of the solver, also known as case name
        verbosity: int
            Set compiler verbosity level. See
            :func:`snek5000.util.smake.set_compiler_verbosity`
        env_sensitive: bool (None)
            If ``False``, the ``config`` dictionary is not modified (allows for
            reproducible runs). If ``True``, the ``config`` dictionary is
            modified based on environment variables. If ``None`` (default), the
            value of ``env_sensitive`` is obtained with
            ``os.environ.get("SNEK_UPDATE_CONFIG_ENV_SENSITIVE", False)``.

        Raises
        ------
        ValueError
            If a mandatory key is missing from ``config``.

        .. deprecated:: 0.8.0

            The ``warnings`` parameter is deprecated! Use ``verbosity=0`` (now
            default) to disable warnings. If you need ``warnings=True``,
            similar behaviour can be obtained by ``verbosity=1`` or
            ``verbosity=2``.

        """
        mandatory_config = {
            "CC",
            "FC",
            "MPICC",
            "MPIFC",
            "MPIEXEC",
            "MPIEXEC_FLAGS",
            "CFLAGS",
            "FFLAGS",
        }
        missing_config = mandatory_config - set(config)
        if missing_config:
            raise ValueError(
                f"Some keys are missing from the configfile "
                f"{cls.find_configfile()}: {missing_config}"
            )

        # Suppress warnings for not instantiating Output with sim or params.
        # The level is read before entering ``try`` so that ``finally`` can
        # always restore it.
        logging_level = logger.getEffectiveLevel()
        logger.setLevel(logging.ERROR)
        try:
            temp = cls()
        finally:
            logger.setLevel(logging_level)

        config.update(
            {
                "CASE": name_solver,
                "file": Path(cls._config_filename).resolve(),
                "includes": " ".join(temp.fortran_inc_flags),
                "objects": " ".join(temp.makefile_usr_obj),
            }
        )

        if "warnings" in kwargs:
            warnings.warn(
                "Parameter warnings is deprecated, use ``verbosity`` instead/",
                DeprecationWarning,
                stacklevel=2,  # point at the caller using the deprecated name
            )
            verbosity = int(kwargs["warnings"])

        set_compiler_verbosity(config, verbosity)
        append_debug_flags(config)

        if env_sensitive is None:
            env_sensitive = os.environ.get("SNEK_UPDATE_CONFIG_ENV_SENSITIVE", False)

        if isinstance(env_sensitive, str):
            # correct for "0", "false", "False"
            env_sensitive = bool(yaml.safe_load(env_sensitive))

        if env_sensitive:
            logger.info(
                "env_sensitive = True => attempting to update config from environment variables."
            )
            # Keys present in the environment override the configuration
            config.update(
                {
                    key: os.getenv(key, original_value)
                    for key, original_value in config.items()
                }
            )

    def __init__(self, sim=None, params=None):
        self.sim = sim
        try:
            self.name_solver = sim.info.solver.short_name
        except AttributeError:
            # ``sim`` is None or incomplete: name_solver and package stay unset
            pass
        else:
            self.package = get_solver_package(self.name_solver)

        self.path_solver_package = self.get_path_solver_package()

        if sim:
            self.oper = sim.oper
            self.params = sim.params.output
            # Same as package name __name__
            super().__init__(sim)
        elif params:
            # At least initialize params
            self.params = params.output
        else:
            self.params = None
            logger.warning(
                "Initializing Output class without sim or params might lead to errors."
            )

        self.path_session = self._init_path_session()

        if sim:
            # initialize objects
            dict_classes = sim.info.solver.classes.Output.import_classes()
            for cls_name, Class in dict_classes.items():
                # only initialize if Class is not the Output class
                if isinstance(self, Class):
                    continue
                obj_name = underscore(cls_name)
                setattr(self, obj_name, Class(self))
                self.sim._objects_to_print += "{:28s}{}\n".format(
                    f"sim.output.{obj_name}: ", Class
                )

    def _init_path_session(self):
        """Initialize :attr:`path_session` and ``params.output.path_session``
        from ``params.output.session_id``. Unlike :meth:`_init_path_run`, the
        directory will not be created.

        Returns
        -------
        path_session: path-like
            ``None`` if either ``params`` or ``path_run`` is unavailable.

        """
        if not self.params or not hasattr(self, "path_run"):
            logger.debug("Attribute sim.output.path_session will not be initialized.")
            return None

        try:
            session_id = self.params.session_id
        except AttributeError:
            # For compatibility while loading old simulations
            path_session = Path(self.path_run)
            logger.warning(
                "Parameter params.output.session_id is undefined. "
                "Attribute sim.output.path_session is set as sim.output.path_run."
            )
        else:
            path_session = _make_path_session(self.path_run, session_id)

        self.params._set_attrib("path_session", path_session)
        return path_session

    def _init_sim_repr_maker(self):
        """Adds mesh description to name of the simulation. Called by the
        ``_init_name_run`` method"""
        sim_repr_maker = super()._init_sim_repr_maker()
        self.oper._modify_sim_repr_maker(sim_repr_maker)
        return sim_repr_maker

    def _get_resources(self, package=None):
        """Get a generator of resources (files) in a package, excluding
        directories (subpackages).

        Files whose names start with a prefix or end with a suffix listed in
        :attr:`excludes` are skipped.

        :param package: Package to inspect; defaults to :attr:`package`.
        :raises FileNotFoundError: If the package cannot be imported.
        :returns: generator

        """

        def as_affixes(value):
            # A bare string is a single prefix / suffix, not a sequence of
            # characters; empty strings would match every file name.
            if isinstance(value, str):
                value = (value,)
            return tuple(affix for affix in value if affix)

        excludes = self.excludes
        prefixes = as_affixes(excludes["prefix"])
        suffixes = as_affixes(excludes["suffix"])

        if not package:
            package = self.package

        try:
            contents_pkg = resources.contents(package)
        except ImportError as err:
            raise FileNotFoundError(
                f"Cannot resolve subpackage name_solver={package} "
                f"at path_solver_package={self.path_solver_package}"
            ) from err

        return (
            f
            for f in contents_pkg
            if (
                resources.is_resource(package, f)
                and not f.startswith(prefixes)
                and not f.endswith(suffixes)
            )
        )

    def _get_subpackages(self):
        """Get a dictionary of subpackages with values generated by
        :meth:`_get_resources`.

        :returns: dict

        """
        root = self.path_solver_package
        subpackages = {
            subpkg.name.replace(f"{root.name}.", ""): self._get_resources(subpkg.name)
            for subpkg in pkgutil.walk_packages([str(root)], prefix=f"{self.package}.")
            if is_package(subpkg)
        }
        return subpackages

    def get_paths(self):
        """Get a list of paths to all case files.

        :returns: list

        """
        root = self.path_solver_package

        # abl.usr -> /path/to/abl/abl.usr
        paths = [root / resource for resource in self._get_resources()]

        for subpkg, res in self._get_subpackages().items():
            # toolbox -> /path/to/abl/toolbox
            subpkg_root = root / subpkg.replace(".", os.sep)
            # main.f -> /path/to/abl/toolbox/main.f
            paths.extend(subpkg_root / resource for resource in res)

        return paths

    def copy(self, new_dir, force=False):
        """Copy case files to a new directory. The directory does not have to be present.

        :param new_dir: A str or Path-like instance pointing to the new directory.
        :param force: Force copy would overwrite if files already exist.

        """
        # Avoid race conditions! Should be only executed by rank 0.
        if mpi.rank != 0:
            return

        path_solver_package = self.path_solver_package

        # Computed once rather than on every callback from copytree: the files
        # to copy and the sub-package directories which contain them. Dotted
        # sub-package names are converted to paths as in get_paths.
        include = self.get_paths() + [
            path_solver_package / subpkg.replace(".", os.sep)
            for subpkg in self._get_subpackages()
        ]
        include_set = frozenset(include)

        def conditional_ignore(src, names):
            """Ignore if not found in ``include``."""
            src = Path(src)
            exclude = tuple(name for name in names if (src / name) not in include_set)

            logger.debug(
                "".join(
                    (
                        f"- src: {src}",
                        "\n- include:",
                        " ".join(Path(i).name for i in include),
                        "\n- exclude:",
                        " ".join(Path(i).name for i in exclude),
                    )
                )
            )
            return exclude

        new_root = Path(new_dir)
        # `dirs_exist_ok`` new in Python 3.8
        shutil.copytree(
            src=path_solver_package,
            dst=new_root,
            symlinks=False,
            ignore=conditional_ignore,
            dirs_exist_ok=True,
        )

        # special case for .usr.f: copy to .usr
        for path_usr_f in list(path_solver_package.glob("*.usr.f")):
            shutil.copyfile(path_usr_f, new_root / path_usr_f.stem)

    def write_box(self, template):
        """Write <case name>.box file from box.j2 template.

        .. seealso:: :ref:`nek:tools_genbox`

        """
        if mpi.rank != 0:
            return

        box_file = self.sim.path_run / f"{self.name_solver}.box"
        logger.info(f"Writing box file... {box_file}")
        with open(box_file, "w") as fp:
            self.sim.oper.write_box(
                template, fp, comments=self.sim.params.short_name_type_run
            )

    def write_size(self, template):
        """Write SIZE file from SIZE.j2 template.

        .. seealso:: Nek5000 docs on :ref:`nek:case_files_size`

        """
        if mpi.rank != 0:
            return

        size_file = self.sim.path_run / "SIZE"
        logger.info(f"Writing SIZE file... {size_file}")
        with open(size_file, "w") as fp:
            self.oper.write_size(
                template, fp, comments=self.sim.params.short_name_type_run
            )

    def write_makefile_usr(self, template, fp=None, **template_vars):
        """Write the makefile_usr.inc file which gets included in the main
        makefile by the ``makenek`` tool.

        Parameters
        ----------
        template : jinja2.environment.Template
            Template instance loaded from something like ``makefile_usr.inc.j2``
        fp : io.TextIOWrapper
            File handler to write to. If not given, the file
            ``makefile_usr.inc`` is written in ``sim.path_run``.
        template_vars: dict
            Keyword arguments passed while rendering the Jinja templates. The
            keys ``list_of_sources`` and ``comments`` are set by this method.

        """
        paths_of_sources = [
            [f"{path_dir}/{file}" for file in sources]
            for path_dir, list_of_sources in self.makefile_usr_sources.items()
            for sources in list_of_sources
        ]

        if mpi.rank != 0:
            return

        comments = "Autogenerated using snek5000.output.Output.write_makefile_usr\n"
        if self.sim is not None:
            comments += self.sim.params.short_name_type_run

        template_vars.update(
            {"list_of_sources": paths_of_sources, "comments": comments}
        )
        output = template.render(**template_vars)

        if fp:
            fp.write(output)
        else:
            makefile_usr = self.sim.path_run / "makefile_usr.inc"
            with open(makefile_usr, "w") as fp:
                fp.write(output)

    def write_snakemake_config(self, custom_env_vars=None, host=None):
        """Write the config file in the simulation directory

        Parameters
        ----------
        custom_env_vars: dict (None)
            Environment variables used to update the configuration found by
            :meth:`find_configfile`.

        host: str
            Override hostname detection and specify it instead

        Returns
        -------
        dict or None
            The configuration written, or ``None`` on MPI ranks other than 0.

        """
        if mpi.rank != 0:
            return

        path_configfile = self.find_configfile(host=host)
        path_configfile_simul = self.sim.path_run / self._config_filename

        with open(path_configfile) as file:
            config = yaml.safe_load(file)

        if custom_env_vars is None:
            # Nothing to modify: keep the file as it is
            shutil.copyfile(path_configfile, path_configfile_simul)
        else:
            config.update(custom_env_vars)
            with open(path_configfile_simul, "w") as file:
                yaml.dump(config, file)

        return config

    @staticmethod
    def build_nek5000(config):
        """Build Nek5000, if needed. This method is automatically invoked
        during :meth:`post_init`.

        Raises
        ------
        RuntimeError
            If the build is reported as failed.

        Examples
        --------
        If compiler configuration is changed via a script after Simulation
        initialization, a rebuild can be manually triggered as follows:

        >>> config = sim.output.write_snakemake_config(
        ...     custom_env_vars={"CFLAGS": "-O0 -g", "FFLAGS": "-O0 -g"}
        ... )
        >>> sim.output.build_nek5000(config)

        """
        nek5000 = _Nek5000Make()
        if not nek5000.build(config):
            raise RuntimeError("Nek5000 build failed.")

    @staticmethod
    def write_compile_sh(template, config, fp=None, path=None):
        """Write a standalone ``compile.sh`` shell script to compile the user code.

        Parameters
        ----------
        template: jinja2.environment.Template
            Template similar to ``snek5000/resources/compiler_sh.j2``
        config: dict
            Snakemake configuration
        fp: io.TextIOWrapper
            File pointer
        path: str or Path
            Path to write the file to

        Raises
        ------
        ValueError
            If neither ``fp`` nor ``path`` is provided.

        """
        output = template.render(
            INC=config["includes"],
            USR=config["objects"],
            **config,
        )

        # ``str(None)`` would be the truthy string "None": convert only a
        # path which was really provided.
        if path is not None:
            path = str(path)

        if fp:
            fp.write(output)
        elif path:
            with open(path, "w") as fp:
                fp.write(output)
        else:
            raise ValueError("Either file pointer or the path to it must be provided.")

        if path:
            # rwx for the user, rw for the group and r for others
            os.chmod(path, stat.S_IRWXU | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH)

    def get_field_file(self, prefix="", index=-1, t_approx=None):
        """Get a field file from ``path_session``.

        Parameters
        ----------
        prefix: str
            Prefix for special field files; for examples KTH statistics files
            use prefix `sts`.

        index: int
            Index to match a specific field file. If index > 0, the file
            extension is matched as ``{prefix}case0.f{index:05d}``. If index <
            0, the file is indexed from the end of a list of files

        t_approx: float
            Find a file from approximate simulation time

        Returns
        -------
        file: Path

        Raises
        ------
        ValueError
            If both a positive ``index`` and ``t_approx`` are given.
        FileNotFoundError
            If no file can be indexed.

        """
        case = self.name_solver
        path_session = self.sim.output.path_session

        # ``t_approx=0.0`` is a valid time: test against None, not truthiness
        has_t_approx = t_approx is not None

        if index > 0 and has_t_approx:
            raise ValueError("Specify either index or t_approx at a time, not both.")
        elif index > 0:
            pattern = f"{prefix}{case}0.f{index:05d}"
            file = path_session / pattern
            if file.exists():
                return file

            logger.warning(
                f"{file} not found. Attempting to index a file from a "
                "sorted list of field files"
            )
        elif has_t_approx:
            # All files are candidates for the search by time
            index = slice(None)

        pattern = f"{prefix}{case}0.f?????"
        try:
            result = sorted(path_session.glob(pattern))[index]
            if has_t_approx:
                result = bisect_nek_files_by_time(result, t_approx)
        except IndexError as err:
            raise FileNotFoundError(
                f"Cannot {index =} / find {t_approx =} in {path_session}/{pattern} "
            ) from err
        else:
            return result

    def post_init(self):
        """Logs info on instantiated classes and finally :meth:`copy` all
        source code to simulation directory

        """
        if mpi.rank == 0:
            print(f"path_run: {self.path_run}")
            logger.info(f"session_id: {self.params.session_id}")

        # This also calls _save_info_solver_params_xml
        with stdout_redirected():
            # We gather objects to print within Snek5000
            super().post_init()

        logger.info(self.sim._objects_to_print)

        # Write source files to compile the simulation
        if mpi.rank == 0 and self._has_to_save and self.sim.params.NEW_DIR_RESULTS:
            self.copy(self.path_run)
            config = self.write_snakemake_config()
            self.build_nek5000(config)
            self.post_init_create_additional_source_files()

    def post_init_create_additional_source_files(self):
        """Create the .box, SIZE and makefile_usr files from their template

        A file is written only if the attribute ``template_<name>`` exists
        and is not ``None``.

        """
        for name in ("box", "size", "makefile_usr"):
            template = getattr(self, f"template_{name}", None)
            if template is not None:
                getattr(self, f"write_{name}")(template)

    def _save_info_solver_params_xml(self, replace=False):
        """Saves the par file, along with ``params_simul.xml`` and
        ``info_solver.xml``"""
        params = self.sim.params
        if mpi.rank == 0:
            path_run = Path(self.path_run)
            par_file = path_run / f"{self.name_solver}.par"
            if self._has_to_save and params.NEW_DIR_RESULTS:
                logger.info(
                    f"Writing params files... {par_file}, params_simul.xml, "
                    "info_solver.xml"
                )
                # mode="x": the par file must not exist in a new directory
                _save_par_file(params, par_file, mode="x")
            elif self._has_to_save:
                logger.info(f"Updating {par_file}, params_simul.xml")
                _save_par_file(params, par_file)

                # Update params_simul.xml here, since FluidSim Core will only
                # do it if NEW_DIR_RESULTS = True
                params_xml_path = path_run / "params_simul.xml"
                # The file may be absent, which should not abort the update
                params_xml_path.unlink(missing_ok=True)
                comment = (
                    "This file should not be modified "
                    "(except for adding xml comments).\n"
                    f"Created by the Python programs: snek5000 {__version__}\n"
                )
                params._save_as_xml(path_file=params_xml_path, comment=comment)

        super()._save_info_solver_params_xml(replace, comment=f"snek5000 {__version__}")
# Append notes and the documentation of ``params.output`` to the class
# docstring. ``__doc__`` is None when docstrings are stripped (``python -OO``),
# hence the fallback to an empty string.
Output.__doc__ = (Output.__doc__ or "") + (
    """

Notes
-----

Here, only the documentation for ``params.output`` is displayed.

.. seealso::

    - ``params.oper`` at :mod:`snek5000.operators`
    - ``params.nek`` at :mod:`snek5000.solvers.base`

"""
    + docstring_params(Output)
)