Source code for eqc_models.algorithms.alm

# (C) Quantum Computing Inc., 2025.
"""
===========================
Augmented Lagrangian Method
===========================

Overview
========

The augmented Lagrangian method (ALM) algorithm in eqc-models automates the
parameter adjustments needed to reach feasible solutions of constrained
optimization models on entropy quantum computing devices. It supports
fractional (continuous), binary and general integer variables. ALM also
supports enforcement of one-hot constraints, inequality constraints and
equality constraints without the introduction of slack variables.
This method requires a different modeling approach than implementing a
constrained optimization model as a direct subclass of `EqcModel`. To use it,
implement the objective function in one of the subclasses of `EqcModel`, then
use a `ConstraintRegistry` object to describe the model constraints. The
`ALMConfig` class exposes many parameters, but novice users should adjust only
a few of them. Once the model, constraint registry and config objects are
ready, pass them to an `ALMAlgorithm` instance along with a solver; the
algorithm iterates over parameter adjustments until the feasibility tolerance
is met or the maximum iteration count is exceeded.

The following example is a basic outline of how to use the ALM implementation
to solve models.
::

    model = PolynomialModel()
    solver = Dirac3ContinuousCloudSolver()
    solver_kwargs = {}    # add solver parameters as dict here
    config = ALMConfig()  # add configuration parameters here
    registry = ConstraintRegistry()
    # add variable domains and constraints here
    algorithm = ALMAlgorithm()
    # keyword arguments are passed to the solver
    response = algorithm.run(model, registry, solver, config, **solver_kwargs)
Model Creation
==============

Defining the objective function for the optimization problem can be done in a
number of ways. There are two base classes on which all other models are built,
and they define how function coefficients are specified. With a
`QuadraticModel`, the coefficients are specified using a one-dimensional array
for the linear terms and a two-dimensional symmetric array or Hermitian matrix
for the quadratic terms. A `PolynomialModel` instead uses two lists, one for
the indices of the terms and another for their coefficients; terms can be up
to degree five. These two classes are preferred for use with ALM because most
other model classes incorporate constraints into the operator. In the future,
ALM will provide a way to extract the objective function and constraints from
the model separately, populating the `ConstraintRegistry` automatically.

First, choose variable definitions by listing the variable names in a single
list. Then organize the terms in ascending lexicographic order by index.
Finally, create a list of coefficient entries corresponding to the index list.

The example problem used here is a simple logical decision: select one variable
among 10. Those familiar with QUBO formulations will recognize the penalties in
the quadratic coefficients. This first example sets the one-hot constraint as
penalties in the objective function and adds block constraints to each of the
variables to enforce 0 or 1, which has the effect of using two time bins per
binary variable.
>>> import warnings; warnings.filterwarnings("ignore") # silence warnings
>>> n = 10
>>> variables = [f"x_{i}" for i in range(n)]

Define the indices and coefficients. The solution is `x_9 == 1` with all other
`x_i == 0`. Variable indices sent to the device are 1-based.

>>> indices = [(0, i+1) for i in range(n)] + [(i+1, j+1) for i in range(n) for j in range(i, n)]
>>> coefficients = [-i if i==n-1 else 0 for i in range(n)] + [2 for i in range(n) for j in range(i, n)]

Define the model, setting the upper bound to 1 on all variables.

>>> model = PolynomialModel(coefficients, indices)
>>> model.variables = variables
>>> model.variables[0] == "x_0"
True
>>> model.upper_bound = np.ones((n,))
>>> model.is_discrete = [True for i in range(n)] # for future modeling support

Begin the ALM setup. One of the features of this algorithm in eqc-models is the
ability to enforce discrete variables while using the continuous solver, which
allows for mixed-integer formulations. For binary variables, add a block for
each one so that it is forced to 0 or 1. The `sum_to_one` parameter is normally
true; it is set to false here because the simplicity of the problem allows a
more efficient approach.

>>> reg = ConstraintRegistry()
>>> for i in range(n): reg.add_block([i], levels=[0, 1], sum_to_one=False)

Configure `rho_h` (equality constraints) to start at 1, allow only 4 outer
iterations, and set the tolerance high.

>>> config = ALMConfig(tol_h=0.5, max_outer=4, rho_h=1, rho_g=1, gamma_up=2.0, gamma_down=1.0)
>>> from eqc_models.solvers import Dirac3ContinuousCloudSolver
>>> solver = Dirac3ContinuousCloudSolver()
>>> algo = ALMAlgorithm()
>>> results = algo.run(model, reg, solver, config, sum_constraint=n, relaxation_schedule=1) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
20...COMPLETE...
[ALM 00] ...
...
>>> results["decoded"] # hard-coded to n==10
{0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0, 5: 0.0, 6: 0.0, 7: 0.0, 8: 0.0, 9: 1.0}
Equality constraints
====================

The ALM algorithm can seek feasible solutions with targeted multipliers rather
than adding penalty multipliers to the objective coefficients and adjusting
them manually, which would be required for more complex problems. These
constraints should be specified in the constraint registry. Specifying the
binary variables with the `add_block` call sets up the domain. The objective
function is formulated directly, purely as the cost of each variable, which is
the negative value of the variable index.

>>> indices = [(0, i+1) for i in range(n)]
>>> coefficients = [-i for i in range(n)]
>>> model = PolynomialModel(coefficients, indices)
>>> model.upper_bound = np.ones((n, ))
>>> model.is_discrete = [True for i in range(n)]
>>> reg = ConstraintRegistry()
>>> for i in range(n): reg.add_block([i], levels=[0, 1], sum_to_one=False)

Now, a new concept. Instead of placing the penalties explicitly in the
objective, this example enforces the one-hot constraint by defining a function
which is 0 when the constraint holds and either positive or negative when it
does not. The lambda function is passed an encoded array of the variable
values, so the array must be decoded before evaluating the function. To assist
with this, a helper function called `level_decoder` is available for import.
Use this decoder and the solution x to extract the values of the variables.

>>> decoder = level_decoder(n, [0, 1])
>>> test_x = np.zeros((20,))
>>> test_x[::2] = 1
>>> (decoder@test_x==0).all()
True
>>> config.max_outer = 15 # added complexity in the definition requires more steps
>>> config.gamma_up = 1.2
>>> config.gamma_down = 0.9
>>> reg.add_equality(lambda x: np.sum(decoder@x) - 1)

Everything else is the same as in the previous example.

>>> results = algo.run(model, reg, solver, config, sum_constraint=n, relaxation_schedule=1) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
20...COMPLETE...
[ALM 00] ...
...
>>> results["decoded"] # hard-coded to n==10
{0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0, 5: 0.0, 6: 0.0, 7: 0.0, 8: 0.0, 9: 1.0}
Inequality Constraints
======================

Incorporating inequality constraints is not much different. Specify an
inequality constraint with the `ConstraintRegistry.add_inequality` method,
passing a function which is zero or negative when the constraint is met and
positive when it is not. This is straightforward for less-than-or-equal
constraints; for greater-than-or-equal constraints, negate the function. Here
is an example of a model where no more than two variables are nonzero. When
adding the binary variable blocks, the `sum_to_one` parameter is left out,
which results in the default of true. The sum-to-one parameter gives the
algorithm more control over the dual variable computations.

>>> reg = ConstraintRegistry()
>>> for i in range(n): reg.add_block([i], levels=[0, 1])
>>> reg.add_inequality(lambda x: np.sum(decoder@x) - 2) # reuse the decoder from the previous example
>>> config.gamma_up = 1.2 # slower adjustment
>>> config.gamma_down = 0.8 # slower adjustment
>>> results = algo.run(model, reg, solver, config, sum_constraint=n, relaxation_schedule=1) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
20...COMPLETED...
[ALM 00] ...
[ALM 05] ...
...
>>> results["decoded"]
{0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0, 5: 0.0, 6: 0.0, 7: 0.0, 8: 1.0, 9: 1.0}
"""
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple, Optional, Sequence, Union
import numpy as np
from collections import defaultdict

from eqc_models.base.polynomial import PolynomialModel
from eqc_models.base.operators import Polynomial
from eqc_models.base.results import SolutionResults
from eqc_models.algorithms.bcd import (
    BCDConfig,
    BCDRegistry,
    BlockCoordinateDescentAlgorithm,
)

Array = np.ndarray
PolyTerm = Tuple[Tuple[int, ...], float]
def level_decoder(n, levels):
    """
    Return an operator that transforms an encoded solution to a solution
    with a single value per variable. For instance, a sequence of two-level
    variables with levels 0 and 1 and values `[0, 1, 1, 0]` is decoded into
    `[1, 0]` with the operator::

        [[0, 1, 0, 0],
         [0, 0, 0, 1]]

    and used with the expression `op@variable`.

    >>> decoder = level_decoder(3, [0, 1, 2])
    >>> values = np.array([1, 0, 0, 0, 1, 0, 0, 0, 1])
    >>> decoder@values
    array([0., 1., 2.])
    """
    level_count = len(levels)
    decoder = np.zeros((n, n*level_count))
    for i in range(n):
        decoder[i, i*level_count:(i+1)*level_count] = levels
    return decoder
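As a standalone illustration of this decoding convention, the operator can be rebuilt with plain NumPy. The function name `level_decoder_sketch` is hypothetical; it is a minimal re-implementation for demonstration, not part of eqc-models.

```python
import numpy as np

def level_decoder_sketch(n, levels):
    # One row per native variable; each row carries that variable's level
    # values in its k-wide slice of the lifted (encoded) vector.
    k = len(levels)
    D = np.zeros((n, n * k))
    for i in range(n):
        D[i, i * k:(i + 1) * k] = levels
    return D

decoder = level_decoder_sketch(3, [0, 1, 2])
encoded = np.array([1, 0, 0, 0, 1, 0, 0, 0, 1])  # one one-hot slot per variable
print(decoder @ encoded)  # -> [0. 1. 2.]
```

Each matrix-vector product collapses a one-hot block back to the level value it selects, which is why the docstring examples apply `decoder@x` before evaluating constraint functions.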
@dataclass
class ALMConstraint:
    """One constraint family; fun returns a vector; jac returns its Jacobian."""
    kind: str                                       # "eq" or "ineq"
    fun: Callable[[Array], Array]                   # h(x) or g(x)
    jac: Optional[Callable[[Array], Array]] = None
    name: str = ""
    family: str = ""
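A minimal sketch of the residual convention this class assumes: an equality function `h(x)` returns a vector that is zero exactly at feasible points (the constraint and test points below are illustrative, not from the module).

```python
import numpy as np

# Equality residual for sum(x) == 1, in the vector-valued form ALMConstraint
# expects from `fun`. Zero at feasible points, nonzero otherwise.
h = lambda x: np.array([np.sum(x) - 1.0])

print(h(np.array([0.25, 0.75])))  # [0.] -> feasible
print(h(np.array([1.0, 1.0])))    # [1.] -> violated by 1
```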
@dataclass
class ALMBlock:
    """Lifted discrete variable block (optional)."""
    idx: Sequence[int]               # indices of block in the full x
    levels: Array                    # (k,) level values (b_i)
    enforce_sum_to_one: bool = True  # register as equality via helper
    enforce_one_hot: bool = True     # ALM linearization with M = 11^T - I
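The `M = 11^T - I` comment above can be checked numerically: the quadratic form `s^T M s` vanishes on one-hot vectors and is positive whenever two or more coordinates are active. A small self-contained check:

```python
import numpy as np

def pairwise_M(k):
    # All-pairs matrix 11^T - I: s^T M s = 2 * sum of products of distinct entries.
    return np.ones((k, k)) - np.eye(k)

M = pairwise_M(3)
one_hot = np.array([0.0, 1.0, 0.0])
two_hot = np.array([1.0, 1.0, 0.0])
print(one_hot @ M @ one_hot)  # 0.0 -> one-hot, no penalty
print(two_hot @ M @ two_hot)  # 2.0 -> two active slots, penalized
```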
@dataclass
class ALMConfig:
    # penalties
    rho_h: float = 1.0   # equalities
    rho_g: float = 1.0   # inequalities / one-hot
    rho_min: float = 1e-3
    rho_max: float = 1e3
    # per-family penalty overrides
    rho_eq_family: Dict[str, float] = field(default_factory=dict)
    rho_ineq_family: Dict[str, float] = field(default_factory=dict)
    # allow per-family adaptation toggle (optional)
    adapt_by_family: bool = True
    # adaptation toggles
    adapt: bool = True
    tau_up_h: float = 1.05
    tau_down_h: float = 0.95
    tau_up_g: float = 0.90
    tau_down_g: float = 0.50
    gamma_up: float = 1.2
    gamma_down: float = 0.9
    # tolerances & loop
    tol_h: float = 1e-6
    tol_g: float = 1e-6
    max_outer: int = 100
    # stagnation safety net
    use_stagnation_bump: bool = False
    patience_h: int = 10
    patience_g: int = 10
    stagnation_factor: float = 1e-3
    # smoothing (optional)
    ema_alpha: float = 1.0
    # finite diff (only used if jac=None)
    fd_eps: float = 1e-6
    # activation threshold for projected ALM
    act_tol: float = 1e-10
    # Optional: BCD inner solve (decomposition)
    bcd: BCDConfig = field(default_factory=BCDConfig)
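A hedged sketch of how the `gamma_up`/`gamma_down` and `rho_min`/`rho_max` parameters typically interact in a multiplicative penalty update. The rule below (grow `rho` when the residual stagnates relative to a `tau` factor, shrink it when the residual improves, clip to bounds) is a generic ALM convention for illustration; the module's actual adaptation logic is not shown in this excerpt.

```python
import numpy as np

def adapt_rho(rho, resid, prev_resid, tau=0.95, gamma_up=1.2,
              gamma_down=0.9, rho_min=1e-3, rho_max=1e3):
    # Hypothetical update: insufficient residual decrease -> grow penalty;
    # good decrease -> relax penalty. Always clip to [rho_min, rho_max].
    if resid > tau * prev_resid:
        rho *= gamma_up
    else:
        rho *= gamma_down
    return float(np.clip(rho, rho_min, rho_max))

print(adapt_rho(1.0, resid=0.99, prev_resid=1.0))  # stagnated -> 1.2
print(adapt_rho(1.0, resid=0.50, prev_resid=1.0))  # improved  -> 0.9
```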
class ConstraintRegistry:
    """
    Holds constraints and block metadata; keeps ALMAlgorithm stateless.
    Register constraints and (optional) lifted-discrete blocks here.
    """

    def __init__(self):
        self.constraints: List[ALMConstraint] = []
        self.blocks: List[ALMBlock] = []

    def add_equality(self, fun, jac=None, name="", family=""):
        self.constraints.append(ALMConstraint("eq", fun, jac, name, family))

    def add_inequality(self, fun, jac=None, name="", family=""):
        self.constraints.append(ALMConstraint("ineq", fun, jac, name, family))

    def add_block(self, idx: Sequence[int], levels: Array, sum_to_one=True, one_hot=True):
        self.blocks.append(ALMBlock(list(idx), np.asarray(levels, float), sum_to_one, one_hot))
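The outer loop below repeatedly folds quadratic and linear ALM increments into a 1-based monomial list (`((0, i), w)` for linear terms, `((i, j), w)` for quadratic terms). A self-contained sketch of that term convention, with a numerical check that the monomial list reproduces `x^T Q x + c^T x`; the helper names here are illustrative, not the module's:

```python
import numpy as np

def qc_to_terms(Q, c):
    # Symmetrize Q, then emit ((i, i), Q_ii) diagonal terms, ((i, j), 2*Q_ij)
    # off-diagonal terms (i < j), and ((0, i), c_i) linear terms, all 1-based.
    Qs = 0.5 * (Q + Q.T)
    n = Qs.shape[0]
    terms = []
    for i in range(n):
        if Qs[i, i] != 0.0:
            terms.append(((i + 1, i + 1), float(Qs[i, i])))
        for j in range(i + 1, n):
            if Qs[i, j] != 0.0:
                terms.append(((i + 1, j + 1), 2.0 * Qs[i, j]))
    for i, ci in enumerate(c):
        if ci != 0.0:
            terms.append(((0, i + 1), float(ci)))
    return terms

def eval_terms(terms, x):
    # Index 0 marks a lowered degree (skipped factor), so (0, i) is linear.
    total = 0.0
    for inds, w in terms:
        v = w
        for idx in inds:
            if idx > 0:
                v *= x[idx - 1]
        total += v
    return total

Q = np.array([[1.0, 2.0], [0.0, 3.0]])
c = np.array([0.5, -1.0])
x = np.array([2.0, -1.0])
direct = float(x @ Q @ x + c @ x)
print(direct, eval_terms(qc_to_terms(Q, c), x))  # both 5.0
```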
class ALMAlgorithm:
    """Stateless ALM outer loop. Call `run(model, registry, core, cfg, **core_kwargs)`."""

    # ---- helpers (static) ----
    @staticmethod
    def _finite_diff_jac(fun: Callable[[Array], Array], x: Array, eps: float) -> Array:
        y0 = fun(x)
        m = int(np.prod(y0.shape))
        y0 = y0.reshape(-1)
        n = x.size
        J = np.zeros((m, n), dtype=float)
        for j in range(n):
            xp = x.copy()
            xp[j] += eps
            J[:, j] = (fun(xp).reshape(-1) - y0) / eps
        return J

    @staticmethod
    def _pairwise_M(k: int) -> Array:
        return np.ones((k, k), dtype=float) - np.eye(k, dtype=float)

    @staticmethod
    def _sum_to_one_selector(n: int, idx: Sequence[int]) -> Array:
        S = np.zeros((1, n), dtype=float)
        S[0, np.array(list(idx), int)] = 1.0
        return S

    @staticmethod
    def _make_sum1_fun(S):
        return lambda x: S @ x - np.array([1.0])

    @staticmethod
    def _make_sum1_jac(S):
        return lambda x: S

    @staticmethod
    def _make_onehot_fun(sl, M):
        sl = np.array(sl, int)
        def _f(x):
            s = x[sl]
            return np.array([float(s @ (M @ s))])  # shape (1,)
        return _f

    @staticmethod
    def _make_onehot_jac(sl, M, n):
        sl = np.array(sl, int)
        def _J(x):
            s = x[sl]
            grad_blk = 2.0 * (M @ s)           # (k,)
            J = np.zeros((1, n), dtype=float)  # shape (1, n)
            J[0, sl] = grad_blk
            return J
        return _J

    @staticmethod
    def _as_float(value) -> float:
        arr = np.asarray(value, dtype=float).reshape(-1)
        return float(arr[0]) if arr.size else 0.0

    @staticmethod
    def _polynomial_from_terms(poly_terms: List[PolyTerm]) -> Polynomial:
        if not poly_terms:
            return Polynomial([], [])
        idxs, coeffs = zip(*poly_terms)
        return Polynomial(list(coeffs), list(idxs))

    @staticmethod
    def _poly_value(poly_terms: List[PolyTerm], x: Array) -> float:
        """Backward-compatible wrapper around Polynomial.evaluate."""
        if not poly_terms:
            return 0.0
        poly = ALMAlgorithm._polynomial_from_terms(poly_terms)
        return ALMAlgorithm._as_float(poly.evaluate(np.asarray(x, dtype=float)))

    @staticmethod
    def _merge_poly(poly_terms: Optional[List[PolyTerm]], Q_aug: Optional[Array],
                    c_aug: Optional[Array]) -> List[PolyTerm]:
        """
        Merge ALM's quadratic/linear increments (Q_aug, c_aug) into the base
        polynomial term list `poly_terms`. If `poly_terms` is None, turn
        x^T Q_aug x + c_aug^T x into polynomial monomials. Terms are of the form
        ((0, i), w) for linear, ((i, j), w) for quadratic.
        """
        merged = list(poly_terms) if poly_terms is not None else []
        if Q_aug is not None:
            Qs = 0.5 * (Q_aug + Q_aug.T)
            n = Qs.shape[0]
            for i in range(n):
                # diagonal contributes Qii * x_i^2
                if Qs[i, i] != 0.0:
                    merged.append(((i + 1, i + 1), float(Qs[i, i])))
                for j in range(i + 1, n):
                    q = 2.0 * Qs[i, j]  # x^T Q x -> sum_{i<j} 2*Q_ij x_i x_j
                    if q != 0.0:
                        merged.append(((i + 1, j + 1), float(q)))
        if c_aug is not None:
            for i, ci in enumerate(c_aug):
                if ci != 0.0:
                    merged.append(((0, i + 1), float(ci)))
        return merged

    @staticmethod
    def _block_offsets(blocks: List[ALMBlock]) -> List[int]:
        """
        Return starting offsets (0-based) for each lifted block in the
        concatenated s-vector.
        """
        offs, pos = [], 0
        for blk in blocks:
            offs.append(pos)
            pos += len(blk.levels)  # each block contributes k lifted coordinates
        return offs
    @staticmethod
    def lift_Qc_to_poly_terms(
        Q_native: np.ndarray,    # shape (m, m) over original discrete vars (one block per var)
        c_native: np.ndarray,    # shape (m,)
        blocks: List[ALMBlock],  # ALM blocks (in the same order as the rows/cols of Q_native, c_native)
    ) -> Tuple[List[Tuple[int, ...]], List[float]]:
        """
        Expand a quadratic/linear objective over m native discrete variables into
        the lifted (one-hot) s-space.

        Given `Q_native` and `c_native` over the original m discrete vars (one
        block per var), and `blocks` (`list[ALMBlock]` in the same order as the
        original vars), we enforce:

        - For variable i with level values b_i (length k_i), we create k_i lifted coords s_{i,a}.
        - Quadratic term: J_ij = Q_native[i,j] * (b_i b_j^T) contributes to pairs (s_{i,a}, s_{j,b})
        - Linear term: C_i = c_native[i] * b_i contributes to s_{i,a}

        Returns polynomial (indices, coeffs) over concatenated s variables.
        """
        m = len(blocks)
        assert Q_native.shape == (m, m), "Q_native must match number of blocks"
        assert c_native.shape == (m,), "c_native must match number of blocks"
        offs = ALMAlgorithm._block_offsets(blocks)
        terms_acc = defaultdict(float)
        # Quadratic lift: J = kron-expansion
        for i in range(m):
            bi = blocks[i].levels[:, None]  # (k_i, 1)
            oi = offs[i]
            for j in range(m):
                bj = blocks[j].levels[:, None]  # (k_j, 1)
                oj = offs[j]
                J_ij = Q_native[i, j] * (bi @ bj.T)  # (k_i, k_j)
                if np.allclose(J_ij, 0.0):
                    continue
                ki, kj = bi.shape[0], bj.shape[0]
                for a in range(ki):
                    ia = oi + a + 1
                    for b in range(kj):
                        jb = oj + b + 1
                        w = float(J_ij[a, b])
                        if w == 0.0:
                            continue
                        if ia == jb:
                            terms_acc[(ia, ia)] += w
                        else:
                            # NOTE: we store each cross monomial once (i < j); for i==j block pairs,
                            # double to represent J_ab + J_ba
                            if i == j:  # intra-block off-diagonal needs 2x
                                w *= 2.0
                            i1, i2 = (ia, jb) if ia < jb else (jb, ia)
                            terms_acc[(i1, i2)] += w
        # Linear lift: C_i = L_i * b_i
        for i in range(m):
            b = blocks[i].levels
            oi = offs[i]
            for a, val in enumerate(b):
                ia = oi + a + 1
                w = float(c_native[i] * val)
                if w != 0.0:
                    terms_acc[(0, ia)] += w
        # Pack to PolynomialModel format
        indices = [tuple(k) for k in terms_acc.keys()]
        coeffs = [float(v) for v in terms_acc.values()]
        return indices, coeffs
    @staticmethod
    def _make_bcd_registry(
        registry: "ConstraintRegistry",
        *,
        lifted_slices: Optional[List[List[int]]] = None,
    ) -> BCDRegistry:
        """
        Translate ALM's registry into the standalone BCD registry.

        Important:
        - In native/non-lifted mode, BCD blocks should use the original `blk.idx`.
        - In lifted mode, BCD blocks must use the current working-coordinate slices
          (`lifted_slices`), not the original native indices.
        """
        breg = BCDRegistry()
        for i, blk in enumerate(registry.blocks):
            if lifted_slices is None:
                bcd_idx = list(int(j) for j in blk.idx)
            else:
                bcd_idx = list(int(j) for j in lifted_slices[i])
            breg.add_block(
                bcd_idx,
                name=f"block_{i}",
                levels=None if blk.levels is None else np.asarray(blk.levels, float),
            )
        for c in registry.constraints:
            if c.kind == "eq":
                breg.add_equality(c.fun, name=c.name, family=c.family)
            elif c.kind == "ineq":
                breg.add_inequality(c.fun, name=c.name, family=c.family)
        return breg
    # ---- main entrypoint ----
    @staticmethod
    def run(
        base_model: PolynomialModel,
        registry: ConstraintRegistry,
        solver,
        cfg: ALMConfig = ALMConfig(),
        x0: Optional[Array] = None,
        *,
        parse_output=None,
        verbose: bool = True,
        **solver_kwargs,
    ) -> Dict[str, Union[Array, Dict[int, float], Dict]]:
        """
        Solve with ALM. Keep all ALM state local to this call (no global side-effects).

        Handles three modes:

        (A) No blocks -> continuous; use base_model as-is
        (B) Blocks + native base_model -> lift to s-space
        (C) Blocks + already-lifted base_model -> use as-is (compat)

        Returns::

            {
                "x": final iterate,
                "decoded": {start_idx_of_block: level_value, ...} for lifted blocks,
                "decoded_debug": {start_idx_of_block: native_device_value, ...},
                "hist": {"eq_inf": [...], "ineq_inf": [...], "obj": [...], "x": [...]}
            }
        """
        blocks = registry.blocks
        has_blocks = len(blocks) > 0
        # ---- choose working model + dimension n ----
        if not has_blocks:
            # (A) continuous case: use the provided model directly
            model = base_model
            # Prefer model.n, fall back to bounds; else infer from polynomial indices
            n = getattr(model, "n", None)
            if n is None:
                ub = getattr(model, "upper_bound", None)
                lb = getattr(model, "lower_bound", None)
                if ub is not None:
                    n = len(ub)
                elif lb is not None:
                    n = len(lb)
                else:
                    # infer from polynomial terms
                    n = 0
                    for inds in getattr(model, "indices", getattr(model.polynomial, "indices", [])):
                        for j in inds:
                            if j > 0:
                                n = max(n, j)
            lifted_slices: List[List[int]] = []
        else:
            # (B/C) lifted (discrete) case
            target_lifted_n = sum(len(blk.levels) for blk in blocks)
            base_n = getattr(base_model, "n", None)

            def _infer_n_from_terms(pm: PolynomialModel) -> int:
                inds_list = getattr(pm, "indices", getattr(pm.polynomial, "indices", []))
                mx = 0
                for inds in inds_list:
                    for j in inds:
                        if j > mx:
                            mx = j
                return int(mx)

            if base_n is None:
                base_n = _infer_n_from_terms(base_model)
            # detect "already-lifted" native input (compat path)
            already_lifted = (base_n == target_lifted_n)
            if already_lifted:
                # (C) use provided model directly; assume bounds already sensible
                model = base_model
                n = target_lifted_n
            else:
                # (B) lift from native space
                # base_model must expose coefficients/indices compatible with this call
                c_base, Q_base = base_model._quadratic_polynomial_to_qubo_coefficients(
                    getattr(base_model, "coefficients", getattr(base_model.polynomial, "coefficients", [])),
                    getattr(base_model, "indices", getattr(base_model.polynomial, "indices", [])),
                    getattr(base_model, "n")
                )
                assert Q_base.shape[0] == Q_base.shape[1]
                assert c_base.shape[0] == Q_base.shape[0]
                indices_lifted, coeffs_lifted = ALMAlgorithm.lift_Qc_to_poly_terms(Q_base, c_base, blocks)
                model = PolynomialModel(coeffs_lifted, indices_lifted)
                n = target_lifted_n
                # set canonical [0,1] bounds for lifted s
                setattr(model, "lower_bound", np.zeros(n, float))
                setattr(model, "upper_bound", np.ones(n, float))
            # ---- n and lifted_slices ----
            lifted_slices = []
            pos = 0
            for blk in blocks:
                k = len(blk.levels)  # number of lifted coords for this block
                lifted_slices.append(list(range(pos, pos + k)))  # 0-based in lifted x
                pos += k
        # Algorithm initial solution and bounds
        lb = getattr(model, "lower_bound", None)
        ub = getattr(model, "upper_bound", None)
        if x0 is not None:
            x = np.asarray(x0, float).copy()
        else:
            # default init
            if (lb is not None) and (ub is not None) and np.all(np.isfinite(lb)) and np.all(np.isfinite(ub)):
                x = 0.5 * (np.asarray(lb, float) + np.asarray(ub, float))
            else:
                x = np.zeros(n, float)
        # ---- collect constraints ----
        problem_eqs = [c for c in registry.constraints if c.kind == "eq"]
        problem_ineqs = [c for c in registry.constraints if c.kind == "ineq"]

        # auto-install sum-to-one and one-hot as equalities
        # (One-hot: s^T (11^T - I) s = 0)
        def _install_block_equalities() -> List[ALMConstraint]:
            if not has_blocks:
                return []
            eqs: List[ALMConstraint] = []
            for blk, lift_idx in zip(registry.blocks, lifted_slices):
                if blk.enforce_sum_to_one:
                    S = ALMAlgorithm._sum_to_one_selector(n, lift_idx)
                    eqs.append(ALMConstraint(
                        "eq",
                        fun=ALMAlgorithm._make_sum1_fun(S),
                        jac=ALMAlgorithm._make_sum1_jac(S),
                        name=f"sum_to_one_block_{lift_idx[0]}",
                        family="block_sum1",
                    ))
                if blk.enforce_one_hot:
                    k = len(lift_idx)
                    M = ALMAlgorithm._pairwise_M(k)
                    eqs.append(ALMConstraint(
                        "eq",
                        fun=ALMAlgorithm._make_onehot_fun(lift_idx, M),
                        jac=ALMAlgorithm._make_onehot_jac(lift_idx, M, n),
                        name=f"onehot_block_{lift_idx[0]}",
                        family="block_onehot",
                    ))
            return eqs

        block_eqs = _install_block_equalities()
        # Unified equality list (order is fixed for whole run)
        full_eqs = problem_eqs + block_eqs
        # Allocate multipliers for every equality in full_eqs
        lam_eq = []
        for csp in full_eqs:
            r0 = csp.fun(x).reshape(-1)
            lam_eq.append(np.zeros_like(r0, dtype=float))
        # Inequality multipliers per user inequality
        mu_ineq = []
        for csp in problem_ineqs:
            r0 = csp.fun(x).reshape(-1)
            mu_ineq.append(np.zeros_like(r0, dtype=float))
        rho_eq_family = dict(getattr(cfg, "rho_eq_family", {}) or {})
        rho_ineq_family = dict(getattr(cfg, "rho_ineq_family", {}) or {})

        def _rho_for(csp_k: ALMConstraint) -> float:
            fam = getattr(csp_k, "family", "") or ""
            if csp_k.kind == "eq":
                return float(rho_eq_family.get(fam, rho_h))
            else:
                return float(rho_ineq_family.get(fam, rho_g))

        # -------- running stats for adaptive penalties --------
        rho_h, rho_g = cfg.rho_h, cfg.rho_g
        best_eq, best_ineq = np.inf, np.inf
        no_imp_eq = no_imp_ineq = 0
        prev_eq_inf, prev_ineq_inf = np.inf, np.inf
        eps = 1e-12
        prev_eq_inf_by_family, prev_ineq_inf_by_family = {}, {}
        # ---- per-family stagnation tracking (only used when cfg.adapt_by_family=True)
        best_eq_by_family: Dict[str, float] = {}
        best_ineq_by_family: Dict[str, float] = {}
        no_imp_eq_by_family: Dict[str, int] = {}
        no_imp_ineq_by_family: Dict[str, int] = {}
        hist = {"eq_inf": [], "ineq_inf": [], "obj": [], "x": [],
                # per-iteration logs for parameters/multipliers
                "rho_h": [], "rho_g": [],
                "rho_eq_family": [], "rho_ineq_family": [],
                "eq_inf_by_family": [], "ineq_inf_by_family": [],
                # critical for debugging hardware compatibility (per-iteration)
                "dynamic_range": [],
                }
        for k_idx, csp in enumerate(full_eqs):
            if csp.kind != "eq":
                continue
            hist[f"lam_eq_max_idx{k_idx}"] = []
            hist[f"lam_eq_min_idx{k_idx}"] = []
        for k_idx, csp in enumerate(problem_ineqs):
            if csp.kind != "ineq":
                continue
            hist[f"mu_ineq_max_idx{k_idx}"] = []
            hist[f"mu_ineq_min_idx{k_idx}"] = []

        for it in range(cfg.max_outer):
            # -------- base polynomial (does not include fixed penalties here) --------
            base_terms: List[PolyTerm] = list(zip(model.indices, model.coefficients))
            base_poly = ALMAlgorithm._polynomial_from_terms(base_terms)
            # -------- ALM quadratic/linear pieces (assembled here, kept separate) --------
            Q_aug = np.zeros((n, n), dtype=float)
            c_aug = np.zeros(n, dtype=float)
            have_aug = False
            # (A) Equalities: linearize h near x^t => (rho/2)||A x - b||^2 + lam^T(Ax - b)
            for k_idx, csp in enumerate(full_eqs):
                if csp.kind != "eq":
                    continue
                h = csp.fun(x).reshape(-1)
                A = csp.jac(x) if csp.jac is not None else ALMAlgorithm._finite_diff_jac(csp.fun, x, cfg.fd_eps)
                A = np.atleast_2d(A)
                assert A.shape[1] == n, f"A has {A.shape[1]} cols, expected {n}"
                # linearization about current x: residual model r(x) = A x - b, with b = A x - h
                b = A @ x - h
                rho_k = _rho_for(csp)
                Qk = 0.5 * rho_k * (A.T @ A)
                ck = (A.T @ lam_eq[k_idx]) - rho_k * (A.T @ b)
                Q_aug += Qk
                c_aug += ck
                have_aug = True
            # (B) Inequalities: projected ALM. Linearize g near x^t.
            for k_idx, csp in enumerate(problem_ineqs):
                if csp.kind != "ineq":
                    continue
                g = csp.fun(x).reshape(-1)
                G = csp.jac(x) if csp.jac is not None else ALMAlgorithm._finite_diff_jac(csp.fun, x, cfg.fd_eps)
                G = np.atleast_2d(G)
                assert G.shape[1] == n, f"G has {G.shape[1]} cols, expected {n}"
                d = G @ x - g
                rho_k = _rho_for(csp)
                # Activation measure at current iterate; meaning, the currently violating inequality
                # components: g(x) + mu/rho; Powell-Hestenes-Rockafellar shifted residual
                y = G @ x - d + mu_ineq[k_idx] / rho_k
                active = (y > cfg.act_tol)
                if np.any(active):
                    GA = G[active, :]
                    muA = mu_ineq[k_idx][active]
                    gA = g[active]
                    # Q += (rho/2) * GA^T GA
                    Qk = 0.5 * rho_k * (GA.T @ GA)
                    # c += GA^T mu - rho * GA^T (GA x - gA); where GA x - gA is the active part of d = G @ x - g
                    ck = (GA.T @ muA) - rho_k * (GA.T @ (GA @ x - gA))
                    Q_aug += Qk
                    c_aug += ck
                    have_aug = True
            # -------- build merged polynomial for the core solver --------
            all_terms = ALMAlgorithm._merge_poly(base_terms, Q_aug if have_aug else None,
                                                 c_aug if have_aug else None)
            idxs, coeffs = zip(*[(inds, w) for (inds, w) in all_terms]) if all_terms else ([], [])
            poly_model = PolynomialModel(list(coeffs), list(idxs))
            if lb is not None and hasattr(poly_model, "lower_bound"):
                poly_model.lower_bound = np.asarray(lb, float)
            if ub is not None and hasattr(poly_model, "upper_bound"):
                poly_model.upper_bound = np.asarray(ub, float)
            x_ws = x.copy()
            # Convention: many cores look for one of these fields if present.
            # Use one or more to be future-proof; harmless if ignored.
            setattr(poly_model, "initial_guess", x_ws)
            setattr(poly_model, "warm_start", x_ws)
            setattr(poly_model, "x0", x_ws)
            # -------- inner solve --------
            if getattr(cfg, "bcd", None) is not None and getattr(cfg.bcd, "enabled", False):
                # If ALM is operating in lifted space, BCD must receive lifted block
                # coordinates rather than the original native block indices.
                if lifted_slices is not None and len(lifted_slices) != len(registry.blocks):
                    raise ValueError("lifted_slices must align 1-to-1 with registry.blocks")
                breg = ALMAlgorithm._make_bcd_registry(
                    registry,
                    lifted_slices=lifted_slices if has_blocks else None,
                )
                bcd_res = BlockCoordinateDescentAlgorithm.run(
                    poly_model,
                    breg,
                    solver,
                    cfg=cfg.bcd,
                    x0=x_ws,
                    verbose=verbose,
                    **solver_kwargs,
                )
                x = np.asarray(bcd_res["x"], float).reshape(-1)
                hist.setdefault("bcd_inner", []).append(bcd_res["hist"])
            else:
                out = solver.solve(poly_model, **solver_kwargs)
                # -------- parse --------
                if parse_output:
                    x = parse_output(out)
                else:
                    # default: support (value, x) or `.x` or raw x
                    if isinstance(out, SolutionResults):
                        x = out.solutions[0]
                    elif isinstance(out, tuple) and len(out) == 2:
                        _, x = out
                    elif isinstance(out, dict) and "results" in out and "solutions" in out["results"]:
                        x = out["results"]["solutions"][0]
                    elif isinstance(out, dict) and "x" in out:
                        x = out["x"]
                    else:
                        x = getattr(out, "x", out)
                x = np.asarray(x, float)
            # ---- multiplier tracking ----
            for k_idx, csp in enumerate(full_eqs):
                if csp.kind != "eq": continue
                hist[f"lam_eq_max_idx{k_idx}"].append(float(np.max(lam_eq[k_idx])))
                hist[f"lam_eq_min_idx{k_idx}"].append(float(np.min(lam_eq[k_idx])))
            for k_idx, csp in enumerate(problem_ineqs):
                if csp.kind != "ineq": continue
                hist[f"mu_ineq_max_idx{k_idx}"].append(float(np.max(mu_ineq[k_idx])))
                hist[f"mu_ineq_min_idx{k_idx}"].append(float(np.min(mu_ineq[k_idx])))
            # -------- residuals + multiplier updates --------
            eq_infs = []
            for k_idx, csp in enumerate(full_eqs):
                if csp.kind != "eq": continue
                r = csp.fun(x).reshape(-1)
                rho_k = _rho_for(csp)
                lam_eq[k_idx] = lam_eq[k_idx] + rho_k * r
                if r.size:
                    eq_infs.append(np.max(np.abs(r)))
            eq_inf = float(np.max(eq_infs)) if eq_infs else 0.0
            ineq_infs = []
            for k_idx, csp in enumerate(problem_ineqs):
                if csp.kind != "ineq": continue
                r = csp.fun(x).reshape(-1)
                rho_k = _rho_for(csp)
                mu_ineq[k_idx] = np.maximum(0.0, mu_ineq[k_idx] + rho_k * r)
                if r.size:
                    ineq_infs.append(np.max(np.maximum(0.0, r)))
            ineq_inf = float(np.max(ineq_infs)) if ineq_infs else 0.0
            assert len(lam_eq) == len(full_eqs)
            assert len(mu_ineq) == len(problem_ineqs)
            # ---- per-family residual telemetry ----
            eq_inf_by_family = {}
            for k_idx, csp in enumerate(full_eqs):
                if csp.kind != "eq": continue
                fam = getattr(csp, "family", "") or ""
  • r = csp.fun(x).reshape(-1)
  • v = float(np.max(np.abs(r))) if r.size else 0.0
  • eq_inf_by_family[fam] = max(eq_inf_by_family.get(fam, 0.0), v)
  • ineq_inf_by_family = {}
  • for k_idx, csp in enumerate(problem_ineqs):
  • if csp.kind != "ineq": continue
  • fam = getattr(csp, "family", "") or ""
  • r = csp.fun(x).reshape(-1)
  • v = float(np.max(np.maximum(0.0, r))) if r.size else 0.0
  • ineq_inf_by_family[fam] = max(ineq_inf_by_family.get(fam, 0.0), v)
  • # evaluate the base polynomial only (the augmented terms could be added to track the full L_A)
  • f_val = (
  • ALMAlgorithm._as_float(base_poly.evaluate(np.asarray(x, dtype=float)))
  • if base_terms else 0.0
  • )
  • # evaluate dynamic range
  • dynamic_range = poly_model.dynamic_range
  • hist["eq_inf"].append(eq_inf); hist["ineq_inf"].append(ineq_inf)
  • hist["obj"].append(float(f_val)); hist["x"].append(x.copy())
  • hist["dynamic_range"].append(float(dynamic_range))
  • # parameter tracking
  • hist["rho_h"].append(float(rho_h)); hist["rho_g"].append(float(rho_g))
  • hist["rho_eq_family"].append(dict(rho_eq_family))
  • hist["rho_ineq_family"].append(dict(rho_ineq_family))
  • hist["eq_inf_by_family"].append(dict(eq_inf_by_family))
  • hist["ineq_inf_by_family"].append(dict(ineq_inf_by_family))
  • if verbose:
  • # show worst 3 equality families by residual, with their rhos
  • eq_items = sorted(eq_inf_by_family.items(), key=lambda kv: kv[1], reverse=True)
  • eq_top = eq_items[:3]
  • eq_str = ",".join([
  • f"{fam or 'eq'}:{val:.2e}@"
  • f"{_rho_for(next(c for c in full_eqs if (getattr(c,'family','') or '')==fam and c.kind=='eq')):.1g}"
  • for fam, val in eq_top
  • ])
  • # show worst 3 inequality families by residual, with their rhos
  • ineq_items = sorted(ineq_inf_by_family.items(), key=lambda kv: kv[1], reverse=True)
  • ineq_top = ineq_items[:3]
  • ineq_str = ",".join([
  • f"{fam or 'ineq'}:{val:.2e}@"
  • f"{_rho_for(next(c for c in problem_ineqs if (getattr(c, 'family', '') or '') == fam and c.kind == 'ineq')):.1g}"
  • for fam, val in ineq_top
  • ])
  • print(f"[ALM {it:02d}] f={f_val:.6g} | eq_inf={eq_inf:.2e} | ineq_inf={ineq_inf:.2e} "
  • f"| rho_h={rho_h:.2e} | rho_g={rho_g:.2e} | eq_fam[{eq_str}] | ineq_fam[{ineq_str}] "
  • f"| dynamic_range={dynamic_range:.4g}")
  • # stopping
  • if eq_inf <= cfg.tol_h and ineq_inf <= cfg.tol_g:
  • if verbose:
  • print(f"[ALM] converged at iter {it}")
  • break
  • # EMA smoothing to reduce jitter
  • if it == 0:
  • eq_inf_smooth = eq_inf
  • ineq_inf_smooth = ineq_inf
  • else:
  • eq_inf_smooth = cfg.ema_alpha * eq_inf + (1 - cfg.ema_alpha) * eq_inf_smooth
  • ineq_inf_smooth = cfg.ema_alpha * ineq_inf + (1 - cfg.ema_alpha) * ineq_inf_smooth
  • # -------- Residual-ratio controller --------
  • if cfg.adapt and it > 0:
  • # per-family rho adaptation for equalities
  • if getattr(cfg, "adapt_by_family", False):
  • # Equality family
  • for fam, cur in eq_inf_by_family.items():
  • prev = max(prev_eq_inf_by_family.get(fam, cur), eps)
  • # rho value for family: if absent, start at rho_h
  • rho_f = float(rho_eq_family.get(fam, rho_h))
  • if cur > cfg.tau_up_h * prev:
  • rho_f = min(cfg.gamma_up * rho_f, cfg.rho_max)
  • elif cur < cfg.tau_down_h * prev:
  • rho_f = max(cfg.gamma_down * rho_f, cfg.rho_min)
  • rho_eq_family[fam] = rho_f
  • # update prev map
  • prev_eq_inf_by_family = dict(eq_inf_by_family)
  • # Inequality family
  • for fam, cur in ineq_inf_by_family.items():
  • prev = max(prev_ineq_inf_by_family.get(fam, cur), eps)
  • # rho value for family: if absent, start at rho_g
  • rho_f = float(rho_ineq_family.get(fam, rho_g))
  • if cur > cfg.tau_up_g * prev:
  • rho_f = min(cfg.gamma_up * rho_f, cfg.rho_max)
  • elif cur < cfg.tau_down_g * prev:
  • rho_f = max(cfg.gamma_down * rho_f, cfg.rho_min)
  • rho_ineq_family[fam] = rho_f
  • # update prev map
  • prev_ineq_inf_by_family = dict(ineq_inf_by_family)
  • else:
  • # Equality group
  • if eq_inf_smooth > cfg.tau_up_h * max(prev_eq_inf, eps): # stalled or not shrinking
  • rho_h = min(cfg.gamma_up * rho_h, cfg.rho_max)
  • elif eq_inf_smooth < cfg.tau_down_h * max(prev_eq_inf, eps): # fast progress, allow relaxation
  • rho_h = max(cfg.gamma_down * rho_h, cfg.rho_min)
  • # Inequality group
  • if ineq_inf_smooth > cfg.tau_up_g * max(prev_ineq_inf, eps):
  • rho_g = min(cfg.gamma_up * rho_g, cfg.rho_max)
  • elif ineq_inf_smooth < cfg.tau_down_g * max(prev_ineq_inf, eps):
  • rho_g = max(cfg.gamma_down * rho_g, cfg.rho_min)
  • else:
  • # adaptation skipped this iteration; still refresh per-family residual history
  • if getattr(cfg, "adapt_by_family", False):
  • # update prev map
  • prev_eq_inf_by_family = dict(eq_inf_by_family)
  • prev_ineq_inf_by_family = dict(ineq_inf_by_family)
  • # -------- Stagnation bump (safety net) --------
  • if cfg.use_stagnation_bump:
  • if getattr(cfg, "adapt_by_family", False):
  • # ---- Equality families ----
  • for fam, cur in eq_inf_by_family.items():
  • # initialize
  • if fam not in best_eq_by_family:
  • best_eq_by_family[fam] = float(cur)
  • no_imp_eq_by_family[fam] = 0
  • if cur <= best_eq_by_family[fam] * (1 - cfg.stagnation_factor):
  • best_eq_by_family[fam] = float(cur)
  • no_imp_eq_by_family[fam] = 0
  • else:
  • no_imp_eq_by_family[fam] += 1
  • if no_imp_eq_by_family[fam] >= cfg.patience_h:
  • rho_f = float(rho_eq_family.get(fam, rho_h))
  • rho_eq_family[fam] = min(2.0 * rho_f, cfg.rho_max)
  • no_imp_eq_by_family[fam] = 0
  • # ---- Inequality families ----
  • for fam, cur in ineq_inf_by_family.items():
  • # initialize
  • if fam not in best_ineq_by_family:
  • best_ineq_by_family[fam] = float(cur)
  • no_imp_ineq_by_family[fam] = 0
  • if cur <= best_ineq_by_family[fam] * (1 - cfg.stagnation_factor):
  • best_ineq_by_family[fam] = float(cur)
  • no_imp_ineq_by_family[fam] = 0
  • else:
  • no_imp_ineq_by_family[fam] += 1
  • if no_imp_ineq_by_family[fam] >= cfg.patience_g:
  • rho_f = float(rho_ineq_family.get(fam, rho_g))
  • rho_ineq_family[fam] = min(2.0 * rho_f, cfg.rho_max)
  • no_imp_ineq_by_family[fam] = 0
  • else:
  • # Equality stagnation
  • if eq_inf <= best_eq * (1 - cfg.stagnation_factor):
  • best_eq = eq_inf; no_imp_eq = 0
  • else:
  • no_imp_eq += 1
  • if no_imp_eq >= cfg.patience_h:
  • rho_h = min(2.0 * rho_h, cfg.rho_max); no_imp_eq = 0
  • # Inequality stagnation
  • if ineq_inf <= best_ineq * (1 - cfg.stagnation_factor):
  • best_ineq = ineq_inf; no_imp_ineq = 0
  • else:
  • no_imp_ineq += 1
  • if no_imp_ineq >= cfg.patience_g:
  • rho_g = min(2.0 * rho_g, cfg.rho_max); no_imp_ineq = 0
  • # -------- finalize for next iteration --------
  • prev_eq_inf = max(eq_inf_smooth, eps)
  • prev_ineq_inf = max(ineq_inf_smooth, eps)
  • # ---- decoding back to native levels (only if blocks) ----
  • decoded_native: Dict[int, float] = {} # maps original var anchor -> chosen level value
  • decoded_lifted: Dict[int, int] = {} # maps lifted start index -> argmax position (optional)
  • if has_blocks:
  • for blk, lift_idx in zip(registry.blocks, lifted_slices):
  • if not lift_idx:
  • continue
  • sl = np.array(lift_idx, int)
  • if len(sl) == 0:
  • continue
  • sblk = x[sl]
  • j = int(np.argmax(sblk)) # which level got selected in the block
  • orig_anchor = int(blk.idx[0]) # anchor original var id for this block
  • decoded_native[orig_anchor] = float(blk.levels[j])
  • decoded_lifted[sl[0]] = j # optional: lifted index -> chosen slot
  • return {
  • "x": x,
  • "decoded": decoded_native if has_blocks else {},
  • "decoded_debug": decoded_lifted if has_blocks else {},
  • "hist": hist,
  • }
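The core update pattern above (inner solve, projected multiplier step, residual-ratio penalty control) can be exercised in isolation on a toy problem. The sketch below is illustrative only and uses none of the eqc-models API: it solves minimize x² subject to g(x) = 1 − x ≤ 0, whose constrained optimum is x* = 1 with multiplier μ* = 2, using a closed-form inner solve in place of the quantum core.

```python
import math

def alm_demo(rho=1.0, tol=1e-6, max_iter=50,
             tau_up=0.9, gamma_up=2.0, rho_max=1e6):
    """Minimal ALM loop for: minimize x^2 subject to g(x) = 1 - x <= 0.

    Mirrors the update pattern in the listing above:
    inner solve -> residual -> mu = max(0, mu + rho*g) -> ratio-based rho bump.
    All names here are illustrative; this is not the eqc-models API.
    """
    mu = 0.0
    x = 0.0
    prev_inf = math.inf
    it = 0
    for it in range(max_iter):
        # inner solve: argmin_x  x^2 + mu*(1 - x) + (rho/2)*(1 - x)^2
        # (constraint active) -> stationary point in closed form:
        x = (mu + rho) / (2.0 + rho)
        r = 1.0 - x                       # signed residual of g(x) <= 0
        ineq_inf = max(0.0, r)            # infeasibility measure
        mu = max(0.0, mu + rho * r)       # projected multiplier update
        if ineq_inf <= tol:
            break
        # residual-ratio controller: grow rho when progress stalls
        if ineq_inf > tau_up * prev_inf:
            rho = min(gamma_up * rho, rho_max)
        prev_inf = max(ineq_inf, 1e-12)
    return x, mu, it

x, mu, iters = alm_demo()
print(round(x, 4))   # -> 1.0 (the constrained optimum)
```

The residual here shrinks geometrically by a factor 2/3 per outer iteration, so the ratio test never triggers a penalty bump; starting from a smaller `tau_up` or a harder problem exercises the `gamma_up` branch.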
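The default output-parsing fallback (tuple, nested results dict, flat dict, `.x` attribute, raw array) can also be tried on its own. This standalone sketch mirrors the duck-typing order used in the listing, with plain Python values standing in for real solver output; `parse_solver_output` is a hypothetical helper name, not part of eqc-models.

```python
import numpy as np

def parse_solver_output(out):
    """Mirror of the fallback chain above: (value, x) tuple, nested
    results dict, flat dict with "x", `.x` attribute, else raw array."""
    if isinstance(out, tuple) and len(out) == 2:
        _, x = out
    elif isinstance(out, dict) and "results" in out and "solutions" in out["results"]:
        x = out["results"]["solutions"][0]
    elif isinstance(out, dict) and "x" in out:
        x = out["x"]
    else:
        x = getattr(out, "x", out)
    return np.asarray(x, float).reshape(-1)

print(parse_solver_output((0.5, [1.0, 2.0])))                    # -> [1. 2.]
print(parse_solver_output({"results": {"solutions": [[3.0]]}}))  # -> [3.]
```

Note the check order matters: a dict carrying both a nested `"results"` payload and a top-level `"x"` key resolves to the nested solutions first, matching the listing.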