Source code for rivapy.marketdata_tools.pfc_shifter

import itertools
import pandas as pd
import numpy as np
import datetime as dt
import rivapy.tools.interfaces as interfaces
import rivapy.tools._validators as validators
from rivapy.instruments import EnergyFutureSpecifications
from typing import Dict, Set, List, Any
from collections import defaultdict


def validate_class_input(func):
    def validate_wrapper(self, shape: pd.DataFrame, contracts: List[EnergyFutureSpecifications]):
        validators._check_pandas_index_for_datetime(dataframe=shape)
        # if isinstance(shape, pd.DataFrame):
        #     if not isinstance(shape.index, pd.DatetimeIndex):
        #         raise TypeError("The index of the shape DataFrame is not of type pd.DatetimeIndex!")
        # else:
        #     raise TypeError("The shape argument is not of type pd.DataFrame!")

        contract_scheduled_dates = set(np.concatenate([contract.get_schedule() for contract in contracts]))
        expected_dates = set(shape.index)
        date_diff = expected_dates - contract_scheduled_dates
        if len(date_diff) != 0:
            raise ValueError("The contract dates do not cover each date provided by the shape DataFrame!")
        func(self, shape, contracts)

    return validate_wrapper


[docs] class PFCShifter(interfaces.FactoryObject): """A shifting methodology for PFC shapes. This class gets a PFC shape as an input and shifts it in such a way, that the resulting PFC contains the future prices defined in the ``contracts`` dictionary. We follow the methodology described here: https://papers.ssrn.com/sol3/papers.cfm?abstract_id=2706366 Args: shape (pd.DataFrame): PFC shape, where the ``DataFrame`` index are ``datetime`` objects. contracts (Dict[str, EnergyFutureSpecifications]): Dictionary containing the future contracts (``EnergyFutureSpecifications`` objects) Usage: .. highlight:: python .. code-block:: python # iterative usage pfc_shifter = PFCShifter(shape=shape, contracts=contracts) transition_matrix = pfc_shifter.generate_transition_matrix() transition_matrix = pfc_shifter.detect_redundant_contracts(transition_matrix) transition_matrix = pfc_shifter.generate_synthetic_contracts(transition_matrix) pfc = pfc_shifter.shift(transition_matrix) # direct call pfc_shifter = PFCShifter(shape=shape, contracts=contracts) pfc = pfc_shifter.compute() """ @validate_class_input def __init__(self, shape: pd.DataFrame, contracts: List[EnergyFutureSpecifications]) -> None: self.shape = shape self.contracts = {contract.name: contract for contract in contracts} self._redundant_contracts: Dict[str, EnergyFutureSpecifications] = {} self._synthetic_contracts: Dict[str, EnergyFutureSpecifications] = {} def _get_contract_start_end_dates(self) -> List[dt.datetime]: """Returns a sorted list combined of all start and end ``datetime`` objects for each contract. Returns: List[dt.datetime]: Sorted list of start and end dates """ dates = set() for contract_schedule in self.contracts.values(): dates.update(contract_schedule.get_start_end()) return sorted(list(dates)) def _get_forward_price_vector(self) -> np.ndarray: """Returns a vector containing the forward/future prices of the contracts and potentiall synthetic contracts. Returns: np.ndarray: Numpy array of forward/future prices """ _dict = {**self.contracts, **self._synthetic_contracts} return np.array([contract.get_price() for contract in _dict.values()]).reshape(-1, 1)
[docs] def compute(self) -> pd.DataFrame: """Compute method to directly call all the individual steps involved for the shifting Returns: pd.DataFrame: Shifted PFC shape """ transition_matrix = self.generate_transition_matrix() transition_matrix = self.detect_redundant_contracts(transition_matrix=transition_matrix) transition_matrix = self.generate_synthetic_contracts(transition_matrix=transition_matrix) return self.shift(transition_matrix=transition_matrix)
[docs] def generate_transition_matrix(self) -> pd.DataFrame: """The transition matrix is the basis of the shifting algorithm. This method generates a (n x m) matrix with zero and one entries, where n is the number of contracts and m are start and end dates for the delivery periods. Hence, the matrix row vectors indicate the delivery periods of each contract. Note that the latest delivery end date is not displayed in the transition matrix. Returns: pd.DataFrame: Transition matrix containing zeros and ones indicating delivery periods of individual contracts. """ contract_start_and_end_dates = np.array(self._get_contract_start_end_dates()) transition_df = pd.DataFrame( data=np.zeros((len(self.contracts.keys()), len(contract_start_and_end_dates))), index=list(self.contracts.keys()), columns=contract_start_and_end_dates, ) for contract_name, contract_schedule in self.contracts.items(): idx = contract_start_and_end_dates.searchsorted(list(contract_schedule.get_start_end()), "right") - 1 if idx[0] == idx[1]: transition_df.iloc[transition_df.index == contract_name, idx[0]] = 1 else: transition_df.iloc[transition_df.index == contract_name, idx[0] : idx[1]] = 1 return transition_df.iloc[:, :-1] # drop the last column for the transition matrix
[docs] def detect_redundant_contracts(self, transition_matrix: pd.DataFrame) -> pd.DataFrame: """In order to obtain an invertable matrix, the matrix must be of full rank. Linear dependent contracts will yield linear dependent row vectors. This is the case if e.g. a Cal Base and all four quarter contracts are provided. This method finds all redundant (linear dependent) contracts and omits the last found linear dependent contract in order to make sure that the row vectors are linearly independent. Args: transition_matrix (pd.DataFrame): Transition matrix generated by the ``generate_transition_matrix`` method. Returns: pd.DataFrame: Transition matrix without linearly dependent row vectors. """ potential_redundant_contracts = [] np_transition_matrix = transition_matrix.to_numpy() for i in range(len(transition_matrix)): lst = list(range(len(transition_matrix))) lst.remove(i) if np.linalg.matrix_rank(np_transition_matrix[lst, :]) == np.linalg.matrix_rank(np_transition_matrix): potential_redundant_contracts.append(i) base_matrix = np.delete(np_transition_matrix, potential_redundant_contracts, axis=0) detected_redundant_contracts = [] if len(potential_redundant_contracts) != 0: for contract_idx in potential_redundant_contracts: _temp_matrix = np.concatenate([base_matrix, np_transition_matrix[contract_idx, :].reshape(1, -1)], axis=0) # in case all contracts are potentially redundant if base_matrix.shape[0] == 0: ref_rank = 0 else: ref_rank = np.linalg.matrix_rank(base_matrix) if np.linalg.matrix_rank(_temp_matrix) > ref_rank: base_matrix = _temp_matrix else: print(f"Found redundant contract: {transition_matrix.index[contract_idx]}") detected_redundant_contracts.append(transition_matrix.index[contract_idx]) # update the contracts dictionary, but still keep the information about the redundant contracts self._redundant_contracts = {} for contract in detected_redundant_contracts: self._redundant_contracts[contract] = self.contracts[contract] del self.contracts[contract] # <- keep an eye on that line return transition_matrix.loc[~transition_matrix.index.isin(detected_redundant_contracts), :]
[docs] def generate_synthetic_contracts(self, transition_matrix: pd.DataFrame) -> pd.DataFrame: """In order to fulfill the requirement of an invertable transition matrix, not only the row vectors but also the column vectors must generate a basis. In cases where m > n, we need to additionally generate synthetic contracts. The delivery period for the synthetic contracts are chosen in such a way that the column vectors become linearly independent. The forward price for each synthetic contract is computed based on the rations of the average shape values over the corresponding delivery period of the synthetic contract and a reference contract. The shape ratio is multiplied with the forward price of the reference contract in order to obtain a forward price for the synthetic contract. The reference contract is implemented to be always the first contract in the ``contracts`` dictionary. Args: transition_matrix (pd.DataFrame): Transition matrix generated by the ``detect_redundant_contracts`` method. Returns: pd.DataFrame: Full rank transition matrix """ m, n = transition_matrix.shape target_rank = max(m, n) transition_matrix = transition_matrix.copy() np_transition_matrix = transition_matrix.to_numpy() current_rank = np.linalg.matrix_rank(np_transition_matrix) if current_rank == target_rank: return transition_matrix else: synthetic_contracts = defaultdict(list) for i in range(target_rank - m): # compute the most current rank updated_rank = np.linalg.matrix_rank(np_transition_matrix) linear_dep_candidates = [] for j in range(n): lst = list(range(n)) lst.remove(j) tmp_rank = np.linalg.matrix_rank(np_transition_matrix[:, lst]) if tmp_rank == updated_rank: # linear dependent linear_dep_candidates.append(j) # iteratively test if, adding a further row with a '1' entry for the specific column # yields a larger matrix rank tmp_matrix = np.concatenate([np_transition_matrix, np.zeros((1, n))], axis=0) tmp_rank = updated_rank for ld_id in linear_dep_candidates: tmp_matrix[-1, ld_id] = 1 test_rank = np.linalg.matrix_rank(tmp_matrix) if test_rank > tmp_rank: tmp_rank = test_rank synthetic_contracts[i].append(ld_id) else: # if the column does not yield a higher matrix rank, revoke the changes tmp_matrix[-1, ld_id] = 0 # set the new matrix, such that the most current rank can be computed np_transition_matrix = tmp_matrix # get reference contract information to calculate a price for the synthetic contracts reference_contract = list(self.contracts.keys())[0] reference_mean_shape = self.shape.loc[self.contracts[reference_contract].get_schedule(), :].mean(axis=0) reference_price = self.contracts[reference_contract].get_price() date_list = self._get_contract_start_end_dates() for row_id, column_ids in dict(synthetic_contracts).items(): _temp_df_shape = None for column_id in column_ids: cond1 = self.shape.index >= date_list[column_id] if column_id == n: cond2 = self.shape.index <= date_list[column_id + 1] else: cond2 = self.shape.index < date_list[column_id + 1] if _temp_df_shape is None: _temp_df_shape = self.shape.loc[(cond1) & (cond2), :] else: _temp_df_shape = pd.concat([_temp_df_shape, self.shape.loc[(cond1) & (cond2), :]], axis=0) mean_shape = np.mean(_temp_df_shape, axis=0) name = f"Synth_Contr_{row_id+1}" self._synthetic_contracts[name] = EnergyFutureSpecifications( schedule=None, price=(mean_shape * reference_price / reference_mean_shape).iloc[0], name=name ) _data = np.zeros((n)) _data[column_ids] = 1 _df = pd.DataFrame([_data], index=[name], columns=transition_matrix.columns) transition_matrix = pd.concat([transition_matrix, _df], axis=0) return transition_matrix
[docs] def shift(self, transition_matrix: pd.DataFrame) -> pd.DataFrame: """This method is the final step in the shifting algorithm. The transition matrix is inversed and multiplied with the forward price vector to obtain a non overlapping forward price vector. .. math:: f^{no} = T^{-1}\cdot f where:\n :math:`f^{no}` is the Non-overlapping forward price vector\n :math:`T` is the Transition matrix\n :math:`f` is the Forward price vector\n Afterwards the PFC :math:`S(t)` is obtained from the shape :math:`s(t)` by the follwoing formular: .. math:: S(t) = s(t)\cdot \\frac{\sum_{u=T_s}^{T_e} f^{no}(u)}{\sum_{u=T_s}^{T_e} s(u)} with :math:`T_s` and :math:`T_e` being the start and end dates of the individual delivery periods. Args: transition_matrix (pd.DataFrame): Full rank transition matrix generated by the ``generate_synthetic_contracts`` method Returns: pd.DataFrame: Shifted shape. """ contract_start_and_end_dates = np.array(self._get_contract_start_end_dates()) contract_schedules = np.unique(list(itertools.chain(*[contract.get_schedule() for contract in self.contracts.values()]))) # starting after the first start date, since we want to get the delivery ticks until the next starting date # side='left since we do not want to consider a match as a delivery tick delivery_ticks = np.searchsorted(contract_schedules, contract_start_and_end_dates[1:], side="left") delivery_ticks_per_period = np.concatenate([np.array([delivery_ticks[0]]), (delivery_ticks[1:] - delivery_ticks[:-1])]) date_tpls = list(zip(contract_start_and_end_dates[:-1], contract_start_and_end_dates[1:])) transition_matrix = transition_matrix.to_numpy() * delivery_ticks_per_period transition_matrix = transition_matrix / np.sum(transition_matrix, axis=1).reshape(-1, 1) fwd_price_vec = self._get_forward_price_vector() fwd_price_noc = np.linalg.inv(transition_matrix) @ fwd_price_vec pfc = self.shape.copy() # print(date_tpls) for i, date_tpl in enumerate(date_tpls): if i == len(date_tpls) - 1: row_filter = (pfc.index >= date_tpl[0]) & (pfc.index <= date_tpl[1]) else: row_filter = (pfc.index >= date_tpl[0]) & (pfc.index < date_tpl[1]) pfc.iloc[row_filter, 0] = pfc.iloc[row_filter, 0] / np.sum(pfc.iloc[row_filter, 0]) * len(pfc.iloc[row_filter, 0]) * fwd_price_noc[i, 0] return pfc
def _to_dict(self) -> dict: return {**{"shape": self.shape}, **{"contracts": [v.to_dict() for v in self.contracts.values()]}}