Source code for eqc_models.algorithms.alm
- """
- ============================
- Augmented Lagrangian Method
- ============================
- Overview
- ============================
The augmented Lagrangian method (ALM) in eqc-models automates parameter
adjustment to reach feasible solutions of constrained optimization models on
entropy quantum computing devices. It supports fractional (continuous), binary
and general integer variables. ALM also supports enforcement of one-hot
constraints, inequality constraints and equality constraints without
introducing slack variables.
This method requires a different modeling approach than implementing a
constrained optimization model as a direct subclass of `EqcModel`. To use this
approach, implement the objective function in one of the subclasses of
`EqcModel`, then use a `ConstraintRegistry` object to describe the model
constraints. Many parameters are available inside the `ALMConfig` class, but
only a few of them should need adjustment by novice users. Once the model,
constraint registry and config objects are ready, pass them to an
`ALMAlgorithm` instance along with a solver; the algorithm iterates over
parameter adjustments until the feasibility tolerance is met or the maximum
iteration count is exceeded.
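Before the library-specific example, the outer loop can be sketched on a toy
problem whose inner minimization has a closed form (plain Python, not the
eqc-models implementation; a real run replaces the closed-form solve with a
device call):

```python
# Toy ALM: minimize f(x) = x**2 subject to h(x) = x - 1 = 0.
# The inner minimization of x**2 + lam*(x - 1) + (rho/2)*(x - 1)**2
# has the closed form x = (rho - lam) / (2 + rho).
rho, lam = 10.0, 0.0
for _ in range(30):
    x = (rho - lam) / (2.0 + rho)   # inner solve (a device call in practice)
    lam += rho * (x - 1.0)          # multiplier update on the residual
# x approaches the feasible optimum 1 and lam the KKT multiplier -2
```

Each outer iteration shrinks the constraint residual; adapting `rho` upward or
downward (as the `gamma_up`/`gamma_down` settings do) speeds this up on harder
problems.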
- The following example is a basic outline of how to use the ALM implementation to
- solve models.
::

    model = PolynomialModel()
    solver = Dirac3ContinuousCloudSolver()
    solver_kwargs = {}    # add solver parameters as a dict here
    config = ALMConfig()  # add configuration parameters here
    registry = ConstraintRegistry()
    # add variable domains and constraints here
    algorithm = ALMAlgorithm()
    # keyword arguments are passed to the solver
    response = algorithm.run(model, registry, solver, config, **solver_kwargs)
- Model Creation
- ============================
Defining the objective function for the optimization problem can be done in a
number of ways. There are two base classes on which all other models are
built, and they determine how objective coefficients are specified. With a
`QuadraticModel`, the coefficients are specified using a one-dimensional array
for the linear terms and a two-dimensional symmetric array or Hermitian matrix
for the quadratic terms. A `PolynomialModel` instead uses two lists, one
holding the index tuples of terms and another holding the coefficients of
those terms, which can be up to degree five. These two classes are preferred
for use with ALM because most other model classes incorporate constraints into
the operator. In the future, ALM will provide a way to extract the objective
function and constraints from a model separately, populating the
`ConstraintRegistry` automatically.
First, define the variables by listing their names in a single list. Then
organize the terms in ascending lexicographic order by index. Finally, create
a list of coefficient entries corresponding to the index list.
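As a library-independent sketch of this ordering convention (the variable
names and term values here are made up for illustration):

```python
# Toy 3-variable objective: f(x) = -x1 + 2*x1*x2 + x3**2.
# Index 0 marks "no variable", so (0, i) is a linear term in x_i and
# (i, j) with i <= j is a quadratic term; variable indices are 1-based.
terms = {
    (0, 1): -1.0,   # linear term -x1
    (1, 2): 2.0,    # cross term 2*x1*x2
    (3, 3): 1.0,    # square term x3**2
}
# Sort the index tuples in ascending lexicographic order, then split
# into the two parallel lists the model expects.
indices = sorted(terms)
coefficients = [terms[idx] for idx in indices]
```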
The example problem used here is a simple logical decision: choose exactly one
variable among 10. Those familiar with QUBO formulations will recognize the
penalties in the quadratic coefficients. This first example encodes the
one-hot constraint as penalties in the objective function and adds a block
constraint to each variable to restrict it to 0 or 1, which has the effect of
using two time bins per binary variable.
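The penalty idea can be checked with a small numpy sketch; the matrix below
mirrors the pairwise one-hot penalty that ALM also constructs internally for
blocks (illustrative only, not part of the example that follows):

```python
import numpy as np

n = 4
# Pairwise penalty matrix: ones everywhere except the diagonal.
# For a binary vector s, s @ M @ s counts ordered pairs of distinct set
# bits, so it is zero exactly when at most one entry of s equals 1.
M = np.ones((n, n)) - np.eye(n)

one_hot = np.array([0.0, 1.0, 0.0, 0.0])
two_hot = np.array([1.0, 1.0, 0.0, 0.0])
penalty_one = float(one_hot @ M @ one_hot)  # no penalty
penalty_two = float(two_hot @ M @ two_hot)  # penalized
```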
- >>> import warnings; warnings.filterwarnings("ignore") # silence warnings
- >>> n = 10
- >>> variables = [f"x_{i}" for i in range(n)]
Define the indices and coefficients. The solution is `x_9 == 1` with all other
`x_i == 0`. Variable indices sent to the device are 1-based.
>>> indices = [(0, i+1) for i in range(n)] + [(i+1, j+1) for i in range(n) for j in range(i, n)]
>>> coefficients = [-i if i==n-1 else 0 for i in range(n)] + [2 for i in range(n) for j in range(i, n)]
Define the model, setting the upper bound to 1 on all variables.
- >>> model = PolynomialModel(coefficients, indices)
- >>> model.variables = variables
- >>> model.variables[0] == "x_0"
- True
- >>> model.upper_bound = np.ones((n,))
- >>> model.is_discrete = [True for i in range(n)] # for future modeling support
Begin the ALM setup. One feature of this algorithm in eqc-models is the
ability to enforce discrete variables while using the continuous solver, which
allows for mixed-integer formulations. For binary variables, add a block for
each one so that it is forced to 0 or 1. The `sum_to_one` parameter is
normally true; it is set to false here because the simplicity of the problem
allows a more efficient approach.
- >>> reg = ConstraintRegistry()
- >>> for i in range(n): reg.add_block([i], levels=[0, 1], sum_to_one=False)
Configure `rho_h` (the equality constraint weight) to start at 1, allow only 4
outer iterations, and set the tolerance high.
- >>> config = ALMConfig(tol_h=0.5, max_outer=4, rho_h=1, rho_g=1, gamma_up=2.0, gamma_down=1.0)
- >>> from eqc_models.solvers import Dirac3ContinuousCloudSolver
- >>> solver = Dirac3ContinuousCloudSolver()
- >>> algo = ALMAlgorithm()
- >>> results = algo.run(model, reg, solver, config, sum_constraint=n, relaxation_schedule=1) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
- 20...COMPLETE...
- [ALM 00] ...
- ...
- >>> results["decoded"] # had to hard-code to n==10
- {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0, 5: 0.0, 6: 0.0, 7: 0.0, 8: 0.0, 9: 1.0}
- Equality constraints
- ========================
This ALM algorithm can seek feasible solutions using targeted multipliers,
rather than adding penalty multipliers to the objective coefficients and
adjusting them manually, which would otherwise be required for more complex
problems. Specify these constraints in the constraint registry. Specifying the
binary variables with `add_block` calls sets up the domain.
The objective function is formulated directly, purely as the cost of each
variable, which is the negative of the variable index.
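For reference, the feasibility drive behind this is the standard augmented
Lagrangian step (generic notation; it matches the multiplier updates applied
to `lam_eq` and `mu_ineq` in the code below). Each outer iteration solves

.. math::

   \min_x \; f(x) + \lambda^\top h(x) + \tfrac{\rho_h}{2}\,\lVert h(x)\rVert^2

and then updates the multipliers with

.. math::

   \lambda \leftarrow \lambda + \rho_h\, h(x), \qquad
   \mu \leftarrow \max\bigl(0,\; \mu + \rho_g\, g(x)\bigr)
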
- >>> indices = [(0, i+1) for i in range(n)]
- >>> coefficients = [-i for i in range(n)]
- >>> model = PolynomialModel(coefficients, indices)
- >>> model.upper_bound = np.ones((n, ))
- >>> model.is_discrete = [True for i in range(n)]
- >>> reg = ConstraintRegistry()
- >>> for i in range(n): reg.add_block([i], levels=[0, 1], sum_to_one=False)
Now for a new concept. Instead of placing the penalties explicitly in the
objective, this statement enforces the one-hot constraint by defining a
function which is 0 when the constraint holds and either positive or negative
when it does not. The lambda function is passed an encoded array of variable
values, so the array must be decoded before evaluating the function. To assist
with this, a helper function called `level_decoder` is available for import.
Use this decoder and the solution x to extract the values of the variables.
- >>> decoder = level_decoder(n, [0, 1])
- >>> test_x = np.zeros((20,))
- >>> test_x[::2] = 1
- >>> (decoder@test_x==0).all()
- True
- >>> config.max_outer = 15 # added complexity in the definition requires more steps
- >>> config.gamma_up = 1.2
- >>> config.gamma_down = 0.9
- >>> reg.add_equality(lambda x: np.sum(decoder@x) - 1)
- Everything else is the same as in the previous example.
- >>> results = algo.run(model, reg, solver, config, sum_constraint=n, relaxation_schedule=1) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
- 20...COMPLETE...
- [ALM 00] ...
- ...
- >>> results["decoded"] # had to hard-code to n==10
- {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0, 5: 0.0, 6: 0.0, 7: 0.0, 8: 0.0, 9: 1.0}
- Inequality Constraints
- ========================
Incorporating inequality constraints is not much different. Specify an
inequality constraint with the `ConstraintRegistry.add_inequality` method,
passing a function which is zero or negative when the constraint is met and
positive when it is violated. This is straightforward for less-than-or-equal
constraints; for greater-than-or-equal constraints, negate the function. Here
is an example of a model where at most two variables are nonzero. When adding
the binary variable blocks, the `sum_to_one` parameter is left out, so it
defaults to true. Using the sum-to-one parameter gives the algorithm more
control over the dual variable computations.
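The sign convention can be sanity-checked with plain numpy (an illustrative
sketch, not registry code):

```python
import numpy as np

# "at most two nonzero" expressed as g(x) <= 0: positive means violated
def g_le(x):
    return np.sum(x) - 2

# "at least one nonzero" is a >= constraint; negate it into the same
# "positive means violated" convention
def g_ge(x):
    return -(np.sum(x) - 1)

x_ok = np.array([1.0, 1.0, 0.0])   # two nonzero: feasible (boundary)
x_bad = np.array([1.0, 1.0, 1.0])  # three nonzero: violated
```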
- >>> reg = ConstraintRegistry()
- >>> for i in range(n): reg.add_block([i], levels=[0, 1])
- >>> reg.add_inequality(lambda x: np.sum(decoder@x) - 2) # reuse the decoder from the previous example
- >>> config.gamma_up = 1.2 # slower adjustment
- >>> config.gamma_down = 0.8 # slower adjustment
- >>> results = algo.run(model, reg, solver, config, sum_constraint=n, relaxation_schedule=1) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
- 20...COMPLETED...
- [ALM 00] ...
- [ALM 05] ...
- ...
- >>> results["decoded"]
- {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0, 5: 0.0, 6: 0.0, 7: 0.0, 8: 1.0, 9: 1.0}
- """
- from dataclasses import dataclass, field
- from typing import Callable, Dict, List, Tuple, Optional, Sequence, Union
- import numpy as np
- from collections import defaultdict
- from eqc_models.base.polynomial import PolynomialModel
- from eqc_models.base.operators import Polynomial
- from eqc_models.base.results import SolutionResults
- from eqc_models.algorithms.bcd import (
- BCDConfig,
- BCDRegistry,
- BlockCoordinateDescentAlgorithm,
- )
- Array = np.ndarray
- PolyTerm = Tuple[Tuple[int, ...], float]
- def level_decoder(n, levels):
- """
- Return an operator that transforms an encoded solution to a solution
- with a single value per variable. For instance, a sequence of two level
- variables with levels 0 and 1 has values `[0, 1, 1, 0]` is decoded into
- `[1, 0]` with the operator::
- [[ 0, 1],
- [ 0, 1]]
- and used with the expression `op@variable`.
- >>> decoder = level_decoder(3, [0, 1, 2])
- >>> values = np.array([1, 0, 0, 0, 1, 0, 0, 0, 1])
- >>> decoder@values
- array([0., 1., 2.])
-
- """
- level_count = len(levels)
- decoder = np.zeros((n, n*level_count))
- for i in range(n):
- decoder[i, i*level_count:(i+1)*level_count] = levels
- return decoder
- @dataclass
- class ALMConstraint:
- """One constraint family; fun returns a vector; jac returns its Jacobian."""
- kind: str
- fun: Callable[[Array], Array]
- jac: Optional[Callable[[Array], Array]] = None
- name: str = ""
- family: str = ""
- @dataclass
- class ALMBlock:
- """Lifted discrete variable block (optional)."""
- idx: Sequence[int]
- levels: Array
- enforce_sum_to_one: bool = True
- enforce_one_hot: bool = True
- @dataclass
class ALMConfig:
"""Configuration for the ALM outer loop: initial penalty weights (`rho_h`, `rho_g`), per-family overrides, adaptation factors (`tau_*`, `gamma_*`), tolerances (`tol_h`, `tol_g`) and the iteration limit `max_outer`."""
- rho_h: float = 1.0
- rho_g: float = 1.0
- rho_min: float = 1e-3
- rho_max: float = 1e3
-
- rho_eq_family: Dict[str, float] = field(default_factory=dict)
- rho_ineq_family: Dict[str, float] = field(default_factory=dict)
-
- adapt_by_family: bool = True
-
- adapt: bool = True
- tau_up_h: float = 1.05
- tau_down_h: float = 0.95
- tau_up_g: float = 0.90
- tau_down_g: float = 0.50
- gamma_up: float = 1.2
- gamma_down: float = 0.9
-
- tol_h: float = 1e-6
- tol_g: float = 1e-6
- max_outer: int = 100
-
- use_stagnation_bump: bool = False
- patience_h: int = 10
- patience_g: int = 10
- stagnation_factor: float = 1e-3
-
- ema_alpha: float = 1.0
-
- fd_eps: float = 1e-6
-
- act_tol: float = 1e-10
-
- bcd: BCDConfig = field(default_factory=BCDConfig)
- class ConstraintRegistry:
- """
- Holds constraints and block metadata; keeps ALMAlgorithm stateless. Register constraints and
- (optional) lifted-discrete blocks here.
- """
- def __init__(self):
- self.constraints: List[ALMConstraint] = []
- self.blocks: List[ALMBlock] = []
- def add_equality(self, fun, jac=None, name="", family=""):
- self.constraints.append(ALMConstraint("eq", fun, jac, name, family))
- def add_inequality(self, fun, jac=None, name="", family=""):
- self.constraints.append(ALMConstraint("ineq", fun, jac, name, family))
- def add_block(self, idx: Sequence[int], levels: Array, sum_to_one=True, one_hot=True):
- self.blocks.append(ALMBlock(list(idx), np.asarray(levels, float), sum_to_one, one_hot))
- class ALMAlgorithm:
- """Stateless ALM outer loop. Call `run(model, registry, core, cfg, **core_kwargs)`."""
-
- @staticmethod
- def _finite_diff_jac(fun: Callable[[Array], Array], x: Array, eps: float) -> Array:
- y0 = fun(x)
- m = int(np.prod(y0.shape))
- y0 = y0.reshape(-1)
- n = x.size
- J = np.zeros((m, n), dtype=float)
- for j in range(n):
- xp = x.copy()
- xp[j] += eps
- J[:, j] = (fun(xp).reshape(-1) - y0) / eps
- return J
- @staticmethod
- def _pairwise_M(k: int) -> Array:
- return np.ones((k, k), dtype=float) - np.eye(k, dtype=float)
- @staticmethod
- def _sum_to_one_selector(n: int, idx: Sequence[int]) -> Array:
- S = np.zeros((1, n), dtype=float)
- S[0, np.array(list(idx), int)] = 1.0
- return S
- @staticmethod
- def _make_sum1_fun(S):
- return lambda x: S @ x - np.array([1.0])
- @staticmethod
- def _make_sum1_jac(S):
- return lambda x: S
- @staticmethod
- def _make_onehot_fun(sl, M):
- sl = np.array(sl, int)
- def _f(x):
- s = x[sl]
- return np.array([float(s @ (M @ s))])
- return _f
- @staticmethod
- def _make_onehot_jac(sl, M, n):
- sl = np.array(sl, int)
- def _J(x):
- s = x[sl]
- grad_blk = 2.0 * (M @ s)
- J = np.zeros((1, n), dtype=float)
- J[0, sl] = grad_blk
- return J
- return _J
- @staticmethod
- def _as_float(value) -> float:
- arr = np.asarray(value, dtype=float).reshape(-1)
- return float(arr[0]) if arr.size else 0.0
- @staticmethod
- def _polynomial_from_terms(poly_terms: List[PolyTerm]) -> Polynomial:
- if not poly_terms:
- return Polynomial([], [])
- idxs, coeffs = zip(*poly_terms)
- return Polynomial(list(coeffs), list(idxs))
- @staticmethod
- def _poly_value(poly_terms: List[PolyTerm], x: Array) -> float:
- """Backward-compatible wrapper around Polynomial.evaluate."""
- if not poly_terms:
- return 0.0
- poly = ALMAlgorithm._polynomial_from_terms(poly_terms)
- return ALMAlgorithm._as_float(poly.evaluate(np.asarray(x, dtype=float)))
- @staticmethod
- def _merge_poly(poly_terms: Optional[List[PolyTerm]], Q_aug: Optional[Array],
- c_aug: Optional[Array]) -> List[PolyTerm]:
- """
- Merge ALM's quadratic/linear increments (Q_aug, c_aug) into the base polynomial term list `poly_terms`.
- If 'poly_terms' is None, then turn x^T Q_aug x + c_aug^T x into polynomial monomials.
- Terms are of the form:
- ((0, i), w) for linear, ((i, j), w) for quadratic.
- """
- merged = list(poly_terms) if poly_terms is not None else []
- if Q_aug is not None:
- Qs = 0.5 * (Q_aug + Q_aug.T)
- n = Qs.shape[0]
- for i in range(n):
-
- if Qs[i, i] != 0.0:
- merged.append(((i + 1, i + 1), float(Qs[i, i])))
- for j in range(i + 1, n):
- q = 2.0 * Qs[i, j]
- if q != 0.0:
- merged.append(((i + 1, j + 1), float(q)))
- if c_aug is not None:
- for i, ci in enumerate(c_aug):
- if ci != 0.0:
- merged.append(((0, i + 1), float(ci)))
- return merged
- @staticmethod
- def _block_offsets(blocks: List[ALMBlock]) -> List[int]:
- """
- Return starting offsets (0-based) for each lifted block in the
- concatenated s-vector.
- """
- offs, pos = [], 0
- for blk in blocks:
- offs.append(pos)
- pos += len(blk.levels)
- return offs
- @staticmethod
- def lift_Qc_to_poly_terms(
- Q_native: np.ndarray,
- c_native: np.ndarray,
- blocks: List[ALMBlock],
- ) -> Tuple[List[Tuple[int, ...]], List[float]]:
- """
- Expand a quadratic/linear objective over m native discrete variables into the lifted
- (one-hot) s-space.
- Given `Q_native` and `c_native` over original m discrete vars (one block per var), and
- `blocks` (`list[ALMBlock]` in the same order as original vars), we enforce:
- - For variable i with level values b_i (length k_i), we create k_i lifted coords s_{i,a}.
- - Quadratic term: J_ij = Q_native[i,j] * (b_i b_j^T) contributes to pairs (s_{i,a}, s_{j,b})
- - Linear term: C_i = c_native[i] * b_i contributes to s_{i,a}
- Returns polynomial (indices, coeffs) over concatenated s variables.
- """
- m = len(blocks)
- assert Q_native.shape == (m, m), "Q_native must match number of blocks"
- assert c_native.shape == (m,), "c_native must match number of blocks"
- offs = ALMAlgorithm._block_offsets(blocks)
- terms_acc = defaultdict(float)
-
- for i in range(m):
- bi = blocks[i].levels[:, None]
- oi = offs[i]
- for j in range(m):
- bj = blocks[j].levels[:, None]
- oj = offs[j]
- J_ij = Q_native[i, j] * (bi @ bj.T)
- if np.allclose(J_ij, 0.0):
- continue
- ki, kj = bi.shape[0], bj.shape[0]
- for a in range(ki):
- ia = oi + a + 1
- for b in range(kj):
- jb = oj + b + 1
- w = float(J_ij[a, b])
- if w == 0.0:
- continue
- if ia == jb:
- terms_acc[(ia, ia)] += w
- else:
# NOTE: double the off-diagonal weight within a single native block (i == j)
- if i == j:
- w *= 2.0
- i1, i2 = (ia, jb) if ia < jb else (jb, ia)
- terms_acc[(i1, i2)] += w
-
- for i in range(m):
- b = blocks[i].levels
- oi = offs[i]
- for a, val in enumerate(b):
- ia = oi + a + 1
- w = float(c_native[i] * val)
- if w != 0.0:
- terms_acc[(0, ia)] += w
-
- indices = [tuple(k) for k in terms_acc.keys()]
- coeffs = [float(v) for v in terms_acc.values()]
- return indices, coeffs
- @staticmethod
- def _make_bcd_registry(
- registry: "ConstraintRegistry",
- *,
- lifted_slices: Optional[List[List[int]]] = None,
- ) -> BCDRegistry:
- """
- Translate ALM's registry into the standalone BCD registry.
- Important:
- - In native/non-lifted mode, BCD blocks should use the original `blk.idx`.
- - In lifted mode, BCD blocks must use the current working-coordinate slices
- (`lifted_slices`), not the original native indices.
- """
- breg = BCDRegistry()
- for i, blk in enumerate(registry.blocks):
- if lifted_slices is None:
- bcd_idx = list(int(j) for j in blk.idx)
- else:
- bcd_idx = list(int(j) for j in lifted_slices[i])
- breg.add_block(
- bcd_idx,
- name=f"block_{i}",
- levels=None if blk.levels is None else np.asarray(blk.levels, float),
- )
- for c in registry.constraints:
- if c.kind == "eq":
- breg.add_equality(c.fun, name=c.name, family=c.family)
- elif c.kind == "ineq":
- breg.add_inequality(c.fun, name=c.name, family=c.family)
- return breg
-
- @staticmethod
- def run(
- base_model: PolynomialModel,
- registry: ConstraintRegistry,
- solver,
- cfg: ALMConfig = ALMConfig(),
- x0: Optional[Array] = None,
- *,
- parse_output=None,
- verbose: bool = True,
- **solver_kwargs,
- ) -> Dict[str, Union[Array, Dict[int, float], Dict]]:
- """
- Solve with ALM. Keep all ALM state local to this call (no global side-effects).
- Handles three modes:
- (A) No blocks -> continuous; use base_model as-is
- (B) Blocks + native base_model -> lift to s-space
- (C) Blocks + already-lifted base_model -> use as-is (compat)
- Returns:
- {
- "x": final iterate,
- "decoded": {start_idx_of_block: level_value, ...} for lifted blocks,
- "decoded_debug": {start_idx_of_block: native_device_value, ...},
- "hist": { "eq_inf": [...], "ineq_inf": [...], "obj": [...], "x": [...] }
- }
- """
- blocks = registry.blocks
- has_blocks = len(blocks) > 0
-
- if not has_blocks:
-
- model = base_model
-
- n = getattr(model, "n", None)
- if n is None:
- ub = getattr(model, "upper_bound", None)
- lb = getattr(model, "lower_bound", None)
- if ub is not None:
- n = len(ub)
- elif lb is not None:
- n = len(lb)
- else:
-
- n = 0
- for inds in getattr(model, "indices", getattr(model.polynomial, "indices", [])):
- for j in inds:
- if j > 0:
- n = max(n, j)
- lifted_slices: List[List[int]] = []
- else:
-
- target_lifted_n = sum(len(blk.levels) for blk in blocks)
- base_n = getattr(base_model, "n", None)
- def _infer_n_from_terms(pm: PolynomialModel) -> int:
- inds_list = getattr(pm, "indices", getattr(pm.polynomial, "indices", []))
- mx = 0
- for inds in inds_list:
- for j in inds:
- if j > mx:
- mx = j
- return int(mx)
- if base_n is None:
- base_n = _infer_n_from_terms(base_model)
-
- already_lifted = (base_n == target_lifted_n)
- if already_lifted:
-
- model = base_model
- n = target_lifted_n
- else:
-
-
- c_base, Q_base = base_model._quadratic_polynomial_to_qubo_coefficients(
- getattr(base_model, "coefficients", getattr(base_model.polynomial, "coefficients", [])),
- getattr(base_model, "indices", getattr(base_model.polynomial, "indices", [])),
- getattr(base_model, "n")
- )
- assert Q_base.shape[0] == Q_base.shape[1]
- assert c_base.shape[0] == Q_base.shape[0]
- indices_lifted, coeffs_lifted = ALMAlgorithm.lift_Qc_to_poly_terms(Q_base, c_base, blocks)
- model = PolynomialModel(coeffs_lifted, indices_lifted)
- n = target_lifted_n
-
- setattr(model, "lower_bound", np.zeros(n, float))
- setattr(model, "upper_bound", np.ones(n, float))
-
- lifted_slices = []
- pos = 0
- for blk in blocks:
- k = len(blk.levels)
- lifted_slices.append(list(range(pos, pos + k)))
- pos += k
-
- lb = getattr(model, "lower_bound", None)
- ub = getattr(model, "upper_bound", None)
- if x0 is not None:
- x = np.asarray(x0, float).copy()
- else:
-
- if (lb is not None) and (ub is not None) and np.all(np.isfinite(lb)) and np.all(np.isfinite(ub)):
- x = 0.5 * (np.asarray(lb, float) + np.asarray(ub, float))
- else:
- x = np.zeros(n, float)
-
- problem_eqs = [c for c in registry.constraints if c.kind == "eq"]
- problem_ineqs = [c for c in registry.constraints if c.kind == "ineq"]
-
-
- def _install_block_equalities() -> List[ALMConstraint]:
- if not has_blocks:
- return []
- eqs: List[ALMConstraint] = []
- for blk, lift_idx in zip(registry.blocks, lifted_slices):
- if blk.enforce_sum_to_one:
- S = ALMAlgorithm._sum_to_one_selector(n, lift_idx)
- eqs.append(ALMConstraint(
- "eq",
- fun=ALMAlgorithm._make_sum1_fun(S),
- jac=ALMAlgorithm._make_sum1_jac(S),
- name=f"sum_to_one_block_{lift_idx[0]}",
- family="block_sum1",
- ))
- if blk.enforce_one_hot:
- k = len(lift_idx)
- M = ALMAlgorithm._pairwise_M(k)
- eqs.append(ALMConstraint(
- "eq",
- fun=ALMAlgorithm._make_onehot_fun(lift_idx, M),
- jac=ALMAlgorithm._make_onehot_jac(lift_idx, M, n),
- name=f"onehot_block_{lift_idx[0]}",
- family="block_onehot",
- ))
- return eqs
- block_eqs = _install_block_equalities()
-
- full_eqs = problem_eqs + block_eqs
-
- lam_eq = []
- for csp in full_eqs:
- r0 = csp.fun(x).reshape(-1)
- lam_eq.append(np.zeros_like(r0, dtype=float))
-
- mu_ineq = []
- for csp in problem_ineqs:
- r0 = csp.fun(x).reshape(-1)
- mu_ineq.append(np.zeros_like(r0, dtype=float))
- rho_eq_family = dict(getattr(cfg, "rho_eq_family", {}) or {})
- rho_ineq_family = dict(getattr(cfg, "rho_ineq_family", {}) or {})
- def _rho_for(csp_k: ALMConstraint) -> float:
- fam = getattr(csp_k, "family", "") or ""
- if csp_k.kind == "eq":
- return float(rho_eq_family.get(fam, rho_h))
- else:
- return float(rho_ineq_family.get(fam, rho_g))
-
- rho_h, rho_g = cfg.rho_h, cfg.rho_g
- best_eq, best_ineq = np.inf, np.inf
- no_imp_eq = no_imp_ineq = 0
- prev_eq_inf, prev_ineq_inf = np.inf, np.inf
- eps = 1e-12
- prev_eq_inf_by_family, prev_ineq_inf_by_family = {}, {}
-
- best_eq_by_family: Dict[str, float] = {}
- best_ineq_by_family: Dict[str, float] = {}
- no_imp_eq_by_family: Dict[str, int] = {}
- no_imp_ineq_by_family: Dict[str, int] = {}
- hist = {"eq_inf": [], "ineq_inf": [], "obj": [], "x": [],
-
- "rho_h": [], "rho_g": [],
- "rho_eq_family": [], "rho_ineq_family": [],
- "eq_inf_by_family": [], "ineq_inf_by_family": [],
-
- "dynamic_range": [],
- }
- for k_idx, csp in enumerate(full_eqs):
- if csp.kind != "eq":
- continue
- hist[f"lam_eq_max_idx{k_idx}"] = []
- hist[f"lam_eq_min_idx{k_idx}"] = []
- for k_idx, csp in enumerate(problem_ineqs):
- if csp.kind != "ineq":
- continue
- hist[f"mu_ineq_max_idx{k_idx}"] = []
- hist[f"mu_ineq_min_idx{k_idx}"] = []
- for it in range(cfg.max_outer):
-
- base_terms: List[PolyTerm] = list(zip(model.indices, model.coefficients))
- base_poly = ALMAlgorithm._polynomial_from_terms(base_terms)
-
- Q_aug = np.zeros((n, n), dtype=float)
- c_aug = np.zeros(n, dtype=float)
- have_aug = False
-
- for k_idx, csp in enumerate(full_eqs):
- if csp.kind != "eq":
- continue
- h = csp.fun(x).reshape(-1)
- A = csp.jac(x) if csp.jac is not None else ALMAlgorithm._finite_diff_jac(csp.fun, x, cfg.fd_eps)
- A = np.atleast_2d(A)
- assert A.shape[1] == n, f"A has {A.shape[1]} cols, expected {n}"
-
- b = A @ x - h
- rho_k = _rho_for(csp)
- Qk = 0.5 * rho_k * (A.T @ A)
- ck = (A.T @ lam_eq[k_idx]) - rho_k * (A.T @ b)
- Q_aug += Qk
- c_aug += ck
- have_aug = True
-
- for k_idx, csp in enumerate(problem_ineqs):
- if csp.kind != "ineq":
- continue
- g = csp.fun(x).reshape(-1)
- G = csp.jac(x) if csp.jac is not None else ALMAlgorithm._finite_diff_jac(csp.fun, x, cfg.fd_eps)
- G = np.atleast_2d(G)
- assert G.shape[1] == n, f"G has {G.shape[1]} cols, expected {n}"
- d = G @ x - g
- rho_k = _rho_for(csp)
-
-
- y = G @ x - d + mu_ineq[k_idx] / rho_k
- active = (y > cfg.act_tol)
- if np.any(active):
- GA = G[active, :]
- muA = mu_ineq[k_idx][active]
- gA = g[active]
-
- Qk = 0.5 * rho_k * (GA.T @ GA)
-
- ck = (GA.T @ muA) - rho_k * (GA.T @ (GA @ x - gA))
- Q_aug += Qk
- c_aug += ck
- have_aug = True
-
- all_terms = ALMAlgorithm._merge_poly(base_terms, Q_aug if have_aug else None,
- c_aug if have_aug else None)
- idxs, coeffs = zip(*[(inds, w) for (inds, w) in all_terms]) if all_terms else ([], [])
- poly_model = PolynomialModel(list(coeffs), list(idxs))
- if lb is not None and hasattr(poly_model, "lower_bound"):
- poly_model.lower_bound = np.asarray(lb, float)
- if ub is not None and hasattr(poly_model, "upper_bound"):
- poly_model.upper_bound = np.asarray(ub, float)
- x_ws = x.copy()
-
-
- setattr(poly_model, "initial_guess", x_ws)
- setattr(poly_model, "warm_start", x_ws)
- setattr(poly_model, "x0", x_ws)
-
- if getattr(cfg, "bcd", None) is not None and getattr(cfg.bcd, "enabled", False):
-
-
- if lifted_slices is not None and len(lifted_slices) != len(registry.blocks):
- raise ValueError("lifted_slices must align 1-to-1 with registry.blocks")
- breg = ALMAlgorithm._make_bcd_registry(
- registry,
- lifted_slices=lifted_slices if has_blocks else None,
- )
- bcd_res = BlockCoordinateDescentAlgorithm.run(
- poly_model,
- breg,
- solver,
- cfg=cfg.bcd,
- x0=x_ws,
- verbose=verbose,
- **solver_kwargs,
- )
- x = np.asarray(bcd_res["x"], float).reshape(-1)
- hist.setdefault("bcd_inner", []).append(bcd_res["hist"])
- else:
- out = solver.solve(poly_model, **solver_kwargs)
-
- if parse_output:
- x = parse_output(out)
- else:
-
- if isinstance(out, SolutionResults):
- x = out.solutions[0]
- elif isinstance(out, tuple) and len(out) == 2:
- _, x = out
- elif isinstance(out, dict) and "results" in out and "solutions" in out["results"]:
- x = out["results"]["solutions"][0]
- elif isinstance(out, dict) and "x" in out:
- x = out["x"]
- else:
- x = getattr(out, "x", out)
- x = np.asarray(x, float)
-
- for k_idx, csp in enumerate(full_eqs):
- if csp.kind != "eq": continue
- hist[f"lam_eq_max_idx{k_idx}"].append(float(np.max(lam_eq[k_idx])))
- hist[f"lam_eq_min_idx{k_idx}"].append(float(np.min(lam_eq[k_idx])))
- for k_idx, csp in enumerate(problem_ineqs):
- if csp.kind != "ineq": continue
- hist[f"mu_ineq_max_idx{k_idx}"].append(float(np.max(mu_ineq[k_idx])))
- hist[f"mu_ineq_min_idx{k_idx}"].append(float(np.min(mu_ineq[k_idx])))
-
- eq_infs = []
- for k_idx, csp in enumerate(full_eqs):
- if csp.kind != "eq": continue
- r = csp.fun(x).reshape(-1)
- rho_k = _rho_for(csp)
- lam_eq[k_idx] = lam_eq[k_idx] + rho_k * r
- if r.size:
- eq_infs.append(np.max(np.abs(r)))
- eq_inf = float(np.max(eq_infs)) if eq_infs else 0.0
- ineq_infs = []
- for k_idx, csp in enumerate(problem_ineqs):
- if csp.kind != "ineq": continue
- r = csp.fun(x).reshape(-1)
- rho_k = _rho_for(csp)
- mu_ineq[k_idx] = np.maximum(0.0, mu_ineq[k_idx] + rho_k * r)
- if r.size:
- ineq_infs.append(np.max(np.maximum(0.0, r)))
- ineq_inf = float(np.max(ineq_infs)) if ineq_infs else 0.0
- assert len(lam_eq) == len(full_eqs)
- assert len(mu_ineq) == len(problem_ineqs)
-
- eq_inf_by_family = {}
- for k_idx, csp in enumerate(full_eqs):
- if csp.kind != "eq": continue
- fam = getattr(csp, "family", "") or ""
- r = csp.fun(x).reshape(-1)
- v = float(np.max(np.abs(r))) if r.size else 0.0
- eq_inf_by_family[fam] = max(eq_inf_by_family.get(fam, 0.0), v)
- ineq_inf_by_family = {}
- for k_idx, csp in enumerate(problem_ineqs):
- if csp.kind != "ineq": continue
- fam = getattr(csp, "family", "") or ""
- r = csp.fun(x).reshape(-1)
- v = float(np.max(np.maximum(0.0, r))) if r.size else 0.0
- ineq_inf_by_family[fam] = max(ineq_inf_by_family.get(fam, 0.0), v)
-
- f_val = (
- ALMAlgorithm._as_float(base_poly.evaluate(np.asarray(x, dtype=float)))
- if base_terms else 0.0
- )
-
- dynamic_range = poly_model.dynamic_range
- hist["eq_inf"].append(eq_inf); hist["ineq_inf"].append(ineq_inf)
- hist["obj"].append(float(f_val)); hist["x"].append(x.copy())
- hist["dynamic_range"].append(float(dynamic_range))
-
- hist["rho_h"].append(float(rho_h)); hist["rho_g"].append(float(rho_g))
- hist["rho_eq_family"].append(dict(rho_eq_family))
- hist["rho_ineq_family"].append(dict(rho_ineq_family))
- hist["eq_inf_by_family"].append(dict(eq_inf_by_family))
- hist["ineq_inf_by_family"].append(dict(ineq_inf_by_family))
- if verbose:
-
- eq_items = sorted(eq_inf_by_family.items(), key=lambda kv: kv[1], reverse=True)
- eq_top = eq_items[:3]
- eq_str = ",".join([
- f"{fam or 'eq'}:{val:.2e}@"
- f"{_rho_for(next(c for c in full_eqs if (getattr(c,'family','') or '')==fam and c.kind=='eq')):.1g}"
- for fam, val in eq_top
- ])
-
- ineq_items = sorted(ineq_inf_by_family.items(), key=lambda kv: kv[1], reverse=True)
- ineq_top = ineq_items[:3]
- ineq_str = ",".join([
- f"{fam or 'ineq'}:{val:.2e}@"
- f"{_rho_for(next(c for c in problem_ineqs if (getattr(c, 'family', '') or '') == fam and c.kind == 'ineq')):.1g}"
- for fam, val in ineq_top
- ])
- print(f"[ALM {it:02d}] f={f_val:.6g} | eq_inf={eq_inf:.2e} | ineq_inf={ineq_inf:.2e} "
- f"| rho_h={rho_h:.2e} | rho_g={rho_g:.2e} | eq_fam[{eq_str}] | ineq_fam[{ineq_str}] "
- f"| dynamic_range={dynamic_range:.4g}")
-
- if eq_inf <= cfg.tol_h and ineq_inf <= cfg.tol_g:
- if verbose:
- print(f"[ALM] converged at iter {it}")
- break
-
- if it == 0:
- eq_inf_smooth = eq_inf
- ineq_inf_smooth = ineq_inf
- else:
- eq_inf_smooth = cfg.ema_alpha * eq_inf + (1 - cfg.ema_alpha) * eq_inf_smooth
- ineq_inf_smooth = cfg.ema_alpha * ineq_inf + (1 - cfg.ema_alpha) * ineq_inf_smooth
-
- if cfg.adapt and it > 0:
-
- if getattr(cfg, "adapt_by_family", False):
-
- for fam, cur in eq_inf_by_family.items():
- prev = max(prev_eq_inf_by_family.get(fam, cur), eps)
-
- rho_f = float(rho_eq_family.get(fam, rho_h))
- if cur > cfg.tau_up_h * prev:
- rho_f = min(cfg.gamma_up * rho_f, cfg.rho_max)
- elif cur < cfg.tau_down_h * prev:
- rho_f = max(cfg.gamma_down * rho_f, cfg.rho_min)
- rho_eq_family[fam] = rho_f
-
- prev_eq_inf_by_family = dict(eq_inf_by_family)
-
- for fam, cur in ineq_inf_by_family.items():
- prev = max(prev_ineq_inf_by_family.get(fam, cur), eps)
-
- rho_f = float(rho_ineq_family.get(fam, rho_g))
- if cur > cfg.tau_up_g * prev:
- rho_f = min(cfg.gamma_up * rho_f, cfg.rho_max)
- elif cur < cfg.tau_down_g * prev:
- rho_f = max(cfg.gamma_down * rho_f, cfg.rho_min)
- rho_ineq_family[fam] = rho_f
-
- prev_ineq_inf_by_family = dict(ineq_inf_by_family)
- else:
-
- if eq_inf_smooth > cfg.tau_up_h * max(prev_eq_inf, eps):
- rho_h = min(cfg.gamma_up * rho_h, cfg.rho_max)
- elif eq_inf_smooth < cfg.tau_down_h * max(prev_eq_inf, eps):
- rho_h = max(cfg.gamma_down * rho_h, cfg.rho_min)
-
- if ineq_inf_smooth > cfg.tau_up_g * max(prev_ineq_inf, eps):
- rho_g = min(cfg.gamma_up * rho_g, cfg.rho_max)
- elif ineq_inf_smooth < cfg.tau_down_g * max(prev_ineq_inf, eps):
- rho_g = max(cfg.gamma_down * rho_g, cfg.rho_min)
- else:
-
- if getattr(cfg, "adapt_by_family", False):
-
- prev_eq_inf_by_family = dict(eq_inf_by_family)
- prev_ineq_inf_by_family = dict(ineq_inf_by_family)
-
- if cfg.use_stagnation_bump:
- if getattr(cfg, "adapt_by_family", False):
-
- for fam, cur in eq_inf_by_family.items():
-
- if fam not in best_eq_by_family:
- best_eq_by_family[fam] = float(cur)
- no_imp_eq_by_family[fam] = 0
- if cur <= best_eq_by_family[fam] * (1 - cfg.stagnation_factor):
- best_eq_by_family[fam] = float(cur)
- no_imp_eq_by_family[fam] = 0
- else:
- no_imp_eq_by_family[fam] += 1
- if no_imp_eq_by_family[fam] >= cfg.patience_h:
- rho_f = float(rho_eq_family.get(fam, rho_h))
- rho_eq_family[fam] = min(2.0 * rho_f, cfg.rho_max)
- no_imp_eq_by_family[fam] = 0
-
- for fam, cur in ineq_inf_by_family.items():
-
- if fam not in best_ineq_by_family:
- best_ineq_by_family[fam] = float(cur)
- no_imp_ineq_by_family[fam] = 0
- if cur <= best_ineq_by_family[fam] * (1 - cfg.stagnation_factor):
- best_ineq_by_family[fam] = float(cur)
- no_imp_ineq_by_family[fam] = 0
- else:
- no_imp_ineq_by_family[fam] += 1
- if no_imp_ineq_by_family[fam] >= cfg.patience_g:
- rho_f = float(rho_ineq_family.get(fam, rho_g))
- rho_ineq_family[fam] = min(2.0 * rho_f, cfg.rho_max)
- no_imp_ineq_by_family[fam] = 0
- else:
-
- if eq_inf <= best_eq * (1 - cfg.stagnation_factor):
- best_eq = eq_inf; no_imp_eq = 0
- else:
- no_imp_eq += 1
- if no_imp_eq >= cfg.patience_h:
- rho_h = min(2.0 * rho_h, cfg.rho_max); no_imp_eq = 0
-
- if ineq_inf <= best_ineq * (1 - cfg.stagnation_factor):
- best_ineq = ineq_inf; no_imp_ineq = 0
- else:
- no_imp_ineq += 1
- if no_imp_ineq >= cfg.patience_g:
- rho_g = min(2.0 * rho_g, cfg.rho_max); no_imp_ineq = 0
-
- prev_eq_inf = max(eq_inf_smooth, eps)
- prev_ineq_inf = max(ineq_inf_smooth, eps)
-
- decoded_native: Dict[int, float] = {}
- decoded_lifted: Dict[int, int] = {}
- if has_blocks:
- for blk, lift_idx in zip(registry.blocks, lifted_slices):
- if not lift_idx:
- continue
- sl = np.array(lift_idx, int)
- if len(sl) == 0:
- continue
- sblk = x[sl]
- j = int(np.argmax(sblk))
- orig_anchor = int(blk.idx[0])
- decoded_native[orig_anchor] = float(blk.levels[j])
- decoded_lifted[sl[0]] = j
- return {
- "x": x,
- "decoded": decoded_native if has_blocks else {},
- "decoded_debug": decoded_lifted if has_blocks else {},
- "hist": hist,
- }