Coverage for rivapy / marketdata / curves.py: 71%
601 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-27 14:36 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-27 14:36 +0000
1import math
2import scipy.optimize
3import matplotlib.pyplot as plt
4import pandas as pd
5import numpy as np
6import dateutil.relativedelta as relativedelta
8from rivapy.marketdata._logger import logger
11import rivapy.tools.interfaces as interfaces
12import rivapy.tools._validators as validators
14# from rivapy.tools.interpolate import Interpolator
15from typing import List, Union, Tuple, Literal, Dict, Optional, Any
16from datetime import datetime, date, timedelta
17from collections import defaultdict
20try:
21 import tensorflow as tf
23 has_tf = True
24except ImportError:
25 has_tf = False
27from rivapy.tools.enums import DayCounterType, InterpolationType, ExtrapolationType
28from rivapy.tools.enums import EnergyTimeGridStructure as etgs
29from rivapy.tools.datetools import DayCounter, _date_to_datetime
30from rivapy.marketdata.factory import create as _create
31from rivapy.marketdata_tools.pfc_shaper import PFCShaper
32from rivapy.marketdata_tools.pfc_shifter import PFCShifter
33from rivapy.tools.scheduler import SimpleSchedule, OffPeakSchedule, PeakSchedule, BaseSchedule
34from rivapy.instruments.energy_futures_specifications import EnergyFutureSpecifications
36from rivapy.tools.interpolate import Interpolator
39from rivapy import _pyvacon_available
41if _pyvacon_available:
42 from pyvacon.finance.marketdata import EquityForwardCurve as _EquityForwardCurve
43 from pyvacon.finance.marketdata import SurvivalCurve as _SurvivalCurve
44 from pyvacon.finance.marketdata import DiscountCurve as _DiscountCurve
45 import pyvacon as _pyvacon
class DiscountCurve:
    """Discount curve given by a set of dates and discount factors, with
    configurable inter-/extrapolation and day count convention."""

    def __init__(
        self,
        id: str,
        refdate: Union[datetime, date],
        dates: List[Union[datetime, date]],
        df: List[float],
        interpolation: InterpolationType = InterpolationType.HAGAN_DF,
        extrapolation: ExtrapolationType = ExtrapolationType.NONE,
        daycounter: DayCounterType = DayCounterType.Act365Fixed,
    ):
        """Discountcurve

        Args:
            id (str): Identifier of the discount curve.
            refdate (Union[datetime, date]): Reference date of the discount curve.
            dates (List[Union[datetime, date]]): List of dates belonging to the list of discount factors. All dates must be distinct and equal or after the refdate, otherwise an exception will be thrown.
            df (List[float]): List of discount factors. Length of list of discount factors must equal to length of list of dates, otherwise an exception will be thrown.
            interpolation (enums.InterpolationType, optional): Defaults to InterpolationType.HAGAN_DF.
            extrapolation (enums.ExtrapolationType, optional): Defaults to ExtrapolationType.NONE which does not allow to compute a discount factor for a date past all given dates given to this constructor.
            daycounter (enums.DayCounterType, optional): Daycounter used within the interpolation formula to compute a discount factor between two dates from the dates-list above. Defaults to DayCounterType.Act365Fixed.
        """
        if len(dates) < 1:
            raise Exception("Please specify at least one date and discount factor")
        if len(dates) != len(df):
            raise Exception("List of dates and discount factors must have equal length.")
        # zip dates and discount factors and sort by dates
        self.values = sorted(zip(dates, df), key=lambda tup: tup[0])
        if isinstance(refdate, datetime):
            self.refdate = refdate
        else:
            self.refdate = datetime(refdate.year, refdate.month, refdate.day)
        if not isinstance(interpolation, InterpolationType):
            raise TypeError("Interpolation is not of type enums.InterpolationType")
        self.interpolation = interpolation
        if not isinstance(extrapolation, ExtrapolationType):
            raise TypeError("Extrapolation is not of type enums.ExtrapolationType")
        self.extrapolation = extrapolation
        if not isinstance(daycounter, DayCounterType):
            # report the offending value in the exception instead of print()-ing it
            raise TypeError("Daycounter " + repr(daycounter) + " is not of type enums.DaycounterType")
        self.daycounter = daycounter
        self.id = id
        # check that the first date is not before the reference date; compare against
        # the normalized self.refdate to avoid mixed date/datetime comparisons
        if self.values[0][0] < self.refdate:
            raise Exception("First date must be equal or greater then reference date.")
        if self.values[0][0] > self.refdate:
            # prepend the trivial discount factor 1.0 at the reference date
            self.values = [(self.refdate, 1.0)] + self.values
        if self.values[0][1] != 1.0:
            raise Exception("Discount factor for today must equal 1.0.")
        for i in range(1, len(self.values)):
            # compare the dates only: comparing the full (date, df) tuples would
            # accept duplicated dates whenever the discount factors happen to increase
            if self.values[i - 1][0] >= self.values[i][0]:
                raise Exception("Dates must be given in monotonically increasing order.")
        self._pyvacon_obj = None

    @staticmethod
    def _as_datetime(d: Union[date, datetime]) -> datetime:
        """Normalize a plain date to a datetime at midnight (datetimes pass through unchanged)."""
        if isinstance(d, datetime):
            return d
        return datetime.combine(d, datetime.min.time())

    def get_dates(self) -> Tuple[datetime]:
        """Return list of dates of curve

        Returns:
            Tuple[datetime]: List of dates
        """
        x, _ = zip(*self.values)
        return x

    def get_df(self) -> Tuple[float]:
        """Return list of discount factors

        Returns:
            Tuple[float]: List of discount factors
        """
        _, y = zip(*self.values)
        return y

    # Change the name with value once full pyvacon dependencies are removed throughout rivapy
    def value(self, refdate: Union[date, datetime], d: Union[date, datetime], payment_dates=None, annual_payment_frequency=None) -> float:
        """Return discount factor for a given date

        Args:
            refdate (Union[date, datetime]): The reference date. If the reference date is in the future
                                             (compared to the curves reference date), the forward discount
                                             factor will be returned.
            d (Union[date, datetime]): The date for which the discount factor will be returned. Assumption
                                       is that the day given already follows correct business logic
                                       (e.g., roll convention)

        Returns:
            float: discount factor
        """
        # normalize date -> datetime; the former datetime(refdate, 0, 0, 0) call
        # raised a TypeError for plain date objects (datetime expects year/month/day ints)
        refdate = DiscountCurve._as_datetime(refdate)
        d = DiscountCurve._as_datetime(d)
        if refdate < self.refdate:
            raise Exception("The given reference date is before the curves reference date.")
        # year fractions of the curve dates under the curve's day count convention
        dcc = DayCounter(self.daycounter)
        yf_list = [dcc.yf(self.refdate, x, payment_dates, annual_payment_frequency) for x in self.get_dates()]
        df_list = list(self.get_df())
        # interpolate/extrapolate with the configured method
        interp = Interpolator(self.interpolation, self.extrapolation)
        if refdate > self.refdate:
            # forward value: DF(refdate, d) = DF(ref0, d) / DF(ref0, refdate)
            df1 = interp.interp(yf_list, df_list, dcc.yf(self.refdate, refdate, payment_dates, annual_payment_frequency), self.extrapolation)
            df2 = interp.interp(yf_list, df_list, dcc.yf(self.refdate, d, payment_dates, annual_payment_frequency), self.extrapolation)
            return df2 / df1
        # refdate equals the curve's reference date (this branch also avoids a division by zero)
        return interp.interp(yf_list, df_list, dcc.yf(self.refdate, d, payment_dates, annual_payment_frequency), self.extrapolation)

    def value_rate(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return continuously compounded zero rate for a given date

        Args:
            refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curves reference date), the forward rate will be returned.
            d (Union[date, datetime]): The date for which the continuously compounded zero rate will be returned.

        Returns:
            float: continuously compounded zero rate
        """
        refdate = DiscountCurve._as_datetime(refdate)
        d = DiscountCurve._as_datetime(d)
        if refdate < self.refdate:
            raise Exception("The given reference date is before the curves reference date.")
        return -math.log(self.value(refdate, d)) / DayCounter(self.daycounter).yf(refdate, d)

    def value_yf(self, yf: float) -> float:
        """Return discount factor for a given yearfrac as of the curve's reference date.

        Args:
            yf (float): The year fraction for which the discount factor will be returned.

        Returns:
            float: discount factor
        """
        dcc = DayCounter(self.daycounter)
        yf_list = [dcc.yf(self.refdate, x) for x in self.get_dates()]
        df_list = list(self.get_df())
        interp = Interpolator(self.interpolation, self.extrapolation)
        return interp.interp(yf_list, df_list, yf, self.extrapolation)

    def value_fwd(self, val_date: Union[date, datetime], d1: Union[date, datetime], d2: Union[date, datetime]) -> float:
        """Return forward discount factor for a given period (without pyvacon dependencies).

        Supports forward valuation scenarios (``val_date > refdate``) by rebasing the
        curve from its construction date to the new valuation date via

            DF(val_date, t) = DF(refdate, t) / DF(refdate, val_date)

        which keeps discount factors and forward rates time-consistent even when the
        valuation date is later than the curve's reference date, in line with
        market-standard OIS / collateralized discounting practice.

        Args:
            val_date (Union[date, datetime]): The valuation date; must not be before the curve's reference date.
            d1 (Union[date, datetime]): Start date of the forward period.
            d2 (Union[date, datetime]): End date of the forward period.

        Returns:
            float: forward discount factor DF(val_date; d1, d2)
        """
        # normalize; note isinstance(x, date) is also True for datetime instances,
        # so the previous datetime.combine call silently truncated times to midnight
        val_date = DiscountCurve._as_datetime(val_date)
        d1 = DiscountCurve._as_datetime(d1)
        d2 = DiscountCurve._as_datetime(d2)
        if val_date < self.refdate:
            raise Exception("The given value date is before the curves reference date.")
        dcc = DayCounter(self.daycounter)
        yf_list = [dcc.yf(self.refdate, x) for x in self.get_dates()]
        df_list = list(self.get_df())
        interp = Interpolator(self.interpolation, self.extrapolation)
        df1 = interp.interp(yf_list, df_list, dcc.yf(self.refdate, d1), self.extrapolation)
        df2 = interp.interp(yf_list, df_list, dcc.yf(self.refdate, d2), self.extrapolation)
        if val_date > self.refdate:
            # rebase both factors to the (future) valuation date
            logger.info(f"{val_date} > {self.refdate}: forward valuation")
            df_val = interp.interp(yf_list, df_list, dcc.yf(self.refdate, val_date), self.extrapolation)
            df1 /= df_val
            df2 /= df_val
        return df2 / df1

    def value_fwd_rate(self, refdate: Union[date, datetime], d1: Union[date, datetime], d2: Union[date, datetime]) -> float:
        """Return forward continuously compounded zero rate for a given period

        Args:
            refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curves reference date), the forward rate will be returned.
            d1 (Union[date, datetime]): The start date of the period for which the forward continuously compounded zero rate will be returned.
            d2 (Union[date, datetime]): The end date of the period for which the forward continuously compounded zero rate will be returned.

        Returns:
            float: forward continuously compounded zero rate
        """
        refdate = DiscountCurve._as_datetime(refdate)
        d1 = DiscountCurve._as_datetime(d1)
        d2 = DiscountCurve._as_datetime(d2)
        if refdate < self.refdate:
            raise Exception("The given reference date is before the curves reference date.")
        return -math.log(self.value_fwd(refdate, d1, d2)) / DayCounter(self.daycounter).yf(d1, d2)

    def __call__(self, t: float, refdate: Union[date, datetime] = None, d: Union[date, datetime] = None) -> float:
        """Return the zero rate for a year fraction t, or for (refdate, d) if both dates are given."""
        if refdate is None or d is None:
            # directly return the zero rate for a given yearfrac t
            return -math.log(self.value_yf(t)) / t
        return self.value_rate(refdate, d)

    def plot(self, days: int = 10, discount_factors: bool = False, **kwargs):
        """Plots the discount curve using matplotlibs plot function.

        The timegrid includes the dates of the discount curve. Here either the discount factors or the zero rates (continuously compounded, ACT365 yearfraction) are plotted.

        Args:
            days (int, optional): The number of days between two plotted rates/discount factors. Defaults to 10.
            discount_factors (bool, optional): If True, discount factors will be plotted, otherwise the rates. Defaults to False.
            **kwargs: optional arguments that will be directly passed to the matplotlib plot function
        """
        dates = self.get_dates()
        dates_new = [dates[0]]
        for i in range(1, len(dates)):
            while dates_new[-1] + timedelta(days=days) < dates[i]:
                dates_new.append(dates_new[-1] + timedelta(days=days))
            dates_new.append(dates[i])
        # the former try/except here had byte-identical bodies in both branches
        values = [self.value(self.refdate, d) for d in dates_new]
        if not discount_factors:
            for i in range(1, len(values)):
                dt = float((dates_new[i] - self.refdate).days) / 365.0
                values[i] = -math.log(values[i]) / dt
            values[0] = values[1]
        plt.plot(dates_new, values, label=self.id, **kwargs)
class FlatDiscountCurve(interfaces.BaseDatedCurve):
    """
    A minimal discount curve driven by a single flat interest rate.
    """

    def __init__(
        self,
        valuation_date: Union[date, datetime],
        flat_rate: Optional[float] = 0.05,
        curve_data: Any = None,
        day_counter_type: DayCounterType = DayCounterType.Act365Fixed,
    ):
        """
        Initializes the flat discount curve.

        Args:
            valuation_date (Union[date, datetime]): The valuation date of the curve.
            flat_rate (Optional[float], optional): The flat interest rate used for discounting. Defaults to 0.05.
            curve_data (Any, optional): Placeholder for more complex curve data (not used in this implementation). Defaults to None.
            day_counter_type (DayCounterType, optional): The day count convention for calculating year fractions. Defaults to DayCounterType.Act365Fixed.
        """
        self.valuation_date = valuation_date
        self._flat_rate = flat_rate
        # reserved for richer curve data; ignored by this implementation
        self._curve_data = curve_data
        self._day_counter = DayCounter(day_counter_type)

    @property
    def valuation_date(self) -> datetime:
        """The valuation date of the curve as a datetime object."""
        return self._valuation_date

    @valuation_date.setter
    def valuation_date(self, value: Union[date, datetime]):
        self._valuation_date = _date_to_datetime(value)

    def get_discount_factor(self, target_date: Union[date, datetime], spread: float = 0.0) -> float:
        """
        Calculates the discount factor from the valuation date to a target date.

        Args:
            target_date (Union[date, datetime]): The date to which to discount.
            spread (float, optional): Additive spread applied on top of the flat rate. Defaults to 0.0.

        Returns:
            float: The discount factor. Returns 0.0 if the target date is before the valuation date.
        """
        start = _date_to_datetime(self.valuation_date)
        end = _date_to_datetime(target_date)
        # dates in the past are assigned a discount factor of 0.0 by convention
        if end < start:
            return 0.0
        maturity_in_years = self._day_counter.yf(start, end)
        # fall back to 2% when no flat rate was configured
        effective_rate = 0.02 if self._flat_rate is None else self._flat_rate
        return 1 / ((1 + effective_rate + spread) ** maturity_in_years)

    def value(self, ref_date: datetime, target_date: datetime, spread: float = 0.0) -> float:
        """
        Returns the discount factor from a reference date to a target date.
        For this simple implementation, the reference date must be the curve's valuation date.

        Args:
            ref_date (datetime): The reference date (must match the curve's valuation date).
            target_date (datetime): The date to which to discount.
            spread (float, optional): Additive spread applied on top of the flat rate. Defaults to 0.0.

        Raises:
            ValueError: If the reference date does not match the curve's valuation date.

        Returns:
            float: The discount factor.
        """
        # this simple curve cannot be rebased: ref_date has to coincide with valuation_date
        if _date_to_datetime(ref_date).date() != self.valuation_date.date():
            raise ValueError(f"Reference date {ref_date} does not match DiscountCurve valuation date {self.valuation_date}")
        return self.get_discount_factor(target_date, spread=spread)

    def __call__(self, t: float, refdate: Union[date, datetime] = None, d: Union[date, datetime] = None) -> float:
        """Return the flat rate, independent of all arguments."""
        return self._flat_rate
class NelsonSiegel(interfaces.FactoryObject):
    def __init__(self, beta0: float, beta1: float, beta2: float, tau: float):
        """Nelson-Siegel parametrization for rates and yields, see :footcite:t:`Nelson1987`.

        This parametrization is mostly used to parametrize rate curves and can be used in conjunction with :class:`rivapy.marketdata.DiscountCurveParametrized`. It is defined by

        .. math::

            f(t) = \\beta_0 + (\\beta_1+\\beta_2)\\frac{1-e^{-t/\\tau}}{t/\\tau} -\\beta_2e^{t/\\tau}

        Args:
            beta0 (float): This parameter is the asymptotic (for arbitrary large maturities) rate, see formula above.
            beta1 (float): beta0 + beta1 give the short term rate, see formula above.
            beta2 (float): This parameter controls the size of the hump, see formula above.
            tau (float): This parameter controls the location of the hump, see formula above.

        Examples:

            .. code-block:: python

                >>> from rivapy.marketdata.curves import NelsonSiegel, DiscountCurveParametrized
                >>> ns = NelsonSiegel(beta0=0.05, beta1 = 0.02, beta2=0.1, tau=1.0)
                >>> dc = DiscountCurveParametrized('DC', refdate = dt.datetime(2023,1,1), rate_parametrization=ns, daycounter = DayCounterType.Act365Fixed)
                >>> dates = [dt.datetime(2023,1,1) + dt.timedelta(days=30*days) for days in range(120)]
                >>> values = [dc.value(refdate = dt.datetime(2023,1,1),d=d) for d in dates]
                >>> plt.plot(dates, values)
        """
        self.beta0 = beta0
        self.beta1 = beta1
        self.beta2 = beta2
        self.tau = tau
        # scalar factor applied in __call__; modified only via __mul__
        self._multiplier = 1.0

    def _to_dict(self) -> dict:
        return {"beta0": self.beta0, "beta1": self.beta1, "beta2": self.beta2, "tau": self.tau}

    def __call__(self, t: float):
        return self._multiplier * NelsonSiegel.compute(self.beta0, self.beta1, self.beta2, self.tau, t)

    def __mul__(self, x: float):
        """Return a scaled copy of the parametrization (betas unchanged, multiplier scaled)."""
        result = NelsonSiegel(self.beta0, self.beta1, self.beta2, self.tau)
        # accumulate with the existing multiplier so that (ns * a) * b == ns * (a * b);
        # the previous implementation silently discarded self._multiplier
        result._multiplier = x * self._multiplier
        return result

    @staticmethod
    def compute(beta0: float, beta1: float, beta2: float, tau: float, T: float) -> float:
        """Evaluate the Nelson-Siegel function at maturity T.

        T is floored at 1e-4 (before scaling by tau) to avoid a division by zero.

        Args:
            beta0 (float): long-run level
            beta1 (float): beta0 + beta1 = short-run level
            beta2 (float): size of the hump (or trough)
            tau (float): location of the hump
            T (float): maturity as year fraction (scalar or numpy array)

        Returns:
            float: value of the parametrization at T
        """
        t = np.maximum(T, 1e-4) / tau
        return beta0 + beta1 * (1.0 - np.exp(-t)) / t + beta2 * ((1 - np.exp(-t)) / t - np.exp(-(t)))

    @staticmethod
    def _create_sample(
        n_samples: int,
        seed: int = None,
        min_short_term_rate: float = -0.01,
        max_short_term_rate: float = 0.12,
        min_long_run_rate: float = 0.005,
        max_long_run_rate: float = 0.15,
        min_hump: float = -0.1,
        max_hump: float = 0.1,
        min_tau: float = 0.5,
        max_tau: float = 3.0,
    ):
        """Draw n_samples random Nelson-Siegel parametrizations within the given bounds."""
        if seed is not None:
            np.random.seed(seed)
        result = []
        for i in range(n_samples):
            beta0 = np.random.uniform(min_long_run_rate, max_long_run_rate)
            # beta1 is sampled so that the short-term rate beta0 + beta1 stays within its bounds
            beta1 = np.random.uniform(min_short_term_rate - beta0, max_short_term_rate - beta0)
            beta2 = np.random.uniform(min_hump, max_hump)
            tau = np.random.uniform(min_tau, max_tau)
            result.append(NelsonSiegel(beta0, beta1, beta2, tau))
        return result

    if has_tf:

        @staticmethod
        def compute_tf(beta0: tf.Tensor, beta1: tf.Tensor, beta2: tf.Tensor, tau: tf.Tensor, T: tf.Tensor) -> tf.Tensor:
            """TensorFlow variant of :meth:`compute` (same formula, tf ops).

            Args:
                beta0 (tf.Tensor): long-run level
                beta1 (tf.Tensor): beta0 + beta1 = short-run level
                beta2 (tf.Tensor): size of the hump (or trough)
                tau (tf.Tensor): location of the hump
                T (tf.Tensor): maturity as year fraction

            Returns:
                tf.Tensor: value of the parametrization at T
            """
            t = tf.maximum(T, 1e-4) / tau
            return beta0 + beta1 * (1.0 - tf.exp(-t)) / t + beta2 * ((1 - tf.exp(-t)) / t - tf.exp(-(t)))
class ConstantRate(interfaces.FactoryObject):
    def __init__(self, rate: float):
        """Continuously compounded flat rate object that can be used in conjunction with :class:`rivapy.marketdata.DiscountCurveParametrized`.

        Args:
            rate (float): The constant rate.
        """
        self.rate = rate

    def _to_dict(self) -> dict:
        return {"rate": self.rate}

    @staticmethod
    def _create_sample(n_samples: int, seed: int = None):
        """Draw n_samples flat-rate objects with rates uniform in [-0.005, 0.1)."""
        if seed is not None:
            np.random.seed(seed)
        return [ConstantRate(rate=np.random.uniform(-0.005, 0.1)) for _ in range(n_samples)]

    def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return the discount factor exp(-rate * yf(refdate, d)) using an Act365Fixed year fraction.

        Args:
            refdate (Union[date, datetime]): Start date of the discounting period.
            d (Union[date, datetime]): End date of the discounting period.

        Returns:
            float: discount factor
        """
        # normalize date -> datetime; the former datetime(refdate, 0, 0, 0) call
        # raised a TypeError for plain date objects
        if not isinstance(refdate, datetime):
            refdate = datetime(refdate.year, refdate.month, refdate.day)
        if not isinstance(d, datetime):
            d = datetime(d.year, d.month, d.day)
        yf = DayCounter(DayCounterType.Act365Fixed).yf(refdate, d)
        return np.exp(-self.rate * yf)

    def __call__(self, t: float, refdate: Union[date, datetime] = None, d: Union[date, datetime] = None):
        """Return the constant rate (independent of t and the dates)."""
        return self.rate
class LinearRate(interfaces.FactoryObject):
    def __init__(self, shortterm_rate: float, longterm_rate: float, max_maturity: float = 10.0, min_maturity: float = 1.0):
        """Continuously compounded linearly interpolated rate object that can be used in conjunction with :class:`rivapy.marketdata.DiscountCurveParametrized`.

        Args:
            shortterm_rate (float): The short term rate (applied at and before min_maturity).
            longterm_rate (float): The longterm rate (applied at and after max_maturity).
            max_maturity (float): After this timepoint constant extrapolation is applied.
            min_maturity (float): Before this timepoint constant extrapolation is applied.
        """
        self.shortterm_rate = shortterm_rate
        self.min_maturity = min_maturity
        self.longterm_rate = longterm_rate
        self.max_maturity = max_maturity
        # slope of the rate between min_maturity and max_maturity
        self._coeff = (self.longterm_rate - self.shortterm_rate) / (self.max_maturity - self.min_maturity)

    @staticmethod
    def _create_sample(n_samples: int, seed: int = None):
        """Draw n_samples random LinearRate objects (longterm rate always above shortterm rate)."""
        if seed is not None:
            np.random.seed(seed)
        result = []
        for i in range(n_samples):
            shortterm_rate = np.random.uniform(-0.005, 0.07)
            longterm_rate = shortterm_rate + np.random.uniform(0.0025, 0.09)
            result.append(LinearRate(shortterm_rate=shortterm_rate, longterm_rate=longterm_rate))
        return result

    def _to_dict(self) -> dict:
        # min_maturity is included so that serialization roundtrips losslessly
        # (it was previously dropped, silently resetting it to the default on re-creation)
        return {
            "shortterm_rate": self.shortterm_rate,
            "longterm_rate": self.longterm_rate,
            "max_maturity": self.max_maturity,
            "min_maturity": self.min_maturity,
        }

    def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return the discount factor exp(-r * yf) where r is linearly interpolated in the year fraction.

        Args:
            refdate (Union[date, datetime]): Start date of the discounting period.
            d (Union[date, datetime]): End date of the discounting period.

        Returns:
            float: discount factor
        """
        # normalize date -> datetime; the former datetime(refdate, 0, 0, 0) call
        # raised a TypeError for plain date objects
        if not isinstance(refdate, datetime):
            refdate = datetime(refdate.year, refdate.month, refdate.day)
        if not isinstance(d, datetime):
            d = datetime(d.year, d.month, d.day)
        # compute the year fraction once (it was previously evaluated twice)
        yf = DayCounter(DayCounterType.Act365Fixed).yf(refdate, d)
        r = Interpolator(InterpolationType.LINEAR, ExtrapolationType.CONSTANT).interp(
            [self.min_maturity, self.max_maturity],
            [self.shortterm_rate, self.longterm_rate],
            yf,
            ExtrapolationType.CONSTANT,
        )
        return np.exp(-r * yf)

    def value_rate(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return the continuously compounded zero rate implied by :meth:`value`."""
        if not isinstance(refdate, datetime):
            refdate = datetime(refdate.year, refdate.month, refdate.day)
        if not isinstance(d, datetime):
            d = datetime(d.year, d.month, d.day)
        return -math.log(self.value(refdate, d)) / DayCounter(DayCounterType.Act365Fixed).yf(refdate, d)

    def __call__(self, t: float, refdate: Union[date, datetime] = None, d: Union[date, datetime] = None):
        """Return the linearly interpolated rate at year fraction t (constant extrapolation outside the band)."""
        return Interpolator(InterpolationType.LINEAR, ExtrapolationType.CONSTANT).interp(
            [self.min_maturity, self.max_maturity], [self.shortterm_rate, self.longterm_rate], t, ExtrapolationType.CONSTANT
        )
class NelsonSiegelSvensson(NelsonSiegel):
    """Svensson extension of the Nelson-Siegel parametrization: adds a second
    hump term with its own size (beta3) and location (tau2)."""

    def __init__(self, beta0: float, beta1: float, beta2: float, beta3: float, tau: float, tau2: float):
        """
        Args:
            beta0 (float): asymptotic (long-run) rate, see :class:`NelsonSiegel`.
            beta1 (float): beta0 + beta1 give the short term rate.
            beta2 (float): size of the first hump.
            beta3 (float): size of the second hump.
            tau (float): location of the first hump.
            tau2 (float): location of the second hump.
        """
        super().__init__(beta0, beta1, beta2, tau)
        self.beta3 = beta3
        self.tau2 = tau2

    def _to_dict(self) -> dict:
        tmp = super()._to_dict()
        tmp.update({"beta3": self.beta3, "tau2": self.tau2})
        return tmp

    def __call__(self, t: float):
        # apply _multiplier for consistency with NelsonSiegel.__call__
        return self._multiplier * NelsonSiegelSvensson.compute(self.beta0, self.beta1, self.beta2, self.beta3, self.tau, self.tau2, t)

    def __mul__(self, x: float):
        """Return a scaled copy; overrides the inherited implementation, which
        constructed a plain NelsonSiegel and silently dropped beta3/tau2."""
        result = NelsonSiegelSvensson(self.beta0, self.beta1, self.beta2, self.beta3, self.tau, self.tau2)
        result._multiplier = x * self._multiplier
        return result

    @staticmethod
    def compute(beta0, beta1, beta2, beta3, tau, tau2, T):
        """Evaluate the Svensson formula: Nelson-Siegel plus a second hump term scaled by tau2."""
        t = np.maximum(T, 1e-4) / tau2
        return NelsonSiegel.compute(beta0, beta1, beta2, tau, T) + beta3 * ((1 - np.exp(-t)) / t - np.exp(-(t)))
class DiscountCurveComposition(interfaces.FactoryObject):
    """Composition of curve-like objects: the composed continuously compounded
    zero rate is ``a.value_rate(...) * b.value_rate(...) + c.value_rate(...)``.
    Plain numbers passed for a, b or c are wrapped into flat constant-rate curves.
    """

    def __init__(self, a, b, c):
        # operands given as dicts are deserialized via the marketdata factory
        if isinstance(a, dict):
            a = _create(a)
        if isinstance(b, dict):
            b = _create(b)
        if isinstance(c, dict):
            c = _create(c)
        # check if all discount curves have the same daycounter, otherwise exception
        dc = set()
        for k in [a, b, c]:
            if hasattr(k, "daycounter"):
                dc.add(k.daycounter)
        if len(dc) > 1:
            raise Exception("All curves must have same daycounter.")
        if len(dc) > 0:
            self.daycounter = dc.pop()
        else:
            # no operand carries a daycounter: fall back to Act365Fixed
            self.daycounter = DayCounterType.Act365Fixed.value
        self._dc = DayCounter(self.daycounter)
        # operands without a value() method are treated as plain numbers and wrapped
        # into flat curves; the refdate 1980-01-01 is an arbitrary date far in the past
        self.a = a
        if not hasattr(a, "value"):
            self.a = DiscountCurveParametrized("", datetime(1980, 1, 1), ConstantRate(a), self.daycounter)
        self.b = b
        if not hasattr(b, "value"):
            self.b = DiscountCurveParametrized("", datetime(1980, 1, 1), ConstantRate(b), self.daycounter)
        self.c = c
        if not hasattr(c, "value"):
            self.c = DiscountCurveParametrized("", datetime(1980, 1, 1), ConstantRate(c), self.daycounter)

    def _to_dict(self) -> dict:
        # serialize operands recursively where they support to_dict(); numbers pass through
        if hasattr(self.a, "to_dict"):
            a = self.a.to_dict()
        else:
            a = self.a
        if hasattr(self.b, "to_dict"):
            b = self.b.to_dict()
        else:
            b = self.b
        if hasattr(self.c, "to_dict"):
            c = self.c.to_dict()
        else:
            c = self.c
        return {"a": a, "b": b, "c": c}

    @staticmethod
    def _create_sample(n_samples: int, seed: int = None, refdate: Union[datetime, date] = None, parametrization_type=NelsonSiegel) -> list:
        # sample parametrized curves and add a small constant spread to each
        curves = DiscountCurveParametrized._create_sample(n_samples, seed, refdate, parametrization_type)
        results = []
        for c in curves:
            results.append(c + 0.001)
        return results

    def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return the discount factor exp(-r * yf) implied by the composed rate r."""
        r = self.value_rate(refdate, d)
        yf = self._dc.yf(refdate, d)
        return np.exp(-r * yf)

    def value_rate(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return the composed continuously compounded zero rate a*b + c."""
        return self.a.value_rate(refdate, d) * self.b.value_rate(refdate, d) + self.c.value_rate(refdate, d)

    def value_fwd(self, refdate: Union[date, datetime], d1: Union[date, datetime], d2: Union[date, datetime]) -> float:
        """Return forward discount factor for a given date"""
        return self.value(refdate, d2) / self.value(refdate, d1)

    def value_fwd_rate(self, refdate: Union[date, datetime], d1: Union[date, datetime], d2: Union[date, datetime]) -> float:
        """Return forward continuously compounded zero rate for a given date"""
        r = -math.log(self.value_fwd(refdate, d1, d2)) / self._dc.yf(d1, d2)
        return r

    def __mul__(self, other):
        # TODO unittests
        # curve * x -> rate composition (self * other) + 0
        return DiscountCurveComposition(self, other, 0.0)

    def __rmul__(self, other):
        # x * curve: multiplication of rates is commutative, so reuse the same composition
        return DiscountCurveComposition(self, other, 0.0)

    def __add__(self, other):
        # curve + x -> rate composition (self * 1) + other
        return DiscountCurveComposition(self, 1.0, other)

    def __radd__(self, other):
        return DiscountCurveComposition(self, 1.0, other)
class DiscountCurveParametrized(interfaces.FactoryObject):
    """Discount curve defined by a functional parametrization of the continuously compounded zero rate."""

    def __init__(
        self,
        obj_id: str,
        refdate: Union[datetime, date],
        rate_parametrization,  #: Callable[[float, datetime, datetime], float],
        daycounter: Union[DayCounterType, str] = DayCounterType.Act365Fixed,
    ):
        """Create a parametrized discount curve.

        Args:
            obj_id (str): Identifier of the curve.
            refdate (Union[datetime, date]): Reference date of the curve. A plain date is converted to a datetime at midnight.
            rate_parametrization: Callable ``f(yf, refdate, d) -> float`` returning the continuously compounded zero
                rate for year fraction ``yf`` between ``refdate`` and ``d``. A dict is deserialized via the factory.
            daycounter (Union[DayCounterType, str], optional): Daycounter used to compute year fractions. Defaults to DayCounterType.Act365Fixed.
        """
        self.refdate = DiscountCurveParametrized._as_datetime(refdate)
        self.daycounter = DayCounterType.to_string(daycounter)
        self._dc = DayCounter(self.daycounter)
        self.obj_id = obj_id
        if isinstance(rate_parametrization, dict):  # if the parametrization is a dict we try to create it from the factory
            self.rate_parametrization = _create(rate_parametrization)
        else:
            self.rate_parametrization = rate_parametrization

    @staticmethod
    def _as_datetime(d: Union[datetime, date]) -> datetime:
        """Return ``d`` as a datetime; a plain date is mapped to midnight of that day.

        Bugfix: the previous conversion called ``datetime(d, 0, 0, 0)`` which raises a TypeError
        for any date input, since ``datetime`` expects ``(year, month, day, ...)``.
        """
        if isinstance(d, datetime):
            return d
        return datetime(d.year, d.month, d.day)

    def _check_refdate(self, refdate: datetime):
        """Raise if the given reference date lies before the curve's own reference date."""
        if refdate < self.refdate:
            raise Exception("The given reference date is before the curves reference date.")

    def _to_dict(self) -> dict:
        """Serialize the curve to a dict; requires the parametrization to implement ``to_dict``."""
        try:
            parametrization = self.rate_parametrization.to_dict()
        except Exception as e:
            # chain the original error so the missing-implementation cause is visible
            raise Exception("Missing implementation of to_dict() in parametrization of type " + type(self.rate_parametrization).__name__) from e
        return {"obj_id": self.obj_id, "refdate": self.refdate, "rate_parametrization": parametrization}

    def value_fwd(self, refdate: Union[date, datetime], d1: Union[date, datetime], d2: Union[date, datetime]) -> float:
        """Return the forward discount factor for the period [d1, d2].

        Args:
            refdate (Union[date, datetime]): The reference date. Must not lie before the curve's reference date.
            d1 (Union[date, datetime]): The start date of the forward period.
            d2 (Union[date, datetime]): The end date of the forward period.

        Returns:
            float: forward discount factor
        """
        refdate = DiscountCurveParametrized._as_datetime(refdate)
        d1 = DiscountCurveParametrized._as_datetime(d1)
        d2 = DiscountCurveParametrized._as_datetime(d2)
        self._check_refdate(refdate)
        df1 = self.value(refdate, d1)
        df2 = self.value(refdate, d2)
        return df2 / df1

    def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return the discount factor for a given date.

        Args:
            refdate (Union[date, datetime]): The reference date. Must not lie before the curve's reference date.
            d (Union[date, datetime]): The date for which the discount factor will be returned.

        Returns:
            float: discount factor
        """
        refdate = DiscountCurveParametrized._as_datetime(refdate)
        d = DiscountCurveParametrized._as_datetime(d)
        self._check_refdate(refdate)
        yf = self._dc.yf(refdate, d)
        # df = exp(-r(t) * t) with r given by the parametrization
        return np.exp(-self.rate_parametrization(yf, refdate, d) * yf)

    def value_rate(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return the continuously compounded zero rate for a given date.

        Args:
            refdate (Union[date, datetime]): The reference date. Must not lie before the curve's reference date.
            d (Union[date, datetime]): The date for which the rate will be returned.

        Returns:
            float: continuously compounded zero rate
        """
        refdate = DiscountCurveParametrized._as_datetime(refdate)
        d = DiscountCurveParametrized._as_datetime(d)
        self._check_refdate(refdate)
        yf = self._dc.yf(refdate, d)
        return self.rate_parametrization(yf, refdate, d)

    def value_fwd_rate(self, refdate: Union[date, datetime], d1: Union[date, datetime], d2: Union[date, datetime]) -> float:
        """Return the forward continuously compounded zero rate for the period [d1, d2].

        Args:
            refdate (Union[date, datetime]): The reference date. Must not lie before the curve's reference date.
            d1 (Union[date, datetime]): The start date of the forward period.
            d2 (Union[date, datetime]): The end date of the forward period.

        Returns:
            float: forward continuously compounded zero rate
        """
        refdate = DiscountCurveParametrized._as_datetime(refdate)
        d1 = DiscountCurveParametrized._as_datetime(d1)
        d2 = DiscountCurveParametrized._as_datetime(d2)
        self._check_refdate(refdate)
        return -math.log(self.value_fwd(refdate, d1, d2)) / self._dc.yf(d1, d2)

    @staticmethod
    def _create_sample(n_samples: int, seed: int = None, refdate: Union[datetime, date] = None, parametrization_type=NelsonSiegel) -> list:
        """Create a list of curves with randomly drawn parametrizations (for tests and demos).

        Args:
            n_samples (int): Number of curves to create.
            seed (int, optional): Seed for numpy's random generator. Defaults to None (no reseeding).
            refdate (Union[datetime, date], optional): Reference date of the curves. Defaults to now.
            parametrization_type: Parametrization class providing ``_create_sample``. Defaults to NelsonSiegel.

        Returns:
            list: List of DiscountCurveParametrized objects.
        """
        if seed is not None:
            np.random.seed(seed)
        if refdate is None:
            refdate = datetime.now()
        parametrizations = parametrization_type._create_sample(n_samples)
        return [DiscountCurveParametrized("DCP_" + str(i), refdate, p) for i, p in enumerate(parametrizations)]

    def __mul__(self, other):
        """Multiply this curve by a factor/curve, yielding a composed curve."""
        return DiscountCurveComposition(self, other, 0.0)

    def __rmul__(self, other):
        """Right multiplication delegates to the same composition as ``__mul__``."""
        return DiscountCurveComposition(self, other, 0.0)

    def __add__(self, other):
        """Add a spread/curve to this curve, yielding a composed curve."""
        return DiscountCurveComposition(self, 1.0, other)

    def __radd__(self, other):
        """Right addition delegates to the same composition as ``__add__``."""
        return DiscountCurveComposition(self, 1.0, other)
class EquityForwardCurve:
    def __init__(self, spot: float, funding_curve: DiscountCurve, borrow_curve: DiscountCurve, div_table):
        """Equity Forward Curve

        Args:
            spot (float): Current spot of the underlying.
            funding_curve (DiscountCurve): Curve used for funding of the forward.
            borrow_curve (DiscountCurve): Curve describing borrowing costs; may be None.
            div_table (:class:`rivapy.marketdata.DividendTable`): Dividend table; may be None.
        """
        self.spot = spot
        self.bc = borrow_curve
        self.fc = funding_curve
        self.div = div_table
        self._pyvacon_obj = None
        # the curve's reference date is the latest reference date of all ingredients
        self.refdate = self.fc.refdate
        if self.bc is not None:
            if self.refdate < self.bc.refdate:
                self.refdate = self.bc.refdate
        if self.div is not None:
            if hasattr(self.div, "refdate"):
                if self.refdate < self.div.refdate:
                    self.refdate = self.div.refdate

    @staticmethod
    def _unwrap(obj):
        """Return the underlying pyvacon object of ``obj`` if it provides one, otherwise ``obj`` itself."""
        if hasattr(obj, "_get_pyvacon_obj"):
            return obj._get_pyvacon_obj()
        return obj

    def _get_pyvacon_obj(self):
        """Lazily build and cache the underlying pyvacon forward curve."""
        if self._pyvacon_obj is None:
            fc = EquityForwardCurve._unwrap(self.fc)
            bc = EquityForwardCurve._unwrap(self.bc)
            div = EquityForwardCurve._unwrap(self.div)
            self._pyvacon_obj = _EquityForwardCurve(self.refdate, self.spot, fc, bc, div)
        return self._pyvacon_obj

    def value(self, refdate, expiry):
        """Return the forward value for the given expiry as seen from refdate."""
        return self._get_pyvacon_obj().value(refdate, expiry)

    def plot(self, days: int = 10, days_end: int = 10 * 365, **kwargs):
        """Plots the forward curve using matplotlibs plot function.

        Args:
            days (int, optional): The number of days between two plotted forward values. Defaults to 10.
            days_end (int, optional): Number of days when plotting will end. Defaults to 10*365 (10yr).
            **kwargs: optional arguments that will be directly passed to the matplotlib plot function.
        """
        dates = [self.refdate + timedelta(days=i) for i in range(0, days_end, days)]
        values = [self.value(self.refdate, d) for d in dates]
        plt.plot(dates, values, **kwargs)
        plt.xlabel("expiry")
        plt.ylabel("forward value")
class BootstrapHazardCurve:
    def __init__(
        self, ref_date: datetime, trade_date: datetime, dc: DiscountCurve, RR: float, payment_dates: List[datetime], market_spreads: List[float]
    ):
        """Bootstrap a piecewise-constant hazard rate curve from quoted CDS par spreads.

        Args:
            ref_date (datetime): Reference date of the curve.
            trade_date (datetime): Trade date of the CDS quotes.
            dc (DiscountCurve): Discount curve used to discount protection and premium legs.
            RR (float): Recovery rate (between 0 and 1).
            payment_dates (List[datetime]): For each quoted maturity, the list of premium payment dates.
            market_spreads (List[float]): Quoted CDS par spreads, one per entry in payment_dates.
        """
        self.ref_date = ref_date
        self.trade_date = trade_date
        self.dc = dc
        self.RR = RR
        self.payment_dates_bootstrapp = payment_dates
        self.market_spreads = market_spreads
        self._pyvacon_obj = None

    def par_spread(self, dc_survival, maturity_date, payment_dates: List[datetime]):
        """Compute the model CDS par spread for the given survival curve and maturity.

        The protection leg is integrated on a yearly grid; the premium leg is summed over the
        given payment dates using Act/360 accrual and mid-point survival probabilities.

        Args:
            dc_survival: Survival curve providing ``value(refdate, d)`` (survival probability).
            maturity_date: Maturity of the CDS.
            payment_dates (List[datetime]): Premium payment dates.

        Returns:
            float: model par spread
        """
        integration_step = relativedelta.relativedelta(days=365)
        premium_period_start = self.ref_date
        prev_date = self.ref_date
        current_date = min(prev_date + integration_step, maturity_date)
        risk_adj_factor_protection = 0
        risk_adj_factor_premium = 0
        risk_adj_factor_accrued = 0

        # protection leg: discounted default probabilities on the yearly integration grid
        while current_date <= maturity_date:
            default_prob = dc_survival.value(self.ref_date, prev_date) - dc_survival.value(self.ref_date, current_date)
            risk_adj_factor_protection += self.dc.value(self.ref_date, current_date) * default_prob
            prev_date = current_date
            current_date += integration_step

        # stub period up to maturity if the grid does not end exactly at maturity
        if prev_date < maturity_date and current_date > maturity_date:
            default_prob = dc_survival.value(self.ref_date, prev_date) - dc_survival.value(self.ref_date, maturity_date)
            risk_adj_factor_protection += self.dc.value(self.ref_date, maturity_date) * default_prob

        # premium leg (plus accrued-on-default, approximated with half the period length)
        for premium_payment in payment_dates:
            if premium_payment >= self.ref_date:
                period_length = ((premium_payment - premium_period_start).days) / 360
                survival_prob = (dc_survival.value(self.ref_date, premium_period_start) + dc_survival.value(self.ref_date, premium_payment)) / 2
                df = self.dc.value(self.ref_date, premium_payment)
                risk_adj_factor_premium += period_length * survival_prob * df
                default_prob = dc_survival.value(self.ref_date, premium_period_start) - dc_survival.value(self.ref_date, premium_payment)
                risk_adj_factor_accrued += period_length * default_prob * df
                premium_period_start = premium_payment

        PV_accrued = (1 / 2) * risk_adj_factor_accrued
        PV_premium = (1) * risk_adj_factor_premium
        PV_protection = ((1 - self.RR)) * risk_adj_factor_protection

        return (PV_protection) / ((PV_premium + PV_accrued))

    def create_survival(self, dates: List[datetime], hazard_rates: List[float]):
        """Create a pyvacon survival curve from the given dates and hazard rates.

        Bugfix: the original referenced the non-existing attribute ``self.refdate``
        (the attribute set in ``__init__`` is ``ref_date``).
        """
        return _SurvivalCurve("survival_curve", self.ref_date, dates, hazard_rates)

    def calibration_error(self, x, mkt_par_spread, ref_date, payment_dates, dates, hazard_rates):
        """Residual between market and model par spread for a trial hazard rate ``x``.

        Bugfix: the original signature was ``(x, self, ...)`` so that, called as a bound method
        from ``scipy.optimize.root_scalar``, the instance was bound to ``x`` and the trial value
        to ``self``. It also called ``create_survival`` with a superfluous ``ref_date`` argument.
        ``ref_date`` is kept in the signature for backward compatibility with the argument tuple
        passed by :meth:`calibrate_hazard_rate`; the curve's own reference date is used instead.
        """
        hazard_rates[-1] = x
        maturity_date = dates[-1]
        dc_surv = self.create_survival(dates, hazard_rates)
        return mkt_par_spread - self.par_spread(dc_surv, maturity_date, payment_dates)

    def calibrate_hazard_rate(self):
        """Bootstrap the hazard rates maturity by maturity.

        For each quoted maturity a new piecewise-constant hazard rate segment is appended and
        solved for (Brent's method) such that the model par spread matches the market spread.

        Returns:
            Tuple[List[float], List[datetime]]: calibrated hazard rates and the corresponding curve dates.
        """
        sc_dates = [self.ref_date]
        hazard_rates = [0.0]
        for i in range(len(self.payment_dates_bootstrapp)):
            payment_dates_iter = self.payment_dates_bootstrapp[i]
            mkt_par_spread_iter = self.market_spreads[i]
            sc_dates.append(payment_dates_iter[-1])
            # start the new segment from the previously calibrated level
            hazard_rates.append(hazard_rates[-1])
            sol = scipy.optimize.root_scalar(
                self.calibration_error,
                args=(mkt_par_spread_iter, self.ref_date, payment_dates_iter, sc_dates, hazard_rates),
                method="brentq",
                bracket=[0, 3],
                xtol=1e-8,
                rtol=1e-8,
            )
            hazard_rates[-1] = sol.root
        return hazard_rates, sc_dates
1089# class PowerPriceForwardCurve:
1090# def __init__(
1091# self, refdate: Union[datetime, date], start: datetime, end: datetime, values: np.ndarray, freq: str = "1H", tz: str = None, id: str = None
1092# ):
1093# """Simple forward curve for power.
1095# Args:
1096# refdate (Union[datetime, date]): Reference date of curve
1097# start (dt.datetime): Start of forward curve datetimepoints (including this timepoint).
1098# end (dt.datetime): End of forad curve datetimepoints (excluding this timepoint).
1099# values (np.ndarray): One dimensional array holding the price for each datetimepint in the curve. The method value will raise an exception if the number of values is not equal to the number of datetimepoints.
1100# freq (str, optional): Frequency of timepoints. Defaults to '1H'. See documentation for pandas.date_range for further details on freq.
1101# tz (str or tzinfo): Time zone name for returning localized datetime points, for example ‘Asia/Hong_Kong’.
1102# By default, the resulting datetime points are timezone-naive. See documentation for pandas.date_range for further details on tz.
1103# id (str): Identifier for the curve. It has no impact on the valuation functionality. If None, a uuid will be generated. Defaults to None.
1104# """
1105# self.id = id
1106# if id is None:
1107# self.id = "PFC/" + str(datetime.now())
1108# self.refdate = refdate
1109# self.start = start
1110# self.end = end
1111# self.freq = freq
1112# self.tz = tz
1113# self.values = values
1114# # timegrid used to compute prices for a certain schedule
1115# self._tg = None
1116# self._df = (
1117# pd.DataFrame(
1118# {"dates": pd.date_range(self.start, self.end, freq=self.freq, tz=self.tz, inclusive="left").to_pydatetime(), "values": self.values}
1119# )
1120# .set_index(["dates"])
1121# .sort_index()
1122# )
1124# def value(self, refdate: Union[date, datetime], schedule) -> np.ndarray:
1125# if self._tg is None:
1126# self._tg = pd.DataFrame(
1127# {"dates": pd.date_range(self.start, self.end, freq=self.freq, tz=self.tz, inclusive="left").to_pydatetime(), "values": self.values}
1128# ).reset_index()
1129# if self._tg.shape[0] != self.values.shape[0]:
1130# raise Exception(
1131# "The number of dates ("
1132# + str(self._tg.shape[0])
1133# + ") does not equal number of values ("
1134# + str(self.values.shape[0])
1135# + ") in forward curve."
1136# )
1137# tg = self._tg[(self._tg.dates >= schedule.start) & (self._tg.dates < schedule.end)].set_index("dates")
1138# _schedule = pd.DataFrame({"dates": schedule.get_schedule(refdate)})
1139# tg = _schedule.join(tg, on="dates")
1140# # tg = tg[tg['dates']>=refdate]
1141# if tg["index"].isna().sum() > 0:
1142# raise Exception("There are " + str(tg["index"].isna().sum()) + " dates in the schedule not covered by the forward curve.")
1143# return self.values[tg["index"].values]
1145# def get_df(self) -> pd.DataFrame:
1146# return self._df
class EnergyPriceForwardCurve:
    """Energy Price Forward Curve object.

    It is recommended to initialize this object via the class methods ``from_existing_pfc``, ``from_existing_shape`` or ``from_scratch``.

    Args:
        id (_type_): ID for the PFC object
        refdate (Union[datetime, date]): Reference date
        pfc (pd.DataFrame, optional): This object can be initialized with an existing pfc. Defaults to None.
    """

    def __init__(self, id, refdate: Union[datetime, date], pfc: pd.DataFrame = None, **kwargs):
        self.id = id
        if id is None:
            self.id = "PFC/" + str(datetime.now())
        self.refdate = refdate

        self._pfc = pfc

        self._pfc_shape: pd.DataFrame = kwargs.get("pfc_shape", None)

        self._apply_schedule: SimpleSchedule = kwargs.get("apply_schedule", None)
        self._pfc_shaper: PFCShaper = kwargs.get("pfc_shaper", None)

        # validate that any provided dataframe has a datetime index
        for dataframe in (self._pfc, self._pfc_shape):
            EnergyPriceForwardCurve._validate_dataframes(dataframe)

        self._future_contracts: List[EnergyFutureSpecifications] = kwargs.get("future_contracts", None)

        if self._pfc is None and self._pfc_shape is None and self._pfc_shaper is None:
            raise ValueError("No values provided for the arguments pfc, pfc_shape and pfc_shaper!")

    @staticmethod
    def _validate_dataframes(dataframe: Optional[pd.DataFrame]):
        """Validate that ``dataframe`` (if not None) has a datetime index."""
        if dataframe is not None:
            validators._check_pandas_index_for_datetime(dataframe)

    @classmethod
    def from_existing_pfc(cls, id, refdate: Union[datetime, date], pfc: pd.DataFrame) -> "EnergyPriceForwardCurve":
        """Initialization of the ``EnergyPriceForwardCurve`` given an existing PFC.

        Args:
            id (_type_): ID for the PFC object
            refdate (Union[datetime, date]): Reference Date
            pfc (pd.DataFrame): Existing PFC

        Returns:
            EnergyPriceForwardCurve: ``EnergyPriceForwardCurve`` object
        """
        return cls(id=id, refdate=refdate, pfc=pfc)

    @classmethod
    def from_existing_shape(
        cls, id, refdate: Union[datetime, date], pfc_shape: pd.DataFrame, future_contracts: List[EnergyFutureSpecifications]
    ) -> "EnergyPriceForwardCurve":
        """Initialization of the ``EnergyPriceForwardCurve`` given an existing PFC shape. The shape is then shifted in order to match the future contracts defined in the ``future_contracts`` list.

        Args:
            id (_type_): ID for the PFC object
            refdate (Union[datetime, date]): Reference Date
            pfc_shape (pd.DataFrame): Existing PFC shape
            future_contracts (List[EnergyFutureSpecifications]): List of future contracts (``EnergyFutureSpecifications`` objects)

        Returns:
            EnergyPriceForwardCurve: ``EnergyPriceForwardCurve`` object
        """
        instance = cls(id=id, refdate=refdate, pfc_shape=pfc_shape, future_contracts=future_contracts)
        instance._shift_shape()
        return instance

    @classmethod
    def from_scratch(
        cls,
        id,
        refdate: Union[datetime, date],
        apply_schedule: SimpleSchedule,
        pfc_shaper: PFCShaper,
        future_contracts: List[EnergyFutureSpecifications],
    ) -> "EnergyPriceForwardCurve":
        """Initialization of the ``EnergyPriceForwardCurve`` from scratch. First a shape is created using the ``pfc_shaper``. Afterwards, the shape is shifted in order to match the future contracts defined in the ``future_contracts`` list.

        Args:
            id (_type_): ID for the PFC object
            refdate (Union[datetime, date]): Reference Date
            apply_schedule (SimpleSchedule): Schedule to apply the ``pfc_shaper`` on, in order to obtain shape values for future time points
            pfc_shaper (PFCShaper): PFC shaper
            future_contracts (List[EnergyFutureSpecifications]): List of future contracts (``EnergyFutureSpecifications`` objects)

        Returns:
            EnergyPriceForwardCurve: ``EnergyPriceForwardCurve`` object
        """
        instance = cls(id=id, refdate=refdate, pfc_shaper=pfc_shaper, future_contracts=future_contracts, apply_schedule=apply_schedule)
        instance._create_shape()
        instance._shift_shape()
        return instance

    def __validate_contracts_frequency(self):
        """Checks if all contracts in ``self._future_contracts`` have the same schedule frequency."""
        frequencies_contracts = defaultdict(list)
        for future_contract in self._future_contracts:
            frequencies_contracts[future_contract.schedule.freq].append((future_contract.schedule.__class__.__name__, future_contract.name))

        if len(frequencies_contracts) > 1:
            raise ValueError(
                f"Found different contract frequencies: {frequencies_contracts}.\n Please provide uniform frequencies for the elements in the `future_contract` dictionary!"
            )

    def __get_offpeak_contracts(
        self, base_contracts: List[EnergyFutureSpecifications], peak_contracts: List[EnergyFutureSpecifications]
    ) -> List[EnergyFutureSpecifications]:
        """In cases where base and peak contracts are part of the ``self._future_contracts``, offpeak contracts need to be deducted from these two in order to shift the shape properly.

        Args:
            base_contracts (List[EnergyFutureSpecifications]): List of base contracts
            peak_contracts (List[EnergyFutureSpecifications]): List of peak contracts

        Returns:
            List[EnergyFutureSpecifications]: List of offpeak contracts
        """
        offpeak_contracts = []

        # iterate over each combination of base and peak contracts
        for base_contract_spec in base_contracts:
            n_base = len(base_contract_spec.get_schedule())
            for peak_contract_spec in peak_contracts:
                # match both by the start and end dates of their respective schedule
                if base_contract_spec.get_start_end() == peak_contract_spec.get_start_end():
                    # if both match, an offpeak contract can be created from these two:
                    # the offpeak price is the base price minus the peak price, weighted
                    # by the respective number of delivery hours
                    offpeak_name = f"offpeak_{base_contract_spec.name}&{peak_contract_spec.name}"
                    n_peak = len(peak_contract_spec.get_schedule())
                    offpeak_price = (
                        n_base / (n_base - n_peak) * base_contract_spec.get_price() - n_peak / (n_base - n_peak) * peak_contract_spec.get_price()
                    )
                    offpeak_contracts.append(
                        EnergyFutureSpecifications(
                            schedule=OffPeakSchedule(start=base_contract_spec.get_start(), end=base_contract_spec.get_end()),
                            price=offpeak_price,
                            name=offpeak_name,
                        )
                    )
                    break

        return offpeak_contracts

    def _shift_shape(self):
        """Shifts the shape to match the future contracts defined in the ``self._future_contracts`` list."""
        self.__validate_contracts_frequency()

        def schedule_type(contract: EnergyFutureSpecifications) -> str:
            # Bugfix: the original accessed ``__class__._name`` unconditionally, which raises
            # AttributeError for schedule classes that do not define ``_name``; fall back to
            # the class name in that case.
            cls = contract.schedule.__class__
            return getattr(cls, "_name", cls.__name__)

        base_contracts = [fc for fc in self._future_contracts if schedule_type(fc) == etgs.BASE]
        peak_contracts = [fc for fc in self._future_contracts if schedule_type(fc) == etgs.PEAK]

        # if base and peak contracts both exist, offpeak contracts are computed
        if (len(base_contracts) > 0) and (len(peak_contracts) > 0):
            shifted_pfc = []
            offpeak_contracts = self.__get_offpeak_contracts(base_contracts=base_contracts, peak_contracts=peak_contracts)

            # shift offpeak and peak separately
            for contracts in [offpeak_contracts, peak_contracts]:
                shifting_datetimes = np.sort(np.unique(np.concatenate([contract.get_schedule() for contract in contracts])))
                _pfc_shape = self._pfc_shape.loc[shifting_datetimes, :]
                pfc_shifter = PFCShifter(shape=_pfc_shape, contracts=contracts)
                shifted_pfc.append(pfc_shifter.compute())

            # combine offpeak and peak shifts into one chronologically sorted curve
            self._pfc = pd.concat(shifted_pfc, axis=0).sort_index(ascending=True)
        else:
            # if only base or only peak contracts exist, shifting can be performed directly
            pfc_shifter = PFCShifter(shape=self._pfc_shape, contracts=self._future_contracts)
            self._pfc = pfc_shifter.compute()

    def _create_shape(self):
        """Creates a shape using the ``self._pfc_shaper`` model."""
        self._pfc_shaper.calibrate()
        self._pfc_shape = self._pfc_shaper.apply(self._apply_schedule.get_schedule())

    def get_pfc(self) -> pd.DataFrame:
        """Returns the PFC

        Returns:
            pd.DataFrame: PFC
        """
        return self._pfc