Source code for eqc_models.allocation.portmomentum
- import os
- import sys
- import time
- import datetime
- import json
- import warnings
- from functools import wraps
- import numpy as np
- import pandas as pd
- from .portbase import PortBase
- [docs]
- class PortMomentum(PortBase):
-     def __init__(
-         self,
-         stocks: list,
-         stock_data_dir: str,
-         adj_date: str,
-         lookback_days: int = 60,
-         window_days: int = 30,
-         window_overlap_days: int = 15,
-         weight_upper_limit: float = 0.08,
-         r_base: float = 0.05 / 365,
-         alpha: float = 5.0,
-         beta: float = 1.0,
-         xi: float = 1.0,
-     ):
-         self.stocks = stocks
-         self.data_dir = stock_data_dir
-         self.adj_date = adj_date
-         self.lookback_days = lookback_days
-         self.window_days = window_days
-         self.window_overlap_days = window_overlap_days
-         self.weight_upper_limit = weight_upper_limit
-         self.r_base = r_base
-         self.alpha = alpha
-         self.beta = beta
-         self.xi = xi
-         self._H = self.build()
- [docs]
-     def get_hamiltonian(
-         self,
-         return_df,
-         min_date,
-         max_date,
-     ):
-         stocks = self.stocks
-         xi = self.xi
-         window_days = self.window_days
-         window_overlap_days = self.window_overlap_days
-         weight_upper_limit = self.weight_upper_limit
-         
-         K = len(stocks)
-         
-         Q = np.zeros(shape=(K, K), dtype=np.float32)
-         p_vec = np.zeros(shape=(K), dtype=np.float32)
-         m = 0
-         min_date = pd.to_datetime(min_date)
-         max_date = pd.to_datetime(max_date)
-         tmp_date = min_date
-         while tmp_date <= max_date:
-             tmp_min_date = tmp_date
-             tmp_max_date = tmp_date + datetime.timedelta(days=window_days)
-             tmp_df = return_df[
-                 (return_df["Date"] >= tmp_min_date)
-                 & (return_df["Date"] <= tmp_max_date)
-             ]
-             r_list = []
-             for i in range(K):
-                 r_list.append(np.array(tmp_df[stocks[i]]))
-             Q_tmp = np.cov(r_list)
-             for i in range(K):
-                 p_vec[i] += -self.r_base * np.mean(r_list[i])
-                 for j in range(K):
-                     Q[i][j] += Q_tmp[i][j]
-             tmp_date += datetime.timedelta(
-                 days=window_days - window_overlap_days,
-             )
-             m += 1
-         fct = m
-         if fct > 0:
-             fct = 1.0 / fct
-         p_vec = fct * p_vec
-         Q = fct * Q
-         
-         J_no_limit = xi * Q
-         C_no_limit = p_vec
-         
-         J_no_limit = 0.5 * (J_no_limit + J_no_limit.transpose())
-         if weight_upper_limit is None:
-             return J_no_limit, C_no_limit, 100.0
-         W_max = 100.0 * weight_upper_limit
-         J = np.zeros(shape=(2 * K, 2 * K), dtype=np.float32)
-         C = np.zeros(shape=(2 * K), dtype=np.float32)
-         for i in range(K):
-             for j in range(K):
-                 J[i][j] = J_no_limit[i][j] + self.alpha
-             J[i][i] += self.beta
-             J[i][i + K] += self.beta
-             J[i + K][i] += self.beta
-             J[i + K][i + K] += self.beta
-             C[i] = (
-                 C_no_limit[i] - 200.0 * self.alpha - 2 * self.beta * W_max
-             )
-             C[i + K] = -2 * self.beta * W_max
-         C = C.reshape((C.shape[0], 1))
-         
-         stocks = self.stocks
-         K = len(stocks)
-         assert J.shape[0] == K or J.shape[0] == 2 * K
-         assert J.shape[1] == K or J.shape[0] == 2 * K
-         assert C.shape[0] == K or C.shape[0] == 2 * K
-         return J, C, K * W_max