Coverage for rivapy/marketdata_tools/pfc_shifter.py: 97%
142 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-05 14:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-05 14:27 +0000
1import itertools
2import pandas as pd
3import numpy as np
4import datetime as dt
5import rivapy.tools.interfaces as interfaces
6import rivapy.tools._validators as validators
7from rivapy.instruments import EnergyFutureSpecifications
8from typing import Dict, Set, List, Any
9from collections import defaultdict
def validate_class_input(func):
    """Decorator validating the ``shape`` / ``contracts`` arguments before ``func`` is called.

    The wrapper performs two checks and then forwards the call unchanged:

    1. ``shape`` is passed to ``validators._check_pandas_index_for_datetime``
       (presumably raising if ``shape`` is not a ``pd.DataFrame`` with a
       ``pd.DatetimeIndex`` -- see the validators module for the exact contract).
    2. Every date in ``shape.index`` must appear in the schedule of at least one contract.

    Args:
        func (Callable): Method with the signature ``func(self, shape, contracts)``.

    Returns:
        Callable: Wrapped method with the same signature and metadata as ``func``.

    Raises:
        ValueError: If at least one date of ``shape.index`` is not covered by any contract schedule.
    """
    # local import: keeps this block self-contained without touching the file's import section
    import functools

    @functools.wraps(func)  # preserve __name__, __doc__ and the signature of the wrapped method
    def validate_wrapper(self, shape: pd.DataFrame, contracts: List[EnergyFutureSpecifications]):
        validators._check_pandas_index_for_datetime(dataframe=shape)

        schedules = [contract.get_schedule() for contract in contracts]
        # np.concatenate refuses an empty list; without contracts no date can be covered
        contract_scheduled_dates = set(np.concatenate(schedules)) if schedules else set()

        uncovered_dates = set(shape.index) - contract_scheduled_dates
        if uncovered_dates:
            raise ValueError("The contract dates do not cover each date provided by the shape DataFrame!")
        return func(self, shape, contracts)

    return validate_wrapper
class PFCShifter(interfaces.FactoryObject):
    """A shifting methodology for PFC shapes. This class gets a PFC shape as an input and shifts it in such a way, that the resulting PFC contains the future prices defined by the given ``contracts``.
    We follow the methodology described here: https://papers.ssrn.com/sol3/papers.cfm?abstract_id=2706366

    Args:
        shape (pd.DataFrame): PFC shape, where the ``DataFrame`` index are ``datetime`` objects.
        contracts (List[EnergyFutureSpecifications]): List of the future contracts (``EnergyFutureSpecifications`` objects).
            Internally the contracts are stored in a dictionary keyed by the contract name.

    Usage:

    .. highlight:: python
    .. code-block:: python

        # iterative usage
        pfc_shifter = PFCShifter(shape=shape, contracts=contracts)
        transition_matrix = pfc_shifter.generate_transition_matrix()
        transition_matrix = pfc_shifter.detect_redundant_contracts(transition_matrix)
        transition_matrix = pfc_shifter.generate_synthetic_contracts(transition_matrix)
        pfc = pfc_shifter.shift(transition_matrix)

        # direct call
        pfc_shifter = PFCShifter(shape=shape, contracts=contracts)
        pfc = pfc_shifter.compute()

    """

    @validate_class_input
    def __init__(self, shape: pd.DataFrame, contracts: List[EnergyFutureSpecifications]) -> None:
        self.shape = shape
        # keyed by contract name; the insertion order defines the row order of the transition matrix
        self.contracts = {contract.name: contract for contract in contracts}
        # filled by ``detect_redundant_contracts``
        self._redundant_contracts: Dict[str, EnergyFutureSpecifications] = {}
        # filled by ``generate_synthetic_contracts``
        self._synthetic_contracts: Dict[str, EnergyFutureSpecifications] = {}

    def _get_contract_start_end_dates(self) -> List[dt.datetime]:
        """Returns a sorted list combined of all start and end ``datetime`` objects for each contract.

        Returns:
            List[dt.datetime]: Sorted list of the unique start and end dates
        """
        dates = set()
        for contract in self.contracts.values():
            dates.update(contract.get_start_end())
        return sorted(dates)

    def _get_forward_price_vector(self) -> np.ndarray:
        """Returns a vector containing the forward/future prices of the contracts and potential synthetic contracts.

        Returns:
            np.ndarray: Column vector (shape ``(k, 1)``) of forward/future prices, contracts first, synthetic contracts last
        """
        all_contracts = {**self.contracts, **self._synthetic_contracts}
        return np.array([contract.get_price() for contract in all_contracts.values()]).reshape(-1, 1)

    def compute(self) -> pd.DataFrame:
        """Compute method to directly call all the individual steps involved for the shifting

        Returns:
            pd.DataFrame: Shifted PFC shape
        """
        transition_matrix = self.generate_transition_matrix()
        transition_matrix = self.detect_redundant_contracts(transition_matrix=transition_matrix)
        transition_matrix = self.generate_synthetic_contracts(transition_matrix=transition_matrix)
        return self.shift(transition_matrix=transition_matrix)

    def generate_transition_matrix(self) -> pd.DataFrame:
        """The transition matrix is the basis of the shifting algorithm. This method generates a (n x m) matrix with zero and one entries, where n is the number of contracts and m are start and end dates for the delivery periods.
        Hence, the matrix row vectors indicate the delivery periods of each contract. Note that the latest delivery end date is not displayed in the transition matrix.

        Returns:
            pd.DataFrame: Transition matrix containing zeros and ones indicating delivery periods of individual contracts.
        """
        contract_start_and_end_dates = np.array(self._get_contract_start_end_dates())

        # contract names are unique dictionary keys, so the enumeration position is the row position
        matrix = np.zeros((len(self.contracts), len(contract_start_and_end_dates)))
        for row, contract in enumerate(self.contracts.values()):
            idx = contract_start_and_end_dates.searchsorted(list(contract.get_start_end()), "right") - 1
            if idx[0] == idx[1]:
                # start and end fall onto the same date: mark that single column
                matrix[row, idx[0]] = 1
            else:
                # half-open range: the end date itself marks the start of the next period
                matrix[row, idx[0] : idx[1]] = 1

        transition_df = pd.DataFrame(
            data=matrix,
            index=list(self.contracts),
            columns=contract_start_and_end_dates,
        )
        return transition_df.iloc[:, :-1]  # drop the last column for the transition matrix

    def detect_redundant_contracts(self, transition_matrix: pd.DataFrame) -> pd.DataFrame:
        """In order to obtain an invertible matrix, the matrix must be of full rank. Linear dependent contracts will yield linear dependent row vectors.
        This is the case if e.g. a Cal Base and all four quarter contracts are provided. This method finds all redundant (linear dependent) contracts and
        omits the last found linear dependent contract in order to make sure that the row vectors are linearly independent.

        Note that the detected contracts are removed from ``self.contracts`` and kept in ``self._redundant_contracts``.

        Args:
            transition_matrix (pd.DataFrame): Transition matrix generated by the ``generate_transition_matrix`` method.

        Returns:
            pd.DataFrame: Transition matrix without linearly dependent row vectors.
        """
        np_transition_matrix = transition_matrix.to_numpy()
        n_rows = len(transition_matrix)
        # loop invariant; only evaluated if there is at least one row to test
        full_rank = np.linalg.matrix_rank(np_transition_matrix) if n_rows else 0

        # a row is potentially redundant if removing it does not lower the rank
        potential_redundant_contracts = [
            i for i in range(n_rows) if np.linalg.matrix_rank(np.delete(np_transition_matrix, i, axis=0)) == full_rank
        ]

        # start from the rows which are certainly needed and re-add the candidates one by one
        base_matrix = np.delete(np_transition_matrix, potential_redundant_contracts, axis=0)
        # in case all contracts are potentially redundant the base matrix is empty
        base_rank = 0 if base_matrix.shape[0] == 0 else np.linalg.matrix_rank(base_matrix)

        detected_redundant_contracts = []
        for contract_idx in potential_redundant_contracts:
            candidate_matrix = np.concatenate([base_matrix, np_transition_matrix[contract_idx, :].reshape(1, -1)], axis=0)
            candidate_rank = np.linalg.matrix_rank(candidate_matrix)
            if candidate_rank > base_rank:
                # the row adds information -> keep it
                base_matrix, base_rank = candidate_matrix, candidate_rank
            else:
                print(f"Found redundant contract: {transition_matrix.index[contract_idx]}")
                detected_redundant_contracts.append(transition_matrix.index[contract_idx])

        # update the contracts dictionary, but still keep the information about the redundant contracts
        self._redundant_contracts = {}
        for contract in detected_redundant_contracts:
            self._redundant_contracts[contract] = self.contracts.pop(contract)
        return transition_matrix.loc[~transition_matrix.index.isin(detected_redundant_contracts), :]

    def generate_synthetic_contracts(self, transition_matrix: pd.DataFrame) -> pd.DataFrame:
        """In order to fulfill the requirement of an invertible transition matrix, not only the row vectors but also the
        column vectors must generate a basis. In cases where m > n, we need to additionally generate synthetic contracts.
        The delivery period for the synthetic contracts are chosen in such a way that the column vectors become linearly independent.
        The forward price for each synthetic contract is computed based on the ratios of the average shape values over the corresponding delivery period of the synthetic contract and a reference contract.
        The shape ratio is multiplied with the forward price of the reference contract in order to obtain a forward price for the synthetic contract.
        The reference contract is implemented to be always the first contract in the ``contracts`` dictionary.

        The generated contracts are stored in ``self._synthetic_contracts``.

        Args:
            transition_matrix (pd.DataFrame): Transition matrix generated by the ``detect_redundant_contracts`` method.

        Returns:
            pd.DataFrame: Full rank transition matrix
        """
        m, n = transition_matrix.shape
        target_rank = max(m, n)
        transition_matrix = transition_matrix.copy()

        np_transition_matrix = transition_matrix.to_numpy()
        if np.linalg.matrix_rank(np_transition_matrix) == target_rank:
            return transition_matrix

        # maps the number of the synthetic contract to the column ids of its delivery periods
        synthetic_contracts = defaultdict(list)
        for i in range(target_rank - m):
            # compute the most current rank
            updated_rank = np.linalg.matrix_rank(np_transition_matrix)

            # columns which can be removed without lowering the rank are linear dependent
            linear_dep_candidates = [
                j for j in range(n) if np.linalg.matrix_rank(np.delete(np_transition_matrix, j, axis=1)) == updated_rank
            ]

            # iteratively test if, adding a further row with a '1' entry for the specific column
            # yields a larger matrix rank
            tmp_matrix = np.concatenate([np_transition_matrix, np.zeros((1, n))], axis=0)
            tmp_rank = updated_rank
            for ld_id in linear_dep_candidates:
                tmp_matrix[-1, ld_id] = 1
                test_rank = np.linalg.matrix_rank(tmp_matrix)
                if test_rank > tmp_rank:
                    tmp_rank = test_rank
                    synthetic_contracts[i].append(ld_id)
                else:
                    # if the column does not yield a higher matrix rank, revoke the changes
                    tmp_matrix[-1, ld_id] = 0
            # set the new matrix, such that the most current rank can be computed
            np_transition_matrix = tmp_matrix

        # get reference contract information to calculate a price for the synthetic contracts
        reference_contract = list(self.contracts.keys())[0]
        reference_mean_shape = self.shape.loc[self.contracts[reference_contract].get_schedule(), :].mean(axis=0)
        reference_price = self.contracts[reference_contract].get_price()

        date_list = self._get_contract_start_end_dates()
        for row_id, column_ids in synthetic_contracts.items():
            period_shapes = []
            for column_id in column_ids:
                cond1 = self.shape.index >= date_list[column_id]
                if column_id == n - 1:
                    # the last delivery period includes its end date (consistent with ``shift``);
                    # column ids run from 0 to n - 1, the end date of column n - 1 is date_list[n]
                    cond2 = self.shape.index <= date_list[column_id + 1]
                else:
                    cond2 = self.shape.index < date_list[column_id + 1]
                period_shapes.append(self.shape.loc[(cond1) & (cond2), :])

            mean_shape = pd.concat(period_shapes, axis=0).mean(axis=0)
            name = f"Synth_Contr_{row_id+1}"
            self._synthetic_contracts[name] = EnergyFutureSpecifications(
                schedule=None, price=(mean_shape * reference_price / reference_mean_shape).iloc[0], name=name
            )

            # append the delivery periods of the synthetic contract as a new row
            _data = np.zeros((n))
            _data[column_ids] = 1
            _df = pd.DataFrame([_data], index=[name], columns=transition_matrix.columns)
            transition_matrix = pd.concat([transition_matrix, _df], axis=0)
        return transition_matrix

    def shift(self, transition_matrix: pd.DataFrame) -> pd.DataFrame:
        r"""This method is the final step in the shifting algorithm. The transition matrix is inverted and multiplied with the forward price vector to obtain a non overlapping forward price vector.

        .. math::

            f^{no} = T^{-1}\cdot f

        where:

        :math:`f^{no}` is the Non-overlapping forward price vector

        :math:`T` is the Transition matrix

        :math:`f` is the Forward price vector

        Afterwards the PFC :math:`S(t)` is obtained from the shape :math:`s(t)` by the following formula:

        .. math::
            S(t) = s(t)\cdot \frac{\sum_{u=T_s}^{T_e} f^{no}(u)}{\sum_{u=T_s}^{T_e} s(u)}

        with :math:`T_s` and :math:`T_e` being the start and end dates of the individual delivery periods.

        Args:
            transition_matrix (pd.DataFrame): Full rank transition matrix generated by the ``generate_synthetic_contracts`` method

        Returns:
            pd.DataFrame: Shifted shape.
        """
        contract_start_and_end_dates = np.array(self._get_contract_start_end_dates())
        contract_schedules = np.unique(list(itertools.chain(*[contract.get_schedule() for contract in self.contracts.values()])))

        # starting after the first start date, since we want to get the delivery ticks until the next starting date
        # side='left' since we do not want to consider a match as a delivery tick
        delivery_ticks = np.searchsorted(contract_schedules, contract_start_and_end_dates[1:], side="left")
        delivery_ticks_per_period = np.concatenate([np.array([delivery_ticks[0]]), (delivery_ticks[1:] - delivery_ticks[:-1])])

        date_tpls = list(zip(contract_start_and_end_dates[:-1], contract_start_and_end_dates[1:]))

        # weight each delivery period by its number of ticks and normalize each row to one
        weights = transition_matrix.to_numpy() * delivery_ticks_per_period
        weights = weights / np.sum(weights, axis=1).reshape(-1, 1)
        fwd_price_vec = self._get_forward_price_vector()

        fwd_price_noc = np.linalg.inv(weights) @ fwd_price_vec
        pfc = self.shape.copy()
        last_period = len(date_tpls) - 1
        for i, (period_start, period_end) in enumerate(date_tpls):
            if i == last_period:
                # only the last period includes its end date
                row_filter = (pfc.index >= period_start) & (pfc.index <= period_end)
            else:
                row_filter = (pfc.index >= period_start) & (pfc.index < period_end)

            # rescale the shape such that its mean over the period equals the non-overlapping forward price
            period_values = pfc.iloc[row_filter, 0]
            pfc.iloc[row_filter, 0] = period_values / np.sum(period_values) * len(period_values) * fwd_price_noc[i, 0]
        return pfc

    def _to_dict(self) -> dict:
        """Returns the shape and the dictionary representation of each (non redundant) contract."""
        return {"shape": self.shape, "contracts": [v.to_dict() for v in self.contracts.values()]}