Coverage for rivapy/marketdata_tools/pfc_shifter.py: 96%

146 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-08-01 15:21 +0000

1import itertools 

2import pandas as pd 

3import numpy as np 

4import datetime as dt 

5import rivapy.tools.interfaces as interfaces 

6import rivapy.tools._validators as validators 

7from rivapy.instruments import EnergyFutureSpecifications 

8from typing import Dict, Set, List, Any 

9from collections import defaultdict 

10 

11 

def validate_class_input(func):
    """Decorator validating the ``shape``/``contracts`` arguments before calling ``func``.

    The following checks are performed before ``func`` is invoked:

    1. ``shape`` is passed to ``validators._check_pandas_index_for_datetime``.
    2. ``contracts`` must contain at least one contract.
    3. Every date in the index of ``shape`` must be part of the schedule of at least one contract.

    Args:
        func: Callable with the signature ``(self, shape, contracts)``, e.g. ``PFCShifter.__init__``.

    Returns:
        The wrapped callable. It has the same signature as ``func`` and returns whatever ``func`` returns.

    Raises:
        ValueError: If ``contracts`` is empty or if the contract schedules do not cover all dates of ``shape``.
    """
    # local import: keeps this block self-contained without touching the module level imports
    import functools

    @functools.wraps(func)  # preserve name and docstring of the wrapped function
    def validate_wrapper(self, shape: pd.DataFrame, contracts: "List[EnergyFutureSpecifications]"):
        validators._check_pandas_index_for_datetime(dataframe=shape)

        # np.concatenate raises an unrelated error message for an empty list, hence fail early with a clear one
        if len(contracts) == 0:
            raise ValueError("At least one contract must be provided!")

        contract_scheduled_dates = set(np.concatenate([contract.get_schedule() for contract in contracts]))
        uncovered_dates = set(shape.index) - contract_scheduled_dates
        if uncovered_dates:
            raise ValueError("The contract dates do not cover each date provided by the shape DataFrame!")
        return func(self, shape, contracts)

    return validate_wrapper

29 

30 

class PFCShifter(interfaces.FactoryObject):
    """A shifting methodology for PFC shapes. This class gets a PFC shape as an input and shifts it in such a way, that the resulting PFC contains the future prices defined by the ``contracts``.
    We follow the methodology described here: https://papers.ssrn.com/sol3/papers.cfm?abstract_id=2706366

    Args:
        shape (pd.DataFrame): PFC shape, where the ``DataFrame`` index are ``datetime`` objects.
        contracts (List[EnergyFutureSpecifications]): List containing the future contracts (``EnergyFutureSpecifications`` objects).
            Internally the contracts are stored in a dictionary keyed by the contract name.

    Usage:

    .. highlight:: python
    .. code-block:: python

        # iterative usage
        pfc_shifter = PFCShifter(shape=shape, contracts=contracts)
        transition_matrix = pfc_shifter.generate_transition_matrix()
        transition_matrix = pfc_shifter.detect_redundant_contracts(transition_matrix)
        transition_matrix = pfc_shifter.generate_synthetic_contracts(transition_matrix)
        pfc = pfc_shifter.shift(transition_matrix)

        # direct call
        pfc_shifter = PFCShifter(shape=shape, contracts=contracts)
        pfc = pfc_shifter.compute()

    """

    @validate_class_input
    def __init__(self, shape: pd.DataFrame, contracts: List[EnergyFutureSpecifications]) -> None:
        self.shape = shape
        # contracts are keyed by their name; a later contract with the same name replaces an earlier one
        self.contracts = {contract.name: contract for contract in contracts}
        self._redundant_contracts: Dict[str, EnergyFutureSpecifications] = {}
        self._synthetic_contracts: Dict[str, EnergyFutureSpecifications] = {}

    def _get_contract_start_end_dates(self) -> List[dt.datetime]:
        """Returns a sorted list combined of all start and end ``datetime`` objects for each contract.

        Returns:
            List[dt.datetime]: Sorted list of unique start and end dates
        """
        dates = set()
        for contract_schedule in self.contracts.values():
            dates.update(contract_schedule.get_start_end())
        return sorted(dates)

    def _get_forward_price_vector(self) -> np.ndarray:
        """Returns a vector containing the forward/future prices of the contracts and potential synthetic contracts.

        Returns:
            np.ndarray: Numpy column vector (shape ``(k, 1)``) of forward/future prices, contracts first, synthetic contracts afterwards
        """
        _dict = {**self.contracts, **self._synthetic_contracts}
        return np.array([contract.get_price() for contract in _dict.values()]).reshape(-1, 1)

    def compute(self) -> pd.DataFrame:
        """Compute method to directly call all the individual steps involved for the shifting

        Returns:
            pd.DataFrame: Shifted PFC shape
        """
        transition_matrix = self.generate_transition_matrix()
        transition_matrix = self.detect_redundant_contracts(transition_matrix=transition_matrix)
        transition_matrix = self.generate_synthetic_contracts(transition_matrix=transition_matrix)
        return self.shift(transition_matrix=transition_matrix)

    def generate_transition_matrix(self) -> pd.DataFrame:
        """The transition matrix is the basis of the shifting algorithm. This method generates a (n x m) matrix with zero and one entries, where n is the number of contracts and m are start and end dates for the delivery periods.
        Hence, the matrix row vectors indicate the delivery periods of each contract. Note that the latest delivery end date is not displayed in the transition matrix.

        Returns:
            pd.DataFrame: Transition matrix containing zeros and ones indicating delivery periods of individual contracts.
        """
        contract_start_and_end_dates = np.array(self._get_contract_start_end_dates())

        transition_df = pd.DataFrame(
            data=np.zeros((len(self.contracts), len(contract_start_and_end_dates))),
            index=list(self.contracts),
            columns=contract_start_and_end_dates,
        )

        for contract_name, contract_schedule in self.contracts.items():
            # positions of the contract's start and end date within the sorted date vector
            idx = contract_start_and_end_dates.searchsorted(list(contract_schedule.get_start_end()), "right") - 1
            row_mask = transition_df.index == contract_name

            if idx[0] == idx[1]:
                transition_df.iloc[row_mask, idx[0]] = 1
            else:
                # the end date itself is excluded (half-open column range)
                transition_df.iloc[row_mask, idx[0] : idx[1]] = 1

        return transition_df.iloc[:, :-1]  # drop the last column for the transition matrix

    def detect_redundant_contracts(self, transition_matrix: pd.DataFrame) -> pd.DataFrame:
        """In order to obtain an invertible matrix, the matrix must be of full rank. Linear dependent contracts will yield linear dependent row vectors.
        This is the case if e.g. a Cal Base and all four quarter contracts are provided. This method finds all redundant (linear dependent) contracts and
        omits the last found linear dependent contract in order to make sure that the row vectors are linearly independent.

        Note that detected redundant contracts are removed from ``self.contracts`` and stored in ``self._redundant_contracts``.

        Args:
            transition_matrix (pd.DataFrame): Transition matrix generated by the ``generate_transition_matrix`` method.

        Returns:
            pd.DataFrame: Transition matrix without linearly dependent row vectors.
        """
        if transition_matrix.shape == (1, 1):
            return transition_matrix

        np_transition_matrix = transition_matrix.to_numpy()
        n_rows = len(transition_matrix)
        # the rank of the full matrix does not change within the loop, hence compute it only once
        full_rank = np.linalg.matrix_rank(np_transition_matrix)

        # a row is potentially redundant if removing it does not reduce the rank
        potential_redundant_contracts = []
        for i in range(n_rows):
            remaining_rows = [row for row in range(n_rows) if row != i]
            if np.linalg.matrix_rank(np_transition_matrix[remaining_rows, :]) == full_rank:
                potential_redundant_contracts.append(i)

        base_matrix = np.delete(np_transition_matrix, potential_redundant_contracts, axis=0)

        # re-add the candidates one by one; only those not increasing the rank are truly redundant
        detected_redundant_contracts = []
        for contract_idx in potential_redundant_contracts:
            _temp_matrix = np.concatenate([base_matrix, np_transition_matrix[contract_idx, :].reshape(1, -1)], axis=0)
            # in case all contracts are potentially redundant
            if base_matrix.shape[0] == 0:
                ref_rank = 0
            else:
                ref_rank = np.linalg.matrix_rank(base_matrix)
            if np.linalg.matrix_rank(_temp_matrix) > ref_rank:
                base_matrix = _temp_matrix
            else:
                print(f"Found redundant contract: {transition_matrix.index[contract_idx]}")
                detected_redundant_contracts.append(transition_matrix.index[contract_idx])

        # update the contracts dictionary, but still keep the information about the redundant contracts
        self._redundant_contracts = {}
        for contract in detected_redundant_contracts:
            self._redundant_contracts[contract] = self.contracts[contract]
            del self.contracts[contract]  # <- keep an eye on that line
        return transition_matrix.loc[~transition_matrix.index.isin(detected_redundant_contracts), :]

    def generate_synthetic_contracts(self, transition_matrix: pd.DataFrame) -> pd.DataFrame:
        """In order to fulfill the requirement of an invertible transition matrix, not only the row vectors but also the
        column vectors must generate a basis. In cases where m > n, we need to additionally generate synthetic contracts.
        The delivery period for the synthetic contracts are chosen in such a way that the column vectors become linearly independent.
        The forward price for each synthetic contract is computed based on the ratios of the average shape values over the corresponding delivery period of the synthetic contract and a reference contract.
        The shape ratio is multiplied with the forward price of the reference contract in order to obtain a forward price for the synthetic contract.
        The reference contract is implemented to be always the first contract in the ``contracts`` dictionary.

        Generated synthetic contracts are stored in ``self._synthetic_contracts``.

        Args:
            transition_matrix (pd.DataFrame): Transition matrix generated by the ``detect_redundant_contracts`` method.

        Returns:
            pd.DataFrame: Full rank transition matrix
        """
        if transition_matrix.shape == (1, 1):
            return transition_matrix

        m, n = transition_matrix.shape
        target_rank = max(m, n)
        transition_matrix = transition_matrix.copy()

        np_transition_matrix = transition_matrix.to_numpy()
        current_rank = np.linalg.matrix_rank(np_transition_matrix)
        if current_rank == target_rank:
            return transition_matrix

        # maps the index of the added row to the column ids carrying a '1' entry
        synthetic_contracts = defaultdict(list)
        for i in range(target_rank - m):
            # compute the most current rank
            updated_rank = np.linalg.matrix_rank(np_transition_matrix)
            linear_dep_candidates = []

            for j in range(n):
                remaining_columns = [column for column in range(n) if column != j]
                tmp_rank = np.linalg.matrix_rank(np_transition_matrix[:, remaining_columns])
                if tmp_rank == updated_rank:
                    # linear dependent
                    linear_dep_candidates.append(j)

            # iteratively test if, adding a further row with a '1' entry for the specific column
            # yields a larger matrix rank
            tmp_matrix = np.concatenate([np_transition_matrix, np.zeros((1, n))], axis=0)
            tmp_rank = updated_rank
            for ld_id in linear_dep_candidates:
                tmp_matrix[-1, ld_id] = 1
                test_rank = np.linalg.matrix_rank(tmp_matrix)
                if test_rank > tmp_rank:
                    tmp_rank = test_rank
                    synthetic_contracts[i].append(ld_id)
                else:
                    # if the column does not yield a higher matrix rank, revoke the changes
                    tmp_matrix[-1, ld_id] = 0
            # set the new matrix, such that the most current rank can be computed
            np_transition_matrix = tmp_matrix

        # get reference contract information to calculate a price for the synthetic contracts
        reference_contract = list(self.contracts.keys())[0]
        reference_mean_shape = self.shape.loc[self.contracts[reference_contract].get_schedule(), :].mean(axis=0)
        reference_price = self.contracts[reference_contract].get_price()

        date_list = self._get_contract_start_end_dates()
        for row_id, column_ids in dict(synthetic_contracts).items():
            shape_parts = []
            for column_id in column_ids:
                cond1 = self.shape.index >= date_list[column_id]
                # Only the last delivery period includes its end date, consistent with ``shift``.
                # Column ids are 0-based (0, ..., n-1), hence the last period has the id n-1. The
                # previous comparison against n could never be true for such an id.
                if column_id == n - 1:
                    cond2 = self.shape.index <= date_list[column_id + 1]
                else:
                    cond2 = self.shape.index < date_list[column_id + 1]
                shape_parts.append(self.shape.loc[(cond1) & (cond2), :])

            _temp_df_shape = pd.concat(shape_parts, axis=0)

            mean_shape = np.mean(_temp_df_shape, axis=0)
            name = f"Synth_Contr_{row_id+1}"
            # only the first column of the shape is used for the price of the synthetic contract
            self._synthetic_contracts[name] = EnergyFutureSpecifications(
                schedule=None, price=(mean_shape * reference_price / reference_mean_shape).iloc[0], name=name
            )

            _data = np.zeros((n))
            _data[column_ids] = 1
            _df = pd.DataFrame([_data], index=[name], columns=transition_matrix.columns)
            transition_matrix = pd.concat([transition_matrix, _df], axis=0)
        return transition_matrix

    def shift(self, transition_matrix: pd.DataFrame) -> pd.DataFrame:
        r"""This method is the final step in the shifting algorithm. The transition matrix is inverted and multiplied with the forward price vector to obtain a non overlapping forward price vector.

        .. math::

            f^{no} = T^{-1}\cdot f

        Where:

        - :math:`f^{no}` is the Non-overlapping forward price vector
        - :math:`T` is the Transition matrix
        - :math:`f` is the Forward price vector

        Afterwards the PFC :math:`S(t)` is obtained from the shape :math:`s(t)` by the following formula:

        .. math::
            S(t) = s(t)\cdot \frac{\sum_{u=T_s}^{T_e} f^{no}(u)}{\sum_{u=T_s}^{T_e} s(u)}

        with :math:`T_s` and :math:`T_e` being the start and end dates of the individual delivery periods.

        Args:
            transition_matrix (pd.DataFrame): Full rank transition matrix generated by the ``generate_synthetic_contracts`` method

        Returns:
            pd.DataFrame: Shifted shape. Only the first column of the shape is shifted.
        """
        contract_start_and_end_dates = np.array(self._get_contract_start_end_dates())
        contract_schedules = np.unique(list(itertools.chain(*[contract.get_schedule() for contract in self.contracts.values()])))

        # starting after the first start date, since we want to get the delivery ticks until the next starting date
        # side='left' since we do not want to consider a match as a delivery tick
        delivery_ticks = np.searchsorted(contract_schedules, contract_start_and_end_dates[1:], side="left")
        delivery_ticks_per_period = np.concatenate([np.array([delivery_ticks[0]]), (delivery_ticks[1:] - delivery_ticks[:-1])])

        date_tpls = list(zip(contract_start_and_end_dates[:-1], contract_start_and_end_dates[1:]))

        # weight each delivery period by its number of delivery ticks and normalize each row to sum up to one
        transition_matrix = transition_matrix.to_numpy() * delivery_ticks_per_period
        transition_matrix = transition_matrix / np.sum(transition_matrix, axis=1).reshape(-1, 1)
        fwd_price_vec = self._get_forward_price_vector()

        fwd_price_noc = np.linalg.inv(transition_matrix) @ fwd_price_vec
        pfc = self.shape.copy()
        last_period = len(date_tpls) - 1
        for i, (period_start, period_end) in enumerate(date_tpls):
            # only the last delivery period includes its end date
            if i == last_period:
                row_filter = (pfc.index >= period_start) & (pfc.index <= period_end)
            else:
                row_filter = (pfc.index >= period_start) & (pfc.index < period_end)

            # rescale the shape such that its mean over the period equals the non overlapping forward price
            period_shape = pfc.iloc[row_filter, 0]
            pfc.iloc[row_filter, 0] = period_shape / np.sum(period_shape) * len(period_shape) * fwd_price_noc[i, 0]
        return pfc

    def _to_dict(self) -> dict:
        """Returns a dictionary with the shape and the dictionary representation of each contract."""
        return {"shape": self.shape, "contracts": [v.to_dict() for v in self.contracts.values()]}