from __future__ import annotations
import logging
import math
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Union, Sequence, Iterable, TypeVar, Type
from tabulate import tabulate
from qcportal.cache import DatasetCache, read_dataset_metadata
from qcportal.gridoptimization import (
GridoptimizationKeywords,
GridoptimizationAddBody,
GridoptimizationRecord,
GridoptimizationQueryFilters,
)
from qcportal.manybody import (
ManybodyKeywords,
ManybodyRecord,
ManybodyAddBody,
ManybodyQueryFilters,
)
from qcportal.neb import (
NEBKeywords,
NEBAddBody,
NEBQueryFilters,
NEBRecord,
)
from qcportal.optimization import (
OptimizationProtocols,
OptimizationRecord,
OptimizationQueryFilters,
OptimizationSpecification,
OptimizationAddBody,
)
from qcportal.reaction import (
ReactionAddBody,
ReactionRecord,
ReactionKeywords,
ReactionQueryFilters,
)
from qcportal.services.models import ( # noqa
ServiceSubtaskRecord,
)
from qcportal.singlepoint import (
QCSpecification,
SinglepointRecord,
SinglepointAddBody,
SinglepointQueryFilters,
SinglepointDriver,
SinglepointProtocols,
)
from qcportal.torsiondrive import (
TorsiondriveKeywords,
TorsiondriveAddBody,
TorsiondriveRecord,
TorsiondriveQueryFilters,
)
from .auth import (
UserInfo,
RoleInfo,
GroupInfo,
is_valid_username,
is_valid_password,
is_valid_rolename,
is_valid_groupname,
)
from .base_models import CommonBulkGetNamesBody, CommonBulkGetBody
from .cache import PortalCache
from .client_base import PortalClientBase
from .dataset_models import (
BaseDataset,
DatasetQueryModel,
DatasetQueryRecords,
DatasetDeleteParams,
DatasetAddBody,
dataset_from_dict,
load_dataset_view, # noqa
create_dataset_view,
)
from .internal_jobs import InternalJob, InternalJobQueryFilters, InternalJobQueryIterator, InternalJobStatusEnum
from .managers import ManagerQueryFilters, ManagerQueryIterator, ComputeManager
from .metadata_models import UpdateMetadata, InsertMetadata, DeleteMetadata
from .molecules import Molecule, MoleculeIdentifiers, MoleculeModifyBody, MoleculeQueryIterator, MoleculeQueryFilters
from .record_models import (
RecordStatusEnum,
PriorityEnum,
RecordQueryFilters,
RecordModifyBody,
RecordDeleteBody,
RecordRevertBody,
BaseRecord,
RecordQueryIterator,
records_from_dicts,
)
from .serverinfo import (
AccessLogQueryFilters,
AccessLogSummaryFilters,
AccessLogSummaryEntry,
AccessLogSummary,
AccessLogQueryIterator,
ErrorLogQueryFilters,
ErrorLogQueryIterator,
DeleteBeforeDateBody,
)
from .utils import make_list, chunk_iterable, process_chunk_iterable
# Type variable for record classes, bound to BaseRecord. It is used in this file to annotate
# helpers that take a record class and return instances of that same class.
_T = TypeVar("_T", bound=BaseRecord)
[docs]
class PortalClient(PortalClientBase):
"""
Main class for interacting with a QCArchive server
"""
def __init__(
    self,
    address: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
    verify: bool = True,
    show_motd: bool = True,
    *,
    cache_dir: Optional[str] = None,
    cache_max_size: int = 0,
    memory_cache_key: Optional[str] = None,
) -> None:
    """
    Initialize the base client, then attach a logger and a cache to this instance

    Parameters
    ----------
    address
        The host or IP address of the FractalServer instance, including protocol and port if necessary
        ("https://ml.qcarchive.molssi.org", "http://192.168.1.10:8888")
    username
        The username to authenticate with.
    password
        The password to authenticate with.
    verify
        Verifies the SSL connection with a third party server. This may be False if a
        FractalServer was not provided an SSL certificate and defaults back to self-signed
        SSL keys.
    show_motd
        If a Message-of-the-Day is available, display it
    cache_dir
        Directory to store an internal cache of records and other data
    cache_max_size
        Maximum size of the cache directory
    memory_cache_key
        Accepted as a keyword argument, but not referenced anywhere in this constructor.
    """

    # Connection-related arguments are handed to the base class unchanged
    PortalClientBase.__init__(self, address, username, password, verify, show_motd)

    self._logger = logging.getLogger("PortalClient")

    # The cache is keyed on the server address and configured by the keyword-only arguments
    self.cache = PortalCache(address, cache_dir, cache_max_size)
def __repr__(self) -> str:
    """Build a short, one-line description of this client.

    Returns
    -------
    str
        Text containing the server name, address, and username of this client.
    """

    return f"PortalClient(server_name='{self.server_name}', address='{self.address}', username='{self.username}')"
def _repr_html_(self) -> str:
    """Build an HTML snippet (heading plus bullet list) describing this client.

    Returns
    -------
    str
        HTML with one list item each for the server name, address, and username.
        Every line of the result has its surrounding whitespace stripped.
    """

    pieces = [
        "",
        "<h3>PortalClient</h3>",
        "<ul>",
        f"<li><b>Server: </b>{self.server_name}</li>",
        f"<li><b>Address: </b>{self.address}</li>",
        f"<li><b>Username: </b>{self.username}</li>",
        "</ul>",
        "",
    ]
    markup = "\n".join(pieces)

    # Split again after interpolation so that newlines and edge whitespace inside the
    # interpolated values are normalized line by line as well
    return "\n".join(line.strip() for line in markup.split("\n"))
#################################################
# Message-of-the-Day (MOTD)
#################################################
[docs]
def get_motd(self) -> str:
    """
    Retrieve the current Message-of-the-Day (MOTD) from the server

    Returns
    -------
    :
        The result of a GET request to the ``api/v1/motd`` endpoint
    """

    motd = self.make_request("get", "api/v1/motd", str)
    return motd
[docs]
def set_motd(self, new_motd: str) -> str:
    """
    Replace the Message-of-the-Day (MOTD) stored on the server

    Parameters
    ----------
    new_motd
        The new message text, sent as the body of a PUT request to ``api/v1/motd``

    Returns
    -------
    :
        Whatever the request to the server returns
    """

    response = self.make_request("put", "api/v1/motd", None, body=new_motd)
    return response
##############################################################
# Datasets
##############################################################
[docs]
def list_datasets(self):
    """
    Request the list of datasets from the server

    Returns
    -------
    :
        The response of a GET request to ``api/v1/datasets``, requested as a list of dictionaries
    """

    response_type = List[Dict[str, Any]]
    return self.make_request("get", "api/v1/datasets", response_type)
[docs]
def list_datasets_table(self) -> str:
    """
    Format the datasets returned by list_datasets() as a text table

    Returns
    -------
    :
        A table (built with ``tabulate``) with the id, type, and name of each dataset. A
        record_count column is included only if every dataset entry provides one.
    """

    datasets = self.list_datasets()

    # older servers don't have record_count
    has_counts = True
    for entry in datasets:
        if "record_count" not in entry:
            has_counts = False
            break

    rows = []
    if has_counts:
        columns = ["id", "type", "record_count", "name"]
        for entry in datasets:
            rows.append((entry["id"], entry["dataset_type"], entry["record_count"], entry["dataset_name"]))
    else:
        columns = ["id", "type", "name"]
        for entry in datasets:
            rows.append((entry["id"], entry["dataset_type"], entry["dataset_name"]))

    return tabulate(rows, headers=columns)
[docs]
def print_datasets_table(self) -> None:
    """
    Print the table produced by list_datasets_table() to standard output
    """

    table_text = self.list_datasets_table()
    print(table_text)
[docs]
def get_dataset(self, dataset_type: str, dataset_name: str):
    """
    Look up a dataset on the server by its type and name

    Parameters
    ----------
    dataset_type
        Type of the dataset to look up
    dataset_name
        Name of the dataset to look up

    Returns
    -------
    :
        A dataset object built (via ``dataset_from_dict``) from the server response and this client
    """

    query = DatasetQueryModel(dataset_name=dataset_name, dataset_type=dataset_type)
    ds_dict = self.make_request("post", "api/v1/datasets/query", Dict[str, Any], body=query)
    return dataset_from_dict(ds_dict, self)
[docs]
def query_dataset_records(
    self,
    record_id: Union[int, Iterable[int]],
    dataset_type: Optional[Iterable[str]] = None,
):
    """
    Query the server's dataset-records endpoint for the given record IDs

    Parameters
    ----------
    record_id
        A single record ID or an iterable of record IDs
    dataset_type
        Optional dataset types, passed to the query unchanged

    Returns
    -------
    :
        The server response, requested as a list of dictionaries
    """

    id_list = make_list(record_id)
    query = DatasetQueryRecords(record_id=id_list, dataset_type=dataset_type)
    return self.make_request("post", "api/v1/datasets/queryrecords", List[Dict], body=query)
[docs]
def get_dataset_by_id(self, dataset_id: int):
    """
    Retrieve a dataset from the server by its ID

    Parameters
    ----------
    dataset_id
        ID of the dataset to retrieve

    Returns
    -------
    :
        A dataset object built (via ``dataset_from_dict``) from the server response and this client
    """

    ds_dict = self.make_request("get", f"api/v1/datasets/{dataset_id}", Dict[str, Any])
    dataset = dataset_from_dict(ds_dict, self)
    return dataset
[docs]
def dataset_from_cache(self, file_path: str) -> BaseDataset:
    """
    Build a dataset object from a dataset cache file

    The dataset metadata is read from the file, a cache object is opened on the same file,
    and a dataset is constructed from both. The server address stored in the cache must match
    the address of this client. If the dataset then cannot be retrieved from the server,
    a warning is logged and the cache is marked as read-only.

    Parameters
    ----------
    file_path
        Path to the dataset cache file

    Returns
    -------
    :
        The dataset backed by the cache file

    Raises
    ------
    RuntimeError
        If the address stored in the cache file differs from the address of this client
    """

    ds_meta = read_dataset_metadata(file_path)
    ds_type = BaseDataset.get_subclass(ds_meta["dataset_type"])
    ds_cache = DatasetCache(file_path, False, ds_type)
    ds = dataset_from_dict(ds_meta, self, cache_data=ds_cache)

    # Check to make sure we are connected to the same server
    cache_address = ds_cache.get_metadata("client_address")
    if cache_address != self.address:
        raise RuntimeError(f"Cache file comes from {cache_address}, but currently connected to {self.address}")

    try:
        self.get_dataset_by_id(ds.id)
    except Exception:
        # Catch Exception rather than using a bare except, so that KeyboardInterrupt and
        # SystemExit still propagate. Any ordinary failure of the lookup is treated as
        # "not on the server", as before.
        self._logger.warning(f"Dataset {ds.id} could not be found on the server. Marking as read-only cache")
        ds_cache.read_only = True

    return ds
[docs]
def create_dataset_view(
    self, dataset_id: int, file_path: str, include: Optional[Iterable[str]] = None, overwrite: bool = False
):
    """
    Create a dataset view by delegating to the module-level ``create_dataset_view`` function

    Parameters
    ----------
    dataset_id
        ID of the dataset
    file_path
        Path passed to the underlying function
    include
        Optional iterable of strings passed to the underlying function
    overwrite
        Flag passed to the underlying function

    Returns
    -------
    :
        Whatever the module-level ``create_dataset_view`` function returns
    """

    # Inside a method, the bare name resolves to the imported module-level function,
    # not to this method
    result = create_dataset_view(self, dataset_id, file_path, include, overwrite)
    return result
[docs]
def get_dataset_status_by_id(self, dataset_id: int) -> Dict[str, Dict[RecordStatusEnum, int]]:
    """
    Retrieve status counts for a dataset from the server

    Parameters
    ----------
    dataset_id
        ID of the dataset

    Returns
    -------
    :
        The server response: a dictionary whose values map record statuses to integers
    """

    response_type = Dict[str, Dict[RecordStatusEnum, int]]
    return self.make_request("get", f"api/v1/datasets/{dataset_id}/status", response_type)
[docs]
def add_dataset(
    self,
    dataset_type: str,
    name: str,
    description: Optional[str] = None,
    tagline: Optional[str] = None,
    tags: Optional[List[str]] = None,
    group: Optional[str] = None,
    provenance: Optional[Dict[str, Any]] = None,
    visibility: bool = True,
    default_tag: str = "*",
    default_priority: PriorityEnum = PriorityEnum.normal,
    metadata: Optional[Dict[str, Any]] = None,
    owner_group: Optional[str] = None,
    existing_ok: bool = False,
) -> BaseDataset:
    """
    Create a dataset on the server and return it

    Optional arguments left as None are replaced before the request is sent: an empty string
    for `description` and `tagline`, an empty list for `tags`, "default" for `group`, and an
    empty dictionary for `provenance` and `metadata`.

    Parameters
    ----------
    dataset_type
        Type of dataset to add. Used as the last component of the request URL.
    name
        Name of the dataset
    description, tagline, tags, group, provenance, visibility, default_tag, default_priority,
    metadata, owner_group, existing_ok
        Forwarded (after the None substitution above) as fields of the request body

    Returns
    -------
    :
        The dataset, fetched by the ID that the server returned
    """

    body = DatasetAddBody(
        name=name,
        description="" if description is None else description,
        tagline="" if tagline is None else tagline,
        tags=[] if tags is None else tags,
        group="default" if group is None else group,
        provenance={} if provenance is None else provenance,
        visibility=visibility,
        default_tag=default_tag,
        default_priority=default_priority,
        metadata={} if metadata is None else metadata,
        owner_group=owner_group,
        existing_ok=existing_ok,
    )

    new_id = self.make_request("post", f"api/v1/datasets/{dataset_type}", int, body=body)
    return self.get_dataset_by_id(new_id)
[docs]
def delete_dataset(self, dataset_id: int, delete_records: bool):
    """
    Delete a dataset from the server

    Parameters
    ----------
    dataset_id
        ID of the dataset to delete
    delete_records
        Sent to the server as the ``delete_records`` URL parameter

    Returns
    -------
    :
        Whatever the request to the server returns
    """

    url_params = DatasetDeleteParams(delete_records=delete_records)
    endpoint = f"api/v1/datasets/{dataset_id}"
    return self.make_request("delete", endpoint, None, url_params=url_params)
##############################################################
# Molecules
##############################################################
[docs]
def get_molecules(
    self,
    molecule_ids: Union[int, Sequence[int]],
    missing_ok: bool = False,
) -> Union[Optional[Molecule], List[Optional[Molecule]]]:
    """Obtains molecules with the specified IDs from the server

    Molecules are requested in batches of a quarter of the server's ``get_molecules``
    limit (but always at least one ID per batch).

    Parameters
    ----------
    molecule_ids
        A single molecule ID, or a list (or other sequence) of molecule IDs
    missing_ok
        If set to True, then missing molecules will be tolerated, and the returned list of
        Molecules will contain None for the corresponding IDs that were not found.

    Returns
    -------
    :
        Molecules, in the same order as the requested ids.
        If given a sequence of ids, the return value will be a list (an empty list for
        an empty sequence). Otherwise, it will be a single Molecule.
    """

    single_requested = not isinstance(molecule_ids, Sequence)
    id_list = make_list(molecule_ids)

    if not id_list:
        return []

    # A server limit below 4 would otherwise floor-divide to a batch size of zero
    batch_size = max(1, self.api_limits["get_molecules"] // 4)

    found = []
    for id_batch in chunk_iterable(id_list, batch_size):
        body = CommonBulkGetBody(ids=id_batch, missing_ok=missing_ok)
        batch_result = self.make_request("post", "api/v1/molecules/bulkGet", List[Optional[Molecule]], body=body)
        found.extend(batch_result)

    return found[0] if single_requested else found
[docs]
def query_molecules(
    self,
    *,
    molecule_hash: Optional[Union[str, Iterable[str]]] = None,
    molecular_formula: Optional[Union[str, Iterable[str]]] = None,
    identifiers: Optional[Dict[str, Union[str, Iterable[str]]]] = None,
    limit: Optional[int] = None,
) -> MoleculeQueryIterator:
    """Query molecules by attributes.

    Do not rely on the returned molecules being in any particular order.

    Parameters
    ----------
    molecule_hash
        Queries molecules by hash
    molecular_formula
        Queries molecules by molecular formula.
        Molecular formulas are not order-sensitive (e.g. "H2O == OH2 != Oh2").
    identifiers
        Additional identifiers to search for (smiles, etc)
    limit
        The maximum number of Molecules to return. Note that the server limit is always obeyed.

    Returns
    -------
    :
        An iterator that can be used to retrieve the results of the query
    """

    filters = {
        "molecule_hash": make_list(molecule_hash),
        "molecular_formula": make_list(molecular_formula),
        "limit": limit,
    }

    # The identifiers key is only set when the caller supplied identifiers
    if identifiers is not None:
        normalized = {}
        for key, value in identifiers.items():
            normalized[key] = make_list(value)
        filters["identifiers"] = normalized

    return MoleculeQueryIterator(self, MoleculeQueryFilters(**filters))
[docs]
def add_molecules(self, molecules: Sequence[Molecule]) -> Tuple[InsertMetadata, List[int]]:
    """Add molecules to the server database

    If the same molecule (defined by having the same hash) already exists, then the existing
    molecule is kept and that particular molecule is not added.

    Parameters
    ----------
    molecules
        A list of Molecules to add to the server.

    Returns
    -------
    :
        Metadata about what was inserted, and a list of IDs of the molecules
        in the same order as the `molecules` parameter.

    Raises
    ------
    RuntimeError
        If more molecules are given than the server's ``add_molecules`` limit allows
    """

    # Nothing to send: return empty metadata without contacting the server
    if not molecules:
        return InsertMetadata(), []

    n_molecules = len(molecules)
    limit = self.api_limits["add_molecules"]
    if n_molecules > limit:
        raise RuntimeError(f"Cannot add {n_molecules} molecules - over the limit of {limit}")

    return self.make_request(
        "post", "api/v1/molecules/bulkCreate", Tuple[InsertMetadata, List[int]], body=make_list(molecules)
    )
[docs]
def modify_molecule(
    self,
    molecule_id: int,
    name: Optional[str] = None,
    comment: Optional[str] = None,
    identifiers: Optional[Union[Dict[str, Any], MoleculeIdentifiers]] = None,
    overwrite_identifiers: bool = False,
) -> UpdateMetadata:
    """
    Modify a molecule on the server

    This is only capable of updating the name, comment, and identifiers fields (except molecule_hash
    and molecular formula).

    If a molecule with that id does not exist, an exception is raised

    Parameters
    ----------
    molecule_id
        ID of the molecule to modify
    name
        New name for the molecule. If None, name is not changed.
    comment
        New comment for the molecule. If None, comment is not changed
    identifiers
        A new set of identifiers for the molecule
    overwrite_identifiers
        If True, the identifiers of the molecule are set to be those given exactly (ie, identifiers
        that exist in the DB but not in the new set will be removed). Otherwise, the new set of
        identifiers is merged into the existing ones. Note that molecule_hash and molecular_formula
        are never removed.

    Returns
    -------
    :
        Metadata about the modification/update.
    """

    changes = MoleculeModifyBody(
        name=name,
        comment=comment,
        identifiers=identifiers,
        overwrite_identifiers=overwrite_identifiers,
    )
    endpoint = f"api/v1/molecules/{molecule_id}"
    return self.make_request("patch", endpoint, UpdateMetadata, body=changes)
[docs]
def delete_molecules(self, molecule_ids: Union[int, Sequence[int]]) -> DeleteMetadata:
    """Deletes molecules from the server

    This will not delete any molecules that are in use

    Parameters
    ----------
    molecule_ids
        An id or list of ids of the molecules to delete.

    Returns
    -------
    :
        Metadata about what was deleted
    """

    id_list = make_list(molecule_ids)

    # With no IDs there is nothing to request; return empty metadata
    if not id_list:
        return DeleteMetadata()

    return self.make_request("post", "api/v1/molecules/bulkDelete", DeleteMetadata, body=id_list)
##############################################################
# General record functions
##############################################################
def _fetch_records(
    self,
    record_type: Optional[Type[_T]],
    record_ids: Sequence[int],
    missing_ok: bool = False,
    include: Optional[Iterable[str]] = None,
) -> List[Optional[_T]]:
    """
    Fetches records of a particular type with the specified IDs from the remote server.

    Records will be returned in the same order as the record ids. This function always returns a list.

    This function only fetches the top-level records - it does not fetch the children of the records. It also
    does not use caching at all.

    Parameters
    ----------
    record_type
        The type of record to fetch. If None, the generic records endpoint is used and the
        record objects are built with ``records_from_dicts``.
    record_ids
        Sequence/list of IDs of the records to obtain
    missing_ok
        If set to True, then missing records will be tolerated, and the returned
        records will contain None for the corresponding IDs that were not found.
    include
        Additional fields to include in the returned record

    Returns
    -------
    :
        A list of records, in the order of `record_ids` (empty if `record_ids` is empty).
        None is substituted for any entry the server returned as None.
    """

    if not record_ids:
        return []

    if include is not None:
        # Always include the base stuff
        include = list(include) + ["*"]

    if record_type is None:
        endpoint = "api/v1/records/bulkGet"
    else:
        # A little hacky: the record type string is the default of the class's record_type field
        record_type_str = record_type.__fields__["record_type"].default
        endpoint = f"api/v1/records/{record_type_str}/bulkGet"

    max_batch_size = self.api_limits["get_records"]

    # Start with a tenth of the server limit, rounded up. True division is required for the
    # rounding to have any effect (ceil of a floor division is a no-op and gave 0 for limits
    # below 10). Never start below one record per batch.
    initial_batch_size = max(1, math.ceil(max_batch_size / 10))

    def _download_chunk(id_chunk: List[int]):
        body = CommonBulkGetBody(ids=id_chunk, include=include, missing_ok=missing_ok)
        return self.make_request("post", endpoint, List[Optional[Dict[str, Any]]], body=body)

    all_records = []
    for record_dicts in process_chunk_iterable(
        _download_chunk,
        record_ids,
        self.download_target_time,
        max_batch_size,
        initial_batch_size,
        self.n_download_threads,
        keep_order=True,
    ):
        if record_type is None:
            all_records.extend(records_from_dicts(record_dicts, self))
        else:
            all_records.extend([record_type(self, **r) if r is not None else None for r in record_dicts])

    # Just to really make sure the process_chunk_iterable code is correct
    assert all((x is None or x.id == rid) for x, rid in zip(all_records, record_ids))

    return all_records
[docs]
def get_records(
    self,
    record_ids: Union[int, Sequence[int]],
    missing_ok: bool = False,
    *,
    include: Optional[Iterable[str]] = None,
) -> Union[List[Optional[BaseRecord]], Optional[BaseRecord]]:
    """
    Obtain records of all types with specified IDs

    This function will return record objects of the given ID no matter
    what the type is. All records are unique by ID (ie, an optimization will never
    have the same ID as a singlepoint).

    Records will be returned in the same order as the record ids.

    Parameters
    ----------
    record_ids
        Single ID or sequence/list of records to obtain
    missing_ok
        If set to True, then missing records will be tolerated, and the returned
        records will contain None for the corresponding IDs that were not found.
    include
        Additional fields to include in the returned record

    Returns
    -------
    :
        If a single ID was specified, returns just that record. Otherwise, returns
        a list of records. If missing_ok was specified, None will be substituted for a record
        that was not found.
    """

    # A record type of None selects the type-agnostic lookup
    records = self._get_records_by_type(None, record_ids, missing_ok, include)
    return records
def _get_records_by_type(
    self,
    record_type: Optional[Type[_T]],
    record_ids: Union[int, Sequence[int]],
    missing_ok: bool = False,
    include: Optional[Iterable[str]] = None,
) -> Union[Optional[_T], List[Optional[_T]]]:
    """
    Obtain records of a particular type with the specified IDs from the server.

    Records will be returned in the same order as the record ids.

    This function will fetch the children of the records if enough information
    is fetched of the parent record. This is handled by the various fetch_children_multi
    class functions of the record types.

    This function does not use the cache.

    Parameters
    ----------
    record_type
        The type of record to fetch. If None, records of any type are fetched and then
        grouped by their type before the children are fetched.
    record_ids
        Single ID or sequence/list of records to obtain
    missing_ok
        If set to True, then missing records will be tolerated, and the returned
        records will contain None for the corresponding IDs that were not found.
    include
        Additional fields to include in the returned record

    Returns
    -------
    :
        If a single ID was specified, returns just that record. Otherwise, returns
        a list of records. If missing_ok was specified, None will be substituted for a record
        that was not found.
    """

    # Decide on the shape of the return value before the IDs are normalized to a list
    single_requested = not isinstance(record_ids, Sequence)
    fetched = self._fetch_records(record_type, make_list(record_ids), missing_ok, include)

    # We always force fetch here. Given that this record is not part of the cache, it shouldn't be using any
    # cache anyway. But the semantics of this function is that it always fetches everything
    if record_type is not None:
        record_type.fetch_children_multi(fetched, include, force_fetch=True)
    else:
        # Handle disparate record types: one fetch_children_multi call per record type
        by_type = {}
        for rec in fetched:
            if rec is None:
                continue
            by_type.setdefault(rec.record_type, []).append(rec)

        for same_type_records in by_type.values():
            same_type_records[0].fetch_children_multi(same_type_records, include, force_fetch=True)

    return fetched[0] if single_requested else fetched
[docs]
def query_records(
    self,
    *,
    record_id: Optional[Union[int, Iterable[int]]] = None,
    record_type: Optional[Union[str, Iterable[str]]] = None,
    manager_name: Optional[Union[str, Iterable[str]]] = None,
    status: Optional[Union[RecordStatusEnum, Iterable[RecordStatusEnum]]] = None,
    dataset_id: Optional[Union[int, Iterable[int]]] = None,
    parent_id: Optional[Union[int, Iterable[int]]] = None,
    child_id: Optional[Union[int, Iterable[int]]] = None,
    created_before: Optional[Union[datetime, str]] = None,
    created_after: Optional[Union[datetime, str]] = None,
    modified_before: Optional[Union[datetime, str]] = None,
    modified_after: Optional[Union[datetime, str]] = None,
    owner_user: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    owner_group: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    limit: Optional[int] = None,
    include: Optional[Iterable[str]] = None,
) -> RecordQueryIterator[BaseRecord]:
    """
    Query records of all types based on common fields

    This is a general query of all record types, so it can only filter by fields
    that are common among all records.

    Do not rely on the returned records being in any particular order.

    Parameters
    ----------
    record_id
        Query records whose ID is in the given list
    record_type
        Query records whose type is in the given list
    manager_name
        Query records that were completed (or are currently running) on a manager in the given list
    status
        Query records whose status is in the given list
    dataset_id
        Query records that are part of a dataset in the given list
    parent_id
        Query records that have a parent in the given list
    child_id
        Query records that have a child in the given list
    created_before
        Query records that were created before the given date/time
    created_after
        Query records that were created after the given date/time
    modified_before
        Query records that were modified before the given date/time
    modified_after
        Query records that were modified after the given date/time
    owner_user
        Query records owned by a user in the given list (usernames or IDs)
    owner_group
        Query records owned by a group in the given list (group names or IDs)
    limit
        The maximum number of records to return. Note that the server limit is always obeyed.
    include
        Additional fields to include in the returned record

    Returns
    -------
    :
        An iterator that can be used to retrieve the results of the query
    """

    # List-valued filters accept either a single value or an iterable; the date/time
    # filters and the limit are passed through unchanged
    filter_dict = {
        "record_id": make_list(record_id),
        "record_type": make_list(record_type),
        "manager_name": make_list(manager_name),
        "status": make_list(status),
        "dataset_id": make_list(dataset_id),
        "parent_id": make_list(parent_id),
        "child_id": make_list(child_id),
        "created_before": created_before,
        "created_after": created_after,
        "modified_before": modified_before,
        "modified_after": modified_after,
        "owner_user": make_list(owner_user),
        "owner_group": make_list(owner_group),
        "limit": limit,
    }

    filter_data = RecordQueryFilters(**filter_dict)

    return RecordQueryIterator[BaseRecord](self, filter_data, None, include)
[docs]
def reset_records(self, record_ids: Union[int, Sequence[int]]) -> UpdateMetadata:
    """
    Resets running or errored records to be waiting again

    Parameters
    ----------
    record_ids
        A single record ID or a sequence of record IDs

    Returns
    -------
    :
        Metadata about the update. Empty metadata is returned, without contacting the
        server, if no IDs are given.
    """

    id_list = make_list(record_ids)
    if id_list:
        change = RecordModifyBody(record_ids=id_list, status=RecordStatusEnum.waiting)
        return self.make_request("patch", "api/v1/records", UpdateMetadata, body=change)

    return UpdateMetadata()
[docs]
def cancel_records(self, record_ids: Union[int, Sequence[int]]) -> UpdateMetadata:
    """
    Marks running, waiting, or errored records as cancelled

    A cancelled record will not be picked up by a manager.

    Parameters
    ----------
    record_ids
        A single record ID or a sequence of record IDs

    Returns
    -------
    :
        Metadata about the update. Empty metadata is returned, without contacting the
        server, if no IDs are given.
    """

    id_list = make_list(record_ids)
    if id_list:
        change = RecordModifyBody(record_ids=id_list, status=RecordStatusEnum.cancelled)
        return self.make_request("patch", "api/v1/records", UpdateMetadata, body=change)

    return UpdateMetadata()
[docs]
def invalidate_records(self, record_ids: Union[int, Sequence[int]]) -> UpdateMetadata:
    """
    Marks completed records as invalid

    An invalid record is one that supposedly completed successfully but, after review,
    was found not to be correct.

    Parameters
    ----------
    record_ids
        A single record ID or a sequence of record IDs

    Returns
    -------
    :
        Metadata about the update. Empty metadata is returned, without contacting the
        server, if no IDs are given.
    """

    id_list = make_list(record_ids)
    if id_list:
        change = RecordModifyBody(record_ids=id_list, status=RecordStatusEnum.invalid)
        return self.make_request("patch", "api/v1/records", UpdateMetadata, body=change)

    return UpdateMetadata()
[docs]
def delete_records(
    self, record_ids: Union[int, Sequence[int]], soft_delete: bool = True, delete_children: bool = True
) -> DeleteMetadata:
    """
    Delete records from the database

    If soft_delete is True, then the record is just marked as deleted and actual deletion may
    happen later. Soft delete can be undone with undelete_records

    Parameters
    ----------
    record_ids
        A single ID or a sequence of IDs of the records to delete
    soft_delete
        Don't actually delete the record, just mark it for later deletion
    delete_children
        If True, attempt to delete child records as well

    Returns
    -------
    :
        Metadata about what was deleted. Empty metadata is returned, without contacting
        the server, if no IDs are given.
    """

    record_ids = make_list(record_ids)
    if not record_ids:
        return DeleteMetadata()

    body = RecordDeleteBody(record_ids=record_ids, soft_delete=soft_delete, delete_children=delete_children)
    return self.make_request("post", "api/v1/records/bulkDelete", DeleteMetadata, body=body)
[docs]
def uninvalidate_records(self, record_ids: Union[int, Sequence[int]]) -> UpdateMetadata:
    """
    Undo the invalidation of records

    Parameters
    ----------
    record_ids
        A single record ID or a sequence of record IDs

    Returns
    -------
    :
        Metadata about the update. Empty metadata is returned, without contacting the
        server, if no IDs are given.
    """

    id_list = make_list(record_ids)
    if id_list:
        revert = RecordRevertBody(record_ids=id_list, revert_status=RecordStatusEnum.invalid)
        return self.make_request("post", "api/v1/records/revert", UpdateMetadata, body=revert)

    return UpdateMetadata()
[docs]
def uncancel_records(self, record_ids: Union[int, Sequence[int]]) -> UpdateMetadata:
    """
    Undo the cancellation of records

    Parameters
    ----------
    record_ids
        A single record ID or a sequence of record IDs

    Returns
    -------
    :
        Metadata about the update. Empty metadata is returned, without contacting the
        server, if no IDs are given.
    """

    id_list = make_list(record_ids)
    if id_list:
        revert = RecordRevertBody(record_ids=id_list, revert_status=RecordStatusEnum.cancelled)
        return self.make_request("post", "api/v1/records/revert", UpdateMetadata, body=revert)

    return UpdateMetadata()
[docs]
def undelete_records(self, record_ids: Union[int, Sequence[int]]) -> UpdateMetadata:
    """
    Undo the (soft) deletion of records

    Parameters
    ----------
    record_ids
        A single record ID or a sequence of record IDs

    Returns
    -------
    :
        Metadata about the update. Empty metadata is returned, without contacting the
        server, if no IDs are given.
    """

    id_list = make_list(record_ids)
    if id_list:
        revert = RecordRevertBody(record_ids=id_list, revert_status=RecordStatusEnum.deleted)
        return self.make_request("post", "api/v1/records/revert", UpdateMetadata, body=revert)

    return UpdateMetadata()
[docs]
def modify_records(
    self,
    record_ids: Union[int, Sequence[int]],
    new_tag: Optional[str] = None,
    new_priority: Optional[PriorityEnum] = None,
) -> UpdateMetadata:
    """
    Modify the tag or priority of records

    Parameters
    ----------
    record_ids
        A single record ID or a sequence of record IDs
    new_tag
        New tag for the records. If None, the tag is not sent as a change.
    new_priority
        New priority for the records. If None, the priority is not sent as a change.

    Returns
    -------
    :
        Metadata about the update. Empty metadata is returned, without contacting the
        server, if no IDs are given or if both `new_tag` and `new_priority` are None.
    """

    id_list = make_list(record_ids)

    # Nothing to do without records, or without at least one requested change
    nothing_to_change = new_tag is None and new_priority is None
    if not id_list or nothing_to_change:
        return UpdateMetadata()

    change = RecordModifyBody(record_ids=id_list, tag=new_tag, priority=new_priority)
    return self.make_request("patch", "api/v1/records", UpdateMetadata, body=change)
[docs]
def get_waiting_reason(self, record_id: int) -> Dict[str, Any]:
    """
    Get the reason a record is in the waiting status

    The return is a dictionary, with a 'reason' key containing the overall reason the record is
    waiting. If appropriate, there is a 'details' key that contains information for each
    active compute manager on why that manager is not able to pick up the record's task.

    Parameters
    ----------
    record_id
        The record ID to test

    Returns
    -------
    :
        A dictionary containing information about why the record is not being picked up by compute managers
    """

    endpoint = f"api/v1/records/{record_id}/waiting_reason"
    reason = self.make_request("get", endpoint, Dict[str, Any])
    return reason
##############################################################
# Singlepoint calculations
##############################################################
[docs]
def add_singlepoints(
    self,
    molecules: Union[int, Molecule, List[Union[int, Molecule]]],
    program: str,
    driver: str,
    method: str,
    basis: Optional[str],
    keywords: Optional[Dict[str, Any]] = None,
    protocols: Optional[Union[SinglepointProtocols, Dict[str, Any]]] = None,
    tag: str = "*",
    priority: PriorityEnum = PriorityEnum.normal,
    owner_group: Optional[str] = None,
    find_existing: bool = True,
) -> Tuple[InsertMetadata, List[int]]:
    """
    Adds new singlepoint computations to the server

    This checks if the calculations already exist in the database. If so, it returns
    the existing id, otherwise it will insert it and return the new id.

    This will add one record per molecule.

    Parameters
    ----------
    molecules
        The Molecules or Molecule ids to run the computation on
    program
        The computational program to execute the result with (e.g., "rdkit", "psi4").
    driver
        The primary result that the compute will acquire {"energy", "gradient", "hessian", "properties"}
    method
        The computational method to use (e.g., "B3LYP", "PBE")
    basis
        The basis to apply to the computation (e.g., "cc-pVDZ", "6-31G")
    keywords
        The program-specific keywords for the computation
    protocols
        Protocols for storing more/less data for each computation
    tag
        The tag for the task. This will assist in routing to appropriate compute managers.
    priority
        The priority of the job (high, normal, low). Default is normal.
    owner_group
        Group with additional permission for these records
    find_existing
        If True, search for existing records and return those. If False, always add new records

    Returns
    -------
    :
        Metadata about the insertion, and a list of record ids. The ids will be in the
        order of the input molecules

    Raises
    ------
    RuntimeError
        If more molecules are given than the server's ``add_records`` limit allows
    """

    mol_list = make_list(molecules)

    # Nothing to submit: return empty metadata without contacting the server
    if not mol_list:
        return InsertMetadata(), []

    n_molecules = len(mol_list)
    record_limit = self.api_limits["add_records"]
    if n_molecules > record_limit:
        raise RuntimeError(f"Cannot add {n_molecules} records - over the limit of {record_limit}")

    specification = {
        "program": program,
        "driver": driver,
        "method": method,
        "basis": basis,
    }

    # If these are None, then let the pydantic models handle the defaults
    if keywords is not None:
        specification["keywords"] = keywords
    if protocols is not None:
        specification["protocols"] = protocols

    body = SinglepointAddBody(
        **{
            "molecules": mol_list,
            "specification": specification,
            "tag": tag,
            "priority": priority,
            "owner_group": owner_group,
            "find_existing": find_existing,
        }
    )

    return self.make_request(
        "post", "api/v1/records/singlepoint/bulkCreate", Tuple[InsertMetadata, List[int]], body=body
    )
[docs]
def get_singlepoints(
    self,
    record_ids: Union[int, Sequence[int]],
    missing_ok: bool = False,
    *,
    include: Optional[Iterable[str]] = None,
) -> Union[Optional[SinglepointRecord], List[Optional[SinglepointRecord]]]:
    """
    Obtain singlepoint records with the specified IDs.

    Records will be returned in the same order as the record ids.

    Parameters
    ----------
    record_ids
        Single ID or sequence/list of records to obtain
    missing_ok
        If set to True, then missing records will be tolerated, and the returned
        records will contain None for the corresponding IDs that were not found.
    include
        Additional fields to include in the returned record

    Returns
    -------
    :
        If a single ID was specified, returns just that record. Otherwise, returns
        a list of records. If missing_ok was specified, None will be substituted for a record
        that was not found.
    """

    # Same lookup as the generic record getter, restricted to singlepoint records
    records = self._get_records_by_type(SinglepointRecord, record_ids, missing_ok, include)
    return records
[docs]
def query_singlepoints(
    self,
    *,
    record_id: Optional[Union[int, Iterable[int]]] = None,
    manager_name: Optional[Union[str, Iterable[str]]] = None,
    status: Optional[Union[RecordStatusEnum, Iterable[RecordStatusEnum]]] = None,
    dataset_id: Optional[Union[int, Iterable[int]]] = None,
    parent_id: Optional[Union[int, Iterable[int]]] = None,
    created_before: Optional[Union[datetime, str]] = None,
    created_after: Optional[Union[datetime, str]] = None,
    modified_before: Optional[Union[datetime, str]] = None,
    modified_after: Optional[Union[datetime, str]] = None,
    program: Optional[Union[str, Iterable[str]]] = None,
    driver: Optional[Union[SinglepointDriver, Iterable[SinglepointDriver]]] = None,
    method: Optional[Union[str, Iterable[str]]] = None,
    basis: Optional[Union[str, Iterable[Optional[str]]]] = None,
    keywords: Optional[Union[Dict[str, Any], Iterable[Dict[str, Any]]]] = None,
    molecule_id: Optional[Union[int, Iterable[int]]] = None,
    owner_user: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    owner_group: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    limit: Optional[int] = None,
    include: Optional[Iterable[str]] = None,
) -> RecordQueryIterator[SinglepointRecord]:
    """
    Search the server for singlepoint records matching the given filters.

    The order of the returned records is not guaranteed.

    Parameters
    ----------
    record_id
        Query records whose ID is in the given list
    manager_name
        Query records that were completed (or are currently running) on a manager
        whose name is in the given list
    status
        Query records whose status is in the given list
    dataset_id
        Query records that are part of a dataset whose ID is in the given list
    parent_id
        Query records that have a parent whose ID is in the given list
    created_before
        Query records that were created before the given date/time
    created_after
        Query records that were created after the given date/time
    modified_before
        Query records that were modified before the given date/time
    modified_after
        Query records that were modified after the given date/time
    program
        Query records whose program is in the given list
    driver
        Query records whose driver is in the given list
    method
        Query records whose method is in the given list
    basis
        Query records whose basis is in the given list
    keywords
        Query records with these keywords (exact match)
    molecule_id
        Query records whose molecule (id) is in the given list
    owner_user
        Query records owned by a user in the given list
    owner_group
        Query records owned by a group in the given list
    limit
        The maximum number of records to return. Note that the server limit is always obeyed.
    include
        Additional fields to include in the returned record

    Returns
    -------
    :
        An iterator that can be used to retrieve the results of the query
    """

    # Singlepoints have no children, hence no child_id filter.
    # List-like filters are normalized with make_list; date and limit filters go through as given.
    filter_data = SinglepointQueryFilters(
        record_id=make_list(record_id),
        manager_name=make_list(manager_name),
        status=make_list(status),
        dataset_id=make_list(dataset_id),
        parent_id=make_list(parent_id),
        program=make_list(program),
        driver=make_list(driver),
        method=make_list(method),
        basis=make_list(basis),
        keywords=make_list(keywords),
        molecule_id=make_list(molecule_id),
        created_before=created_before,
        created_after=created_after,
        modified_before=modified_before,
        modified_after=modified_after,
        owner_user=make_list(owner_user),
        owner_group=make_list(owner_group),
        limit=limit,
    )

    iterator_type = RecordQueryIterator[SinglepointRecord]
    return iterator_type(self, filter_data, SinglepointRecord, include)
##############################################################
# Optimization calculations
##############################################################
[docs]
def add_optimizations(
    self,
    initial_molecules: Union[int, Molecule, List[Union[int, Molecule]]],
    program: str,
    qc_specification: QCSpecification,
    keywords: Optional[Dict[str, Any]] = None,
    protocols: Optional[OptimizationProtocols] = None,
    tag: str = "*",
    priority: PriorityEnum = PriorityEnum.normal,
    owner_group: Optional[str] = None,
    find_existing: bool = True,
) -> Tuple[InsertMetadata, List[int]]:
    """
    Adds new geometry optimization calculations to the server

    This checks if the calculations already exist in the database. If so, it returns
    the existing id, otherwise it will insert it and return the new id.

    This will add one record per initial molecule.

    Parameters
    ----------
    initial_molecules
        Initial molecule/geometry to optimize
    program
        Which program to use for the optimization (ie, geometric)
    qc_specification
        The method, basis, etc, to optimize the geometry with
    keywords
        Program-specific keywords for the optimization program (not the qc program)
    protocols
        Protocols for storing more/less data for each computation (for the optimization)
    tag
        The tag for the task. This will assist in routing to appropriate compute managers.
    priority
        The priority of the job (high, normal, low). Default is normal.
    owner_group
        Group with additional permission for these records
    find_existing
        If True, search for existing records and return those. If False, always add new records

    Returns
    -------
    :
        Metadata about the insertion, and a list of record ids. The ids will be in the
        order of the input molecules

    Raises
    ------
    RuntimeError
        If the number of molecules exceeds the "add_records" entry of the client's api limits
    """

    initial_molecules = make_list(initial_molecules)

    # Nothing to add - return before any request is made
    if not initial_molecules:
        return InsertMetadata(), []

    if len(initial_molecules) > self.api_limits["add_records"]:
        # Report the number of molecules actually passed in (not the length of a string literal)
        raise RuntimeError(
            f"Cannot add {len(initial_molecules)} records - over the limit of {self.api_limits['add_records']}"
        )

    body_data = {
        "initial_molecules": initial_molecules,
        "specification": {
            "program": program,
            "qc_specification": qc_specification,
        },
        "tag": tag,
        "priority": priority,
        "owner_group": owner_group,
        "find_existing": find_existing,
    }

    # If these are None, then let the pydantic models handle the defaults
    if keywords is not None:
        body_data["specification"]["keywords"] = keywords
    if protocols is not None:
        body_data["specification"]["protocols"] = protocols

    body = OptimizationAddBody(**body_data)

    return self.make_request(
        "post",
        "api/v1/records/optimization/bulkCreate",
        Tuple[InsertMetadata, List[int]],
        body=body,
    )
[docs]
def get_optimizations(
    self,
    record_ids: Union[int, Sequence[int]],
    missing_ok: bool = False,
    *,
    include: Optional[Iterable[str]] = None,
) -> Union[Optional[OptimizationRecord], List[Optional[OptimizationRecord]]]:
    """
    Fetch optimization records by ID.

    The returned records follow the order of the requested record ids.

    Parameters
    ----------
    record_ids
        One record ID, or a sequence/list of record IDs, to fetch
    missing_ok
        When True, missing records are tolerated and None is returned in
        place of each record that was not found.
    include
        Additional fields to include in the returned record

    Returns
    -------
    :
        The record itself when a single ID was given, otherwise a list of records.
        With missing_ok set, None stands in for any record that was not found.
    """

    # The generic by-type lookup does the work; only the record class is specific here
    record_type = OptimizationRecord
    return self._get_records_by_type(record_type, record_ids, missing_ok, include)
[docs]
def query_optimizations(
    self,
    *,
    record_id: Optional[Union[int, Iterable[int]]] = None,
    manager_name: Optional[Union[str, Iterable[str]]] = None,
    status: Optional[Union[RecordStatusEnum, Iterable[RecordStatusEnum]]] = None,
    dataset_id: Optional[Union[int, Iterable[int]]] = None,
    parent_id: Optional[Union[int, Iterable[int]]] = None,
    child_id: Optional[Union[int, Iterable[int]]] = None,
    created_before: Optional[Union[datetime, str]] = None,
    created_after: Optional[Union[datetime, str]] = None,
    modified_before: Optional[Union[datetime, str]] = None,
    modified_after: Optional[Union[datetime, str]] = None,
    program: Optional[Union[str, Iterable[str]]] = None,
    qc_program: Optional[Union[str, Iterable[str]]] = None,
    qc_method: Optional[Union[str, Iterable[str]]] = None,
    qc_basis: Optional[Union[str, Iterable[Optional[str]]]] = None,
    initial_molecule_id: Optional[Union[int, Iterable[int]]] = None,
    final_molecule_id: Optional[Union[int, Iterable[int]]] = None,
    owner_user: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    owner_group: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    limit: Optional[int] = None,
    include: Optional[Iterable[str]] = None,
) -> RecordQueryIterator[OptimizationRecord]:
    """
    Search the server for optimization records matching the given filters.

    The order of the returned records is not guaranteed.

    Parameters
    ----------
    record_id
        Query records whose ID is in the given list
    manager_name
        Query records that were completed (or are currently running) on a manager
        whose name is in the given list
    status
        Query records whose status is in the given list
    dataset_id
        Query records that are part of a dataset whose ID is in the given list
    parent_id
        Query records that have a parent whose ID is in the given list
    child_id
        Query records that have a child (singlepoint calculation) whose ID is in the given list
    created_before
        Query records that were created before the given date/time
    created_after
        Query records that were created after the given date/time
    modified_before
        Query records that were modified before the given date/time
    modified_after
        Query records that were modified after the given date/time
    program
        Query records whose optimization program is in the given list
    qc_program
        Query records whose qc program is in the given list
    qc_method
        Query records whose method is in the given list
    qc_basis
        Query records whose basis is in the given list
    initial_molecule_id
        Query records whose initial molecule (id) is in the given list
    final_molecule_id
        Query records whose final molecule (id) is in the given list
    owner_user
        Query records owned by a user in the given list
    owner_group
        Query records owned by a group in the given list
    limit
        The maximum number of records to return. Note that the server limit is always obeyed.
    include
        Additional fields to include in the returned record

    Returns
    -------
    :
        An iterator that can be used to retrieve the results of the query
    """

    # List-like filters are normalized with make_list; date and limit filters go through as given
    filter_data = OptimizationQueryFilters(
        record_id=make_list(record_id),
        manager_name=make_list(manager_name),
        status=make_list(status),
        dataset_id=make_list(dataset_id),
        parent_id=make_list(parent_id),
        child_id=make_list(child_id),
        program=make_list(program),
        qc_program=make_list(qc_program),
        qc_method=make_list(qc_method),
        qc_basis=make_list(qc_basis),
        initial_molecule_id=make_list(initial_molecule_id),
        final_molecule_id=make_list(final_molecule_id),
        created_before=created_before,
        created_after=created_after,
        modified_before=modified_before,
        modified_after=modified_after,
        owner_user=make_list(owner_user),
        owner_group=make_list(owner_group),
        limit=limit,
    )

    iterator_type = RecordQueryIterator[OptimizationRecord]
    return iterator_type(self, filter_data, OptimizationRecord, include)
##############################################################
# Torsiondrive calculations
##############################################################
[docs]
def add_torsiondrives(
    self,
    initial_molecules: List[List[Union[int, Molecule]]],
    program: str,
    optimization_specification: OptimizationSpecification,
    keywords: Union[TorsiondriveKeywords, Dict[str, Any]],
    tag: str = "*",
    priority: PriorityEnum = PriorityEnum.normal,
    owner_group: Optional[str] = None,
    find_existing: bool = True,
) -> Tuple[InsertMetadata, List[int]]:
    """
    Submit new torsiondrive computations to the server.

    Calculations that already exist in the database are not duplicated: the
    existing id is returned. Otherwise the calculation is inserted and the
    new id is returned.

    One record is added per set of molecules.

    Parameters
    ----------
    initial_molecules
        Molecules to start the torsiondrives. Each torsiondrive can start with
        multiple molecules, so this is a nested list
    program
        The program to run the torsiondrive computation with ("torsiondrive")
    optimization_specification
        Specification of how each optimization of the torsiondrive should be run
    keywords
        The torsiondrive keywords for the computation
    tag
        The tag for the task. This will assist in routing to appropriate compute managers.
    priority
        The priority of the job (high, normal, low). Default is normal.
    owner_group
        Group with additional permission for these records
    find_existing
        If True, search for existing records and return those. If False, always add new records

    Returns
    -------
    :
        Metadata about the insertion, and a list of record ids. The ids will be in the
        order of the input molecules

    Raises
    ------
    RuntimeError
        If the number of molecule sets exceeds the "add_records" entry of the client's api limits
    """

    # Nothing to add - return before any request is made
    if not initial_molecules:
        return InsertMetadata(), []

    n_records = len(initial_molecules)
    max_records = self.api_limits["add_records"]
    if n_records > max_records:
        raise RuntimeError(f"Cannot add {n_records} records - over the limit of {max_records}")

    specification = {
        "program": program,
        "optimization_specification": optimization_specification,
        "keywords": keywords,
    }

    body = TorsiondriveAddBody(
        initial_molecules=initial_molecules,
        specification=specification,
        as_service=True,
        tag=tag,
        priority=priority,
        owner_group=owner_group,
        find_existing=find_existing,
    )

    response_type = Tuple[InsertMetadata, List[int]]
    return self.make_request("post", "api/v1/records/torsiondrive/bulkCreate", response_type, body=body)
[docs]
def get_torsiondrives(
    self,
    record_ids: Union[int, Sequence[int]],
    missing_ok: bool = False,
    *,
    include: Optional[Iterable[str]] = None,
) -> Union[Optional[TorsiondriveRecord], List[Optional[TorsiondriveRecord]]]:
    """
    Fetch torsiondrive records by ID.

    The returned records follow the order of the requested record ids.

    Parameters
    ----------
    record_ids
        One record ID, or a sequence/list of record IDs, to fetch
    missing_ok
        When True, missing records are tolerated and None is returned in
        place of each record that was not found.
    include
        Additional fields to include in the returned record

    Returns
    -------
    :
        The record itself when a single ID was given, otherwise a list of records.
        With missing_ok set, None stands in for any record that was not found.
    """

    # The generic by-type lookup does the work; only the record class is specific here
    record_type = TorsiondriveRecord
    return self._get_records_by_type(record_type, record_ids, missing_ok, include)
[docs]
def query_torsiondrives(
    self,
    *,
    record_id: Optional[Union[int, Iterable[int]]] = None,
    manager_name: Optional[Union[str, Iterable[str]]] = None,
    status: Optional[Union[RecordStatusEnum, Iterable[RecordStatusEnum]]] = None,
    dataset_id: Optional[Union[int, Iterable[int]]] = None,
    parent_id: Optional[Union[int, Iterable[int]]] = None,
    child_id: Optional[Union[int, Iterable[int]]] = None,
    created_before: Optional[Union[datetime, str]] = None,
    created_after: Optional[Union[datetime, str]] = None,
    modified_before: Optional[Union[datetime, str]] = None,
    modified_after: Optional[Union[datetime, str]] = None,
    program: Optional[Union[str, Iterable[str]]] = None,
    optimization_program: Optional[Union[str, Iterable[str]]] = None,
    qc_program: Optional[Union[str, Iterable[str]]] = None,
    qc_method: Optional[Union[str, Iterable[str]]] = None,
    qc_basis: Optional[Union[str, Iterable[str]]] = None,
    initial_molecule_id: Optional[Union[int, Iterable[int]]] = None,
    owner_user: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    owner_group: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    limit: Optional[int] = None,
    include: Optional[Iterable[str]] = None,
) -> RecordQueryIterator[TorsiondriveRecord]:
    """
    Search the server for torsiondrive records matching the given filters.

    The order of the returned records is not guaranteed.

    Parameters
    ----------
    record_id
        Query records whose ID is in the given list
    manager_name
        Query records that were completed (or are currently running) on a manager
        whose name is in the given list
    status
        Query records whose status is in the given list
    dataset_id
        Query records that are part of a dataset whose ID is in the given list
    parent_id
        Query records that have a parent whose ID is in the given list
    child_id
        Query records that have a child (optimization calculation) whose ID is in the given list
    created_before
        Query records that were created before the given date/time
    created_after
        Query records that were created after the given date/time
    modified_before
        Query records that were modified before the given date/time
    modified_after
        Query records that were modified after the given date/time
    program
        Query records whose torsiondrive program is in the given list
    optimization_program
        Query records whose optimization program is in the given list
    qc_program
        Query records whose qc program is in the given list
    qc_method
        Query records whose method is in the given list
    qc_basis
        Query records whose basis is in the given list
    initial_molecule_id
        Query records whose initial molecule (id) is in the given list
    owner_user
        Query records owned by a user in the given list
    owner_group
        Query records owned by a group in the given list
    limit
        The maximum number of records to return. Note that the server limit is always obeyed.
    include
        Additional fields to include in the returned record

    Returns
    -------
    :
        An iterator that can be used to retrieve the results of the query
    """

    # List-like filters are normalized with make_list; date and limit filters go through as given
    filter_data = TorsiondriveQueryFilters(
        record_id=make_list(record_id),
        manager_name=make_list(manager_name),
        status=make_list(status),
        dataset_id=make_list(dataset_id),
        parent_id=make_list(parent_id),
        child_id=make_list(child_id),
        program=make_list(program),
        optimization_program=make_list(optimization_program),
        qc_program=make_list(qc_program),
        qc_method=make_list(qc_method),
        qc_basis=make_list(qc_basis),
        initial_molecule_id=make_list(initial_molecule_id),
        created_before=created_before,
        created_after=created_after,
        modified_before=modified_before,
        modified_after=modified_after,
        owner_user=make_list(owner_user),
        owner_group=make_list(owner_group),
        limit=limit,
    )

    iterator_type = RecordQueryIterator[TorsiondriveRecord]
    return iterator_type(self, filter_data, TorsiondriveRecord, include)
##############################################################
# Grid optimization calculations
##############################################################
[docs]
def add_gridoptimizations(
    self,
    initial_molecules: Union[int, Molecule, Sequence[Union[int, Molecule]]],
    program: str,
    optimization_specification: OptimizationSpecification,
    keywords: Union[GridoptimizationKeywords, Dict[str, Any]],
    tag: str = "*",
    priority: PriorityEnum = PriorityEnum.normal,
    owner_group: Optional[str] = None,
    find_existing: bool = True,
) -> Tuple[InsertMetadata, List[int]]:
    """
    Submit new gridoptimization computations to the server.

    Calculations that already exist in the database are not duplicated: the
    existing id is returned. Otherwise the calculation is inserted and the
    new id is returned.

    One record is added per initial molecule.

    Parameters
    ----------
    initial_molecules
        Molecules to start the gridoptimizations. Each gridoptimization starts with
        a single molecule.
    program
        The program to run the gridoptimization computation with ("gridoptimization")
    optimization_specification
        Specification of how each optimization of the gridoptimization should be run
    keywords
        The gridoptimization keywords for the computation
    tag
        The tag for the task. This will assist in routing to appropriate compute managers.
    priority
        The priority of the job (high, normal, low). Default is normal.
    owner_group
        Group with additional permission for these records
    find_existing
        If True, search for existing records and return those. If False, always add new records

    Returns
    -------
    :
        Metadata about the insertion, and a list of record ids. The ids will be in the
        order of the input molecules

    Raises
    ------
    RuntimeError
        If the number of molecules exceeds the "add_records" entry of the client's api limits
    """

    initial_molecules = make_list(initial_molecules)

    # Nothing to add - return before any request is made
    if not initial_molecules:
        return InsertMetadata(), []

    n_records = len(initial_molecules)
    max_records = self.api_limits["add_records"]
    if n_records > max_records:
        raise RuntimeError(f"Cannot add {n_records} records - over the limit of {max_records}")

    specification = {
        "program": program,
        "optimization_specification": optimization_specification,
        "keywords": keywords,
    }

    body = GridoptimizationAddBody(
        initial_molecules=initial_molecules,
        specification=specification,
        tag=tag,
        priority=priority,
        owner_group=owner_group,
        find_existing=find_existing,
    )

    response_type = Tuple[InsertMetadata, List[int]]
    return self.make_request("post", "api/v1/records/gridoptimization/bulkCreate", response_type, body=body)
[docs]
def get_gridoptimizations(
    self,
    record_ids: Union[int, Sequence[int]],
    missing_ok: bool = False,
    *,
    include: Optional[Iterable[str]] = None,
) -> Union[Optional[GridoptimizationRecord], List[Optional[GridoptimizationRecord]]]:
    """
    Fetch gridoptimization records by ID.

    The returned records follow the order of the requested record ids.

    Parameters
    ----------
    record_ids
        One record ID, or a sequence/list of record IDs, to fetch
    missing_ok
        When True, missing records are tolerated and None is returned in
        place of each record that was not found.
    include
        Additional fields to include in the returned record

    Returns
    -------
    :
        The record itself when a single ID was given, otherwise a list of records.
        With missing_ok set, None stands in for any record that was not found.
    """

    # The generic by-type lookup does the work; only the record class is specific here
    record_type = GridoptimizationRecord
    return self._get_records_by_type(record_type, record_ids, missing_ok, include)
[docs]
def query_gridoptimizations(
    self,
    *,
    record_id: Optional[Union[int, Iterable[int]]] = None,
    manager_name: Optional[Union[str, Iterable[str]]] = None,
    status: Optional[Union[RecordStatusEnum, Iterable[RecordStatusEnum]]] = None,
    dataset_id: Optional[Union[int, Iterable[int]]] = None,
    parent_id: Optional[Union[int, Iterable[int]]] = None,
    child_id: Optional[Union[int, Iterable[int]]] = None,
    created_before: Optional[Union[datetime, str]] = None,
    created_after: Optional[Union[datetime, str]] = None,
    modified_before: Optional[Union[datetime, str]] = None,
    modified_after: Optional[Union[datetime, str]] = None,
    program: Optional[Union[str, Iterable[str]]] = None,
    optimization_program: Optional[Union[str, Iterable[str]]] = None,
    qc_program: Optional[Union[str, Iterable[str]]] = None,
    qc_method: Optional[Union[str, Iterable[str]]] = None,
    qc_basis: Optional[Union[str, Iterable[Optional[str]]]] = None,
    initial_molecule_id: Optional[Union[int, Iterable[int]]] = None,
    owner_user: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    owner_group: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    limit: Optional[int] = None,
    include: Optional[Iterable[str]] = None,
) -> RecordQueryIterator[GridoptimizationRecord]:
    """
    Search the server for gridoptimization records matching the given filters.

    The order of the returned records is not guaranteed.

    Parameters
    ----------
    record_id
        Query records whose ID is in the given list
    manager_name
        Query records that were completed (or are currently running) on a manager
        whose name is in the given list
    status
        Query records whose status is in the given list
    dataset_id
        Query records that are part of a dataset whose ID is in the given list
    parent_id
        Query records that have a parent whose ID is in the given list
    child_id
        Query records that have a child (optimization calculation) whose ID is in the given list
    created_before
        Query records that were created before the given date/time
    created_after
        Query records that were created after the given date/time
    modified_before
        Query records that were modified before the given date/time
    modified_after
        Query records that were modified after the given date/time
    program
        Query records whose gridoptimization program is in the given list
    optimization_program
        Query records whose optimization program is in the given list
    qc_program
        Query records whose qc program is in the given list
    qc_method
        Query records whose method is in the given list
    qc_basis
        Query records whose basis is in the given list
    initial_molecule_id
        Query records whose initial molecule (id) is in the given list
    owner_user
        Query records owned by a user in the given list
    owner_group
        Query records owned by a group in the given list
    limit
        The maximum number of records to return. Note that the server limit is always obeyed.
    include
        Additional fields to include in the returned record

    Returns
    -------
    :
        An iterator that can be used to retrieve the results of the query
    """

    # List-like filters are normalized with make_list; date and limit filters go through as given
    filter_data = GridoptimizationQueryFilters(
        record_id=make_list(record_id),
        manager_name=make_list(manager_name),
        status=make_list(status),
        dataset_id=make_list(dataset_id),
        parent_id=make_list(parent_id),
        child_id=make_list(child_id),
        program=make_list(program),
        optimization_program=make_list(optimization_program),
        qc_program=make_list(qc_program),
        qc_method=make_list(qc_method),
        qc_basis=make_list(qc_basis),
        initial_molecule_id=make_list(initial_molecule_id),
        created_before=created_before,
        created_after=created_after,
        modified_before=modified_before,
        modified_after=modified_after,
        owner_user=make_list(owner_user),
        owner_group=make_list(owner_group),
        limit=limit,
    )

    iterator_type = RecordQueryIterator[GridoptimizationRecord]
    return iterator_type(self, filter_data, GridoptimizationRecord, include)
##############################################################
# Reactions
##############################################################
[docs]
def add_reactions(
    self,
    stoichiometries: Sequence[Sequence[Tuple[float, Union[int, Molecule]]]],
    program: str,
    singlepoint_specification: Optional[QCSpecification],
    optimization_specification: Optional[OptimizationSpecification],
    keywords: ReactionKeywords,
    tag: str = "*",
    priority: PriorityEnum = PriorityEnum.normal,
    owner_group: Optional[str] = None,
    find_existing: bool = True,
) -> Tuple[InsertMetadata, List[int]]:
    """
    Adds new reaction computations to the server

    Reactions can have a singlepoint specification, optimization specification,
    or both; at least one must be specified. If both are specified, an optimization
    is done, followed by a singlepoint computation. Otherwise, only the specification
    that is specified is used.

    This checks if the calculations already exist in the database. If so, it returns
    the existing id, otherwise it will insert it and return the new id.

    This will add one record per set of molecules

    Parameters
    ----------
    stoichiometries
        Coefficients and molecules of the reaction. Each reaction has multiple
        molecules/coefficients, so this is a nested list: one sequence per reaction,
        each made up of (coefficient, molecule) pairs
    program
        The program for running the reaction computation ("reaction")
    singlepoint_specification
        The specification for singlepoint energy calculations
    optimization_specification
        The specification for optimization calculations
    keywords
        The keywords for the reaction calculation/service
    tag
        The tag for the task. This will assist in routing to appropriate compute managers.
    priority
        The priority of the job (high, normal, low). Default is normal.
    owner_group
        Group with additional permission for these records
    find_existing
        If True, search for existing records and return those. If False, always add new records

    Returns
    -------
    :
        Metadata about the insertion, and a list of record ids. The ids will be in the
        order of the input molecules

    Raises
    ------
    RuntimeError
        If the number of reactions exceeds the "add_records" entry of the client's api limits
    """

    # Nothing to add - return before any request is made
    if not stoichiometries:
        return InsertMetadata(), []

    if len(stoichiometries) > self.api_limits["add_records"]:
        raise RuntimeError(
            f"Cannot add {len(stoichiometries)} records - over the limit of {self.api_limits['add_records']}"
        )

    body_data = {
        "stoichiometries": stoichiometries,
        "specification": {
            "program": program,
            "singlepoint_specification": singlepoint_specification,
            "optimization_specification": optimization_specification,
            "keywords": keywords,
        },
        "tag": tag,
        "priority": priority,
        "owner_group": owner_group,
        "find_existing": find_existing,
    }

    body = ReactionAddBody(**body_data)

    return self.make_request(
        "post", "api/v1/records/reaction/bulkCreate", Tuple[InsertMetadata, List[int]], body=body
    )
[docs]
def get_reactions(
    self,
    record_ids: Union[int, Sequence[int]],
    missing_ok: bool = False,
    *,
    include: Optional[Iterable[str]] = None,
) -> Union[Optional[ReactionRecord], List[Optional[ReactionRecord]]]:
    """
    Fetch reaction records by ID.

    The returned records follow the order of the requested record ids.

    Parameters
    ----------
    record_ids
        One record ID, or a sequence/list of record IDs, to fetch
    missing_ok
        When True, missing records are tolerated and None is returned in
        place of each record that was not found.
    include
        Additional fields to include in the returned record

    Returns
    -------
    :
        The record itself when a single ID was given, otherwise a list of records.
        With missing_ok set, None stands in for any record that was not found.
    """

    # The generic by-type lookup does the work; only the record class is specific here
    record_type = ReactionRecord
    return self._get_records_by_type(record_type, record_ids, missing_ok, include)
[docs]
def query_reactions(
    self,
    *,
    record_id: Optional[Union[int, Iterable[int]]] = None,
    manager_name: Optional[Union[str, Iterable[str]]] = None,
    status: Optional[Union[RecordStatusEnum, Iterable[RecordStatusEnum]]] = None,
    dataset_id: Optional[Union[int, Iterable[int]]] = None,
    parent_id: Optional[Union[int, Iterable[int]]] = None,
    child_id: Optional[Union[int, Iterable[int]]] = None,
    created_before: Optional[Union[datetime, str]] = None,
    created_after: Optional[Union[datetime, str]] = None,
    modified_before: Optional[Union[datetime, str]] = None,
    modified_after: Optional[Union[datetime, str]] = None,
    program: Optional[Union[str, Iterable[str]]] = None,
    optimization_program: Optional[Union[str, Iterable[Optional[str]]]] = None,
    qc_program: Optional[Union[str, Iterable[str]]] = None,
    qc_method: Optional[Union[str, Iterable[str]]] = None,
    qc_basis: Optional[Union[str, Iterable[str]]] = None,
    molecule_id: Optional[Union[int, Iterable[int]]] = None,
    owner_user: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    owner_group: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    limit: Optional[int] = None,
    include: Optional[Iterable[str]] = None,
) -> RecordQueryIterator[ReactionRecord]:
    """
    Queries reaction records on the server

    Do not rely on the returned records being in any particular order.

    Parameters
    ----------
    record_id
        Query records whose ID is in the given list
    manager_name
        Query records that were completed (or are currently running) on a manager
        whose name is in the given list
    status
        Query records whose status is in the given list
    dataset_id
        Query records that are part of a dataset whose ID is in the given list
    parent_id
        Query records that have a parent whose ID is in the given list
    child_id
        Query records that have a child record whose ID is in the given list
    created_before
        Query records that were created before the given date/time
    created_after
        Query records that were created after the given date/time
    modified_before
        Query records that were modified before the given date/time
    modified_after
        Query records that were modified after the given date/time
    program
        Query records whose reaction program is in the given list
    optimization_program
        Query records whose optimization program is in the given list
    qc_program
        Query records whose qc program is in the given list
    qc_method
        Query records whose method is in the given list
    qc_basis
        Query records whose basis is in the given list
    molecule_id
        Query reactions that contain a molecule whose ID is in the given list
    owner_user
        Query records owned by a user in the given list
    owner_group
        Query records owned by a group in the given list
    limit
        The maximum number of records to return. Note that the server limit is always obeyed.
    include
        Additional fields to include in the returned record

    Returns
    -------
    :
        An iterator that can be used to retrieve the results of the query
    """

    # List-like filters are normalized with make_list; date and limit filters go through as given
    filter_dict = {
        "record_id": make_list(record_id),
        "manager_name": make_list(manager_name),
        "status": make_list(status),
        "dataset_id": make_list(dataset_id),
        "parent_id": make_list(parent_id),
        "child_id": make_list(child_id),
        "program": make_list(program),
        "optimization_program": make_list(optimization_program),
        "qc_program": make_list(qc_program),
        "qc_method": make_list(qc_method),
        "qc_basis": make_list(qc_basis),
        "molecule_id": make_list(molecule_id),
        "created_before": created_before,
        "created_after": created_after,
        "modified_before": modified_before,
        "modified_after": modified_after,
        "owner_user": make_list(owner_user),
        "owner_group": make_list(owner_group),
        "limit": limit,
    }

    filter_data = ReactionQueryFilters(**filter_dict)

    return RecordQueryIterator[ReactionRecord](self, filter_data, ReactionRecord, include)
##############################################################
# Manybody calculations
##############################################################
[docs]
def add_manybodys(
    self,
    initial_molecules: Sequence[Union[int, Molecule]],
    program: str,
    singlepoint_specification: QCSpecification,
    keywords: ManybodyKeywords,
    tag: str = "*",
    priority: PriorityEnum = PriorityEnum.normal,
    owner_group: Optional[str] = None,
    find_existing: bool = True,
) -> Tuple[InsertMetadata, List[int]]:
    """
    Submit manybody expansion calculations to the server

    One record is requested for each initial molecule. The server checks whether
    a matching calculation is already stored; if so the existing id is returned,
    otherwise a new record is inserted and its id is returned.

    Parameters
    ----------
    initial_molecules
        Initial molecules for the manybody expansion. Must have > 1 fragments.
    program
        The program to run the manybody computation with ("manybody")
    singlepoint_specification
        Specification for the singlepoint calculations done in the expansion
    keywords
        The keywords for the manybody program
    tag
        The tag for the task. This will assist in routing to appropriate compute managers.
    priority
        The priority of the job (high, normal, low). Default is normal.
    owner_group
        Group with additional permission for these records
    find_existing
        If True, search for existing records and return those. If False, always add new records

    Returns
    -------
    :
        Metadata about the insertion, and a list of record ids. The ids will be in the
        order of the input molecules

    Raises
    ------
    RuntimeError
        If more molecules are given than ``self.api_limits["add_records"]`` allows
    """

    molecules = make_list(initial_molecules)

    # Nothing to add: answer locally without issuing a request
    if not molecules:
        return InsertMetadata(), []

    n_records = len(molecules)
    max_records = self.api_limits["add_records"]
    if n_records > max_records:
        raise RuntimeError(f"Cannot add {n_records} records - over the limit of {max_records}")

    specification = {
        "program": program,
        "singlepoint_specification": singlepoint_specification,
        "keywords": keywords,
    }

    body = ManybodyAddBody(
        initial_molecules=molecules,
        specification=specification,
        tag=tag,
        priority=priority,
        owner_group=owner_group,
        find_existing=find_existing,
    )

    response_type = Tuple[InsertMetadata, List[int]]
    return self.make_request("post", "api/v1/records/manybody/bulkCreate", response_type, body=body)
[docs]
def get_manybodys(
    self,
    record_ids: Union[int, Sequence[int]],
    missing_ok: bool = False,
    *,
    include: Optional[Iterable[str]] = None,
) -> Union[Optional[ManybodyRecord], List[Optional[ManybodyRecord]]]:
    """
    Fetch manybody records by ID

    The returned records follow the order of ``record_ids``. The lookup itself is
    delegated to ``self._get_records_by_type`` with ``ManybodyRecord`` as the record type.

    Parameters
    ----------
    record_ids
        Single ID or sequence/list of records to obtain
    missing_ok
        If set to True, then missing records will be tolerated, and the returned
        records will contain None for the corresponding IDs that were not found.
    include
        Additional fields to include in the returned record

    Returns
    -------
    :
        If a single ID was specified, returns just that record. Otherwise, returns
        a list of records. If missing_ok was specified, None will be substituted for a record
        that was not found.
    """

    records = self._get_records_by_type(ManybodyRecord, record_ids, missing_ok, include)
    return records
[docs]
def query_manybodys(
    self,
    *,
    record_id: Optional[Union[int, Iterable[int]]] = None,
    manager_name: Optional[Union[str, Iterable[str]]] = None,
    status: Optional[Union[RecordStatusEnum, Iterable[RecordStatusEnum]]] = None,
    dataset_id: Optional[Union[int, Iterable[int]]] = None,
    parent_id: Optional[Union[int, Iterable[int]]] = None,
    child_id: Optional[Union[int, Iterable[int]]] = None,
    created_before: Optional[Union[datetime, str]] = None,
    created_after: Optional[Union[datetime, str]] = None,
    modified_before: Optional[Union[datetime, str]] = None,
    modified_after: Optional[Union[datetime, str]] = None,
    program: Optional[Union[str, Iterable[str]]] = None,
    qc_program: Optional[Union[str, Iterable[str]]] = None,
    qc_method: Optional[Union[str, Iterable[str]]] = None,
    qc_basis: Optional[Union[str, Iterable[str]]] = None,
    initial_molecule_id: Optional[Union[int, Iterable[int]]] = None,
    owner_user: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    owner_group: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    limit: Optional[int] = None,
    include: Optional[Iterable[str]] = None,
) -> RecordQueryIterator[ManybodyRecord]:
    """
    Queries manybody records on the server

    Do not rely on the returned records being in any particular order.

    Parameters
    ----------
    record_id
        Query records whose ID is in the given list
    manager_name
        Query records that were completed (or are currently running) on a manager
        whose name is in the given list
    status
        Query records whose status is in the given list
    dataset_id
        Query records that are part of a dataset whose ID is in the given list
    parent_id
        Query records that have a parent whose ID is in the given list
    child_id
        Query records that have a child whose ID is in the given list
    created_before
        Query records that were created before the given date/time
    created_after
        Query records that were created after the given date/time
    modified_before
        Query records that were modified before the given date/time
    modified_after
        Query records that were modified after the given date/time
    program
        Query records whose manybody program is in the given list
    qc_program
        Query records whose qc program is in the given list
    qc_method
        Query records whose qc method is in the given list
    qc_basis
        Query records whose qc basis is in the given list
    initial_molecule_id
        Query manybody calculations whose initial molecule (id) is in the given list
    owner_user
        Query records owned by a user in the given list
    owner_group
        Query records owned by a group in the given list
    limit
        The maximum number of records to return. Note that the server limit is always obeyed.
    include
        Additional fields to include in the returned record

    Returns
    -------
    :
        An iterator that can be used to retrieve the results of the query
    """

    # List-style filters go through make_list; dates and the limit are passed untouched
    filters = ManybodyQueryFilters(
        record_id=make_list(record_id),
        manager_name=make_list(manager_name),
        status=make_list(status),
        dataset_id=make_list(dataset_id),
        parent_id=make_list(parent_id),
        child_id=make_list(child_id),
        program=make_list(program),
        qc_program=make_list(qc_program),
        qc_method=make_list(qc_method),
        qc_basis=make_list(qc_basis),
        initial_molecule_id=make_list(initial_molecule_id),
        created_before=created_before,
        created_after=created_after,
        modified_before=modified_before,
        modified_after=modified_after,
        owner_user=make_list(owner_user),
        owner_group=make_list(owner_group),
        limit=limit,
    )

    return RecordQueryIterator[ManybodyRecord](self, filters, ManybodyRecord, include)
##############################################################
# NEB
##############################################################
[docs]
def add_nebs(
    self,
    initial_chains: List[List[Union[int, Molecule]]],
    program: str,
    singlepoint_specification: QCSpecification,
    optimization_specification: Optional[OptimizationSpecification],
    keywords: Union[NEBKeywords, Dict[str, Any]],
    tag: str = "*",
    priority: PriorityEnum = PriorityEnum.normal,
    owner_group: Optional[str] = None,
    find_existing: bool = True,
) -> Tuple[InsertMetadata, List[int]]:
    """
    Submit NEB calculations to the server

    One record is requested for each initial chain. The server checks whether
    a matching calculation is already stored; if so the existing id is returned,
    otherwise a new record is inserted and its id is returned.

    Parameters
    ----------
    initial_chains
        The initial chains to run the NEB calculations on. Each NEB calculation starts with a single
        chain (list of molecules), so this is a nested list
    program
        The program to run the neb computation with ("geometric")
    singlepoint_specification
        Specification of how each singlepoint (gradient/hessian) should be run
    optimization_specification
        Specification of how any optimizations of the NEB calculation should be run
    keywords
        The NEB keywords for the computation
    tag
        The tag for the task. This will assist in routing to appropriate compute managers.
    priority
        The priority of the job (high, normal, low). Default is normal.
    owner_group
        Group with additional permission for these records
    find_existing
        If True, search for existing records and return those. If False, always add new records

    Returns
    -------
    :
        Metadata about the insertion, and a list of record ids. The ids will be in the
        order of the input chains

    Raises
    ------
    RuntimeError
        If more chains are given than ``self.api_limits["add_records"]`` allows
    """

    # Nothing to add: answer locally without issuing a request
    if not initial_chains:
        return InsertMetadata(), []

    n_records = len(initial_chains)
    max_records = self.api_limits["add_records"]
    if n_records > max_records:
        raise RuntimeError(f"Cannot add {n_records} records - over the limit of {max_records}")

    specification = {
        "program": program,
        "singlepoint_specification": singlepoint_specification,
        "optimization_specification": optimization_specification,
        "keywords": keywords,
    }

    body = NEBAddBody(
        initial_chains=initial_chains,
        specification=specification,
        tag=tag,
        priority=priority,
        owner_group=owner_group,
        find_existing=find_existing,
    )

    response_type = Tuple[InsertMetadata, List[int]]
    return self.make_request("post", "api/v1/records/neb/bulkCreate", response_type, body=body)
[docs]
def get_nebs(
    self,
    record_ids: Union[int, Sequence[int]],
    missing_ok: bool = False,
    *,
    include: Optional[Iterable[str]] = None,
) -> Union[Optional[NEBRecord], List[Optional[NEBRecord]]]:
    """
    Fetch NEB records by ID

    The returned records follow the order of ``record_ids``. The lookup itself is
    delegated to ``self._get_records_by_type`` with ``NEBRecord`` as the record type.

    Parameters
    ----------
    record_ids
        Single ID or sequence/list of records to obtain
    missing_ok
        If set to True, then missing records will be tolerated, and the returned
        records will contain None for the corresponding IDs that were not found.
    include
        Additional fields to include in the returned record

    Returns
    -------
    :
        If a single ID was specified, returns just that record. Otherwise, returns
        a list of records. If missing_ok was specified, None will be substituted for a record
        that was not found.
    """

    records = self._get_records_by_type(NEBRecord, record_ids, missing_ok, include)
    return records
[docs]
def query_nebs(
    self,
    *,
    record_id: Optional[Union[int, Iterable[int]]] = None,
    manager_name: Optional[Union[str, Iterable[str]]] = None,
    status: Optional[Union[RecordStatusEnum, Iterable[RecordStatusEnum]]] = None,
    dataset_id: Optional[Union[int, Iterable[int]]] = None,
    parent_id: Optional[Union[int, Iterable[int]]] = None,
    child_id: Optional[Union[int, Iterable[int]]] = None,
    created_before: Optional[Union[datetime, str]] = None,
    created_after: Optional[Union[datetime, str]] = None,
    modified_before: Optional[Union[datetime, str]] = None,
    modified_after: Optional[Union[datetime, str]] = None,
    program: Optional[Union[str, Iterable[str]]] = None,
    qc_program: Optional[Union[str, Iterable[str]]] = None,
    qc_method: Optional[Union[str, Iterable[str]]] = None,
    qc_basis: Optional[Union[str, Iterable[str]]] = None,
    molecule_id: Optional[Union[int, Iterable[int]]] = None,
    owner_user: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    owner_group: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    limit: Optional[int] = None,
    include: Optional[Iterable[str]] = None,
) -> RecordQueryIterator[NEBRecord]:
    """
    Queries NEB records on the server

    Do not rely on the returned records being in any particular order.

    Parameters
    ----------
    record_id
        Query records whose ID is in the given list
    manager_name
        Query records that were completed (or are currently running) on a manager
        whose name is in the given list
    status
        Query records whose status is in the given list
    dataset_id
        Query records that are part of a dataset whose ID is in the given list
    parent_id
        Query records that have a parent whose ID is in the given list
    child_id
        Query records that have a child whose ID is in the given list
    created_before
        Query records that were created before the given date/time
    created_after
        Query records that were created after the given date/time
    modified_before
        Query records that were modified before the given date/time
    modified_after
        Query records that were modified after the given date/time
    program
        Query records whose NEB program is in the given list
    qc_program
        Query records whose qc program is in the given list
    qc_method
        Query records whose qc method is in the given list
    qc_basis
        Query records whose qc basis is in the given list
    molecule_id
        Query records whose initial chains contain a molecule (id) that is in the given list
    owner_user
        Query records owned by a user in the given list
    owner_group
        Query records owned by a group in the given list
    limit
        The maximum number of records to return. Note that the server limit is always obeyed.
    include
        Additional fields to include in the returned record

    Returns
    -------
    :
        An iterator that can be used to retrieve the results of the query
    """

    # List-style filters go through make_list; dates and the limit are passed untouched
    filters = NEBQueryFilters(
        record_id=make_list(record_id),
        manager_name=make_list(manager_name),
        status=make_list(status),
        dataset_id=make_list(dataset_id),
        parent_id=make_list(parent_id),
        child_id=make_list(child_id),
        program=make_list(program),
        qc_program=make_list(qc_program),
        qc_method=make_list(qc_method),
        qc_basis=make_list(qc_basis),
        molecule_id=make_list(molecule_id),
        created_before=created_before,
        created_after=created_after,
        modified_before=modified_before,
        modified_after=modified_after,
        owner_user=make_list(owner_user),
        owner_group=make_list(owner_group),
        limit=limit,
    )

    return RecordQueryIterator[NEBRecord](self, filters, NEBRecord, include)
##############################################################
# Managers
##############################################################
[docs]
def get_managers(
    self,
    names: Union[str, Sequence[str]],
    missing_ok: bool = False,
) -> Union[Optional[ComputeManager], List[Optional[ComputeManager]]]:
    """
    Fetch compute manager information by name

    Parameters
    ----------
    names
        A manager name or list of names
    missing_ok
        If set to True, then missing managers will be tolerated, and the returned
        managers will contain None for the corresponding managers that were not found.

    Returns
    -------
    :
        If a single name was specified, returns just that manager. Otherwise, returns
        a list of managers. If missing_ok was specified, None will be substituted for a manager
        that was not found.
    """

    # Remember the calling convention before the input is turned into a list
    single_requested = isinstance(names, str)

    name_list = make_list(names)
    if not name_list:
        return []

    request_body = CommonBulkGetNamesBody(names=name_list, missing_ok=missing_ok)
    found = self.make_request("post", "api/v1/managers/bulkGet", List[Optional[ComputeManager]], body=request_body)

    # Entries may be None (missing managers); only real managers get the client attached
    for manager in (m for m in found if m is not None):
        manager.propagate_client(self)

    return found[0] if single_requested else found
[docs]
def query_managers(
    self,
    *,
    manager_id: Optional[Union[int, Iterable[int]]] = None,
    name: Optional[Union[str, Iterable[str]]] = None,
    cluster: Optional[Union[str, Iterable[str]]] = None,
    hostname: Optional[Union[str, Iterable[str]]] = None,
    status: Optional[Union[RecordStatusEnum, Iterable[RecordStatusEnum]]] = None,
    modified_before: Optional[Union[datetime, str]] = None,
    modified_after: Optional[Union[datetime, str]] = None,
    limit: Optional[int] = None,
) -> ManagerQueryIterator:
    """
    Search for compute managers known to the server

    Parameters
    ----------
    manager_id
        ID assigned to the manager (this is not the UUID. This should be used very rarely).
    name
        Queries managers whose name is in the given list
    cluster
        Queries managers whose assigned cluster is in the given list
    hostname
        Queries managers whose hostname is in the given list
    status
        Queries managers whose status is in the given list
    modified_before
        Query for managers last modified before a certain time
    modified_after
        Query for managers last modified after a certain time
    limit
        The maximum number of managers to return. Note that the server limit is always obeyed.

    Returns
    -------
    :
        An iterator that can be used to retrieve the results of the query
    """

    # List-style filters go through make_list; dates and the limit are passed untouched
    filters = ManagerQueryFilters(
        manager_id=make_list(manager_id),
        name=make_list(name),
        cluster=make_list(cluster),
        hostname=make_list(hostname),
        status=make_list(status),
        modified_before=modified_before,
        modified_after=modified_after,
        limit=limit,
    )

    return ManagerQueryIterator(self, filters)
[docs]
def query_access_log(
    self,
    *,
    module: Optional[Union[str, Iterable[str]]] = None,
    method: Optional[Union[str, Iterable[str]]] = None,
    before: Optional[Union[datetime, str]] = None,
    after: Optional[Union[datetime, str]] = None,
    user: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    limit: Optional[int] = None,
) -> AccessLogQueryIterator:
    """
    Search the server access log

    This log contains information about who accessed the server, and when.

    Parameters
    ----------
    module
        Return log entries whose module is in the given list
    method
        Return log entries whose access_method is in the given list
    before
        Return log entries captured before the specified date/time
    after
        Return log entries captured after the specified date/time
    user
        User name or ID associated with the log entry
    limit
        The maximum number of log entries to return. Note that the server limit is always obeyed.

    Returns
    -------
    :
        An iterator that can be used to retrieve the results of the query
    """

    # List-style filters go through make_list; dates and the limit are passed untouched
    filter_dict = {
        "module": make_list(module),
        "method": make_list(method),
        "before": before,
        "after": after,
        "user": make_list(user),
        "limit": limit,
    }

    filters = AccessLogQueryFilters(**filter_dict)
    return AccessLogQueryIterator(self, filters)
[docs]
def delete_access_log(self, before: datetime) -> int:
    """
    Remove old entries from the server access log

    Parameters
    ----------
    before
        Delete access log entries captured before the given date/time

    Returns
    -------
    :
        The number of access log entries deleted from the server
    """

    request_body = DeleteBeforeDateBody(before=before)
    n_deleted = self.make_request("post", "api/v1/access_logs/bulkDelete", int, body=request_body)
    return n_deleted
[docs]
def query_error_log(
    self,
    *,
    error_id: Optional[Union[int, Iterable[int]]] = None,
    user: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    before: Optional[Union[datetime, str]] = None,
    after: Optional[Union[datetime, str]] = None,
    limit: Optional[int] = None,
) -> ErrorLogQueryIterator:
    """
    Search the server's internal error log

    This log contains internal errors that are not always passed to the user.

    Parameters
    ----------
    error_id
        Return error log entries whose id is in the list
    user
        Return error log entries whose user name or ID is in the list
    before
        Return error log entries captured before the specified date/time
    after
        Return error log entries captured after the specified date/time
    limit
        The maximum number of log entries to return. Note that the server limit is always obeyed.

    Returns
    -------
    :
        An iterator that can be used to retrieve the results of the query
    """

    # List-style filters go through make_list; dates and the limit are passed untouched
    filter_dict = {
        "error_id": make_list(error_id),
        "user": make_list(user),
        "before": before,
        "after": after,
        "limit": limit,
    }

    filters = ErrorLogQueryFilters(**filter_dict)
    return ErrorLogQueryIterator(self, filters)
[docs]
def delete_error_log(self, before: datetime) -> int:
    """
    Remove old entries from the server error log

    Parameters
    ----------
    before
        Delete error log entries captured before the given date/time

    Returns
    -------
    :
        The number of error log entries deleted from the server
    """

    request_body = DeleteBeforeDateBody(before=before)
    n_deleted = self.make_request("post", "api/v1/server_errors/bulkDelete", int, body=request_body)
    return n_deleted
[docs]
def get_internal_job(self, job_id: int) -> InternalJob:
    """
    Fetch a single internal job from the server

    Parameters
    ----------
    job_id
        ID of the internal job to look up

    Returns
    -------
    :
        The server's response, requested as an ``InternalJob``
    """

    endpoint = f"api/v1/internal_jobs/{job_id}"
    return self.make_request("get", endpoint, InternalJob)
[docs]
def query_internal_jobs(
    self,
    *,
    job_id: Optional[Union[int, Iterable[int]]] = None,
    name: Optional[Union[str, Iterable[str]]] = None,
    user: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
    runner_hostname: Optional[Union[str, Iterable[str]]] = None,
    status: Optional[Union[InternalJobStatusEnum, Iterable[InternalJobStatusEnum]]] = None,
    last_updated_before: Optional[Union[datetime, str]] = None,
    last_updated_after: Optional[Union[datetime, str]] = None,
    added_before: Optional[Union[datetime, str]] = None,
    added_after: Optional[Union[datetime, str]] = None,
    scheduled_before: Optional[Union[datetime, str]] = None,
    scheduled_after: Optional[Union[datetime, str]] = None,
    limit: Optional[int] = None,
) -> InternalJobQueryIterator:
    """
    Queries the internal job queue on the server

    Parameters
    ----------
    job_id
        Queries jobs whose ID is in the given list
    name
        Queries jobs whose name is in the given list
    user
        User name or ID associated with the job
    runner_hostname
        Queries jobs that were run/are running on a given host
    status
        Queries jobs whose status is in the given list
    last_updated_before
        Query for jobs last updated before a certain time
    last_updated_after
        Query for jobs last updated after a certain time
    added_before
        Query for jobs added before a certain time
    added_after
        Query for jobs added after a certain time
    scheduled_before
        Query for jobs scheduled to run before a certain time
    scheduled_after
        Query for jobs scheduled to run after a certain time
    limit
        The maximum number of jobs to return. Note that the server limit is always obeyed.

    Returns
    -------
    :
        An iterator that can be used to retrieve the results of the query
    """

    # List-style filters go through make_list; dates and the limit are passed untouched
    filter_dict = {
        "job_id": make_list(job_id),
        "name": make_list(name),
        "runner_hostname": make_list(runner_hostname),
        "status": make_list(status),
        "user": make_list(user),
        "last_updated_before": last_updated_before,
        "last_updated_after": last_updated_after,
        "added_before": added_before,
        "added_after": added_after,
        "scheduled_before": scheduled_before,
        "scheduled_after": scheduled_after,
        "limit": limit,
    }

    filter_data = InternalJobQueryFilters(**filter_dict)
    return InternalJobQueryIterator(self, filter_data)
[docs]
def cancel_internal_job(self, job_id: int):
    """
    Cancels (to the best of our ability) an internal job

    This asks the server to set the job's status to ``InternalJobStatusEnum.cancelled``.

    Parameters
    ----------
    job_id
        ID of the internal job to cancel
    """

    endpoint = f"api/v1/internal_jobs/{job_id}/status"
    new_status = InternalJobStatusEnum.cancelled
    return self.make_request("put", endpoint, None, body=new_status)
[docs]
def delete_internal_job(self, job_id: int):
    """
    Deletes an internal job from the server

    Parameters
    ----------
    job_id
        ID of the internal job to delete
    """

    endpoint = f"api/v1/internal_jobs/{job_id}"
    return self.make_request("delete", endpoint, None)
[docs]
def query_access_summary(
    self,
    *,
    group_by: str = "day",
    before: Optional[Union[datetime, str]] = None,
    after: Optional[Union[datetime, str]] = None,
) -> AccessLogSummary:
    """
    Fetch an aggregated summary of the access log

    The aggregation is performed on the server, so there is no need to download
    every log entry and summarize it locally.

    Parameters
    ----------
    group_by
        How to group the data. Valid options are "user", "hour", "day", "country", "subdivision"
    before
        Query for log entries with a timestamp before a specific time
    after
        Query for log entries with a timestamp after a specific time

    Returns
    -------
    :
        The entries returned by the server, wrapped in an ``AccessLogSummary``
    """

    summary_filters = AccessLogSummaryFilters(group_by=group_by, before=before, after=after)
    response_type = Dict[str, List[AccessLogSummaryEntry]]

    summary_entries = self.make_request(
        "get", "api/v1/access_logs/summary", response_type, url_params=summary_filters
    )

    return AccessLogSummary(entries=summary_entries)
##############################################################
# User, group, & role management
##############################################################
[docs]
def list_roles(self) -> List[RoleInfo]:
    """
    Fetch every role defined on the server

    Returns
    -------
    :
        The roles returned by the server
    """

    roles = self.make_request("get", "api/v1/roles", List[RoleInfo])
    return roles
[docs]
def get_role(self, rolename: str) -> RoleInfo:
    """
    Fetch a single role from the server

    Parameters
    ----------
    rolename
        Name of the role to look up

    Returns
    -------
    :
        Information about the role
    """

    # Check the name locally before it is placed into the URL
    is_valid_rolename(rolename)

    endpoint = f"api/v1/roles/{rolename}"
    return self.make_request("get", endpoint, RoleInfo)
[docs]
def add_role(self, role_info: RoleInfo) -> None:
    """
    Create a new role (with its permissions) on the server

    If not successful, an exception is raised.

    Parameters
    ----------
    role_info
        Description of the role to add
    """

    # Check the role name locally before sending anything
    new_rolename = role_info.rolename
    is_valid_rolename(new_rolename)

    return self.make_request("post", "api/v1/roles", None, body=role_info)
[docs]
def modify_role(self, role_info: RoleInfo) -> RoleInfo:
    """
    Update the permissions of an existing role on the server

    If not successful, an exception is raised.

    Parameters
    ----------
    role_info
        The role to modify; its ``rolename`` selects which role is updated

    Returns
    -------
    :
        A copy of the role as it now appears on the server
    """

    # Check the role name locally before it is placed into the URL
    target_rolename = role_info.rolename
    is_valid_rolename(target_rolename)

    endpoint = f"api/v1/roles/{target_rolename}"
    return self.make_request("put", endpoint, RoleInfo, body=role_info)
[docs]
def delete_role(self, rolename: str) -> None:
    """
    Remove a role from the server

    This will not delete any role to which a user is assigned

    Will raise an exception on error

    Parameters
    ----------
    rolename
        Name of the role to delete
    """

    # Check the name locally before it is placed into the URL
    is_valid_rolename(rolename)

    endpoint = f"api/v1/roles/{rolename}"
    return self.make_request("delete", endpoint, None)
[docs]
def list_groups(self) -> List[GroupInfo]:
    """
    Fetch every user group defined on the server

    Returns
    -------
    :
        The groups returned by the server
    """

    groups = self.make_request("get", "api/v1/groups", List[GroupInfo])
    return groups
[docs]
def get_group(self, groupname_or_id: Union[int, str]) -> GroupInfo:
    """
    Fetch a single group from the server

    Parameters
    ----------
    groupname_or_id
        Name or ID of the group to look up

    Returns
    -------
    :
        Information about the group
    """

    # Only names are checked locally; anything else goes into the URL as-is
    given_as_name = isinstance(groupname_or_id, str)
    if given_as_name:
        is_valid_groupname(groupname_or_id)

    endpoint = f"api/v1/groups/{groupname_or_id}"
    return self.make_request("get", endpoint, GroupInfo)
[docs]
def add_group(self, group_info: GroupInfo) -> None:
    """
    Create a new group on the server

    If not successful, an exception is raised.

    Parameters
    ----------
    group_info
        Description of the group to add. Its ``id`` must be None.

    Raises
    ------
    RuntimeError
        If ``group_info`` already carries an id
    """

    has_no_id = group_info.id is None
    if has_no_id:
        return self.make_request("post", "api/v1/groups", None, body=group_info)

    raise RuntimeError("Cannot add group when group_info contains an id")
[docs]
def delete_group(self, groupname_or_id: Union[int, str]):
    """
    Remove a group from the server

    Deleted groups will be removed from all users groups list

    Parameters
    ----------
    groupname_or_id
        Name or ID of the group to delete
    """

    # Only names are checked locally; anything else goes into the URL as-is
    given_as_name = isinstance(groupname_or_id, str)
    if given_as_name:
        is_valid_groupname(groupname_or_id)

    endpoint = f"api/v1/groups/{groupname_or_id}"
    return self.make_request("delete", endpoint, None)
[docs]
def list_users(self) -> List[UserInfo]:
    """
    Fetch every user defined on the server

    Returns
    -------
    :
        The users returned by the server
    """

    users = self.make_request("get", "api/v1/users", List[UserInfo])
    return users
[docs]
def get_user(self, username_or_id: Optional[Union[int, str]] = None) -> UserInfo:
    """
    Fetch information about a user on the server

    If the username is not supplied, then info about the currently logged-in user is obtained.

    Parameters
    ----------
    username_or_id
        The username or ID to get info about

    Returns
    -------
    :
        Information about the user

    Raises
    ------
    RuntimeError
        If no user was given and ``self.username`` is None as well
    """

    # Fall back to the client's own username when the caller gave nothing
    target = self.username if username_or_id is None else username_or_id

    if target is None:
        raise RuntimeError("Cannot get user - not logged in?")

    # Only names are checked locally; IDs go into the URL as-is
    if isinstance(target, str):
        is_valid_username(target)

    endpoint = f"api/v1/users/{target}"
    return self.make_request("get", endpoint, UserInfo)
[docs]
def add_user(self, user_info: UserInfo, password: Optional[str] = None) -> str:
    """
    Create a new user on the server

    Parameters
    ----------
    user_info
        Info about the user to add. Its ``id`` must be None.
    password
        The user's password. If None, then one will be generated

    Returns
    -------
    :
        The password of the user (either the same as the supplied password, or the
        server-generated one)

    Raises
    ------
    RuntimeError
        If ``user_info`` already carries an id
    """

    # A supplied password is checked locally first; None is sent through untouched
    password_given = password is not None
    if password_given:
        is_valid_password(password)

    if user_info.id is not None:
        raise RuntimeError("Cannot add user when user_info contains an id")

    # User info and password are sent together as a single tuple body
    payload = (user_info, password)
    return self.make_request("post", "api/v1/users", str, body=payload)
[docs]
def modify_user(self, user_info: UserInfo) -> UserInfo:
    """
    Modifies a user on the server

    The user is determined by the id field of the input UserInfo, although the id
    and username are checked for consistency.

    Depending on the current user's permissions, some fields may not be updatable.

    Parameters
    ----------
    user_info
        Updated information for a user

    Returns
    -------
    :
        The updated user information as it appears on the server
    """

    # The URL is fixed (no user in the path); the target user comes from the request body
    return self.make_request("patch", "api/v1/users", UserInfo, body=user_info)
[docs]
def change_user_password(
    self, username_or_id: Optional[Union[int, str]] = None, new_password: Optional[str] = None
) -> str:
    """
    Set a new password for a user

    If the username is not specified, then the current logged-in user is used.

    If the password is not specified, then one is automatically generated by the server.

    Parameters
    ----------
    username_or_id
        The name or ID of the user whose password to change. If None, then use the currently logged-in user
    new_password
        Password to change to. If None, let the server generate one.

    Returns
    -------
    :
        The new password (either the same as the supplied one, or the server generated one)

    Raises
    ------
    RuntimeError
        If no user was given and ``self.username`` is None as well
    """

    # Fall back to the client's own username when the caller gave nothing
    target = self.username if username_or_id is None else username_or_id

    if target is None:
        raise RuntimeError("Cannot change user - not logged in?")

    # Anything that is not an integer ID is treated as a username and checked locally
    if not isinstance(target, int):
        is_valid_username(target)

    # A supplied password is checked locally; None is sent through untouched
    if new_password is not None:
        is_valid_password(new_password)

    endpoint = f"api/v1/users/{target}/password"
    return self.make_request("put", endpoint, str, body_model=Optional[str], body=new_password)
[docs]
def delete_user(self, username_or_id: Union[int, str]) -> None:
    """
    Remove a user from the server

    Parameters
    ----------
    username_or_id
        Name or ID of the user to delete
    """

    # Anything that is not an integer ID is treated as a username and checked locally
    given_as_id = isinstance(username_or_id, int)
    if not given_as_id:
        is_valid_username(username_or_id)

    endpoint = f"api/v1/users/{username_or_id}"
    return self.make_request("delete", endpoint, None)