Coverage for rivapy / marketdata_tools / pfc_shifter.py: 96%
146 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-27 14:36 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-27 14:36 +0000
1import itertools
2import pandas as pd
3import numpy as np
4import datetime as dt
5import rivapy.tools.interfaces as interfaces
6import rivapy.tools._validators as validators
7from rivapy.instruments.energy_futures_specifications import EnergyFutureSpecifications
9# from rivapy.instruments import EnergyFutureSpecifications
10from typing import Dict, Set, List, Any
11from collections import defaultdict
def validate_class_input(func):
    """Decorate ``func`` so that its ``shape`` and ``contracts`` arguments are validated first.

    The returned wrapper passes ``shape`` to ``validators._check_pandas_index_for_datetime``
    and then verifies that every entry of the ``shape`` index occurs in the schedule
    (``get_schedule()``) of at least one contract. Only then is ``func`` invoked.

    Args:
        func: Callable with the signature ``func(self, shape, contracts)``.

    Returns:
        The validating wrapper. It carries the metadata of ``func`` (name, docstring, ...)
        and forwards the return value of ``func``.

    Raises:
        ValueError: If ``contracts`` is empty or if the contract schedules do not cover
            every date of the ``shape`` index. Errors raised by the index validator are
            propagated unchanged.
    """
    import functools  # local import so that this block does not rely on a module-level import

    @functools.wraps(func)
    def validate_wrapper(self, shape: pd.DataFrame, contracts: "List[EnergyFutureSpecifications]"):
        validators._check_pandas_index_for_datetime(dataframe=shape)

        schedules = [contract.get_schedule() for contract in contracts]
        if not schedules:
            # np.concatenate would otherwise fail with an unrelated "need at least one array" message
            raise ValueError("At least one contract is required to cover the dates of the shape DataFrame!")

        contract_scheduled_dates = set(np.concatenate(schedules))
        uncovered_dates = set(shape.index) - contract_scheduled_dates
        if uncovered_dates:
            raise ValueError("The contract dates do not cover each date provided by the shape DataFrame!")
        return func(self, shape, contracts)

    return validate_wrapper
33class PFCShifter(interfaces.FactoryObject):
34 """A shifting methodology for PFC shapes. This class gets a PFC shape as an input and shifts it in such a way, that the resulting PFC contains the future prices defined in the ``contracts`` dictionary.
35 We follow the methodology described here: https://papers.ssrn.com/sol3/papers.cfm?abstract_id=2706366
37 Args:
38 shape (pd.DataFrame): PFC shape, where the ``DataFrame`` index are ``datetime`` objects.
39 contracts (List[EnergyFutureSpecifications]): List of the future contracts (``EnergyFutureSpecifications`` objects). Internally they are stored in a dictionary keyed by the contract name.
41 Usage:
43 .. highlight:: python
44 .. code-block:: python
46 # iterative usage
47 pfc_shifter = PFCShifter(shape=shape, contracts=contracts)
48 transition_matrix = pfc_shifter.generate_transition_matrix()
49 transition_matrix = pfc_shifter.detect_redundant_contracts(transition_matrix)
50 transition_matrix = pfc_shifter.generate_synthetic_contracts(transition_matrix)
51 pfc = pfc_shifter.shift(transition_matrix)
53 # direct call
54 pfc_shifter = PFCShifter(shape=shape, contracts=contracts)
55 pfc = pfc_shifter.compute()
57 """
@validate_class_input
def __init__(self, shape: pd.DataFrame, contracts: List[EnergyFutureSpecifications]) -> None:
    """Store the shape and index the given contracts by their name.

    Args:
        shape (pd.DataFrame): PFC shape that is kept as ``self.shape``.
        contracts (List[EnergyFutureSpecifications]): Contracts that are stored in
            ``self.contracts`` under their ``name`` attribute. If two contracts share a
            name, the later one replaces the earlier one.
    """
    self.shape = shape

    contracts_by_name = {}
    for contract in contracts:
        contracts_by_name[contract.name] = contract
    self.contracts = contracts_by_name

    # filled by ``detect_redundant_contracts`` and ``generate_synthetic_contracts``
    self._redundant_contracts: Dict[str, EnergyFutureSpecifications] = {}
    self._synthetic_contracts: Dict[str, EnergyFutureSpecifications] = {}
66 def _get_contract_start_end_dates(self) -> List[dt.datetime]:
67 """Returns a sorted list combined of all start and end ``datetime`` objects for each contract.
69 Returns:
70 List[dt.datetime]: Sorted list of start and end dates
71 """
72 dates = set()
73 for contract_schedule in self.contracts.values():
74 dates.update(contract_schedule.get_start_end())
75 return sorted(list(dates))
77 def _get_forward_price_vector(self) -> np.ndarray:
78 """Returns a vector containing the forward/future prices of the contracts and potentiall synthetic contracts.
80 Returns:
81 np.ndarray: Numpy array of forward/future prices
82 """
83 _dict = {**self.contracts, **self._synthetic_contracts}
84 return np.array([contract.get_price() for contract in _dict.values()]).reshape(-1, 1)
def compute(self) -> pd.DataFrame:
    """Run the complete shifting pipeline in one call.

    The transition matrix is generated and then handed through
    ``detect_redundant_contracts``, ``generate_synthetic_contracts`` and ``shift``.

    Returns:
        pd.DataFrame: Shifted PFC shape
    """
    result = self.generate_transition_matrix()
    pipeline = (
        self.detect_redundant_contracts,
        self.generate_synthetic_contracts,
        self.shift,
    )
    for step in pipeline:
        result = step(transition_matrix=result)
    return result
def generate_transition_matrix(self) -> pd.DataFrame:
    """Build the zero/one matrix that marks the delivery periods covered by each contract.

    Rows correspond to the contracts in ``self.contracts``, columns to the sorted start and
    end dates of all contracts. A row holds ones in every column from the contract's start
    date up to (excluding) its end date. The column of the latest date is dropped before
    the matrix is returned.

    Returns:
        pd.DataFrame: Transition matrix containing zeros and ones indicating delivery periods of individual contracts.
    """
    boundaries = np.array(self._get_contract_start_end_dates())
    contract_names = list(self.contracts.keys())

    indicator = np.zeros((len(contract_names), len(boundaries)))
    for row, contract in enumerate(self.contracts.values()):
        # position of the start/end date within the sorted boundary dates
        positions = boundaries.searchsorted(list(contract.get_start_end()), "right") - 1
        first, last = positions[0], positions[1]
        if first == last:
            # start and end fall on the same boundary: mark that single column
            indicator[row, first] = 1
        else:
            indicator[row, first:last] = 1

    transition_df = pd.DataFrame(data=indicator, index=contract_names, columns=boundaries)
    return transition_df.iloc[:, :-1]  # the latest boundary date never starts a delivery period
def detect_redundant_contracts(self, transition_matrix: pd.DataFrame) -> pd.DataFrame:
    """Remove linearly dependent contracts (rows) from the transition matrix.

    In order to obtain an invertible matrix, the row vectors must be linearly independent.
    Linearly dependent rows occur e.g. if a Cal Base and all four quarter contracts are
    provided. Every row whose removal does not lower the matrix rank is a candidate; the
    candidates are re-added one by one and each candidate that does not increase the rank
    is reported as redundant. Redundant contracts are moved from ``self.contracts`` to
    ``self._redundant_contracts``.

    The method can be called repeatedly with the same matrix: a contract that was already
    moved to ``self._redundant_contracts`` by a previous call is looked up there.

    Args:
        transition_matrix (pd.DataFrame): Transition matrix generated by the ``generate_transition_matrix`` method.

    Returns:
        pd.DataFrame: Transition matrix without linearly dependent row vectors.

    Raises:
        KeyError: If a redundant row label is neither in ``self.contracts`` nor in
            ``self._redundant_contracts``.
    """
    if transition_matrix.shape == (1, 1):
        return transition_matrix

    np_transition_matrix = transition_matrix.to_numpy()
    n_rows = np_transition_matrix.shape[0]

    potential_redundant_contracts = []
    if n_rows:
        # the rank of the complete matrix does not change inside the loop
        full_rank = np.linalg.matrix_rank(np_transition_matrix)
        for i in range(n_rows):
            without_row = np.delete(np_transition_matrix, i, axis=0)
            if np.linalg.matrix_rank(without_row) == full_rank:
                potential_redundant_contracts.append(i)

    detected_redundant_contracts = []
    if potential_redundant_contracts:
        base_matrix = np.delete(np_transition_matrix, potential_redundant_contracts, axis=0)
        # in case all contracts are potentially redundant the base matrix has no rows
        ref_rank = np.linalg.matrix_rank(base_matrix) if base_matrix.shape[0] else 0
        for contract_idx in potential_redundant_contracts:
            candidate_matrix = np.concatenate([base_matrix, np_transition_matrix[[contract_idx], :]], axis=0)
            candidate_rank = np.linalg.matrix_rank(candidate_matrix)
            if candidate_rank > ref_rank:
                # the row adds information: keep it and continue with the enlarged matrix
                base_matrix, ref_rank = candidate_matrix, candidate_rank
            else:
                contract_name = transition_matrix.index[contract_idx]
                print(f"Found redundant contract: {contract_name}")
                detected_redundant_contracts.append(contract_name)

    # update the contracts dictionary, but still keep the information about the redundant contracts
    known_contracts = {**self._redundant_contracts, **self.contracts}
    self._redundant_contracts = {}
    for contract_name in detected_redundant_contracts:
        self._redundant_contracts[contract_name] = known_contracts[contract_name]
        # pop with default: the contract may already have been removed by an earlier call
        self.contracts.pop(contract_name, None)
    return transition_matrix.loc[~transition_matrix.index.isin(detected_redundant_contracts), :]
def generate_synthetic_contracts(self, transition_matrix: pd.DataFrame) -> pd.DataFrame:
    """Append synthetic contracts until the transition matrix has full rank.

    In order to fulfill the requirement of an invertible transition matrix, not only the row
    vectors but also the column vectors must generate a basis. In cases where there are more
    columns than rows, synthetic contracts (additional rows) are generated. Their delivery
    periods are chosen such that the added row increases the matrix rank.
    The forward price of a synthetic contract is the forward price of a reference contract
    multiplied with the ratio of the average shape value over the synthetic delivery period
    and the average shape value over the schedule of the reference contract.
    The reference contract is always the first contract in the ``contracts`` dictionary.
    The generated contracts are stored in ``self._synthetic_contracts``.

    Args:
        transition_matrix (pd.DataFrame): Transition matrix generated by the ``detect_redundant_contracts`` method.

    Returns:
        pd.DataFrame: Full rank transition matrix
    """
    if transition_matrix.shape == (1, 1):
        return transition_matrix

    m, n = transition_matrix.shape
    target_rank = max(m, n)
    transition_matrix = transition_matrix.copy()

    np_transition_matrix = transition_matrix.to_numpy()
    if np.linalg.matrix_rank(np_transition_matrix) == target_rank:
        return transition_matrix

    # maps the running number of a synthetic contract to the columns (delivery periods) it covers
    synthetic_contracts = defaultdict(list)
    for i in range(target_rank - m):
        # compute the most current rank
        updated_rank = np.linalg.matrix_rank(np_transition_matrix)

        # columns that can be removed without lowering the rank are linearly dependent
        linear_dep_candidates = [
            j for j in range(n) if np.linalg.matrix_rank(np.delete(np_transition_matrix, j, axis=1)) == updated_rank
        ]

        # iteratively test if adding a further row with a '1' entry for the specific column
        # yields a larger matrix rank
        tmp_matrix = np.concatenate([np_transition_matrix, np.zeros((1, n))], axis=0)
        tmp_rank = updated_rank
        for ld_id in linear_dep_candidates:
            tmp_matrix[-1, ld_id] = 1
            test_rank = np.linalg.matrix_rank(tmp_matrix)
            if test_rank > tmp_rank:
                tmp_rank = test_rank
                synthetic_contracts[i].append(ld_id)
            else:
                # if the column does not yield a higher matrix rank, revoke the changes
                tmp_matrix[-1, ld_id] = 0
        # set the new matrix, such that the most current rank can be computed
        np_transition_matrix = tmp_matrix

    # get reference contract information to calculate a price for the synthetic contracts
    reference_contract = list(self.contracts.keys())[0]
    reference_mean_shape = self.shape.loc[self.contracts[reference_contract].get_schedule(), :].mean(axis=0)
    reference_price = self.contracts[reference_contract].get_price()

    date_list = self._get_contract_start_end_dates()
    for row_id, column_ids in synthetic_contracts.items():
        period_shapes = []
        for column_id in column_ids:
            starts_in_period = self.shape.index >= date_list[column_id]
            if column_id == n - 1:
                # last delivery period: the final end date belongs to the period (as in ``shift``).
                # The column ids run from 0 to n - 1, hence the comparison with n - 1.
                ends_in_period = self.shape.index <= date_list[column_id + 1]
            else:
                ends_in_period = self.shape.index < date_list[column_id + 1]
            period_shapes.append(self.shape.loc[starts_in_period & ends_in_period, :])

        mean_shape = pd.concat(period_shapes, axis=0).mean(axis=0)
        name = f"Synth_Contr_{row_id+1}"
        self._synthetic_contracts[name] = EnergyFutureSpecifications(
            schedule=None, price=(mean_shape * reference_price / reference_mean_shape).iloc[0], name=name
        )

        _data = np.zeros((n))
        _data[column_ids] = 1
        _df = pd.DataFrame([_data], index=[name], columns=transition_matrix.columns)
        transition_matrix = pd.concat([transition_matrix, _df], axis=0)
    return transition_matrix
def shift(self, transition_matrix: pd.DataFrame) -> pd.DataFrame:
    r"""Final step of the shifting algorithm: scale the shape to the non-overlapping forward prices.

    The transition matrix is weighted with the number of delivery ticks per period, normalized
    per row, inverted and multiplied with the forward price vector to obtain the
    non-overlapping forward price vector.

    .. math::

        f^{no} = T^{-1}\cdot f

    Where:

    - :math:`f^{no}` is the non-overlapping forward price vector

    - :math:`T` is the transition matrix

    - :math:`f` is the forward price vector

    Afterwards the PFC :math:`S(t)` is obtained from the shape :math:`s(t)` by the following formula:

    .. math::
        S(t) = s(t)\cdot \frac{\sum_{u=T_s}^{T_e} f^{no}(u)}{\sum_{u=T_s}^{T_e} s(u)}

    with :math:`T_s` and :math:`T_e` being the start and end dates of the individual delivery periods.
    Only the first column of the shape is rescaled.

    Args:
        transition_matrix (pd.DataFrame): Full rank transition matrix generated by the ``generate_synthetic_contracts`` method

    Returns:
        pd.DataFrame: Shifted shape.
    """
    boundaries = np.array(self._get_contract_start_end_dates())
    schedule_dates = np.unique(
        list(itertools.chain.from_iterable(contract.get_schedule() for contract in self.contracts.values()))
    )

    # starting after the first start date, since we want to get the delivery ticks until the next starting date
    # side="left" since we do not want to consider a match as a delivery tick
    cumulative_ticks = np.searchsorted(schedule_dates, boundaries[1:], side="left")
    ticks_per_period = np.concatenate([np.array([cumulative_ticks[0]]), np.diff(cumulative_ticks)])

    # weight every period with its number of ticks and normalize each row to sum up to one
    weighted_matrix = transition_matrix.to_numpy() * ticks_per_period
    normalized_matrix = weighted_matrix / weighted_matrix.sum(axis=1).reshape(-1, 1)

    forward_prices = self._get_forward_price_vector()
    non_overlapping_prices = np.linalg.inv(normalized_matrix) @ forward_prices

    pfc = self.shape.copy()
    last_period = len(boundaries) - 2
    for period, (period_start, period_end) in enumerate(zip(boundaries[:-1], boundaries[1:])):
        after_start = pfc.index >= period_start
        if period == last_period:
            # only the last period includes its end date
            before_end = pfc.index <= period_end
        else:
            before_end = pfc.index < period_end
        in_period = after_start & before_end

        segment = pfc.iloc[in_period, 0]
        pfc.iloc[in_period, 0] = segment / np.sum(segment) * len(segment) * non_overlapping_prices[period, 0]
    return pfc
311 def _to_dict(self) -> dict:
312 return {**{"shape": self.shape}, **{"contracts": [v.to_dict() for v in self.contracts.values()]}}