Coverage for rivapy/marketdata_tools/pfc_shifter.py: 96%
146 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-08-01 15:21 +0000
« prev ^ index » next coverage.py v7.10.1, created at 2025-08-01 15:21 +0000
1import itertools
2import pandas as pd
3import numpy as np
4import datetime as dt
5import rivapy.tools.interfaces as interfaces
6import rivapy.tools._validators as validators
7from rivapy.instruments import EnergyFutureSpecifications
8from typing import Dict, Set, List, Any
9from collections import defaultdict
def validate_class_input(func):
    """Decorator that validates the ``shape`` and ``contracts`` arguments before calling ``func``.

    The wrapped callable is expected to have the signature ``func(self, shape, contracts)``
    (it is applied to ``PFCShifter.__init__``). Two checks are performed:

    1. ``shape`` is passed to ``validators._check_pandas_index_for_datetime``
       (presumably raising if ``shape`` is not a ``pd.DataFrame`` with a
       ``pd.DatetimeIndex`` -- see that helper for the exact contract).
    2. Every date in ``shape.index`` must be contained in the union of the
       contract schedules (``contract.get_schedule()``).

    Args:
        func: Callable taking ``(self, shape, contracts)``.

    Returns:
        The wrapping function, carrying the metadata (name, docstring) of ``func``.

    Raises:
        ValueError: If ``contracts`` is empty or if the contract schedules do not
            cover every date of ``shape.index``.
    """
    # Local import: ``functools`` is not part of the module-level import block.
    import functools

    @functools.wraps(func)
    def validate_wrapper(self, shape: pd.DataFrame, contracts: List[EnergyFutureSpecifications]):
        validators._check_pandas_index_for_datetime(dataframe=shape)

        schedules = [contract.get_schedule() for contract in contracts]
        if not schedules:
            # np.concatenate would fail on an empty list with an unrelated-looking message
            raise ValueError("At least one contract must be provided!")

        contract_scheduled_dates = set(np.concatenate(schedules))
        uncovered_dates = set(shape.index) - contract_scheduled_dates
        if uncovered_dates:
            raise ValueError("The contract dates do not cover each date provided by the shape DataFrame!")
        return func(self, shape, contracts)

    return validate_wrapper
class PFCShifter(interfaces.FactoryObject):
    """A shifting methodology for PFC shapes. This class gets a PFC shape as an input and shifts it in such a way, that the resulting PFC contains the future prices defined by the ``contracts``.
    We follow the methodology described here: https://papers.ssrn.com/sol3/papers.cfm?abstract_id=2706366

    Args:
        shape (pd.DataFrame): PFC shape, where the ``DataFrame`` index are ``datetime`` objects.
        contracts (List[EnergyFutureSpecifications]): List of future contracts (``EnergyFutureSpecifications`` objects).
            Internally the contracts are stored in a dictionary keyed by the contract name.

    Usage:

    .. highlight:: python
    .. code-block:: python

        # iterative usage
        pfc_shifter = PFCShifter(shape=shape, contracts=contracts)
        transition_matrix = pfc_shifter.generate_transition_matrix()
        transition_matrix = pfc_shifter.detect_redundant_contracts(transition_matrix)
        transition_matrix = pfc_shifter.generate_synthetic_contracts(transition_matrix)
        pfc = pfc_shifter.shift(transition_matrix)

        # direct call
        pfc_shifter = PFCShifter(shape=shape, contracts=contracts)
        pfc = pfc_shifter.compute()

    """

    @validate_class_input
    def __init__(self, shape: pd.DataFrame, contracts: List[EnergyFutureSpecifications]) -> None:
        self.shape = shape
        # contracts are addressed by name throughout the algorithm
        self.contracts = {contract.name: contract for contract in contracts}
        # filled by ``detect_redundant_contracts`` / ``generate_synthetic_contracts``
        self._redundant_contracts: Dict[str, EnergyFutureSpecifications] = {}
        self._synthetic_contracts: Dict[str, EnergyFutureSpecifications] = {}

    def _get_contract_start_end_dates(self) -> List[dt.datetime]:
        """Returns a sorted list combined of all start and end ``datetime`` objects for each contract.

        Returns:
            List[dt.datetime]: Sorted list of (unique) start and end dates
        """
        dates = set()
        for contract_schedule in self.contracts.values():
            dates.update(contract_schedule.get_start_end())
        return sorted(dates)

    def _get_forward_price_vector(self) -> np.ndarray:
        """Returns a vector containing the forward/future prices of the contracts and potential synthetic contracts.

        The order is: all (non-redundant) contracts first, followed by the synthetic contracts,
        i.e. the row order of the transition matrix returned by ``generate_synthetic_contracts``.

        Returns:
            np.ndarray: Numpy column vector (shape ``(k, 1)``) of forward/future prices
        """
        _dict = {**self.contracts, **self._synthetic_contracts}
        return np.array([contract.get_price() for contract in _dict.values()]).reshape(-1, 1)

    def compute(self) -> pd.DataFrame:
        """Compute method to directly call all the individual steps involved for the shifting

        Returns:
            pd.DataFrame: Shifted PFC shape
        """
        transition_matrix = self.generate_transition_matrix()
        transition_matrix = self.detect_redundant_contracts(transition_matrix=transition_matrix)
        transition_matrix = self.generate_synthetic_contracts(transition_matrix=transition_matrix)
        return self.shift(transition_matrix=transition_matrix)

    def generate_transition_matrix(self) -> pd.DataFrame:
        """The transition matrix is the basis of the shifting algorithm. This method generates a (n x m) matrix with zero and one entries, where n is the number of contracts and m are start and end dates for the delivery periods.
        Hence, the matrix row vectors indicate the delivery periods of each contract. Note that the latest delivery end date is not displayed in the transition matrix.

        Returns:
            pd.DataFrame: Transition matrix containing zeros and ones indicating delivery periods of individual contracts.
        """
        contract_start_and_end_dates = np.array(self._get_contract_start_end_dates())

        transition_df = pd.DataFrame(
            data=np.zeros((len(self.contracts), len(contract_start_and_end_dates))),
            index=list(self.contracts.keys()),
            columns=contract_start_and_end_dates,
        )

        for contract_name, contract_schedule in self.contracts.items():
            # positions of the contract's start and end date within the sorted date grid
            idx = contract_start_and_end_dates.searchsorted(list(contract_schedule.get_start_end()), "right") - 1
            row_mask = transition_df.index == contract_name

            if idx[0] == idx[1]:
                transition_df.iloc[row_mask, idx[0]] = 1
            else:
                # the end date itself starts the next period, hence the exclusive upper bound
                transition_df.iloc[row_mask, idx[0] : idx[1]] = 1

        return transition_df.iloc[:, :-1]  # drop the last column for the transition matrix

    def detect_redundant_contracts(self, transition_matrix: pd.DataFrame) -> pd.DataFrame:
        """In order to obtain an invertible matrix, the matrix must be of full rank. Linear dependent contracts will yield linear dependent row vectors.
        This is the case if e.g. a Cal Base and all four quarter contracts are provided. This method finds all redundant (linear dependent) contracts and
        omits the last found linear dependent contract in order to make sure that the row vectors are linearly independent.

        Note:
            Detected redundant contracts are removed from ``self.contracts`` and kept in ``self._redundant_contracts``.

        Args:
            transition_matrix (pd.DataFrame): Transition matrix generated by the ``generate_transition_matrix`` method.

        Returns:
            pd.DataFrame: Transition matrix without linearly dependent row vectors.
        """
        if transition_matrix.shape == (1, 1):
            return transition_matrix

        np_transition_matrix = transition_matrix.to_numpy()
        n_rows = len(transition_matrix)
        # the rank of the full matrix does not change inside the loop below
        full_rank = np.linalg.matrix_rank(np_transition_matrix)

        # a row is potentially redundant if removing it does not lower the rank
        potential_redundant_contracts = []
        for i in range(n_rows):
            remaining_rows = [row for row in range(n_rows) if row != i]
            if np.linalg.matrix_rank(np_transition_matrix[remaining_rows, :]) == full_rank:
                potential_redundant_contracts.append(i)

        base_matrix = np.delete(np_transition_matrix, potential_redundant_contracts, axis=0)

        # re-add the candidates one by one; a candidate that does not raise the rank is redundant
        detected_redundant_contracts = []
        for contract_idx in potential_redundant_contracts:
            _temp_matrix = np.concatenate([base_matrix, np_transition_matrix[contract_idx, :].reshape(1, -1)], axis=0)
            # in case all contracts are potentially redundant
            if base_matrix.shape[0] == 0:
                ref_rank = 0
            else:
                ref_rank = np.linalg.matrix_rank(base_matrix)
            if np.linalg.matrix_rank(_temp_matrix) > ref_rank:
                base_matrix = _temp_matrix
            else:
                print(f"Found redundant contract: {transition_matrix.index[contract_idx]}")
                detected_redundant_contracts.append(transition_matrix.index[contract_idx])

        # update the contracts dictionary, but still keep the information about the redundant contracts
        self._redundant_contracts = {}
        for contract in detected_redundant_contracts:
            self._redundant_contracts[contract] = self.contracts[contract]
            del self.contracts[contract]  # <- keep an eye on that line
        return transition_matrix.loc[~transition_matrix.index.isin(detected_redundant_contracts), :]

    def generate_synthetic_contracts(self, transition_matrix: pd.DataFrame) -> pd.DataFrame:
        """In order to fulfill the requirement of an invertible transition matrix, not only the row vectors but also the
        column vectors must generate a basis. In cases where m > n, we need to additionally generate synthetic contracts.
        The delivery period for the synthetic contracts are chosen in such a way that the column vectors become linearly independent.
        The forward price for each synthetic contract is computed based on the ratios of the average shape values over the corresponding delivery period of the synthetic contract and a reference contract.
        The shape ratio is multiplied with the forward price of the reference contract in order to obtain a forward price for the synthetic contract.
        The reference contract is implemented to be always the first contract in the ``contracts`` dictionary.

        Note:
            The generated contracts are stored in ``self._synthetic_contracts``.

        Args:
            transition_matrix (pd.DataFrame): Transition matrix generated by the ``detect_redundant_contracts`` method.

        Returns:
            pd.DataFrame: Full rank transition matrix
        """
        if transition_matrix.shape == (1, 1):
            return transition_matrix

        m, n = transition_matrix.shape
        target_rank = max(m, n)
        transition_matrix = transition_matrix.copy()

        np_transition_matrix = transition_matrix.to_numpy()
        current_rank = np.linalg.matrix_rank(np_transition_matrix)
        if current_rank == target_rank:
            return transition_matrix

        # maps the index of a synthetic contract to the column (delivery period) ids it covers
        synthetic_contracts = defaultdict(list)
        for i in range(target_rank - m):
            # compute the most current rank
            updated_rank = np.linalg.matrix_rank(np_transition_matrix)

            # a column is linearly dependent if removing it does not lower the rank
            linear_dep_candidates = []
            for j in range(n):
                remaining_columns = [col for col in range(n) if col != j]
                if np.linalg.matrix_rank(np_transition_matrix[:, remaining_columns]) == updated_rank:
                    linear_dep_candidates.append(j)

            # iteratively test if, adding a further row with a '1' entry for the specific column
            # yields a larger matrix rank
            tmp_matrix = np.concatenate([np_transition_matrix, np.zeros((1, n))], axis=0)
            tmp_rank = updated_rank
            for ld_id in linear_dep_candidates:
                tmp_matrix[-1, ld_id] = 1
                test_rank = np.linalg.matrix_rank(tmp_matrix)
                if test_rank > tmp_rank:
                    tmp_rank = test_rank
                    synthetic_contracts[i].append(ld_id)
                else:
                    # if the column does not yield a higher matrix rank, revoke the changes
                    tmp_matrix[-1, ld_id] = 0
            # set the new matrix, such that the most current rank can be computed
            np_transition_matrix = tmp_matrix

        # get reference contract information to calculate a price for the synthetic contracts
        reference_contract = list(self.contracts.keys())[0]
        reference_mean_shape = self.shape.loc[self.contracts[reference_contract].get_schedule(), :].mean(axis=0)
        reference_price = self.contracts[reference_contract].get_price()

        date_list = self._get_contract_start_end_dates()
        for row_id, column_ids in synthetic_contracts.items():
            shape_slices = []
            for column_id in column_ids:
                cond1 = self.shape.index >= date_list[column_id]
                # Column ids run from 0 to n-1. Only the last delivery period includes its
                # end date (same convention as in ``shift``); all other periods end
                # right before the start of the next one.
                if column_id == n - 1:
                    cond2 = self.shape.index <= date_list[column_id + 1]
                else:
                    cond2 = self.shape.index < date_list[column_id + 1]
                shape_slices.append(self.shape.loc[(cond1) & (cond2), :])

            mean_shape = pd.concat(shape_slices, axis=0).mean(axis=0)
            name = f"Synth_Contr_{row_id+1}"
            self._synthetic_contracts[name] = EnergyFutureSpecifications(
                schedule=None, price=(mean_shape * reference_price / reference_mean_shape).iloc[0], name=name
            )

            # add the synthetic contract as an additional row to the transition matrix
            _data = np.zeros((n))
            _data[column_ids] = 1
            _df = pd.DataFrame([_data], index=[name], columns=transition_matrix.columns)
            transition_matrix = pd.concat([transition_matrix, _df], axis=0)
        return transition_matrix

    def shift(self, transition_matrix: pd.DataFrame) -> pd.DataFrame:
        r"""This method is the final step in the shifting algorithm. The transition matrix is inverted and multiplied with the forward price vector to obtain a non overlapping forward price vector.

        .. math::

            f^{no} = T^{-1}\cdot f

        Where:

        - :math:`f^{no}` is the Non-overlapping forward price vector
        - :math:`T` is the Transition matrix
        - :math:`f` is the Forward price vector

        Afterwards the PFC :math:`S(t)` is obtained from the shape :math:`s(t)` by the following formula:

        .. math::
            S(t) = s(t)\cdot \frac{\sum_{u=T_s}^{T_e} f^{no}(u)}{\sum_{u=T_s}^{T_e} s(u)}

        with :math:`T_s` and :math:`T_e` being the start and end dates of the individual delivery periods.

        Args:
            transition_matrix (pd.DataFrame): Full rank transition matrix generated by the ``generate_synthetic_contracts`` method

        Returns:
            pd.DataFrame: Shifted shape. Only the first column of the shape is shifted.
        """
        contract_start_and_end_dates = np.array(self._get_contract_start_end_dates())
        contract_schedules = np.unique(list(itertools.chain(*[contract.get_schedule() for contract in self.contracts.values()])))

        # starting after the first start date, since we want to get the delivery ticks until the next starting date
        # side='left' since we do not want to consider a match as a delivery tick
        delivery_ticks = np.searchsorted(contract_schedules, contract_start_and_end_dates[1:], side="left")
        delivery_ticks_per_period = np.concatenate([np.array([delivery_ticks[0]]), (delivery_ticks[1:] - delivery_ticks[:-1])])

        date_tpls = list(zip(contract_start_and_end_dates[:-1], contract_start_and_end_dates[1:]))

        # weight each delivery period by its number of ticks and normalize every row to sum up to one
        transition_matrix = transition_matrix.to_numpy() * delivery_ticks_per_period
        transition_matrix = transition_matrix / np.sum(transition_matrix, axis=1).reshape(-1, 1)
        fwd_price_vec = self._get_forward_price_vector()

        fwd_price_noc = np.linalg.inv(transition_matrix) @ fwd_price_vec
        pfc = self.shape.copy()
        last_period = len(date_tpls) - 1
        for i, date_tpl in enumerate(date_tpls):
            # only the last delivery period includes its end date
            if i == last_period:
                row_filter = (pfc.index >= date_tpl[0]) & (pfc.index <= date_tpl[1])
            else:
                row_filter = (pfc.index >= date_tpl[0]) & (pfc.index < date_tpl[1])

            # rescale the shape such that its mean over the period equals the non-overlapping forward price
            period_shape = pfc.iloc[row_filter, 0]
            pfc.iloc[row_filter, 0] = period_shape / np.sum(period_shape) * len(period_shape) * fwd_price_noc[i, 0]
        return pfc

    def _to_dict(self) -> dict:
        """Returns the shape and the dictionary representations (``to_dict``) of the current contracts."""
        return {"shape": self.shape, "contracts": [v.to_dict() for v in self.contracts.values()]}