"""
Creates globals for the qcengine module
"""
import fnmatch
import getpass
import logging
import os
import socket
from typing import Any, Dict, Optional, Union
try:
import pydantic.v1 as pydantic
except ImportError:
import pydantic
from .extras import get_information
__all__ = ["get_config", "get_provenance_augments", "global_repr", "NodeDescriptor"]
# Start a globals dictionary with small starting values
_global_values = None
NODE_DESCRIPTORS = {}
LOGGER = logging.getLogger("QCEngine")
LOGGER.setLevel(logging.CRITICAL)
# Generic globals
def get_global(key: Optional[str] = None) -> Union[str, Dict[str, Any]]:
import cpuinfo
import psutil
# TODO (wardlt): Implement a means of getting CPU information from compute nodes on clusters for MPI tasks
# The QC code runs on a different node than the node running this Python function, which may have different info
global _global_values
if _global_values is None:
_global_values = {}
_global_values["hostname"] = socket.gethostname()
_global_values["memory"] = round(psutil.virtual_memory().available / (1024**3), 3)
_global_values["username"] = getpass.getuser()
# Work through VMs and logical cores.
if hasattr(psutil.Process(), "cpu_affinity"):
cpu_cnt = len(psutil.Process().cpu_affinity())
full_physical_cnt = psutil.cpu_count(logical=False)
full_logical_cnt = psutil.cpu_count(logical=True)
if (cpu_cnt == full_logical_cnt) and (full_physical_cnt is not None):
# cpu_affinity isn't capturing deliberate setting but just VMs, so use physical
cpu_cnt = full_physical_cnt
else:
cpu_cnt = psutil.cpu_count(logical=False)
if cpu_cnt is None:
cpu_cnt = psutil.cpu_count(logical=True)
_global_values["ncores"] = cpu_cnt
_global_values["nnodes"] = 1
_global_values["cpuinfo"] = cpuinfo.get_cpu_info()
try:
_global_values["cpu_brand"] = _global_values["cpuinfo"]["brand_raw"]
except KeyError:
# Remove this if py-cpuinfo is pinned to >=6.0.0
_global_values["cpu_brand"] = _global_values["cpuinfo"].get("brand", "(unknown)")
if key is None:
return _global_values.copy()
else:
return _global_values[key]
[docs]class NodeDescriptor(pydantic.BaseModel):
"""
Description of an individual node
"""
# Host data
hostname_pattern: str
name: str
scratch_directory: Optional[str] = None # What location to use as scratch
memory: Optional[float] = None
memory_safety_factor: int = 10 # Percentage of memory as a safety factor
# Specifications
ncores: Optional[int] = pydantic.Field(
None,
description="""Number of cores accessible to each task on this node
The default value, ``None``, will allow QCEngine to autodetect the number of cores.""",
)
jobs_per_node: int = 1
retries: int = 0
# Cluster options
is_batch_node: bool = pydantic.Field(
False,
help="""Whether the node running QCEngine is a batch node
Some clusters are configured such that tasks are launched from a special "batch" or "MOM" onto the compute nodes.
The compute nodes on such clusters often have a different CPU architecture than the batch nodes and
often are unable to launch MPI tasks, which has two implications:
1) QCEngine must make *all* calls to an executable via ``mpirun`` because the executables might not
be able to run on the batch node.
2) QCEngine must run on the batch node to be able to launch tasks on the more than one compute nodes
``is_batch_node`` is used when creating the task configuration as a means of determining whether
``mpiexec_command`` must always be used even for serial jobs (e.g., getting the version number)
""",
)
mpiexec_command: Optional[str] = pydantic.Field(
None,
description="""Invocation for launching node-parallel tasks with MPI
The invocation need not specify the number of nodes, tasks, or cores per node.
Information about the task configuration will be added to the command by use of
Python's string formatting. The configuration will be supplied as the following variables:
{nnodes} - Number of nodes
{ranks_per_node} - Number of MPI ranks per node
{cores_per_rank} - Number of cores to use for each MPI rank
{total_ranks} - Total number of MPI ranks
As examples, the ``aprun`` command on Cray systems should be similar to
``aprun -n {total_ranks} -N {ranks_per_node}`` and ``mpirun`` from OpenMPI should
be similar to ``mpirun -np {total_ranks} -N {ranks_per_node}``.
Programs where each MPI rank can use multiple threads (e.g., QC programs with MPI+OpenMP) can
use the {cores_per_rank} option to control the hybrid parallelism.
As an example, the Cray ``aprun`` command using this figure could be:
``aprun -n {total_ranks} -N {ranks_per_node} -d {cores_per_rank} -j 1``.
The appropriate number of ranks per node will be determined based on the number of
cores per node and the number of cores per rank.
""",
)
def __init__(self, **data: Dict[str, Any]):
data = parse_environment(data)
super().__init__(**data)
if self.mpiexec_command is not None:
# Ensure that the mpiexec_command contains necessary information
if not ("{total_ranks}" in self.mpiexec_command or "{nnodes}" in self.mpiexec_command):
raise ValueError("mpiexec_command must contain either {total_ranks} or {nnodes}")
if "{ranks_per_node}" not in self.mpiexec_command:
raise ValueError("mpiexec_command must explicitly state the number of ranks per node")
class Config:
extra = "forbid"
class TaskConfig(pydantic.BaseSettings):
"""Description of the configuration used to launch a task."""
# Specifications
ncores: int = pydantic.Field(None, description="Number cores per task on each node")
nnodes: int = pydantic.Field(None, description="Number of nodes per task")
memory: float = pydantic.Field(
None, description="Amount of memory in GiB (2^30 bytes; not GB = 10^9 bytes) per node."
)
scratch_directory: Optional[str] # What location to use as scratch
retries: int # Number of retries on random failures
mpiexec_command: Optional[str] # Command used to launch MPI tasks, see NodeDescriptor
use_mpiexec: bool = False # Whether it is necessary to use MPI to run an executable
cores_per_rank: int = pydantic.Field(1, description="Number of cores per MPI rank")
scratch_messy: bool = pydantic.Field(
False, description="Leave scratch directory and contents on disk after completion."
)
class Config(pydantic.BaseSettings.Config):
extra = "forbid"
env_prefix = "QCENGINE_"
def _load_defaults() -> None:
"""
Pulls the defaults from the QCA folder
"""
# Find the config
load_path = None
test_paths = [os.getcwd(), os.path.join(os.path.expanduser("~"), ".qcarchive")]
if "DQM_CONFIG_PATH" in os.environ:
test_paths.insert(0, os.environ["DQM_CONFIG_PATH"])
for path in test_paths:
path = os.path.join(path, "qcengine.yaml")
if os.path.exists(path):
load_path = path
break
if load_path is None:
LOGGER.info("Could not find 'qcengine.yaml'. Searched the following paths: {}".format(", ".join(test_paths)))
LOGGER.info("Using default options...")
else:
import yaml
LOGGER.info("Found 'qcengine.yaml' at path: {}".format(load_path))
with open(load_path, "r") as stream:
user_config = yaml.load(stream, Loader=yaml.SafeLoader)
for k, v in user_config.items():
NODE_DESCRIPTORS[k] = NodeDescriptor(name=k, **v)
[docs]def global_repr() -> str:
"""
A representation of the current global configuration.
"""
ret = ""
ret += "Host information:\n"
ret += "-" * 80 + "\n"
prov = get_provenance_augments()
for k in ["username", "hostname", "cpu"]:
ret += "{:<30} {:<30}\n".format(k, prov[k])
ret += "\nNode information:\n"
ret += "-" * 80 + "\n"
for k, v in get_node_descriptor():
ret += " {:<28} {}\n".format(k, v)
if k in ["scratch_directory", "memory_per_job"]:
ret += "\n"
ret += "\nJob information:\n"
ret += "-" * 80 + "\n"
for k, v in get_config():
ret += " {:<28} {}\n".format(k, v)
ret += "-" * 80 + "\n"
return ret
def get_node_descriptor(hostname: Optional[str] = None) -> NodeDescriptor:
"""
Find the correct NodeDescriptor based off current hostname
"""
if isinstance(hostname, NodeDescriptor):
return hostname
if hostname is None:
hostname = get_global("hostname")
# Find a match
for name, node in NODE_DESCRIPTORS.items():
if fnmatch.fnmatch(hostname, node.hostname_pattern):
config = node
break
else:
config = NodeDescriptor(
name="default", hostname_pattern="*", memory=get_global("memory"), ncores=get_global("ncores")
)
return config
def parse_environment(data: Dict[str, Any]) -> Dict[str, Any]:
"""Collects local environment variable values into ``data`` for any keys with RHS starting with ``$``."""
ret = {}
for k, var in data.items():
if isinstance(var, str):
var = os.path.expanduser(os.path.expandvars(var))
if var.startswith("$"):
var = None
ret[k] = var
return ret
def read_qcengine_task_environment() -> Dict[str, Any]:
"""
Reads the qcengine task-related environment variables and returns a dictionary of the values.
"""
ret = {}
for k, v in os.environ.items():
if k.startswith("QCENGINE_"):
ret[k[9:].lower()] = v
return ret
[docs]def get_config(*, hostname: Optional[str] = None, task_config: Dict[str, Any] = None) -> TaskConfig:
"""
Returns the configuration key for qcengine.
"""
if task_config is None:
task_config = {}
task_config_env = read_qcengine_task_environment()
task_config = {**task_config_env, **parse_environment(task_config)}
config = {}
# Node data
node = get_node_descriptor(hostname)
ncores = node.ncores or get_global("ncores")
config["scratch_directory"] = task_config.pop("scratch_directory", node.scratch_directory)
config["retries"] = task_config.pop("retries", node.retries)
# Jobs per node
jobs_per_node = int(task_config.pop("jobs_per_node", None) or node.jobs_per_node)
# Handle memory
memory = task_config.pop("memory", None)
if memory is None:
memory = node.memory or get_global("memory")
memory_coeff = 1 - node.memory_safety_factor / 100
memory = round(memory * memory_coeff / jobs_per_node, 3)
config["memory"] = memory
# Get the number of cores available to each task
ncores = int(task_config.pop("ncores", int(ncores / jobs_per_node)))
if ncores < 1:
raise KeyError("Number of jobs per node exceeds the number of available cores.")
config["ncores"] = ncores
config["nnodes"] = int(task_config.pop("nnodes", 1))
# Add in the MPI launch command template
config["mpiexec_command"] = node.mpiexec_command
config["use_mpiexec"] = node.is_batch_node or config["nnodes"] > 1
config["cores_per_rank"] = task_config.get("cores_per_rank", 1)
# Override any settings
if task_config is not None:
config.update(task_config)
# Make sure mpirun command is defined if needed
if config["use_mpiexec"] and config["mpiexec_command"] is None:
raise ValueError(
"You need to define the mpiexec command for this node. "
"See: https://qcengine.readthedocs.io/en/stable/environment.html"
)
return TaskConfig(**config)
[docs]def get_provenance_augments() -> Dict[str, str]:
return {
"cpu": get_global("cpu_brand"),
"hostname": get_global("hostname"),
"username": get_global("username"),
"qcengine_version": get_information("version"),
}
def get_logger() -> "Logger":
return LOGGER
# Pull in the local variables
_load_defaults()