Source code for qcfractalcompute.config

import logging
import os
from typing import List, Optional, Union, Dict, Any

import yaml

try:
    from pydantic.v1 import BaseModel, Field, validator
except ImportError:
    from pydantic import BaseModel, Field, validator
from typing_extensions import Literal

from qcportal.utils import seconds_to_hms


def _make_abs_path(path: Optional[str], base_folder: str, default_filename: Optional[str]) -> Optional[str]:
    # No path specified, no default
    if path is None and default_filename is None:
        return None

    # Path isn't specified, but default is given
    if path is None:
        path = default_filename

    path = os.path.expanduser(path)
    path = os.path.expandvars(path)
    if os.path.isabs(path):
        return path
    else:
        path = os.path.join(base_folder, path)
        return os.path.abspath(path)


def update_nested_dict(d, u):
    for k, v in u.items():
        if isinstance(v, dict):
            d[k] = update_nested_dict(d.get(k, {}), v)
        else:
            d[k] = v
    return d


class PackageEnvironmentSettings(BaseModel):
    """
    Environments with installed packages that can be used to run calculations

    The compute manager will query these environments to see what packages are installed, and
    direct appropriate calculations to them.
    """

    use_manager_environment: bool = True
    conda: List[str] = Field([], description="List of conda environments to query for installed packages")
    apptainer: List[str] = Field(
        [], description="List of paths to apptainer/singularity files to query for installed packages"
    )


class ExecutorConfig(BaseModel):
    type: str
    queue_tags: List[str]
    worker_init: List[str] = []

    scratch_directory: Optional[str] = None
    bind_address: Optional[str] = None

    cores_per_worker: int
    memory_per_worker: float

    extra_executor_options: Dict[str, Any] = {}

    environments: PackageEnvironmentSettings = PackageEnvironmentSettings()

    class Config(BaseModel.Config):
        case_insensitive = True
        extra = "forbid"


class CustomExecutorConfig(ExecutorConfig):
    type: Literal["custom"] = "custom"
    path: str


[docs] class LocalExecutorConfig(ExecutorConfig): type: Literal["local"] = "local" max_workers: int
[docs] class SlurmExecutorConfig(ExecutorConfig): type: Literal["slurm"] = "slurm" walltime: str exclusive: bool = True partition: Optional[str] = None account: Optional[str] = None workers_per_node: int max_nodes: int scheduler_options: List[str] = []
[docs] @validator("walltime", pre=True) def walltime_must_be_str(cls, v): if isinstance(v, int): return seconds_to_hms(v) else: return v
[docs] class TorqueExecutorConfig(ExecutorConfig): type: Literal["torque"] = "torque" walltime: str account: Optional[str] = None queue: Optional[str] = None workers_per_node: int max_nodes: int scheduler_options: List[str] = []
[docs] @validator("walltime", pre=True) def walltime_must_be_str(cls, v): if isinstance(v, int): return seconds_to_hms(v) else: return v
[docs] class LSFExecutorConfig(ExecutorConfig): type: Literal["lsf"] = "lsf" walltime: str project: Optional[str] = None queue: Optional[str] = None workers_per_node: int max_nodes: int request_by_nodes: bool = True bsub_redirection: bool = True scheduler_options: List[str] = []
[docs] @validator("walltime", pre=True) def walltime_must_be_str(cls, v): if isinstance(v, int): return seconds_to_hms(v) else: return v
AllExecutorTypes = Union[ CustomExecutorConfig, LocalExecutorConfig, SlurmExecutorConfig, TorqueExecutorConfig, LSFExecutorConfig ] class FractalServerSettings(BaseModel): """ Settings pertaining to the Fractal Server you wish to pull tasks from and push completed tasks to. Each manager supports exactly 1 Fractal Server to be in communication with, and exactly 1 user on that Fractal Server. These can be changed, but only once the Manager is shutdown and the settings changed. Multiple Managers however can be started in parallel with each other, but must be done as separate calls to the CLI. Caution: The password here is written in plain text, so it is up to the owner/writer of the configuration file to ensure its security. """ fractal_uri: str = Field(..., description="Full URI to the Fractal Server you want to connect to") username: Optional[str] = Field( None, description="Username to connect to the Fractal Server with. When not provided, a connection is attempted " "as a guest user, which in most default Servers will be unable to return results.", ) password: Optional[str] = Field( None, description="Password to authenticate to the Fractal Server with (alongside the `username`)" ) verify: Optional[bool] = Field(None, description="Use Server-side generated SSL certification or not.") class Config(BaseModel.Config): case_insensitive = True extra = "forbid" class FractalComputeConfig(BaseModel): base_folder: str = Field( ..., description="The base folder to use as the default for some options (logs, etc). Default is the location of the config file.", ) cluster: str = Field( ..., description="Name of this scheduler to present to the Fractal Server. Descriptive names help the server " "identify the manager resource and assists with debugging.", ) loglevel: str = "INFO" logfile: Optional[str] = Field( None, description="Full path to save a log file to, including the filename. If not provided, information will still " "be reported to terminal, but not saved. When set, logger information is sent to this file.", ) update_frequency: float = Field( 30, description="Time between heartbeats/update checks between this Manager and the Fractal Server. The lower this " "value, the shorter the intervals. If you have an unreliable network connection, consider " "increasing this time as repeated, consecutive network failures will cause the Manager to shut " "itself down to maintain integrity between it and the Fractal Server. Units of seconds", gt=0, ) parsl_run_dir: str = "parsl_run_dir" server: FractalServerSettings = Field(...) environments: PackageEnvironmentSettings = PackageEnvironmentSettings() executors: Dict[str, AllExecutorTypes] = Field(...) class Config(BaseModel.Config): case_insensitive = True extra = "forbid" @validator("logfile") def _check_logfile(cls, v, values): return _make_abs_path(v, values["base_folder"], None) @validator("parsl_run_dir") def _check_run_dir(cls, v, values): return _make_abs_path(v, values["base_folder"], "parsl_run_dir") def read_configuration(file_paths: List[str], extra_config: Optional[Dict[str, Any]] = None) -> FractalComputeConfig: logger = logging.getLogger(__name__) config_data: Dict[str, Any] = {} # Read all the files, in order for path in file_paths: with open(path, "r") as yf: logger.info(f"Reading configuration data from {path}") file_data = yaml.safe_load(yf) update_nested_dict(config_data, file_data) if extra_config: update_nested_dict(config_data, extra_config) # Find the base folder # 1. If specified in the environment, use that # 2. Use any specified in a config file # 3. Use the path of the last (highest-priority) file given if "QCF_COMPUTE_BASE_FOLDER" in os.environ: base_dir = os.getenv("QCF_COMPUTE_BASE_FOLDER") elif config_data.get("base_folder") is not None: base_dir = config_data["base_folder"] elif len(file_paths) > 0: # use the location of the last file as the base directory base_dir = os.path.dirname(file_paths[-1]) else: raise RuntimeError( "Base folder must be specified somehow. Maybe set QCF_COMPUTE_BASE_FOLDER in the environment?" ) config_data["base_folder"] = os.path.abspath(base_dir) try: return FractalComputeConfig(**config_data) except Exception as e: if len(file_paths) == 0: raise RuntimeError(f"Could not assemble a working configuration from environment variables:\n{str(e)}") raise