import math
import scipy.optimize
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import dateutil.relativedelta as relativedelta
import rivapy.tools.interfaces as interfaces
import rivapy.tools._validators as validators
from typing import List, Union, Tuple, Literal, Dict, Optional
from datetime import datetime, date, timedelta
from collections import defaultdict
try:
import tensorflow as tf
has_tf = True
except ImportError:
has_tf = False
from rivapy.tools.enums import DayCounterType, InterpolationType, ExtrapolationType
from rivapy.tools.enums import EnergyTimeGridStructure as ets
from rivapy.tools.datetools import DayCounter
from rivapy.marketdata.factory import create as _create
from rivapy.marketdata_tools.pfc_shaper import PFCShaper
from rivapy.marketdata_tools.pfc_shifter import PFCShifter
from rivapy.tools.scheduler import SimpleSchedule, OffPeakSchedule, PeakSchedule, BaseSchedule
from rivapy.instruments.energy_futures_specifications import EnergyFutureSpecifications
from rivapy import _pyvacon_available
if _pyvacon_available:
from pyvacon.finance.marketdata import EquityForwardCurve as _EquityForwardCurve
from pyvacon.finance.marketdata import SurvivalCurve as _SurvivalCurve
from pyvacon.finance.marketdata import DiscountCurve as _DiscountCurve
import pyvacon as _pyvacon
[docs]
class DiscountCurve:
    def __init__(
        self,
        id: str,
        refdate: Union[datetime, date],
        dates: List[Union[datetime, date]],
        df: List[float],
        interpolation: InterpolationType = InterpolationType.HAGAN_DF,
        extrapolation: ExtrapolationType = ExtrapolationType.NONE,
        daycounter: DayCounterType = DayCounterType.Act365Fixed,
    ):
        """Discount curve given by a list of dates and the corresponding discount factors.

        Args:
            id (str): Identifier of the discount curve.
            refdate (Union[datetime, date]): Reference date of the discount curve. A plain ``date`` is converted to a ``datetime`` at midnight.
            dates (List[Union[datetime, date]]): List of dates belonging to the list of discount factors. All dates must be distinct and equal or after the refdate, otherwise an exception will be thrown. Plain ``date`` entries are converted to ``datetime`` at midnight.
            df (List[float]): List of discount factors. Length of list of discount factors must equal to length of list of dates, otherwise an exception will be thrown.
            interpolation (enums.InterpolationType, optional): Defaults to InterpolationType.HAGAN_DF.
            extrapolation (enums.ExtrapolationType, optional): Defaults to ExtrapolationType.NONE which does not allow to compute a discount factor for a date past all given dates given to this constructor.
            daycounter (enums.DayCounterType, optional): Daycounter used within the interpolation formula to compute a discount factor between two dates from the dates-list above. Defaults to DayCounterType.Act365Fixed.

        Raises:
            Exception: If no dates are given, if dates and discount factors differ in length, if the first date
                lies before the reference date, if the discount factor at the reference date is not 1.0 or if
                a date occurs more than once.
            TypeError: If interpolation, extrapolation or daycounter are not members of the respective enum.
        """
        if len(dates) < 1:
            raise Exception("Please specify at least one date and discount factor")
        if len(dates) != len(df):
            raise Exception("List of dates and discount factors must have equal length.")
        # Normalize everything to datetime so that all later comparisons are between objects of the same type
        # (comparing a datetime with a plain date raises a TypeError).
        self.refdate = DiscountCurve._to_datetime(refdate)
        normalized_dates = [DiscountCurve._to_datetime(d) for d in dates]
        self.values = sorted(zip(normalized_dates, df), key=lambda tup: tup[0])  # zip dates and discount factors and sort by dates
        if not isinstance(interpolation, InterpolationType):
            raise TypeError("Interpolation is not of type enums.InterpolationType")
        self.interpolation = interpolation
        if not isinstance(extrapolation, ExtrapolationType):
            raise TypeError("Extrapolation is not of type enums.ExtrapolationType")
        self.extrapolation = extrapolation
        if not isinstance(daycounter, DayCounterType):
            raise TypeError("Daycounter is not of type enums.DaycounterType")
        self.daycounter = daycounter
        self.id = id
        # check that the first date is not before the refdate; if the refdate itself is missing it is added with df=1
        if self.values[0][0] < self.refdate:
            raise Exception("First date must be equal or greater then reference date.")
        if self.values[0][0] > self.refdate:
            self.values = [(self.refdate, 1.0)] + self.values
        if self.values[0][1] != 1.0:
            raise Exception("Discount factor for today must equal 1.0.")
        # The values are sorted by date, so only duplicated dates can violate strict monotonicity.
        # Compare the dates only (not the (date, df) tuples), otherwise duplicates may slip through.
        for i in range(1, len(self.values)):
            if self.values[i - 1][0] >= self.values[i][0]:
                raise Exception("Dates must be given in monotonically increasing order.")
        self._pyvacon_obj = None

    @staticmethod
    def _to_datetime(d: Union[datetime, date]) -> datetime:
        """Return ``d`` unchanged if it already is a datetime, otherwise the datetime at midnight of the given date."""
        if isinstance(d, datetime):
            return d
        return datetime(d.year, d.month, d.day)

    def get_dates(self) -> Tuple[datetime, ...]:
        """Return the dates of the curve (sorted ascending, starting with the reference date).

        Returns:
            Tuple[datetime, ...]: Dates of the curve
        """
        curve_dates, _ = zip(*self.values)
        return curve_dates

    def get_df(self) -> Tuple[float, ...]:
        """Return the discount factors of the curve in the order of :meth:`get_dates`.

        Returns:
            Tuple[float, ...]: Discount factors of the curve
        """
        _, discount_factors = zip(*self.values)
        return discount_factors

    def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return discount factor for a given date

        Args:
            refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curves reference date), the forward discount factor will be returned.
            d (Union[date, datetime]): The date for which the discount factor will be returned

        Returns:
            float: discount factor

        Raises:
            Exception: If the given reference date is before the reference date of the curve.
        """
        refdate = DiscountCurve._to_datetime(refdate)
        d = DiscountCurve._to_datetime(d)
        if refdate < self.refdate:
            raise Exception("The given reference date is before the curves reference date.")
        return self._get_pyvacon_obj().value(refdate, d)

    def _get_pyvacon_obj(self):
        """Lazily create (and cache) the underlying pyvacon discount curve."""
        if self._pyvacon_obj is None:
            # All three enum settings are handed over via their ``value`` (consistent with the daycounter).
            self._pyvacon_obj = _DiscountCurve(
                self.id,
                self.refdate,
                list(self.get_dates()),
                list(self.get_df()),
                self.daycounter.value,
                self.interpolation.value,
                self.extrapolation.value,
            )
        return self._pyvacon_obj

    def plot(self, days: int = 10, discount_factors: bool = False, **kwargs):
        """Plots the discount curve using matplotlibs plot function.

        The timegrid starts at the first curve date, proceeds in steps of ``days`` days and ends at the last curve date.
        Here either the discount factors or the zero rates (continuously compounded, ACT365 yearfraction) are plotted.

        Args:
            days (int, optional): The number of days between two plotted rates/discount factors. Defaults to 10.
            discount_factors (bool, optional): If True, discount factors will be plotted, otherwise the rates. Defaults to False.
            **kwargs: optional arguments that will be directly passed to the matplotlib plot function
        """
        dates = self.get_dates()
        step = timedelta(days=days)
        dates_new = [dates[0]]
        for i in range(1, len(dates)):
            while dates_new[-1] + step < dates[i]:
                dates_new.append(dates_new[-1] + step)
        dates_new.append(dates[-1])
        values = [self.value(self.refdate, d) for d in dates_new]
        if not discount_factors:
            for i in range(1, len(values)):
                year_fraction = float((dates_new[i] - self.refdate).days) / 365.0
                values[i] = -math.log(values[i]) / year_fraction
            # the rate at the reference date itself is undefined (0/0), use the first computed rate instead
            values[0] = values[1]
        plt.plot(dates_new, values, label=self.id, **kwargs)
[docs]
class NelsonSiegel(interfaces.FactoryObject):
    def __init__(self, beta0: float, beta1: float, beta2: float, tau: float):
        """Nelson-Siegel parametrization for rates and yields, see :footcite:t:`Nelson1987`.

        This parametrization is mostly used to parametrize rate curves and can be used in conjunction with :class:`rivapy.marketdata.DiscountCurveParametrized`. It is defined by

        .. math::

            f(t) = \\beta_0 + (\\beta_1+\\beta_2)\\frac{1-e^{-t/\\tau}}{t/\\tau} -\\beta_2e^{-t/\\tau}

        Args:
            beta0 (float): This parameter is the asymptotic (for arbitrary large maturities) rate, see formula above.
            beta1 (float): beta0 + beta1 give the short term rate, see formula above.
            beta2 (float): This parameter controls the size of the hump, see formula above.
            tau (float): This parameter controls the location of the hump, see formula above.

        Examples:
            .. code-block:: python

                >>> from rivapy.marketdata.curves import NelsonSiegel, DiscountCurveParametrized
                >>> ns = NelsonSiegel(beta0=0.05, beta1 = 0.02, beta2=0.1, tau=1.0)
                >>> dc = DiscountCurveParametrized('DC', refdate = dt.datetime(2023,1,1), rate_parametrization=ns, daycounter = DayCounterType.Act365Fixed)
                >>> dates = [dt.datetime(2023,1,1) + dt.timedelta(days=30*days) for days in range(120)]
                >>> values = [dc.value(refdate = dt.datetime(2023,1,1),d=d) for d in dates]
                >>> plt.plot(dates, values)
        """
        self.beta0 = beta0
        self.beta1 = beta1
        self.beta2 = beta2
        self.tau = tau
        # scaling factor applied to the computed rate, only changed by multiplication with a scalar
        self._multiplier = 1.0

    def _to_dict(self) -> dict:
        return {"beta0": self.beta0, "beta1": self.beta1, "beta2": self.beta2, "tau": self.tau}

    def __call__(self, t: float):
        """Return the (scaled) Nelson-Siegel rate for the time to maturity ``t``."""
        return self._multiplier * NelsonSiegel.compute(self.beta0, self.beta1, self.beta2, self.tau, t)

    def __mul__(self, x: float):
        """Return a new parametrization with the same parameters whose rates are additionally scaled by ``x``."""
        result = NelsonSiegel(self.beta0, self.beta1, self.beta2, self.tau)
        # compose with an already existing multiplier so that (ns * a) * b scales by a * b
        result._multiplier = self._multiplier * x
        return result

    # scalar multiplication is commutative, hence ``x * ns`` behaves like ``ns * x``
    __rmul__ = __mul__

    @staticmethod
    def compute(beta0: float, beta1: float, beta2: float, tau: float, T: float) -> float:
        """Evaluate the Nelson-Siegel formula.

        Args:
            beta0 (float): longrun
            beta1 (float): beta0 + beta1 = shortrun
            beta2 (float): hump or through
            tau (float): location of hump
            T (float): Time to maturity at which the rate is evaluated. Values below 1e-4 are floored to 1e-4
                to avoid a division by zero. numpy arrays are evaluated elementwise.

        Returns:
            float: The rate for the given time to maturity.
        """
        t = np.maximum(T, 1e-4) / tau
        return beta0 + beta1 * (1.0 - np.exp(-t)) / t + beta2 * ((1 - np.exp(-t)) / t - np.exp(-(t)))

    @staticmethod
    def _create_sample(
        n_samples: int,
        seed: Optional[int] = None,
        min_short_term_rate: float = -0.01,
        max_short_term_rate: float = 0.12,
        min_long_run_rate: float = 0.005,
        max_long_run_rate: float = 0.15,
        min_hump: float = -0.1,
        max_hump: float = 0.1,
        min_tau: float = 0.5,
        max_tau: float = 3.0,
    ):
        """Create ``n_samples`` parametrizations with uniformly drawn parameters.

        If a seed is given, numpy's global random generator is seeded with it before sampling.
        beta1 is drawn such that the short term rate beta0 + beta1 lies between the given min/max short term rates.
        """
        if seed is not None:
            np.random.seed(seed)
        result = []
        for _ in range(n_samples):
            beta0 = np.random.uniform(min_long_run_rate, max_long_run_rate)
            beta1 = np.random.uniform(min_short_term_rate - beta0, max_short_term_rate - beta0)
            beta2 = np.random.uniform(min_hump, max_hump)
            tau = np.random.uniform(min_tau, max_tau)
            result.append(NelsonSiegel(beta0, beta1, beta2, tau))
        return result

    if has_tf:

        @staticmethod
        def compute_tf(beta0: tf.Tensor, beta1: tf.Tensor, beta2: tf.Tensor, tau: tf.Tensor, T: tf.Tensor) -> tf.Tensor:
            """Evaluate the Nelson-Siegel formula using tensorflow operations (same formula as :meth:`compute`).

            Args:
                beta0 (tf.Tensor): longrun
                beta1 (tf.Tensor): beta0 + beta1 = shortrun
                beta2 (tf.Tensor): hump or through
                tau (tf.Tensor): location of hump
                T (tf.Tensor): Time to maturity, values below 1e-4 are floored to 1e-4.

            Returns:
                tf.Tensor: The rate for the given time to maturity.
            """
            t = tf.maximum(T, 1e-4) / tau
            return beta0 + beta1 * (1.0 - tf.exp(-t)) / t + beta2 * ((1 - tf.exp(-t)) / t - tf.exp(-(t)))
[docs]
class ConstantRate(interfaces.FactoryObject):
    def __init__(self, rate: float):
        """Flat (maturity independent) continuously compounded rate, usable as rate parametrization of :class:`rivapy.marketdata.DiscountCurveParametrized`.

        Args:
            rate (float): The constant rate.
        """
        self.rate = rate

    def _to_dict(self) -> dict:
        return {"rate": self.rate}

    @staticmethod
    def _create_sample(n_samples: int, seed: int = None):
        """Create ``n_samples`` constant rates drawn uniformly from [-0.005, 0.1).

        If a seed is given, numpy's global random generator is seeded with it before sampling.
        """
        if seed is not None:
            np.random.seed(seed)
        return [ConstantRate(rate=np.random.uniform(-0.005, 0.1)) for _ in range(n_samples)]

    def __call__(self, t: float):
        """Return the rate; the time to maturity ``t`` is ignored."""
        return self.rate
class LinearRate(interfaces.FactoryObject):
    def __init__(self, shortterm_rate: float, longterm_rate: float, max_maturity: float = 10.0):
        """Continuously compounded rate growing linearly from the short term rate (at t=0) to the long term rate (at max_maturity).

        It can be used in conjunction with :class:`rivapy.marketdata.DiscountCurveParametrized`.

        Args:
            shortterm_rate (float): The short term rate.
            longterm_rate (float): The long term rate.
            max_maturity (float): After this timepoint constant extrapolation (with the long term rate) is applied.
        """
        self.shortterm_rate = shortterm_rate
        self.longterm_rate = longterm_rate
        self.max_maturity = max_maturity
        # slope of the linear part
        self._coeff = (self.longterm_rate - self.shortterm_rate) / (self.max_maturity)

    @staticmethod
    def _create_sample(n_samples: int, seed: int = None):
        """Create ``n_samples`` linear rates where the long term rate is the short term rate plus a positive random spread.

        If a seed is given, numpy's global random generator is seeded with it before sampling.
        """
        if seed is not None:
            np.random.seed(seed)
        samples = []
        for _ in range(n_samples):
            short = np.random.uniform(-0.005, 0.07)
            spread = np.random.uniform(0.0025, 0.09)
            samples.append(LinearRate(shortterm_rate=short, longterm_rate=short + spread))
        return samples

    def _to_dict(self) -> dict:
        return {
            "shortterm_rate": self.shortterm_rate,
            "longterm_rate": self.longterm_rate,
            "max_maturity": self.max_maturity,
        }

    def __call__(self, t: float):
        """Return the rate for the time to maturity ``t``."""
        return self.shortterm_rate + self._coeff * t if t < self.max_maturity else self.longterm_rate
[docs]
class NelsonSiegelSvensson(NelsonSiegel):
    def __init__(self, beta0: float, beta1: float, beta2: float, beta3: float, tau: float, tau2: float):
        """Nelson-Siegel-Svensson parametrization, i.e. the Nelson-Siegel rate plus a second hump term.

        Args:
            beta0 (float): See :class:`NelsonSiegel`.
            beta1 (float): See :class:`NelsonSiegel`.
            beta2 (float): See :class:`NelsonSiegel`.
            beta3 (float): Weight of the additional (second) hump term.
            tau (float): See :class:`NelsonSiegel`.
            tau2 (float): Time scale of the additional (second) hump term.
        """
        super().__init__(beta0, beta1, beta2, tau)
        self.beta3 = beta3
        self.tau2 = tau2

    def _to_dict(self) -> dict:
        tmp = super()._to_dict()
        tmp.update({"beta3": self.beta3, "tau2": self.tau2})
        return tmp

    def __call__(self, t: float):
        """Return the (scaled) Nelson-Siegel-Svensson rate for the time to maturity ``t``."""
        # _multiplier is set by the base class constructor (1.0 unless the object results from a multiplication)
        return self._multiplier * NelsonSiegelSvensson.compute(self.beta0, self.beta1, self.beta2, self.beta3, self.tau, self.tau2, t)

    def __mul__(self, x: float):
        """Return a new Nelson-Siegel-Svensson parametrization whose rates are additionally scaled by ``x``.

        The base class implementation would return a plain :class:`NelsonSiegel` and thereby drop beta3 and tau2.
        """
        result = NelsonSiegelSvensson(self.beta0, self.beta1, self.beta2, self.beta3, self.tau, self.tau2)
        result._multiplier = self._multiplier * x
        return result

    # scalar multiplication is commutative, hence ``x * nss`` behaves like ``nss * x``
    __rmul__ = __mul__

    @staticmethod
    def compute(beta0, beta1, beta2, beta3, tau, tau2, T):
        """Evaluate the Nelson-Siegel-Svensson formula.

        The result is the Nelson-Siegel rate for (beta0, beta1, beta2, tau) plus
        ``beta3 * ((1 - exp(-t)) / t - exp(-t))`` with ``t = max(T, 1e-4) / tau2``.
        """
        t = np.maximum(T, 1e-4) / tau2
        return NelsonSiegel.compute(beta0, beta1, beta2, tau, T) + beta3 * ((1 - np.exp(-t)) / t - np.exp(-(t)))
class DiscountCurveComposition(interfaces.FactoryObject):
    """Composition of three curves (or constants) whose rate is ``a * b + c``.

    Components given as dict are created via the factory; components without a ``value`` attribute are
    treated as constant rates and wrapped into a :class:`DiscountCurveParametrized`.
    """

    def __init__(self, a, b, c):
        # dictionaries are interpreted as serialized objects and created via the factory
        a, b, c = [_create(component) if isinstance(component, dict) else component for component in (a, b, c)]
        # all components that carry a daycounter must agree on it, otherwise exception
        daycounters = {component.daycounter for component in (a, b, c) if hasattr(component, "daycounter")}
        if len(daycounters) > 1:
            raise Exception("All curves must have same daycounter.")
        self.daycounter = daycounters.pop() if len(daycounters) > 0 else DayCounterType.Act365Fixed.value
        self._dc = DayCounter(self.daycounter)

        def _as_curve(component):
            # anything that cannot be valued is wrapped as flat curve
            if hasattr(component, "value"):
                return component
            return DiscountCurveParametrized("", datetime(1980, 1, 1), ConstantRate(component), self.daycounter)

        self.a = _as_curve(a)
        self.b = _as_curve(b)
        self.c = _as_curve(c)

    def _to_dict(self) -> dict:
        def _serialize(component):
            return component.to_dict() if hasattr(component, "to_dict") else component

        return {"a": _serialize(self.a), "b": _serialize(self.b), "c": _serialize(self.c)}

    @staticmethod
    def _create_sample(n_samples: int, seed: int = None, refdate: Union[datetime, date] = None, parametrization_type=NelsonSiegel) -> list:
        """Create sample compositions by adding 0.001 to sampled :class:`DiscountCurveParametrized` objects."""
        sampled_curves = DiscountCurveParametrized._create_sample(n_samples, seed, refdate, parametrization_type)
        return [curve + 0.001 for curve in sampled_curves]

    def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return ``exp(-r * yf)`` with the composed rate ``r`` and the year fraction ``yf`` between refdate and d."""
        rate = self.value_rate(refdate, d)
        year_fraction = self._dc.yf(refdate, d)
        return np.exp(-rate * year_fraction)

    def value_rate(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return the composed rate ``a * b + c`` evaluated from the rates of the three components."""
        rate_a = self.a.value_rate(refdate, d)
        rate_b = self.b.value_rate(refdate, d)
        rate_c = self.c.value_rate(refdate, d)
        return rate_a * rate_b + rate_c

    def __mul__(self, other):
        # TODO unittests
        return DiscountCurveComposition(self, other, 0.0)

    def __rmul__(self, other):
        return DiscountCurveComposition(self, other, 0.0)

    def __add__(self, other):
        return DiscountCurveComposition(self, 1.0, other)

    def __radd__(self, other):
        return DiscountCurveComposition(self, 1.0, other)
[docs]
class DiscountCurveParametrized(interfaces.FactoryObject):
    def __init__(
        self,
        obj_id: str,
        refdate: Union[datetime, date],
        rate_parametrization,  #: Callable[[float], float],
        daycounter: Union[DayCounterType, str] = DayCounterType.Act365Fixed,
    ):
        """Discount curve whose continuously compounded rate is given by a parametrization (a callable of the year fraction).

        Args:
            obj_id (str): Identifier of the curve.
            refdate (Union[datetime, date]): Reference date of the curve. A plain ``date`` is converted to a ``datetime`` at midnight.
            rate_parametrization (Callable[[float], float]): Callable returning the rate for a given year fraction,
                e.g. :class:`NelsonSiegel`. If a dict is given, the parametrization is created from it via the factory.
            daycounter (Union[DayCounterType, str], optional): Daycounter used to compute year fractions. Defaults to DayCounterType.Act365Fixed.
        """
        self.refdate = DiscountCurveParametrized._to_datetime(refdate)
        self.daycounter = DayCounterType.to_string(daycounter)
        self._dc = DayCounter(self.daycounter)
        self.obj_id = obj_id
        if isinstance(rate_parametrization, dict):  # if the parametrization is a dict we try to create it from factory
            self.rate_parametrization = _create(rate_parametrization)
        else:
            self.rate_parametrization = rate_parametrization

    @staticmethod
    def _to_datetime(d: Union[datetime, date]) -> datetime:
        """Return ``d`` unchanged if it already is a datetime, otherwise the datetime at midnight of the given date."""
        if isinstance(d, datetime):
            return d
        return datetime(d.year, d.month, d.day)

    def _year_fraction(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Validate the reference date and return the year fraction between ``refdate`` and ``d``."""
        refdate = DiscountCurveParametrized._to_datetime(refdate)
        d = DiscountCurveParametrized._to_datetime(d)
        if refdate < self.refdate:
            raise Exception("The given reference date is before the curves reference date.")
        return self._dc.yf(refdate, d)

    def _to_dict(self) -> dict:
        try:
            parametrization = self.rate_parametrization.to_dict()
        except Exception as e:
            raise Exception("Missing implementation of to_dict() in parametrization of type " + type(self.rate_parametrization).__name__) from e
        # the daycounter is part of the state, without it a curve recreated from the dict would fall back to the default
        return {
            "obj_id": self.obj_id,
            "refdate": self.refdate,
            "rate_parametrization": parametrization,
            "daycounter": self.daycounter,
        }

    def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return discount factor for a given date

        The year fraction ``yf`` between ``refdate`` and ``d`` is computed with the curve's daycounter and the
        result is ``exp(-rate_parametrization(yf) * yf)``.

        Args:
            refdate (Union[date, datetime]): The reference date. It must not be before the curves reference date.
            d (Union[date, datetime]): The date for which the discount factor will be returned

        Returns:
            float: discount factor

        Raises:
            Exception: If the given reference date is before the reference date of the curve.
        """
        yf = self._year_fraction(refdate, d)
        return np.exp(-self.rate_parametrization(yf) * yf)

    def value_rate(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return the continuous rate for a given date

        Args:
            refdate (Union[date, datetime]): The reference date. It must not be before the curves reference date.
            d (Union[date, datetime]): The date for which the rate will be returned

        Returns:
            float: continuous rate

        Raises:
            Exception: If the given reference date is before the reference date of the curve.
        """
        return self.rate_parametrization(self._year_fraction(refdate, d))

    @staticmethod
    def _create_sample(
        n_samples: int, seed: Optional[int] = None, refdate: Optional[Union[datetime, date]] = None, parametrization_type=NelsonSiegel
    ) -> list:
        """Create ``n_samples`` curves with sampled parametrizations of the given type.

        If a seed is given, numpy's global random generator is seeded with it before sampling. If no reference
        date is given, the current time is used.
        """
        if seed is not None:
            np.random.seed(seed)
        if refdate is None:
            refdate = datetime.now()
        parametrizations = parametrization_type._create_sample(n_samples)
        return [DiscountCurveParametrized("DCP_" + str(i), refdate, p) for i, p in enumerate(parametrizations)]

    def __mul__(self, other):
        return DiscountCurveComposition(self, other, 0.0)

    def __rmul__(self, other):
        return DiscountCurveComposition(self, other, 0.0)

    def __add__(self, other):
        return DiscountCurveComposition(self, 1.0, other)

    def __radd__(self, other):
        return DiscountCurveComposition(self, 1.0, other)
[docs]
class EquityForwardCurve:
    def __init__(self, spot: float, funding_curve: DiscountCurve, borrow_curve: DiscountCurve, div_table):
        """Equity Forward Curve

        The reference date of the forward curve is the latest reference date found among the funding curve,
        the borrow curve (if given) and the dividend table (if given and if it has a reference date).

        Args:
            spot (float): Current spot
            funding_curve (DiscountCurve): Funding curve
            borrow_curve (DiscountCurve): Borrow curve, may be None
            div_table (:class:`rivapy.marketdata.DividendTable`): Dividend table, may be None
        """
        self.spot = spot
        self.bc = borrow_curve
        self.fc = funding_curve
        self.div = div_table
        self._pyvacon_obj = None
        self.refdate = self.fc.refdate
        if self.bc is not None and self.refdate < self.bc.refdate:
            self.refdate = self.bc.refdate
        if self.div is not None and hasattr(self.div, "refdate") and self.refdate < self.div.refdate:
            self.refdate = self.div.refdate

    def _get_pyvacon_obj(self):
        """Lazily create (and cache) the underlying pyvacon forward curve."""
        if self._pyvacon_obj is not None:
            return self._pyvacon_obj

        def _unwrap(obj):
            # objects wrapping a pyvacon object are replaced by it, everything else is passed through as is
            return obj._get_pyvacon_obj() if hasattr(obj, "_get_pyvacon_obj") else obj

        funding = _unwrap(self.fc)
        borrow = _unwrap(self.bc)
        dividends = _unwrap(self.div)
        self._pyvacon_obj = _EquityForwardCurve(self.refdate, self.spot, funding, borrow, dividends)
        return self._pyvacon_obj

    def value(self, refdate, expiry):
        """Return the value of the underlying pyvacon forward curve for the given reference date and expiry."""
        return self._get_pyvacon_obj().value(refdate, expiry)

    def plot(self, days: int = 10, days_end: int = 10 * 365, **kwargs):
        """Plots the forward curve using matplotlibs plot function.

        Args:
            days (int, optional): The number of days between two plotted forward values. Defaults to 10.
            days_end (int, optional): Number of days when plotting will end. Defaults to 10*365 (10yr)
            **kwargs: optional arguments that will be directly passed to the matplotlib plot function
        """
        expiries = [self.refdate + timedelta(days=offset) for offset in range(0, days_end, days)]
        forwards = [self.value(self.refdate, expiry) for expiry in expiries]
        plt.plot(expiries, forwards, **kwargs)
        plt.xlabel("expiry")
        plt.ylabel("forward value")
class BootstrapHazardCurve:
    def __init__(
        self, ref_date: datetime, trade_date: datetime, dc: "DiscountCurve", RR: float, payment_dates: List[datetime], market_spreads: List[float]
    ):
        """Bootstrapping of hazard rates from market par spreads.

        Args:
            ref_date (datetime): Reference date used for all discount factors and survival probabilities.
            trade_date (datetime): Trade date (stored only, not used in the computations of this class).
            dc (DiscountCurve): Discount curve, only its method ``value(refdate, d)`` is used.
            RR (float): Recovery rate.
            payment_dates (List[datetime]): One list of premium payment dates per market spread (i.e. a list of
                lists), the last date of each list is used as maturity of the respective instrument.
            market_spreads (List[float]): Market par spreads, one per list of payment dates.
        """
        self.ref_date = ref_date
        self.trade_date = trade_date
        self.dc = dc
        self.RR = RR
        self.payment_dates_bootstrapp = payment_dates
        self.market_spreads = market_spreads
        self._pyvacon_obj = None

    def par_spread(self, dc_survival, maturity_date, payment_dates: List[datetime]):
        """Compute the par spread for the given survival curve.

        Args:
            dc_survival: Survival curve, only its method ``value(refdate, d)`` is used.
            maturity_date: Maturity up to which protection is valued.
            payment_dates (List[datetime]): Premium payment dates. Dates before the reference date are ignored.

        Returns:
            float: Present value of the protection leg divided by the present value of the premium leg (including accrued) for a unit spread.
        """
        integration_step = timedelta(days=365)
        premium_period_start = self.ref_date
        prev_date = self.ref_date
        current_date = min(prev_date + integration_step, maturity_date)
        risk_adj_factor_protection = 0
        risk_adj_factor_premium = 0
        risk_adj_factor_accrued = 0
        # protection leg: discounted default probabilities on a grid with steps of 365 days
        while current_date <= maturity_date:
            default_prob = dc_survival.value(self.ref_date, prev_date) - dc_survival.value(self.ref_date, current_date)
            risk_adj_factor_protection += self.dc.value(self.ref_date, current_date) * default_prob
            prev_date = current_date
            current_date += integration_step
        # remaining stub period between the last grid point and the maturity
        if prev_date < maturity_date and current_date > maturity_date:
            default_prob = dc_survival.value(self.ref_date, prev_date) - dc_survival.value(self.ref_date, maturity_date)
            risk_adj_factor_protection += self.dc.value(self.ref_date, maturity_date) * default_prob
        # premium leg and accrued premium, period lengths are measured as days/360
        for premium_payment in payment_dates:
            if premium_payment >= self.ref_date:
                period_length = ((premium_payment - premium_period_start).days) / 360
                survival_prob = (dc_survival.value(self.ref_date, premium_period_start) + dc_survival.value(self.ref_date, premium_payment)) / 2
                df = self.dc.value(self.ref_date, premium_payment)
                risk_adj_factor_premium += period_length * survival_prob * df
                default_prob = dc_survival.value(self.ref_date, premium_period_start) - dc_survival.value(self.ref_date, premium_payment)
                risk_adj_factor_accrued += period_length * default_prob * df
                premium_period_start = premium_payment
        PV_accrued = (1 / 2) * risk_adj_factor_accrued
        PV_premium = (1) * risk_adj_factor_premium
        PV_protection = ((1 - self.RR)) * risk_adj_factor_protection
        par_spread_i = (PV_protection) / ((PV_premium + PV_accrued))
        return par_spread_i

    def create_survival(self, dates: List[datetime], hazard_rates: List[float], refdate: datetime = None):
        """Create a pyvacon survival curve from the given dates and hazard rates.

        Args:
            dates (List[datetime]): Dates of the survival curve.
            hazard_rates (List[float]): Hazard rates belonging to the dates.
            refdate (datetime, optional): Reference date of the survival curve. Defaults to None, in which case
                the reference date of this object is used.
        """
        if refdate is None:
            refdate = self.ref_date
        return _SurvivalCurve("survival_curve", refdate, dates, hazard_rates)

    def calibration_error(self, x, mkt_par_spread, ref_date, payment_dates, dates, hazard_rates):
        """Return the difference between the market par spread and the model par spread if the last hazard rate equals ``x``.

        Note that ``hazard_rates`` is modified in place (its last entry is set to ``x``).
        The signature ``(x, *args)`` seen from the bound method is the one expected by ``scipy.optimize.root_scalar``.
        """
        hazard_rates[-1] = x
        maturity_date = dates[-1]
        dc_surv = self.create_survival(dates, hazard_rates, refdate=ref_date)
        return mkt_par_spread - self.par_spread(dc_surv, maturity_date, payment_dates)

    def calibrate_hazard_rate(self):
        """Bootstrap the hazard rates such that all market par spreads are repriced.

        The instruments are processed in the given order. For each instrument a new hazard rate (valid up to the
        last payment date of the instrument) is determined by a root search on [0, 3] while the previously
        determined hazard rates are kept.

        Returns:
            Tuple[List[float], List[datetime]]: Hazard rates and the corresponding dates (both starting with the
            entry for the reference date).

        Raises:
            ValueError: If the number of payment date lists differs from the number of market spreads.
        """
        if len(self.payment_dates_bootstrapp) != len(self.market_spreads):
            raise ValueError("Number of payment date lists and number of market spreads must be equal.")
        sc_dates = [self.ref_date]
        hazard_rates = [0.0]
        for payment_dates_iter, mkt_par_spread_iter in zip(self.payment_dates_bootstrapp, self.market_spreads):
            sc_dates.append(payment_dates_iter[-1])
            # start value only, the entry is overwritten by the root search below
            hazard_rates.append(hazard_rates[-1])
            sol = scipy.optimize.root_scalar(
                self.calibration_error,
                args=(mkt_par_spread_iter, self.ref_date, payment_dates_iter, sc_dates, hazard_rates),
                method="brentq",
                bracket=[0, 3],
                xtol=1e-8,
                rtol=1e-8,
            )
            hazard_rates[-1] = sol.root
        return hazard_rates, sc_dates  # self.create_survival(sc_dates, hazard_rates)#.value, hazard_rates
# def hazard_rates(self):
# #hazard_rates_value=[]
# hazard_rates_value=self.calibrate_hazard_rate()
# return self.hazard_rates_value
# def value(self, refdate: Union[date, datetime], d: Union[date, datetime])->float:
# """Return discount factor for a given date
# Args:
# refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curves reference date), the forward discount factor will be returned.
# d (Union[date, datetime]): The date for which the discount factor will be returned
# Returns:
# float: discount factor
# """
# #if not isinstance(refdate, datetime):
# # refdate = datetime(refdate,0,0,0)
# #if not isinstance(d, datetime):
# # d = datetime(d,0,0,0)
# #if refdate < self.refdate:
# # raise Exception('The given reference date is before the curves reference date.')
# return self._get_pyvacon_obj().value(refdate, d)
# def _get_pyvacon_obj(self):
# if self._pyvacon_obj is None:
# self._pyvacon_obj = _SurvivalCurve('survival_curve', self.refdate,
# self.calibrate_hazard_rate[1], self.calibrate_hazard_rate[0])
# return self._pyvacon_obj
# class PowerPriceForwardCurve:
# def __init__(
# self, refdate: Union[datetime, date], start: datetime, end: datetime, values: np.ndarray, freq: str = "1H", tz: str = None, id: str = None
# ):
# """Simple forward curve for power.
# Args:
# refdate (Union[datetime, date]): Reference date of curve
# start (dt.datetime): Start of forward curve datetimepoints (including this timepoint).
# end (dt.datetime): End of forad curve datetimepoints (excluding this timepoint).
# values (np.ndarray): One dimensional array holding the price for each datetimepint in the curve. The method value will raise an exception if the number of values is not equal to the number of datetimepoints.
# freq (str, optional): Frequency of timepoints. Defaults to '1H'. See documentation for pandas.date_range for further details on freq.
# tz (str or tzinfo): Time zone name for returning localized datetime points, for example ‘Asia/Hong_Kong’.
# By default, the resulting datetime points are timezone-naive. See documentation for pandas.date_range for further details on tz.
# id (str): Identifier for the curve. It has no impact on the valuation functionality. If None, a uuid will be generated. Defaults to None.
# """
# self.id = id
# if id is None:
# self.id = "PFC/" + str(datetime.now())
# self.refdate = refdate
# self.start = start
# self.end = end
# self.freq = freq
# self.tz = tz
# self.values = values
# # timegrid used to compute prices for a certain schedule
# self._tg = None
# self._df = (
# pd.DataFrame(
# {"dates": pd.date_range(self.start, self.end, freq=self.freq, tz=self.tz, inclusive="left").to_pydatetime(), "values": self.values}
# )
# .set_index(["dates"])
# .sort_index()
# )
# def value(self, refdate: Union[date, datetime], schedule) -> np.ndarray:
# if self._tg is None:
# self._tg = pd.DataFrame(
# {"dates": pd.date_range(self.start, self.end, freq=self.freq, tz=self.tz, inclusive="left").to_pydatetime(), "values": self.values}
# ).reset_index()
# if self._tg.shape[0] != self.values.shape[0]:
# raise Exception(
# "The number of dates ("
# + str(self._tg.shape[0])
# + ") does not equal number of values ("
# + str(self.values.shape[0])
# + ") in forward curve."
# )
# tg = self._tg[(self._tg.dates >= schedule.start) & (self._tg.dates < schedule.end)].set_index("dates")
# _schedule = pd.DataFrame({"dates": schedule.get_schedule(refdate)})
# tg = _schedule.join(tg, on="dates")
# # tg = tg[tg['dates']>=refdate]
# if tg["index"].isna().sum() > 0:
# raise Exception("There are " + str(tg["index"].isna().sum()) + " dates in the schedule not covered by the forward curve.")
# return self.values[tg["index"].values]
# def get_df(self) -> pd.DataFrame:
# return self._df
[docs]
class EnergyPriceForwardCurve:
    """Energy Price Forward Curve object.

    It is recommended to initialize this object via the class methods ``from_existing_pfc``, ``from_existing_shape`` or ``from_scratch``.

    Args:
        id (_type_): ID for the PFC object. If None, an id is generated from the current time.
        refdate (Union[datetime, date]): Reference date
        pfc (pd.DataFrame, optional): This object can be initialized with an existing pfc. Defaults to None.
        **kwargs: Optional settings read by the constructor: ``pfc_shape``, ``apply_schedule``, ``pfc_shaper`` and ``future_contracts``.

    Raises:
        ValueError: If none of pfc, pfc_shape and pfc_shaper is provided.
    """

    def __init__(self, id, refdate: Union[datetime, date], pfc: pd.DataFrame = None, **kwargs):
        self.id = "PFC/" + str(datetime.now()) if id is None else id
        self.refdate = refdate
        self._pfc = pfc
        self._pfc_shape: pd.DataFrame = kwargs.get("pfc_shape")
        self._apply_schedule: SimpleSchedule = kwargs.get("apply_schedule")
        self._pfc_shaper: PFCShaper = kwargs.get("pfc_shaper")
        # both dataframes (if given) are passed through the datetime index validation
        for frame in (self._pfc, self._pfc_shape):
            EnergyPriceForwardCurve._validate_dataframes(frame)
        self._future_contracts: List[EnergyFutureSpecifications] = kwargs.get("future_contracts")
        if all(item is None for item in (self._pfc, self._pfc_shape, self._pfc_shaper)):
            raise ValueError("No values provided for the arguments pfc, pfc_shape and pfc_shaper!")

    @staticmethod
    def _validate_dataframes(dataframe: Optional[pd.DataFrame]):
        """Run the pandas datetime index check on the dataframe; ``None`` is accepted and skipped."""
        if dataframe is None:
            return
        validators._check_pandas_index_for_datetime(dataframe)

    @classmethod
    def from_existing_pfc(cls, id, refdate: Union[datetime, date], pfc: pd.DataFrame) -> "EnergyPriceForwardCurve":
        """Initialization of the ``EnergyPriceForwardCurve`` given an existing PFC.

        Args:
            id (_type_): ID for the PFC object
            refdate (Union[datetime, date]): Reference Date
            pfc (pd.DataFrame): Existing Pfc

        Returns:
            EnergyPriceForwardCurve: ``EnergyPriceForwardCurve`` object
        """
        return cls(id=id, refdate=refdate, pfc=pfc)

    @classmethod
    def from_existing_shape(
        cls, id, refdate: Union[datetime, date], pfc_shape: pd.DataFrame, future_contracts: List[EnergyFutureSpecifications]
    ) -> "EnergyPriceForwardCurve":
        """Initialization of the ``EnergyPriceForwardCurve`` given an existing PFC shape. The shape is then shifted in order to match the future contracts defined in the ``future_contracts`` list.

        Args:
            id (_type_): ID for the PFC object
            refdate (Union[datetime, date]): Reference Date
            pfc_shape (pd.DataFrame): Existing PFC shape
            future_contracts (List[EnergyFutureSpecifications]): List of future contracts (``EnergyFutureSpecifications`` objects)

        Returns:
            EnergyPriceForwardCurve: ``EnergyPriceForwardCurve`` object
        """
        curve = cls(id=id, refdate=refdate, pfc_shape=pfc_shape, future_contracts=future_contracts)
        curve._shift_shape()
        return curve

    @classmethod
    def from_scratch(
        cls,
        id,
        refdate: Union[datetime, date],
        apply_schedule: SimpleSchedule,
        pfc_shaper: PFCShaper,
        future_contracts: List[EnergyFutureSpecifications],
    ) -> "EnergyPriceForwardCurve":
        """Initialization of the ``EnergyPriceForwardCurve`` from scratch. First a shape is created using the ``pfc_shaper``. Afterwards, shape is shifted in order to match the future contracts defined in the ``future_contracts`` list.

        Args:
            id (_type_): ID for the PFC object
            refdate (Union[datetime, date]): Reference Date
            apply_schedule (SimpleSchedule): Schedule to apply the ``pfc_shaper`` on, in order to obtain shape values for future time points
            pfc_shaper (PFCShaper): PFC shaper
            future_contracts (List[EnergyFutureSpecifications]): List of future contracts (``EnergyFutureSpecifications`` objects)

        Returns:
            EnergyPriceForwardCurve: ``EnergyPriceForwardCurve`` object
        """
        curve = cls(id=id, refdate=refdate, pfc_shaper=pfc_shaper, future_contracts=future_contracts, apply_schedule=apply_schedule)
        curve._create_shape()
        curve._shift_shape()
        return curve

    def __validate_contracts_frequency(self):
        """Checks if all contracts in ``self._future_contracts`` have the same schedule frequency.

        Raises:
            ValueError: If more than one distinct schedule frequency is found.
        """
        # frequency -> list of (schedule class name, contract name), used for the error message
        frequencies_contracts = defaultdict(list)
        for contract in self._future_contracts:
            schedule = contract.schedule
            frequencies_contracts[schedule.freq].append((schedule.__class__.__name__, contract.name))
        if len(frequencies_contracts) > 1:
            raise ValueError(
                f"Found different contract frequencies: {frequencies_contracts}.\n Please provide uniform frequencies for the elements in the `future_contract` dictionary!"
            )

    def __get_offpeak_contracts(
        self, base_contracts: List[EnergyFutureSpecifications], peak_contracts: List[EnergyFutureSpecifications]
    ) -> List[EnergyFutureSpecifications]:
        """In cases where base and peak contracts are part of the ``self._future_contracts``, offpeak contracts need to be deduced from these two in order to shift the shape properly.

        Args:
            base_contracts (List[EnergyFutureSpecifications]): List of base contracts
            peak_contracts (List[EnergyFutureSpecifications]): List of peak contracts

        Returns:
            List[EnergyFutureSpecifications]: List of offpeak contracts
        """
        offpeak_contracts = []
        for base_spec in base_contracts:
            n_base = len(base_spec.get_schedule())
            for peak_spec in peak_contracts:
                # base and peak belong together if start and end of their schedules coincide
                if base_spec.get_start_end() != peak_spec.get_start_end():
                    continue
                offpeak_name = f"offpeak_{base_spec.name}&{peak_spec.name}"
                n_peak = len(peak_spec.get_schedule())
                n_offpeak = n_base - n_peak
                # the base price is the schedule-length weighted average of peak and offpeak price, solved for offpeak
                offpeak_price = n_base / n_offpeak * base_spec.get_price() - n_peak / n_offpeak * peak_spec.get_price()
                offpeak_schedule = OffPeakSchedule(start=base_spec.get_start(), end=base_spec.get_end())
                offpeak_contracts.append(EnergyFutureSpecifications(schedule=offpeak_schedule, price=offpeak_price, name=offpeak_name))
                # only the first matching peak contract is used for a base contract
                break
        return offpeak_contracts

    def _shift_shape(self):
        """Shifts the shape to match the future contracts defined in the ``self._future_contracts`` list."""
        self.__validate_contracts_frequency()

        def _contracts_of(schedule_type):
            # NOTE(review): relies on a ``_name`` attribute of the schedule class being comparable to the
            # ``EnergyTimeGridStructure`` members - confirm against the scheduler module.
            return [fc for fc in self._future_contracts if fc.schedule.__class__._name == schedule_type]

        base_contracts = _contracts_of(ets.BASE)
        peak_contracts = _contracts_of(ets.PEAK)

        if not (len(base_contracts) > 0 and len(peak_contracts) > 0):
            # only one contract type present: the shift can be performed directly on the full shape
            self._pfc = PFCShifter(shape=self._pfc_shape, contracts=self._future_contracts).compute()
            return

        # base and peak contracts both exist: derive offpeak contracts and shift offpeak and peak separately
        offpeak_contracts = self.__get_offpeak_contracts(base_contracts=base_contracts, peak_contracts=peak_contracts)
        shifted_parts = []
        for contracts in [offpeak_contracts, peak_contracts]:
            shifting_datetimes = np.sort(np.unique(np.concatenate([contract.get_schedule() for contract in contracts])))
            shape_subset = self._pfc_shape.loc[shifting_datetimes, :]
            shifted_parts.append(PFCShifter(shape=shape_subset, contracts=contracts).compute())
        # combine offpeak and peak shifts
        combined = pd.concat(shifted_parts, axis=0)
        self._pfc = combined.sort_index(ascending=True)

    def _create_shape(self):
        """Calibrates the ``self._pfc_shaper`` model and applies it to ``self._apply_schedule`` to obtain the shape."""
        shaper = self._pfc_shaper
        shaper.calibrate()
        self._pfc_shape = shaper.apply(self._apply_schedule)

    def get_pfc(self) -> pd.DataFrame:
        """Returns the PFC

        Returns:
            pd.DataFrame: PFC
        """
        return self._pfc