Source code for snek5000.util.files

import bisect
import os
import re
from pathlib import Path
from shutil import copy2

from pymech.neksuite.field import read_header

from fluiddyn.io import FLUIDSIM_PATH

from .. import logger
from ..params import load_params


def _is_empty_directory(path):
    path = Path(path)
    if not path.is_dir():
        return False
    return not bool(os.listdir(path))


[docs]def next_path(old_path, force_suffix=False, return_suffix=False): """Generate a new path with an integer suffix Parameters ---------- old_path: str or path-like Path to check for existence force_suffix: If true, will not check if the `old_path` can be used and adds a suffix in the end. return_suffix: If true, returns the integer suffix along with the path/ Returns ------- (i, new_path): tuple[int, Path] If `return_suffix` is `True`. new_path: Path A path (with an integer suffix) which does not yet exist in the filesystem. Examples -------- >>> import os >>> os.chdir("/tmp") >>> next_path("test.txt") # path does not exist PosixPath('test.txt') >>> next_path("test.txt", force_suffix=True) # path does not exists PosixPath('test_00.txt') >>> Path("test.txt").touch() >>> next_path("test.txt") # path exists PosixPath('test_00.txt') >>> Path("test_00.txt").touch() >>> next_path("test.txt") # path and the next one both exists PosixPath('test_01.txt') >>> Path("test.txt").unlink() # cleanup >>> Path("test_00.txt").unlink() """ def int_suffix(p, integer): stem = p.stem # for example: remove .tar from the end, if any for suffix in p.suffixes: stem = re.sub(f"{suffix}$", "", stem) return p.parent / "".join([stem, f"_{integer:02d}", *p.suffixes]) old_path = Path(old_path) if not force_suffix and not ( old_path.exists() and not _is_empty_directory(old_path) ): return old_path i = 0 new_path = int_suffix(old_path, i) while new_path.exists() and not _is_empty_directory(new_path): logger.debug(f"Path exists: {new_path}") i += 1 new_path = int_suffix(old_path, i) logger.debug(f"Next path available: {new_path}") return (i, new_path) if return_suffix else new_path
[docs]def create_session(case, re2, ma2, par): """Creates a session and write the path to a `SESSION.NAME` file. Then, symlinks re2 and ma2 files, and copies the par file. Parameters ---------- case : str Case name re2 : str Mesh file name ma2 : str Connectivity mapping file name par : str Parameter file name """ params = load_params() session_dir = Path(params.output.path_session).relative_to(params.path_run) session_dir.mkdir(exist_ok=True) with open("SESSION.NAME", "w") as session_name: # use relative paths to avoid 132 character limit in Nek5000 session_name.write(f"{case}\n./{session_dir}\n") for file in (re2, ma2): # use relative symlinks (session_dir / file).symlink_to(f"../{file}") # Copy par files to run without recompiling copy2(par, session_dir / par)
[docs]class LazyNekFile: """A small data stucture to assist bisection sort by simulation time, :func:`bisect_nek_files_by_time` """ def __init__(self, path): self.path = path @property def time(self): return read_header(self.path).time def __gt__(self, other): time = other.time if isinstance(other, type(self)) else other return self.time > time def __lt__(self, other): time = other.time if isinstance(other, type(self)) else other return self.time < time def __repr__(self): return f"LazyNekFile <{self.path}>"
[docs]def bisect_nek_files_by_time(files, time): """Bisect Nek5000 field files by the simulation time metadata. The bisection search is done using :func:`bisect.bisect_left` Parameters ---------- files: iterable of str or path-like List of file names / paths. time: float Approximate simulation time. Finds ``file`` such that :func:`read_header(file).time <pymech:pymech.neksuite.field.read_header>` < ``time``. Returns ------- file: str or path-like Path to field file. """ files = sorted(files) lazy_files = [LazyNekFile(file) for file in files] index = min(bisect.bisect_left(lazy_files, time), len(files) - 1) return files[index]
def _path_try_from_fluidsim_path(path_dir): """Converts to a :class:`pathlib.Path` object and if it does not exists, attempts a path relative to environment variable ``FLUIDSIM_PATH``. """ path = Path(path_dir) if not path.exists(): logger.info("Trying to open the path relative to $FLUIDSIM_PATH") path = Path(FLUIDSIM_PATH) / path_dir return path