# Source code for emucore_direct.client
"""
Client gRPC services for running on FPGA reservoir computer EmuCore developed by QCi.
"""
import json
import logging
import os
import time
from typing import List, Optional, TypedDict

import grpc
from grpc._channel import _InactiveRpcError
import numpy as np

from . import emucore_pb2, emucore_pb2_grpc
from .utils import set_filter_coefficients, \
    bytes_to_array, \
    prep_input_data, \
    message_to_dict

# Channel-level service config: calls to EmuCore.EmuCoreService are retried
# (up to 5 attempts, exponential backoff) when the server reports UNAVAILABLE.
grpc_service_config = json.dumps(
    {
        "methodConfig": [
            {
                "name": [{"service": "EmuCore.EmuCoreService"}],
                "retryPolicy": {
                    "maxAttempts": 5,
                    "initialBackoff": "0.2s",
                    "maxBackoff": "10s",
                    "backoffMultiplier": 2.5,
                    "retryableStatusCodes": ["UNAVAILABLE"],
                },
            }
        ]
    }
)

# Upper bound on len(reservoir_input) accepted by EmuCoreClient.rc_run.
MAX_INPUT_SIZE = 20 * 1024 * 1024


class StatusMessage(TypedDict):
    """
    Structure of responses for configuration requests to EmuCore device.

    :param status: the status of the request
    :param message: a description for the received status
    """
    status: int
    message: str


class InactiveRpcError(Exception):
    """Custom exception wrapper around grpc._channel._InactiveRpcError."""


class EmuCoreClient:
    """
    Provides services for accessing EmuCore server

    :param ip_addr: the IP address of the gRPC server
    :param port:
        The port that the RPC server is running on
    :param max_data_size: int
        The max send and receive message length for RPC server

    .. note::

       :code:`lock_id` is used by a variety of class functions.
       It is set to an empty string by default since default for device server
       :code:`lock_id` is also an empty string. This allows for single user
       processing without having to acquire a device lock.

    .. All GRPC calls follow a specific pattern:
    ..   1. Fill in data to be sent in message stub
    ..   2. Send data using stub service method
    ..   3. Parse response
    """

    # NOTE: the os.getenv defaults below are evaluated once, when the class is
    # defined (i.e. at import time), not on every instantiation.
    def __init__(
        self,
        ip_addr: str = os.getenv("DEVICE_IP_ADDRESS", "localhost"),
        port: str = os.getenv("DEVICE_PORT", "50051"),
        max_data_size: int = 512 * 1024 * 1024,
    ):
        self._ip_add_port = ip_addr + ":" + port
        self._channel_opt = [
            ("grpc.max_send_message_length", max_data_size),
            ("grpc.max_receive_message_length", max_data_size),
            ("grpc.service_config", grpc_service_config),
        ]
        self.channel = grpc.insecure_channel(
            self._ip_add_port, options=self._channel_opt
        )
        self.stub = emucore_pb2_grpc.EmuCoreServiceStub(self.channel)

    def check_lock(self, lock_id: str = "") -> dict:
        """
        Checks if submitted :code:`lock_id` has execution lock on the device

        :param lock_id: a UUID which will be checked to determine if has exclusive
            device execution lock
        :return: the server's lock-check response converted to a dict.

           NOTE(review): the original documentation referenced
           :class:`eqc_direct.utils.LockCheckStatus` with the keys below;
           confirm the key names against the EmuCore proto definition.

           - **status_code**: `int`- status code for lock check
           - **status_desc**: `str`- a description for the associated status code
        """
        check_input = emucore_pb2.lock_message(lock_id=lock_id)
        # The service stub is stored as ``self.stub`` by ``__init__``; there is
        # no ``eqc_stub`` attribute on this client.
        check_output = self.stub.check_lock(check_input)
        return message_to_dict(check_output)

    def reservoir_reset(self,
                        lock_id: str) -> StatusMessage:
        """
        Resets a reservoir instance by clearing RAM on the server

        :param lock_id: a lock_id which has an active reserve on the device
        :return:
            dictionary with values from members of
            :class:`emucore_direct.types.StatusResponses`
        """
        reset_message = emucore_pb2.lock_message(lock_id=lock_id)
        return message_to_dict(self.stub.reservoir_reset(reset_message))

    def system_info(self) -> dict:
        """
        Provides system info on call

        :return:
            dict with following keys:

            - **system_name**: `str`- product name
            - **system_version**: `str`- server version
        """
        sys_info_resp = self.stub.system_info(emucore_pb2.empty_message())
        return message_to_dict(sys_info_resp)

    def acquire_lock(self) -> dict:
        """
        Acquires exclusive lock for running health check or submitting job

        :return:
            a member of :class:`emucore_direct.types.LockManageStatus` as a dict along
            with an additional key :code:`lock_id`:

            - **lock_id**: `str`- if acquired the current device `lock_id`
              else empty string
            - **status**: `int`- status code for lock id acquisition
            - **message**: `str`- a description for the associated status code
        :raises InactiveRpcError: if the underlying gRPC call fails with
            ``grpc._channel._InactiveRpcError``
        """
        try:
            acquire_lock_resp = self.stub.acquire_lock(emucore_pb2.empty_message())
        except _InactiveRpcError as exc:
            raise InactiveRpcError(
                "acquire_lock failed due to grpc._channel._InactiveRpcError."
            ) from exc
        return message_to_dict(acquire_lock_resp)

    def release_lock(self, lock_id):
        """
        Releases exclusive lock for running health check or submitting job

        :param lock_id: a UUID with currently acquired exclusive device lock
        :return:
            a dict with the following keys:

            - **lock_released**: `bool`- if released is True else False
            - **message**: `str`- a description of release operation result
        :raises InactiveRpcError: if the underlying gRPC call fails with
            ``grpc._channel._InactiveRpcError``
        """
        release_input = emucore_pb2.lock_message(lock_id=lock_id)
        try:
            release_lock_resp = self.stub.release_lock(release_input)
        except _InactiveRpcError as exc:
            raise InactiveRpcError(
                "release_lock failed due to grpc._channel._InactiveRpcError."
            ) from exc
        return message_to_dict(release_lock_resp)

    def rc_config(self,
                  lock_id: str,
                  vbias: float,
                  gain: float,
                  num_nodes: int,
                  num_taps: int) -> StatusMessage:
        """
        Sends a reservoir configuration to the device.

        :param lock_id: a lock_id which has an active reserve on the device
        :param vbias: bias to apply to each node in reservoir range for parameter [0,1]
        :param gain: memory setting for system how long should inputs affect reservoir
            similar to beta in adaptive gradient descent range for parameter [0,1]
        :param num_nodes: the total number of hidden nodes to instantiate within the
            reservoir, a single hidden layer
        :param num_taps: number of connections in reservoir, generally should be set to
            less than the number of nodes in reservoir. Defines interconnection between
            nodes.
        :return: dictionary with values from one of the members of
            :class:`emucore_direct.types.StatusResponses`
        """
        # NOTE(review): the computed coefficients are not sent to the device by
        # this method. The call is retained because it may validate ``num_taps``
        # (raise on bad input) -- confirm against ``utils.set_filter_coefficients``.
        set_filter_coefficients(num_taps=num_taps)
        config_message = emucore_pb2.rc_config_message(lock_id=lock_id,
                                                       vbias=vbias,
                                                       gain=gain,
                                                       num_nodes=num_nodes,
                                                       num_taps=num_taps)
        rc_status = self.stub.rc_config(config_message)
        return message_to_dict(rc_status)

    def rc_run(self,
               lock_id: str,
               reservoir_input: List[int],):
        """
        Runs a series of data through the reservoir and returns response from device
        based on current reservoir configuration.

        :param lock_id: a lock_id which has an active reserve on the device
        :param reservoir_input: a list of digitized values to input to the reservoir;
            its length must be less than or equal to MAX_INPUT_SIZE
        :return: a dictionary with the following keys:

           - **status**: `int`- the status for the reservoir submission
           - **message**: `str`- a description of the status for the submission
           - **states**: `bytes`- response from reservoir as bytes.
        :raises AssertionError: if ``reservoir_input`` is longer than MAX_INPUT_SIZE
        """
        # Explicit check instead of an ``assert`` statement so the size limit is
        # still enforced under ``python -O``; AssertionError is kept so existing
        # callers see the same exception type and message.
        if len(reservoir_input) > MAX_INPUT_SIZE:
            raise AssertionError(
                f"Input to reservoir must be of less than or equal to {MAX_INPUT_SIZE} was {len(reservoir_input)}"
            )
        run_message = emucore_pb2.rc_run_message(lock_id=lock_id,
                                                 input=reservoir_input,)
        rc_response = self.stub.rc_run(run_message)
        return message_to_dict(rc_response)

    def wait_for_lock(self):
        """
        Waits for lock indefinitely calling :func:`acquire_lock`

        Polls once per second until a non-empty :code:`lock_id` is returned;
        there is no timeout.

        :return: a tuple of the following items:

           - **lock_id**: `str`- exclusive lock for device execution with a timeout
           - **start_queue_ts**: `int`- time in ns when began lock acquisition.
           - **end_queue_ts**: `int`- time in ns when lock was acquired.
        """
        lock_id = ""
        start_queue_ts = time.time_ns()
        while lock_id == "":
            lock_id = self.acquire_lock()["lock_id"]
            # An empty lock_id means the lock was not acquired; back off before retrying.
            if lock_id == "":
                time.sleep(1)
        end_queue_ts = time.time_ns()
        return lock_id, start_queue_ts, end_queue_ts

    def process_all_data(self,
                         lock_id: str,
                         input_data: np.ndarray,
                         num_nodes: int,
                         density: float,
                         feature_scaling: float,
                         max_scale_val: Optional[float] = None,
                         weights: Optional[np.ndarray] = None,
                         seed_val_weights: int = 13):
        """
        Run dataset through reservoir:

        1. Prepare the input packets (scaling and weights) via ``prep_input_data``
        2. Run each packet through the reservoir with :func:`rc_run`
        3. Combine the per-packet responses and reshape to ``(n_rows, num_nodes)``

        This method does not acquire or release the device lock and does not reset
        or configure the reservoir; the caller is expected to do so (see
        :func:`acquire_lock`, :func:`reservoir_reset`, :func:`rc_config` and
        :func:`release_lock`).

        :param lock_id: a UUID that currently has lock on the device
        :param input_data: data or series to process via reservoir; must be
            two-dimensional (rows by columns)
        :param num_nodes: number of reservoir nodes; the response is reshaped to
            ``n_rows`` by ``num_nodes``
        :param density: forwarded to ``prep_input_data`` -- presumably the density of
            the random input weights (confirm in ``utils.prep_input_data``)
        :param feature_scaling: after applying max abs scalar feature scaling factor
            applied
        :param max_scale_val: max absolute value used to scale data if provided
        :param weights: optional weights forwarded to ``prep_input_data`` --
            presumably reused instead of generating new ones (confirm in utils)
        :param seed_val_weights: seeds randomness for weights to allow for
            reproducibility
        :note: if doing multiple runs without reset the max value mustn't exceed
            original data max value in order for results to register properly
        :note: reservoir connectivity (``num_taps``) is configured separately through
            :func:`rc_config`
        :return:
            a tuple of the following elements:

            - **reservoir_response**: `np.ndarray`- reservoir response represented as an
              array dimension of array will be nrows of input matrix by num nodes.
            - **max_scale_value**: `np.ndarray`- the scaling value that was applied to
              the input data before it was processed by the reservoir.
            - **weights**: `np.ndarray`- the weights that were used to apply the random
              mask to the data prior to being processed by the reservoir.
        :raises RuntimeError: if any error occurs while running packets through the
            reservoir or combining their responses
        """
        # Unpacking into two names enforces that input_data is two-dimensional.
        n_rows, _n_cols = input_data.shape
        input_packets, max_scale_val, weights = prep_input_data(
            input_data=input_data,
            num_nodes=num_nodes,
            density=density,
            feature_scaling=feature_scaling,
            max_scale_val=max_scale_val,
            weights=weights,
            seed_val_weights=seed_val_weights)
        try:
            # Seed with an empty array so zero packets still yield np.array([]),
            # then concatenate once instead of re-copying on every iteration.
            packet_arrays = [np.array([])]
            for packet in input_packets:
                packet_resp = self.rc_run(
                    lock_id=lock_id,
                    reservoir_input=packet)
                packet_bytes = bytearray(np.array(packet_resp["states"]))
                packet_arrays.append(bytes_to_array(input_bytes=packet_bytes))
            reservoir_resp = np.concatenate(packet_arrays, axis=0)
        except Exception as err:
            print("ERROR OCCURRED", err)
            raise RuntimeError("Error while processing data") from err
        return np.array(reservoir_resp).reshape(n_rows, num_nodes), max_scale_val, weights