import math
import scipy.optimize
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import dateutil.relativedelta as relativedelta
from rivapy.marketdata._logger import logger
import rivapy.tools.interfaces as interfaces
import rivapy.tools._validators as validators
# from rivapy.tools.interpolate import Interpolator
from typing import List, Union, Tuple, Literal, Dict, Optional, Any
from datetime import datetime, date, timedelta
from collections import defaultdict
try:
import tensorflow as tf
has_tf = True
except ImportError:
has_tf = False
from rivapy.tools.enums import DayCounterType, InterpolationType, ExtrapolationType
from rivapy.tools.enums import EnergyTimeGridStructure as etgs
from rivapy.tools.datetools import DayCounter, _date_to_datetime
from rivapy.marketdata.factory import create as _create
from rivapy.marketdata_tools.pfc_shaper import PFCShaper
from rivapy.marketdata_tools.pfc_shifter import PFCShifter
from rivapy.tools.scheduler import SimpleSchedule, OffPeakSchedule, PeakSchedule, BaseSchedule
from rivapy.instruments.energy_futures_specifications import EnergyFutureSpecifications
from rivapy.tools.interpolate import Interpolator
from rivapy import _pyvacon_available
if _pyvacon_available:
from pyvacon.finance.marketdata import EquityForwardCurve as _EquityForwardCurve
from pyvacon.finance.marketdata import SurvivalCurve as _SurvivalCurve
from pyvacon.finance.marketdata import DiscountCurve as _DiscountCurve
import pyvacon as _pyvacon
class DiscountCurve:
def __init__(
self,
id: str,
refdate: Union[datetime, date],
dates: List[Union[datetime, date]],
df: List[float],
interpolation: InterpolationType = InterpolationType.HAGAN_DF,
extrapolation: ExtrapolationType = ExtrapolationType.NONE,
daycounter: DayCounterType = DayCounterType.Act365Fixed,
):
"""Discountcurve
Args:
id (str): Identifier of the discount curve.
refdate (Union[datetime, date]): Reference date of the discount curve.
dates (List[Union[datetime, date]]): List of dates belonging to the list of discount factors. All dates must be distinct and equal to or after the refdate, otherwise an exception will be thrown.
df (List[float]): List of discount factors. The length of the list of discount factors must equal the length of the list of dates, otherwise an exception will be thrown.
interpolation (enums.InterpolationType, optional): Defaults to InterpolationType.HAGAN_DF.
extrapolation (enums.ExtrapolationType, optional): Defaults to ExtrapolationType.NONE, which does not allow computing a discount factor for a date beyond the last date given to this constructor.
daycounter (enums.DayCounterType, optional): Daycounter used within the interpolation formula to compute a discount factor between two dates from the dates-list above. Defaults to DayCounterType.Act365Fixed.
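Examples:
A minimal construction sketch (dates and discount factors are purely illustrative):
.. code-block:: python
>>> import datetime as dt
>>> dates = [dt.datetime(2023, 1, 1), dt.datetime(2024, 1, 1), dt.datetime(2025, 1, 1)]
>>> dfs = [1.0, 0.97, 0.94]
>>> dc = DiscountCurve('EUR_OIS', refdate=dt.datetime(2023, 1, 1), dates=dates, df=dfs)
>>> dc.value(dt.datetime(2023, 1, 1), dt.datetime(2024, 7, 1)) # interpolated discount factor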
"""
if len(dates) < 1:
raise Exception("Please specify at least one date and discount factor")
if len(dates) != len(df):
raise Exception("List of dates and discount factors must have equal length.")
self.values = sorted(zip(dates, df), key=lambda tup: tup[0]) # zip dates and discount factors and sort by dates
if isinstance(refdate, datetime):
self.refdate = refdate
else:
# self.refdate = datetime(refdate, 0, 0, 0) # old version syntax??
self.refdate = datetime(refdate.year, refdate.month, refdate.day)
if not isinstance(interpolation, InterpolationType):
raise TypeError("Interpolation is not of type enums.InterpolationType")
self.interpolation = interpolation
if not isinstance(extrapolation, ExtrapolationType):
raise TypeError("Extrapolation is not of type enums.ExtrapolationType")
self.extrapolation = extrapolation
if not isinstance(daycounter, DayCounterType):
raise TypeError("Daycounter is not of type enums.DayCounterType")
self.daycounter = daycounter
self.id = id
# check that dates are monotonically increasing and that the first date is not before the refdate
if self.values[0][0] < self.refdate:
raise Exception("First date must be equal to or greater than the reference date.")
if self.values[0][0] > self.refdate:
self.values = [(self.refdate, 1.0)] + self.values
if self.values[0][1] != 1.0:
raise Exception("Discount factor for today must equal 1.0.")
for i in range(1, len(self.values)):
if self.values[i - 1][0] >= self.values[i][0]:
raise Exception("Dates must be given in monotonically increasing order.")
self._pyvacon_obj = None
def get_dates(self) -> Tuple[datetime]:
"""Return list of dates of curve
Returns:
Tuple[datetime]: List of dates
"""
x, y = zip(*self.values)
return x
def get_df(self) -> Tuple[float]:
"""Return list of discount factors
Returns:
Tuple[float]: List of discount factors
"""
x, y = zip(*self.values)
return y
# Change the name with value once full pyvacon dependencies are removed throughout rivapy
def value(self, refdate: Union[date, datetime], d: Union[date, datetime], payment_dates=None, annual_payment_frequency=None) -> float:
"""Return discount factor for a given date
Args:
refdate (Union[date, datetime]): The reference date. If the reference date is in the future
(compared to the curve's reference date), the forward discount
factor will be returned.
d (Union[date, datetime]): The date for which the discount factor will be returned. It is assumed
that the given date already follows the correct business logic
(e.g., roll convention).
payment_dates (optional): Payment dates passed through to the daycounter's yearfraction computation. Defaults to None.
annual_payment_frequency (optional): Annual payment frequency passed through to the daycounter's yearfraction computation. Defaults to None.
Returns:
float: discount factor
"""
# {
# Analytics_ASSERT(calcDate == validFrom_, "given calcdate must equal refdate of curve");
# double t = nP_.dc->yf(validFrom_, date);
# return nP_.interp->compute(t);
# }
# check valid dates
if not isinstance(refdate, datetime): # handling date object -> datetime
refdate = datetime(refdate.year, refdate.month, refdate.day)
if not isinstance(d, datetime):
d = datetime(d.year, d.month, d.day)
if refdate < self.refdate:
raise Exception("The given reference date is before the curves reference date.")
# get yearfrac, taking into account DCC
dcc = DayCounter(self.daycounter)
yf_list = [
dcc.yf(self.refdate, x, payment_dates, annual_payment_frequency) for x in self.get_dates()
] # list(dcc.yf(self.refdate, self.get_dates()))
df_list = [x for x in self.get_df()]
# interpolate/extrapolate given a chosen method
interp = Interpolator(self.interpolation, self.extrapolation)
# give FWD value if given refdate is greater than curves reference date
if refdate > self.refdate:
df1 = interp.interp(yf_list, df_list, dcc.yf(self.refdate, refdate, payment_dates, annual_payment_frequency), self.extrapolation)
df2 = interp.interp(yf_list, df_list, dcc.yf(self.refdate, d, payment_dates, annual_payment_frequency), self.extrapolation)
df = df2 / df1
else: # this also covers the case where both reference dates coincide, and avoids division by zero
df = interp.interp(yf_list, df_list, dcc.yf(self.refdate, d, payment_dates, annual_payment_frequency), self.extrapolation)
return df
def value_rate(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
"""Return continuously compounded zero rate for a given date
Args:
refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curve's reference date), the forward rate will be returned.
d (Union[date, datetime]): The date for which the continuously compounded zero rate will be returned.
Returns:
float: continuously compounded zero rate
"""
if not isinstance(refdate, datetime):
refdate = datetime(refdate.year, refdate.month, refdate.day)
if not isinstance(d, datetime):
d = datetime(d.year, d.month, d.day)
if refdate < self.refdate:
raise Exception("The given reference date is before the curves reference date.")
r = -math.log(self.value(refdate, d)) / DayCounter(self.daycounter).yf(refdate, d)
return r
def value_yf(self, yf: float) -> float:
"""Return discount factor for a given yearfrac as of the curve's reference date.
Args:
yf (float): The year fraction for which the discount factor will be returned.
Returns:
float: discount factor
"""
# get yearfrac, taking into account DCC
dcc = DayCounter(self.daycounter)
yf_list = [dcc.yf(self.refdate, x) for x in self.get_dates()] # list(dcc.yf(self.refdate, self.get_dates()))
df_list = [x for x in self.get_df()]
# interpolate/extrapolate given a chosen method
interp = Interpolator(self.interpolation, self.extrapolation)
df = interp.interp(yf_list, df_list, yf, self.extrapolation)
return df
def value_fwd(self, val_date: Union[date, datetime], d1: Union[date, datetime], d2: Union[date, datetime]) -> float:
"""Return forward discount factor for a given date (without dependencies from pyvacon)
The `value_fwd()` method has been updated to support forward valuation scenarios
(`val_date > refdate`) by rebasing the curve from its construction date to the new
valuation date.
The rebasement follows the relationship:
DF(val_date, t) = DF(refdate, t) / DF(refdate, val_date)
This adjustment ensures that discount factors and forward rates remain consistent
across time, even when the valuation date is later than the curve’s reference date.
This approach aligns with market-standard practices for OIS and collateralized
discounting frameworks, where forward discounting must be time-consistent with
the curve’s anchor date.
Args:
val_date (Union[date, datetime]): The valuation date. If it is after the curve's reference date,
the curve is rebased to the valuation date before the forward
discount factor is computed.
d1 (Union[date, datetime]): The start date of the forward period.
d2 (Union[date, datetime]): The end date of the forward period.
Returns:
float: forward discount factor
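Examples:
A minimal sketch of the rebasing identity (dates and discount factors are illustrative):
.. code-block:: python
>>> import datetime as dt
>>> dates = [dt.datetime(2023, 1, 1), dt.datetime(2024, 1, 1), dt.datetime(2025, 1, 1)]
>>> dc = DiscountCurve('DC', dt.datetime(2023, 1, 1), dates, [1.0, 0.97, 0.94])
>>> d1, d2 = dt.datetime(2024, 1, 1), dt.datetime(2025, 1, 1)
>>> dc.value_fwd(dt.datetime(2023, 1, 1), d1, d2) # equals value(refdate, d2) / value(refdate, d1)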
"""
# double DiscountCurve::valueFwd(
# const boost::posix_time::ptime &valDate,
# const boost::posix_time::ptime& d1,
# const boost::posix_time::ptime& d2) const
# {
# Analytics_ASSERT(d2 >= d1, "first date " << boost::posix_time::to_iso_string(d1)
# << " must be less or equal to the second date " << boost::posix_time::to_iso_string(d2));
# double df1 = value(valDate, d1);
# double df2 = value(valDate, d2);
# return df2 / df1;
# }
# check valid dates
if isinstance(val_date, date): # handling date object -> datetime
val_date = datetime.combine(val_date, datetime.min.time())
if isinstance(d1, date):
d1 = datetime.combine(d1, datetime.min.time())
if isinstance(d2, date):
d2 = datetime.combine(d2, datetime.min.time())
if val_date < self.refdate:
raise Exception("The given value date is before the curves reference date.")
# get yearfrac, taking into account DCC
dcc = DayCounter(self.daycounter)
yf_list = [dcc.yf(self.refdate, x) for x in self.get_dates()] # list(dcc.yf(self.refdate, self.get_dates()))
df_list = [x for x in self.get_df()]
# interpolate/extrapolate given a chosen method
interp = Interpolator(self.interpolation, self.extrapolation)
# give FWD value if given refdate is greater than curves reference date
# df1 = interp.interp(yf_list, df_list, dcc.yf(val_date, d1), self.extrapolation)
# df2 = interp.interp(yf_list, df_list, dcc.yf(val_date, d2), self.extrapolation)
x1 = dcc.yf(self.refdate, d1)
x2 = dcc.yf(self.refdate, d2)
df1 = interp.interp(yf_list, df_list, x1, self.extrapolation)
df2 = interp.interp(yf_list, df_list, x2, self.extrapolation)
if val_date > self.refdate:
xval = dcc.yf(self.refdate, val_date)
df_val = interp.interp(yf_list, df_list, xval, self.extrapolation)
# rebase curve to val_date
logger.info(f"{val_date} > {self.refdate}: forward valuation")
df1 /= df_val
df2 /= df_val
df = df2 / df1
return df
def value_fwd_rate(self, refdate: Union[date, datetime], d1: Union[date, datetime], d2: Union[date, datetime]) -> float:
"""Return forward continuously compounded zero rate for a given date
Args:
refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curve's reference date), the forward rate will be returned.
d1 (Union[date, datetime]): The start date of the period for which the forward continuously compounded zero rate will be returned.
d2 (Union[date, datetime]): The end date of the period for which the forward continuously compounded zero rate will be returned.
Returns:
float: forward continuously compounded zero rate
"""
if not isinstance(refdate, datetime):
refdate = datetime(refdate.year, refdate.month, refdate.day)
if not isinstance(d1, datetime):
d1 = datetime(d1.year, d1.month, d1.day)
if not isinstance(d2, datetime):
d2 = datetime(d2.year, d2.month, d2.day)
if refdate < self.refdate:
raise Exception("The given reference date is before the curves reference date.")
r = -math.log(self.value_fwd(refdate, d1, d2)) / DayCounter(self.daycounter).yf(d1, d2)
return r
def __call__(self, t: float, refdate: Union[date, datetime] = None, d: Union[date, datetime] = None) -> float:
if refdate is None or d is None:
# directly return the zero rate for a given yearfrac t
return -math.log(self.value_yf(t)) / t
else: # return the zero rate for a given date d and reference date refdate
return self.value_rate(refdate, d)
def plot(self, days: int = 10, discount_factors: bool = False, **kwargs):
"""Plots the discount curve using matplotlibs plot function.
The timegrid includes the dates of the discount curve. Here either the discount factors or the zero rates (continuously compounded, ACT365 yearfraction) are plotted.
Args:
days (int, optional): The number of days between two plotted rates/discount factors. Defaults to 10.
discount_factors (bool, optional): If True, discount factors will be plotted, otherwise the rates. Defaults to False.
**kwargs: optional arguments that will be directly passed to the matplotlib plot function
"""
dates = self.get_dates()
dates_new = [dates[0]]
for i in range(1, len(dates)):
while dates_new[-1] + timedelta(days=days) < dates[i]:
dates_new.append(dates_new[-1] + timedelta(days=days))
dates_new.append(dates[i])
# TODO: consider how best to deal with pyvacon version vs rivapy version
values = [self.value(self.refdate, d) for d in dates_new]
if not discount_factors:
for i in range(1, len(values)):
dt = float((dates_new[i] - self.refdate).days) / 365.0
values[i] = -math.log(values[i]) / dt
values[0] = values[1]
plt.plot(dates_new, values, label=self.id, **kwargs)
class FlatDiscountCurve(interfaces.BaseDatedCurve):
"""
A simple discount curve implementation based on a single flat interest rate.
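Examples:
A minimal usage sketch (dates and the rate are illustrative; note that this class discounts with annual compounding, ``1 / (1 + rate + spread) ** t``):
.. code-block:: python
>>> import datetime as dt
>>> curve = FlatDiscountCurve(valuation_date=dt.datetime(2023, 1, 1), flat_rate=0.05)
>>> curve.get_discount_factor(dt.datetime(2024, 1, 1)) # approx. 1 / 1.05 = 0.9524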
"""
def __init__(
self,
valuation_date: Union[date, datetime],
flat_rate: Optional[float] = 0.05,
curve_data: Any = None,
day_counter_type: DayCounterType = DayCounterType.Act365Fixed,
):
"""
Initializes the flat discount curve.
Args:
valuation_date (Union[date, datetime]): The valuation date of the curve.
flat_rate (Optional[float], optional): The flat interest rate used for discounting. Defaults to 0.05.
curve_data (Any, optional): Placeholder for more complex curve data (not used in this implementation). Defaults to None.
day_counter_type (DayCounterType, optional): The day count convention for calculating year fractions. Defaults to DayCounterType.Act365Fixed.
"""
self.valuation_date = valuation_date
self._flat_rate = flat_rate
self._curve_data = curve_data # Placeholder for more complex curve data
self._day_counter = DayCounter(day_counter_type)
@property
def valuation_date(self) -> datetime:
"""The valuation date of the curve as a datetime object."""
return self._valuation_date
@valuation_date.setter
def valuation_date(self, value: Union[date, datetime]):
self._valuation_date = _date_to_datetime(value)
def get_discount_factor(self, target_date: Union[date, datetime], spread: float = 0.0) -> float:
"""
Calculates the discount factor from the valuation date to a target date.
Args:
target_date (Union[date, datetime]): The date to which to discount.
spread (float, optional): Additive spread applied to the flat rate. Defaults to 0.0.
Returns:
float: The discount factor. Returns 0.0 if the target date is before the valuation date.
"""
val_date_dt = _date_to_datetime(self.valuation_date)
target_date_dt = _date_to_datetime(target_date)
if target_date_dt < val_date_dt:
return 0.0
time_to_maturity_years = self._day_counter.yf(val_date_dt, target_date_dt)
rate_to_use = self._flat_rate if self._flat_rate is not None else 0.02 # Fallback if flat_rate is None
return 1 / ((1 + rate_to_use + spread) ** time_to_maturity_years)
def value(self, ref_date: datetime, target_date: datetime, spread: float = 0.0) -> float:
"""
Returns the discount factor from a reference date to a target date.
For this simple implementation, the reference date must be the curve's valuation date.
Args:
ref_date (datetime): The reference date (must match the curve's valuation date).
target_date (datetime): The date to which to discount.
Raises:
ValueError: If the reference date does not match the curve's valuation date.
Returns:
float: The discount factor.
"""
# Ensure ref_date matches the curve's valuation_date for this simple implementation
if _date_to_datetime(ref_date).date() != self.valuation_date.date():
raise ValueError(f"Reference date {ref_date} does not match DiscountCurve valuation date {self.valuation_date}")
return self.get_discount_factor(target_date, spread=spread)
def __call__(self, t: float, refdate: Union[date, datetime] = None, d: Union[date, datetime] = None) -> float:
return self._flat_rate
class NelsonSiegel(interfaces.FactoryObject):
def __init__(self, beta0: float, beta1: float, beta2: float, tau: float):
"""Nelson-Siegel parametrization for rates and yields, see :footcite:t:`Nelson1987`.
This parametrization is mostly used to parametrize rate curves and can be used in conjunction with :class:`rivapy.marketdata.DiscountCurveParametrized`. It is defined by
.. math::
f(t) = \\beta_0 + (\\beta_1+\\beta_2)\\frac{1-e^{-t/\\tau}}{t/\\tau} - \\beta_2e^{-t/\\tau}
Args:
beta0 (float): This parameter is the asymptotic (for arbitrary large maturities) rate, see formula above.
beta1 (float): beta0 + beta1 give the short term rate, see formula above.
beta2 (float): This parameter controls the size of the hump, see formula above.
tau (float): This parameter controls the location of the hump, see formula above.
Examples:
.. code-block:: python
>>> import datetime as dt
>>> import matplotlib.pyplot as plt
>>> from rivapy.tools.enums import DayCounterType
>>> from rivapy.marketdata.curves import NelsonSiegel, DiscountCurveParametrized
>>> ns = NelsonSiegel(beta0=0.05, beta1 = 0.02, beta2=0.1, tau=1.0)
>>> dc = DiscountCurveParametrized('DC', refdate = dt.datetime(2023,1,1), rate_parametrization=ns, daycounter = DayCounterType.Act365Fixed)
>>> dates = [dt.datetime(2023,1,1) + dt.timedelta(days=30*days) for days in range(120)]
>>> values = [dc.value(refdate = dt.datetime(2023,1,1),d=d) for d in dates]
>>> plt.plot(dates, values)
"""
self.beta0 = beta0
self.beta1 = beta1
self.beta2 = beta2
self.tau = tau
self._multiplier = 1.0
def _to_dict(self) -> dict:
return {"beta0": self.beta0, "beta1": self.beta1, "beta2": self.beta2, "tau": self.tau}
def __call__(self, t: float, refdate: Union[date, datetime] = None, d: Union[date, datetime] = None):
# refdate and d are accepted (and ignored) so that the parametrization can be called by DiscountCurveParametrized.value
return self._multiplier * NelsonSiegel.compute(self.beta0, self.beta1, self.beta2, self.tau, t)
def __mul__(self, x: float):
result = NelsonSiegel(self.beta0, self.beta1, self.beta2, self.tau)
result._multiplier = x
return result
@staticmethod
def compute(beta0: float, beta1: float, beta2: float, tau: float, T: float) -> float:
"""_summary_
Args:
beta0 (float): longrun
beta1 (float): beta0 + beta1 = shortrun
beta2 (float): hump or through
tau (float):locaton of hump
T (float): _description_
Returns:
float: _description_
"""
t = np.maximum(T, 1e-4) / tau
return beta0 + beta1 * (1.0 - np.exp(-t)) / t + beta2 * ((1 - np.exp(-t)) / t - np.exp(-(t)))
@staticmethod
def _create_sample(
n_samples: int,
seed: int = None,
min_short_term_rate: float = -0.01,
max_short_term_rate: float = 0.12,
min_long_run_rate: float = 0.005,
max_long_run_rate: float = 0.15,
min_hump: float = -0.1,
max_hump: float = 0.1,
min_tau: float = 0.5,
max_tau: float = 3.0,
):
if seed is not None:
np.random.seed(seed)
result = []
for i in range(n_samples):
beta0 = np.random.uniform(min_long_run_rate, max_long_run_rate)
beta1 = np.random.uniform(min_short_term_rate - beta0, max_short_term_rate - beta0)
beta2 = np.random.uniform(min_hump, max_hump)
tau = np.random.uniform(min_tau, max_tau)
result.append(NelsonSiegel(beta0, beta1, beta2, tau))
return result
if has_tf:
@staticmethod
def compute_tf(beta0: tf.Tensor, beta1: tf.Tensor, beta2: tf.Tensor, tau: tf.Tensor, T: tf.Tensor) -> tf.Tensor:
"""_summary_
Args:
beta0 (float): longrun
beta1 (float): beta0 + beta1 = shortrun
beta2 (float): hump or through
tau (float):locaton of hump
T (float): _description_
Returns:
float: _description_
"""
t = tf.maximum(T, 1e-4) / tau
return beta0 + beta1 * (1.0 - tf.exp(-t)) / t + beta2 * ((1 - tf.exp(-t)) / t - tf.exp(-(t)))
class ConstantRate(interfaces.FactoryObject):
def __init__(self, rate: float):
"""Continuously compounded flat rate object that can be used in conjunction with :class:`rivapy.marketdata.DiscountCurveParametrized`.
Args:
rate (float): The constant rate.
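Examples:
A minimal sketch (the discount factor is ``exp(-rate * yearfraction)`` with Act365Fixed yearfractions):
.. code-block:: python
>>> import datetime as dt
>>> cr = ConstantRate(rate=0.03)
>>> cr.value(dt.datetime(2023, 1, 1), dt.datetime(2024, 1, 1)) # approx. exp(-0.03) = 0.9704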
"""
self.rate = rate
def _to_dict(self) -> dict:
return {"rate": self.rate}
@staticmethod
def _create_sample(n_samples: int, seed: int = None):
if seed is not None:
np.random.seed(seed)
result = []
for i in range(n_samples):
result.append(ConstantRate(rate=np.random.uniform(-0.005, 0.1)))
return result
def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
if not isinstance(refdate, datetime):
refdate = datetime(refdate.year, refdate.month, refdate.day)
if not isinstance(d, datetime):
d = datetime(d.year, d.month, d.day)
r = self.rate
yf = DayCounter(DayCounterType.Act365Fixed).yf(refdate, d)
return np.exp(-r * yf)
def __call__(self, t: float, refdate: Union[date, datetime] = None, d: Union[date, datetime] = None):
return self.rate
class LinearRate(interfaces.FactoryObject):
def __init__(self, shortterm_rate: float, longterm_rate: float, max_maturity: float = 10.0, min_maturity: float = 1.0):
"""Continuously compounded linearly interpolated rate object that can be used in conjunction with :class:`rivapy.marketdata.DiscountCurveParametrized`.
Args:
shortterm_rate (float): The short term rate.
longterm_rate (float): The long term rate.
max_maturity (float): After this time point, constant extrapolation is applied.
min_maturity (float): Before this time point, constant extrapolation is applied.
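Examples:
A minimal sketch (the numbers are illustrative; between ``min_maturity`` and ``max_maturity`` the zero rate is interpolated linearly, outside this range it is extrapolated as a constant):
.. code-block:: python
>>> lr = LinearRate(shortterm_rate=0.01, longterm_rate=0.03)
>>> lr(5.5) # zero rate for a 5.5y maturity: 0.01 + (5.5 - 1.0) / (10.0 - 1.0) * 0.02 = 0.02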
"""
self.shortterm_rate = shortterm_rate
self.min_maturity = min_maturity
self.longterm_rate = longterm_rate
self.max_maturity = max_maturity
self._coeff = (self.longterm_rate - self.shortterm_rate) / (self.max_maturity - self.min_maturity)
@staticmethod
def _create_sample(n_samples: int, seed: int = None):
if seed is not None:
np.random.seed(seed)
result = []
for i in range(n_samples):
shortterm_rate = np.random.uniform(-0.005, 0.07)
longterm_rate = shortterm_rate + np.random.uniform(0.0025, 0.09)
result.append(LinearRate(shortterm_rate=shortterm_rate, longterm_rate=longterm_rate))
return result
def _to_dict(self) -> dict:
return {"shortterm_rate": self.shortterm_rate, "longterm_rate": self.longterm_rate, "max_maturity": self.max_maturity}
def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
if not isinstance(refdate, datetime):
refdate = datetime(refdate.year, refdate.month, refdate.day)
if not isinstance(d, datetime):
d = datetime(d.year, d.month, d.day)
r = Interpolator(InterpolationType.LINEAR, ExtrapolationType.CONSTANT).interp(
[self.min_maturity, self.max_maturity],
[self.shortterm_rate, self.longterm_rate],
DayCounter(DayCounterType.Act365Fixed).yf(refdate, d),
ExtrapolationType.CONSTANT,
)
yf = DayCounter(DayCounterType.Act365Fixed).yf(refdate, d)
return np.exp(-r * yf)
def value_rate(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
if not isinstance(refdate, datetime):
refdate = datetime(refdate.year, refdate.month, refdate.day)
if not isinstance(d, datetime):
d = datetime(d.year, d.month, d.day)
r = -math.log(self.value(refdate, d)) / DayCounter(DayCounterType.Act365Fixed).yf(refdate, d)
return r
def __call__(self, t: float, refdate: Union[date, datetime] = None, d: Union[date, datetime] = None):
return Interpolator(InterpolationType.LINEAR, ExtrapolationType.CONSTANT).interp(
[self.min_maturity, self.max_maturity], [self.shortterm_rate, self.longterm_rate], t, ExtrapolationType.CONSTANT
)
class NelsonSiegelSvensson(NelsonSiegel):
def __init__(self, beta0: float, beta1: float, beta2: float, beta3: float, tau: float, tau2: float):
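"""Nelson-Siegel-Svensson parametrization, extending :class:`NelsonSiegel` by a second hump term with parameters beta3 and tau2.
Args:
beta0 (float): long-run rate
beta1 (float): beta0 + beta1 = short-run rate
beta2 (float): size of the first hump
beta3 (float): size of the second hump
tau (float): location of the first hump
tau2 (float): location of the second hump
"""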
super().__init__(beta0, beta1, beta2, tau)
self.beta3 = beta3
self.tau2 = tau2
def _to_dict(self) -> dict:
tmp = super()._to_dict()
tmp.update({"beta3": self.beta3, "tau2": self.tau2})
return tmp
def __call__(self, t: float, refdate: Union[date, datetime] = None, d: Union[date, datetime] = None):
return NelsonSiegelSvensson.compute(self.beta0, self.beta1, self.beta2, self.beta3, self.tau, self.tau2, t)
@staticmethod
def compute(beta0, beta1, beta2, beta3, tau, tau2, T):
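"""Nelson-Siegel-Svensson rate: the Nelson-Siegel rate plus the additional term beta3 * ((1 - exp(-T/tau2)) / (T/tau2) - exp(-T/tau2))."""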
t = np.maximum(T, 1e-4) / tau2
return NelsonSiegel.compute(beta0, beta1, beta2, tau, T) + beta3 * ((1 - np.exp(-t)) / t - np.exp(-(t)))
class DiscountCurveComposition(interfaces.FactoryObject):
def __init__(self, a, b, c):
# check if all discount curves have the same daycounter, otherwise exception
if isinstance(a, dict):
a = _create(a)
if isinstance(b, dict):
b = _create(b)
if isinstance(c, dict):
c = _create(c)
dc = set()
for k in [a, b, c]:
if hasattr(k, "daycounter"):
dc.add(k.daycounter)
if len(dc) > 1:
raise Exception("All curves must have same daycounter.")
if len(dc) > 0:
self.daycounter = dc.pop()
else:
self.daycounter = DayCounterType.Act365Fixed.value
self._dc = DayCounter(self.daycounter)
self.a = a
if not hasattr(a, "value"):
self.a = DiscountCurveParametrized("", datetime(1980, 1, 1), ConstantRate(a), self.daycounter)
self.b = b
if not hasattr(b, "value"):
self.b = DiscountCurveParametrized("", datetime(1980, 1, 1), ConstantRate(b), self.daycounter)
self.c = c
if not hasattr(c, "value"):
self.c = DiscountCurveParametrized("", datetime(1980, 1, 1), ConstantRate(c), self.daycounter)
def _to_dict(self) -> dict:
if hasattr(self.a, "to_dict"):
a = self.a.to_dict()
else:
a = self.a
if hasattr(self.b, "to_dict"):
b = self.b.to_dict()
else:
b = self.b
if hasattr(self.c, "to_dict"):
c = self.c.to_dict()
else:
c = self.c
return {"a": a, "b": b, "c": c}
@staticmethod
def _create_sample(n_samples: int, seed: int = None, refdate: Union[datetime, date] = None, parametrization_type=NelsonSiegel) -> list:
curves = DiscountCurveParametrized._create_sample(n_samples, seed, refdate, parametrization_type)
results = []
for c in curves:
results.append(c + 0.001)
return results
def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
r = self.value_rate(refdate, d)
yf = self._dc.yf(refdate, d)
return np.exp(-r * yf)
def value_rate(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
return self.a.value_rate(refdate, d) * self.b.value_rate(refdate, d) + self.c.value_rate(refdate, d)
def value_fwd(self, refdate: Union[date, datetime], d1: Union[date, datetime], d2: Union[date, datetime]) -> float:
"""Return forward discount factor for a given date"""
return self.value(refdate, d2) / self.value(refdate, d1)
def value_fwd_rate(self, refdate: Union[date, datetime], d1: Union[date, datetime], d2: Union[date, datetime]) -> float:
"""Return forward continuously compounded zero rate for a given date"""
r = -math.log(self.value_fwd(refdate, d1, d2)) / self._dc.yf(d1, d2)
return r
def __mul__(self, other):
# TODO unittests
return DiscountCurveComposition(self, other, 0.0)
def __rmul__(self, other):
return DiscountCurveComposition(self, other, 0.0)
def __add__(self, other):
return DiscountCurveComposition(self, 1.0, other)
def __radd__(self, other):
return DiscountCurveComposition(self, 1.0, other)
class DiscountCurveParametrized(interfaces.FactoryObject):
def __init__(
self,
obj_id: str,
refdate: Union[datetime, date],
rate_parametrization, #: Callable[[float], float],
daycounter: Union[DayCounterType, str] = DayCounterType.Act365Fixed,
):
"""_summary_
Args:
obj_id (str): _description_
refdate (Union[datetime, date]): _description_
rate_parametrization (Callable[[float], float]): _description_
daycounter (Union[DayCounterType, str], optional): _description_. Defaults to DayCounterType.Act365Fixed.
"""
if isinstance(refdate, datetime):
self.refdate = refdate
else:
self.refdate = datetime(refdate.year, refdate.month, refdate.day)
self.daycounter = DayCounterType.to_string(daycounter)
self._dc = DayCounter(self.daycounter)
self.obj_id = obj_id
if isinstance(rate_parametrization, dict): # if the parametrization is given as a dict we try to create it from the factory
self.rate_parametrization = _create(rate_parametrization)
else:
self.rate_parametrization = rate_parametrization
def _to_dict(self) -> dict:
try:
parametrization = self.rate_parametrization.to_dict()
except Exception as e:
raise Exception("Missing implementation of to_dict() in parametrization of type " + type(self.rate_parametrization).__name__)
return {"obj_id": self.obj_id, "refdate": self.refdate, "rate_parametrization": parametrization}
def value_fwd(self, refdate: Union[date, datetime], d1: Union[date, datetime], d2: Union[date, datetime]) -> float:
"""Return forward discount factor for a given date
Args:
refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curve's reference date), the forward discount factor will be returned.
d1 (Union[date, datetime]): The start date of the forward period
d2 (Union[date, datetime]): The end date of the forward period
Returns:
float: forward discount factor
"""
if not isinstance(refdate, datetime):
refdate = datetime(refdate.year, refdate.month, refdate.day)
if not isinstance(d1, datetime):
d1 = datetime(d1.year, d1.month, d1.day)
if not isinstance(d2, datetime):
d2 = datetime(d2.year, d2.month, d2.day)
if refdate < self.refdate:
raise Exception("The given reference date is before the curves reference date.")
df1 = self.value(refdate, d1)
df2 = self.value(refdate, d2)
return df2 / df1
def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
"""Return discount factor for a given date
Args:
refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curve's reference date), the forward discount factor will be returned.
d (Union[date, datetime]): The date for which the discount factor will be returned
Returns:
float: discount factor
"""
if not isinstance(refdate, datetime):
refdate = datetime(refdate.year, refdate.month, refdate.day)
if not isinstance(d, datetime):
d = datetime(d.year, d.month, d.day)
if refdate < self.refdate:
raise Exception("The given reference date is before the curves reference date.")
yf = self._dc.yf(refdate, d)
return np.exp(-self.rate_parametrization(yf, refdate, d) * yf)
def value_rate(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
"""Return the continuous rate for a given date
Args:
refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curve's reference date), the forward rate will be returned.
d (Union[date, datetime]): The date for which the discount factor will be returned
Returns:
float: continuous rate
"""
if not isinstance(refdate, datetime):
refdate = datetime(refdate.year, refdate.month, refdate.day)
if not isinstance(d, datetime):
d = datetime(d.year, d.month, d.day)
if refdate < self.refdate:
raise Exception("The given reference date is before the curves reference date.")
yf = self._dc.yf(refdate, d)
return self.rate_parametrization(yf, refdate, d)
def value_fwd_rate(self, refdate: Union[date, datetime], d1: Union[date, datetime], d2: Union[date, datetime]) -> float:
"""Return forward continuously compounded zero rate for a given date
Args:
refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curve's reference date), the forward rate will be returned.
d1 (Union[date, datetime]): The start date of the period for which the forward continuously compounded zero rate will be returned.
d2 (Union[date, datetime]): The end date of the period for which the forward continuously compounded zero rate will be returned.
Returns:
float: forward continuously compounded zero rate
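Examples:
A minimal sketch (with a flat parametrization the forward rate simply reproduces the flat rate):
.. code-block:: python
>>> import datetime as dt
>>> dc = DiscountCurveParametrized('DC', dt.datetime(2023, 1, 1), ConstantRate(0.03))
>>> dc.value_fwd_rate(dt.datetime(2023, 1, 1), dt.datetime(2024, 1, 1), dt.datetime(2025, 1, 1)) # approx. 0.03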
"""
if not isinstance(refdate, datetime):
refdate = datetime(refdate.year, refdate.month, refdate.day)
if not isinstance(d1, datetime):
d1 = datetime(d1.year, d1.month, d1.day)
if not isinstance(d2, datetime):
d2 = datetime(d2.year, d2.month, d2.day)
if refdate < self.refdate:
raise Exception("The given reference date is before the curves reference date.")
r = -math.log(self.value_fwd(refdate, d1, d2)) / self._dc.yf(d1, d2)
return r
@staticmethod
def _create_sample(n_samples: int, seed: int = None, refdate: Union[datetime, date] = None, parametrization_type=NelsonSiegel) -> list:
if seed is not None:
np.random.seed(seed)
if refdate is None:
refdate = datetime.now()
parametrizations = parametrization_type._create_sample(n_samples)
result = []
for i, p in enumerate(parametrizations):
result.append(DiscountCurveParametrized("DCP_" + str(i), refdate, p))
return result
def __mul__(self, other):
return DiscountCurveComposition(self, other, 0.0)
def __rmul__(self, other):
return DiscountCurveComposition(self, other, 0.0)
def __add__(self, other):
return DiscountCurveComposition(self, 1.0, other)
def __radd__(self, other):
return DiscountCurveComposition(self, 1.0, other)
class EquityForwardCurve:
def __init__(self, spot: float, funding_curve: DiscountCurve, borrow_curve: DiscountCurve, div_table):
"""Equity Forward Curve
Args:
spot (float): Current spot price.
funding_curve (DiscountCurve): Funding (discount) curve.
borrow_curve (DiscountCurve): Borrow curve describing borrowing/repo costs.
div_table (:class:`rivapy.marketdata.DividendTable`): Dividend table.
"""
self.spot = spot
self.bc = borrow_curve
self.fc = funding_curve
self.div = div_table
self._pyvacon_obj = None
self.refdate = self.fc.refdate
if self.bc is not None:
if self.refdate < self.bc.refdate:
self.refdate = self.bc.refdate
if self.div is not None:
if hasattr(self.div, "refdate"):
if self.refdate < self.div.refdate:
self.refdate = self.div.refdate
def _get_pyvacon_obj(self):
if self._pyvacon_obj is None:
if hasattr(self.fc, "_get_pyvacon_obj"):
fc = self.fc._get_pyvacon_obj()
else:
fc = self.fc
if hasattr(self.bc, "_get_pyvacon_obj"):
bc = self.bc._get_pyvacon_obj()
else:
bc = self.bc
if hasattr(self.div, "_get_pyvacon_obj"):
div = self.div._get_pyvacon_obj()
else:
div = self.div
self._pyvacon_obj = _EquityForwardCurve(self.refdate, self.spot, fc, bc, div)
return self._pyvacon_obj
def value(self, refdate, expiry):
return self._get_pyvacon_obj().value(refdate, expiry)
def plot(self, days: int = 10, days_end: int = 10 * 365, **kwargs):
"""Plots the forward curve using matplotlibs plot function.
Args:
days (int, optional): The number of days between two plotted rates/discount factors. Defaults to 10.
days_end (int. optional): Number of days when plotting will end. Defaults to 10*365 (10yr)
**kwargs: optional arguments that will be directly passed to the matplotlib plto function
"""
dates = [self.refdate + timedelta(days=i) for i in range(0, days_end, days)]
values = [self.value(self.refdate, d) for d in dates]
plt.plot(dates, values, **kwargs)
plt.xlabel("expiry")
plt.ylabel("forward value")
class BootstrapHazardCurve:
def __init__(
self, ref_date: datetime, trade_date: datetime, dc: DiscountCurve, RR: float, payment_dates: List[datetime], market_spreads: List[float]
):
"""[summary]
Args:
ref_date (datetime): [description]
trade_date (datetime): [description]
dc (DiscountCurve): [description]
RR (float): [description]
payment_dates (List[datetime]): [description]
market_spreads (List[float]): [description]
"""
self.ref_date = ref_date
self.trade_date = trade_date
self.dc = dc
self.RR = RR
self.payment_dates_bootstrapp = payment_dates
self.market_spreads = market_spreads
self._pyvacon_obj = None
def par_spread(self, dc_survival, maturity_date, payment_dates: List[datetime]):
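"""Compute the CDS par spread implied by a given survival curve.
The protection leg accumulates (1 - RR) * discount factor * marginal default probability over an
(approximately annual) integration grid, the premium leg accumulates yearfraction (Act/360) *
discount factor * average survival probability over the premium payment dates (plus a 1/2
accrued-on-default term), and the par spread is protection leg / (premium leg + accrued).
"""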
integration_step = relativedelta.relativedelta(days=365)
premium_period_start = self.ref_date
prev_date = self.ref_date
current_date = min(prev_date + integration_step, maturity_date)
dc_valuation_date = self.dc.value(self.ref_date, maturity_date)
risk_adj_factor_protection = 0
risk_adj_factor_premium = 0
risk_adj_factor_accrued = 0
while current_date <= maturity_date:
default_prob = dc_survival.value(self.ref_date, prev_date) - dc_survival.value(self.ref_date, current_date)
risk_adj_factor_protection += self.dc.value(self.ref_date, current_date) * default_prob
prev_date = current_date
current_date += integration_step
if prev_date < maturity_date and current_date > maturity_date:
default_prob = dc_survival.value(self.ref_date, prev_date) - dc_survival.value(self.ref_date, maturity_date)
risk_adj_factor_protection += self.dc.value(self.ref_date, maturity_date) * default_prob
for premium_payment in payment_dates:
if premium_payment >= self.ref_date:
period_length = ((premium_payment - premium_period_start).days) / 360
survival_prob = (dc_survival.value(self.ref_date, premium_period_start) + dc_survival.value(self.ref_date, premium_payment)) / 2
df = self.dc.value(self.ref_date, premium_payment)
risk_adj_factor_premium += period_length * survival_prob * df
default_prob = dc_survival.value(self.ref_date, premium_period_start) - dc_survival.value(self.ref_date, premium_payment)
risk_adj_factor_accrued += period_length * default_prob * df
premium_period_start = premium_payment
PV_accrued = (1 / 2) * risk_adj_factor_accrued
PV_premium = (1) * risk_adj_factor_premium
PV_protection = ((1 - self.RR)) * risk_adj_factor_protection
par_spread_i = (PV_protection) / ((PV_premium + PV_accrued))
return par_spread_i
def create_survival(self, ref_date: datetime, dates: List[datetime], hazard_rates: List[float]):
return _SurvivalCurve("survival_curve", ref_date, dates, hazard_rates)
def calibration_error(self, x, mkt_par_spread, ref_date, payment_dates, dates, hazard_rates):
hazard_rates[-1] = x
maturity_date = dates[-1]
dc_surv = self.create_survival(ref_date, dates, hazard_rates)
return mkt_par_spread - self.par_spread(dc_surv, maturity_date, payment_dates)
def calibrate_hazard_rate(self):
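"""Bootstrap piecewise constant hazard rates from the quoted market spreads.
For each quoted maturity the last hazard rate is solved for (Brent's method) such that the model
par spread reproduces the corresponding market spread, keeping all previously bootstrapped
hazard rates fixed.
Returns:
Tuple[List[float], List[datetime]]: calibrated hazard rates and the corresponding curve dates.
"""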
sc_dates = [self.ref_date]
hazard_rates = [0.0]
for i in range(len(self.payment_dates_bootstrapp)):
payment_dates_iter = self.payment_dates_bootstrapp[i]
mkt_par_spread_iter = self.market_spreads[i]
sc_dates.append(payment_dates_iter[-1])
hazard_rates.append(hazard_rates[-1])
sol = scipy.optimize.root_scalar(
self.calibration_error,
args=(mkt_par_spread_iter, self.ref_date, payment_dates_iter, sc_dates, hazard_rates),
method="brentq",
bracket=[0, 3],
xtol=1e-8,
rtol=1e-8,
)
hazard_rates[-1] = sol.root
return hazard_rates, sc_dates # self.create_survival(self.ref_date, sc_dates, hazard_rates)#.value, hazard_rates
# def hazard_rates(self):
# #hazard_rates_value=[]
# hazard_rates_value=self.calibrate_hazard_rate()
# return self.hazard_rates_value
# def value(self, refdate: Union[date, datetime], d: Union[date, datetime])->float:
# """Return discount factor for a given date
# Args:
# refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curves reference date), the forward discount factor will be returned.
# d (Union[date, datetime]): The date for which the discount factor will be returned
# Returns:
# float: discount factor
# """
# #if not isinstance(refdate, datetime):
# # refdate = datetime(refdate,0,0,0)
# #if not isinstance(d, datetime):
# # d = datetime(d,0,0,0)
# #if refdate < self.refdate:
# # raise Exception('The given reference date is before the curves reference date.')
# return self._get_pyvacon_obj().value(refdate, d)
# def _get_pyvacon_obj(self):
# if self._pyvacon_obj is None:
# self._pyvacon_obj = _SurvivalCurve('survival_curve', self.refdate,
# self.calibrate_hazard_rate[1], self.calibrate_hazard_rate[0])
# return self._pyvacon_obj
# class PowerPriceForwardCurve:
# def __init__(
# self, refdate: Union[datetime, date], start: datetime, end: datetime, values: np.ndarray, freq: str = "1H", tz: str = None, id: str = None
# ):
# """Simple forward curve for power.
# Args:
# refdate (Union[datetime, date]): Reference date of curve
# start (dt.datetime): Start of forward curve datetimepoints (including this timepoint).
# end (dt.datetime): End of forad curve datetimepoints (excluding this timepoint).
# values (np.ndarray): One dimensional array holding the price for each datetimepint in the curve. The method value will raise an exception if the number of values is not equal to the number of datetimepoints.
# freq (str, optional): Frequency of timepoints. Defaults to '1H'. See documentation for pandas.date_range for further details on freq.
# tz (str or tzinfo): Time zone name for returning localized datetime points, for example ‘Asia/Hong_Kong’.
# By default, the resulting datetime points are timezone-naive. See documentation for pandas.date_range for further details on tz.
# id (str): Identifier for the curve. It has no impact on the valuation functionality. If None, a uuid will be generated. Defaults to None.
# """
# self.id = id
# if id is None:
# self.id = "PFC/" + str(datetime.now())
# self.refdate = refdate
# self.start = start
# self.end = end
# self.freq = freq
# self.tz = tz
# self.values = values
# # timegrid used to compute prices for a certain schedule
# self._tg = None
# self._df = (
# pd.DataFrame(
# {"dates": pd.date_range(self.start, self.end, freq=self.freq, tz=self.tz, inclusive="left").to_pydatetime(), "values": self.values}
# )
# .set_index(["dates"])
# .sort_index()
# )
# def value(self, refdate: Union[date, datetime], schedule) -> np.ndarray:
# if self._tg is None:
# self._tg = pd.DataFrame(
# {"dates": pd.date_range(self.start, self.end, freq=self.freq, tz=self.tz, inclusive="left").to_pydatetime(), "values": self.values}
# ).reset_index()
# if self._tg.shape[0] != self.values.shape[0]:
# raise Exception(
# "The number of dates ("
# + str(self._tg.shape[0])
# + ") does not equal number of values ("
# + str(self.values.shape[0])
# + ") in forward curve."
# )
# tg = self._tg[(self._tg.dates >= schedule.start) & (self._tg.dates < schedule.end)].set_index("dates")
# _schedule = pd.DataFrame({"dates": schedule.get_schedule(refdate)})
# tg = _schedule.join(tg, on="dates")
# # tg = tg[tg['dates']>=refdate]
# if tg["index"].isna().sum() > 0:
# raise Exception("There are " + str(tg["index"].isna().sum()) + " dates in the schedule not covered by the forward curve.")
# return self.values[tg["index"].values]
# def get_df(self) -> pd.DataFrame:
# return self._df
class EnergyPriceForwardCurve:
"""Energy Price Forward Curve object.
It is recommended to initialize this object via the class methods ``from_existing_pfc``, ``from_existing_shape`` or ``from_scratch``.
Args:
id (_type_): ID for the PFC object
refdate (Union[datetime, date]): Reference date
pfc (pd.DataFrame, optional): This object can be initialized with an existing pfc. Defaults to None.
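Examples:
A minimal sketch using an already existing PFC (the hourly grid, the constant price and the column name are illustrative; the index must be a datetime index):
.. code-block:: python
>>> import pandas as pd
>>> import datetime as dt
>>> timegrid = pd.date_range(start='2023-01-01', end='2023-12-31 23:00:00', freq='h')
>>> pfc_df = pd.DataFrame({'values': 50.0}, index=timegrid)
>>> pfc = EnergyPriceForwardCurve.from_existing_pfc(id='PFC', refdate=dt.datetime(2023, 1, 1), pfc=pfc_df)
>>> pfc.get_pfc().head()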
"""
def __init__(self, id, refdate: Union[datetime, date], pfc: pd.DataFrame = None, **kwargs):
self.id = id
if id is None:
self.id = "PFC/" + str(datetime.now())
self.refdate = refdate
self._pfc = pfc
self._pfc_shape: pd.DataFrame = kwargs.get("pfc_shape", None)
self._apply_schedule: SimpleSchedule = kwargs.get("apply_schedule", None)
self._pfc_shaper: PFCShaper = kwargs.get("pfc_shaper", None)
list(map(lambda x: EnergyPriceForwardCurve._validate_dataframes(x), [self._pfc, self._pfc_shape]))
self._future_contracts: List[EnergyFutureSpecifications] = kwargs.get("future_contracts", None)
if self._pfc is None and self._pfc_shape is None and self._pfc_shaper is None:
raise ValueError("No values provided for the arguments pfc, pfc_shape and pfc_shaper!")
@staticmethod
def _validate_dataframes(dataframe: Optional[pd.DataFrame]):
if dataframe is not None:
validators._check_pandas_index_for_datetime(dataframe)
@classmethod
def from_existing_pfc(cls, id, refdate: Union[datetime, date], pfc: pd.DataFrame) -> "EnergyPriceForwardCurve":
"""Initialization of the ``EnergyPriceForwardCurve`` given an existing PFC.
Args:
id (_type_): ID for the PFC object
refdate (Union[datetime, date]): Reference Date
pfc (pd.DataFrame): Existing PFC
Returns:
EnergyPriceForwardCurve: ``EnergyPriceForwardCurve`` object
"""
instance = cls(id=id, refdate=refdate, pfc=pfc)
return instance
@classmethod
def from_existing_shape(
cls, id, refdate: Union[datetime, date], pfc_shape: pd.DataFrame, future_contracts: List[EnergyFutureSpecifications]
) -> "EnergyPriceForwardCurve":
"""Initialization of the ``EnergyPriceForwardCurve`` given an existing PFC shape. The shape is then shifted in order to match the future contracts defined in the ``future_contracts`` list.
Args:
id (_type_): ID for the PFC object
refdate (Union[datetime, date]): Reference Date
pfc_shape (pd.DataFrame): Existing PFC shape
future_contracts (List[EnergyFutureSpecifications]): List of future contracts (``EnergyFutureSpecifications`` objects)
Returns:
EnergyPriceForwardCurve: ``EnergyPriceForwardCurve`` object
"""
instance = cls(id=id, refdate=refdate, pfc_shape=pfc_shape, future_contracts=future_contracts)
instance._shift_shape()
return instance
@classmethod
def from_scratch(
cls,
id,
refdate: Union[datetime, date],
apply_schedule: SimpleSchedule,
pfc_shaper: PFCShaper,
future_contracts: List[EnergyFutureSpecifications],
) -> "EnergyPriceForwardCurve":
"""Initialization of the ``EnergyPriceForwardCurve`` from scratch. First a shape is created using the ``pfc_shaper``. Afterwards, shape is shifted in order to match the future contracts defined in the ``future_contracts`` list.
Args:
id (_type_): ID for the PFC object
refdate (Union[datetime, date]): Reference Date
apply_schedule (SimpleSchedule): Schedule to apply the ``pfc_shaper`` on, in order to obtain shape values for future time points
pfc_shaper (PFCShaper): PFC shaper
future_contracts (List[EnergyFutureSpecifications]): List of future contracts (``EnergyFutureSpecifications`` objects)
Returns:
EnergyPriceForwardCurve: ``EnergyPriceForwardCurve`` object
"""
instance = cls(id=id, refdate=refdate, pfc_shaper=pfc_shaper, future_contracts=future_contracts, apply_schedule=apply_schedule)
instance._create_shape()
instance._shift_shape()
return instance
def __validate_contracts_frequency(self):
"""Checks if all contracts in ``self._future_contracts`` have the sample schedule frequency."""
frequencies_contracts = defaultdict(list)
for future_contracts in self._future_contracts:
frequencies_contracts[future_contracts.schedule.freq].append((future_contracts.schedule.__class__.__name__, future_contracts.name))
if len(list(frequencies_contracts.keys())) > 1:
raise ValueError(
f"Found different contract frequencies: {frequencies_contracts}.\n Please provide uniform frequencies for the elements in the `future_contract` dictionary!"
)
def __get_offpeak_contracts(
self, base_contracts: List[EnergyFutureSpecifications], peak_contracts: List[EnergyFutureSpecifications]
) -> List[EnergyFutureSpecifications]:
"""In cases where base and peak contracts are part of the ``self._future_contracts``, offpeak contracts need to be deducted from these two in order to shift the shape properly.
Args:
base_contracts (List[EnergyFutureSpecifications]): List of base contracts
peak_contracts (List[EnergyFutureSpecifications]): List of peak contracts
Returns:
List[EnergyFutureSpecifications]: List of offpeak contracts
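Example:
With an illustrative base price of 50 over 720 delivery hours and a peak price of 60 over 260 peak hours,
the implied offpeak price is (720 * 50 - 260 * 60) / (720 - 260) = approx. 44.35.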
"""
offpeak_contracts = []
# iterate over each combination of base and peak contracts
for base_contract_spec in base_contracts:
n_base = len(base_contract_spec.get_schedule())
for peak_contract_spec in peak_contracts:
# match both by the start and end dates of their respective schedule
if base_contract_spec.get_start_end() == peak_contract_spec.get_start_end():
# if both match, an offpeak contract can be created from these two
offpeak_name = f"offpeak_{base_contract_spec.name}&{peak_contract_spec.name}"
n_peak = len(peak_contract_spec.get_schedule())
offpeak_price = (
n_base / (n_base - n_peak) * base_contract_spec.get_price() - n_peak / (n_base - n_peak) * peak_contract_spec.get_price()
)
offpeak_contracts.append(
EnergyFutureSpecifications(
schedule=OffPeakSchedule(start=base_contract_spec.get_start(), end=base_contract_spec.get_end()),
price=offpeak_price,
name=offpeak_name,
)
)
break
return offpeak_contracts
def _shift_shape(self):
"""Shifts the shape to match the future contracts defined in the ``self._future_contracts`` list."""
self.__validate_contracts_frequency()
base_contracts, peak_contracts = [
[fc for fc in self._future_contracts if fc.schedule.__class__.__name__ == schedule_type] for schedule_type in (etgs.BASE, etgs.PEAK)
]
# if base and peak contracts both exist, offpeak contracts are computed
if (len(base_contracts) > 0) and (len(peak_contracts) > 0):
shifted_pfc = []
offpeak_contracts = self.__get_offpeak_contracts(base_contracts=base_contracts, peak_contracts=peak_contracts)
# shift offpeak and peak separately
for contracts in [offpeak_contracts, peak_contracts]:
shifting_datetimes = np.sort(np.unique(np.concatenate([contract.get_schedule() for contract in contracts])))
_pfc_shape = self._pfc_shape.loc[shifting_datetimes, :]
pfc_shifter = PFCShifter(shape=_pfc_shape, contracts=contracts)
shifted_pfc.append(pfc_shifter.compute())
# combine offpeak and peak shifts
shifted_pfc = pd.concat(shifted_pfc, axis=0)
self._pfc = shifted_pfc.sort_index(ascending=True)
else:
# if only base or only peak contracts exist, the shift can be performed directly
pfc_shifter = PFCShifter(shape=self._pfc_shape, contracts=self._future_contracts)
self._pfc = pfc_shifter.compute()
def _create_shape(self):
"""Creates a shape using the ``self._pfc_shaper`` model"""
self._pfc_shaper.calibrate()
self._pfc_shape = self._pfc_shaper.apply(self._apply_schedule.get_schedule())
def get_pfc(self) -> pd.DataFrame:
"""Returns the PFC
Returns:
pd.DataFrame: PFC
"""
return self._pfc