[docs]classPortalClient(PortalClientBase):""" Main class for interacting with a QCArchive server """def__init__(self,address:str,username:Optional[str]=None,password:Optional[str]=None,verify:bool=True,show_motd:bool=True,*,cache_dir:Optional[str]=None,cache_max_size:int=0,)->None:""" Parameters ---------- address The host or IP address of the FractalServer instance, including protocol and port if necessary ("https://ml.qcarchive.molssi.org", "http://192.168.1.10:8888") username The username to authenticate with. password The password to authenticate with. verify Verifies the SSL connection with a third party server. This may be False if a FractalServer was not provided an SSL certificate and defaults back to self-signed SSL keys. show_motd If a Message-of-the-Day is available, display it cache_dir Directory to store an internal cache of records and other data cache_max_size Maximum size of the cache directory """PortalClientBase.__init__(self,address,username,password,verify,show_motd)self._logger=logging.getLogger("PortalClient")self.cache=PortalCache(address,cache_dir,cache_max_size)def__repr__(self)->str:"""A short representation of the current PortalClient. Returns ------- str The desired representation. """ret="PortalClient(server_name='{}', address='{}', username='{}')".format(self.server_name,self.address,self.username)returnretdef_repr_html_(self)->str:output=f""" <h3>PortalClient</h3> <ul> <li><b>Server: </b>{self.server_name}</li> <li><b>Address: </b>{self.address}</li> <li><b>Username: </b>{self.username}</li> </ul> """# postprocess due to raw spacing abovereturn"\n".join([substr.strip()forsubstrinoutput.split("\n")])
[docs]defget_server_information(self)->Dict[str,Any]:"""Request general information about the server Returns ------- : Server information. """# Request the info, and store here for later usereturnself.make_request("get","api/v1/information",Dict[str,Any])
[docs]defget_motd(self)->str:""" Gets the Message-of-the-Day (MOTD) from the server """returnself.make_request("get","api/v1/motd",str)
[docs]defset_motd(self,new_motd:str)->str:""" Sets the Message-of-the-Day (MOTD) on the server """returnself.make_request("put","api/v1/motd",None,body=new_motd)
[docs]defdataset_from_cache(self,file_path:str)->BaseDataset:ds_meta=read_dataset_metadata(file_path)ds_type=BaseDataset.get_subclass(ds_meta["dataset_type"])ds_cache=DatasetCache(file_path,False,ds_type)ds=dataset_from_dict(ds_meta,self,cache_data=ds_cache)# Check to make sure we are connected to the same servercache_address=ds_cache.get_metadata("client_address")ifcache_address!=self.address:raiseRuntimeError(f"Cache file comes from {cache_address}, but currently connected to {self.address}")try:self.get_dataset_by_id(ds.id)except:self._logger.warning(f"Dataset {ds.id} could not be found on the server. Marking as read-only cache")ds_cache.read_only=Truereturnds
[docs]defadd_dataset(self,dataset_type:str,name:str,description:Optional[str]=None,tagline:Optional[str]=None,tags:Optional[List[str]]=None,group:Optional[str]=None,provenance:Optional[Dict[str,Any]]=None,visibility:bool=None,default_tag:str="*",default_priority:PriorityEnum=PriorityEnum.normal,extras:Optional[Dict[str,Any]]=None,owner_group:Optional[str]=None,existing_ok:bool=False,**kwargs,# For deprecated parameters)->BaseDataset:# TODO - DEPRECATED - Remove eventuallyifgroupisnotNone:self._logger.warning(f"'group' parameter has been deprecated and will be removed in a future version")ifvisibilityisnotNone:self._logger.warning(f"'visibility' parameter has been deprecated and will be removed in a future version")ifdescriptionisNone:description=""iftaglineisNone:tagline=""iftagsisNone:tags=[]ifprovenanceisNone:provenance={}ifextrasisNone:extras={}if"metadata"inkwargs:self._logger.warning(f"'metadata' parameter has been deprecated and will be removed in a future version. Use 'extras' instead")extras.update(kwargs["metadata"])body=DatasetAddBody(name=name,description=description,tagline=tagline,tags=tags,provenance=provenance,default_tag=default_tag,default_priority=default_priority,extras=extras,owner_group=owner_group,existing_ok=existing_ok,)ds_id=self.make_request("post",f"api/v1/datasets/{dataset_type}",int,body=body)returnself.get_dataset_by_id(ds_id)
[docs]defdownload_external_file(self,file_id:int,destination_path:str,overwrite:bool=False)->Tuple[int,str]:""" Downloads an external file to the given path The file size and checksum will be checked against the metadata stored on the server Parameters ---------- file_id ID of the file to obtain destination_path Full path to the destination file (including filename) overwrite If True, allow for overwriting an existing file. If False, and a file already exists at the given destination path, an exception will be raised. Returns ------- : A tuple of file size and sha256 checksum. """meta_url=f"api/v1/external_files/{file_id}"download_url=f"api/v1/external_files/{file_id}/download"# Check for local file existence before doing any requestsifos.path.exists(destination_path)andnotoverwrite:raiseRuntimeError(f"File already exists at {destination_path}. To overwrite, use `overwrite=True`")# First, get the metadatafile_info=self.make_request("get",meta_url,ExternalFile)# Now actually download the filefile_size,file_sha256=self.download_file(download_url,destination_path,overwrite=overwrite,expected_size=file_info.file_size,show_progress=True)iffile_size!=file_info.file_size:raiseRuntimeError(f"Inconsistent file size. Expected {file_info.file_size}, got {file_size}")iffile_sha256!=file_info.sha256sum:raiseRuntimeError(f"Inconsistent file checksum. Expected {file_info.sha256sum}, got {file_sha256}")returnfile_size,file_sha256
[docs]defget_molecules(self,molecule_ids:Union[int,Sequence[int]],missing_ok:bool=False,)->Union[Optional[Molecule],List[Optional[Molecule]]]:"""Obtains molecules with the specified IDs from the server Parameters ---------- molecule_ids A single molecule ID, or a list (or other sequence) of molecule IDs missing_ok If set to True, then missing molecules will be tolerated, and the returned list of Molecules will contain None for the corresponding IDs that were not found. Returns ------- : Molecules, in the same order as the requested ids. If given a sequence of ids, the return value will be a list. Otherwise, it will be a single Molecule. """is_single=notisinstance(molecule_ids,Sequence)molecule_ids=make_list(molecule_ids)ifnotmolecule_ids:return[]batch_size=self.api_limits["get_molecules"]//4all_molecules=[]formol_id_batchinchunk_iterable(molecule_ids,batch_size):body=CommonBulkGetBody(ids=mol_id_batch,missing_ok=missing_ok)mol_batch=self.make_request("post","api/v1/molecules/bulkGet",List[Optional[Molecule]],body=body)all_molecules.extend(mol_batch)ifis_single:returnall_molecules[0]else:returnall_molecules
[docs]defquery_molecules(self,*,molecule_hash:Optional[Union[str,Iterable[str]]]=None,molecular_formula:Optional[Union[str,Iterable[str]]]=None,identifiers:Optional[Dict[str,Union[str,Iterable[str]]]]=None,limit:Optional[int]=None,)->MoleculeQueryIterator:"""Query molecules by attributes. Do not rely on the returned molecules being in any particular order. Parameters ---------- molecule_hash Queries molecules by hash molecular_formula Queries molecules by molecular formula. Molecular formulas are not order-sensitive (e.g. "H2O == OH2 != Oh2"). identifiers Additional identifiers to search for (smiles, etc) limit The maximum number of Molecules to return. Note that the server limit is always obeyed. Returns ------- : An iterator that can be used to retrieve the results of the query """filter_dict={"molecule_hash":make_list(molecule_hash),"molecular_formula":make_list(molecular_formula),"limit":limit,}ifidentifiersisnotNone:filter_dict["identifiers"]={k:make_list(v)fork,vinidentifiers.items()}filter_data=MoleculeQueryFilters(**filter_dict)returnMoleculeQueryIterator(self,filter_data)
[docs]defadd_molecules(self,molecules:Sequence[Molecule])->Tuple[InsertMetadata,List[int]]:"""Add molecules to the server database If the same molecule (defined by having the same hash) already exists, then the existing molecule is kept and that particular molecule is not added. Parameters ---------- molecules A list of Molecules to add to the server. Returns ------- : Metadata about what was inserted, and a list of IDs of the molecules in the same order as the `molecules` parameter. """ifnotmolecules:returnInsertMetadata(),[]iflen(molecules)>self.api_limits["add_molecules"]:raiseRuntimeError(f"Cannot add {len(molecules)} molecules - over the limit of {self.api_limits['add_molecules']}")mols=self.make_request("post","api/v1/molecules/bulkCreate",Tuple[InsertMetadata,List[int]],body=make_list(molecules))returnmols
[docs]defmodify_molecule(self,molecule_id:int,name:Optional[str]=None,comment:Optional[str]=None,identifiers:Optional[Union[Dict[str,Any],MoleculeIdentifiers]]=None,overwrite_identifiers:bool=False,)->UpdateMetadata:""" Modify molecules on the server This is only capable of updating the name, comment, and identifiers fields (except molecule_hash and molecular formula). If a molecule with that id does not exist, an exception is raised Parameters ---------- molecule_id ID of the molecule to modify name New name for the molecule. If None, name is not changed. comment New comment for the molecule. If None, comment is not changed identifiers A new set of identifiers for the molecule overwrite_identifiers If True, the identifiers of the molecule are set to be those given exactly (ie, identifiers that exist in the DB but not in the new set will be removed). Otherwise, the new set of identifiers is merged into the existing ones. Note that molecule_hash and molecular_formula are never removed. Returns ------- : Metadata about the modification/update. """body=MoleculeModifyBody(name=name,comment=comment,identifiers=identifiers,overwrite_identifiers=overwrite_identifiers)returnself.make_request("patch",f"api/v1/molecules/{molecule_id}",UpdateMetadata,body=body)
[docs]defdelete_molecules(self,molecule_ids:Union[int,Sequence[int]])->DeleteMetadata:"""Deletes molecules from the server This will not delete any molecules that are in use Parameters ---------- molecule_ids An id or list of ids to query. Returns ------- : Metadata about what was deleted """molecule_ids=make_list(molecule_ids)ifnotmolecule_ids:returnDeleteMetadata()returnself.make_request("post","api/v1/molecules/bulkDelete",DeleteMetadata,body=molecule_ids)
############################################################### General record functions##############################################################def_fetch_records(self,record_type:Optional[Type[_T]],record_ids:Sequence[int],missing_ok:bool=False,include:Optional[Iterable[str]]=None,)->List[Optional[_T]]:""" Fetches records of a particular type with the specified IDs from the remove server. Records will be returned in the same order as the record ids. This function always returns a list. This function only fetches the top-level records - it does not fetch the children of the records. It also does not use caching at all. Parameters ---------- record_type The type of record to fetch record_ids Single ID or sequence/list of records to obtain missing_ok If set to True, then missing records will be tolerated, and the returned records will contain None for the corresponding IDs that were not found. include Additional fields to include in the returned record Returns ------- : If a single ID was specified, returns just that record. Otherwise, returns a list of records. If missing_ok was specified, None will be substituted for a record that was not found. """ifnotrecord_ids:return[]ifincludeisnotNone:# Always include the base stuffinclude=list(include)+["*"]ifrecord_typeisNone:endpoint="api/v1/records/bulkGet"else:# A little hackyrecord_type_str=record_type.__fields__["record_type"].defaultendpoint=f"api/v1/records/{record_type_str}/bulkGet"max_batch_size=self.api_limits["get_records"]initial_batch_size=math.ceil(max_batch_size//10)def_download_chunk(id_chunk:List[int]):body=CommonBulkGetBody(ids=id_chunk,include=include,missing_ok=missing_ok)returnself.make_request("post",endpoint,List[Optional[Dict[str,Any]]],body=body)all_records=[]forrecord_dictsinprocess_chunk_iterable(_download_chunk,record_ids,self.download_target_time,max_batch_size,initial_batch_size,self.n_download_threads,keep_order=True,):ifrecord_typeisNone:all_records.extend(records_from_dicts(record_dicts,self))else:all_records.extend([record_type(self,**r)ifrisnotNoneelseNoneforrinrecord_dicts])# Just to really make sure the process_chunk_iterable code is correctassertall((xisNoneorx.id==rid)forx,ridinzip(all_records,record_ids))returnall_records
[docs]defget_records(self,record_ids:Union[int,Sequence[int]],missing_ok:bool=False,*,include:Optional[Iterable[str]]=None,)->Union[List[Optional[BaseRecord]],Optional[BaseRecord]]:""" Obtain records of all types with specified IDs This function will return record objects of the given ID no matter what the type is. All records are unique by ID (ie, an optimization will never have the same ID as a singlepoint). Records will be returned in the same order as the record ids. Parameters ---------- record_ids Single ID or sequence/list of records to obtain missing_ok If set to True, then missing records will be tolerated, and the returned records will contain None for the corresponding IDs that were not found. include Additional fields to include in the returned record Returns ------- : A list of records. If missing_ok was specified, None will be substituted for a record that was not found. """returnself._get_records_by_type(None,record_ids,missing_ok,include)
def_get_records_by_type(self,record_type:Optional[Type[_T]],record_ids:Union[int,Sequence[int]],missing_ok:bool=False,include:Optional[Iterable[str]]=None,)->Union[Optional[_T],List[Optional[_T]]]:""" Obtain records of a particular type with the specified IDs from the server. Records will be returned in the same order as the record ids. This function will fetch the children of the records if enough information is fetched of the parent record. This is handled by the various fetch_children_multi class functions of the record types. This function does not use the cache. Parameters ---------- record_type The type of record to fetch record_ids Single ID or sequence/list of records to obtain missing_ok If set to True, then missing records will be tolerated, and the returned records will contain None for the corresponding IDs that were not found. include Additional fields to include in the returned record Returns ------- : If a single ID was specified, returns just that record. Otherwise, returns a list of records. If missing_ok was specified, None will be substituted for a record that was not found. """is_single=notisinstance(record_ids,Sequence)record_ids=make_list(record_ids)all_records=self._fetch_records(record_type,record_ids,missing_ok,include)# We always force fetch here. Given that this record is not part of the cache, it shouldn't be using any# cache anyway. But the semantics of this function is that is always fetches everythingifrecord_typeisNone:# Handle disparate record typesrecord_groups={}forrinall_records:ifrisnotNone:record_groups.setdefault(r.record_type,[])record_groups[r.record_type].append(r)forvinrecord_groups.values():v[0].fetch_children_multi(v,include,force_fetch=True)else:record_type.fetch_children_multi(all_records,include,force_fetch=True)ifis_single:returnall_records[0]else:returnall_records
[docs]defquery_records(self,*,record_id:Optional[Union[int,Iterable[int]]]=None,record_type:Optional[Union[str,Iterable[str]]]=None,manager_name:Optional[Union[str,Iterable[str]]]=None,status:Optional[Union[RecordStatusEnum,Iterable[RecordStatusEnum]]]=None,dataset_id:Optional[Union[int,Iterable[int]]]=None,parent_id:Optional[Union[int,Iterable[int]]]=None,child_id:Optional[Union[int,Iterable[int]]]=None,created_before:Optional[Union[datetime,str]]=None,created_after:Optional[Union[datetime,str]]=None,modified_before:Optional[Union[datetime,str]]=None,modified_after:Optional[Union[datetime,str]]=None,owner_user:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,owner_group:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,limit:int=None,include:Optional[Iterable[str]]=None,)->RecordQueryIterator[BaseRecord]:""" Query records of all types based on common fields This is a general query of all record types, so it can only filter by fields that are common among all records. Do not rely on the returned records being in any particular order. Parameters ---------- record_id Query records whose ID is in the given list record_type Query records whose type is in the given list manager_name Query records that were completed (or are currently runnning) on a manager is in the given list status Query records whose status is in the given list dataset_id Query records that are part of a dataset is in the given list parent_id Query records that have a parent is in the given list child_id Query records that have a child is in the given list created_before Query records that were created before the given date/time created_after Query records that were created after the given date/time modified_before Query records that were modified before the given date/time modified_after Query records that were modified after the given date/time owner_user Query records owned by a user in the given list (usernames or IDs) owner_group Query records owned by a group in the given list (group names or IDS) limit The maximum number of records to return. Note that the server limit is always obeyed. include Additional fields to include in the returned record Returns ------- : An iterator that can be used to retrieve the results of the query """filter_dict={"record_id":make_list(record_id),"record_type":make_list(record_type),"manager_name":make_list(manager_name),"status":make_list(status),"dataset_id":make_list(dataset_id),"parent_id":make_list(parent_id),"child_id":make_list(child_id),"created_before":created_before,"created_after":created_after,"modified_before":modified_before,"modified_after":modified_after,"owner_user":make_list(owner_user),"owner_group":make_list(owner_group),"limit":limit,}filter_data=RecordQueryFilters(**filter_dict)returnRecordQueryIterator[BaseRecord](self,filter_data,None,include)
[docs]defreset_records(self,record_ids:Union[int,Sequence[int]])->UpdateMetadata:""" Resets running or errored records to be waiting again """record_ids=make_list(record_ids)ifnotrecord_ids:returnUpdateMetadata()body=RecordModifyBody(record_ids=record_ids,status=RecordStatusEnum.waiting)returnself.make_request("patch","api/v1/records",UpdateMetadata,body=body)
[docs]defcancel_records(self,record_ids:Union[int,Sequence[int]])->UpdateMetadata:""" Marks running, waiting, or errored records as cancelled A cancelled record will not be picked up by a manager. """record_ids=make_list(record_ids)ifnotrecord_ids:returnUpdateMetadata()body=RecordModifyBody(record_ids=record_ids,status=RecordStatusEnum.cancelled)returnself.make_request("patch","api/v1/records",UpdateMetadata,body=body)
[docs]definvalidate_records(self,record_ids:Union[int,Sequence[int]])->UpdateMetadata:""" Marks a completed record as invalid An invalid record is one that supposedly successfully completed. However, after review, is not correct. """record_ids=make_list(record_ids)ifnotrecord_ids:returnUpdateMetadata()body=RecordModifyBody(record_ids=record_ids,status=RecordStatusEnum.invalid)returnself.make_request("patch","api/v1/records",UpdateMetadata,body=body)
[docs]defdelete_records(self,record_ids:Union[int,Sequence[int]],soft_delete=True,delete_children:bool=True)->DeleteMetadata:""" Delete records from the database If soft_delete is True, then the record is just marked as deleted and actually deletion may happen later. Soft delete can be undone with undelete Parameters ---------- record_ids Reset the status of these record ids soft_delete Don't actually delete the record, just mark it for later deletion delete_children If True, attempt to delete child records as well """record_ids=make_list(record_ids)ifnotrecord_ids:returnDeleteMetadata()body=RecordDeleteBody(record_ids=record_ids,soft_delete=soft_delete,delete_children=delete_children)returnself.make_request("post","api/v1/records/bulkDelete",DeleteMetadata,body=body)
[docs]defuninvalidate_records(self,record_ids:Union[int,Sequence[int]])->UpdateMetadata:""" Undo the invalidation of records """record_ids=make_list(record_ids)ifnotrecord_ids:returnUpdateMetadata()body=RecordRevertBody(record_ids=record_ids,revert_status=RecordStatusEnum.invalid)returnself.make_request("post","api/v1/records/revert",UpdateMetadata,body=body)
[docs]defuncancel_records(self,record_ids:Union[int,Sequence[int]])->UpdateMetadata:""" Undo the cancellation of records """record_ids=make_list(record_ids)ifnotrecord_ids:returnUpdateMetadata()body=RecordRevertBody(record_ids=record_ids,revert_status=RecordStatusEnum.cancelled)returnself.make_request("post","api/v1/records/revert",UpdateMetadata,body=body)
[docs]defundelete_records(self,record_ids:Union[int,Sequence[int]])->UpdateMetadata:""" Undo the (soft) deletion of records """record_ids=make_list(record_ids)ifnotrecord_ids:returnUpdateMetadata()body=RecordRevertBody(record_ids=record_ids,revert_status=RecordStatusEnum.deleted)returnself.make_request("post","api/v1/records/revert",UpdateMetadata,body=body)
[docs]defmodify_records(self,record_ids:Union[int,Sequence[int]],new_compute_tag:Optional[str]=None,new_compute_priority:Optional[PriorityEnum]=None,**kwargs,# For deprecated parameters)->UpdateMetadata:""" Modify the compute tag or compute priority of a record """if"new_tag"inkwargs:self._logger.warning("'new_tag' is deprecated; use 'new_compute_tag' instead")new_compute_tag=kwargs["new_tag"]if"new_priority"inkwargs:self._logger.warning("'new_priority' is deprecated; use 'new_compute_priority' instead")new_compute_priority=kwargs["new_priority"]record_ids=make_list(record_ids)ifnotrecord_ids:returnUpdateMetadata()ifnew_compute_tagisNoneandnew_compute_priorityisNone:returnUpdateMetadata()body=RecordModifyBody(record_ids=record_ids,compute_tag=new_compute_tag,compute_priority=new_compute_priority)returnself.make_request("patch","api/v1/records",UpdateMetadata,body=body)
[docs]defadd_comment(self,record_ids:Union[int,Sequence[int]],comment:str)->UpdateMetadata:""" Adds a comment to records Parameters ---------- record_ids The record or records to add the comments to comment The comment string to add. Your username will be added automatically Returns ------- : Metadata about which records were updated """record_ids=make_list(record_ids)ifnotrecord_ids:returnUpdateMetadata()body_data=RecordModifyBody(record_ids=record_ids,comment=comment)returnself.make_request("patch","api/v1/records",UpdateMetadata,body=body_data)
[docs]defget_waiting_reason(self,record_id:int)->Dict[str,Any]:""" Get the reason a record is in the waiting status The return is a dictionary, with a 'reason' key containing the overall reason the record is waiting. If appropriate, there is a 'details' key that contains information for each active compute manager on why that manager is not able to pick up the record's task. Parameters ---------- record_id The record ID to test Returns ------- : A dictionary containing information about why the record is not being picked up by compute managers """returnself.make_request("get",f"api/v1/records/{record_id}/waiting_reason",Dict[str,Any])
[docs]defadd_singlepoints(self,molecules:Union[int,Molecule,List[Union[int,Molecule]]],program:str,driver:str,method:str,basis:Optional[str],keywords:Optional[Dict[str,Any]]=None,protocols:Optional[Union[SinglepointProtocols,Dict[str,Any]]]=None,compute_tag:str="*",compute_priority:PriorityEnum=PriorityEnum.normal,owner_group:Optional[str]=None,find_existing:bool=True,**kwargs,# For deprecated parameters)->Tuple[InsertMetadata,List[int]]:""" Adds new singlepoint computations to the server This checks if the calculations already exist in the database. If so, it returns the existing id, otherwise it will insert it and return the new id. This will add one record per molecule. Parameters ---------- molecules The Molecules or Molecule ids to compute with the above methods program The computational program to execute the result with (e.g., "rdkit", "psi4"). driver The primary result that the compute will acquire {"energy", "gradient", "hessian", "properties"} method The computational method to use (e.g., "B3LYP", "PBE") basis The basis to apply to the computation (e.g., "cc-pVDZ", "6-31G") keywords The program-specific keywords for the computation protocols Protocols for storing more/less data for each computation compute_tag The tag for the task. This will assist in routing to appropriate compute managers. compute_priority The priority of the job (high, normal, low). Default is normal. owner_group Group with additional permission for these records find_existing If True, search for existing records and return those. If False, always add new records Returns ------- : Metadata about the insertion, and a list of record ids. The ids will be in the order of the input molecules """if"tag"inkwargs:self._logger.warning("'tag' is deprecated; use 'compute_tag' instead")compute_tag=kwargs["tag"]if"priority"inkwargs:self._logger.warning("'priority' is deprecated; use 'compute_priority' instead")compute_priority=kwargs["priority"]molecules=make_list(molecules)ifnotmolecules:returnInsertMetadata(),[]iflen(molecules)>self.api_limits["add_records"]:raiseRuntimeError(f"Cannot add {len(molecules)} records - over the limit of {self.api_limits['add_records']}")body_data={"molecules":molecules,"specification":{"program":program,"driver":driver,"method":method,"basis":basis,},"compute_tag":compute_tag,"compute_priority":compute_priority,"owner_group":owner_group,"find_existing":find_existing,}# If these are None, then let the pydantic models handle the defaultsifkeywordsisnotNone:body_data["specification"]["keywords"]=keywordsifprotocolsisnotNone:body_data["specification"]["protocols"]=protocolsbody=SinglepointAddBody(**body_data)returnself.make_request("post","api/v1/records/singlepoint/bulkCreate",Tuple[InsertMetadata,List[int]],body=body)
[docs]defget_singlepoints(self,record_ids:Union[int,Sequence[int]],missing_ok:bool=False,*,include:Optional[Iterable[str]]=None,)->Union[Optional[SinglepointRecord],List[Optional[SinglepointRecord]]]:""" Obtain singlepoint records with the specified IDs. Records will be returned in the same order as the record ids. Parameters ---------- record_ids Single ID or sequence/list of records to obtain missing_ok If set to True, then missing records will be tolerated, and the returned records will contain None for the corresponding IDs that were not found. include Additional fields to include in the returned record Returns ------- : If a single ID was specified, returns just that record. Otherwise, returns a list of records. If missing_ok was specified, None will be substituted for a record that was not found. """returnself._get_records_by_type(SinglepointRecord,record_ids,missing_ok,include)
[docs]defquery_singlepoints(self,*,record_id:Optional[Union[int,Iterable[int]]]=None,manager_name:Optional[Union[str,Iterable[str]]]=None,status:Optional[Union[RecordStatusEnum,Iterable[RecordStatusEnum]]]=None,dataset_id:Optional[Union[int,Iterable[int]]]=None,parent_id:Optional[Union[int,Iterable[int]]]=None,created_before:Optional[Union[datetime,str]]=None,created_after:Optional[Union[datetime,str]]=None,modified_before:Optional[Union[datetime,str]]=None,modified_after:Optional[Union[datetime,str]]=None,program:Optional[Union[str,Iterable[str]]]=None,driver:Optional[Union[SinglepointDriver,Iterable[SinglepointDriver]]]=None,method:Optional[Union[str,Iterable[str]]]=None,basis:Optional[Union[str,Iterable[Optional[str]]]]=None,keywords:Optional[Union[Dict[str,Any],Iterable[Dict[str,Any]]]]=None,molecule_id:Optional[Union[int,Iterable[int]]]=None,owner_user:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,owner_group:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,limit:Optional[int]=None,include:Optional[Iterable[str]]=None,)->RecordQueryIterator[SinglepointRecord]:""" Queries singlepoint records on the server Do not rely on the returned records being in any particular order. Parameters ---------- record_id Query records whose ID is in the given list manager_name Query records that were completed (or are currently runnning) on a manager is in the given list status Query records whose status is in the given list dataset_id Query records that are part of a dataset is in the given list parent_id Query records that have a parent is in the given list created_before Query records that were created before the given date/time created_after Query records that were created after the given date/time modified_before Query records that were modified before the given date/time modified_after Query records that were modified after the given date/time program Query records whose program is in the given list driver Query records whose driver is in the given list method Query records whose method is in the given list basis Query records whose basis is in the given list keywords Query records with these keywords (exact match) molecule_id Query records whose molecule (id) is in the given list owner_user Query records owned by a user in the given list owner_group Query records owned by a group in the given list limit The maximum number of records to return. Note that the server limit is always obeyed. include Additional fields to include in the returned record Returns ------- : An iterator that can be used to retrieve the results of the query """# Note - singlepoints don't have any childrenfilter_dict={"record_id":make_list(record_id),"manager_name":make_list(manager_name),"status":make_list(status),"dataset_id":make_list(dataset_id),"parent_id":make_list(parent_id),"program":make_list(program),"driver":make_list(driver),"method":make_list(method),"basis":make_list(basis),"keywords":make_list(keywords),"molecule_id":make_list(molecule_id),"created_before":created_before,"created_after":created_after,"modified_before":modified_before,"modified_after":modified_after,"owner_user":make_list(owner_user),"owner_group":make_list(owner_group),"limit":limit,}filter_data=SinglepointQueryFilters(**filter_dict)returnRecordQueryIterator[SinglepointRecord](self,filter_data,SinglepointRecord,include)
[docs]defadd_optimizations(self,initial_molecules:Union[int,Molecule,List[Union[int,Molecule]]],program:str,qc_specification:QCSpecification,keywords:Optional[Dict[str,Any]]=None,protocols:Optional[OptimizationProtocols]=None,compute_tag:str="*",compute_priority:PriorityEnum=PriorityEnum.normal,owner_group:Optional[str]=None,find_existing:bool=True,**kwargs,# For deprecated parameters)->Tuple[InsertMetadata,List[int]]:""" Adds new geometry optimization calculations to the server This checks if the calculations already exist in the database. If so, it returns the existing id, otherwise it will insert it and return the new id. This will add one record per initial molecule. Parameters ---------- initial_molecules Initial molecule/geometry to optimize program Which program to use for the optimization (ie, geometric) qc_specification The method, basis, etc, to optimize the geometry with keywords Program-specific keywords for the optimization program (not the qc program) protocols Protocols for storing more/less data for each computation (for the optimization) compute_tag The tag for the task. This will assist in routing to appropriate compute managers. compute_priority The priority of the job (high, normal, low). Default is normal. owner_group Group with additional permission for these records find_existing If True, search for existing records and return those. If False, always add new records Returns ------- : Metadata about the insertion, and a list of record ids. The ids will be in the order of the input molecules """if"tag"inkwargs:self._logger.warning("'tag' is deprecated; use 'compute_tag' instead")compute_tag=kwargs["tag"]if"priority"inkwargs:self._logger.warning("'priority' is deprecated; use 'compute_priority' instead")compute_priority=kwargs["priority"]initial_molecules=make_list(initial_molecules)ifnotinitial_molecules:returnInsertMetadata(),[]iflen(initial_molecules)>self.api_limits["add_records"]:raiseRuntimeError(f"Cannot add {len('initial_molecules')} records - over the limit of {self.api_limits['add_records']}")body_data={"initial_molecules":initial_molecules,"specification":{"program":program,"qc_specification":qc_specification,},"compute_tag":compute_tag,"compute_priority":compute_priority,"owner_group":owner_group,"find_existing":find_existing,}# If these are None, then let the pydantic models handle the defaultsifkeywordsisnotNone:body_data["specification"]["keywords"]=keywordsifprotocolsisnotNone:body_data["specification"]["protocols"]=protocolsbody_data=OptimizationAddBody(**body_data)returnself.make_request("post","api/v1/records/optimization/bulkCreate",Tuple[InsertMetadata,List[int]],body=body_data,)
[docs]defget_optimizations(self,record_ids:Union[int,Sequence[int]],missing_ok:bool=False,*,include:Optional[Iterable[str]]=None,)->Union[Optional[OptimizationRecord],List[Optional[OptimizationRecord]]]:""" Obtain optimization records with the specified IDs. Records will be returned in the same order as the record ids. Parameters ---------- record_ids Single ID or sequence/list of records to obtain missing_ok If set to True, then missing records will be tolerated, and the returned records will contain None for the corresponding IDs that were not found. include Additional fields to include in the returned record Returns ------- : If a single ID was specified, returns just that record. Otherwise, returns a list of records. If missing_ok was specified, None will be substituted for a record that was not found. """returnself._get_records_by_type(OptimizationRecord,record_ids,missing_ok,include)
[docs]defquery_optimizations(self,*,record_id:Optional[Union[int,Iterable[int]]]=None,manager_name:Optional[Union[str,Iterable[str]]]=None,status:Optional[Union[RecordStatusEnum,Iterable[RecordStatusEnum]]]=None,dataset_id:Optional[Union[int,Iterable[int]]]=None,parent_id:Optional[Union[int,Iterable[int]]]=None,child_id:Optional[Union[int,Iterable[int]]]=None,created_before:Optional[Union[datetime,str]]=None,created_after:Optional[Union[datetime,str]]=None,modified_before:Optional[Union[datetime,str]]=None,modified_after:Optional[Union[datetime,str]]=None,program:Optional[Union[str,Iterable[str]]]=None,qc_program:Optional[Union[str,Iterable[str]]]=None,qc_method:Optional[Union[str,Iterable[str]]]=None,qc_basis:Optional[Union[str,Iterable[Optional[str]]]]=None,initial_molecule_id:Optional[Union[int,Iterable[int]]]=None,final_molecule_id:Optional[Union[int,Iterable[int]]]=None,owner_user:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,owner_group:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,limit:Optional[int]=None,include:Optional[Iterable[str]]=None,)->RecordQueryIterator[OptimizationRecord]:""" Queries optimization records on the server Do not rely on the returned records being in any particular order. Parameters ---------- record_id Query records whose ID is in the given list manager_name Query records that were completed (or are currently runnning) on a manager is in the given list status Query records whose status is in the given list dataset_id Query records that are part of a dataset is in the given list parent_id Query records that have a parent is in the given list child_id Query records that have a child (singlepoint calculation) is in the given list created_before Query records that were created before the given date/time created_after Query records that were created after the given date/time modified_before Query records that were modified before the given date/time modified_after Query records that were modified after the given date/time program Query records whose optimization program is in the given list qc_program Query records whose qc program is in the given list qc_method Query records whose method is in the given list qc_basis Query records whose basis is in the given list initial_molecule_id Query records whose initial molecule (id) is in the given list final_molecule_id Query records whose final molecule (id) is in the given list owner_user Query records owned by a user in the given list owner_group Query records owned by a group in the given list limit The maximum number of records to return. Note that the server limit is always obeyed. include Additional fields to include in the returned record Returns ------- : An iterator that can be used to retrieve the results of the query """filter_dict={"record_id":make_list(record_id),"manager_name":make_list(manager_name),"status":make_list(status),"dataset_id":make_list(dataset_id),"parent_id":make_list(parent_id),"child_id":make_list(child_id),"program":make_list(program),"qc_program":make_list(qc_program),"qc_method":make_list(qc_method),"qc_basis":make_list(qc_basis),"initial_molecule_id":make_list(initial_molecule_id),"final_molecule_id":make_list(final_molecule_id),"created_before":created_before,"created_after":created_after,"modified_before":modified_before,"modified_after":modified_after,"owner_user":make_list(owner_user),"owner_group":make_list(owner_group),"limit":limit,}filter_data=OptimizationQueryFilters(**filter_dict)returnRecordQueryIterator[OptimizationRecord](self,filter_data,OptimizationRecord,include)
[docs]defadd_torsiondrives(self,initial_molecules:List[List[Union[int,Molecule]]],program:str,optimization_specification:OptimizationSpecification,keywords:Union[TorsiondriveKeywords,Dict[str,Any]],compute_tag:str="*",compute_priority:PriorityEnum=PriorityEnum.normal,owner_group:Optional[str]=None,find_existing:bool=True,**kwargs,# For deprecated parameters)->Tuple[InsertMetadata,List[int]]:""" Adds new torsiondrive computations to the server This checks if the calculations already exist in the database. If so, it returns the existing id, otherwise it will insert it and return the new id. This will add one record per set of molecules Parameters ---------- initial_molecules Molecules to start the torsiondrives. Each torsiondrive can start with multiple molecules, so this is a nested list program The program to run the torsiondrive computation with ("torsiondrive") optimization_specification Specification of how each optimization of the torsiondrive should be run keywords The torsiondrive keywords for the computation compute_tag The tag for the task. This will assist in routing to appropriate compute managers. compute_priority The priority of the job (high, normal, low). Default is normal. owner_group Group with additional permission for these records find_existing If True, search for existing records and return those. If False, always add new records Returns ------- : Metadata about the insertion, and a list of record ids. The ids will be in the order of the input molecules """if"tag"inkwargs:self._logger.warning("'tag' is deprecated; use 'compute_tag' instead")compute_tag=kwargs["tag"]if"priority"inkwargs:self._logger.warning("'priority' is deprecated; use 'compute_priority' instead")compute_priority=kwargs["priority"]ifnotinitial_molecules:returnInsertMetadata(),[]iflen(initial_molecules)>self.api_limits["add_records"]:raiseRuntimeError(f"Cannot add {len(initial_molecules)} records - over the limit of {self.api_limits['add_records']}")body_data={"initial_molecules":initial_molecules,"specification":{"program":program,"optimization_specification":optimization_specification,"keywords":keywords,},"as_service":True,"compute_tag":compute_tag,"compute_priority":compute_priority,"owner_group":owner_group,"find_existing":find_existing,}body=TorsiondriveAddBody(**body_data)returnself.make_request("post","api/v1/records/torsiondrive/bulkCreate",Tuple[InsertMetadata,List[int]],body=body)
[docs]defget_torsiondrives(self,record_ids:Union[int,Sequence[int]],missing_ok:bool=False,*,include:Optional[Iterable[str]]=None,)->Union[Optional[TorsiondriveRecord],List[Optional[TorsiondriveRecord]]]:""" Obtain torsiondrive records with the specified IDs. Records will be returned in the same order as the record ids. Parameters ---------- record_ids Single ID or sequence/list of records to obtain missing_ok If set to True, then missing records will be tolerated, and the returned records will contain None for the corresponding IDs that were not found. include Additional fields to include in the returned record Returns ------- : If a single ID was specified, returns just that record. Otherwise, returns a list of records. If missing_ok was specified, None will be substituted for a record that was not found. """returnself._get_records_by_type(TorsiondriveRecord,record_ids,missing_ok,include)
[docs]defquery_torsiondrives(self,*,record_id:Optional[Union[int,Iterable[int]]]=None,manager_name:Optional[Union[str,Iterable[str]]]=None,status:Optional[Union[RecordStatusEnum,Iterable[RecordStatusEnum]]]=None,dataset_id:Optional[Union[int,Iterable[int]]]=None,parent_id:Optional[Union[int,Iterable[int]]]=None,child_id:Optional[Union[int,Iterable[int]]]=None,created_before:Optional[Union[datetime,str]]=None,created_after:Optional[Union[datetime,str]]=None,modified_before:Optional[Union[datetime,str]]=None,modified_after:Optional[Union[datetime,str]]=None,program:Optional[Union[str,Iterable[str]]]=None,optimization_program:Optional[Union[str,Iterable[str]]]=None,qc_program:Optional[Union[str,Iterable[str]]]=None,qc_method:Optional[Union[str,Iterable[str]]]=None,qc_basis:Optional[Union[str,Iterable[str]]]=None,initial_molecule_id:Optional[Union[int,Iterable[int]]]=None,owner_user:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,owner_group:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,limit:Optional[int]=None,include:Optional[Iterable[str]]=None,)->RecordQueryIterator[TorsiondriveRecord]:""" Queries torsiondrive records on the server Do not rely on the returned records being in any particular order. Parameters ---------- record_id Query records whose ID is in the given list manager_name Query records that were completed (or are currently runnning) on a manager is in the given list status Query records whose status is in the given list dataset_id Query records that are part of a dataset is in the given list parent_id Query records that have a parent is in the given list child_id Query records that have a child (optimization calculation) is in the given list created_before Query records that were created before the given date/time created_after Query records that were created after the given date/time modified_before Query records that were modified before the given date/time modified_after Query records that were modified after the given date/time program Query records whose torsiondrive program is in the given list optimization_program Query records whose optimization program is in the given list qc_program Query records whose qc program is in the given list qc_method Query records whose method is in the given list qc_basis Query records whose basis is in the given list initial_molecule_id Query records whose initial molecule (id) is in the given list owner_user Query records owned by a user in the given list owner_group Query records owned by a group in the given list limit The maximum number of records to return. Note that the server limit is always obeyed. include Additional fields to include in the returned record Returns ------- : An iterator that can be used to retrieve the results of the query """filter_dict={"record_id":make_list(record_id),"manager_name":make_list(manager_name),"status":make_list(status),"dataset_id":make_list(dataset_id),"parent_id":make_list(parent_id),"child_id":make_list(child_id),"program":make_list(program),"optimization_program":make_list(optimization_program),"qc_program":make_list(qc_program),"qc_method":make_list(qc_method),"qc_basis":make_list(qc_basis),"initial_molecule_id":make_list(initial_molecule_id),"created_before":created_before,"created_after":created_after,"modified_before":modified_before,"modified_after":modified_after,"owner_user":make_list(owner_user),"owner_group":make_list(owner_group),"limit":limit,}filter_data=TorsiondriveQueryFilters(**filter_dict)returnRecordQueryIterator[TorsiondriveRecord](self,filter_data,TorsiondriveRecord,include)
[docs]defadd_gridoptimizations(self,initial_molecules:Union[int,Molecule,Sequence[Union[int,Molecule]]],program:str,optimization_specification:OptimizationSpecification,keywords:Union[GridoptimizationKeywords,Dict[str,Any]],compute_tag:str="*",compute_priority:PriorityEnum=PriorityEnum.normal,owner_group:Optional[str]=None,find_existing:bool=True,**kwargs,# For deprecated parameters)->Tuple[InsertMetadata,List[int]]:""" Adds new gridoptimization computations to the server This checks if the calculations already exist in the database. If so, it returns the existing id, otherwise it will insert it and return the new id. This will add one record per initial molecule Parameters ---------- initial_molecules Molecules to start the gridoptimizations. Each gridoptimization starts with a single molecule. program The program to run the gridoptimization computation with ("gridoptimization") optimization_specification Specification of how each optimization of the gridoptimization should be run keywords The gridoptimization keywords for the computation compute_tag The tag for the task. This will assist in routing to appropriate compute managers. compute_priority The priority of the job (high, normal, low). Default is normal. owner_group Group with additional permission for these records find_existing If True, search for existing records and return those. If False, always add new records Returns ------- : Metadata about the insertion, and a list of record ids. The ids will be in the order of the input molecules """if"tag"inkwargs:self._logger.warning("'tag' is deprecated; use 'compute_tag' instead")compute_tag=kwargs["tag"]if"priority"inkwargs:self._logger.warning("'priority' is deprecated; use 'compute_priority' instead")compute_priority=kwargs["priority"]initial_molecules=make_list(initial_molecules)ifnotinitial_molecules:returnInsertMetadata(),[]iflen(initial_molecules)>self.api_limits["add_records"]:raiseRuntimeError(f"Cannot add {len(initial_molecules)} records - over the limit of {self.api_limits['add_records']}")body_data={"initial_molecules":initial_molecules,"specification":{"program":program,"optimization_specification":optimization_specification,"keywords":keywords,},"compute_tag":compute_tag,"compute_priority":compute_priority,"owner_group":owner_group,"find_existing":find_existing,}body=GridoptimizationAddBody(**body_data)returnself.make_request("post","api/v1/records/gridoptimization/bulkCreate",Tuple[InsertMetadata,List[int]],body=body)
[docs]defget_gridoptimizations(self,record_ids:Union[int,Sequence[int]],missing_ok:bool=False,*,include:Optional[Iterable[str]]=None,)->Union[Optional[GridoptimizationRecord],List[Optional[GridoptimizationRecord]]]:""" Obtain gridoptimization records with the specified IDs. Records will be returned in the same order as the record ids. Parameters ---------- record_ids Single ID or sequence/list of records to obtain missing_ok If set to True, then missing records will be tolerated, and the returned records will contain None for the corresponding IDs that were not found. include Additional fields to include in the returned record Returns ------- : If a single ID was specified, returns just that record. Otherwise, returns a list of records. If missing_ok was specified, None will be substituted for a record that was not found. """returnself._get_records_by_type(GridoptimizationRecord,record_ids,missing_ok,include)
[docs]defquery_gridoptimizations(self,*,record_id:Optional[Union[int,Iterable[int]]]=None,manager_name:Optional[Union[str,Iterable[str]]]=None,status:Optional[Union[RecordStatusEnum,Iterable[RecordStatusEnum]]]=None,dataset_id:Optional[Union[int,Iterable[int]]]=None,parent_id:Optional[Union[int,Iterable[int]]]=None,child_id:Optional[Union[int,Iterable[int]]]=None,created_before:Optional[Union[datetime,str]]=None,created_after:Optional[Union[datetime,str]]=None,modified_before:Optional[Union[datetime,str]]=None,modified_after:Optional[Union[datetime,str]]=None,program:Optional[Union[str,Iterable[str]]]=None,optimization_program:Optional[Union[str,Iterable[str]]]=None,qc_program:Optional[Union[str,Iterable[str]]]=None,qc_method:Optional[Union[str,Iterable[str]]]=None,qc_basis:Optional[Union[str,Iterable[Optional[str]]]]=None,initial_molecule_id:Optional[Union[int,Iterable[int]]]=None,owner_user:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,owner_group:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,limit:Optional[int]=None,include:Optional[Iterable[str]]=None,)->RecordQueryIterator[GridoptimizationRecord]:""" Queries gridoptimization records on the server Do not rely on the returned records being in any particular order. Parameters ---------- record_id Query records whose ID is in the given list manager_name Query records that were completed (or are currently runnning) on a manager is in the given list status Query records whose status is in the given list dataset_id Query records that are part of a dataset is in the given list parent_id Query records that have a parent is in the given list child_id Query records that have a child (optimization calculation) is in the given list created_before Query records that were created before the given date/time created_after Query records that were created after the given date/time modified_before Query records that were modified before the given date/time modified_after Query records that were modified after the given date/time program Query records whose gridoptimization program is in the given list optimization_program Query records whose optimization program is in the given list qc_program Query records whose qc program is in the given list qc_method Query records whose method is in the given list qc_basis Query records whose basis is in the given list initial_molecule_id Query records whose initial molecule (id) is in the given list owner_user Query records owned by a user in the given list owner_group Query records owned by a group in the given list limit The maximum number of records to return. Note that the server limit is always obeyed. include Additional fields to include in the returned record Returns ------- : An iterator that can be used to retrieve the results of the query """filter_dict={"record_id":make_list(record_id),"manager_name":make_list(manager_name),"status":make_list(status),"dataset_id":make_list(dataset_id),"parent_id":make_list(parent_id),"child_id":make_list(child_id),"program":make_list(program),"optimization_program":make_list(optimization_program),"qc_program":make_list(qc_program),"qc_method":make_list(qc_method),"qc_basis":make_list(qc_basis),"initial_molecule_id":make_list(initial_molecule_id),"created_before":created_before,"created_after":created_after,"modified_before":modified_before,"modified_after":modified_after,"owner_user":make_list(owner_user),"owner_group":make_list(owner_group),"limit":limit,}filter_data=GridoptimizationQueryFilters(**filter_dict)returnRecordQueryIterator[GridoptimizationRecord](self,filter_data,GridoptimizationRecord,include)
[docs]defadd_reactions(self,stoichiometries:Sequence[Sequence[Sequence[float,Union[int,Molecule]]]],program:str,singlepoint_specification:Optional[QCSpecification],optimization_specification:Optional[OptimizationSpecification],keywords:ReactionKeywords,compute_tag:str="*",compute_priority:PriorityEnum=PriorityEnum.normal,owner_group:Optional[str]=None,find_existing:bool=True,**kwargs,# For deprecated parameters)->Tuple[InsertMetadata,List[int]]:""" Adds new reaction computations to the server Reactions can have a singlepoint specification, optimization specification, or both; at least one must be specified. If both are specified, an optimization is done, followed by a singlepoint computation. Otherwise, only the specification that is specified is used. This checks if the calculations already exist in the database. If so, it returns the existing id, otherwise it will insert it and return the new id. This will add one record per set of molecules Parameters ---------- stoichiometries Coefficients and molecules of the reaction. Each reaction has multiple molecules/coefficients, so this is a nested list program The program for running the reaction computation ("reaction") singlepoint_specification The specification for singlepoint energy calculations optimization_specification The specification for optimization calculations keywords The keywords for the reaction calculation/service compute_tag The tag for the task. This will assist in routing to appropriate compute managers. compute_priority The priority of the job (high, normal, low). Default is normal. owner_group Group with additional permission for these records find_existing If True, search for existing records and return those. If False, always add new records Returns ------- : Metadata about the insertion, and a list of record ids. The ids will be in the order of the input molecules """if"tag"inkwargs:self._logger.warning("'tag' is deprecated; use 'compute_tag' instead")compute_tag=kwargs["tag"]if"priority"inkwargs:self._logger.warning("'priority' is deprecated; use 'compute_priority' instead")compute_priority=kwargs["priority"]ifnotstoichiometries:returnInsertMetadata(),[]iflen(stoichiometries)>self.api_limits["add_records"]:raiseRuntimeError(f"Cannot add {len(stoichiometries)} records - over the limit of {self.api_limits['add_records']}")body_data={"stoichiometries":stoichiometries,"specification":{"program":program,"singlepoint_specification":singlepoint_specification,"optimization_specification":optimization_specification,"keywords":keywords,},"compute_tag":compute_tag,"compute_priority":compute_priority,"owner_group":owner_group,"find_existing":find_existing,}body=ReactionAddBody(**body_data)returnself.make_request("post","api/v1/records/reaction/bulkCreate",Tuple[InsertMetadata,List[int]],body=body)
[docs]defget_reactions(self,record_ids:Union[int,Sequence[int]],missing_ok:bool=False,*,include:Optional[Iterable[str]]=None,)->Union[Optional[ReactionRecord],List[Optional[ReactionRecord]]]:""" Obtain reaction records with the specified IDs. Records will be returned in the same order as the record ids. Parameters ---------- record_ids Single ID or sequence/list of records to obtain missing_ok If set to True, then missing records will be tolerated, and the returned records will contain None for the corresponding IDs that were not found. include Additional fields to include in the returned record Returns ------- : If a single ID was specified, returns just that record. Otherwise, returns a list of records. If missing_ok was specified, None will be substituted for a record that was not found. """returnself._get_records_by_type(ReactionRecord,record_ids,missing_ok,include)
[docs]defquery_reactions(self,*,record_id:Optional[Union[int,Iterable[int]]]=None,manager_name:Optional[Union[str,Iterable[str]]]=None,status:Optional[Union[RecordStatusEnum,Iterable[RecordStatusEnum]]]=None,dataset_id:Optional[Union[int,Iterable[int]]]=None,parent_id:Optional[Union[int,Iterable[int]]]=None,child_id:Optional[Union[int,Iterable[int]]]=None,created_before:Optional[Union[datetime,str]]=None,created_after:Optional[Union[datetime,str]]=None,modified_before:Optional[Union[datetime,str]]=None,modified_after:Optional[Union[datetime,str]]=None,program:Optional[Union[str,Iterable[str]]]=None,optimization_program:Optional[Iterable[Optional[str]]]=None,qc_program:Optional[Union[str,Iterable[str]]]=None,qc_method:Optional[Union[str,Iterable[str]]]=None,qc_basis:Optional[Union[str,Iterable[str]]]=None,molecule_id:Optional[Union[int,Iterable[int]]]=None,owner_user:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,owner_group:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,limit:Optional[int]=None,include:Optional[Iterable[str]]=None,)->RecordQueryIterator[ReactionRecord]:""" Queries reaction records on the server Do not rely on the returned records being in any particular order. Parameters ---------- record_id Query records whose ID is in the given list manager_name Query records that were completed (or are currently runnning) on a manager is in the given list status Query records whose status is in the given list dataset_id Query records that are part of a dataset is in the given list parent_id Query records that have a parent is in the given list child_id Query records that have a child (optimization calculation) is in the given list created_before Query records that were created before the given date/time created_after Query records that were created after the given date/time modified_before Query records that were modified before the given date/time modified_after Query records that were modified after the given date/time program Query records whose reaction program is in the given list optimization_program Query records whose optimization program is in the given list qc_program Query records whose qc program is in the given list qc_method Query records whose method is in the given list qc_basis Query records whose basis is in the given list molecule_id Query reactions that contain a molecule (id) is in the given list owner_user Query records owned by a user in the given list owner_group Query records owned by a group in the given list limit The maximum number of records to return. Note that the server limit is always obeyed. include Additional fields to include in the returned record Returns ------- : An iterator that can be used to retrieve the results of the query """filter_dict={"record_id":make_list(record_id),"manager_name":make_list(manager_name),"status":make_list(status),"dataset_id":make_list(dataset_id),"parent_id":make_list(parent_id),"child_id":make_list(child_id),"program":make_list(program),"optimization_program":make_list(optimization_program),"qc_program":make_list(qc_program),"qc_method":make_list(qc_method),"qc_basis":make_list(qc_basis),"molecule_id":make_list(molecule_id),"created_before":created_before,"created_after":created_after,"modified_before":modified_before,"modified_after":modified_after,"owner_user":make_list(owner_user),"owner_group":make_list(owner_group),"limit":limit,}filter_data=ReactionQueryFilters(**filter_dict)returnRecordQueryIterator[ReactionRecord](self,filter_data,ReactionRecord,include)
[docs]defadd_manybodys(self,initial_molecules:Sequence[Union[int,Molecule]],program:str,levels:Dict[Union[int,Literal["supersystem"]],QCSpecification],bsse_correction:Union[BSSECorrectionEnum,Sequence[BSSECorrectionEnum]],keywords:Union[ManybodyKeywords,Dict[str,Any]],compute_tag:str="*",compute_priority:PriorityEnum=PriorityEnum.normal,owner_group:Optional[str]=None,find_existing:bool=True,**kwargs,# For deprecated parameters)->Tuple[InsertMetadata,List[int]]:""" Adds new manybody expansion computations to the server This checks if the calculations already exist in the database. If so, it returns the existing id, otherwise it will insert it and return the new id. This will add one record per initial molecule. Parameters ---------- initial_molecules Initial molecules for the manybody expansion. Must have > 1 fragments. program The program to run the manybody computation with ("manybody") singlepoint_specification Specification for the singlepoint calculations done in the expansion keywords The keywords for the manybody program compute_tag The tag for the task. This will assist in routing to appropriate compute managers. compute_priority The priority of the job (high, normal, low). Default is normal. owner_group Group with additional permission for these records find_existing If True, search for existing records and return those. If False, always add new records Returns ------- : Metadata about the insertion, and a list of record ids. The ids will be in the order of the input molecules """if"tag"inkwargs:self._logger.warning("'tag' is deprecated; use 'compute_tag' instead")compute_tag=kwargs["tag"]if"priority"inkwargs:self._logger.warning("'priority' is deprecated; use 'compute_priority' instead")compute_priority=kwargs["priority"]initial_molecules=make_list(initial_molecules)ifnotinitial_molecules:returnInsertMetadata(),[]iflen(initial_molecules)>self.api_limits["add_records"]:raiseRuntimeError(f"Cannot add {len(initial_molecules)} records - over the limit of {self.api_limits['add_records']}")body_data={"initial_molecules":initial_molecules,"specification":{"program":program,"levels":levels,"bsse_correction":make_list(bsse_correction),"keywords":keywords,},"compute_tag":compute_tag,"compute_priority":compute_priority,"owner_group":owner_group,"find_existing":find_existing,}body=ManybodyAddBody(**body_data)returnself.make_request("post","api/v1/records/manybody/bulkCreate",Tuple[InsertMetadata,List[int]],body=body)
[docs]defget_manybodys(self,record_ids:Union[int,Sequence[int]],missing_ok:bool=False,*,include:Optional[Iterable[str]]=None,)->Union[Optional[ManybodyRecord],List[Optional[ManybodyRecord]]]:""" Obtain manybody records with the specified IDs. Records will be returned in the same order as the record ids. Parameters ---------- record_ids Single ID or sequence/list of records to obtain missing_ok If set to True, then missing records will be tolerated, and the returned records will contain None for the corresponding IDs that were not found. include Additional fields to include in the returned record Returns ------- : If a single ID was specified, returns just that record. Otherwise, returns a list of records. If missing_ok was specified, None will be substituted for a record that was not found. """returnself._get_records_by_type(ManybodyRecord,record_ids,missing_ok,include)
[docs]defquery_manybodys(self,*,record_id:Optional[Union[int,Iterable[int]]]=None,manager_name:Optional[Union[str,Iterable[str]]]=None,status:Optional[Union[RecordStatusEnum,Iterable[RecordStatusEnum]]]=None,dataset_id:Optional[Union[int,Iterable[int]]]=None,parent_id:Optional[Union[int,Iterable[int]]]=None,child_id:Optional[Union[int,Iterable[int]]]=None,created_before:Optional[Union[datetime,str]]=None,created_after:Optional[Union[datetime,str]]=None,modified_before:Optional[Union[datetime,str]]=None,modified_after:Optional[Union[datetime,str]]=None,program:Optional[Union[str,Iterable[str]]]=None,qc_program:Optional[Union[str,Iterable[str]]]=None,qc_method:Optional[Union[str,Iterable[str]]]=None,qc_basis:Optional[Union[str,Iterable[str]]]=None,initial_molecule_id:Optional[Union[int,Iterable[int]]]=None,owner_user:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,owner_group:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,limit:Optional[int]=None,include:Optional[Iterable[str]]=None,)->RecordQueryIterator[ManybodyRecord]:""" Queries reaction records on the server Do not rely on the returned records being in any particular order. Parameters ---------- record_id Query records whose ID is in the given list manager_name Query records that were completed (or are currently runnning) on a manager is in the given list status Query records whose status is in the given list dataset_id Query records that are part of a dataset is in the given list parent_id Query records that have a parent is in the given list child_id Query records that have a child (optimization calculation) is in the given list created_before Query records that were created before the given date/time created_after Query records that were created after the given date/time modified_before Query records that were modified before the given date/time modified_after Query records that were modified after the given date/time program Query records whose reaction program is in the given list qc_program Query records whose qc program is in the given list qc_method Query records whose qc method is in the given list qc_basis Query records whose qc basis is in the given list initial_molecule_id Query manybody calculations that contain an initial molecule (id) is in the given list owner_user Query records owned by a user in the given list owner_group Query records owned by a group in the given list limit The maximum number of records to return. Note that the server limit is always obeyed. include Additional fields to include in the returned record Returns ------- : An iterator that can be used to retrieve the results of the query """filter_dict={"record_id":make_list(record_id),"manager_name":make_list(manager_name),"status":make_list(status),"dataset_id":make_list(dataset_id),"parent_id":make_list(parent_id),"child_id":make_list(child_id),"program":make_list(program),"qc_program":make_list(qc_program),"qc_method":make_list(qc_method),"qc_basis":make_list(qc_basis),"initial_molecule_id":make_list(initial_molecule_id),"created_before":created_before,"created_after":created_after,"modified_before":modified_before,"modified_after":modified_after,"owner_user":make_list(owner_user),"owner_group":make_list(owner_group),"limit":limit,}filter_data=ManybodyQueryFilters(**filter_dict)returnRecordQueryIterator[ManybodyRecord](self,filter_data,ManybodyRecord,include)
[docs]defadd_nebs(self,initial_chains:List[List[Union[int,Molecule]]],program:str,singlepoint_specification:QCSpecification,optimization_specification:Optional[OptimizationSpecification],keywords:Union[NEBKeywords,Dict[str,Any]],compute_tag:str="*",compute_priority:PriorityEnum=PriorityEnum.normal,owner_group:Optional[str]=None,find_existing:bool=True,**kwargs,# For deprecated parameters)->Tuple[InsertMetadata,List[int]]:""" Adds neb calculations to the server This checks if the calculations already exist in the database. If so, it returns the existing id, otherwise it will insert it and return the new id. This will add one record per set of molecules Parameters ---------- initial_chains The initial chains to run the NEB calculations on . Each NEB calculation starts with a single chain (list of molecules), so this is a nested list program The program to run the neb computation with ("geometric") singlepoint_specification Specification of how each singlepoint (gradient/hessian) should be run optimization_specification Specification of how any optimizations of the torsiondrive should be run keywords The torsiondrive keywords for the computation compute_tag The tag for the task. This will assist in routing to appropriate compute managers. compute_priority The priority of the job (high, normal, low). Default is normal. owner_group Group with additional permission for these records find_existing If True, search for existing records and return those. If False, always add new records Returns ------- : Metadata about the insertion, and a list of record ids. The ids will be in the order of the input molecules """if"tag"inkwargs:self._logger.warning("'tag' is deprecated; use 'compute_tag' instead")compute_tag=kwargs["tag"]if"priority"inkwargs:self._logger.warning("'priority' is deprecated; use 'compute_priority' instead")compute_priority=kwargs["priority"]ifnotinitial_chains:returnInsertMetadata(),[]body_data={"initial_chains":initial_chains,"specification":{"program":program,"singlepoint_specification":singlepoint_specification,"optimization_specification":optimization_specification,"keywords":keywords,},"compute_tag":compute_tag,"compute_priority":compute_priority,"owner_group":owner_group,"find_existing":find_existing,}iflen(body_data["initial_chains"])>self.api_limits["add_records"]:raiseRuntimeError(f"Cannot add {len(body_data['initial_chains'])} records - over the limit of {self.api_limits['add_records']}")body=NEBAddBody(**body_data)returnself.make_request("post","api/v1/records/neb/bulkCreate",Tuple[InsertMetadata,List[int]],body=body,)
[docs]defget_nebs(self,record_ids:Union[int,Sequence[int]],missing_ok:bool=False,*,include:Optional[Iterable[str]]=None,)->Union[Optional[NEBRecord],List[Optional[NEBRecord]]]:""" Obtain NEB records with the specified IDs. Records will be returned in the same order as the record ids. Parameters ---------- record_ids Single ID or sequence/list of records to obtain missing_ok If set to True, then missing records will be tolerated, and the returned records will contain None for the corresponding IDs that were not found. include Additional fields to include in the returned record Returns ------- : If a single ID was specified, returns just that record. Otherwise, returns a list of records. If missing_ok was specified, None will be substituted for a record that was not found. """returnself._get_records_by_type(NEBRecord,record_ids,missing_ok,include)
[docs]defquery_nebs(self,*,record_id:Optional[Union[int,Iterable[int]]]=None,manager_name:Optional[Union[str,Iterable[str]]]=None,status:Optional[Union[RecordStatusEnum,Iterable[RecordStatusEnum]]]=None,dataset_id:Optional[Union[int,Iterable[int]]]=None,parent_id:Optional[Union[int,Iterable[int]]]=None,child_id:Optional[Union[int,Iterable[int]]]=None,created_before:Optional[Union[datetime,str]]=None,created_after:Optional[Union[datetime,str]]=None,modified_before:Optional[Union[datetime,str]]=None,modified_after:Optional[Union[datetime,str]]=None,program:Optional[Union[str,Iterable[str]]]=None,qc_program:Optional[Union[str,Iterable[str]]]=None,qc_method:Optional[Union[str,Iterable[str]]]=None,qc_basis:Optional[Union[str,Iterable[str]]]=None,molecule_id:Optional[Union[int,Iterable[int]]]=None,owner_user:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,owner_group:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,limit:Optional[int]=None,include:Optional[Iterable[str]]=None,)->RecordQueryIterator[NEBRecord]:""" Queries neb records from the server Do not rely on the returned records being in any particular order. Parameters ---------- record_id Query records whose ID is in the given list manager_name Query records that were completed (or are currently runnning) on a manager is in the given list status Query records whose status is in the given list dataset_id Query records that are part of a dataset is in the given list parent_id Query records that have a parent is in the given list child_id Query records that have a child (optimization calculation) is in the given list created_before Query records that were created before the given date/time created_after Query records that were created after the given date/time modified_before Query records that were modified before the given date/time modified_after Query records that were modified after the given date/time program Query records whose torsiondrive program is in the given list qc_program Query records whose qc program is in the given list qc_method Query records whose method is in the given list qc_basis Query records whose basis is in the given list molecule_id Query records whose initial chains contain a molecule (id) that is in the given list owner_user Query records owned by a user in the given list owner_group Query records owned by a group in the given list limit The maximum number of records to return. Note that the server limit is always obeyed. include Additional fields to include in the returned record Returns ------- : An iterator that can be used to retrieve the results of the query """filter_dict={"record_id":make_list(record_id),"manager_name":make_list(manager_name),"status":make_list(status),"dataset_id":make_list(dataset_id),"parent_id":make_list(parent_id),"child_id":make_list(child_id),"program":make_list(program),"qc_program":make_list(qc_program),"qc_method":make_list(qc_method),"qc_basis":make_list(qc_basis),"molecule_id":make_list(molecule_id),"created_before":created_before,"created_after":created_after,"modified_before":modified_before,"modified_after":modified_after,"owner_user":make_list(owner_user),"owner_group":make_list(owner_group),"limit":limit,}filter_data=NEBQueryFilters(**filter_dict)returnRecordQueryIterator[NEBRecord](self,filter_data,NEBRecord,include)
[docs]defget_managers(self,names:Union[str,Sequence[str]],missing_ok:bool=False,)->Union[Optional[ComputeManager],List[Optional[ComputeManager]]]:"""Obtain manager information from the server with the specified names Parameters ---------- names A manager name or list of names missing_ok If set to True, then missing managers will be tolerated, and the returned managers will contain None for the corresponding managers that were not found. Returns ------- : If a single name was specified, returns just that manager. Otherwise, returns a list of managers. If missing_ok was specified, None will be substituted for a manager that was not found. """is_single=isinstance(names,str)names=make_list(names)ifnotnames:return[]body=CommonBulkGetNamesBody(names=names,missing_ok=missing_ok)managers=self.make_request("post","api/v1/managers/bulkGet",List[Optional[ComputeManager]],body=body)forminmanagers:ifmisnotNone:m.propagate_client(self)ifis_single:returnmanagers[0]else:returnmanagers
[docs]defquery_managers(self,*,manager_id:Optional[Union[int,Iterable[int]]]=None,name:Optional[Union[str,Iterable[str]]]=None,cluster:Optional[Union[str,Iterable[str]]]=None,hostname:Optional[Union[str,Iterable[str]]]=None,status:Optional[Union[RecordStatusEnum,Iterable[RecordStatusEnum]]]=None,modified_before:Optional[Union[datetime,str]]=None,modified_after:Optional[Union[datetime,str]]=None,limit:Optional[int]=None,)->ManagerQueryIterator:""" Queries manager information on the server Parameters ---------- manager_id ID assigned to the manager (this is not the UUID. This should be used very rarely). name Queries managers whose name is in the given list cluster Queries managers whose assigned cluster is in the given list hostname Queries managers whose hostname is in the given list status Queries managers whose status is in the given list modified_before Query for managers last modified before a certain time modified_after Query for managers last modified after a certain time limit The maximum number of managers to return. Note that the server limit is always obeyed. Returns ------- : An iterator that can be used to retrieve the results of the query """filter_dict={"manager_id":make_list(manager_id),"name":make_list(name),"cluster":make_list(cluster),"hostname":make_list(hostname),"status":make_list(status),"modified_before":modified_before,"modified_after":modified_after,"limit":limit,}filter_data=ManagerQueryFilters(**filter_dict)returnManagerQueryIterator(self,filter_data)
[docs]defquery_access_log(self,*,module:Optional[Union[str,Iterable[str]]]=None,method:Optional[Union[str,Iterable[str]]]=None,before:Optional[Union[datetime,str]]=None,after:Optional[Union[datetime,str]]=None,user:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,limit:Optional[int]=None,)->AccessLogQueryIterator:""" Query the server access log This log contains information about who accessed the server, and when. Parameters ---------- module Return log entries whose module is in the given list method Return log entries whose access_method is in the given list before Return log entries captured before the specified date/time after Return log entries captured after the specified date/time user User name or ID associated with the log entry limit The maximum number of log entries to return. Note that the server limit is always obeyed. Returns ------- : An iterator that can be used to retrieve the results of the query """filter_data=AccessLogQueryFilters(module=make_list(module),method=make_list(method),before=before,after=after,user=make_list(user),limit=limit,)returnAccessLogQueryIterator(self,filter_data)
[docs]defdelete_access_log(self,before:datetime)->int:""" Delete access log entries from the server Parameters ---------- before Delete access log entries captured before the given date/time Returns ------- : The number of access log entries deleted from the server """body=DeleteBeforeDateBody(before=before)returnself.make_request("post","api/v1/access_logs/bulkDelete",int,body=body)
[docs]defquery_error_log(self,*,error_id:Optional[Union[int,Iterable[int]]]=None,user:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,before:Optional[Union[datetime,str]]=None,after:Optional[Union[datetime,str]]=None,limit:Optional[int]=None,)->ErrorLogQueryIterator:""" Query the server's internal error log This log contains internal errors that are not always passed to the user. Parameters ---------- error_id Return error log entries whose id is in the list user Return error log entries whose user name or ID is in the list before Return error log entries captured before the specified date/time after Return error log entries captured after the specified date/time limit The maximum number of log entries to return. Note that the server limit is always obeyed. Returns ------- : An iterator that can be used to retrieve the results of the query """filter_data=ErrorLogQueryFilters(error_id=make_list(error_id),user=make_list(user),before=before,after=after,limit=limit,)returnErrorLogQueryIterator(self,filter_data)
[docs]defdelete_error_log(self,before:datetime)->int:""" Delete error log entries from the server Parameters ---------- before Delete error log entries captured before the given date/time Returns ------- : The number of error log entries deleted from the server """body=DeleteBeforeDateBody(before=before)returnself.make_request("post","api/v1/server_errors/bulkDelete",int,body=body)
[docs]defget_internal_job(self,job_id:int)->InternalJob:""" Gets information about an internal job on the server """ij_dict=self.make_request("get",f"api/v1/internal_jobs/{job_id}",Dict[str,Any])returnInternalJob(client=self,**ij_dict)
[docs]defquery_internal_jobs(self,*,job_id:Optional[int,Iterable[int]]=None,name:Optional[Union[str,Iterable[str]]]=None,user:Optional[Union[int,str,Iterable[Union[int,str]]]]=None,runner_hostname:Optional[Union[str,Iterable[str]]]=None,status:Optional[Union[InternalJobStatusEnum,Iterable[InternalJobStatusEnum]]]=None,last_updated_before:Optional[Union[datetime,str]]=None,last_updated_after:Optional[Union[datetime,str]]=None,added_before:Optional[Union[datetime,str]]=None,added_after:Optional[Union[datetime,str]]=None,scheduled_before:Optional[Union[datetime,str]]=None,scheduled_after:Optional[Union[datetime,str]]=None,limit:Optional[int]=None,)->InternalJobQueryIterator:""" Queries the internal job queue on the server Parameters ---------- job_id ID assigned to the job name Queries jobs whose name is in the given list user User name or ID associated with the log entry runner_hostname Queries jobs that were run/are running on a given host status Queries jobs whose status is in the given list last_updated_before Query for jobs last updated before a certain time last_updated_after Query for jobs last updated after a certain time added_before Query for jobs added before a certain time added_after Query for jobs added after a certain time scheduled_before Query for jobs scheduled to run before a certain time scheduled_after Query for jobs scheduled to run after a certain time limit The maximum number of jobs to return. Note that the server limit is always obeyed. Returns ------- : An iterator that can be used to retrieve the results of the query """filter_dict={"job_id":make_list(job_id),"name":make_list(name),"runner_hostname":make_list(runner_hostname),"status":make_list(status),"user":make_list(user),"last_updated_before":last_updated_before,"last_updated_after":last_updated_after,"added_before":added_before,"added_after":added_after,"scheduled_before":scheduled_before,"scheduled_after":scheduled_after,"limit":limit,}filter_data=InternalJobQueryFilters(**filter_dict)returnInternalJobQueryIterator(self,filter_data)
[docs]defcancel_internal_job(self,job_id:int):""" Cancels (to the best of our ability) an internal job """returnself.make_request("put",f"api/v1/internal_jobs/{job_id}/status",None,body=InternalJobStatusEnum.cancelled)
[docs]defquery_access_summary(self,*,group_by:str="day",before:Optional[Union[datetime,str]]=None,after:Optional[Union[datetime,str]]=None,)->AccessLogSummary:"""Obtains summaries of access data This aggregate data is created on the server, so you don't need to download all the log entries and do it yourself. Parameters ---------- group_by How to group the data. Valid options are "user", "hour", "day", "country", "subdivision" before Query for log entries with a timestamp before a specific time after Query for log entries with a timestamp after a specific time """url_params=AccessLogSummaryFilters(group_by=group_by,before=before,after=after)entries=self.make_request("get","api/v1/access_logs/summary",Dict[str,List[AccessLogSummaryEntry]],url_params=url_params)returnAccessLogSummary(entries=entries)
############################################################### User & group management##############################################################
[docs]deflist_groups(self)->List[GroupInfo]:""" List all user groups on the server """returnself.make_request("get","api/v1/groups",List[GroupInfo])
[docs]defget_group(self,groupname_or_id:Union[int,str])->GroupInfo:""" Get information about a group on the server """ifisinstance(groupname_or_id,str):is_valid_groupname(groupname_or_id)returnself.make_request("get",f"api/v1/groups/{groupname_or_id}",GroupInfo)
[docs]defadd_group(self,group_info:GroupInfo)->None:""" Adds a group with permissions to the server If not successful, an exception is raised. """ifgroup_info.idisnotNone:raiseRuntimeError("Cannot add group when group_info contains an id")returnself.make_request("post","api/v1/groups",None,body=group_info)
[docs]defdelete_group(self,groupname_or_id:Union[int,str]):""" Deletes a group on the server Deleted groups will be removed from all users groups list """ifisinstance(groupname_or_id,str):is_valid_groupname(groupname_or_id)returnself.make_request("delete",f"api/v1/groups/{groupname_or_id}",None)
[docs]deflist_users(self)->List[UserInfo]:""" List all user roles on the server """returnself.make_request("get","api/v1/users",List[UserInfo])
[docs]defget_user(self,username_or_id:Optional[Union[int,str]]=None)->UserInfo:""" Get information about a user on the server If the username is not supplied, then info about the currently logged-in user is obtained. Parameters ---------- username_or_id The username or ID to get info about Returns ------- : Information about the user """ifusername_or_idisNone:username_or_id=self.usernameifisinstance(username_or_id,str):is_valid_username(username_or_id)ifusername_or_idisNone:raiseRuntimeError("Cannot get user - not logged in?")returnself.make_request("get",f"api/v1/users/{username_or_id}",UserInfo)
[docs]defadd_user(self,user_info:UserInfo,password:Optional[str]=None)->str:""" Adds a user to the server Parameters ---------- user_info Info about the user to add password The user's password. If None, then one will be generated Returns ------- : The password of the user (either the same as the supplied password, or the server-generated one) """ifpasswordisnotNone:is_valid_password(password)ifuser_info.idisnotNone:raiseRuntimeError("Cannot add user when user_info contains an id")returnself.make_request("post","api/v1/users",str,body=(user_info,password))
[docs]defmodify_user(self,user_info:UserInfo)->UserInfo:""" Modifies a user on the server The user is determined by the id field of the input UserInfo, although the id and username are checked for consistency. Depending on the current user's permissions, some fields may not be updatable. Parameters ---------- user_info Updated information for a user Returns ------- : The updated user information as it appears on the server """returnself.make_request("patch",f"api/v1/users",UserInfo,body=user_info)
[docs]defchange_user_password(self,username_or_id:Optional[Union[int,str]]=None,new_password:Optional[str]=None)->str:""" Change a users password If the username is not specified, then the current logged-in user is used. If the password is not specified, then one is automatically generated by the server. Parameters ---------- username_or_id The name or ID of the user whose password to change. If None, then use the currently logged-in user new_password Password to change to. If None, let the server generate one. Returns ------- : The new password (either the same as the supplied one, or the server generated one """ifusername_or_idisNone:username_or_id=self.usernameifusername_or_idisNone:raiseRuntimeError("Cannot change user - not logged in?")ifnotisinstance(username_or_id,int):is_valid_username(username_or_id)ifnew_passwordisnotNone:is_valid_password(new_password)returnself.make_request("put",f"api/v1/users/{username_or_id}/password",str,body_model=Optional[str],body=new_password)
[docs]defdelete_user(self,username_or_id:Union[int,str])->None:""" Delete a user from the server """ifnotisinstance(username_or_id,int):is_valid_username(username_or_id)returnself.make_request("delete",f"api/v1/users/{username_or_id}",None)