Source code for eqc_models.graph.partition
- from typing import Tuple
- import numpy as np
- import scipy.sparse as sp
- import networkx as nx
- from math import modf
- from eqc_models.graph.base import GraphModel
- [docs]
- class GraphPartitionModel(GraphModel):
-     """
-     A model for graph partitioning into `k` parts with objective and constraints
-     derived from the Laplacian matrix and additional penalties for balance and constraints.
-     """
-     def __init__(self, G: nx.Graph, k: int = 2, weight: str = "weight", alpha: float = 1.0, beta_obj: float = 1.0,
-                  gamma: float = 1.0):
-         """
-         Parameters:
-         -----------
-         G : nx.Graph
-             The graph to partition.
-         k : int
-             The number of partitions.
-         weight : str
-             The key for edge weights in the graph.
-         alpha : float
-             The penalty multiplier for balance constraints.
-         beta_obj : float
-             The penalty multiplier for minimizing edge cuts (Laplacian term).
-         gamma : float
-             The penalty multiplier for assignment constraints.
-         """
-         self._G = G
-         self._k = k
-         self._weight = weight
-         self._alpha = alpha
-         self._beta_obj = beta_obj
-         self._gamma = gamma
-         self._laplacian = nx.laplacian_matrix(G, weight=weight)
-         self._num_nodes = G.number_of_nodes()
-         self._sorted_nodes = sorted(G.nodes)
-         self._constraints_offset = 0
-         self._balanced_partition_offset = 0
-         self.set_and_validate_k()
-         self._objective_matrix = self.initialize_model()
-         super().__init__(self._G)
- [docs]
-     def set_and_validate_k(self):
-         """
-         Sets k and encoding length for a graph problem
-         """
-         
-         
-         assert modf(self._k)[0] == 0, "'k' must be an integer."
-         
-         self._k = int(self._k)
-         
-         assert self._k >= 2, f"ERROR, k={self._k}: k must be greater than or equal to 2."
-         
-         assert self._k <= self._num_nodes, (
-             f"ERROR, k={self._k}: k must be less than number of nodes or variables. k = {self._k} and "
-             f"number of nodes = {self._num_nodes}"
-         )
- [docs]
-     def initialize_model(self):
-         """
-         Build the objective matrix and constraints for the k-partition problem.
-         """
-         if self._k == 2:
-             
-             return self.get_two_partition_qubo()
-         else:
-             
-             laplacian_blocks = 0.5 * sp.block_diag([self._laplacian] * self._k, format="csr")
-             balance_term = self.get_balanced_partition_term()
-             constraints = self.get_constraints()
-             return (
-                 self._alpha * balance_term
-                 + self._gamma * constraints
-                 + self._beta_obj * laplacian_blocks
-             )
- [docs]
-     def get_balanced_partition_term(self) -> sp.spmatrix:
-         """
-         Construct the quadratic penalty term for balanced partitions.
-         """
-         I_k = sp.identity(self._k)
-         Ones_n = np.ones((self._num_nodes, self._num_nodes))
-         balanced_partition_term = sp.kron(I_k, Ones_n, format="csr")
-         balanced_partition_term -= (
-             2 * self._num_nodes / self._k * sp.identity(balanced_partition_term.shape[0])
-         )
-         self._balanced_partition_offset = self._num_nodes**2 / self._k
-         return balanced_partition_term
- [docs]
-     def get_constraints(self) -> sp.spmatrix:
-         """
-         Construct the quadratic penalty term for assignment constraints.
-         """
-         I_n = sp.identity(self._num_nodes)
-         Ones_k = np.ones((self._k, self._k))
-         constraints = sp.kron(Ones_k, I_n, format="csr")
-         constraints -= 2 * sp.identity(constraints.shape[0])
-         self._constraints_offset = self._num_nodes
-         return constraints
- [docs]
-     def get_two_partition_qubo(self) -> sp.spmatrix:
-         """
-         Construct the QUBO matrix for two partitions using adjacency and penalties.
-         """
-         Garr = nx.to_scipy_sparse_matrix(self._G, weight=self._weight, nodelist=self._sorted_nodes)
-         Q = (
-             self._alpha * np.ones(Garr.shape, dtype=np.float32)
-             - self._beta_obj * Garr
-         )
-         degrees = Garr.sum(axis=1).A1  
-         diag = self._beta_obj * degrees - self._alpha * (self._num_nodes - 1)
-         np.fill_diagonal(Q, diag)
-         return sp.csr_matrix(Q)
- [docs]
-     def evaluate(self, solution: np.ndarray) -> float:
-         """
-         Evaluate the objective function for a given solution.
-         """
-         assert len(solution) == self._objective_matrix.shape[0], "Solution size mismatch."
-         return float(solution.T @ self._objective_matrix @ solution)
- [docs]
-     def decode(self, solution: np.ndarray) -> dict:
-         """
-         Decode the solution vector into a partition assignment.
-         """
-         if self._k == 2:
-             return {node: int(solution[i]) for i, node in enumerate(self._sorted_nodes)}
-         else:
-             partitions, nodes = np.where(solution.reshape((self._k, self._num_nodes)) == 1)
-             return {self._sorted_nodes[node]: int(partition) for partition, node in zip(partitions, nodes)}
- [docs]
-     def costFunction(self) -> Tuple[np.ndarray, np.ndarray]:
-         """
-         Return the linear and quadratic components of the objective function.
-         """
-         Q = self._objective_matrix
-         h = Q.diagonal()
-         J = 2 * sp.triu(Q, k=1).tocsr() 
-         return h, J