Coverage for rivapy / marketdata_tools / pfc_shifter.py: 96%

146 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-27 14:36 +0000

1import itertools 

2import pandas as pd 

3import numpy as np 

4import datetime as dt 

5import rivapy.tools.interfaces as interfaces 

6import rivapy.tools._validators as validators 

7from rivapy.instruments.energy_futures_specifications import EnergyFutureSpecifications 

8 

9# from rivapy.instruments import EnergyFutureSpecifications 

10from typing import Dict, Set, List, Any 

11from collections import defaultdict 

12 

13 

def validate_class_input(func):
    """Wrap a ``(self, shape, contracts)`` callable with input validation.

    The wrapper first passes ``shape`` to
    ``validators._check_pandas_index_for_datetime`` (presumably this rejects
    anything that is not a ``pd.DataFrame`` with a ``pd.DatetimeIndex`` --
    confirm against the validators module) and then checks that every index
    entry of ``shape`` is contained in the union of the contract schedules.

    Args:
        func: Callable taking ``(self, shape, contracts)``; in this module it is
            used on ``PFCShifter.__init__``.

    Returns:
        The validating wrapper. Metadata of ``func`` (name, docstring, ...) is
        preserved via ``functools.wraps``.

    Raises:
        ValueError: If ``contracts`` is empty, or if the contract schedules do
            not cover each date of the ``shape`` index.
    """
    # function-scope import so the block does not rely on the module's import list
    import functools

    @functools.wraps(func)
    def validate_wrapper(self, shape: pd.DataFrame, contracts: "List[EnergyFutureSpecifications]"):
        validators._check_pandas_index_for_datetime(dataframe=shape)

        schedules = [contract.get_schedule() for contract in contracts]
        if not schedules:
            # np.concatenate would otherwise fail with an unrelated-looking message
            raise ValueError("At least one contract must be provided!")

        contract_scheduled_dates = set(np.concatenate(schedules))
        uncovered_dates = set(shape.index) - contract_scheduled_dates
        if uncovered_dates:
            raise ValueError("The contract dates do not cover each date provided by the shape DataFrame!")

        # forward the result so the decorator also works on non-__init__ callables
        return func(self, shape, contracts)

    return validate_wrapper

31 

32 

class PFCShifter(interfaces.FactoryObject):
    """A shifting methodology for PFC shapes. This class gets a PFC shape as an input and shifts it in such a way, that the resulting PFC contains the future prices defined by the given ``contracts``.
    We follow the methodology described here: https://papers.ssrn.com/sol3/papers.cfm?abstract_id=2706366

    Args:
        shape (pd.DataFrame): PFC shape, where the ``DataFrame`` index are ``datetime`` objects.
        contracts (List[EnergyFutureSpecifications]): List of the future contracts (``EnergyFutureSpecifications`` objects).
            Internally the contracts are stored in a dictionary keyed by ``contract.name``.

    Usage:

    .. highlight:: python
    .. code-block:: python

        # iterative usage
        pfc_shifter = PFCShifter(shape=shape, contracts=contracts)
        transition_matrix = pfc_shifter.generate_transition_matrix()
        transition_matrix = pfc_shifter.detect_redundant_contracts(transition_matrix)
        transition_matrix = pfc_shifter.generate_synthetic_contracts(transition_matrix)
        pfc = pfc_shifter.shift(transition_matrix)

        # direct call
        pfc_shifter = PFCShifter(shape=shape, contracts=contracts)
        pfc = pfc_shifter.compute()

    """

    @validate_class_input
    def __init__(self, shape: pd.DataFrame, contracts: List[EnergyFutureSpecifications]) -> None:
        self.shape = shape
        # keyed by contract name; insertion order follows the order of the input list
        self.contracts = {contract.name: contract for contract in contracts}
        # filled by ``detect_redundant_contracts``
        self._redundant_contracts: Dict[str, EnergyFutureSpecifications] = {}
        # filled by ``generate_synthetic_contracts``
        self._synthetic_contracts: Dict[str, EnergyFutureSpecifications] = {}

    def _get_contract_start_end_dates(self) -> List[dt.datetime]:
        """Returns a sorted list combined of all start and end ``datetime`` objects for each contract.

        Returns:
            List[dt.datetime]: Sorted list of the distinct start and end dates
        """
        dates = set()
        for contract in self.contracts.values():
            dates.update(contract.get_start_end())
        return sorted(dates)

    def _get_forward_price_vector(self) -> np.ndarray:
        """Returns a vector containing the forward/future prices of the contracts and potential synthetic contracts.

        The prices of ``self.contracts`` come first, followed by the prices of the synthetic contracts.

        Returns:
            np.ndarray: Numpy array of forward/future prices as a column vector of shape (k, 1)
        """
        all_contracts = {**self.contracts, **self._synthetic_contracts}
        return np.array([contract.get_price() for contract in all_contracts.values()]).reshape(-1, 1)

    def compute(self) -> pd.DataFrame:
        """Compute method to directly call all the individual steps involved for the shifting

        Returns:
            pd.DataFrame: Shifted PFC shape
        """
        transition_matrix = self.generate_transition_matrix()
        transition_matrix = self.detect_redundant_contracts(transition_matrix=transition_matrix)
        transition_matrix = self.generate_synthetic_contracts(transition_matrix=transition_matrix)
        return self.shift(transition_matrix=transition_matrix)

    def generate_transition_matrix(self) -> pd.DataFrame:
        """The transition matrix is the basis of the shifting algorithm. This method generates a (n x m) matrix with zero and one entries, where n is the number of contracts and m are start and end dates for the delivery periods.
        Hence, the matrix row vectors indicate the delivery periods of each contract. Note that the latest delivery end date is not displayed in the transition matrix.

        Returns:
            pd.DataFrame: Transition matrix containing zeros and ones indicating delivery periods of individual contracts.
        """
        contract_start_and_end_dates = np.array(self._get_contract_start_end_dates())
        contract_names = list(self.contracts)

        indicator = np.zeros((len(contract_names), len(contract_start_and_end_dates)))
        for row, contract in enumerate(self.contracts.values()):
            # positions of the contract's start and end date within the sorted date grid
            idx = contract_start_and_end_dates.searchsorted(list(contract.get_start_end()), "right") - 1

            if idx[0] == idx[1]:
                # start and end fall on the same grid date: mark that single column
                indicator[row, idx[0]] = 1
            else:
                # mark all periods from the start date up to (excluding) the end date
                indicator[row, idx[0] : idx[1]] = 1

        transition_df = pd.DataFrame(data=indicator, index=contract_names, columns=contract_start_and_end_dates)
        return transition_df.iloc[:, :-1]  # drop the last column for the transition matrix

    def detect_redundant_contracts(self, transition_matrix: pd.DataFrame) -> pd.DataFrame:
        """In order to obtain an invertible matrix, the matrix must be of full rank. Linear dependent contracts will yield linear dependent row vectors.
        This is the case if e.g. a Cal Base and all four quarter contracts are provided. This method finds all redundant (linear dependent) contracts and
        omits the last found linear dependent contract in order to make sure that the row vectors are linearly independent.

        Note:
            This method mutates the object: detected redundant contracts are removed from ``self.contracts``
            and stored in ``self._redundant_contracts`` (which is reset on every call).

        Args:
            transition_matrix (pd.DataFrame): Transition matrix generated by the ``generate_transition_matrix`` method.

        Returns:
            pd.DataFrame: Transition matrix without linearly dependent row vectors.
        """
        if transition_matrix.shape == (1, 1):
            return transition_matrix

        np_transition_matrix = transition_matrix.to_numpy()
        n_contracts = len(transition_matrix)
        # the rank of the full matrix does not change inside the loop below
        full_rank = np.linalg.matrix_rank(np_transition_matrix)

        # a row is potentially redundant if removing it leaves the rank unchanged
        potential_redundant_contracts = []
        for i in range(n_contracts):
            remaining_rows = [row for row in range(n_contracts) if row != i]
            if np.linalg.matrix_rank(np_transition_matrix[remaining_rows, :]) == full_rank:
                potential_redundant_contracts.append(i)

        # start from the rows that are certainly needed and re-add candidates one by one
        base_matrix = np.delete(np_transition_matrix, potential_redundant_contracts, axis=0)

        detected_redundant_contracts = []
        for contract_idx in potential_redundant_contracts:
            candidate_row = np_transition_matrix[contract_idx, :].reshape(1, -1)
            extended_matrix = np.concatenate([base_matrix, candidate_row], axis=0)
            # in case all contracts are potentially redundant the base matrix has no rows yet
            if base_matrix.shape[0] == 0:
                ref_rank = 0
            else:
                ref_rank = np.linalg.matrix_rank(base_matrix)

            if np.linalg.matrix_rank(extended_matrix) > ref_rank:
                # the candidate adds information, keep it
                base_matrix = extended_matrix
            else:
                print(f"Found redundant contract: {transition_matrix.index[contract_idx]}")
                detected_redundant_contracts.append(transition_matrix.index[contract_idx])

        # update the contracts dictionary, but still keep the information about the redundant contracts
        self._redundant_contracts = {}
        for contract_name in detected_redundant_contracts:
            self._redundant_contracts[contract_name] = self.contracts.pop(contract_name)
        return transition_matrix.loc[~transition_matrix.index.isin(detected_redundant_contracts), :]

    def generate_synthetic_contracts(self, transition_matrix: pd.DataFrame) -> pd.DataFrame:
        """In order to fulfill the requirement of an invertible transition matrix, not only the row vectors but also the
        column vectors must generate a basis. In cases where m > n, we need to additionally generate synthetic contracts.
        The delivery period for the synthetic contracts are chosen in such a way that the column vectors become linearly independent.
        The forward price for each synthetic contract is computed based on the ratios of the average shape values over the corresponding delivery period of the synthetic contract and a reference contract.
        The shape ratio is multiplied with the forward price of the reference contract in order to obtain a forward price for the synthetic contract.
        The reference contract is implemented to be always the first contract in the ``contracts`` dictionary.

        Note:
            Generated synthetic contracts are stored in ``self._synthetic_contracts`` under the names ``Synth_Contr_<k>``.

        Args:
            transition_matrix (pd.DataFrame): Transition matrix generated by the ``detect_redundant_contracts`` method.

        Returns:
            pd.DataFrame: Full rank transition matrix
        """
        if transition_matrix.shape == (1, 1):
            return transition_matrix

        m, n = transition_matrix.shape
        target_rank = max(m, n)
        transition_matrix = transition_matrix.copy()

        np_transition_matrix = transition_matrix.to_numpy()
        if np.linalg.matrix_rank(np_transition_matrix) == target_rank:
            return transition_matrix

        # maps the index of the added row to the columns (delivery periods) set to one in that row
        synthetic_contracts = defaultdict(list)
        for i in range(target_rank - m):
            # compute the most current rank
            updated_rank = np.linalg.matrix_rank(np_transition_matrix)

            # a column is a candidate if removing it leaves the rank unchanged (linear dependent)
            linear_dep_candidates = []
            for j in range(n):
                remaining_columns = [col for col in range(n) if col != j]
                if np.linalg.matrix_rank(np_transition_matrix[:, remaining_columns]) == updated_rank:
                    linear_dep_candidates.append(j)

            # iteratively test if, adding a further row with a '1' entry for the specific column
            # yields a larger matrix rank
            tmp_matrix = np.concatenate([np_transition_matrix, np.zeros((1, n))], axis=0)
            tmp_rank = updated_rank
            for ld_id in linear_dep_candidates:
                tmp_matrix[-1, ld_id] = 1
                test_rank = np.linalg.matrix_rank(tmp_matrix)
                if test_rank > tmp_rank:
                    tmp_rank = test_rank
                    synthetic_contracts[i].append(ld_id)
                else:
                    # if the column does not yield a higher matrix rank, revoke the changes
                    tmp_matrix[-1, ld_id] = 0
            # set the new matrix, such that the most current rank can be computed
            np_transition_matrix = tmp_matrix

        # get reference contract information to calculate a price for the synthetic contracts
        reference_contract = self.contracts[next(iter(self.contracts))]
        reference_mean_shape = self.shape.loc[reference_contract.get_schedule(), :].mean(axis=0)
        reference_price = reference_contract.get_price()

        date_list = self._get_contract_start_end_dates()
        for row_id, column_ids in synthetic_contracts.items():
            period_shapes = []
            for column_id in column_ids:
                cond1 = self.shape.index >= date_list[column_id]
                # Column ids run from 0 to n - 1, so the last delivery period is column n - 1.
                # Its end date is included, consistent with the last period in ``shift``.
                if column_id == n - 1:
                    cond2 = self.shape.index <= date_list[column_id + 1]
                else:
                    cond2 = self.shape.index < date_list[column_id + 1]
                period_shapes.append(self.shape.loc[(cond1) & (cond2), :])

            synthetic_shape = pd.concat(period_shapes, axis=0)
            mean_shape = np.mean(synthetic_shape, axis=0)
            name = f"Synth_Contr_{row_id+1}"
            # price = reference price scaled by the ratio of the mean shapes (first shape column)
            self._synthetic_contracts[name] = EnergyFutureSpecifications(
                schedule=None, price=(mean_shape * reference_price / reference_mean_shape).iloc[0], name=name
            )

            row_values = np.zeros(n)
            row_values[column_ids] = 1
            synthetic_row = pd.DataFrame([row_values], index=[name], columns=transition_matrix.columns)
            transition_matrix = pd.concat([transition_matrix, synthetic_row], axis=0)
        return transition_matrix

    def shift(self, transition_matrix: pd.DataFrame) -> pd.DataFrame:
        r"""This method is the final step in the shifting algorithm. The transition matrix is inverted and multiplied with the forward price vector to obtain a non overlapping forward price vector.

        .. math::

            f^{no} = T^{-1}\cdot f

        Where:

        - :math:`f^{no}` is the Non-overlapping forward price vector

        - :math:`T` is the Transition matrix

        - :math:`f` is the Forward price vector

        Afterwards the PFC :math:`S(t)` is obtained from the shape :math:`s(t)` by the following formula:

        .. math::
            S(t) = s(t)\cdot \frac{\sum_{u=T_s}^{T_e} f^{no}(u)}{\sum_{u=T_s}^{T_e} s(u)}

        with :math:`T_s` and :math:`T_e` being the start and end dates of the individual delivery periods.

        Args:
            transition_matrix (pd.DataFrame): Full rank transition matrix generated by the ``generate_synthetic_contracts`` method

        Returns:
            pd.DataFrame: Shifted shape. Only the first column of the shape is shifted; ``self.shape`` itself is not modified.
        """
        contract_start_and_end_dates = np.array(self._get_contract_start_end_dates())
        contract_schedules = np.unique(
            list(itertools.chain.from_iterable(contract.get_schedule() for contract in self.contracts.values()))
        )

        # starting after the first start date, since we want to get the delivery ticks until the next starting date
        # side='left' since we do not want to consider a match as a delivery tick
        delivery_ticks = np.searchsorted(contract_schedules, contract_start_and_end_dates[1:], side="left")
        delivery_ticks_per_period = np.concatenate([np.array([delivery_ticks[0]]), (delivery_ticks[1:] - delivery_ticks[:-1])])

        date_tpls = list(zip(contract_start_and_end_dates[:-1], contract_start_and_end_dates[1:]))

        # weight each delivery period by its number of delivery ticks and normalize every row to sum to one
        weights = transition_matrix.to_numpy() * delivery_ticks_per_period
        weights = weights / np.sum(weights, axis=1).reshape(-1, 1)
        fwd_price_vec = self._get_forward_price_vector()

        fwd_price_noc = np.linalg.inv(weights) @ fwd_price_vec
        pfc = self.shape.copy()
        last_period = len(date_tpls) - 1
        for i, (period_start, period_end) in enumerate(date_tpls):
            if i == last_period:
                # the very last period also contains its end date
                row_filter = (pfc.index >= period_start) & (pfc.index <= period_end)
            else:
                row_filter = (pfc.index >= period_start) & (pfc.index < period_end)

            # rescale the shape so that its mean over the period equals the non-overlapping forward price
            period_shape = pfc.iloc[row_filter, 0]
            pfc.iloc[row_filter, 0] = period_shape / np.sum(period_shape) * len(period_shape) * fwd_price_noc[i, 0]
        return pfc

    def _to_dict(self) -> dict:
        """Returns the shape and the ``to_dict`` representation of each entry of ``self.contracts``."""
        return {"shape": self.shape, "contracts": [v.to_dict() for v in self.contracts.values()]}