Coverage for rivapy/marketdata/curves.py: 65%

458 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-05 14:27 +0000

1import math 

2import scipy.optimize 

3import matplotlib.pyplot as plt 

4import pandas as pd 

5import numpy as np 

6import dateutil.relativedelta as relativedelta 

7import rivapy.tools.interfaces as interfaces 

8import rivapy.tools._validators as validators 

9from typing import List, Union, Tuple, Literal, Dict, Optional 

10from datetime import datetime, date, timedelta 

11from collections import defaultdict 

12 

13try: 

14 import tensorflow as tf 

15 

16 has_tf = True 

17except ImportError: 

18 has_tf = False 

19 

20from rivapy.tools.enums import DayCounterType, InterpolationType, ExtrapolationType 

21from rivapy.tools.enums import EnergyTimeGridStructure as ets 

22from rivapy.tools.datetools import DayCounter 

23from rivapy.marketdata.factory import create as _create 

24from rivapy.marketdata_tools.pfc_shaper import PFCShaper 

25from rivapy.marketdata_tools.pfc_shifter import PFCShifter 

26from rivapy.tools.scheduler import SimpleSchedule, OffPeakSchedule, PeakSchedule, BaseSchedule 

27from rivapy.instruments.energy_futures_specifications import EnergyFutureSpecifications 

28 

29from rivapy import _pyvacon_available 

30 

31if _pyvacon_available: 

32 from pyvacon.finance.marketdata import EquityForwardCurve as _EquityForwardCurve 

33 from pyvacon.finance.marketdata import SurvivalCurve as _SurvivalCurve 

34 from pyvacon.finance.marketdata import DiscountCurve as _DiscountCurve 

35 import pyvacon as _pyvacon 

36 

37 

class DiscountCurve:
    """Discount curve given by discount factors on a grid of dates.

    The numerical work (interpolation/extrapolation) is delegated to the pyvacon ``DiscountCurve`` object,
    which is created lazily the first time it is needed.
    """

    def __init__(
        self,
        id: str,
        refdate: Union[datetime, date],
        dates: List[Union[datetime, date]],
        df: List[float],
        interpolation: InterpolationType = InterpolationType.HAGAN_DF,
        extrapolation: ExtrapolationType = ExtrapolationType.NONE,
        daycounter: DayCounterType = DayCounterType.Act365Fixed,
    ):
        """Discountcurve

        Args:
            id (str): Identifier of the discount curve.
            refdate (Union[datetime, date]): Reference date of the discount curve. Plain dates are converted to datetimes at midnight.
            dates (List[Union[datetime, date]]): List of dates belonging to the list of discount factors. All dates must be distinct and equal or after the refdate, otherwise an exception will be thrown. Plain dates are converted to datetimes at midnight.
            df (List[float]): List of discount factors. Length of list of discount factors must equal to length of list of dates, otherwise an exception will be thrown.
            interpolation (enums.InterpolationType, optional): Defaults to InterpolationType.HAGAN_DF.
            extrapolation (enums.ExtrapolationType, optional): Defaults to ExtrapolationType.NONE which does not allow to compute a discount factor for a date past all given dates given to this constructor.
            daycounter (enums.DayCounterType, optional): Daycounter used within the interpolation formula to compute a discount factor between two dates from the dates-list above. Defaults to DayCounterType.Act365Fixed.

        Raises:
            Exception: If no dates are given, if dates and discount factors differ in length, if the first date lies
                before the reference date, if the discount factor at the reference date is not 1.0 or if dates are not distinct.
            TypeError: If interpolation, extrapolation or daycounter are not members of the respective enum.
        """
        if len(dates) < 1:
            raise Exception("Please specify at least one date and discount factor")
        if len(dates) != len(df):
            raise Exception("List of dates and discount factors must have equal length.")
        # Normalise everything to datetime first so that all later comparisons are between equal types
        # (comparing a date with a datetime raises a TypeError).
        self.refdate = DiscountCurve._to_datetime(refdate)
        # zip dates and discount factors and sort by dates
        self.values = sorted(((DiscountCurve._to_datetime(d), f) for d, f in zip(dates, df)), key=lambda tup: tup[0])
        if not isinstance(interpolation, InterpolationType):
            raise TypeError("Interpolation is not of type enums.InterpolationType")
        self.interpolation = interpolation
        if not isinstance(extrapolation, ExtrapolationType):
            raise TypeError("Extrapolation is not of type enums.ExtrapolationType")
        self.extrapolation = extrapolation
        if not isinstance(daycounter, DayCounterType):
            raise TypeError("Daycounter is not of type enums.DaycounterType")
        self.daycounter = daycounter
        self.id = id
        # check if dates are monotonically increasing and if first date is greater than refdate
        if self.values[0][0] < self.refdate:
            raise Exception("First date must be equal or greater then reference date.")
        if self.values[0][0] > self.refdate:
            # the curve always starts at the reference date with a discount factor of one
            self.values = [(self.refdate, 1.0)] + self.values
        if self.values[0][1] != 1.0:
            raise Exception("Discount factor for today must equal 1.0.")
        for i in range(1, len(self.values)):
            # compare the dates only; comparing the (date, df) tuples would let duplicate dates with
            # increasing discount factors pass unnoticed
            if self.values[i - 1][0] >= self.values[i][0]:
                raise Exception("Dates must be given in monotonically increasing order.")
        self._pyvacon_obj = None

    @staticmethod
    def _to_datetime(d: Union[date, datetime]) -> datetime:
        """Return ``d`` unchanged if it is a datetime, otherwise the datetime at midnight of the given date."""
        if isinstance(d, datetime):
            return d
        return datetime(d.year, d.month, d.day)

    def get_dates(self) -> Tuple[datetime]:
        """Return list of dates of curve

        Returns:
            Tuple[datetime]: List of dates
        """
        x, _ = zip(*self.values)
        return x

    def get_df(self) -> Tuple[float]:
        """Return list of discount factors

        Returns:
            Tuple[float]: List of discount factors
        """
        _, y = zip(*self.values)
        return y

    def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return discount factor for a given date

        Args:
            refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curves reference date), the forward discount factor will be returned.
            d (Union[date, datetime]): The date for which the discount factor will be returned

        Returns:
            float: discount factor

        Raises:
            Exception: If the given reference date lies before the reference date of the curve.
        """
        refdate = DiscountCurve._to_datetime(refdate)
        d = DiscountCurve._to_datetime(d)
        if refdate < self.refdate:
            raise Exception("The given reference date is before the curves reference date.")
        return self._get_pyvacon_obj().value(refdate, d)

    def _get_pyvacon_obj(self):
        """Return the underlying pyvacon discount curve, creating and caching it on first use."""
        if self._pyvacon_obj is None:
            # NOTE(review): the daycounter is handed over as ``.value`` while interpolation and extrapolation
            # are handed over as enum members - confirm this against the pyvacon constructor signature.
            self._pyvacon_obj = _DiscountCurve(
                self.id,
                self.refdate,
                list(self.get_dates()),
                list(self.get_df()),
                self.daycounter.value,
                self.interpolation,
                self.extrapolation,
            )
        return self._pyvacon_obj

    def plot(self, days: int = 10, discount_factors: bool = False, **kwargs):
        """Plots the discount curve using matplotlibs plot function.
        The timegrid starts at the first date of the curve, advances in steps of ``days`` days and ends with the last date of the curve.
        Here either the discount factors or the zero rates (continuously compounded, ACT365 yearfraction) are plotted.

        Args:
            days (int, optional): The number of days between two plotted rates/discount factors. Defaults to 10.
            discount_factors (bool, optional): If True, discount factors will be plotted, otherwise the rates. Defaults to False.
            **kwargs: optional arguments that will be directly passed to the matplotlib plot function
        """
        dates = self.get_dates()
        step = timedelta(days=days)
        dates_new = [dates[0]]
        for i in range(1, len(dates)):
            while dates_new[-1] + step < dates[i]:
                dates_new.append(dates_new[-1] + step)
        dates_new.append(dates[-1])
        values = [self.value(self.refdate, d) for d in dates_new]

        if not discount_factors:
            for i in range(1, len(values)):
                dt = float((dates_new[i] - self.refdate).days) / 365.0
                values[i] = -math.log(values[i]) / dt
            # the zero rate at the reference date itself is undefined (dt = 0), reuse the first proper rate
            values[0] = values[1]
        plt.plot(dates_new, values, label=self.id, **kwargs)

165 

166 

class NelsonSiegel(interfaces.FactoryObject):
    def __init__(self, beta0: float, beta1: float, beta2: float, tau: float):
        """Nelson-Siegel parametrization for rates and yields, see :footcite:t:`Nelson1987`.

        This parametrization is mostly used to parametrize rate curves and can be used in conjunction with :class:`rivapy.marketdata.DiscountCurveParametrized`. It is defined by

        .. math::

            f(t) = \\beta_0 + (\\beta_1+\\beta_2)\\frac{1-e^{-t/\\tau}}{t/\\tau} -\\beta_2e^{-t/\\tau}


        Args:
            beta0 (float): This parameter is the asymptotic (for arbitrary large maturities) rate, see formula above.
            beta1 (float): beta0 + beta1 give the short term rate, see formula above.
            beta2 (float): This parameter controls the size of the hump, see formula above.
            tau (float): This parameter controls the location of the hump, see formula above.

        Examples:
            .. code-block:: python

                >>> from rivapy.marketdata.curves import NelsonSiegel, DiscountCurveParametrized
                >>> ns = NelsonSiegel(beta0=0.05, beta1 = 0.02, beta2=0.1, tau=1.0)
                >>> dc = DiscountCurveParametrized('DC', refdate = dt.datetime(2023,1,1), rate_parametrization=ns, daycounter = DayCounterType.Act365Fixed)
                >>> dates = [dt.datetime(2023,1,1) + dt.timedelta(days=30*days) for days in range(120)]
                >>> values = [dc.value(refdate = dt.datetime(2023,1,1),d=d) for d in dates]
                >>> plt.plot(dates, values)
        """
        self.beta0 = beta0
        self.beta1 = beta1
        self.beta2 = beta2
        self.tau = tau
        # scaling factor applied to the rate in __call__; only changed via multiplication
        self._multiplier = 1.0

    def _to_dict(self) -> dict:
        """Return the four curve parameters as a dictionary.

        NOTE(review): the multiplier set by ``__mul__`` is not part of the dictionary.
        """
        return {"beta0": self.beta0, "beta1": self.beta1, "beta2": self.beta2, "tau": self.tau}

    def __call__(self, t: float):
        """Return the (scaled) Nelson-Siegel rate for the time to maturity ``t``."""
        return self._multiplier * NelsonSiegel.compute(self.beta0, self.beta1, self.beta2, self.tau, t)

    def __mul__(self, x: float):
        """Return a new parametrization whose rates are this object's rates multiplied by ``x``.

        The factor is combined with an already existing multiplier, so that ``(ns * a) * b`` scales by ``a * b``.
        """
        result = NelsonSiegel(self.beta0, self.beta1, self.beta2, self.tau)
        result._multiplier = self._multiplier * x
        return result

    def __rmul__(self, x: float):
        """Support ``x * ns``; scaling is commutative, so this is the same as ``ns * x``."""
        return self.__mul__(x)

    @staticmethod
    def compute(beta0: float, beta1: float, beta2: float, tau: float, T: float) -> float:
        """Evaluate the Nelson-Siegel formula.

        Args:
            beta0 (float): longrun
            beta1 (float): beta0 + beta1 = shortrun
            beta2 (float): hump or trough
            tau (float): location of hump
            T (float): Time to maturity. Values below 1e-4 are floored to 1e-4 to avoid a division by zero.

        Returns:
            float: The rate for the given time to maturity (an array if ``T`` is an array).
        """
        t = np.maximum(T, 1e-4) / tau
        return beta0 + beta1 * (1.0 - np.exp(-t)) / t + beta2 * ((1 - np.exp(-t)) / t - np.exp(-(t)))

    @staticmethod
    def _create_sample(
        n_samples: int,
        seed: int = None,
        min_short_term_rate: float = -0.01,
        max_short_term_rate: float = 0.12,
        min_long_run_rate: float = 0.005,
        max_long_run_rate: float = 0.15,
        min_hump: float = -0.1,
        max_hump: float = 0.1,
        min_tau: float = 0.5,
        max_tau: float = 3.0,
    ):
        """Create ``n_samples`` parametrizations with uniformly drawn parameters.

        beta0 is drawn from the long run range, beta1 such that beta0 + beta1 lies in the short term range,
        beta2 from the hump range and tau from the tau range. If ``seed`` is given, numpy's global random
        generator is seeded with it first.
        """
        if seed is not None:
            np.random.seed(seed)
        result = []
        for _ in range(n_samples):
            beta0 = np.random.uniform(min_long_run_rate, max_long_run_rate)
            beta1 = np.random.uniform(min_short_term_rate - beta0, max_short_term_rate - beta0)
            beta2 = np.random.uniform(min_hump, max_hump)
            tau = np.random.uniform(min_tau, max_tau)
            result.append(NelsonSiegel(beta0, beta1, beta2, tau))
        return result

    if has_tf:

        @staticmethod
        def compute_tf(beta0: tf.Tensor, beta1: tf.Tensor, beta2: tf.Tensor, tau: tf.Tensor, T: tf.Tensor) -> tf.Tensor:
            """Evaluate the Nelson-Siegel formula with tensorflow operations (only defined if tensorflow is installed).

            Args:
                beta0 (tf.Tensor): longrun
                beta1 (tf.Tensor): beta0 + beta1 = shortrun
                beta2 (tf.Tensor): hump or trough
                tau (tf.Tensor): location of hump
                T (tf.Tensor): Time to maturity. Values below 1e-4 are floored to 1e-4.

            Returns:
                tf.Tensor: The rate for the given time to maturity.
            """
            t = tf.maximum(T, 1e-4) / tau
            return beta0 + beta1 * (1.0 - tf.exp(-t)) / t + beta2 * ((1 - tf.exp(-t)) / t - tf.exp(-(t)))

270 

271 

class ConstantRate(interfaces.FactoryObject):
    def __init__(self, rate: float):
        """Flat, continuously compounded rate usable as rate parametrization of :class:`rivapy.marketdata.DiscountCurveParametrized`.

        Args:
            rate (float): The constant rate.

        """
        self.rate = rate

    def __call__(self, t: float):
        """Return the constant rate; the time ``t`` is ignored."""
        return self.rate

    def _to_dict(self) -> dict:
        """Return the rate as a dictionary."""
        return {"rate": self.rate}

    @staticmethod
    def _create_sample(n_samples: int, seed: int = None):
        """Create ``n_samples`` objects with rates drawn uniformly from [-0.005, 0.1).

        If ``seed`` is given, numpy's global random generator is seeded with it first.
        """
        if seed is not None:
            np.random.seed(seed)
        return [ConstantRate(rate=np.random.uniform(-0.005, 0.1)) for _ in range(n_samples)]

296 

297 

class LinearRate(interfaces.FactoryObject):
    def __init__(self, shortterm_rate: float, longterm_rate: float, max_maturity: float = 10.0):
        """Continuously compounded rate that moves linearly from the short term to the long term rate.

        It can be used as rate parametrization of :class:`rivapy.marketdata.DiscountCurveParametrized`.

        Args:
            shortterm_rate (float): The short term rate (rate at time zero).
            longterm_rate (float): The long term rate (rate at ``max_maturity``).
            max_maturity (float): From this timepoint on the long term rate is returned (constant extrapolation).
        """
        self.shortterm_rate = shortterm_rate
        self.longterm_rate = longterm_rate
        self.max_maturity = max_maturity
        # slope of the linear segment between time zero and max_maturity
        slope = (self.longterm_rate - self.shortterm_rate) / (self.max_maturity)
        self._coeff = slope

    def __call__(self, t: float):
        """Return the rate at time ``t``: linear before ``max_maturity``, the long term rate otherwise."""
        return self.shortterm_rate + self._coeff * t if t < self.max_maturity else self.longterm_rate

    def _to_dict(self) -> dict:
        """Return the constructor arguments as a dictionary."""
        return {"shortterm_rate": self.shortterm_rate, "longterm_rate": self.longterm_rate, "max_maturity": self.max_maturity}

    @staticmethod
    def _create_sample(n_samples: int, seed: int = None):
        """Create ``n_samples`` objects with random short term rates and strictly larger long term rates.

        If ``seed`` is given, numpy's global random generator is seeded with it first.
        """
        if seed is not None:
            np.random.seed(seed)
        samples = []
        for _ in range(n_samples):
            short = np.random.uniform(-0.005, 0.07)
            long_ = short + np.random.uniform(0.0025, 0.09)
            samples.append(LinearRate(shortterm_rate=short, longterm_rate=long_))
        return samples

330 

331 

class NelsonSiegelSvensson(NelsonSiegel):
    def __init__(self, beta0: float, beta1: float, beta2: float, beta3: float, tau: float, tau2: float):
        """Nelson-Siegel-Svensson parametrization: the Nelson-Siegel curve plus a second hump term.

        Args:
            beta0 (float): See :class:`NelsonSiegel`.
            beta1 (float): See :class:`NelsonSiegel`.
            beta2 (float): See :class:`NelsonSiegel`.
            beta3 (float): Size of the second hump.
            tau (float): See :class:`NelsonSiegel`.
            tau2 (float): Time scale of the second hump.
        """
        super().__init__(beta0, beta1, beta2, tau)
        self.beta3 = beta3
        self.tau2 = tau2

    def _to_dict(self) -> dict:
        """Return the Nelson-Siegel parameters extended by beta3 and tau2."""
        tmp = super()._to_dict()
        tmp.update({"beta3": self.beta3, "tau2": self.tau2})
        return tmp

    def __call__(self, t: float):
        """Return the (scaled) Nelson-Siegel-Svensson rate for the time to maturity ``t``."""
        # apply the multiplier exactly like the base class does, otherwise scaling would be silently ignored
        return self._multiplier * NelsonSiegelSvensson.compute(self.beta0, self.beta1, self.beta2, self.beta3, self.tau, self.tau2, t)

    def __mul__(self, x: float):
        """Return a new Nelson-Siegel-Svensson parametrization whose rates are scaled by ``x``.

        The inherited implementation would return a plain Nelson-Siegel object and thereby drop beta3 and tau2.
        """
        result = NelsonSiegelSvensson(self.beta0, self.beta1, self.beta2, self.beta3, self.tau, self.tau2)
        result._multiplier = self._multiplier * x
        return result

    @staticmethod
    def compute(beta0, beta1, beta2, beta3, tau, tau2, T):
        """Evaluate the Nelson-Siegel-Svensson formula.

        The result is the Nelson-Siegel rate for (beta0, beta1, beta2, tau) plus
        ``beta3 * ((1 - exp(-t)) / t - exp(-t))`` with ``t = max(T, 1e-4) / tau2``.
        """
        t = np.maximum(T, 1e-4) / tau2
        return NelsonSiegel.compute(beta0, beta1, beta2, tau, T) + beta3 * ((1 - np.exp(-t)) / t - np.exp(-(t)))

350 

351 

class DiscountCurveComposition(interfaces.FactoryObject):
    """Composition ``a * b + c`` of curves on the level of continuous rates.

    Each of the three components may be a curve object, a dictionary (which is turned into an object by the
    factory) or anything without a ``value`` attribute (e.g. a number), which is wrapped into a flat
    :class:`DiscountCurveParametrized`.
    """

    def __init__(self, a, b, c):
        # dictionaries are turned into objects by the factory
        a, b, c = (_create(x) if isinstance(x, dict) else x for x in (a, b, c))
        # all components that carry a daycounter must agree on it
        daycounters = {x.daycounter for x in (a, b, c) if hasattr(x, "daycounter")}
        if len(daycounters) > 1:
            raise Exception("All curves must have same daycounter.")
        self.daycounter = daycounters.pop() if len(daycounters) > 0 else DayCounterType.Act365Fixed.value
        self._dc = DayCounter(self.daycounter)
        self.a = self._as_curve(a)
        self.b = self._as_curve(b)
        self.c = self._as_curve(c)

    def _as_curve(self, x):
        """Return ``x`` itself if it has a ``value`` attribute, otherwise a flat curve with constant rate ``x``."""
        if hasattr(x, "value"):
            return x
        return DiscountCurveParametrized("", datetime(1980, 1, 1), ConstantRate(x), self.daycounter)

    def _to_dict(self) -> dict:
        """Return the three components, serialized via ``to_dict`` where available."""
        components = (("a", self.a), ("b", self.b), ("c", self.c))
        return {key: (obj.to_dict() if hasattr(obj, "to_dict") else obj) for key, obj in components}

    @staticmethod
    def _create_sample(n_samples: int, seed: int = None, refdate: Union[datetime, date] = None, parametrization_type=NelsonSiegel) -> list:
        """Create sample compositions by adding 0.001 to sampled parametrized discount curves."""
        base_curves = DiscountCurveParametrized._create_sample(n_samples, seed, refdate, parametrization_type)
        return [curve + 0.001 for curve in base_curves]

    def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return ``exp(-r * yf)`` with the composed rate ``r`` and the year fraction ``yf`` between ``refdate`` and ``d``."""
        rate = self.value_rate(refdate, d)
        year_fraction = self._dc.yf(refdate, d)
        return np.exp(-rate * year_fraction)

    def value_rate(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return the composed continuous rate ``a * b + c`` for the given dates."""
        rate_a = self.a.value_rate(refdate, d)
        rate_b = self.b.value_rate(refdate, d)
        rate_c = self.c.value_rate(refdate, d)
        return rate_a * rate_b + rate_c

    def __mul__(self, other):
        # TODO unittests
        return DiscountCurveComposition(self, other, 0.0)

    def __rmul__(self, other):
        return DiscountCurveComposition(self, other, 0.0)

    def __add__(self, other):
        return DiscountCurveComposition(self, 1.0, other)

    def __radd__(self, other):
        return DiscountCurveComposition(self, 1.0, other)

425 

426 

class DiscountCurveParametrized(interfaces.FactoryObject):
    def __init__(
        self,
        obj_id: str,
        refdate: Union[datetime, date],
        rate_parametrization,  #: Callable[[float], float],
        daycounter: Union[DayCounterType, str] = DayCounterType.Act365Fixed,
    ):
        """Discount curve whose continuously compounded rate is given by a callable of the year fraction.

        Args:
            obj_id (str): Identifier of the curve.
            refdate (Union[datetime, date]): Reference date of the curve. A plain date is converted to a datetime at midnight.
            rate_parametrization (Callable[[float], float]): Callable returning the rate for a given year fraction
                (e.g. :class:`NelsonSiegel`). If a dictionary is given, the object is created from it via the factory.
            daycounter (Union[DayCounterType, str], optional): Daycounter used to compute year fractions. Defaults to DayCounterType.Act365Fixed.
        """
        self.refdate = DiscountCurveParametrized._to_datetime(refdate)

        self.daycounter = DayCounterType.to_string(daycounter)
        self._dc = DayCounter(self.daycounter)
        self.obj_id = obj_id
        if isinstance(rate_parametrization, dict):  # if the parametrization is a dict we try to create it from factory
            self.rate_parametrization = _create(rate_parametrization)
        else:
            self.rate_parametrization = rate_parametrization

    @staticmethod
    def _to_datetime(d: Union[date, datetime]) -> datetime:
        """Return ``d`` unchanged if it is a datetime, otherwise the datetime at midnight of the given date."""
        if isinstance(d, datetime):
            return d
        return datetime(d.year, d.month, d.day)

    def _year_fraction(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return the year fraction between ``refdate`` and ``d`` after validating ``refdate`` against the curve."""
        refdate = DiscountCurveParametrized._to_datetime(refdate)
        d = DiscountCurveParametrized._to_datetime(d)
        if refdate < self.refdate:
            raise Exception("The given reference date is before the curves reference date.")
        return self._dc.yf(refdate, d)

    def _to_dict(self) -> dict:
        """Return identifier, reference date and the serialized rate parametrization.

        NOTE(review): the daycounter is not part of the dictionary.

        Raises:
            Exception: If the parametrization cannot be serialized via ``to_dict()``.
        """
        try:
            parametrization = self.rate_parametrization.to_dict()
        except Exception as e:
            # keep the original error as cause to ease debugging
            raise Exception("Missing implementation of to_dict() in parametrization of type " + type(self.rate_parametrization).__name__) from e
        return {"obj_id": self.obj_id, "refdate": self.refdate, "rate_parametrization": parametrization}

    def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return discount factor for a given date

        The discount factor is ``exp(-r(yf) * yf)`` where ``yf`` is the year fraction between ``refdate`` and ``d``
        and ``r`` the rate parametrization.

        Args:
            refdate (Union[date, datetime]): The reference date. It must not lie before the reference date of the curve.
            d (Union[date, datetime]): The date for which the discount factor will be returned

        Returns:
            float: discount factor

        Raises:
            Exception: If the given reference date lies before the reference date of the curve.
        """
        yf = self._year_fraction(refdate, d)
        return np.exp(-self.rate_parametrization(yf) * yf)

    def value_rate(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return the continuous rate for a given date

        Args:
            refdate (Union[date, datetime]): The reference date. It must not lie before the reference date of the curve.
            d (Union[date, datetime]): The date for which the rate will be returned

        Returns:
            float: continuous rate

        Raises:
            Exception: If the given reference date lies before the reference date of the curve.
        """
        yf = self._year_fraction(refdate, d)
        return self.rate_parametrization(yf)

    @staticmethod
    def _create_sample(n_samples: int, seed: int = None, refdate: Union[datetime, date] = None, parametrization_type=NelsonSiegel) -> list:
        """Create ``n_samples`` curves named ``DCP_<i>`` with sampled parametrizations of the given type.

        If ``seed`` is given, numpy's global random generator is seeded with it; if ``refdate`` is None,
        the current time is used as reference date.
        """
        if seed is not None:
            np.random.seed(seed)
        if refdate is None:
            refdate = datetime.now()
        parametrizations = parametrization_type._create_sample(n_samples)
        return [DiscountCurveParametrized("DCP_" + str(i), refdate, p) for i, p in enumerate(parametrizations)]

    def __mul__(self, other):
        return DiscountCurveComposition(self, other, 0.0)

    def __rmul__(self, other):
        return DiscountCurveComposition(self, other, 0.0)

    def __add__(self, other):
        return DiscountCurveComposition(self, 1.0, other)

    def __radd__(self, other):
        return DiscountCurveComposition(self, 1.0, other)

524 

525 

class EquityForwardCurve:
    def __init__(self, spot: float, funding_curve: DiscountCurve, borrow_curve: DiscountCurve, div_table):
        """Equity Forward Curve

        The reference date of the forward curve is the latest reference date of the funding curve, the borrow
        curve (if given) and the dividend table (if given and if it has a ``refdate`` attribute).

        Args:

            spot (float): Current spot
            funding_curve (DiscountCurve): Funding curve.
            borrow_curve (DiscountCurve): Borrow curve, may be None.
            div_table (:class:`rivapy.marketdata.DividendTable`): Dividend table, may be None.
        """
        self.spot = spot
        self.fc = funding_curve
        self.bc = borrow_curve
        self.div = div_table
        self._pyvacon_obj = None

        self.refdate = self.fc.refdate
        if self.bc is not None and self.refdate < self.bc.refdate:
            self.refdate = self.bc.refdate
        if self.div is not None and hasattr(self.div, "refdate") and self.refdate < self.div.refdate:
            self.refdate = self.div.refdate

    def _get_pyvacon_obj(self):
        """Return the underlying pyvacon forward curve, creating and caching it on first use."""
        if self._pyvacon_obj is not None:
            return self._pyvacon_obj

        def unwrap(obj):
            # hand over the pyvacon representation where the object provides one, otherwise the object itself
            return obj._get_pyvacon_obj() if hasattr(obj, "_get_pyvacon_obj") else obj

        fc = unwrap(self.fc)
        bc = unwrap(self.bc)
        div = unwrap(self.div)
        self._pyvacon_obj = _EquityForwardCurve(self.refdate, self.spot, fc, bc, div)
        return self._pyvacon_obj

    def value(self, refdate, expiry):
        """Return the value of the underlying pyvacon forward curve for the given reference date and expiry."""
        return self._get_pyvacon_obj().value(refdate, expiry)

    def plot(self, days: int = 10, days_end: int = 10 * 365, **kwargs):
        """Plots the forward curve using matplotlibs plot function.

        Args:
            days (int, optional): The number of days between two plotted forward values. Defaults to 10.
            days_end (int, optional): Number of days when plotting will end. Defaults to 10*365 (10yr)
            **kwargs: optional arguments that will be directly passed to the matplotlib plot function
        """
        dates = []
        values = []
        for offset in range(0, days_end, days):
            dates.append(self.refdate + timedelta(days=offset))
        for expiry in dates:
            values.append(self.value(self.refdate, expiry))
        plt.plot(dates, values, **kwargs)
        plt.xlabel("expiry")
        plt.ylabel("forward value")

590 

591 

class BootstrapHazardCurve:
    def __init__(
        self, ref_date: datetime, trade_date: datetime, dc: "DiscountCurve", RR: float, payment_dates: List[datetime], market_spreads: List[float]
    ):
        """Bootstrapping of hazard rates from market par spreads.

        Args:
            ref_date (datetime): Reference date; first date of the bootstrapped survival curve.
            trade_date (datetime): Trade date (stored only).
            dc (DiscountCurve): Discount curve used to discount the premium and protection leg.
            RR (float): Recovery rate; the protection leg is weighted with ``1 - RR``.
            payment_dates (List[datetime]): Premium payment dates per calibration instrument. The calibration indexes
                each entry with ``[-1]``, i.e. each entry is expected to be a sequence of payment dates whose last
                element is used as maturity.
            market_spreads (List[float]): Market par spread per calibration instrument.
        """

        self.ref_date = ref_date
        self.trade_date = trade_date
        self.dc = dc
        self.RR = RR
        self.payment_dates_bootstrapp = payment_dates
        self.market_spreads = market_spreads
        self._pyvacon_obj = None

    def par_spread(self, dc_survival, maturity_date, payment_dates: List[datetime]):
        """Return the par spread implied by the given survival curve.

        Args:
            dc_survival: Survival curve providing ``value(refdate, date)``.
            maturity_date: Maturity up to which the protection leg is integrated.
            payment_dates (List[datetime]): Premium payment dates; dates before the reference date are ignored.

        Returns:
            float: Protection leg divided by the sum of premium leg and half of the accrued leg.
        """
        integration_step = relativedelta.relativedelta(days=365)
        premium_period_start = self.ref_date
        prev_date = self.ref_date
        current_date = min(prev_date + integration_step, maturity_date)
        risk_adj_factor_protection = 0
        risk_adj_factor_premium = 0
        risk_adj_factor_accrued = 0

        # protection leg: discounted default probability per integration interval
        while current_date <= maturity_date:
            default_prob = dc_survival.value(self.ref_date, prev_date) - dc_survival.value(self.ref_date, current_date)
            risk_adj_factor_protection += self.dc.value(self.ref_date, current_date) * default_prob
            prev_date = current_date
            current_date += integration_step

        # remaining stub interval between the last full integration step and the maturity
        if prev_date < maturity_date and current_date > maturity_date:
            default_prob = dc_survival.value(self.ref_date, prev_date) - dc_survival.value(self.ref_date, maturity_date)
            risk_adj_factor_protection += self.dc.value(self.ref_date, maturity_date) * default_prob

        # premium and accrued leg; the period start only advances for payments on or after the reference date
        for premium_payment in payment_dates:
            if premium_payment >= self.ref_date:
                period_length = ((premium_payment - premium_period_start).days) / 360
                survival_start = dc_survival.value(self.ref_date, premium_period_start)
                survival_end = dc_survival.value(self.ref_date, premium_payment)
                df = self.dc.value(self.ref_date, premium_payment)
                risk_adj_factor_premium += period_length * ((survival_start + survival_end) / 2) * df
                risk_adj_factor_accrued += period_length * (survival_start - survival_end) * df
                premium_period_start = premium_payment

        PV_accrued = (1 / 2) * risk_adj_factor_accrued
        PV_premium = (1) * risk_adj_factor_premium
        PV_protection = ((1 - self.RR)) * risk_adj_factor_protection

        return (PV_protection) / ((PV_premium + PV_accrued))

    def create_survival(self, dates: List[datetime], hazard_rates: List[float]):
        """Return a pyvacon survival curve for the given dates and hazard rates, starting at the reference date."""
        # the attribute is named ``ref_date``; ``self.refdate`` does not exist on this class
        return _SurvivalCurve("survival_curve", self.ref_date, dates, hazard_rates)

    def calibration_error(self, x, mkt_par_spread, ref_date, payment_dates, dates, hazard_rates):
        """Return the difference between market par spread and model par spread for the trial hazard rate ``x``.

        The last entry of ``hazard_rates`` is overwritten in place with ``x``.

        Args:
            x (float): Trial value for the last hazard rate.
            mkt_par_spread (float): Market par spread to match.
            ref_date: Unused; the survival curve is always built on ``self.ref_date``. Kept for signature compatibility.
            payment_dates: Premium payment dates of the instrument.
            dates: Dates of the survival curve; the last one is used as maturity.
            hazard_rates: Hazard rates of the survival curve (modified in place).
        """
        hazard_rates[-1] = x
        maturity_date = dates[-1]
        dc_surv = self.create_survival(dates, hazard_rates)
        return mkt_par_spread - self.par_spread(dc_surv, maturity_date, payment_dates)

    def calibrate_hazard_rate(self):
        """Bootstrap one hazard rate per calibration instrument.

        For each instrument a new node (last payment date) is appended and its hazard rate is determined by
        a root search (Brent's method on the bracket [0, 3]) such that the model par spread matches the market.

        Returns:
            Tuple[List[float], List[datetime]]: Hazard rates and the corresponding survival curve dates.
        """
        sc_dates = [self.ref_date]
        hazard_rates = [0.0]
        for i, payment_dates_iter in enumerate(self.payment_dates_bootstrapp):
            mkt_par_spread_iter = self.market_spreads[i]
            sc_dates.append(payment_dates_iter[-1])
            # start from the previous hazard rate; the entry is overwritten during the root search
            hazard_rates.append(hazard_rates[-1])
            sol = scipy.optimize.root_scalar(
                self.calibration_error,
                args=(mkt_par_spread_iter, self.ref_date, payment_dates_iter, sc_dates, hazard_rates),
                method="brentq",
                bracket=[0, 3],
                xtol=1e-8,
                rtol=1e-8,
            )
            hazard_rates[-1] = sol.root
        return hazard_rates, sc_dates  # self.create_survival(self.ref_date, sc_dates, hazard_rates)#.value, hazard_rates

679 

680 # def hazard_rates(self): 

681 # #hazard_rates_value=[] 

682 # hazard_rates_value=self.calibrate_hazard_rate() 

683 # return self.hazard_rates_value 

684 

685 # def value(self, refdate: Union[date, datetime], d: Union[date, datetime])->float: 

686 # """Return discount factor for a given date 

687 

688 # Args: 

689 # refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curves reference date), the forward discount factor will be returned. 

690 # d (Union[date, datetime]): The date for which the discount factor will be returned 

691 

692 # Returns: 

693 # float: discount factor 

694 # """ 

695 # #if not isinstance(refdate, datetime): 

696 # # refdate = datetime(refdate,0,0,0) 

697 # #if not isinstance(d, datetime): 

698 # # d = datetime(d,0,0,0) 

699 # #if refdate < self.refdate: 

700 # # raise Exception('The given reference date is before the curves reference date.') 

701 # return self._get_pyvacon_obj().value(refdate, d) 

702 

703 # def _get_pyvacon_obj(self): 

704 # if self._pyvacon_obj is None: 

705 # self._pyvacon_obj = _SurvivalCurve('survival_curve', self.refdate, 

706 # self.calibrate_hazard_rate[1], self.calibrate_hazard_rate[0]) 

707 # return self._pyvacon_obj 

708 

709 

710# class PowerPriceForwardCurve: 

711# def __init__( 

712# self, refdate: Union[datetime, date], start: datetime, end: datetime, values: np.ndarray, freq: str = "1H", tz: str = None, id: str = None 

713# ): 

714# """Simple forward curve for power. 

715 

716# Args: 

717# refdate (Union[datetime, date]): Reference date of curve 

718# start (dt.datetime): Start of forward curve datetimepoints (including this timepoint). 

719# end (dt.datetime): End of forad curve datetimepoints (excluding this timepoint). 

720# values (np.ndarray): One dimensional array holding the price for each datetimepint in the curve. The method value will raise an exception if the number of values is not equal to the number of datetimepoints. 

721# freq (str, optional): Frequency of timepoints. Defaults to '1H'. See documentation for pandas.date_range for further details on freq. 

722# tz (str or tzinfo): Time zone name for returning localized datetime points, for example ‘Asia/Hong_Kong’. 

723# By default, the resulting datetime points are timezone-naive. See documentation for pandas.date_range for further details on tz. 

724# id (str): Identifier for the curve. It has no impact on the valuation functionality. If None, a uuid will be generated. Defaults to None. 

725# """ 

726# self.id = id 

727# if id is None: 

728# self.id = "PFC/" + str(datetime.now()) 

729# self.refdate = refdate 

730# self.start = start 

731# self.end = end 

732# self.freq = freq 

733# self.tz = tz 

734# self.values = values 

735# # timegrid used to compute prices for a certain schedule 

736# self._tg = None 

737# self._df = ( 

738# pd.DataFrame( 

739# {"dates": pd.date_range(self.start, self.end, freq=self.freq, tz=self.tz, inclusive="left").to_pydatetime(), "values": self.values} 

740# ) 

741# .set_index(["dates"]) 

742# .sort_index() 

743# ) 

744 

745# def value(self, refdate: Union[date, datetime], schedule) -> np.ndarray: 

746# if self._tg is None: 

747# self._tg = pd.DataFrame( 

748# {"dates": pd.date_range(self.start, self.end, freq=self.freq, tz=self.tz, inclusive="left").to_pydatetime(), "values": self.values} 

749# ).reset_index() 

750# if self._tg.shape[0] != self.values.shape[0]: 

751# raise Exception( 

752# "The number of dates (" 

753# + str(self._tg.shape[0]) 

754# + ") does not equal number of values (" 

755# + str(self.values.shape[0]) 

756# + ") in forward curve." 

757# ) 

758# tg = self._tg[(self._tg.dates >= schedule.start) & (self._tg.dates < schedule.end)].set_index("dates") 

759# _schedule = pd.DataFrame({"dates": schedule.get_schedule(refdate)}) 

760# tg = _schedule.join(tg, on="dates") 

761# # tg = tg[tg['dates']>=refdate] 

762# if tg["index"].isna().sum() > 0: 

763# raise Exception("There are " + str(tg["index"].isna().sum()) + " dates in the schedule not covered by the forward curve.") 

764# return self.values[tg["index"].values] 

765 

766# def get_df(self) -> pd.DataFrame: 

767# return self._df 

768 

769 

class EnergyPriceForwardCurve:
    """Energy price forward curve (PFC) object.

    It is recommended to initialize this object via the class methods ``from_existing_pfc``, ``from_existing_shape`` or ``from_scratch``.

    Args:
        id (_type_): ID for the PFC object. If ``None``, an ID of the form ``"PFC/<current timestamp>"`` is generated.
        refdate (Union[datetime, date]): Reference date
        pfc (pd.DataFrame, optional): This object can be initialized with an existing pfc. Defaults to None.
        **kwargs: Optional keyword arguments read by the constructor: ``pfc_shape``, ``apply_schedule``,
            ``pfc_shaper`` and ``future_contracts``. Each of them defaults to None.

    Raises:
        ValueError: If none of ``pfc``, ``pfc_shape`` and ``pfc_shaper`` is provided.
    """

    def __init__(self, id, refdate: Union[datetime, date], pfc: Optional[pd.DataFrame] = None, **kwargs):
        # fall back to a timestamp based identifier if the caller did not provide one
        self.id = id if id is not None else "PFC/" + str(datetime.now())
        self.refdate = refdate

        self._pfc = pfc
        self._pfc_shape: pd.DataFrame = kwargs.get("pfc_shape", None)
        self._apply_schedule: SimpleSchedule = kwargs.get("apply_schedule", None)
        self._pfc_shaper: PFCShaper = kwargs.get("pfc_shaper", None)

        # both dataframes (if given) have to pass the datetime index validation
        for dataframe in (self._pfc, self._pfc_shape):
            EnergyPriceForwardCurve._validate_dataframes(dataframe)

        self._future_contracts: List[EnergyFutureSpecifications] = kwargs.get("future_contracts", None)

        if self._pfc is None and self._pfc_shape is None and self._pfc_shaper is None:
            raise ValueError("No values provided for the arguments pfc, pfc_shape and pfc_shaper!")

    @staticmethod
    def _validate_dataframes(dataframe: Optional[pd.DataFrame]):
        """Runs ``validators._check_pandas_index_for_datetime`` on the dataframe. ``None`` is accepted and skipped.

        Args:
            dataframe (Optional[pd.DataFrame]): Dataframe to validate
        """
        if dataframe is not None:
            validators._check_pandas_index_for_datetime(dataframe)

    @classmethod
    def from_existing_pfc(cls, id, refdate: Union[datetime, date], pfc: pd.DataFrame) -> "EnergyPriceForwardCurve":
        """Initialization of the ``EnergyPriceForwardCurve`` given an existing PFC.

        Args:
            id (_type_): ID for the PFC object
            refdate (Union[datetime, date]): Reference Date
            pfc (pd.DataFrame): Existing Pfc

        Returns:
            EnergyPriceForwardCurve: ``EnergyPriceForwardCurve`` object
        """
        return cls(id=id, refdate=refdate, pfc=pfc)

    @classmethod
    def from_existing_shape(
        cls, id, refdate: Union[datetime, date], pfc_shape: pd.DataFrame, future_contracts: List[EnergyFutureSpecifications]
    ) -> "EnergyPriceForwardCurve":
        """Initialization of the ``EnergyPriceForwardCurve`` given an existing PFC shape. The shape is then shifted in order to match the future contracts defined in the ``future_contracts`` list.

        Args:
            id (_type_): ID for the PFC object
            refdate (Union[datetime, date]): Reference Date
            pfc_shape (pd.DataFrame): Existing PFC shape
            future_contracts (List[EnergyFutureSpecifications]): List of future contracts (``EnergyFutureSpecifications`` objects)

        Returns:
            EnergyPriceForwardCurve: ``EnergyPriceForwardCurve`` object
        """
        instance = cls(id=id, refdate=refdate, pfc_shape=pfc_shape, future_contracts=future_contracts)
        instance._shift_shape()
        return instance

    @classmethod
    def from_scratch(
        cls,
        id,
        refdate: Union[datetime, date],
        apply_schedule: SimpleSchedule,
        pfc_shaper: PFCShaper,
        future_contracts: List[EnergyFutureSpecifications],
    ) -> "EnergyPriceForwardCurve":
        """Initialization of the ``EnergyPriceForwardCurve`` from scratch. First a shape is created using the ``pfc_shaper``. Afterwards, shape is shifted in order to match the future contracts defined in the ``future_contracts`` list.

        Args:
            id (_type_): ID for the PFC object
            refdate (Union[datetime, date]): Reference Date
            apply_schedule (SimpleSchedule): Schedule to apply the ``pfc_shaper`` on, in order to obtain shape values for future time points
            pfc_shaper (PFCShaper): PFC shaper
            future_contracts (List[EnergyFutureSpecifications]): List of future contracts (``EnergyFutureSpecifications`` objects)

        Returns:
            EnergyPriceForwardCurve: ``EnergyPriceForwardCurve`` object
        """
        instance = cls(id=id, refdate=refdate, pfc_shaper=pfc_shaper, future_contracts=future_contracts, apply_schedule=apply_schedule)
        instance._create_shape()
        instance._shift_shape()
        return instance

    def __validate_contracts_frequency(self):
        """Checks if all contracts in ``self._future_contracts`` have the same schedule frequency.

        Raises:
            ValueError: If more than one distinct schedule frequency is found.
        """
        # group (schedule class name, contract name) by frequency, so that the error message shows the offenders
        frequencies_contracts = defaultdict(list)
        for future_contract in self._future_contracts:
            frequencies_contracts[future_contract.schedule.freq].append((future_contract.schedule.__class__.__name__, future_contract.name))

        if len(frequencies_contracts) > 1:
            raise ValueError(
                f"Found different contract frequencies: {dict(frequencies_contracts)}.\n Please provide uniform frequencies for the elements in the `future_contracts` list!"
            )

    def __get_offpeak_contracts(
        self, base_contracts: List[EnergyFutureSpecifications], peak_contracts: List[EnergyFutureSpecifications]
    ) -> List[EnergyFutureSpecifications]:
        """In cases where base and peak contracts are part of the ``self._future_contracts``, offpeak contracts need to be deduced from these two in order to shift the shape properly.

        A base contract is paired with the first peak contract having the same start and end. The offpeak price
        is chosen such that the average over offpeak and peak, weighted by their numbers of schedule points,
        reproduces the base price.

        Args:
            base_contracts (List[EnergyFutureSpecifications]): List of base contracts
            peak_contracts (List[EnergyFutureSpecifications]): List of peak contracts

        Returns:
            List[EnergyFutureSpecifications]: List of offpeak contracts

        Raises:
            ValueError: If a matching peak contract has at least as many schedule points as its base contract,
                such that no offpeak points are left.
        """
        offpeak_contracts = []

        # iterate over each combination of base and peak contracts
        for base_contract_spec in base_contracts:
            n_base = len(base_contract_spec.get_schedule())
            base_start_end = base_contract_spec.get_start_end()
            for peak_contract_spec in peak_contracts:
                # match both by the start and end dates of their respective schedule
                if base_start_end != peak_contract_spec.get_start_end():
                    continue

                # if both match, an offpeak contract can be created from these two
                n_peak = len(peak_contract_spec.get_schedule())
                n_offpeak = n_base - n_peak
                if n_offpeak <= 0:
                    # would otherwise end up in a ZeroDivisionError or in a meaningless price
                    raise ValueError(
                        f"Cannot derive an offpeak contract from '{base_contract_spec.name}' and '{peak_contract_spec.name}': "
                        f"the base schedule has {n_base} points, the peak schedule has {n_peak} points."
                    )

                offpeak_price = n_base / n_offpeak * base_contract_spec.get_price() - n_peak / n_offpeak * peak_contract_spec.get_price()
                offpeak_contracts.append(
                    EnergyFutureSpecifications(
                        schedule=OffPeakSchedule(start=base_contract_spec.get_start(), end=base_contract_spec.get_end()),
                        price=offpeak_price,
                        name=f"offpeak_{base_contract_spec.name}&{peak_contract_spec.name}",
                    )
                )
                # only the first matching peak contract is used per base contract
                break

        return offpeak_contracts

    def _shift_shape(self):
        """Shifts the shape to match the future contracts defined in the ``self._future_contracts`` list.

        The result is stored in ``self._pfc``.

        Raises:
            ValueError: If no future contracts are available, if the contracts have different frequencies or if
                base and peak contracts are given, but none of them share the same delivery period.
        """
        if self._future_contracts is None:
            raise ValueError("No future contracts provided: the shape cannot be shifted!")

        self.__validate_contracts_frequency()

        base_contracts, peak_contracts = [
            [fc for fc in self._future_contracts if fc.schedule.__class__._name == schedule_type] for schedule_type in (ets.BASE, ets.PEAK)
        ]

        # if base and peak contracts both exist, offpeak contracts are computed
        if base_contracts and peak_contracts:
            offpeak_contracts = self.__get_offpeak_contracts(base_contracts=base_contracts, peak_contracts=peak_contracts)
            if not offpeak_contracts:
                raise ValueError("Found base and peak contracts, but none of them share the same start and end: no offpeak contracts can be derived!")

            # shift offpeak and peak separately
            shifted_parts = []
            for contracts in (offpeak_contracts, peak_contracts):
                # restrict the shape to the datetimes covered by the current group of contracts
                shifting_datetimes = np.sort(np.unique(np.concatenate([contract.get_schedule() for contract in contracts])))
                pfc_shifter = PFCShifter(shape=self._pfc_shape.loc[shifting_datetimes, :], contracts=contracts)
                shifted_parts.append(pfc_shifter.compute())

            # combine offpeak and peak shifts
            self._pfc = pd.concat(shifted_parts, axis=0).sort_index(ascending=True)

        else:
            # if either base or peak exists, the shift can be directly performed
            pfc_shifter = PFCShifter(shape=self._pfc_shape, contracts=self._future_contracts)
            self._pfc = pfc_shifter.compute()

    def _create_shape(self):
        """Creates a shape using the ``self._pfc_shaper`` model.

        The shaper is calibrated and then applied to ``self._apply_schedule``. The result is stored in ``self._pfc_shape``.
        """
        self._pfc_shaper.calibrate()
        self._pfc_shape = self._pfc_shaper.apply(self._apply_schedule)

    def get_pfc(self) -> pd.DataFrame:
        """Returns the PFC

        Returns:
            pd.DataFrame: PFC (``None`` as long as no PFC was provided or computed)
        """
        return self._pfc