Source code for qcportal.internal_jobs.models

import time
from datetime import datetime
from enum import Enum
from typing import Optional, Dict, Any, List, Union

from dateutil.parser import parse as date_parser

try:
    from pydantic.v1 import BaseModel, Extra, validator, PrivateAttr
except ImportError:
    from pydantic import BaseModel, Extra, validator, PrivateAttr

from qcportal.base_models import QueryProjModelBase
from ..base_models import QueryIteratorBase
from tqdm import tqdm


[docs] class InternalJobStatusEnum(str, Enum): """ The state of a record object. The states which are available are a finite set. """ complete = "complete" waiting = "waiting" running = "running" error = "error" cancelled = "cancelled" @classmethod def _missing_(cls, name): """Attempts to find the correct status in a case-insensitive way If a string being converted to an InternalJobStatusEnum is missing, then this function will convert the case and try to find the appropriate status. """ name = name.lower() # Search this way rather than doing 'in' since we are comparing # a string to an enum for status in cls: if name == status: return status
[docs] class InternalJob(BaseModel): class Config: extra = Extra.forbid id: int name: str status: InternalJobStatusEnum added_date: datetime scheduled_date: datetime started_date: Optional[datetime] last_updated: Optional[datetime] ended_date: Optional[datetime] runner_hostname: Optional[str] runner_uuid: Optional[str] repeat_delay: Optional[int] serial_group: Optional[str] progress: int progress_description: Optional[str] = None function: str kwargs: Dict[str, Any] after_function: Optional[str] after_function_kwargs: Optional[Dict[str, Any]] result: Any user: Optional[str] _client: Any = PrivateAttr(None) _refresh_url: Optional[str] = PrivateAttr(None) def __init__(self, client=None, refresh_url=None, **kwargs): BaseModel.__init__(self, **kwargs) self._client = client self._refresh_url = refresh_url
[docs] def refresh(self): """ Updates the data of this object with information from the server """ if self._client is None: raise RuntimeError("Client is not set") if self._refresh_url is None: server_data = self._client.get_internal_job(self.id) else: server_data = self._client.make_request("get", self._refresh_url, InternalJob) for k, v in server_data: setattr(self, k, v)
[docs] def watch(self, interval: float = 2.0, timeout: Optional[float] = None): """ Watch an internal job for completion Will poll every `interval` seconds until the job is finished (complete, error, cancelled, etc). Parameters ---------- interval Time (in seconds) between polls on the server timeout Max amount of time (in seconds) to wait. If None, will wait forever. Returns ------- """ if self.status not in [InternalJobStatusEnum.waiting, InternalJobStatusEnum.running]: return begin_time = time.time() end_time = None if timeout is not None: end_time = begin_time + timeout pbar = tqdm(initial=self.progress, total=100, desc=self.progress_description) while True: t = time.time() self.refresh() pbar.update(self.progress - pbar.n) pbar.set_description(self.progress_description) if end_time is not None and t >= end_time: raise TimeoutError("Timed out waiting for job to complete") if self.status == InternalJobStatusEnum.error: print("Internal job resulted in an error:") print(self.result) break elif self.status not in [InternalJobStatusEnum.waiting, InternalJobStatusEnum.running]: print(f"Internal job final status: {self.status.value}") break curtime = time.time() if end_time is not None: # sleep the normal interval, or up to the timeout time time.sleep(min(interval, end_time - curtime + 0.1)) else: time.sleep(interval)
[docs] class InternalJobQueryFilters(QueryProjModelBase): job_id: Optional[List[int]] = None name: Optional[List[str]] = None user: Optional[List[Union[int, str]]] = None runner_hostname: Optional[List[str]] = None status: Optional[List[InternalJobStatusEnum]] = None last_updated_before: Optional[datetime] = None last_updated_after: Optional[datetime] = None added_before: Optional[datetime] = None added_after: Optional[datetime] = None scheduled_before: Optional[datetime] = None scheduled_after: Optional[datetime] = None
[docs] @validator( "last_updated_before", "last_updated_after", "added_before", "added_after", "scheduled_before", "scheduled_after", pre=True, ) def parse_dates(cls, v): if isinstance(v, str): return date_parser(v) return v
[docs] class InternalJobQueryIterator(QueryIteratorBase[InternalJob]): """ Iterator for internal job queries This iterator transparently handles batching and pagination over the results of an internal job query. """ def __init__(self, client, query_filters: InternalJobQueryFilters): """ Construct an iterator Parameters ---------- client QCPortal client object used to contact/retrieve data from the server query_filters The actual query information to send to the server """ batch_limit = client.api_limits["get_internal_jobs"] // 4 QueryIteratorBase.__init__(self, client, query_filters, batch_limit) def _request(self) -> List[InternalJob]: ij_dicts = self._client.make_request( "post", "api/v1/internal_jobs/query", List[Dict[str, Any]], body=self._query_filters, ) return [InternalJob(client=self, **ij_dict) for ij_dict in ij_dicts]