Coverage for rivapy/marketdata/curves.py: 65%
458 statements
coverage.py v7.8.2, created at 2025-06-05 14:27 +0000
1import math
2import scipy.optimize
3import matplotlib.pyplot as plt
4import pandas as pd
5import numpy as np
6import dateutil.relativedelta as relativedelta
7import rivapy.tools.interfaces as interfaces
8import rivapy.tools._validators as validators
9from typing import List, Union, Tuple, Literal, Dict, Optional
10from datetime import datetime, date, timedelta
11from collections import defaultdict
13try:
14 import tensorflow as tf
16 has_tf = True
17except ImportError:
18 has_tf = False
20from rivapy.tools.enums import DayCounterType, InterpolationType, ExtrapolationType
21from rivapy.tools.enums import EnergyTimeGridStructure as ets
22from rivapy.tools.datetools import DayCounter
23from rivapy.marketdata.factory import create as _create
24from rivapy.marketdata_tools.pfc_shaper import PFCShaper
25from rivapy.marketdata_tools.pfc_shifter import PFCShifter
26from rivapy.tools.scheduler import SimpleSchedule, OffPeakSchedule, PeakSchedule, BaseSchedule
27from rivapy.instruments.energy_futures_specifications import EnergyFutureSpecifications
29from rivapy import _pyvacon_available
31if _pyvacon_available:
32 from pyvacon.finance.marketdata import EquityForwardCurve as _EquityForwardCurve
33 from pyvacon.finance.marketdata import SurvivalCurve as _SurvivalCurve
34 from pyvacon.finance.marketdata import DiscountCurve as _DiscountCurve
35 import pyvacon as _pyvacon
38class DiscountCurve:
40 def __init__(
41 self,
42 id: str,
43 refdate: Union[datetime, date],
44 dates: List[Union[datetime, date]],
45 df: List[float],
46 interpolation: InterpolationType = InterpolationType.HAGAN_DF,
47 extrapolation: ExtrapolationType = ExtrapolationType.NONE,
48 daycounter: DayCounterType = DayCounterType.Act365Fixed,
49 ):
50 """Discountcurve
52 Args:
53 id (str): Identifier of the discount curve.
54 refdate (Union[datetime, date]): Reference date of the discount curve.
55 dates (List[Union[datetime, date]]): List of dates belonging to the list of discount factors. All dates must be distinct and equal to or after the refdate, otherwise an exception will be thrown.
56 df (List[float]): List of discount factors. The list of discount factors must have the same length as the list of dates, otherwise an exception will be thrown.
57 interpolation (enums.InterpolationType, optional): Defaults to InterpolationType.HAGAN_DF.
58 extrapolation (enums.ExtrapolationType, optional): Defaults to ExtrapolationType.NONE, which does not allow computing a discount factor for a date beyond the last date given to this constructor.
59 daycounter (enums.DayCounterType, optional): Daycounter used within the interpolation formula to compute a discount factor between two dates from the dates-list above. Defaults to DayCounterType.Act365Fixed.
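
Example:
A minimal usage sketch (illustrative, not from the original documentation); evaluating ``value`` delegates to the optional pyvacon backend:

.. code-block:: python

>>> import datetime as dt
>>> from rivapy.marketdata.curves import DiscountCurve
>>> refdate = dt.datetime(2023, 1, 1)
>>> dates = [refdate + dt.timedelta(days=365 * i) for i in range(1, 4)]
>>> dc = DiscountCurve('EUR_OIS', refdate, dates, df=[0.97, 0.94, 0.91])
>>> dc.get_df()  # a discount factor of 1.0 at the refdate is prepended automatically
>>> dc.value(refdate, dates[-1])  # discount factor for the last curve date (requires pyvacon)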
61 """
62 if len(dates) < 1:
63 raise Exception("Please specify at least one date and discount factor")
64 if len(dates) != len(df):
65 raise Exception("List of dates and discount factors must have equal length.")
66 self.values = sorted(zip(dates, df), key=lambda tup: tup[0]) # zip dates and discount factors and sort by dates
67 if isinstance(refdate, datetime):
68 self.refdate = refdate
69 else:
70 self.refdate = datetime(refdate.year, refdate.month, refdate.day)
71 if not isinstance(interpolation, InterpolationType):
72 raise TypeError("Interpolation is not of type enums.InterpolationType")
73 self.interpolation = interpolation
74 if not isinstance(extrapolation, ExtrapolationType):
75 raise TypeError("Extrapolation is not of type enums.ExtrapolationType")
76 self.extrapolation = extrapolation
77 if not isinstance(daycounter, DayCounterType):
78 raise TypeError("Daycounter is not of type enums.DaycounterType")
79 self.daycounter = daycounter
80 self.id = id
81 # check if dates are monotonically increasing and if the first date is greater than or equal to the refdate
82 if self.values[0][0] < refdate:
83 raise Exception("First date must be equal to or greater than the reference date.")
84 if self.values[0][0] > refdate:
85 self.values = [(self.refdate, 1.0)] + self.values
86 if self.values[0][1] != 1.0:
87 raise Exception("Discount factor for today must equal 1.0.")
88 for i in range(1, len(self.values)):
89 if self.values[i - 1][0] >= self.values[i][0]:
90 raise Exception("Dates must be given in monotonically increasing order.")
91 self._pyvacon_obj = None
93 def get_dates(self) -> Tuple[datetime]:
94 """Return list of dates of curve
96 Returns:
97 Tuple[datetime]: List of dates
98 """
99 x, y = zip(*self.values)
100 return x
102 def get_df(self) -> Tuple[float]:
103 """Return list of discount factors
105 Returns:
106 Tuple[float]: List of discount factors
107 """
108 x, y = zip(*self.values)
109 return y
111 def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
112 """Return discount factor for a given date
114 Args:
115 refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curve's reference date), the forward discount factor will be returned.
116 d (Union[date, datetime]): The date for which the discount factor will be returned
118 Returns:
119 float: discount factor
120 """
121 if not isinstance(refdate, datetime):
122 refdate = datetime(refdate.year, refdate.month, refdate.day)
123 if not isinstance(d, datetime):
124 d = datetime(d.year, d.month, d.day)
125 if refdate < self.refdate:
126 raise Exception("The given reference date is before the curve's reference date.")
127 return self._get_pyvacon_obj().value(refdate, d)
129 def _get_pyvacon_obj(self):
130 if self._pyvacon_obj is None:
131 self._pyvacon_obj = _DiscountCurve(
132 self.id,
133 self.refdate,
134 [x for x in self.get_dates()],
135 [x for x in self.get_df()],
136 self.daycounter.value,
137 self.interpolation,
138 self.extrapolation,
139 )
140 return self._pyvacon_obj
142 def plot(self, days: int = 10, discount_factors: bool = False, **kwargs):
143 """Plots the discount curve using matplotlibs plot function.
144 The timegrid includes the dates of the discount curve. Here either the discount factors or the zero rates (continuously compounded, ACT365 yearfraction) are plotted.
146 Args:
147 days (int, optional): The number of days between two plotted rates/discount factors. Defaults to 10.
148 discount_factors (bool, optional): If True, discount factors will be plotted, otherwise the rates. Defaults to False.
149 **kwargs: optional arguments that are passed directly to the matplotlib plot function
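
Example:
Illustrative sketch (assumes a curve ``dc`` constructed as in the class example above; plotting the rates calls ``value`` and therefore requires the optional pyvacon backend):

.. code-block:: python

>>> dc.plot(days=30)  # zero rates on a 30-day grid
>>> dc.plot(days=30, discount_factors=True, linestyle='--')  # discount factors instead of rates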
150 """
151 dates = self.get_dates()
152 dates_new = [dates[0]]
153 for i in range(1, len(dates)):
154 while dates_new[-1] + timedelta(days=days) < dates[i]:
155 dates_new.append(dates_new[-1] + timedelta(days=days))
156 dates_new.append(dates[-1])
157 values = [self.value(self.refdate, d) for d in dates_new]
159 if not discount_factors:
160 for i in range(1, len(values)):
161 dt = float((dates_new[i] - self.refdate).days) / 365.0
162 values[i] = -math.log(values[i]) / dt
163 values[0] = values[1]
164 plt.plot(dates_new, values, label=self.id, **kwargs)
167class NelsonSiegel(interfaces.FactoryObject):
168 def __init__(self, beta0: float, beta1: float, beta2: float, tau: float):
169 """Nelson-Siegel parametrization for rates and yields, see :footcite:t:`Nelson1987`.
171 This parametrization is mostly used to parametrize rate curves and can be used in conjunction with :class:`rivapy.marketdata.DiscountCurveParametrized`. It is defined by
173 .. math::
175 f(t) = \\beta_0 + (\\beta_1+\\beta_2)\\frac{1-e^{-t/\\tau}}{t/\\tau} - \\beta_2e^{-t/\\tau}
178 Args:
179 beta0 (float): This parameter is the asymptotic (for arbitrarily large maturities) rate, see formula above.
180 beta1 (float): beta0 + beta1 gives the short-term rate, see formula above.
181 beta2 (float): This parameter controls the size of the hump, see formula above.
182 tau (float): This parameter controls the location of the hump, see formula above.
184 Examples:
185 .. code-block:: python
187 >>> import datetime as dt
>>> import matplotlib.pyplot as plt
>>> from rivapy.marketdata.curves import NelsonSiegel, DiscountCurveParametrized
>>> from rivapy.tools.enums import DayCounterType
188 >>> ns = NelsonSiegel(beta0=0.05, beta1 = 0.02, beta2=0.1, tau=1.0)
189 >>> dc = DiscountCurveParametrized('DC', refdate = dt.datetime(2023,1,1), rate_parametrization=ns, daycounter = DayCounterType.Act365Fixed)
190 >>> dates = [dt.datetime(2023,1,1) + dt.timedelta(days=30*days) for days in range(120)]
191 >>> values = [dc.value(refdate = dt.datetime(2023,1,1),d=d) for d in dates]
192 >>> plt.plot(dates, values)
193 """
194 self.beta0 = beta0
195 self.beta1 = beta1
196 self.beta2 = beta2
197 self.tau = tau
198 self._multiplier = 1.0
200 def _to_dict(self) -> dict:
201 return {"beta0": self.beta0, "beta1": self.beta1, "beta2": self.beta2, "tau": self.tau}
203 def __call__(self, t: float):
204 return self._multiplier * NelsonSiegel.compute(self.beta0, self.beta1, self.beta2, self.tau, t)
206 def __mul__(self, x: float):
207 result = NelsonSiegel(self.beta0, self.beta1, self.beta2, self.tau)
208 result._multiplier = x
209 return result
211 @staticmethod
212 def compute(beta0: float, beta1: float, beta2: float, tau: float, T: float) -> float:
213 """_summary_
215 Args:
216 beta0 (float): longrun
217 beta1 (float): beta0 + beta1 = shortrun
218 beta2 (float): hump or through
219 tau (float):locaton of hump
220 T (float): _description_
222 Returns:
223 float: _description_
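
Example:
Illustrative sketch; ``compute`` also works elementwise on numpy arrays:

.. code-block:: python

>>> import numpy as np
>>> from rivapy.marketdata.curves import NelsonSiegel
>>> maturities = np.array([0.25, 1.0, 5.0, 30.0])
>>> NelsonSiegel.compute(beta0=0.03, beta1=-0.01, beta2=0.02, tau=1.5, T=maturities)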
224 """
225 t = np.maximum(T, 1e-4) / tau
226 return beta0 + beta1 * (1.0 - np.exp(-t)) / t + beta2 * ((1 - np.exp(-t)) / t - np.exp(-(t)))
228 @staticmethod
229 def _create_sample(
230 n_samples: int,
231 seed: int = None,
232 min_short_term_rate: float = -0.01,
233 max_short_term_rate: float = 0.12,
234 min_long_run_rate: float = 0.005,
235 max_long_run_rate: float = 0.15,
236 min_hump: float = -0.1,
237 max_hump: float = 0.1,
238 min_tau: float = 0.5,
239 max_tau: float = 3.0,
240 ):
241 if seed is not None:
242 np.random.seed(seed)
243 result = []
244 for i in range(n_samples):
245 beta0 = np.random.uniform(min_long_run_rate, max_long_run_rate)
246 beta1 = np.random.uniform(min_short_term_rate - beta0, max_short_term_rate - beta0)
247 beta2 = np.random.uniform(min_hump, max_hump)
248 tau = np.random.uniform(min_tau, max_tau)
249 result.append(NelsonSiegel(beta0, beta1, beta2, tau))
250 return result
252 if has_tf:
254 @staticmethod
255 def compute_tf(beta0: tf.Tensor, beta1: tf.Tensor, beta2: tf.Tensor, tau: tf.Tensor, T: tf.Tensor) -> tf.Tensor:
256 """_summary_
258 Args:
259 beta0 (float): longrun
260 beta1 (float): beta0 + beta1 = shortrun
261 beta2 (float): hump or through
262 tau (float):locaton of hump
263 T (float): _description_
265 Returns:
266 float: _description_
267 """
268 t = tf.maximum(T, 1e-4) / tau
269 return beta0 + beta1 * (1.0 - tf.exp(-t)) / t + beta2 * ((1 - tf.exp(-t)) / t - tf.exp(-(t)))
272class ConstantRate(interfaces.FactoryObject):
273 def __init__(self, rate: float):
274 """Continuously compounded flat rate object that can be used in conjunction with :class:`rivapy.marketdata.DiscountCurveParametrized`.
276 Args:
277 rate (float): The constant rate.
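
Example:
Illustrative sketch combining a flat rate with :class:`DiscountCurveParametrized`:

.. code-block:: python

>>> import datetime as dt
>>> from rivapy.marketdata.curves import ConstantRate, DiscountCurveParametrized
>>> dc = DiscountCurveParametrized('FLAT_2PCT', dt.datetime(2023, 1, 1), ConstantRate(0.02))
>>> dc.value(dt.datetime(2023, 1, 1), dt.datetime(2024, 1, 1))  # approximately exp(-0.02)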
279 """
280 self.rate = rate
282 def _to_dict(self) -> dict:
283 return {"rate": self.rate}
285 @staticmethod
286 def _create_sample(n_samples: int, seed: int = None):
287 if seed is not None:
288 np.random.seed(seed)
289 result = []
290 for i in range(n_samples):
291 result.append(ConstantRate(rate=np.random.uniform(-0.005, 0.1)))
292 return result
294 def __call__(self, t: float):
295 return self.rate
298class LinearRate(interfaces.FactoryObject):
299 def __init__(self, shortterm_rate: float, longterm_rate: float, max_maturity: float = 10.0):
300 """Continuously compounded linearly interpolated rate object that can be used in conjunction with :class:`rivapy.marketdata.DiscountCurveParametrized`.
302 Args:
303 shortterm_rate (float): The short-term rate.
304 longterm_rate (float): The long-term rate.
305 max_maturity (float): After this time point, constant extrapolation is applied.
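
Example:
Illustrative sketch showing the linear interpolation and the constant extrapolation beyond ``max_maturity``:

.. code-block:: python

>>> from rivapy.marketdata.curves import LinearRate
>>> r = LinearRate(shortterm_rate=0.01, longterm_rate=0.03, max_maturity=10.0)
>>> r(0.0), r(5.0), r(20.0)  # 0.01, 0.02 and 0.03 (constant extrapolation)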
306 """
307 self.shortterm_rate = shortterm_rate
308 self.longterm_rate = longterm_rate
309 self.max_maturity = max_maturity
310 self._coeff = (self.longterm_rate - self.shortterm_rate) / (self.max_maturity)
312 @staticmethod
313 def _create_sample(n_samples: int, seed: int = None):
314 if seed is not None:
315 np.random.seed(seed)
316 result = []
317 for i in range(n_samples):
318 shortterm_rate = np.random.uniform(-0.005, 0.07)
319 longterm_rate = shortterm_rate + np.random.uniform(0.0025, 0.09)
320 result.append(LinearRate(shortterm_rate=shortterm_rate, longterm_rate=longterm_rate))
321 return result
323 def _to_dict(self) -> dict:
324 return {"shortterm_rate": self.shortterm_rate, "longterm_rate": self.longterm_rate, "max_maturity": self.max_maturity}
326 def __call__(self, t: float):
327 if t < self.max_maturity:
328 return self.shortterm_rate + self._coeff * t
329 return self.longterm_rate
332class NelsonSiegelSvensson(NelsonSiegel):
333 def __init__(self, beta0: float, beta1: float, beta2: float, beta3: float, tau: float, tau2: float):
334 super().__init__(beta0, beta1, beta2, tau)
335 self.beta3 = beta3
336 self.tau2 = tau2
338 def _to_dict(self) -> dict:
339 tmp = super()._to_dict()
340 tmp.update({"beta3": self.beta3, "tau2": self.tau2})
341 return tmp
343 def __call__(self, t: float):
344 return NelsonSiegelSvensson.compute(self.beta0, self.beta1, self.beta2, self.beta3, self.tau, self.tau2, t)
346 @staticmethod
347 def compute(beta0, beta1, beta2, beta3, tau, tau2, T):
348 t = np.maximum(T, 1e-4) / tau2
349 return NelsonSiegel.compute(beta0, beta1, beta2, tau, T) + beta3 * ((1 - np.exp(-t)) / t - np.exp(-(t)))
352class DiscountCurveComposition(interfaces.FactoryObject):
353 def __init__(self, a, b, c):
354 # check if all discount curves have the same daycounter, otherwise exception
355 if isinstance(a, dict):
356 a = _create(a)
357 if isinstance(b, dict):
358 b = _create(b)
359 if isinstance(c, dict):
360 c = _create(c)
361 dc = set()
362 for k in [a, b, c]:
363 if hasattr(k, "daycounter"):
364 dc.add(k.daycounter)
365 if len(dc) > 1:
366 raise Exception("All curves must have same daycounter.")
367 if len(dc) > 0:
368 self.daycounter = dc.pop()
369 else:
370 self.daycounter = DayCounterType.Act365Fixed.value
371 self._dc = DayCounter(self.daycounter)
372 self.a = a
373 if not hasattr(a, "value"):
374 self.a = DiscountCurveParametrized("", datetime(1980, 1, 1), ConstantRate(a), self.daycounter)
375 self.b = b
376 if not hasattr(b, "value"):
377 self.b = DiscountCurveParametrized("", datetime(1980, 1, 1), ConstantRate(b), self.daycounter)
378 self.c = c
379 if not hasattr(c, "value"):
380 self.c = DiscountCurveParametrized("", datetime(1980, 1, 1), ConstantRate(c), self.daycounter)
382 def _to_dict(self) -> dict:
383 if hasattr(self.a, "to_dict"):
384 a = self.a.to_dict()
385 else:
386 a = self.a
387 if hasattr(self.b, "to_dict"):
388 b = self.b.to_dict()
389 else:
390 b = self.b
391 if hasattr(self.c, "to_dict"):
392 c = self.c.to_dict()
393 else:
394 c = self.c
395 return {"a": a, "b": b, "c": c}
397 @staticmethod
398 def _create_sample(n_samples: int, seed: int = None, refdate: Union[datetime, date] = None, parametrization_type=NelsonSiegel) -> list:
399 curves = DiscountCurveParametrized._create_sample(n_samples, seed, refdate, parametrization_type)
400 results = []
401 for c in curves:
402 results.append(c + 0.001)
403 return results
405 def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
406 r = self.value_rate(refdate, d)
407 yf = self._dc.yf(refdate, d)
408 return np.exp(-r * yf)
410 def value_rate(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
411 return self.a.value_rate(refdate, d) * self.b.value_rate(refdate, d) + self.c.value_rate(refdate, d)
413 def __mul__(self, other):
414 # TODO unittests
415 return DiscountCurveComposition(self, other, 0.0)
417 def __rmul__(self, other):
418 return DiscountCurveComposition(self, other, 0.0)
420 def __add__(self, other):
421 return DiscountCurveComposition(self, 1.0, other)
423 def __radd__(self, other):
424 return DiscountCurveComposition(self, 1.0, other)
427class DiscountCurveParametrized(interfaces.FactoryObject):
428 def __init__(
429 self,
430 obj_id: str,
431 refdate: Union[datetime, date],
432 rate_parametrization, #: Callable[[float], float],
433 daycounter: Union[DayCounterType, str] = DayCounterType.Act365Fixed,
434 ):
435 """_summary_
437 Args:
438 obj_id (str): _description_
439 refdate (Union[datetime, date]): _description_
440 rate_parametrization (Callable[[float], float]): _description_
441 daycounter (Union[DayCounterType, str], optional): _description_. Defaults to DayCounterType.Act365Fixed.
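
Example:
Illustrative sketch (see also the example in :class:`NelsonSiegel`):

.. code-block:: python

>>> import datetime as dt
>>> from rivapy.marketdata.curves import DiscountCurveParametrized, NelsonSiegel
>>> dc = DiscountCurveParametrized('DC', dt.datetime(2023, 1, 1), NelsonSiegel(beta0=0.03, beta1=-0.01, beta2=0.02, tau=1.5))
>>> d = dt.datetime(2028, 1, 1)
>>> dc.value_rate(dt.datetime(2023, 1, 1), d)  # continuously compounded zero rate
>>> dc.value(dt.datetime(2023, 1, 1), d)       # corresponding discount factor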
442 """
443 if isinstance(refdate, datetime):
444 self.refdate = refdate
445 else:
446 self.refdate = datetime(refdate.year, refdate.month, refdate.day)
448 self.daycounter = DayCounterType.to_string(daycounter)
449 self._dc = DayCounter(self.daycounter)
450 self.obj_id = obj_id
451 if isinstance(rate_parametrization, dict): # if schedule is a dict we try to create it from factory
452 self.rate_parametrization = _create(rate_parametrization)
453 else:
454 self.rate_parametrization = rate_parametrization
456 def _to_dict(self) -> dict:
457 try:
458 parametrization = self.rate_parametrization.to_dict()
459 except Exception as e:
460 raise Exception("Missing implementation of to_dict() in parametrization of type " + type(self.rate_parametrization).__name__)
461 return {"obj_id": self.obj_id, "refdate": self.refdate, "rate_parametrization": parametrization}
463 def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
464 """Return discount factor for a given date
466 Args:
467 refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curve's reference date), the forward discount factor will be returned.
468 d (Union[date, datetime]): The date for which the discount factor will be returned
470 Returns:
471 float: discount factor
472 """
473 if not isinstance(refdate, datetime):
474 refdate = datetime(refdate.year, refdate.month, refdate.day)
475 if not isinstance(d, datetime):
476 d = datetime(d.year, d.month, d.day)
477 if refdate < self.refdate:
478 raise Exception("The given reference date is before the curve's reference date.")
479 yf = self._dc.yf(refdate, d)
480 return np.exp(-self.rate_parametrization(yf) * yf)
482 def value_rate(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
483 """Return the continuous rate for a given date
485 Args:
486 refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curve's reference date), the forward discount factor will be returned.
487 d (Union[date, datetime]): The date for which the discount factor will be returned
489 Returns:
490 float: continuous rate
491 """
492 if not isinstance(refdate, datetime):
493 refdate = datetime(refdate.year, refdate.month, refdate.day)
494 if not isinstance(d, datetime):
495 d = datetime(d.year, d.month, d.day)
496 if refdate < self.refdate:
497 raise Exception("The given reference date is before the curve's reference date.")
498 yf = self._dc.yf(refdate, d)
499 return self.rate_parametrization(yf)
501 @staticmethod
502 def _create_sample(n_samples: int, seed: int = None, refdate: Union[datetime, date] = None, parametrization_type=NelsonSiegel) -> list:
503 if seed is not None:
504 np.random.seed(seed)
505 if refdate is None:
506 refdate = datetime.now()
507 parametrizations = parametrization_type._create_sample(n_samples)
508 result = []
509 for i, p in enumerate(parametrizations):
510 result.append(DiscountCurveParametrized("DCP_" + str(i), refdate, p))
511 return result
513 def __mul__(self, other):
514 return DiscountCurveComposition(self, other, 0.0)
516 def __rmul__(self, other):
517 return DiscountCurveComposition(self, other, 0.0)
519 def __add__(self, other):
520 return DiscountCurveComposition(self, 1.0, other)
522 def __radd__(self, other):
523 return DiscountCurveComposition(self, 1.0, other)
526class EquityForwardCurve:
527 def __init__(self, spot: float, funding_curve: DiscountCurve, borrow_curve: DiscountCurve, div_table):
528 """Equity Forward Curve
530 Args:
532 spot (float): Current spot of the underlying.
533 funding_curve (DiscountCurve): Discount curve used for funding the forward position.
534 borrow_curve (DiscountCurve): Curve describing the borrowing (repo) costs of the underlying.
535 div_table (:class:`rivapy.marketdata.DividendTable`): Dividend table of the underlying.
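
Example:
Illustrative sketch with hypothetical inputs; evaluating the forward requires the optional pyvacon backend, and ``None`` is used here only as a placeholder for a proper dividend table:

.. code-block:: python

>>> import datetime as dt
>>> from rivapy.marketdata.curves import DiscountCurve, EquityForwardCurve
>>> refdate = dt.datetime(2023, 1, 1)
>>> dates = [refdate + dt.timedelta(days=365 * i) for i in range(1, 6)]
>>> funding = DiscountCurve('funding', refdate, dates, [0.98, 0.96, 0.94, 0.92, 0.90])
>>> borrow = DiscountCurve('borrow', refdate, dates, [1.0, 1.0, 1.0, 1.0, 1.0])
>>> fwd = EquityForwardCurve(spot=100.0, funding_curve=funding, borrow_curve=borrow, div_table=None)
>>> fwd.value(refdate, dates[0])  # forward value at the one-year point (requires pyvacon)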
537 """
538 self.spot = spot
540 self.bc = borrow_curve
541 self.fc = funding_curve
542 self.div = div_table
543 self._pyvacon_obj = None
544 self.refdate = self.fc.refdate
545 if self.bc is not None:
546 if self.refdate < self.bc.refdate:
547 self.refdate = self.bc.refdate
549 if self.div is not None:
550 if hasattr(self.div, "refdate"):
551 if self.refdate < self.div.refdate:
552 self.refdate = self.div.refdate
554 def _get_pyvacon_obj(self):
555 if self._pyvacon_obj is None:
556 if hasattr(self.fc, "_get_pyvacon_obj"):
557 fc = self.fc._get_pyvacon_obj()
558 else:
559 fc = self.fc
561 if hasattr(self.bc, "_get_pyvacon_obj"):
562 bc = self.bc._get_pyvacon_obj()
563 else:
564 bc = self.bc
566 if hasattr(self.div, "_get_pyvacon_obj"):
567 div = self.div._get_pyvacon_obj()
568 else:
569 div = self.div
570 self._pyvacon_obj = _EquityForwardCurve(self.refdate, self.spot, fc, bc, div)
572 return self._pyvacon_obj
574 def value(self, refdate, expiry):
575 return self._get_pyvacon_obj().value(refdate, expiry)
577 def plot(self, days: int = 10, days_end: int = 10 * 365, **kwargs):
578 """Plots the forward curve using matplotlibs plot function.
580 Args:
581 days (int, optional): The number of days between two plotted rates/discount factors. Defaults to 10.
582 days_end (int. optional): Number of days when plotting will end. Defaults to 10*365 (10yr)
583 **kwargs: optional arguments that will be directly passed to the matplotlib plto function
584 """
585 dates = [self.refdate + timedelta(days=i) for i in range(0, days_end, days)]
586 values = [self.value(self.refdate, d) for d in dates]
587 plt.plot(dates, values, **kwargs)
588 plt.xlabel("expiry")
589 plt.ylabel("forward value")
592class BootstrapHazardCurve:
593 def __init__(
594 self, ref_date: datetime, trade_date: datetime, dc: DiscountCurve, RR: float, payment_dates: List[List[datetime]], market_spreads: List[float]
595 ):
596 """[summary]
598 Args:
599 ref_date (datetime): [description]
600 trade_date (datetime): [description]
601 dc (DiscountCurve): [description]
602 RR (float): [description]
603 payment_dates (List[datetime]): [description]
604 market_spreads (List[float]): [description]
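
Example:
Illustrative sketch with hypothetical quotes (quarterly premium payments, two maturities); the bootstrap uses the optional pyvacon backend through ``DiscountCurve.value`` and ``_SurvivalCurve``:

.. code-block:: python

>>> import datetime as dt
>>> from rivapy.marketdata.curves import DiscountCurve, BootstrapHazardCurve
>>> ref_date = dt.datetime(2023, 1, 1)
>>> dc = DiscountCurve('ois', ref_date, [ref_date + dt.timedelta(days=365 * i) for i in range(1, 4)], [0.99, 0.975, 0.96])
>>> pay_1y = [ref_date + dt.timedelta(days=91 * i) for i in range(1, 5)]
>>> pay_2y = [ref_date + dt.timedelta(days=91 * i) for i in range(1, 9)]
>>> bhc = BootstrapHazardCurve(ref_date, ref_date, dc, RR=0.4, payment_dates=[pay_1y, pay_2y], market_spreads=[0.005, 0.007])
>>> hazard_rates, curve_dates = bhc.calibrate_hazard_rate()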
605 """
607 self.ref_date = ref_date
608 self.trade_date = trade_date
609 self.dc = dc
610 self.RR = RR
611 self.payment_dates_bootstrapp = payment_dates
612 self.market_spreads = market_spreads
613 self._pyvacon_obj = None
615 def par_spread(self, dc_survival, maturity_date, payment_dates: List[datetime]):
616 integration_step = relativedelta.relativedelta(days=365)
617 premium_period_start = self.ref_date
618 prev_date = self.ref_date
619 current_date = min(prev_date + integration_step, maturity_date)
620 dc_valuation_date = self.dc.value(self.ref_date, maturity_date)
621 risk_adj_factor_protection = 0
622 risk_adj_factor_premium = 0
623 risk_adj_factor_accrued = 0
625 while current_date <= maturity_date:
626 default_prob = dc_survival.value(self.ref_date, prev_date) - dc_survival.value(self.ref_date, current_date)
627 risk_adj_factor_protection += self.dc.value(self.ref_date, current_date) * default_prob
628 prev_date = current_date
629 current_date += integration_step
631 if prev_date < maturity_date and current_date > maturity_date:
632 default_prob = dc_survival.value(self.ref_date, prev_date) - dc_survival.value(self.ref_date, maturity_date)
633 risk_adj_factor_protection += self.dc.value(self.ref_date, maturity_date) * default_prob
635 for premium_payment in payment_dates:
636 if premium_payment >= self.ref_date:
637 period_length = ((premium_payment - premium_period_start).days) / 360
638 survival_prob = (dc_survival.value(self.ref_date, premium_period_start) + dc_survival.value(self.ref_date, premium_payment)) / 2
639 df = self.dc.value(self.ref_date, premium_payment)
640 risk_adj_factor_premium += period_length * survival_prob * df
641 default_prob = dc_survival.value(self.ref_date, premium_period_start) - dc_survival.value(self.ref_date, premium_payment)
642 risk_adj_factor_accrued += period_length * default_prob * df
643 premium_period_start = premium_payment
645 PV_accrued = (1 / 2) * risk_adj_factor_accrued
646 PV_premium = (1) * risk_adj_factor_premium
647 PV_protection = ((1 - self.RR)) * risk_adj_factor_protection
649 par_spread_i = (PV_protection) / ((PV_premium + PV_accrued))
650 return par_spread_i
652 def create_survival(self, ref_date: datetime, dates: List[datetime], hazard_rates: List[float]):
653 return _SurvivalCurve("survival_curve", ref_date, dates, hazard_rates)
655 def calibration_error(self, x, mkt_par_spread, ref_date, payment_dates, dates, hazard_rates):
656 hazard_rates[-1] = x
657 maturity_date = dates[-1]
658 dc_surv = self.create_survival(ref_date, dates, hazard_rates)
659 return mkt_par_spread - self.par_spread(dc_surv, maturity_date, payment_dates)
661 def calibrate_hazard_rate(self):
662 sc_dates = [self.ref_date]
663 hazard_rates = [0.0]
664 for i in range(len(self.payment_dates_bootstrapp)):
665 payment_dates_iter = self.payment_dates_bootstrapp[i]
666 mkt_par_spread_iter = self.market_spreads[i]
667 sc_dates.append(payment_dates_iter[-1])
668 hazard_rates.append(hazard_rates[-1])
669 sol = scipy.optimize.root_scalar(
670 self.calibration_error,
671 args=(mkt_par_spread_iter, self.ref_date, payment_dates_iter, sc_dates, hazard_rates),
672 method="brentq",
673 bracket=[0, 3],
674 xtol=1e-8,
675 rtol=1e-8,
676 )
677 hazard_rates[-1] = sol.root
678 return hazard_rates, sc_dates # self.create_survival(self.ref_date, sc_dates, hazard_rates)#.value, hazard_rates
680 # def hazard_rates(self):
681 # #hazard_rates_value=[]
682 # hazard_rates_value=self.calibrate_hazard_rate()
683 # return self.hazard_rates_value
685 # def value(self, refdate: Union[date, datetime], d: Union[date, datetime])->float:
686 # """Return discount factor for a given date
688 # Args:
689 # refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curves reference date), the forward discount factor will be returned.
690 # d (Union[date, datetime]): The date for which the discount factor will be returned
692 # Returns:
693 # float: discount factor
694 # """
695 # #if not isinstance(refdate, datetime):
696 # # refdate = datetime(refdate,0,0,0)
697 # #if not isinstance(d, datetime):
698 # # d = datetime(d,0,0,0)
699 # #if refdate < self.refdate:
700 # # raise Exception('The given reference date is before the curves reference date.')
701 # return self._get_pyvacon_obj().value(refdate, d)
703 # def _get_pyvacon_obj(self):
704 # if self._pyvacon_obj is None:
705 # self._pyvacon_obj = _SurvivalCurve('survival_curve', self.refdate,
706 # self.calibrate_hazard_rate[1], self.calibrate_hazard_rate[0])
707 # return self._pyvacon_obj
710# class PowerPriceForwardCurve:
711# def __init__(
712# self, refdate: Union[datetime, date], start: datetime, end: datetime, values: np.ndarray, freq: str = "1H", tz: str = None, id: str = None
713# ):
714# """Simple forward curve for power.
716# Args:
717# refdate (Union[datetime, date]): Reference date of curve
718# start (dt.datetime): Start of forward curve datetimepoints (including this timepoint).
719# end (dt.datetime): End of forad curve datetimepoints (excluding this timepoint).
720# values (np.ndarray): One dimensional array holding the price for each datetimepint in the curve. The method value will raise an exception if the number of values is not equal to the number of datetimepoints.
721# freq (str, optional): Frequency of timepoints. Defaults to '1H'. See documentation for pandas.date_range for further details on freq.
722# tz (str or tzinfo): Time zone name for returning localized datetime points, for example ‘Asia/Hong_Kong’.
723# By default, the resulting datetime points are timezone-naive. See documentation for pandas.date_range for further details on tz.
724# id (str): Identifier for the curve. It has no impact on the valuation functionality. If None, a uuid will be generated. Defaults to None.
725# """
726# self.id = id
727# if id is None:
728# self.id = "PFC/" + str(datetime.now())
729# self.refdate = refdate
730# self.start = start
731# self.end = end
732# self.freq = freq
733# self.tz = tz
734# self.values = values
735# # timegrid used to compute prices for a certain schedule
736# self._tg = None
737# self._df = (
738# pd.DataFrame(
739# {"dates": pd.date_range(self.start, self.end, freq=self.freq, tz=self.tz, inclusive="left").to_pydatetime(), "values": self.values}
740# )
741# .set_index(["dates"])
742# .sort_index()
743# )
745# def value(self, refdate: Union[date, datetime], schedule) -> np.ndarray:
746# if self._tg is None:
747# self._tg = pd.DataFrame(
748# {"dates": pd.date_range(self.start, self.end, freq=self.freq, tz=self.tz, inclusive="left").to_pydatetime(), "values": self.values}
749# ).reset_index()
750# if self._tg.shape[0] != self.values.shape[0]:
751# raise Exception(
752# "The number of dates ("
753# + str(self._tg.shape[0])
754# + ") does not equal number of values ("
755# + str(self.values.shape[0])
756# + ") in forward curve."
757# )
758# tg = self._tg[(self._tg.dates >= schedule.start) & (self._tg.dates < schedule.end)].set_index("dates")
759# _schedule = pd.DataFrame({"dates": schedule.get_schedule(refdate)})
760# tg = _schedule.join(tg, on="dates")
761# # tg = tg[tg['dates']>=refdate]
762# if tg["index"].isna().sum() > 0:
763# raise Exception("There are " + str(tg["index"].isna().sum()) + " dates in the schedule not covered by the forward curve.")
764# return self.values[tg["index"].values]
766# def get_df(self) -> pd.DataFrame:
767# return self._df
770class EnergyPriceForwardCurve:
771 """Energy Price Forward Curve object.
772 It is recommended to initialize this object via the class methods ``from_existing_pfc``, ``from_existing_shape`` or ``from_scratch``.
774 Args:
775 id (_type_): ID for the PFC object
776 refdate (Union[datetime, date]): Reference date
777 pfc (pd.DataFrame, optional): This object can be initialized with an existing pfc. Defaults to None.
778 """
780 def __init__(self, id, refdate: Union[datetime, date], pfc: pd.DataFrame = None, **kwargs):
781 self.id = id
782 if id is None:
783 self.id = "PFC/" + str(datetime.now())
784 self.refdate = refdate
786 self._pfc = pfc
788 self._pfc_shape: pd.DataFrame = kwargs.get("pfc_shape", None)
790 self._apply_schedule: SimpleSchedule = kwargs.get("apply_schedule", None)
791 self._pfc_shaper: PFCShaper = kwargs.get("pfc_shaper", None)
793 list(map(lambda x: EnergyPriceForwardCurve._validate_dataframes(x), [self._pfc, self._pfc_shape]))
795 self._future_contracts: List[EnergyFutureSpecifications] = kwargs.get("future_contracts", None)
797 if self._pfc is None and self._pfc_shape is None and self._pfc_shaper is None:
798 raise ValueError("No values provided for the arguments pfc, pfc_shape and pfc_shaper!")
800 @staticmethod
801 def _validate_dataframes(dataframe: Optional[pd.DataFrame]):
802 if dataframe is not None:
803 validators._check_pandas_index_for_datetime(dataframe)
805 @classmethod
806 def from_existing_pfc(cls, id, refdate: Union[datetime, date], pfc: pd.DataFrame) -> "EnergyPriceForwardCurve":
807 """Initialization of the ``EnergyPriceForwardCurve`` given an existing PFC.
809 Args:
810 id (_type_): ID for the PFC object
811 refdate (Union[datetime, date]): Reference Date
812 pfc (pd.DataFrame): Existing Pfc
814 Returns:
815 EnergyPriceForwardCurve: ``EnergyPriceForwardCurve`` object
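
Example:
Illustrative sketch with a synthetic hourly curve; a pandas DataFrame with a DatetimeIndex is assumed to pass the index validation:

.. code-block:: python

>>> import datetime as dt
>>> import pandas as pd
>>> from rivapy.marketdata.curves import EnergyPriceForwardCurve
>>> timegrid = pd.date_range(start='2023-01-01', end='2024-01-01', freq='1H', inclusive='left')
>>> pfc = pd.DataFrame({'values': 40.0}, index=timegrid)
>>> curve = EnergyPriceForwardCurve.from_existing_pfc(id='pfc_power_de', refdate=dt.datetime(2023, 1, 1), pfc=pfc)
>>> curve.get_pfc().head()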
816 """
817 instance = cls(id=id, refdate=refdate, pfc=pfc)
818 return instance
820 @classmethod
821 def from_existing_shape(
822 cls, id, refdate: Union[datetime, date], pfc_shape: pd.DataFrame, future_contracts: List[EnergyFutureSpecifications]
823 ) -> "EnergyPriceForwardCurve":
824 """Initialization of the ``EnergyPriceForwardCurve`` given an existing PFC shape. The shape is then shifted in order to match the future contracts defined in the ``future_contracts`` list.
827 Args:
828 id (_type_): ID for the PFC object
829 refdate (Union[datetime, date]): Reference Date
830 pfc_shape (pd.DataFrame): Existing PFC shape
831 future_contracts (List[EnergyFutureSpecifications]): List of future contracts (``EnergyFutureSpecifications`` objects)
833 Returns:
834 EnergyPriceForwardCurve: ``EnergyPriceForwardCurve`` object
835 """
836 instance = cls(id=id, refdate=refdate, pfc_shape=pfc_shape, future_contracts=future_contracts)
837 instance._shift_shape()
838 return instance
840 @classmethod
841 def from_scratch(
842 cls,
843 id,
844 refdate: Union[datetime, date],
845 apply_schedule: SimpleSchedule,
846 pfc_shaper: PFCShaper,
847 future_contracts: List[EnergyFutureSpecifications],
848 ) -> "EnergyPriceForwardCurve":
849 """Initialization of the ``EnergyPriceForwardCurve`` from scratch. First a shape is created using the ``pfc_shaper``. Afterwards, shape is shifted in order to match the future contracts defined in the ``future_contracts`` list.
851 Args:
852 id (_type_): ID for the PFC object
853 refdate (Union[datetime, date]): Reference Date
854 apply_schedule (SimpleSchedule): Schedule to apply the ``pfc_shaper`` on, in order to obtain shape values for future time points
855 pfc_shaper (PFCShaper): PFC shaper
856 future_contracts (List[EnergyFutureSpecifications]): List of future contracts (``EnergyFutureSpecifications`` objects)
858 Returns:
859 EnergyPriceForwardCurve: ``EnergyPriceForwardCurve`` object
860 """
861 instance = cls(id=id, refdate=refdate, pfc_shaper=pfc_shaper, future_contracts=future_contracts, apply_schedule=apply_schedule)
862 instance._create_shape()
863 instance._shift_shape()
864 return instance
866 def __validate_contracts_frequency(self):
867 """Checks if all contracts in ``self._future_contracts`` have the sample schedule frequency."""
868 frequencies_contracts = defaultdict(list)
869 for future_contracts in self._future_contracts:
870 frequencies_contracts[future_contracts.schedule.freq].append((future_contracts.schedule.__class__.__name__, future_contracts.name))
872 if len(list(frequencies_contracts.keys())) > 1:
873 raise ValueError(
874 f"Found different contract frequencies: {frequencies_contracts}.\n Please provide uniform frequencies for the elements in the `future_contract` dictionary!"
875 )
877 def __get_offpeak_contracts(
878 self, base_contracts: List[EnergyFutureSpecifications], peak_contracts: List[EnergyFutureSpecifications]
879 ) -> List[EnergyFutureSpecifications]:
880 """In cases where base and peak contracts are part of the ``self._future_contracts``, offpeak contracts need to be decuted from these two in order to shift the shape properly.
882 Args:
883 base_contracts (List[EnergyFutureSpecifications]): List of base contracts
884 peak_contracts (List[EnergyFutureSpecifications]): List of peak contracts
886 Returns:
887 List[EnergyFutureSpecifications]: List of offpeak contracts
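
Example:
With an assumed monthly base contract at a price of 100 over 744 delivery hours and a peak contract at 120 over 312 peak hours, the implied offpeak price is (744 * 100 - 312 * 120) / (744 - 312), i.e. approximately 85.56.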
888 """
889 offpeak_contracts = []
891 # iterate over each combination of base and peak contracts
892 for base_contract_spec in base_contracts:
893 n_base = len(base_contract_spec.get_schedule())
894 for peak_contract_spec in peak_contracts:
895 # match both by the start and end dates of their respective schedule
896 if base_contract_spec.get_start_end() == peak_contract_spec.get_start_end():
897 # if both match, an offpeak contract can be created from these two
898 offpeak_name = f"offpeak_{base_contract_spec.name}&{peak_contract_spec.name}"
899 n_peak = len(peak_contract_spec.get_schedule())
900 offpeak_price = (
901 n_base / (n_base - n_peak) * base_contract_spec.get_price() - n_peak / (n_base - n_peak) * peak_contract_spec.get_price()
902 )
903 offpeak_contracts.append(
904 EnergyFutureSpecifications(
905 schedule=OffPeakSchedule(start=base_contract_spec.get_start(), end=base_contract_spec.get_end()),
906 price=offpeak_price,
907 name=offpeak_name,
908 )
909 )
910 break
912 return offpeak_contracts
914 def _shift_shape(self):
915 """Shifts the shape to match the future contracts defined in the ``self._future_contracts`` list."""
916 self.__validate_contracts_frequency()
918 base_contracts, peak_contracts = [
919 [fc for fc in self._future_contracts if fc.schedule.__class__._name == schedule_type] for schedule_type in (ets.BASE, ets.PEAK)
920 ]
922 # if base and peak contracts both exist, offpeak contracts are computed
923 if (len(base_contracts) > 0) and (len(peak_contracts) > 0):
924 shifted_pfc = []
925 offpeak_contracts = self.__get_offpeak_contracts(base_contracts=base_contracts, peak_contracts=peak_contracts)
927 # shift offpeak and peak separately
928 for contracts in [offpeak_contracts, peak_contracts]:
929 shifting_datetimes = np.sort(np.unique(np.concatenate([contract.get_schedule() for contract in contracts])))
930 _pfc_shape = self._pfc_shape.loc[shifting_datetimes, :]
931 pfc_shifter = PFCShifter(shape=_pfc_shape, contracts=contracts)
932 shifted_pfc.append(pfc_shifter.compute())
934 # combine offpeak and peak shifts
935 shifted_pfc = pd.concat(shifted_pfc, axis=0)
936 self._pfc = shifted_pfc.sort_index(ascending=True)
938 else:
939 # if only base or only peak contracts exist, the shift can be performed directly
940 pfc_shifter = PFCShifter(shape=self._pfc_shape, contracts=self._future_contracts)
941 self._pfc = pfc_shifter.compute()
943 def _create_shape(self):
944 """Creates a shape using the ``self._pfc_shaper`` model"""
945 self._pfc_shaper.calibrate()
946 self._pfc_shape = self._pfc_shaper.apply(self._apply_schedule)
948 def get_pfc(self) -> pd.DataFrame:
949 """Returns the PFC
951 Returns:
952 pd.DataFrame: PFC
953 """
954 return self._pfc