Source code for eqc_direct.eqc_client
- """
- :class:`.EqcClient` contains all RPC calls to submit and process jobs, query
- system status, manage device locks, run health checks, and fetch results.
- """
- import logging
- import time
- import os
- import warnings
- from typing import TypedDict, List, Optional
- import grpc
- from grpc._channel import _InactiveRpcError
- import numpy as np
- from . import eqc_pb2, eqc_pb2_grpc
- from .utils import (
-     SysStatus,
-     message_to_dict,
- )
- [docs]
class InactiveRpcError(Exception):
    """
    Public exception raised by :class:`EqcClient` methods when the underlying
    gRPC call fails with the private ``grpc._channel._InactiveRpcError``.

    The original gRPC error is attached as ``__cause__`` by the raising code
    so callers never need to import grpc internals to handle it.
    """
- [docs]
class EqcResult(TypedDict):
    """
    EQC results object. Will not contain an energy or solution if
    :code:`err_code` is not 0.

    :param err_code: the error code for a given job. Full list of :code:`err_code`
        values can be found in :class:`eqc_direct.utils.JobCodes`
    :param err_desc: the error description for a given job submission. Full list of
        :code:`err_desc` values can be found in :class:`eqc_direct.utils.JobCodes`
    :param runtime: solving time in seconds
    :param energy: energy for best solution found
    :param solution: vector of floats representing the lowest energy solution

    :note:
       * Eqc1 only supports the Ising formulation, where possible solution
         values are {-1, 1}
       * all other formulations have a length n solution vector of floats
         that sum to the device constraint (Eqc2 and Eqc3)
    """
    err_code: int
    err_desc: str
    runtime: float
    energy: Optional[float]
    solution: Optional[List[float]]
- [docs]
class _HealthCheckBase(TypedDict):
    """Required keys of :class:`HealthCheckResponse`."""
    debug: bool
    err_code: int
    err_desc: str
    entropy_pass: Optional[bool]
    stability_pass: Optional[bool]
    extinction_ratio_pass: Optional[bool]
    small_problem_pass: Optional[bool]


class HealthCheckResponse(_HealthCheckBase, total=False):
    """
    Health check response object structure. Unless :code:`debug` is specified
    at submission, only pass/fail for each test is returned.

    :param debug: whether health check was run in debug mode
    :param err_code: if non-zero indicates an error in health check
    :param err_desc: describes errors that occurred during health check run
    :param entropy_pass: pass/fail for entropy test
    :param stability_pass: pass/fail for stability test
    :param extinction_ratio_pass: pass/fail for extinction ratio tests
    :param small_problem_pass: pass/fail for small problem ground state test
    :param entropy_data: test data for entropy tests only returned if debug=True
    :param stability_data: test data for stability tests only returned if debug=True
    :param extinction_ratio_data: test data for extinction ratio tests only
        returned if debug=True
    :param small_problem_result: Eqc results object for small problem test,
        only returned if debug=True

    .. note::
       The four debug-only keys are declared with ``total=False`` because
       :meth:`EqcClient.fetch_health_check_result` removes them from the
       response when debug mode is off; the remaining keys are inherited as
       required keys.
    """
    entropy_data: Optional[List[float]]
    stability_data: Optional[List[float]]
    extinction_ratio_data: Optional[List[float]]
    small_problem_result: Optional[EqcResult]
- [docs]
class EqcClient:
    """
    Provides calls to process jobs using EQC RPC server

    :param ip_addr: The IP address of the RPC server
    :param port: The port that the RPC server is running on (a ``str``; an
        ``int`` is also accepted)
    :param max_data_size: the max send and receive message length for RPC server

    .. note::

       :code:`lock_id` is used by a variety of class functions.
       It is set to an empty string by default since default for device server
       :code:`lock_id` is also an empty string. This allows for single user
       processing without having to acquire a device lock.

       The client can be used as a context manager; the underlying gRPC
       channel is closed on exit (see :meth:`close`).

    .. All GRPC calls follow a specific pattern:
    ..   1. Fill in data to be sent in message stub
    ..   2. Send data using stub service method
    ..   3. Parse response
    """

    def __init__(
        self,
        # NOTE: these os.getenv defaults are evaluated once, when this module
        # is imported; later changes to the environment are not picked up.
        ip_addr: str = os.getenv("DEVICE_IP_ADDRESS", "localhost"),
        port: str = os.getenv("DEVICE_PORT", "50051"),
        max_data_size: int = 512 * 1024 * 1024,
    ):
        self._ip_addr = ip_addr
        self._max_data_size = max_data_size
        # f-string instead of "+" so an integer port works as well
        self._ip_add_port = f"{ip_addr}:{port}"
        self._channel_opt = [
            ("grpc.max_send_message_length", max_data_size),
            ("grpc.max_receive_message_length", max_data_size),
        ]
        self.channel = grpc.insecure_channel(
            self._ip_add_port,
            options=self._channel_opt,
        )
        self.eqc_stub = eqc_pb2_grpc.EqcServiceStub(self.channel)

    def close(self) -> None:
        """
        Close the underlying gRPC channel and release its resources.
        No further RPCs can be issued through this client afterwards.
        """
        self.channel.close()

    def __enter__(self) -> "EqcClient":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def _call_rpc(self, label: str, rpc_method, request):
        """
        Invoke a unary stub method, translating grpc's private
        ``_InactiveRpcError`` into the public :class:`InactiveRpcError`.

        :param label: name used in the error message (the public client
            method name)
        :param rpc_method: bound stub method, e.g. ``self.eqc_stub.SubmitJob``
        :param request: protobuf request message passed to ``rpc_method``
        :return: the raw protobuf response message
        :raises InactiveRpcError: if the call raises ``_InactiveRpcError``;
            the original error is chained as ``__cause__``
        """
        try:
            return rpc_method(request)
        except _InactiveRpcError as exc:
            raise InactiveRpcError(
                f"EQC {label} failed due to grpc._channel._InactiveRpcError."
            ) from exc

    def _wait_until_idle(self, poll_interval: float = 1.0) -> None:
        """
        Poll :meth:`system_status` until it reports ``SysStatus.IDLE``.

        :param poll_interval: seconds to sleep between status polls
        :raises RuntimeError: if a polled status code is >= 3, which this
            client treats as the device being unavailable
        """
        idle_code = SysStatus.IDLE["status_code"]
        while True:
            status_code = self.system_status()["status_code"]
            if status_code == idle_code:
                return
            if status_code >= 3:
                err_msg = f"System unavailable status_code: {status_code}"
                logging.error(err_msg)
                raise RuntimeError(err_msg)
            time.sleep(poll_interval)

    def submit_job(
        self,
        problem_data: np.ndarray,
        lock_id: str = "",
        sum_constraint: float = 1,
        relaxation_schedule: int = 2,
        continuous_soln: bool = True,
    ) -> dict:
        """
        Submits data to be processed by EQC device

        :param problem_data: a two dimensional array of problem data to be
            optimized; any array-like accepted by :func:`numpy.asarray` works
        :param lock_id: a UUID to allow for multi-user processing
        :param sum_constraint: a normalization constraint that is applied to the
               problem space that is used to calculate :code:`ground_state` energy.
               Value must be greater than or equal to 1.
        :param relaxation_schedule: four different schedules represented
            in integer parameter. Higher values reduce the variation in
            the analog spin values and therefore, lead to better ground state
            for input problem. Accepts range of values in set [1,4].
        :param continuous_soln: whether solutions should be returned as integer
            or continuous values. In order to obtain integer solutions a
            distillation method is applied to the continuous solutions to map
            them to integer values.
        :return: a member of :class:`eqc_direct.utils.JobCodes` as a dict
           with the following keys:

           - **err_code**: `int`- job submission error code
           - **err_desc**: `str`- error code description for submission
        :raises ValueError: if :code:`problem_data` is not two dimensional
        :raises InactiveRpcError: if the RPC to the device server fails
        """
        problem_data = np.asarray(problem_data)
        if problem_data.ndim != 2:
            err_msg = "Input data must be two dimensions"
            logging.error(err_msg)
            raise ValueError(err_msg)
        if problem_data.dtype == np.float64:
            warn_dtype_msg = (
                "Max precision for EQC device is float32 input type was float64. "
                "Input matrix will be rounded"
            )
            logging.warning(warn_dtype_msg)
            warnings.warn(warn_dtype_msg, Warning)
        # nvars is the row count; the matrix is sent flattened in column-major
        # (Fortran) order, presumably the layout the device server expects.
        job_input = eqc_pb2.JobInput(
            nvars=problem_data.shape[0],
            sum_constraint=sum_constraint,
            relaxation_schedule=relaxation_schedule,
            prob_data=problem_data.flatten(order="F"),
            continuous_soln=continuous_soln,
            lock_id=lock_id,
        )
        job_results = self._call_rpc("submit_job", self.eqc_stub.SubmitJob, job_input)
        return message_to_dict(job_results)

    def fetch_result(self, lock_id: str = "") -> EqcResult:
        """
        Request last EQC job results. Returns results from the most recent
        run on the device.

        :param lock_id: a valid :code:`lock_id` that matches current device
            :code:`lock_id`
        :return: an :class:`.EqcResult` object; :code:`energy` and the entries
            of :code:`solution` are cast to :code:`numpy.float32` when present
        :raises InactiveRpcError: if the RPC to the device server fails
        """
        fetch_input = eqc_pb2.LockMessage(lock_id=lock_id)
        eqc_results = self._call_rpc(
            "fetch_results", self.eqc_stub.FetchResults, fetch_input
        )
        result = message_to_dict(eqc_results)
        # A failed job may carry no energy/solution, so only convert fields
        # that are present. float32 is the device's maximum precision.
        if result.get("solution") is not None:
            result["solution"] = [np.float32(val) for val in result["solution"]]
        if result.get("energy") is not None:
            result["energy"] = np.float32(result["energy"])
        return result

    def system_status(self) -> dict:
        """
        Client call to obtain EQC system status

        :returns: a member of :class:`eqc_direct.utils.SysStatus` as a dict:

            - **status_code**: `int`- current system status code
            - **status_desc**: `str`- description of current system status
        :raises InactiveRpcError: if the RPC to the device server fails
        """
        sys_resp = self._call_rpc(
            "system_status", self.eqc_stub.SystemStatus, eqc_pb2.Empty()
        )
        return message_to_dict(sys_resp)

    def acquire_lock(self) -> dict:
        """
        Makes a single attempt to acquire exclusive lock on hardware execution.
        Locking can be used to ensure orderly processing in multi-user environments.
        Lock can only be acquired when no other user has acquired the lock or when
        the system has been idle for 60 seconds while another user has the lock.
        This idle timeout prevents one user from blocking other users from using
        the machine even if they are not active.

        :return:
           a member of :class:`eqc_direct.utils.LockManageStatus` as a dict along
           with an additional key :code:`lock_id`:

           - **lock_id**: `str`- if acquired the current device `lock_id`
             else empty string
           - **status_code**: `int`- status code for lock id acquisition
           - **status_desc**: `str`- a description for the associated status code
        :raises InactiveRpcError: if the RPC to the device server fails
        """
        acquire_lock_resp = self._call_rpc(
            "acquire_lock", self.eqc_stub.AcquireLock, eqc_pb2.Empty()
        )
        return message_to_dict(acquire_lock_resp)

    def release_lock(self, lock_id: str = "") -> dict:
        """
        Releases exclusive lock for running health check or submitting job

        :param lock_id: a UUID with currently acquired exclusive device lock
        :return: a member of :class:`eqc_direct.utils.LockManageStatus` as a dict:

           - **status_code**: `int`- status code for lock release
           - **status_desc**: `str`- a description for the associated status code
        :raises InactiveRpcError: if the RPC to the device server fails
        """
        release_input = eqc_pb2.LockMessage(lock_id=lock_id)
        release_lock_resp = self._call_rpc(
            "release_lock", self.eqc_stub.ReleaseLock, release_input
        )
        return message_to_dict(release_lock_resp)

    def check_lock(self, lock_id: str = "") -> dict:
        """
        Checks if submitted :code:`lock_id` has execution lock on the device

        :param lock_id: a UUID which will be checked to determine if has exclusive
            device execution lock
        :return: a member of :class:`eqc_direct.utils.LockCheckStatus` as a dict:

           - **status_code**: `int`- status code for lock check
           - **status_desc**: `str`- a description for the associated status code
        :raises InactiveRpcError: if the RPC to the device server fails
        """
        check_input = eqc_pb2.LockMessage(lock_id=lock_id)
        check_output = self._call_rpc("check_lock", self.eqc_stub.CheckLock, check_input)
        return message_to_dict(check_output)

    def start_health_check(
        self,
        lock_id: str = "",
        entropy: bool = False,
        stability: bool = False,
        extinction_ratio: bool = False,
        small_problem: bool = False,
        debug: bool = False,
    ) -> dict:
        """
        Starts health checks on an Eqc device; the device lock is required.
        Does not wait for the checks to finish (see :meth:`run_health_check`).

        :param lock_id: the execution lock_id as acquired by :meth:`acquire_lock`
        :param entropy: request run of entropy test on Eqc device
        :param stability: request run of stability test on Eqc device
        :param extinction_ratio: request test of extinction ratio on Eqc device
        :param small_problem: run small problem and test valid result
        :param debug: return verbose output from health check
        :return:
           one of the members of :class:`eqc_direct.utils.JobCodes`
           as a dict with the following keys:

           - **err_code**: `int`- non-zero value indicates error
           - **err_desc**: `str`- a description for associated error code
        :raises InactiveRpcError: if the RPC to the device server fails
        """
        health_input = eqc_pb2.HealthInput(
            entropy=entropy,
            stability=stability,
            extinction_ratio=extinction_ratio,
            small_problem=small_problem,
            lock_id=lock_id,
            debug=debug,
        )
        health_resp = self._call_rpc(
            "start_health_check", self.eqc_stub.HealthCheck, health_input
        )
        return message_to_dict(health_resp)

    def fetch_health_check_result(self, lock_id: str = "") -> HealthCheckResponse:
        """
        Fetch health check data from previous run of health check tests

        :param lock_id: a lock_id that was acquired by :meth:`acquire_lock`
        :return: dict object :class:`.HealthCheckResponse`; unless the run was
            in debug mode the :code:`*_data` and :code:`small_problem_result`
            keys are removed
        :raises InactiveRpcError: if the RPC to the device server fails

        .. note::
           This result structure hasn't been finalized.
           When C++ code is written will know exact format of augmented data.
        """
        health_result_input = eqc_pb2.LockMessage(lock_id=lock_id)
        health_result_resp = self._call_rpc(
            "fetch_health_check_result",
            self.eqc_stub.FetchHealth,
            health_result_input,
        )
        health_dict = message_to_dict(health_result_resp)
        if health_dict.get("debug"):
            # NOTE(review): health_dict already came from message_to_dict;
            # this assumes the nested field is still a protobuf message.
            health_dict["small_problem_result"] = message_to_dict(
                health_dict["small_problem_result"]
            )
            return health_dict
        # Outside debug mode only the pass/fail summary is returned.
        debug_only_keys = frozenset(
            (
                "entropy_data",
                "stability_data",
                "extinction_ratio_data",
                "small_problem_result",
            )
        )
        return {
            key: value
            for key, value in health_dict.items()
            if key not in debug_only_keys
        }

    def stop_running_process(self, lock_id: str = "") -> dict:
        """
        Stops a running process either a health check or a Eqc job.
        Process locks will release automatically based on a timeout
        which is maintained in the server code if they are
        not released using this.

        :param lock_id: a lock_id that was acquired by :meth:`acquire_lock`
        :return:
           a member of :class:`eqc_direct.utils.SysStatus`
           as dict with following keys:

           - **status_code**: `int`- the system code after stopping
           - **status_desc**: `str`- the associated system status description
        :raises InactiveRpcError: if the RPC to the device server fails
        """
        stop_input = eqc_pb2.LockMessage(lock_id=lock_id)
        stop_resp = self._call_rpc(
            "stop_running_process", self.eqc_stub.StopRunning, stop_input
        )
        return message_to_dict(stop_resp)

    def run_health_check(
        self,
        lock_id: str = "",
        entropy: bool = False,
        stability: bool = False,
        extinction_ratio: bool = False,
        small_problem: bool = False,
        debug: bool = False,
    ) -> HealthCheckResponse:
        """
        Runs health checks for an Eqc device. Requires a valid lock on the
        device. Starts the checks, waits for the device to return to idle,
        fetches the results and then releases the lock.

        :param lock_id: the execution lock_id as acquired by :meth:`acquire_lock`
        :param entropy: request run of entropy test on Eqc device
        :param stability: request run of stability test on Eqc device
        :param extinction_ratio: request test of extinction ratio on Eqc device
        :param small_problem: run small problem and test valid result
        :param debug: return verbose output from health check
        :return: dict object :class:`.HealthCheckResponse`
        :raises RuntimeError: if the health check fails to start, the device
            becomes unavailable, or the lock cannot be released

        .. note::
           This result structure hasn't been finalized.
           When C++ code is written will know exact format of augmented data.
        ..  What happens when all health checks turned off just return blank message?
        """
        health_start_resp = self.start_health_check(
            lock_id=lock_id,
            entropy=entropy,
            stability=stability,
            extinction_ratio=extinction_ratio,
            small_problem=small_problem,
            debug=debug,
        )
        if health_start_resp["err_code"] != 0:
            err_msg = f"Failed to start health check with response: {health_start_resp}"
            logging.error(err_msg)
            raise RuntimeError(err_msg)
        self._wait_until_idle()
        health_result = self.fetch_health_check_result(lock_id=lock_id)
        lock_status = self.release_lock(lock_id=lock_id)
        # release_lock returns status_code/status_desc (see its docstring).
        # NOTE(review): assumes status_code 0 is LockManageStatus success —
        # confirm against eqc_direct.utils.
        if lock_status["status_code"] != 0:
            err_msg = f"Failed to release lock with message: {lock_status['status_desc']}"
            logging.error(err_msg)
            raise RuntimeError(err_msg)
        return health_result

    def wait_for_lock(self) -> tuple:
        """
        Waits for lock indefinitely calling :func:`acquire_lock`, sleeping one
        second between unsuccessful attempts.

        :return: a tuple of the following items:

           - **lock_id**: `str`- exclusive lock for device execution with a timeout
           - **start_queue_ts**: `int`- time in ns at which queueing for the
             lock started
           - **end_queue_ts**: `int`- time in ns at which the lock was acquired
        :raises RuntimeError: if the system status code is >= 3 (unavailable)
        """
        lock_id = ""
        start_queue_ts = time.time_ns()
        while lock_id == "":
            sys_code = self.system_status()["status_code"]
            # status codes >= 3 are treated as "device unavailable"
            if sys_code >= 3:
                raise RuntimeError(f"System unavailable status_code: {sys_code}")
            lock_id = self.acquire_lock()["lock_id"]
            # an empty lock_id means the lock was not acquired; retry later
            if lock_id == "":
                time.sleep(1)
        end_queue_ts = time.time_ns()
        return lock_id, start_queue_ts, end_queue_ts

    def process_job(
        self,
        hamiltonian: np.ndarray,
        sum_constraint: float = 1,
        relaxation_schedule: int = 4,
        continuous_soln: bool = True,
        lock_id: str = "",
    ) -> dict:
        """
        Processes a job by:

           1. submitting job
           2. checks for status, until completes or fails
           3. returns results

        :param hamiltonian: np.ndarray
            an (n,n+1) array representing the problem hamiltonian
        :param sum_constraint: a normalization constraint that is applied to the
               problem space that is used to calculate :code:`ground_state` energy.
               Value must be greater than or equal to 1.
        :param relaxation_schedule: four different schedules represented in
            integer parameter. Higher values reduce the variation in the
            analog spin values and therefore, lead to better ground state
            for input problem. Accepts range of values in set [1,4].
        :param continuous_soln: whether solutions should be returned as integer or
            continuous values.
        :param lock_id: a str with exclusive lock for device execution with a timeout
        :return: the :class:`.EqcResult` dict with two extra keys:

           - **start_job_ts**: time in ns marking start of job submission
           - **end_job_ts**: time in ns at which the device returned to idle
             (measured before the results are fetched)
        :raises RuntimeError: if submission fails, the device becomes
            unavailable, or the job finishes with a non-zero :code:`err_code`
        """
        start_job = time.time_ns()
        submit_job_resp = self.submit_job(
            problem_data=hamiltonian,
            sum_constraint=sum_constraint,
            relaxation_schedule=relaxation_schedule,
            continuous_soln=continuous_soln,
            lock_id=lock_id,
        )
        if submit_job_resp["err_code"] != 0:
            err_msg = f"Job submission failed with response: {submit_job_resp}"
            logging.error(err_msg)
            raise RuntimeError(err_msg)
        self._wait_until_idle()
        end_job = time.time_ns()

        job_result = self.fetch_result(lock_id=lock_id)
        if job_result["err_code"] != 0:
            err_msg = (
                f"Job execution error\n"
                f"err_code: {job_result['err_code']}\n"
                f"err_desc: {job_result['err_desc']}"
            )
            logging.error(err_msg)
            raise RuntimeError(err_msg)
        job_result["start_job_ts"] = start_job
        job_result["end_job_ts"] = end_job
        return job_result