Coverage for rivapy/marketdata_tools/pfc_shifter.py: 97%

142 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-05 14:27 +0000

1import itertools 

2import pandas as pd 

3import numpy as np 

4import datetime as dt 

5import rivapy.tools.interfaces as interfaces 

6import rivapy.tools._validators as validators 

7from rivapy.instruments import EnergyFutureSpecifications 

8from typing import Dict, Set, List, Any 

9from collections import defaultdict 

10 

11 

def validate_class_input(func):
    """Wrap a ``func(self, shape, contracts)`` callable with input validation.

    Before ``func`` is called, the wrapper

    * passes ``shape`` to ``validators._check_pandas_index_for_datetime``
      (presumably raising if the index is not datetime based -- see that helper), and
    * checks that every entry of ``shape.index`` is contained in the schedule of at least one contract.

    Args:
        func: Callable with the signature ``func(self, shape, contracts)``.

    Returns:
        The validating wrapper around ``func``.

    Raises:
        ValueError: Raised by the wrapper if no contract is provided or if the contract schedules do not
            cover every date of the shape index.
    """
    # Imported locally because ``functools`` is not among the module-level imports.
    from functools import wraps

    @wraps(func)  # keep name and docstring of the wrapped callable
    def validate_wrapper(self, shape: pd.DataFrame, contracts: List[EnergyFutureSpecifications]):
        validators._check_pandas_index_for_datetime(dataframe=shape)

        schedules = [contract.get_schedule() for contract in contracts]
        if not schedules:
            # ``np.concatenate`` would fail on an empty list with a message unrelated to the actual problem.
            raise ValueError("At least one contract must be provided!")

        contract_scheduled_dates = set(np.concatenate(schedules))
        uncovered_dates = set(shape.index) - contract_scheduled_dates
        if uncovered_dates:
            raise ValueError("The contract dates do not cover each date provided by the shape DataFrame!")
        return func(self, shape, contracts)

    return validate_wrapper

29 

30 

31class PFCShifter(interfaces.FactoryObject): 

32 """A shifting methodology for PFC shapes. This class gets a PFC shape as an input and shifts it in such a way, that the resulting PFC contains the future prices defined in the ``contracts`` dictionary. 

33 We follow the methodology described here: https://papers.ssrn.com/sol3/papers.cfm?abstract_id=2706366 

34 

35 Args: 

36 shape (pd.DataFrame): PFC shape, where the ``DataFrame`` index are ``datetime`` objects. 

37 contracts (List[EnergyFutureSpecifications]): List of the future contracts (``EnergyFutureSpecifications`` objects); internally they are stored in a dictionary keyed by their ``name``. 

38 

39 Usage: 

40 

41 .. highlight:: python 

42 .. code-block:: python 

43 

44 # iterative usage 

45 pfc_shifter = PFCShifter(shape=shape, contracts=contracts) 

46 transition_matrix = pfc_shifter.generate_transition_matrix() 

47 transition_matrix = pfc_shifter.detect_redundant_contracts(transition_matrix) 

48 transition_matrix = pfc_shifter.generate_synthetic_contracts(transition_matrix) 

49 pfc = pfc_shifter.shift(transition_matrix) 

50 

51 # direct call 

52 pfc_shifter = PFCShifter(shape=shape, contracts=contracts) 

53 pfc = pfc_shifter.compute() 

54 

55 """ 

56 

@validate_class_input
def __init__(self, shape: pd.DataFrame, contracts: List[EnergyFutureSpecifications]) -> None:
    """Store the shape and the contracts.

    Args:
        shape (pd.DataFrame): PFC shape; checked by the ``validate_class_input`` decorator before this body runs.
        contracts (List[EnergyFutureSpecifications]): Future contracts; stored in ``self.contracts`` keyed by
            their ``name`` attribute (a later contract with the same name replaces an earlier one).
    """
    contracts_by_name = {}
    for future_contract in contracts:
        contracts_by_name[future_contract.name] = future_contract

    self.shape = shape
    self.contracts = contracts_by_name
    # Filled by ``detect_redundant_contracts`` and ``generate_synthetic_contracts`` respectively.
    self._redundant_contracts: Dict[str, EnergyFutureSpecifications] = {}
    self._synthetic_contracts: Dict[str, EnergyFutureSpecifications] = {}

63 

64 def _get_contract_start_end_dates(self) -> List[dt.datetime]: 

65 """Returns a sorted list combined of all start and end ``datetime`` objects for each contract. 

66 

67 Returns: 

68 List[dt.datetime]: Sorted list of start and end dates 

69 """ 

70 dates = set() 

71 for contract_schedule in self.contracts.values(): 

72 dates.update(contract_schedule.get_start_end()) 

73 return sorted(list(dates)) 

74 

75 def _get_forward_price_vector(self) -> np.ndarray: 

76 """Returns a vector containing the forward/future prices of the contracts and potentiall synthetic contracts. 

77 

78 Returns: 

79 np.ndarray: Numpy array of forward/future prices 

80 """ 

81 _dict = {**self.contracts, **self._synthetic_contracts} 

82 return np.array([contract.get_price() for contract in _dict.values()]).reshape(-1, 1) 

83 

84 def compute(self) -> pd.DataFrame: 

85 """Compute method to directly call all the individual steps involved for the shifting 

86 

87 Returns: 

88 pd.DataFrame: Shifted PFC shape 

89 """ 

90 transition_matrix = self.generate_transition_matrix() 

91 transition_matrix = self.detect_redundant_contracts(transition_matrix=transition_matrix) 

92 transition_matrix = self.generate_synthetic_contracts(transition_matrix=transition_matrix) 

93 return self.shift(transition_matrix=transition_matrix) 

94 

95 def generate_transition_matrix(self) -> pd.DataFrame: 

96 """The transition matrix is the basis of the shifting algorithm. This method generates a (n x m) matrix with zero and one entries, where n is the number of contracts and m are start and end dates for the delivery periods. 

97 Hence, the matrix row vectors indicate the delivery periods of each contract. Note that the latest delivery end date is not displayed in the transition matrix. 

98 

99 

100 Returns: 

101 pd.DataFrame: Transition matrix containing zeros and ones indicating delivery periods of individual contracts. 

102 """ 

103 contract_start_and_end_dates = np.array(self._get_contract_start_end_dates()) 

104 

105 transition_df = pd.DataFrame( 

106 data=np.zeros((len(self.contracts.keys()), len(contract_start_and_end_dates))), 

107 index=list(self.contracts.keys()), 

108 columns=contract_start_and_end_dates, 

109 ) 

110 

111 for contract_name, contract_schedule in self.contracts.items(): 

112 idx = contract_start_and_end_dates.searchsorted(list(contract_schedule.get_start_end()), "right") - 1 

113 

114 if idx[0] == idx[1]: 

115 transition_df.iloc[transition_df.index == contract_name, idx[0]] = 1 

116 else: 

117 transition_df.iloc[transition_df.index == contract_name, idx[0] : idx[1]] = 1 

118 

119 return transition_df.iloc[:, :-1] # drop the last column for the transition matrix 

120 

121 def detect_redundant_contracts(self, transition_matrix: pd.DataFrame) -> pd.DataFrame: 

122 """In order to obtain an invertable matrix, the matrix must be of full rank. Linear dependent contracts will yield linear dependent row vectors. 

123 This is the case if e.g. a Cal Base and all four quarter contracts are provided. This method finds all redundant (linear dependent) contracts and 

124 omits the last found linear dependent contract in order to make sure that the row vectors are linearly independent. 

125 

126 Args: 

127 transition_matrix (pd.DataFrame): Transition matrix generated by the ``generate_transition_matrix`` method. 

128 

129 Returns: 

130 pd.DataFrame: Transition matrix without linearly dependent row vectors. 

131 """ 

132 potential_redundant_contracts = [] 

133 np_transition_matrix = transition_matrix.to_numpy() 

134 for i in range(len(transition_matrix)): 

135 lst = list(range(len(transition_matrix))) 

136 lst.remove(i) 

137 if np.linalg.matrix_rank(np_transition_matrix[lst, :]) == np.linalg.matrix_rank(np_transition_matrix): 

138 potential_redundant_contracts.append(i) 

139 

140 base_matrix = np.delete(np_transition_matrix, potential_redundant_contracts, axis=0) 

141 

142 detected_redundant_contracts = [] 

143 if len(potential_redundant_contracts) != 0: 

144 for contract_idx in potential_redundant_contracts: 

145 _temp_matrix = np.concatenate([base_matrix, np_transition_matrix[contract_idx, :].reshape(1, -1)], axis=0) 

146 # in case all contracts are potentially redundant 

147 if base_matrix.shape[0] == 0: 

148 ref_rank = 0 

149 else: 

150 ref_rank = np.linalg.matrix_rank(base_matrix) 

151 if np.linalg.matrix_rank(_temp_matrix) > ref_rank: 

152 base_matrix = _temp_matrix 

153 else: 

154 print(f"Found redundant contract: {transition_matrix.index[contract_idx]}") 

155 detected_redundant_contracts.append(transition_matrix.index[contract_idx]) 

156 

157 # update the contracts dictionary, but still keep the information about the redundant contracts 

158 self._redundant_contracts = {} 

159 for contract in detected_redundant_contracts: 

160 self._redundant_contracts[contract] = self.contracts[contract] 

161 del self.contracts[contract] # <- keep an eye on that line 

162 return transition_matrix.loc[~transition_matrix.index.isin(detected_redundant_contracts), :] 

163 

164 def generate_synthetic_contracts(self, transition_matrix: pd.DataFrame) -> pd.DataFrame: 

165 """In order to fulfill the requirement of an invertable transition matrix, not only the row vectors but also the 

166 column vectors must generate a basis. In cases where m > n, we need to additionally generate synthetic contracts. 

167 The delivery period for the synthetic contracts are chosen in such a way that the column vectors become linearly independent. 

168 The forward price for each synthetic contract is computed based on the rations of the average shape values over the corresponding delivery period of the synthetic contract and a reference contract. 

169 The shape ratio is multiplied with the forward price of the reference contract in order to obtain a forward price for the synthetic contract. 

170 The reference contract is implemented to be always the first contract in the ``contracts`` dictionary. 

171 

172 Args: 

173 transition_matrix (pd.DataFrame): Transition matrix generated by the ``detect_redundant_contracts`` method. 

174 

175 Returns: 

176 pd.DataFrame: Full rank transition matrix 

177 """ 

178 m, n = transition_matrix.shape 

179 target_rank = max(m, n) 

180 transition_matrix = transition_matrix.copy() 

181 

182 np_transition_matrix = transition_matrix.to_numpy() 

183 current_rank = np.linalg.matrix_rank(np_transition_matrix) 

184 if current_rank == target_rank: 

185 return transition_matrix 

186 else: 

187 synthetic_contracts = defaultdict(list) 

188 for i in range(target_rank - m): 

189 # compute the most current rank 

190 updated_rank = np.linalg.matrix_rank(np_transition_matrix) 

191 linear_dep_candidates = [] 

192 

193 for j in range(n): 

194 lst = list(range(n)) 

195 lst.remove(j) 

196 tmp_rank = np.linalg.matrix_rank(np_transition_matrix[:, lst]) 

197 if tmp_rank == updated_rank: 

198 # linear dependent 

199 linear_dep_candidates.append(j) 

200 

201 # iteratively test if, adding a further row with a '1' entry for the specific column 

202 # yields a larger matrix rank 

203 tmp_matrix = np.concatenate([np_transition_matrix, np.zeros((1, n))], axis=0) 

204 tmp_rank = updated_rank 

205 for ld_id in linear_dep_candidates: 

206 tmp_matrix[-1, ld_id] = 1 

207 test_rank = np.linalg.matrix_rank(tmp_matrix) 

208 if test_rank > tmp_rank: 

209 tmp_rank = test_rank 

210 synthetic_contracts[i].append(ld_id) 

211 else: 

212 # if the column does not yield a higher matrix rank, revoke the changes 

213 tmp_matrix[-1, ld_id] = 0 

214 # set the new matrix, such that the most current rank can be computed 

215 np_transition_matrix = tmp_matrix 

216 

217 # get reference contract information to calculate a price for the synthetic contracts 

218 reference_contract = list(self.contracts.keys())[0] 

219 reference_mean_shape = self.shape.loc[self.contracts[reference_contract].get_schedule(), :].mean(axis=0) 

220 reference_price = self.contracts[reference_contract].get_price() 

221 

222 date_list = self._get_contract_start_end_dates() 

223 for row_id, column_ids in dict(synthetic_contracts).items(): 

224 _temp_df_shape = None 

225 for column_id in column_ids: 

226 cond1 = self.shape.index >= date_list[column_id] 

227 if column_id == n: 

228 cond2 = self.shape.index <= date_list[column_id + 1] 

229 else: 

230 cond2 = self.shape.index < date_list[column_id + 1] 

231 

232 if _temp_df_shape is None: 

233 _temp_df_shape = self.shape.loc[(cond1) & (cond2), :] 

234 else: 

235 _temp_df_shape = pd.concat([_temp_df_shape, self.shape.loc[(cond1) & (cond2), :]], axis=0) 

236 

237 mean_shape = np.mean(_temp_df_shape, axis=0) 

238 name = f"Synth_Contr_{row_id+1}" 

239 self._synthetic_contracts[name] = EnergyFutureSpecifications( 

240 schedule=None, price=(mean_shape * reference_price / reference_mean_shape).iloc[0], name=name 

241 ) 

242 

243 _data = np.zeros((n)) 

244 _data[column_ids] = 1 

245 _df = pd.DataFrame([_data], index=[name], columns=transition_matrix.columns) 

246 transition_matrix = pd.concat([transition_matrix, _df], axis=0) 

247 return transition_matrix 

248 

249 def shift(self, transition_matrix: pd.DataFrame) -> pd.DataFrame: 

250 """This method is the final step in the shifting algorithm. The transition matrix is inversed and multiplied with the forward price vector to obtain a non overlapping forward price vector. 

251 

252 .. math:: 

253 

254 f^{no} = T^{-1}\cdot f 

255 

256 where:\n 

257 :math:`f^{no}` is the Non-overlapping forward price vector\n 

258 :math:`T` is the Transition matrix\n 

259 :math:`f` is the Forward price vector\n 

260 

261 Afterwards the PFC :math:`S(t)` is obtained from the shape :math:`s(t)` by the follwoing formular: 

262 

263 .. math:: 

264 S(t) = s(t)\cdot \\frac{\sum_{u=T_s}^{T_e} f^{no}(u)}{\sum_{u=T_s}^{T_e} s(u)} 

265 

266 with :math:`T_s` and :math:`T_e` being the start and end dates of the individual delivery periods. 

267 

268 Args: 

269 transition_matrix (pd.DataFrame): Full rank transition matrix generated by the ``generate_synthetic_contracts`` method 

270 

271 Returns: 

272 pd.DataFrame: Shifted shape. 

273 """ 

274 contract_start_and_end_dates = np.array(self._get_contract_start_end_dates()) 

275 contract_schedules = np.unique(list(itertools.chain(*[contract.get_schedule() for contract in self.contracts.values()]))) 

276 

277 # starting after the first start date, since we want to get the delivery ticks until the next starting date 

278 # side='left since we do not want to consider a match as a delivery tick 

279 delivery_ticks = np.searchsorted(contract_schedules, contract_start_and_end_dates[1:], side="left") 

280 delivery_ticks_per_period = np.concatenate([np.array([delivery_ticks[0]]), (delivery_ticks[1:] - delivery_ticks[:-1])]) 

281 

282 date_tpls = list(zip(contract_start_and_end_dates[:-1], contract_start_and_end_dates[1:])) 

283 

284 transition_matrix = transition_matrix.to_numpy() * delivery_ticks_per_period 

285 transition_matrix = transition_matrix / np.sum(transition_matrix, axis=1).reshape(-1, 1) 

286 fwd_price_vec = self._get_forward_price_vector() 

287 

288 fwd_price_noc = np.linalg.inv(transition_matrix) @ fwd_price_vec 

289 pfc = self.shape.copy() 

290 # print(date_tpls) 

291 for i, date_tpl in enumerate(date_tpls): 

292 if i == len(date_tpls) - 1: 

293 row_filter = (pfc.index >= date_tpl[0]) & (pfc.index <= date_tpl[1]) 

294 else: 

295 row_filter = (pfc.index >= date_tpl[0]) & (pfc.index < date_tpl[1]) 

296 

297 pfc.iloc[row_filter, 0] = pfc.iloc[row_filter, 0] / np.sum(pfc.iloc[row_filter, 0]) * len(pfc.iloc[row_filter, 0]) * fwd_price_noc[i, 0] 

298 return pfc 

299 

300 def _to_dict(self) -> dict: 

301 return {**{"shape": self.shape}, **{"contracts": [v.to_dict() for v in self.contracts.values()]}}