Source code for eqc_models.ml.classifierqsvm
import os
import sys
import time
import datetime
import json
import warnings
from functools import wraps

import numpy as np

from eqc_models.ml.classifierbase import ClassifierBase
class QSVMClassifier(ClassifierBase):
    """An implementation of a QSVM classifier that uses QCi's Dirac-3.

    Parameters
    ----------
    relaxation_schedule: Relaxation schedule used by Dirac-3; default: 2.

    num_samples: Number of samples used by Dirac-3; default: 1.

    upper_limit: Coefficient upper limit; a regularization parameter;
        default: 1.0.

    gamma: Gaussian kernel parameter; default: 1.0.

    eta: A penalty multiplier; default: 1.0.

    zeta: A penalty multiplier; default: 1.0.

    Examples
    --------
    >>> from sklearn import datasets
    >>> from sklearn.preprocessing import MinMaxScaler
    >>> from sklearn.model_selection import train_test_split
    >>> iris = datasets.load_iris()
    >>> X = iris.data
    >>> y = iris.target
    >>> scaler = MinMaxScaler()
    >>> X = scaler.fit_transform(X)
    >>> for i in range(len(y)):
    ...     if y[i] == 0:
    ...         y[i] = -1
    ...     elif y[i] == 2:
    ...         y[i] = 1
    >>> X_train, X_test, y_train, y_test = train_test_split(
    ...     X,
    ...     y,
    ...     test_size=0.2,
    ...     random_state=42,
    ... )
    >>> from eqc_models.ml.classifierqsvm import QSVMClassifier
    >>> obj = QSVMClassifier(
    ...     relaxation_schedule=2,
    ...     num_samples=1,
    ...     upper_limit=1.0,
    ...     gamma=1.0,
    ...     eta=1.0,
    ...     zeta=1.0,
    ... )
    >>> from contextlib import redirect_stdout
    >>> import io
    >>> f = io.StringIO()
    >>> with redirect_stdout(f):
    ...     obj = obj.fit(X_train, y_train)
    ...     y_train_prd = obj.predict(X_train)
    ...     y_test_prd = obj.predict(X_test)

    """

    def __init__(
        self,
        relaxation_schedule=2,
        num_samples=1,
        upper_limit=1.0,
        gamma=1.0,
        eta=1.0,
        zeta=1.0,
    ):
        super().__init__()
        self.relaxation_schedule = relaxation_schedule
        self.num_samples = num_samples
        self.upper_limit = upper_limit
        self.gamma = gamma
        self.eta = eta
        self.zeta = zeta
    def kernel(self, vec1, vec2):
        """Gaussian (RBF) kernel: exp(-gamma * ||vec1 - vec2||**2)."""
        return np.exp(-self.gamma * np.linalg.norm(vec1 - vec2) ** 2)
    def fit(self, X, y):
        """
        Build a QSVM classifier from the training set (X, y).

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            The training input samples.

        y : array-like of shape (n_samples,)
            The target values.

        Returns
        -------
        Response of Dirac-3 in JSON format.
        """

        assert X.shape[0] == y.shape[0], "Inconsistent sizes!"
        assert set(y) == {-1, 1}, "Target values should be in {-1, 1}"

        # Build the Hamiltonian and solve it on Dirac-3.
        J, C, sum_constraint = self.get_hamiltonian(X, y)
        assert J.shape[0] == J.shape[1], "Inconsistent hamiltonian size!"
        assert J.shape[0] == C.shape[0], "Inconsistent hamiltonian size!"
        self.set_model(J, C, sum_constraint)
        sol, response = self.solve()
        assert len(sol) == C.shape[0], "Inconsistent solution size!"
        self.params = self.convert_sol_to_params(sol)

        # Cache the training data and training kernel matrix for predict().
        self.X_train = X
        self.y_train = y
        n_records = X.shape[0]
        self.kernel_mat_train = np.zeros(
            shape=(n_records, n_records), dtype=np.float32
        )
        for m in range(n_records):
            for n in range(n_records):
                self.kernel_mat_train[m][n] = self.kernel(X[m], X[n])

        return response
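
    # Note on the fitted state (a reading of the code above, not additional
    # API surface): after fit() returns, self.params holds one dual
    # coefficient per training sample (the first half of the Dirac-3
    # solution; see convert_sol_to_params below), and self.X_train,
    # self.y_train and self.kernel_mat_train are cached so that predict()
    # can evaluate the decision function without recomputing the kernel on
    # training pairs. The slack-variable penalty (eta) is intended to keep
    # each coefficient in [0, upper_limit].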
    def predict(self, X: np.ndarray):
        """
        Predict classes for X.

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        y : ndarray of shape (n_samples,)
            The predicted classes.
        """

        assert self.X_train is not None, "Model not trained yet!"
        assert self.y_train is not None, "Model not trained yet!"
        assert (
            X.shape[1] == self.X_train.shape[1]
        ), "Inconsistent dimensions!"

        n_records = X.shape[0]
        n_records_train = self.X_train.shape[0]

        # Kernel matrix between the prediction samples and the training samples.
        kernel_mat = np.zeros(
            shape=(n_records, n_records_train), dtype=np.float32
        )
        for m in range(n_records):
            for n in range(n_records_train):
                kernel_mat[m][n] = self.kernel(X[m], self.X_train[n])

        # Intercept: a weighted average of y_n minus the intercept-free
        # decision value over the training samples.
        tmp_vec1 = np.tensordot(
            self.params * self.y_train, self.kernel_mat_train, axes=(0, 0)
        )
        assert tmp_vec1.shape[0] == n_records_train, "Inconsistent size!"
        tmp1 = np.sum(
            self.params
            * (self.upper_limit - self.params)
            * (self.y_train - tmp_vec1)
        )
        tmp2 = np.sum(self.params * (self.upper_limit - self.params))
        assert tmp2 != 0, "Something went wrong!"
        intercept = tmp1 / tmp2

        # Decision function followed by the sign to get labels in {-1, 1}.
        y = np.zeros(shape=(n_records,), dtype=np.float32)
        y += np.tensordot(
            self.params * self.y_train, kernel_mat, axes=(0, 1)
        )
        y += intercept
        y = np.sign(y)

        return y
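
    # The decision function implemented above, written out (a summary of the
    # code, with alpha = self.params, U = upper_limit and b = intercept):
    #
    #     f(x) = sum_n alpha_n * y_n * K(x_n, x) + b
    #
    #     b = sum_n alpha_n (U - alpha_n) [y_n - sum_m alpha_m y_m K(x_m, x_n)]
    #         / sum_n alpha_n (U - alpha_n)
    #
    # i.e. b is a weighted average of the residuals y_n minus the intercept-free
    # decision value, with the largest weights on samples whose coefficients sit
    # strictly between 0 and U (the margin support vectors). The predicted class
    # is sign(f(x)).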
    def get_hamiltonian(
        self,
        X: np.ndarray,
        y: np.ndarray,
    ):
        """Construct the quadratic term (J), linear term (C), and sum
        constraint of the Hamiltonian solved on Dirac-3; the 2 * n_records
        variables are the dual coefficients followed by their slacks."""

        n_records = X.shape[0]
        J = np.zeros(
            shape=(2 * n_records, 2 * n_records), dtype=np.float32
        )
        C = np.zeros(shape=(2 * n_records,), dtype=np.float32)
        for n in range(n_records):
            for m in range(n_records):
                J[n][m] = (
                    0.5 * y[n] * y[m] * self.kernel(X[n], X[m])
                    + self.zeta * y[n] * y[m]
                )
            J[n][n] += self.eta
            J[n][n + n_records] = self.eta
            J[n + n_records][n] = self.eta
            J[n + n_records][n + n_records] = self.eta
            C[n] = -1.0 - 2.0 * self.eta * self.upper_limit
            C[n + n_records] = -2.0 * self.eta * self.upper_limit
        C = C.reshape((2 * n_records, 1))
        J = 0.5 * (J + J.transpose())

        return J, C, n_records * self.upper_limit
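
    # Up to an additive constant (and assuming the solver minimizes
    # C.x + x.J.x), the Hamiltonian built above corresponds to the soft-margin
    # SVM dual with penalty terms, where alpha are the first n_records
    # variables, s their slacks, and U = upper_limit:
    #
    #     E(alpha, s) = 0.5 * sum_{n,m} alpha_n alpha_m y_n y_m K(x_n, x_m)
    #                   - sum_n alpha_n
    #                   + zeta * (sum_n alpha_n y_n)**2
    #                   + eta  * sum_n (alpha_n + s_n - U)**2
    #
    # subject to the Dirac-3 sum constraint sum_n (alpha_n + s_n) = n_records * U.
    # The zeta term softly enforces the usual equality constraint
    # sum_n alpha_n y_n = 0, and the eta term (together with the slacks) softly
    # enforces the box constraint 0 <= alpha_n <= U.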
    def convert_sol_to_params(self, sol):
        """Keep the first half of the solution vector (the dual coefficients);
        the second half holds the slack variables."""
        assert len(sol) % 2 == 0, "Expected an even solution size!"
        sol = sol[: len(sol) // 2]
        return np.array(sol)
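

if __name__ == "__main__":
    # Minimal end-to-end sketch; illustrative only. It mirrors the docstring
    # example above and adds an accuracy check with scikit-learn. It assumes
    # scikit-learn is installed and that access to QCi's Dirac-3 is configured
    # for eqc-models (e.g. an API token in the environment); fit() submits a
    # job to the cloud solver, so this is not a purely local computation.
    from sklearn import datasets
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score

    X, y = datasets.load_iris(return_X_y=True)
    X = MinMaxScaler().fit_transform(X)
    # Collapse the three iris classes into a two-class problem with
    # labels in {-1, 1}, as in the docstring example.
    y = np.where(y == 0, -1, 1)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    clf = QSVMClassifier(relaxation_schedule=2, upper_limit=1.0, gamma=1.0)
    clf.fit(X_train, y_train)
    print("train accuracy:", accuracy_score(y_train, clf.predict(X_train)))
    print("test accuracy:", accuracy_score(y_test, clf.predict(X_test)))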