Source code for eqc_direct.client
- """
- :class:`.EqcClient` contains all RPC calls to process, get system status, 
- and fetch results.
- """
- import logging
- import time
- import os
- import warnings
- from typing import TypedDict, List, Optional, Union
- import grpc
- from grpc._channel import _InactiveRpcError
- import numpy as np
- from . import eqc_pb2, eqc_pb2_grpc
- from .utils import (
-     SysStatus,
-     message_to_dict,
-     PREC_MIN_RECOMMENDED_LEVELS,
-     get_decimal_places
- )
- [docs]
class InactiveRpcError(Exception):
    """
    Public stand-in for ``grpc._channel._InactiveRpcError``.

    Client calls re-raise the private gRPC error as this type, chaining the
    original exception as the cause.
    """
- [docs]
class EqcResult(TypedDict):
    """
    Result record for a single EQC job.

    The energy and solution entries are only meaningful when
    :code:`err_code` is 0.

    :param err_code: error code for the job. The full list of
        :code:`err_code` values is in :class:`eqc_direct.utils.JobCodes`
    :param err_desc: description matching :code:`err_code`. The full list of
        :code:`err_desc` values is in :class:`eqc_direct.utils.JobCodes`
    :param preprocessing_time: seconds spent validating the input and
        re-formatting it for the device
    :param runtime: solving time in seconds on Dirac hardware
    :param energy: energy of the best solution found (float32 precision)
    :param solution: vector for the lowest energy solution
        (float32 precision)
    :param distilled_runtime: seconds spent distilling the solution
    :param distilled_energy: energy of the distilled solution for the input
        polynomial (float32 precision)
    :param distilled_solution: vector obtained by applying the distillation
        procedure to the solution derived from the hardware
        (float32 precision)

    :note:
       * solutions are length n vectors of floats that sum to the device
         constraint

    .. Must use native python types to ensure can be dumped to json
    """

    # Job outcome
    err_code: int
    err_desc: str

    # Timings in seconds
    preprocessing_time: float
    runtime: float

    # Raw device solution
    energy: Optional[float]
    solution: Optional[List[float]]

    # Distilled solution
    distilled_runtime: Optional[float]
    distilled_energy: Optional[float]
    distilled_solution: Optional[List[float]]
- [docs]
- class EqcClient:
-     """
-     Provides calls to process jobs using EQC RPC server
-     :param ip_address: The IP address of the RPC server
-     :param port: The port that the RPC server is running on
-     :param max_data_size: the max send and receive message length for RPC server
-     .. note::
-        :code:`lock_id` is used by a variety of class functions.
-        It is set to an empty string by default since default for device server
-        :code:`lock_id` is also an empty string. This allows for single user
-        processing without having to acquire a device lock.
-     .. All GRPC calls follow a specific pattern:
-     ..   1. Fill in data to be sent in message stub
-     ..   2. Send data using stub service method
-     ..   3. Parse response
-     """
-     def __init__(
-         self,
-         ip_address: str = os.getenv("DEVICE_IP_ADDRESS", "localhost"),
-         port: str = os.getenv("DEVICE_PORT", "50051"),
-         max_data_size: int = 512 * 1024 * 1024,
-     ):
-         self._ip_address = ip_address
-         self._max_data_size = max_data_size
-         self._ip_add_port = ip_address + ":" + port
-         self._channel_opt = [
-             ("grpc.max_send_message_length", max_data_size),
-             ("grpc.max_receive_message_length", max_data_size),
-         ]
-         self.channel = grpc.insecure_channel(
-             self._ip_add_port,
-             options=self._channel_opt,
-         )
-         self.eqc_stub = eqc_pb2_grpc.EqcServiceStub(self.channel)
- [docs]
-     def submit_job(  
-         self,
-         poly_coefficients: np.ndarray,
-         poly_indices: np.ndarray,
-         num_variables: Optional[int] = None,
-         lock_id: str = "",
-         sum_constraint: Union[int, float] = 10000,
-         relaxation_schedule: int = 2,
-         solution_precision: Optional[float] = None,
-     ) -> dict:
-         """
-         Submits data to be processed by EQC device
-         :param poly_coefficients:
-             coefficient values for the polynomial to be minimized
-         :param poly_indices:
-             list of lists containing polynomial indices associated with
-             coefficient values for problem to be optimized.
-         :param num_variables: the number of total variables for the submitted
-             polynomial must not be less than max index in :code:`poly_indices`.
-             If no value is provided then will be set to max value in
-             :code:`poly_indices`.
-         :param lock_id: a UUID to allow for multi-user processing
-         :param sum_constraint: a normalization constraint that is applied to the
-             problem space that is used to calculate :code:`energy`. This 
-             parameter will be rounded if exceeds float32 precision 
-             (e.g. 7-decimal places). Value must be between 1 and 10000.
-         :param relaxation_schedule: four different schedules represented
-             in integer parameter. Higher values reduce the variation in
-             the analog spin values and therefore, are more probable to lead to
-             improved objective function energy for input problem.
-             Accepts range of values in set {1, 2, 3, 4}.
-         :param solution_precision: the level of precision to apply to the solutions.
-             This parameter will be rounded if exceeds float32 precision 
-             (e.g. 7-decimal places). If specified a distillation method is 
-             applied to the continuous solutions to map them to the submitted 
-             :code:`solution_precision`. Input :code:`solution_precision` must
-             satisfy :code:`solution_precision` greater than or equal to 
-             :code:`sum_constraint`/10000 in order to be valid.
-             Also :code:`sum_constraint` must be divisible by :code:`solution_precision`. 
-             If :code:`solution_precision` is not specified no distillation will be 
-             applied to the solution derived by the device.
-             
-         :return: a member of :class:`eqc_direct.utils.JobCodes` as a dict
-            with the following keys:
-            - **err_code**: `int`- job submission error code
-            - **err_desc**: `str`- error code description for submission
-         """
-         
-         
-         if solution_precision is None:
-             solution_precision = 0
-         
-         poly_coefficients = np.array(poly_coefficients)
-         poly_indices = np.array(poly_indices)
-         coefficient_dtype = poly_coefficients.dtype
-         if not (
-             np.issubdtype(coefficient_dtype, np.integer)
-             or (
-                 np.issubdtype(coefficient_dtype, np.floating)
-                 and np.finfo(coefficient_dtype).bits <= 32
-             )
-         ):
-             warn_dtype_msg = (
-                 f"Max precision for EQC device is float32 input type "
-                 f"was dtype {np.dtype(coefficient_dtype).name}."
-                 f" Input matrix will be rounded"
-             )
-             logging.warning(warn_dtype_msg)
-             warnings.warn(warn_dtype_msg, Warning)
-         if get_decimal_places(solution_precision)>7:
-             soln_prec_warn = (
-                 f"`solution_precision`precision is greater than 7 "
-                 f"decimal places. Will be modified on submission to "
-                 f"device to float32 precision"
-             )
-             logging.warning(soln_prec_warn)
-             warnings.warn(soln_prec_warn, Warning)
-         if get_decimal_places(sum_constraint)>7:
-             sum_constraint_warn = (
-                 f"`sum_constraint` precision is greater than 7 decimal "
-                 f"places. Will be modified on submission to device "
-                 f"to float32"
-             )
-             logging.warning(sum_constraint_warn)
-             warnings.warn(sum_constraint_warn, Warning)
-                 
-         try:
-             _, degree_poly = poly_indices.shape
-         except ValueError as err:
-             err_msg = "`poly_indices` array must be two dimensions"
-             logging.error(err_msg, exc_info=True)
-             raise ValueError(err_msg) from err
-         if not num_variables:
-             num_variables = np.max(poly_indices)
-         
-         poly_indices = poly_indices.flatten(order="c").tolist()
-         job_input = eqc_pb2.JobInput(
-             num_variables=num_variables,
-             degree=degree_poly,
-             poly_indices=poly_indices,
-             coef_values=poly_coefficients.tolist(),
-             sum_constraint=sum_constraint,
-             relaxation_schedule=relaxation_schedule,
-             soln_precision=solution_precision,
-             lock_id=lock_id,
-         )
-         try:
-             job_results = self.eqc_stub.SubmitJob(job_input)
-         except _InactiveRpcError as exc:
-             
-             raise InactiveRpcError(
-                 "EQC submit_job failed due to grpc._channel._InactiveRpcError."
-             ) from exc
-         return message_to_dict(job_results)
- [docs]
-     def fetch_result(self, lock_id: str = "") -> EqcResult:
-         """
-         Request last EQC job results. Returns results from the most recent
-         run on the device.
-         :param lock_id: a valid :code:`lock_id` that matches current device
-             :code:`lock_id`
-         :return: an :class:`.EqcResult` object
-         """
-         fetch_input = eqc_pb2.LockMessage(lock_id=lock_id)
-         try:
-             eqc_results = self.eqc_stub.FetchResults(fetch_input)
-         except _InactiveRpcError as exc:
-             
-             raise InactiveRpcError(
-                 "EQC fetch_results failed due to grpc._channel._InactiveRpcError."
-             ) from exc
-         result = message_to_dict(eqc_results)
-         
-         
-         
-         
-         
-         result["solution"] = [
-             float(f"{np.float32(val):.7f}") for val in result["solution"]
-         ]
-         result["distilled_solution"] = [
-             float(f"{np.float32(val):.7f}") for val in result["distilled_solution"]
-         ]
-         result["energy"] = float(f"{np.float32(result['energy']):.7f}")
-         result["distilled_energy"] = float(
-             f"{np.float32(result['distilled_energy']):.7f}"
-         )
-         return result
- [docs]
-     def system_status(self) -> dict:
-         """
-         Client call to obtain EQC system status
-         :returns: a member of :class:`eqc_direct.utils.SysStatus` as a dict:
-             - **status_code**: `int`- current system status code
-             - **status_desc**: `str`- description of current system status
-         """
-         try:
-             sys_resp = self.eqc_stub.SystemStatus(eqc_pb2.Empty())
-         except _InactiveRpcError as exc:
-             raise InactiveRpcError(
-                 "EQC system_status failed due to grpc._channel._InactiveRpcError."
-             ) from exc
-         return message_to_dict(sys_resp)
- [docs]
-     def acquire_lock(self) -> dict:
-         """
-         Makes a single attempt to acquire exclusive lock on hardware execution.
-         Locking can be used to ensure orderly processing in multi-user environments.
-         Lock can only be acquired when no other user has acquired the lock or when
-         the system has been idle for 60 seconds while another user has the lock.
-         This idle timeout prevents one user from blocking other users from using
-         the machine even if they are not active.
-         :return:
-            a member of :class:`eqc_direct.utils.LockManageStatus` as a dict along
-            with an additional key :code:`lock_id`:
-            - **lock_id**: `str`- if acquired the current device `lock_id`
-              else empty string
-            - **status_code**: `int`- status code for lock id acquisition
-            - **status_desc**: `str`- a description for the associated status code
-         """
-         try:
-             acquire_lock_resp = self.eqc_stub.AcquireLock(eqc_pb2.Empty())
-         except _InactiveRpcError as exc:
-             raise InactiveRpcError(
-                 "EQC acquire_lock failed due to grpc._channel._InactiveRpcError."
-             ) from exc
-         return {
-             "lock_id": acquire_lock_resp.lock_id,
-             "status_code": acquire_lock_resp.lock_status.status_code,
-             "status_desc": acquire_lock_resp.lock_status.status_desc,
-         }
- [docs]
-     def release_lock(self, lock_id: str = "") -> dict:
-         """
-         Releases exclusive lock for running health check or submitting job
-         :param lock_id: a UUID with currently acquired exclusive device lock
-         :return: a member of :class:`eqc_direct.utils.LockManageStatus` as a dict:
-            - **status_code**: `int`- status code for lock id acquisition
-            - **status_desc**: `str`- a description for the associated status code
-         """
-         release_input = eqc_pb2.LockMessage(lock_id=lock_id)
-         try:
-             release_lock_resp = self.eqc_stub.ReleaseLock(release_input)
-         except _InactiveRpcError as exc:
-             raise InactiveRpcError(
-                 "EQC release_lock failed due to grpc._channel._InactiveRpcError."
-             ) from exc
-         return message_to_dict(release_lock_resp)
- [docs]
-     def check_lock(self, lock_id: str = "") -> dict:
-         """
-         Checks if submitted :code:`lock_id` has execution lock on the device
-         :param lock_id: a UUID which will be checked to determine if has exclusive
-             device execution lock
-         :return: a member of :class:`eqc_direct.utils.LockCheckStatus` as a dict:
-            - **status_code**: `int`- status code for lock check
-            - **status_desc**: `str`- a description for the associated status code
-         """
-         check_input = eqc_pb2.LockMessage(lock_id=lock_id)
-         check_output = self.eqc_stub.CheckLock(check_input)
-         return message_to_dict(check_output)
- [docs]
-     def stop_running_process(self, lock_id: str = "") -> dict:
-         """
-         Stops a running process either a health check or a Eqc job.
-         Process locks will release automatically based on a timeout
-         which is maintained in the server code if they are
-         not released using this.
-         :param lock_id: requires a lock_id that was acquired by
-         :return:
-            a member of :class:`eqc_direct.utils.SysStatus`
-            as dict with following keys:
-            - **status_code**: `int`- the system code after stopping
-            - **status_desc**: `str`- the associated system status description
-         """
-         stop_input = eqc_pb2.LockMessage(lock_id=lock_id)
-         try:
-             stop_resp = self.eqc_stub.StopRunning(stop_input)
-         except _InactiveRpcError as exc:
-             raise InactiveRpcError(
-                 "EQC stop_running_process failed due to "
-                 "grpc._channel._InactiveRpcError."
-             ) from exc
-         return message_to_dict(stop_resp)
- [docs]
-     def wait_for_lock(self) -> tuple:
-         """
-         Waits for lock indefinitely calling :func:`acquire_lock`
-         :return: a tuple of the following items:
-            - **lock_id**: `str`- exclusive lock for device execution with a timeout
-            - **start_queue_ts**: `int`- time in ns on which lock was acquired is an int
-            - **end_queue_ts**: `int`- time in ns on which queue for
-              lock ended is an int.
-         """
-         lock_id = ""
-         start_queue_ts = time.time_ns()
-         while lock_id == "":
-             sys_code = self.system_status()["status_code"]
-             
-             if sys_code >= 3:
-                 raise RuntimeError(f"System unavailable status_code: {sys_code}")
-             lock_id = self.acquire_lock()["lock_id"]
-             
-             if lock_id == "":
-                 time.sleep(1)
-         end_queue_ts = time.time_ns()
-         return lock_id, start_queue_ts, end_queue_ts
- [docs]
-     def system_version(self) -> dict:
-         """
-         Provides information regarding Dirac server
-         
-         :return: a dict with a single item:
-         
-             - **server_version**: `str` - the current gRPC server version
-         """
-         try:
-             sys_ver_resp = self.eqc_stub.ServerVersion(eqc_pb2.Empty())
-         except _InactiveRpcError as exc:
-             raise InactiveRpcError(
-                 "EQC system_version call failed due to inactive grpc channel"
-             ) from exc
-         return message_to_dict(sys_ver_resp)
- [docs]
-     def process_job(  
-         self,
-         poly_coefficients: np.ndarray,
-         poly_indices: np.ndarray,
-         num_variables: Optional[int] = None,
-         lock_id: str = "",
-         sum_constraint: Union[int, float] = 10000,
-         relaxation_schedule: int = 2,
-         solution_precision: Optional[float] = None,
-     ) -> dict:
-         """
-         Processes a job by:
-            1. Submitting job
-            2. Checks for status, until completes or fails
-            3. Returns results
-         :param poly_coefficients: coefficient values for the polynomial to be minimized
-         :param poly_indices:
-             list of lists containing polynomial indices associated with
-             coefficient values for problem to be optimized.
-         :param lock_id: a UUID to allow for multi-user processing
-         :param sum_constraint: a normalization constraint that is applied to the
-             problem space that is used to calculate :code:`energy`. This 
-             parameter will be rounded if exceeds float32 precision 
-             (e.g. 7-decimal places). Value must be between 1 and 10000.
-         :param relaxation_schedule: four different schedules represented
-             in integer parameter. Higher values reduce the variation in
-             the analog spin values and therefore, are more probable to lead to
-             improved objective function energy for input problem.
-             Accepts range of values in set {1, 2, 3, 4}.
-         :param solution_precision: the level of precision to apply to the solutions.
-             This parameter will be rounded if exceeds float32 precision 
-             (e.g. 7-decimal places). If specified a distillation method is 
-             applied to the continuous solutions to map them to the submitted 
-             :code:`solution_precision`. Input :code:`solution_precision` must
-             satisfy :code:`solution_precision` greater than or equal to 
-             :code:`sum_constraint`/10000 in order to be valid.
-             Also :code:`sum_constraint` must be divisible by :code:`solution_precision`. 
-             If :code:`solution_precision` is not specified no distillation will be 
-             applied to the solution derived by the device.
-         :return: dict of results and timings with the following keys:
-            - results: :class:`.EqcResult` dict
-            - start_job_ts: time in ns marking start of job_submission
-            - end_job_ts: time in ns marking end of job submission complete
-         """
-         
-         if not solution_precision:
-             solution_precision = 0
-         start_job = time.time_ns()
-         submit_job_resp = self.submit_job(
-             poly_coefficients=poly_coefficients,
-             poly_indices=poly_indices,
-             num_variables=num_variables,
-             lock_id=lock_id,
-             sum_constraint=sum_constraint,
-             relaxation_schedule=relaxation_schedule,
-             solution_precision=solution_precision,
-         )
-         logging.info("Job submitted")
-         if submit_job_resp["err_code"] != 0:
-             err_msg = f"Job submission failed with response: {submit_job_resp}"
-             logging.error(err_msg, exc_info=True)
-             raise RuntimeError(err_msg)
-         sys_code = self.system_status()["status_code"]
-         while sys_code != SysStatus.IDLE["status_code"]:
-             sys_code = self.system_status()["status_code"]
-             
-             if sys_code > 3:
-                 err_msg = f"System unavailable status_code: {sys_code}"
-                 logging.error(err_msg, exc_info=True)
-                 raise RuntimeError(err_msg)
-             
-             if sys_code != SysStatus.IDLE["status_code"]:
-                 time.sleep(1)
-         end_job = time.time_ns()
-         
-         logging.info("Fetching results")
-         job_result = self.fetch_result(lock_id=lock_id)
-         if job_result["err_code"] != 0:
-             raise RuntimeError(
-                 f"Job execution error\n"
-                 f"err_code: {job_result['err_code']}\n"
-                 f"err_desc: {job_result['err_desc']}"
-             )
-         job_result["start_job_ts"] = start_job
-         job_result["end_job_ts"] = end_job
-         return job_result