"""Runtime parameters
=====================
Scripting interface for Nek5000 :ref:`parameter file <nek:case_files_par>`.
"""
import json
import logging
import os
import sys
import textwrap
from ast import literal_eval
from configparser import ConfigParser
from io import StringIO
from math import nan
from pathlib import Path
from inflection import camelize, underscore
from fluidsim_core.params import Parameters as _Parameters
from .log import logger
from .solvers import get_solver_short_name, import_cls_simul
literal_python2nek = {
nan: "<real>",
"nan": "nan",
"None": "none",
"True": "yes",
"False": "no",
}
literal_nek2python = {v: k for k, v in literal_python2nek.items()}
literal_prune = ("<real>", "", "nan")
#: JSON file name to which recorded user_params are saved
filename_map_user_params = "map_user_params.json"
[docs]def _as_nek_value(input_value):
"""Convert Python values to equivalent Nek5000 par values."""
# Convert to string to avoid hash collisions
# hash(1) == hash(True)
literal = str(input_value) if input_value is not nan else nan
value = literal_python2nek.get(literal, input_value)
return value
[docs]def camelcase(value):
"""Convert strings to ``camelCase``."""
return camelize(str(value).lower(), uppercase_first_letter=False)
[docs]def _check_user_param_index(idx):
"""Check if the index of user parameter is within bounds"""
if idx > 20:
raise ValueError(f"userParam {idx = } > 20")
[docs]def _as_python_value(input_value):
"""Convert Nek5000 par values to equivalent Python values if possible."""
value = literal_nek2python.get(str(input_value), input_value)
try:
return literal_eval(value)
except (SyntaxError, ValueError):
return value
[docs]def load_params(path_dir="."):
"""Load a :class:`snek5000.params.Parameters` instance from `path_dir`.
Parameters
----------
path_dir : str or path-like
Path to a simulation directory.
Returns
-------
params: :class:`snek5000.params.Parameters`
"""
from snek5000.util.files import _path_try_from_fluidsim_path
path_dir = _path_try_from_fluidsim_path(path_dir)
short_name = get_solver_short_name(path_dir)
Simul = import_cls_simul(short_name)
return Simul.load_params_from_file(
path_xml=path_dir / "params_simul.xml",
path_par=path_dir / f"{short_name}.par",
)
[docs]class Parameters(_Parameters):
"""Container for reading, modifying and writing :ref:`par files
<nek:case_files_par>`.
:param tag: A string representing name of case files (for example: provide
``"abl"`` for files like ``abl.usr, abl.par`` etc).
"""
[docs] @classmethod
def _load_params_simul(cls, path=None):
"""Alias for :func:`load_params`"""
return load_params(path or Path.cwd())
def __init__(self, *args, **kwargs):
comments = ("#",)
self._set_internal_attr(
"_par_file",
ConfigParser(comment_prefixes=comments, inline_comment_prefixes=comments),
)
# Only enabled parameters would be written into par file
self._set_internal_attr("_enabled", True)
# User parameters sections should begin with an underscore
self._set_internal_attr("_user", True)
super().__init__(*args, **kwargs)
# Like in Python Nek5000's par files are case insensitive.
# However for consistency, case sensitivity is enforced:
self._par_file.optionxform = str
[docs] def _make_dict_attribs(self):
d = super()._make_dict_attribs()
# Append internal attributes
d.update({"_enabled": self._enabled, "_user": self._user})
if hasattr(self, "_recorded_user_params"):
d["_recorded_user_params"] = self._recorded_user_params
return d
def __update_par_section(
self, section_name, section_dict, has_to_prune_literals=True
):
"""Updates a section of the ``par_file`` object from a dictionary."""
par = self._par_file
# Start with underscore if it is a user section
section_name_par = "_" if section_dict["_user"] else ""
section_name_par += section_name.upper().lstrip("_")
if section_name_par not in par.sections():
par.add_section(section_name_par)
if "_recorded_user_params" in section_dict:
recorded_user_params = section_dict.pop("_recorded_user_params")
else:
recorded_user_params = False
for option, value in section_dict.items():
value = _as_nek_value(value)
if has_to_prune_literals and value in literal_prune:
continue
# Make everything consistent where values refer to option names
# if option in ("stop_at", "write_control"):
if str(value) in section_dict:
value = camelcase(value)
par.set(section_name_par, camelcase(option), str(value))
# _recorded_user_params -> userParam%%
if not recorded_user_params:
return
params = self._parent
if self._tag != "nek" or params._tag != "params":
raise RuntimeError(
"_recorded_user_params should only be in params.nek.general"
)
for idx_uparam in sorted(recorded_user_params.keys()):
tag = recorded_user_params[idx_uparam]
_check_user_param_index(idx_uparam)
value = _as_nek_value(params[tag])
par.set(
section_name_par,
f"userParam{idx_uparam:02d}",
str(value),
)
[docs] def _sync_par(self, has_to_prune_literals=True, keep_all_sections=False):
"""Sync values in param children and attributes to ``self._par_file``
object.
"""
if self._tag_children:
data = [
(child, getattr(self, child)._make_dict_tree())
for child in self._tag_children
]
else:
# No children
data = [(self._tag, self._make_dict_attribs())]
for child, d in data:
# Section name is often written in [UPPERCASE]
section_name = child.upper()
self.__update_par_section(
section_name, d, has_to_prune_literals=has_to_prune_literals
)
self.__tidy_par(keep_all_sections)
def __tidy_par(self, keep_all_sections=False):
"""Remove internal attributes and disabled sections from par file."""
par = self._par_file
for section_name in par.sections():
par.remove_option(section_name, "_user")
if keep_all_sections:
enabled = True
else:
enabled = par.getboolean(section_name, "_enabled")
if enabled:
par.remove_option(section_name, "_enabled")
else:
par.remove_section(section_name)
[docs] def _autodoc_par(self, indent=0):
"""Autodoc a code block with ``ini`` syntax and set docstring."""
self._sync_par(has_to_prune_literals=False, keep_all_sections=True)
docstring = "\n.. code-block:: ini\n\n"
with StringIO() as output:
self._par_file.write(output)
ini = output.getvalue()
docstring += textwrap.indent(ini, " ")
if ini:
self._set_doc(self._doc + textwrap.indent(docstring, " " * indent))
[docs] def _record_nek_user_params(self, nek_params_keys, overwrite=False):
"""Record some Nek user parameters
Examples
--------
>>> params._record_nek_user_params({"prandtl": 2, "rayleigh": 3})
>>> params.output.history_points._record_nek_user_params({"write_interval": 4})
This is going to set or modify the internal attribute
``params.nek.general._recorded_user_params`` to ``{2: "prandtl", 3:
"rayleigh", 4: "output.other.write_interval"}``.
This attribute is then used to write the ``[GENERAL]`` section of the
.par file.
Note that this attribute is only for ``params.nek.general`` and should
never be set for other parameter children.
"""
# we need to find where is self in the tree compared to `params`
current = self
parent = current._parent
tag = current._tag
path = tag
# iterate up the `params` tree to the top
while not (parent is None and tag == "params") and not (
parent._tag == "info_simul" and tag == "params"
):
current = parent
parent = current._parent
tag = current._tag
path = f"{tag}.{path}"
params = current
assert params._tag == "params"
# path relative to params:
# we have `(path, name)` equal to
# `("params.output.history_points", "write_interval")` or
# `("params", "rayleigh")` and we want to end up with
# `"output.history_points.write_interval"` or `rayleigh`, resp.
path = path[len("params") :]
if path.startswith("."):
path = path[1:]
if path:
path = path + "."
user_params = {}
for name, key in nek_params_keys.items():
user_params[key] = f"{path}{name}"
# Useful while building isolated `params` for a specific class,
# for e.g.: Operators, Output etc.
if not hasattr(params, "nek"):
log_level = logging.DEBUG if "sphinx" in sys.modules else logging.WARNING
logger.log(
log_level,
(
"Attribute params.nek does not exist, skipping "
"initializing user parameters."
),
)
return
general = params.nek.general
if not hasattr(general, "_recorded_user_params"):
general._set_internal_attr("_recorded_user_params", {})
if overwrite:
general._recorded_user_params.update(user_params)
return
for key, value in user_params.items():
if key in general._recorded_user_params:
raise ValueError(
f"{key = } already used for user parameter "
f"{general._recorded_user_params[key]}"
)
general._recorded_user_params[key] = value
[docs] def _change_index_userparams(self, user_params):
"""Change indices for user parameters
This method can be used in the ``create_default_params`` class method
of a solver to overwrite the default indices used in the base snek5000
package.
This method checks that no already recorded parameters are overwritten.
To overwrite a parameter, use ``_record_nek_user_params`` with the
``overwrite`` argument.
Examples
--------
>>> params._change_index_userparams({8: "output.history_points.write_interval"}
"""
if self._tag != "params":
raise ValueError(
"The method `_change_index_userparams` has to be called "
"directly with the root `params` object."
)
try:
general = self.nek.general
except AttributeError:
raise AttributeError("No `params.nek.general` attribute.")
try:
recorded_user_params = general._recorded_user_params
except AttributeError:
raise AttributeError(
"No `general._recorded_user_params` attribute. This attribute "
"can be created with `_record_nek_user_params`."
)
# check that no user parameters are overwritten
modified_labels = []
for index in user_params:
try:
modified_labels.append(recorded_user_params[index])
except KeyError:
pass
values = user_params.values()
for label in modified_labels:
if label not in values:
raise ValueError(
f"The value {label} would be removed from the user params."
)
reverted = {value: key for key, value in recorded_user_params.items()}
for label in user_params.values():
try:
key = reverted[label]
except KeyError:
raise ValueError(
f"User parameter {label = } is not already recorded. "
"Use `_record_nek_user_params`"
)
del recorded_user_params[key]
recorded_user_params.update(user_params)
[docs] def _save_as_xml(self, path_file=None, comment=None, find_new_name=False):
"""Invoke :func:`_save_recorded_user_params` and then save to an XML file at ``path_file``."""
try:
user_params = self.nek.general._recorded_user_params
except AttributeError:
pass
else:
if path_file is None:
path_dir = Path.cwd()
else:
path_dir = Path(path_file).parent
_save_recorded_user_params(user_params, path_dir)
return super()._save_as_xml(
path_file=path_file, comment=comment, find_new_name=find_new_name
)
[docs]def _save_recorded_user_params(user_params, path_dir):
"""Save a JSON file from a dictionary denoting ``user_params``"""
with open(path_dir / filename_map_user_params, "w") as file:
json.dump(user_params, file)
[docs]def _load_recorded_user_params(path):
"""Load a JSON file and return a dictionary denoting ``user_params``"""
with open(path) as file:
tmp = json.load(file)
return {int(key): value for key, value in tmp.items()}
[docs]def _check_path_like(path):
"""Ensure input is a path-like object"""
if not isinstance(path, os.PathLike):
raise TypeError(f"Expected path-like object, not {type(path) = }")
[docs]def _get_params_nek(params):
"""Check if params is the top level object (via the ``_tag`` attribute) and
return the ``params.nek`` object.
Parameters
----------
params: :class:`Parameters`
The ``params`` object
Returns
-------
params.nek: :class:`Parameters`
The ``params.nek`` object
"""
if not isinstance(params, Parameters):
raise TypeError
if params._tag != "params":
raise ValueError(f'{params._tag = } != "params"')
if params.nek._tag != "nek":
raise RuntimeError(f'{params.nek._tag =} != "nek"')
return params.nek
[docs]def _save_par_file(params, path, mode="w"):
"""Save the ``params.nek`` object as a `.par` file."""
nek = _get_params_nek(params)
nek._sync_par()
_check_path_like(path)
with open(path, mode) as fp:
nek._par_file.write(fp)
if hasattr(nek.general, "_recorded_user_params"):
_save_recorded_user_params(nek.general._recorded_user_params, path.parent)
[docs]def _str_par_file(params):
"""Preview contents of the resulting `.par` file as a string"""
nek = _get_params_nek(params)
nek._sync_par()
with StringIO() as output:
nek._par_file.write(output)
return output.getvalue()
[docs]def complete_params_from_par_file(params, path):
"""Populate the ``params.nek`` object by reading a `.par` file and
:attr:`filename_map_user_params`.
"""
_check_path_like(path)
if not path.exists():
raise IOError(f"{path} does not exist.")
nek = _get_params_nek(params)
nek._par_file.read(path)
recorded_user_params_path = path.with_name(filename_map_user_params)
if recorded_user_params_path.exists():
recorded_user_params = _load_recorded_user_params(recorded_user_params_path)
elif hasattr(nek.general, "_recorded_user_params"):
recorded_user_params = nek.general._recorded_user_params
else:
recorded_user_params = {}
for section in nek._par_file.sections():
params_child = getattr(nek, section.lower().lstrip("_"))
for option, value in nek._par_file.items(section):
value = _as_python_value(value)
# userParam%% -> user_params
if option.lower().startswith("userparam"):
idx_uparam = int(option[-2:])
_check_user_param_index(idx_uparam)
if idx_uparam not in recorded_user_params:
logger.warning(
f"{idx_uparam = } not in {recorded_user_params = } so we"
"cannot update the right parameter in the object `params`."
"It might be because you load a simulation done with "
"an old snek5000 (< 0.8), or it might be a bug :-)"
)
tag = recorded_user_params[idx_uparam]
# set the corresponding parameter
params[tag] = value
else:
attrib = underscore(option)
setattr(params_child, attrib, value)
[docs]def _complete_params_from_xml_file(params, path_xml):
"""Populate the ``params.nek`` object by reading a `.xml` file and
:attr:`filename_map_user_params`.
"""
_check_path_like(path_xml)
params._load_from_xml_file(str(path_xml))
nek = _get_params_nek(params)
path_recorded_user_params = Path(path_xml).parent / filename_map_user_params
if path_recorded_user_params.exists():
nek.general._set_internal_attr(
"_recorded_user_params",
_load_recorded_user_params(path_recorded_user_params),
)
create_params = Parameters._create_params