Coverage for rivapy / marketdata / curves.py: 71%

601 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-27 14:36 +0000

1import math 

2import scipy.optimize 

3import matplotlib.pyplot as plt 

4import pandas as pd 

5import numpy as np 

6import dateutil.relativedelta as relativedelta 

7 

8from rivapy.marketdata._logger import logger 

9 

10 

11import rivapy.tools.interfaces as interfaces 

12import rivapy.tools._validators as validators 

13 

14# from rivapy.tools.interpolate import Interpolator 

15from typing import List, Union, Tuple, Literal, Dict, Optional, Any 

16from datetime import datetime, date, timedelta 

17from collections import defaultdict 

18 

19 

20try: 

21 import tensorflow as tf 

22 

23 has_tf = True 

24except ImportError: 

25 has_tf = False 

26 

27from rivapy.tools.enums import DayCounterType, InterpolationType, ExtrapolationType 

28from rivapy.tools.enums import EnergyTimeGridStructure as etgs 

29from rivapy.tools.datetools import DayCounter, _date_to_datetime 

30from rivapy.marketdata.factory import create as _create 

31from rivapy.marketdata_tools.pfc_shaper import PFCShaper 

32from rivapy.marketdata_tools.pfc_shifter import PFCShifter 

33from rivapy.tools.scheduler import SimpleSchedule, OffPeakSchedule, PeakSchedule, BaseSchedule 

34from rivapy.instruments.energy_futures_specifications import EnergyFutureSpecifications 

35 

36from rivapy.tools.interpolate import Interpolator 

37 

38 

39from rivapy import _pyvacon_available 

40 

41if _pyvacon_available: 

42 from pyvacon.finance.marketdata import EquityForwardCurve as _EquityForwardCurve 

43 from pyvacon.finance.marketdata import SurvivalCurve as _SurvivalCurve 

44 from pyvacon.finance.marketdata import DiscountCurve as _DiscountCurve 

45 import pyvacon as _pyvacon 

46 

47 

class DiscountCurve:
    """Discount curve given by discount factors on a grid of dates.

    Discount factors for arbitrary dates are obtained by applying the configured
    interpolation/extrapolation to year fractions measured from the curve's
    reference date with the configured day counter.
    """

    def __init__(
        self,
        id: str,
        refdate: Union[datetime, date],
        dates: List[Union[datetime, date]],
        df: List[float],
        interpolation: InterpolationType = InterpolationType.HAGAN_DF,
        extrapolation: ExtrapolationType = ExtrapolationType.NONE,
        daycounter: DayCounterType = DayCounterType.Act365Fixed,
    ):
        """Discountcurve

        Args:
            id (str): Identifier of the discount curve.
            refdate (Union[datetime, date]): Reference date of the discount curve.
            dates (List[Union[datetime, date]]): List of dates belonging to the list of discount factors. All dates must be distinct and equal or after the refdate, otherwise an exception will be thrown.
            df (List[float]): List of discount factors. Length of list of discount factors must equal to length of list of dates, otherwise an exception will be thrown.
            interpolation (enums.InterpolationType, optional): Defaults to InterpolationType.HAGAN_DF.
            extrapolation (enums.ExtrapolationType, optional): Defaults to ExtrapolationType.NONE which does not allow to compute a discount factor for a date past all given dates given to this constructor.
            daycounter (enums.DayCounterType, optional): Daycounter used within the interpolation formula to compute a discount factor between two dates from the dates-list above. Defaults to DayCounterType.Act365Fixed.

        Raises:
            Exception: If no dates are given, if dates and discount factors differ in length,
                if a date lies before the reference date, if dates are not distinct or if the
                discount factor at the reference date is not 1.0.
            TypeError: If interpolation, extrapolation or daycounter are not of the expected enum type.
        """
        if len(dates) < 1:
            raise Exception("Please specify at least one date and discount factor")
        if len(dates) != len(df):
            raise Exception("List of dates and discount factors must have equal length.")
        # Normalize everything to datetime so that date/datetime inputs can be mixed
        # without triggering a TypeError in the comparisons below.
        self.values = sorted(
            ((self._to_datetime(d), factor) for d, factor in zip(dates, df)),
            key=lambda tup: tup[0],
        )
        self.refdate = self._to_datetime(refdate)
        if not isinstance(interpolation, InterpolationType):
            raise TypeError("Interpolation is not of type enums.InterpolationType")
        self.interpolation = interpolation
        if not isinstance(extrapolation, ExtrapolationType):
            raise TypeError("Extrapolation is not of type enums.ExtrapolationType")
        self.extrapolation = extrapolation
        if not isinstance(daycounter, DayCounterType):
            raise TypeError("Daycounter is not of type enums.DaycounterType")
        self.daycounter = daycounter
        self.id = id
        # The grid must start at or after the reference date; if it starts later the
        # point (refdate, 1.0) is prepended.
        if self.values[0][0] < self.refdate:
            raise Exception("First date must be equal or greater then reference date.")
        if self.values[0][0] > self.refdate:
            self.values = [(self.refdate, 1.0)] + self.values
        if self.values[0][1] != 1.0:
            raise Exception("Discount factor for today must equal 1.0.")
        # The values are sorted, so only duplicated dates can violate strict monotonicity.
        # Compare the dates only (not the (date, df) tuples), otherwise duplicates with
        # increasing discount factors would slip through.
        for i in range(1, len(self.values)):
            if self.values[i - 1][0] >= self.values[i][0]:
                raise Exception("Dates must be given in monotonically increasing order.")
        self._pyvacon_obj = None

    @staticmethod
    def _to_datetime(d: Union[date, datetime]) -> datetime:
        """Return ``d`` unchanged if it is a datetime, otherwise midnight of the given date."""
        if isinstance(d, datetime):
            return d
        return datetime(d.year, d.month, d.day)

    def get_dates(self) -> Tuple[datetime]:
        """Return list of dates of curve

        Returns:
            Tuple[datetime]: List of dates
        """
        x, y = zip(*self.values)
        return x

    def get_df(self) -> Tuple[float]:
        """Return list of discount factors

        Returns:
            Tuple[float]: List of discount factors
        """
        x, y = zip(*self.values)
        return y

    # Change the name with value once full pyvacon dependencies are removed throughout rivapy
    def value(self, refdate: Union[date, datetime], d: Union[date, datetime], payment_dates=None, annual_payment_frequency=None) -> float:
        """Return discount factor for a given date

        Args:
            refdate (Union[date, datetime]): The reference date. If the reference date is in the future
                                            (compared to the curves reference date), the forward discount
                                            factor will be returned.
            d (Union[date, datetime]): The date for which the discount factor will be returned. Assumption
                                        is that the day given already follows correct business logic
                                        (e.g., roll convention)
            payment_dates: Passed through unchanged to the day counter's ``yf``. Defaults to None.
            annual_payment_frequency: Passed through unchanged to the day counter's ``yf``. Defaults to None.

        Raises:
            Exception: If the given reference date is before the curve's reference date.

        Returns:
            float: discount factor
        """
        # plain date objects are converted to datetime (midnight)
        refdate = self._to_datetime(refdate)
        d = self._to_datetime(d)
        if refdate < self.refdate:
            raise Exception("The given reference date is before the curves reference date.")

        # get yearfrac, taking into account DCC
        dcc = DayCounter(self.daycounter)
        yf_list = [dcc.yf(self.refdate, x, payment_dates, annual_payment_frequency) for x in self.get_dates()]
        df_list = list(self.get_df())

        # interpolate/extrapolate given a chosen method
        interp = Interpolator(self.interpolation, self.extrapolation)

        # give FWD value if given refdate is greater than curves reference date
        if refdate > self.refdate:
            df1 = interp.interp(yf_list, df_list, dcc.yf(self.refdate, refdate, payment_dates, annual_payment_frequency), self.extrapolation)
            df2 = interp.interp(yf_list, df_list, dcc.yf(self.refdate, d, payment_dates, annual_payment_frequency), self.extrapolation)
            df = df2 / df1
        else:  # this also covers the case if refdates are the same, and avoids division by zero
            df = interp.interp(yf_list, df_list, dcc.yf(self.refdate, d, payment_dates, annual_payment_frequency), self.extrapolation)

        return df

    def value_rate(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return continuously compounded zero rate for a given date

        Args:
            refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curves reference date), the forward rate will be returned.
            d (Union[date, datetime]): The date for which the continuously compounded zero rate will be returned.

        Raises:
            Exception: If the given reference date is before the curve's reference date.

        Returns:
            float: continuously compounded zero rate
        """
        refdate = self._to_datetime(refdate)
        d = self._to_datetime(d)
        if refdate < self.refdate:
            raise Exception("The given reference date is before the curves reference date.")
        r = -math.log(self.value(refdate, d)) / DayCounter(self.daycounter).yf(refdate, d)
        return r

    def value_yf(self, yf: float) -> float:
        """Return discount factor for a given yearfrac as of the curve's reference date.

        Args:
            yf (float): The year fraction for which the discount factor will be returned.

        Returns:
            float: discount factor
        """
        # get yearfrac, taking into account DCC
        dcc = DayCounter(self.daycounter)
        yf_list = [dcc.yf(self.refdate, x) for x in self.get_dates()]
        df_list = list(self.get_df())

        # interpolate/extrapolate given a chosen method
        interp = Interpolator(self.interpolation, self.extrapolation)
        df = interp.interp(yf_list, df_list, yf, self.extrapolation)

        return df

    def value_fwd(self, val_date: Union[date, datetime], d1: Union[date, datetime], d2: Union[date, datetime]) -> float:
        """Return forward discount factor between two dates (without dependencies from pyvacon)

        The `value_fwd()` method supports forward valuation scenarios
        (`val_date > refdate`) by rebasing the curve from its construction date to the new
        valuation date.

        The rebasement follows the relationship:

            DF(val_date, t) = DF(refdate, t) / DF(refdate, val_date)

        This adjustment ensures that discount factors and forward rates remain consistent
        across time, even when the valuation date is later than the curve's reference date.

        This approach aligns with market-standard practices for OIS and collateralized
        discounting frameworks, where forward discounting must be time-consistent with
        the curve's anchor date.

        Args:
            val_date (Union[date, datetime]): The valuation date. Must not be before the curve's reference date.
            d1 (Union[date, datetime]): Start date of the forward period. Assumption is that the day
                                        given already follows correct business logic (e.g., roll convention)
            d2 (Union[date, datetime]): End date of the forward period (same assumption as for d1).

        Raises:
            Exception: If the given valuation date is before the curve's reference date.

        Returns:
            float: forward discount factor DF(d2) / DF(d1)
        """
        # Only plain date objects are converted; datetime is a subclass of date, so an
        # isinstance(x, date) test would also match datetimes and strip their time of day.
        val_date = self._to_datetime(val_date)
        d1 = self._to_datetime(d1)
        d2 = self._to_datetime(d2)
        if val_date < self.refdate:
            raise Exception("The given value date is before the curves reference date.")

        # get yearfrac, taking into account DCC
        dcc = DayCounter(self.daycounter)
        yf_list = [dcc.yf(self.refdate, x) for x in self.get_dates()]
        df_list = list(self.get_df())

        # interpolate/extrapolate given a chosen method
        interp = Interpolator(self.interpolation, self.extrapolation)

        x1 = dcc.yf(self.refdate, d1)
        x2 = dcc.yf(self.refdate, d2)
        df1 = interp.interp(yf_list, df_list, x1, self.extrapolation)
        df2 = interp.interp(yf_list, df_list, x2, self.extrapolation)

        if val_date > self.refdate:
            xval = dcc.yf(self.refdate, val_date)
            df_val = interp.interp(yf_list, df_list, xval, self.extrapolation)
            # rebase curve to val_date
            logger.info(f"{val_date} > {self.refdate}: forward valuation")
            df1 /= df_val
            df2 /= df_val

        df = df2 / df1

        return df

    def value_fwd_rate(self, refdate: Union[date, datetime], d1: Union[date, datetime], d2: Union[date, datetime]) -> float:
        """Return forward continuously compounded zero rate for a given date

        Args:
            refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curves reference date), the forward rate will be returned.
            d1 (Union[date, datetime]): The start date of the period for which the forward continuously compounded zero rate will be returned.
            d2 (Union[date, datetime]): The end date of the period for which the forward continuously compounded zero rate will be returned.

        Raises:
            Exception: If the given reference date is before the curve's reference date.

        Returns:
            float: forward continuously compounded zero rate
        """
        refdate = self._to_datetime(refdate)
        d1 = self._to_datetime(d1)
        d2 = self._to_datetime(d2)
        if refdate < self.refdate:
            raise Exception("The given reference date is before the curves reference date.")
        r = -math.log(self.value_fwd(refdate, d1, d2)) / DayCounter(self.daycounter).yf(d1, d2)
        return r

    def __call__(self, t: float, refdate: Union[date, datetime] = None, d: Union[date, datetime] = None) -> float:
        """Return the continuously compounded zero rate.

        Args:
            t (float): Year fraction; only used if refdate or d is None. Must be non-zero in that case
                since the rate is computed as -log(df) / t.
            refdate (Union[date, datetime], optional): Reference date. Defaults to None.
            d (Union[date, datetime], optional): Target date. Defaults to None.

        Returns:
            float: zero rate for the year fraction t, or for the date d as seen from refdate if both are given.
        """
        if refdate is None or d is None:
            # directly return the zero rate for a given yearfrac t
            return -math.log(self.value_yf(t)) / t
        else:  # return the zero rate for a given date d and reference date refdate
            return self.value_rate(refdate, d)

    def plot(self, days: int = 10, discount_factors: bool = False, **kwargs):
        """Plots the discount curve using matplotlibs plot function.
        The timegrid includes the dates of the discount curve. Here either the discount factors or the zero rates (continuously compounded, ACT365 yearfraction) are plotted.

        Args:
            days (int, optional): The number of days between two plotted rates/discount factors. Defaults to 10.
            discount_factors (bool, optional): If True, discount factors will be plotted, otherwise the rates. Defaults to False.
            **kwargs: optional arguments that will be directly passed to the matplotlib plot function
        """
        dates = self.get_dates()
        # fill the gaps between curve dates with equidistant points, always keeping the curve dates
        dates_new = [dates[0]]
        for i in range(1, len(dates)):
            while dates_new[-1] + timedelta(days=days) < dates[i]:
                dates_new.append(dates_new[-1] + timedelta(days=days))
            dates_new.append(dates[i])
        # TODO: consider how best to deal with pyvacon version vs rivapy version
        values = [self.value(self.refdate, d) for d in dates_new]

        if not discount_factors:
            for i in range(1, len(values)):
                dt = float((dates_new[i] - self.refdate).days) / 365.0
                values[i] = -math.log(values[i]) / dt
            # the rate at the reference date itself is undefined (dt = 0): reuse the next
            # grid point; a single-point curve has no next point and is left untouched
            if len(values) > 1:
                values[0] = values[1]
        plt.plot(dates_new, values, label=self.id, **kwargs)

382 

383 

class FlatDiscountCurve(interfaces.BaseDatedCurve):
    """
    Discount curve driven by one single, flat interest rate.
    """

    def __init__(
        self,
        valuation_date: Union[date, datetime],
        flat_rate: Optional[float] = 0.05,
        curve_data: Any = None,
        day_counter_type: DayCounterType = DayCounterType.Act365Fixed,
    ):
        """
        Set up the flat discount curve.

        Args:
            valuation_date (Union[date, datetime]): Valuation date of the curve (stored as datetime via the property setter).
            flat_rate (Optional[float], optional): Flat interest rate used for discounting. Defaults to 0.05.
            curve_data (Any, optional): Only stored; not used by any method of this class. Defaults to None.
            day_counter_type (DayCounterType, optional): Day count convention used for year fractions. Defaults to DayCounterType.Act365Fixed.
        """
        self._day_counter = DayCounter(day_counter_type)
        self._curve_data = curve_data  # kept for more complex curve data, unused here
        self._flat_rate = flat_rate
        self.valuation_date = valuation_date  # goes through the property setter

    @property
    def valuation_date(self) -> datetime:
        """Valuation date of the curve, always a datetime."""
        return self._valuation_date

    @valuation_date.setter
    def valuation_date(self, value: Union[date, datetime]):
        self._valuation_date = _date_to_datetime(value)

    def get_discount_factor(self, target_date: Union[date, datetime], spread: float = 0.0) -> float:
        """
        Compute the discount factor between the valuation date and a target date.

        Args:
            target_date (Union[date, datetime]): Date to which is discounted.
            spread (float, optional): Spread added to the flat rate before discounting. Defaults to 0.0.

        Returns:
            float: Discount factor 1 / (1 + rate + spread) ** year_fraction; 0.0 if the target date
                lies before the valuation date.
        """
        start = _date_to_datetime(self.valuation_date)
        end = _date_to_datetime(target_date)

        # guard: dates in the past are mapped to a discount factor of zero
        if end < start:
            return 0.0

        year_fraction = self._day_counter.yf(start, end)
        if self._flat_rate is None:
            base_rate = 0.02  # fallback when no flat rate was supplied
        else:
            base_rate = self._flat_rate
        return 1 / ((1 + base_rate + spread) ** year_fraction)

    def value(self, ref_date: datetime, target_date: datetime, spread: float = 0.0) -> float:
        """
        Discount factor from a reference date to a target date.
        In this simple implementation the reference date has to coincide (as calendar day)
        with the valuation date of the curve.

        Args:
            ref_date (datetime): Reference date (must match the curve's valuation date).
            target_date (datetime): Date to which is discounted.
            spread (float, optional): Spread forwarded to get_discount_factor. Defaults to 0.0.

        Raises:
            ValueError: If the reference date differs from the curve's valuation date.

        Returns:
            float: The discount factor.
        """
        same_day = _date_to_datetime(ref_date).date() == self.valuation_date.date()
        if not same_day:
            raise ValueError(f"Reference date {ref_date} does not match DiscountCurve valuation date {self.valuation_date}")
        return self.get_discount_factor(target_date, spread=spread)

    def __call__(self, t: float, refdate: Union[date, datetime] = None, d: Union[date, datetime] = None) -> float:
        """Return the stored flat rate; all arguments are ignored."""
        return self._flat_rate

460 

461 

class NelsonSiegel(interfaces.FactoryObject):
    def __init__(self, beta0: float, beta1: float, beta2: float, tau: float):
        """Nelson-Siegel parametrization for rates and yields, see :footcite:t:`Nelson1987`.

        This parametrization is mostly used to parametrize rate curves and can be used in conjunction with :class:`rivapy.marketdata.DiscountCurveParametrized`. It is defined by

        .. math::

            f(t) = \\beta_0 + (\\beta_1+\\beta_2)\\frac{1-e^{-t/\\tau}}{t/\\tau} -\\beta_2e^{-t/\\tau}


        Args:
            beta0 (float): This parameter is the asymptotic (for arbitrary large maturities) rate, see formula above.
            beta1 (float): beta0 + beta1 give the short term rate, see formula above.
            beta2 (float): This parameter controls the size of the hump, see formula above.
            tau (float): This parameter controls the location of the hump, see formula above.

        Examples:
            .. code-block:: python

                >>> from rivapy.marketdata.curves import NelsonSiegel, DiscountCurveParametrized
                >>> ns = NelsonSiegel(beta0=0.05, beta1 = 0.02, beta2=0.1, tau=1.0)
                >>> dc = DiscountCurveParametrized('DC', refdate = dt.datetime(2023,1,1), rate_parametrization=ns, daycounter = DayCounterType.Act365Fixed)
                >>> dates = [dt.datetime(2023,1,1) + dt.timedelta(days=30*days) for days in range(120)]
                >>> values = [dc.value(refdate = dt.datetime(2023,1,1),d=d) for d in dates]
                >>> plt.plot(dates, values)
        """
        self.beta0 = beta0
        self.beta1 = beta1
        self.beta2 = beta2
        self.tau = tau
        # scaling factor applied in __call__; changed only through __mul__
        self._multiplier = 1.0

    def _to_dict(self) -> dict:
        """Return the four Nelson-Siegel parameters as a dictionary (the multiplier is not included)."""
        return {"beta0": self.beta0, "beta1": self.beta1, "beta2": self.beta2, "tau": self.tau}

    def __call__(self, t: float):
        """Return the (scaled) Nelson-Siegel rate for the maturity t."""
        return self._multiplier * NelsonSiegel.compute(self.beta0, self.beta1, self.beta2, self.tau, t)

    def __mul__(self, x: float):
        """Return a new parametrization whose rates are scaled by x.

        The new multiplier is composed with the current one so that repeated
        multiplications accumulate, e.g. ``(ns * 2) * 3`` scales by 6.
        """
        result = NelsonSiegel(self.beta0, self.beta1, self.beta2, self.tau)
        result._multiplier = self._multiplier * x
        return result

    @staticmethod
    def compute(beta0: float, beta1: float, beta2: float, tau: float, T: float) -> float:
        """Evaluate the Nelson-Siegel formula.

        Args:
            beta0 (float): longrun
            beta1 (float): beta0 + beta1 = shortrun
            beta2 (float): hump or trough
            tau (float): location of hump
            T (float): Maturity (scalar or numpy array); values below 1e-4 are floored to 1e-4
                to avoid a division by zero.

        Returns:
            float: Value of the Nelson-Siegel formula at T.
        """
        t = np.maximum(T, 1e-4) / tau
        return beta0 + beta1 * (1.0 - np.exp(-t)) / t + beta2 * ((1 - np.exp(-t)) / t - np.exp(-(t)))

    @staticmethod
    def _create_sample(
        n_samples: int,
        seed: int = None,
        min_short_term_rate: float = -0.01,
        max_short_term_rate: float = 0.12,
        min_long_run_rate: float = 0.005,
        max_long_run_rate: float = 0.15,
        min_hump: float = -0.1,
        max_hump: float = 0.1,
        min_tau: float = 0.5,
        max_tau: float = 3.0,
    ):
        """Create a list of n_samples randomly (uniformly) parametrized NelsonSiegel objects.

        If a seed is given, numpy's global random generator is seeded with it first.
        beta1 is drawn such that beta0 + beta1 lies between min_short_term_rate and max_short_term_rate.
        """
        if seed is not None:
            np.random.seed(seed)
        result = []
        for i in range(n_samples):
            beta0 = np.random.uniform(min_long_run_rate, max_long_run_rate)
            beta1 = np.random.uniform(min_short_term_rate - beta0, max_short_term_rate - beta0)
            beta2 = np.random.uniform(min_hump, max_hump)
            tau = np.random.uniform(min_tau, max_tau)
            result.append(NelsonSiegel(beta0, beta1, beta2, tau))
        return result

    if has_tf:

        @staticmethod
        def compute_tf(beta0: tf.Tensor, beta1: tf.Tensor, beta2: tf.Tensor, tau: tf.Tensor, T: tf.Tensor) -> tf.Tensor:
            """Evaluate the Nelson-Siegel formula with tensorflow operations (only defined if tensorflow could be imported).

            Args:
                beta0 (tf.Tensor): longrun
                beta1 (tf.Tensor): beta0 + beta1 = shortrun
                beta2 (tf.Tensor): hump or trough
                tau (tf.Tensor): location of hump
                T (tf.Tensor): Maturity; values below 1e-4 are floored to 1e-4.

            Returns:
                tf.Tensor: Value of the Nelson-Siegel formula at T.
            """
            t = tf.maximum(T, 1e-4) / tau
            return beta0 + beta1 * (1.0 - tf.exp(-t)) / t + beta2 * ((1 - tf.exp(-t)) / t - tf.exp(-(t)))

565 

566 

class ConstantRate(interfaces.FactoryObject):
    def __init__(self, rate: float):
        """Continuously compounded flat rate object that can be used in conjunction with :class:`rivapy.marketdata.DiscountCurveParametrized`.

        Args:
            rate (float): The constant rate.

        """
        self.rate = rate

    def _to_dict(self) -> dict:
        """Return the rate as a dictionary."""
        return {"rate": self.rate}

    @staticmethod
    def _create_sample(n_samples: int, seed: int = None):
        """Create n_samples ConstantRate objects with rates drawn uniformly from [-0.005, 0.1).

        If a seed is given, numpy's global random generator is seeded with it first.
        """
        if seed is not None:
            np.random.seed(seed)
        return [ConstantRate(rate=np.random.uniform(-0.005, 0.1)) for _ in range(n_samples)]

    def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return the discount factor exp(-rate * yf) between refdate and d (Act365Fixed year fraction).

        Args:
            refdate (Union[date, datetime]): Start date.
            d (Union[date, datetime]): End date.

        Returns:
            float: discount factor
        """
        # plain date objects are converted to datetime (midnight); datetime(date, 0, 0, 0)
        # is not a valid constructor call and would raise a TypeError
        if not isinstance(refdate, datetime):
            refdate = datetime(refdate.year, refdate.month, refdate.day)
        if not isinstance(d, datetime):
            d = datetime(d.year, d.month, d.day)
        r = self.rate
        yf = DayCounter(DayCounterType.Act365Fixed).yf(refdate, d)
        return np.exp(-r * yf)

    def __call__(self, t: float, refdate: Union[date, datetime] = None, d: Union[date, datetime] = None):
        """Return the constant rate; all arguments are ignored."""
        return self.rate

600 

601 

class LinearRate(interfaces.FactoryObject):
    def __init__(self, shortterm_rate: float, longterm_rate: float, max_maturity: float = 10.0, min_maturity: float = 1.0):
        """Continuously compounded linearly interpolated rate object that can be used in conjunction with :class:`rivapy.marketdata.DiscountCurveParametrized`.

        Args:
            shortterm_rate (float): The short term rate (rate at min_maturity).
            longterm_rate (float): The longterm rate (rate at max_maturity).
            max_maturity (float): After this timepoint constant extrapolation is applied. Must differ from min_maturity.
            min_maturity (float): Lower node of the linear interpolation; the interpolator is set up
                with constant extrapolation outside the two nodes. Defaults to 1.0.
        """
        self.shortterm_rate = shortterm_rate
        self.min_maturity = min_maturity
        self.longterm_rate = longterm_rate
        self.max_maturity = max_maturity
        # slope of the rate between the two nodes
        self._coeff = (self.longterm_rate - self.shortterm_rate) / (self.max_maturity - self.min_maturity)

    @staticmethod
    def _create_sample(n_samples: int, seed: int = None):
        """Create n_samples LinearRate objects with random short term rates and a positive random term spread.

        If a seed is given, numpy's global random generator is seeded with it first.
        """
        if seed is not None:
            np.random.seed(seed)
        result = []
        for i in range(n_samples):
            shortterm_rate = np.random.uniform(-0.005, 0.07)
            longterm_rate = shortterm_rate + np.random.uniform(0.0025, 0.09)
            result.append(LinearRate(shortterm_rate=shortterm_rate, longterm_rate=longterm_rate))
        return result

    def _to_dict(self) -> dict:
        """Return all constructor arguments as a dictionary."""
        return {
            "shortterm_rate": self.shortterm_rate,
            "longterm_rate": self.longterm_rate,
            "max_maturity": self.max_maturity,
            # included so that an object rebuilt from this dict keeps a non-default min_maturity
            "min_maturity": self.min_maturity,
        }

    def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return the discount factor exp(-r * yf) between refdate and d.

        r is the rate interpolated at the Act365Fixed year fraction yf between the two dates.

        Args:
            refdate (Union[date, datetime]): Start date.
            d (Union[date, datetime]): End date.

        Returns:
            float: discount factor
        """
        # plain date objects are converted to datetime (midnight); datetime(date, 0, 0, 0)
        # is not a valid constructor call and would raise a TypeError
        if not isinstance(refdate, datetime):
            refdate = datetime(refdate.year, refdate.month, refdate.day)
        if not isinstance(d, datetime):
            d = datetime(d.year, d.month, d.day)
        yf = DayCounter(DayCounterType.Act365Fixed).yf(refdate, d)
        r = Interpolator(InterpolationType.LINEAR, ExtrapolationType.CONSTANT).interp(
            [self.min_maturity, self.max_maturity],
            [self.shortterm_rate, self.longterm_rate],
            yf,
            ExtrapolationType.CONSTANT,
        )
        return np.exp(-r * yf)

    def value_rate(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return the continuously compounded zero rate between refdate and d.

        Args:
            refdate (Union[date, datetime]): Start date.
            d (Union[date, datetime]): End date; must differ from refdate, otherwise the year fraction is zero.

        Returns:
            float: continuously compounded zero rate
        """
        if not isinstance(refdate, datetime):
            refdate = datetime(refdate.year, refdate.month, refdate.day)
        if not isinstance(d, datetime):
            d = datetime(d.year, d.month, d.day)
        r = -math.log(self.value(refdate, d)) / DayCounter(DayCounterType.Act365Fixed).yf(refdate, d)
        return r

    def __call__(self, t: float, refdate: Union[date, datetime] = None, d: Union[date, datetime] = None):
        """Return the rate interpolated at the year fraction t; refdate and d are ignored."""
        return Interpolator(InterpolationType.LINEAR, ExtrapolationType.CONSTANT).interp(
            [self.min_maturity, self.max_maturity], [self.shortterm_rate, self.longterm_rate], t, ExtrapolationType.CONSTANT
        )

657 

658 

class NelsonSiegelSvensson(NelsonSiegel):
    def __init__(self, beta0: float, beta1: float, beta2: float, beta3: float, tau: float, tau2: float):
        """Nelson-Siegel parametrization extended by a second hump term.

        Args:
            beta0 (float): See :class:`NelsonSiegel`.
            beta1 (float): See :class:`NelsonSiegel`.
            beta2 (float): See :class:`NelsonSiegel`.
            beta3 (float): Size of the second hump.
            tau (float): See :class:`NelsonSiegel`.
            tau2 (float): Scale parameter of the second hump.
        """
        super().__init__(beta0, beta1, beta2, tau)
        self.beta3 = beta3
        self.tau2 = tau2

    def _to_dict(self) -> dict:
        """Return the Nelson-Siegel parameters extended by beta3 and tau2."""
        tmp = super()._to_dict()
        tmp.update({"beta3": self.beta3, "tau2": self.tau2})
        return tmp

    def __call__(self, t: float):
        """Return the (scaled) Nelson-Siegel-Svensson rate for the maturity t."""
        # _multiplier is 1.0 unless the object was created through __mul__
        return self._multiplier * NelsonSiegelSvensson.compute(self.beta0, self.beta1, self.beta2, self.beta3, self.tau, self.tau2, t)

    def __mul__(self, x: float):
        """Return a new Nelson-Siegel-Svensson parametrization whose rates are scaled by x.

        Overridden because the inherited implementation builds a plain NelsonSiegel
        object and thereby drops beta3 and tau2.
        """
        result = NelsonSiegelSvensson(self.beta0, self.beta1, self.beta2, self.beta3, self.tau, self.tau2)
        result._multiplier = self._multiplier * x
        return result

    @staticmethod
    def compute(beta0, beta1, beta2, beta3, tau, tau2, T):
        """Evaluate the Nelson-Siegel formula plus the additional hump term beta3 * ((1 - e^{-t}) / t - e^{-t}) with t = T / tau2.

        Maturities T below 1e-4 are floored to 1e-4 to avoid a division by zero.
        """
        t = np.maximum(T, 1e-4) / tau2
        return NelsonSiegel.compute(beta0, beta1, beta2, tau, T) + beta3 * ((1 - np.exp(-t)) / t - np.exp(-(t)))

677 

678 

679class DiscountCurveComposition(interfaces.FactoryObject): 

680 def __init__(self, a, b, c): 

681 # check if all discount curves have the same daycounter, otherwise exception 

682 if isinstance(a, dict): 

683 a = _create(a) 

684 if isinstance(b, dict): 

685 b = _create(b) 

686 if isinstance(c, dict): 

687 c = _create(c) 

688 dc = set() 

689 for k in [a, b, c]: 

690 if hasattr(k, "daycounter"): 

691 dc.add(k.daycounter) 

692 if len(dc) > 1: 

693 raise Exception("All curves must have same daycounter.") 

694 if len(dc) > 0: 

695 self.daycounter = dc.pop() 

696 else: 

697 self.daycounter = DayCounterType.Act365Fixed.value 

698 self._dc = DayCounter(self.daycounter) 

699 self.a = a 

700 if not hasattr(a, "value"): 

701 self.a = DiscountCurveParametrized("", datetime(1980, 1, 1), ConstantRate(a), self.daycounter) 

702 self.b = b 

703 if not hasattr(b, "value"): 

704 self.b = DiscountCurveParametrized("", datetime(1980, 1, 1), ConstantRate(b), self.daycounter) 

705 self.c = c 

706 if not hasattr(c, "value"): 

707 self.c = DiscountCurveParametrized("", datetime(1980, 1, 1), ConstantRate(c), self.daycounter) 

708 

709 def _to_dict(self) -> dict: 

710 if hasattr(self.a, "to_dict"): 

711 a = self.a.to_dict() 

712 else: 

713 a = self.a 

714 if hasattr(self.b, "to_dict"): 

715 b = self.b.to_dict() 

716 else: 

717 b = self.b 

718 if hasattr(self.c, "to_dict"): 

719 c = self.c.to_dict() 

720 else: 

721 c = self.c 

722 return {"a": a, "b": b, "c": c} 

723 

724 @staticmethod 

725 def _create_sample(n_samples: int, seed: int = None, refdate: Union[datetime, date] = None, parametrization_type=NelsonSiegel) -> list: 

726 curves = DiscountCurveParametrized._create_sample(n_samples, seed, refdate, parametrization_type) 

727 results = [] 

728 for c in curves: 

729 results.append(c + 0.001) 

730 return results 

731 

732 def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float: 

733 r = self.value_rate(refdate, d) 

734 yf = self._dc.yf(refdate, d) 

735 return np.exp(-r * yf) 

736 

737 def value_rate(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float: 

738 return self.a.value_rate(refdate, d) * self.b.value_rate(refdate, d) + self.c.value_rate(refdate, d) 

739 

740 def value_fwd(self, refdate: Union[date, datetime], d1: Union[date, datetime], d2: Union[date, datetime]) -> float: 

741 """Return forward discount factor for a given date""" 

742 return self.value(refdate, d2) / self.value(refdate, d1) 

743 

def value_fwd_rate(self, refdate: Union[date, datetime], d1: Union[date, datetime], d2: Union[date, datetime]) -> float:
    """Return the forward continuously compounded zero rate between ``d1`` and ``d2``.

    Computed as ``-log(value_fwd(refdate, d1, d2))`` divided by the year
    fraction between ``d1`` and ``d2``.

    Args:
        refdate (Union[date, datetime]): The reference date.
        d1 (Union[date, datetime]): The start date of the forward period.
        d2 (Union[date, datetime]): The end date of the forward period.

    Returns:
        float: forward continuously compounded zero rate
    """
    neg_log_df = -math.log(self.value_fwd(refdate, d1, d2))
    return neg_log_df / self._dc.yf(d1, d2)

748 

def __mul__(self, other):
    """Return ``self * other`` as a ``DiscountCurveComposition`` built from ``(self, other, 0.0)``."""
    # TODO unittests
    product = DiscountCurveComposition(self, other, 0.0)
    return product

752 

def __rmul__(self, other):
    """Return ``other * self`` as a ``DiscountCurveComposition`` built from ``(self, other, 0.0)``."""
    product = DiscountCurveComposition(self, other, 0.0)
    return product

755 

def __add__(self, other):
    """Return ``self + other`` as a ``DiscountCurveComposition`` built from ``(self, 1.0, other)``."""
    total = DiscountCurveComposition(self, 1.0, other)
    return total

758 

def __radd__(self, other):
    """Return ``other + self`` as a ``DiscountCurveComposition`` built from ``(self, 1.0, other)``."""
    total = DiscountCurveComposition(self, 1.0, other)
    return total

761 

762 

class DiscountCurveParametrized(interfaces.FactoryObject):
    """Discount curve whose continuously compounded rates are given by a rate parametrization."""

    def __init__(
        self,
        obj_id: str,
        refdate: Union[datetime, date],
        rate_parametrization,  #: Callable[[float], float],
        daycounter: Union[DayCounterType, str] = DayCounterType.Act365Fixed,
    ):
        """Create a parametrized discount curve.

        Args:
            obj_id (str): Identifier of the curve.
            refdate (Union[datetime, date]): Reference date of the curve. A plain date is converted to a datetime at midnight.
            rate_parametrization: Either a callable that is invoked as ``rate_parametrization(yf, refdate, d)``
                and returns the rate, or a dictionary from which the parametrization is created via the factory.
            daycounter (Union[DayCounterType, str], optional): Day counter used to compute year fractions. Defaults to DayCounterType.Act365Fixed.
        """
        self.refdate = self._ensure_datetime(refdate)

        self.daycounter = DayCounterType.to_string(daycounter)
        self._dc = DayCounter(self.daycounter)
        self.obj_id = obj_id
        if isinstance(rate_parametrization, dict):  # if the parametrization is a dict we try to create it from factory
            self.rate_parametrization = _create(rate_parametrization)
        else:
            self.rate_parametrization = rate_parametrization

    @staticmethod
    def _ensure_datetime(d: Union[date, datetime]) -> datetime:
        """Return ``d`` unchanged if it is a datetime, otherwise the datetime at midnight of the date ``d``."""
        if isinstance(d, datetime):
            return d
        # datetime(d, 0, 0, 0) is not a valid constructor call; build the datetime from the date's components
        return datetime(d.year, d.month, d.day)

    def _check_refdate(self, refdate: datetime) -> None:
        """Raise if ``refdate`` lies before the reference date of the curve."""
        if refdate < self.refdate:
            raise Exception("The given reference date is before the curves reference date.")

    def _to_dict(self) -> dict:
        """Return the dictionary representation of the curve.

        Returns:
            dict: Dictionary with the keys ``obj_id``, ``refdate`` and ``rate_parametrization``.

        Raises:
            Exception: If the parametrization cannot be serialized through ``to_dict()``.
        """
        # NOTE(review): the daycounter is not part of the returned dictionary - confirm whether this is intended.
        try:
            parametrization = self.rate_parametrization.to_dict()
        except Exception as e:
            raise Exception(
                "Missing implementation of to_dict() in parametrization of type " + type(self.rate_parametrization).__name__
            ) from e
        return {"obj_id": self.obj_id, "refdate": self.refdate, "rate_parametrization": parametrization}

    def value_fwd(self, refdate: Union[date, datetime], d1: Union[date, datetime], d2: Union[date, datetime]) -> float:
        """Return forward discount factor for a given period

        Args:
            refdate (Union[date, datetime]): The reference date. Must not be before the curves reference date.
            d1 (Union[date, datetime]): The start date of the forward period
            d2 (Union[date, datetime]): The end date of the forward period

        Returns:
            float: forward discount factor, i.e. the discount factor for d2 divided by the discount factor for d1

        Raises:
            Exception: If the given reference date is before the curves reference date.
        """
        refdate = self._ensure_datetime(refdate)
        d1 = self._ensure_datetime(d1)
        d2 = self._ensure_datetime(d2)
        self._check_refdate(refdate)
        df1 = self.value(refdate, d1)
        df2 = self.value(refdate, d2)
        return df2 / df1

    def value(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return discount factor for a given date

        The discount factor is ``exp(-r * yf)`` where ``yf`` is the year fraction between ``refdate`` and ``d``
        and ``r`` is the rate returned by the parametrization for ``(yf, refdate, d)``.

        Args:
            refdate (Union[date, datetime]): The reference date. Must not be before the curves reference date.
            d (Union[date, datetime]): The date for which the discount factor will be returned

        Returns:
            float: discount factor

        Raises:
            Exception: If the given reference date is before the curves reference date.
        """
        refdate = self._ensure_datetime(refdate)
        d = self._ensure_datetime(d)
        self._check_refdate(refdate)
        yf = self._dc.yf(refdate, d)
        return np.exp(-self.rate_parametrization(yf, refdate, d) * yf)

    def value_rate(self, refdate: Union[date, datetime], d: Union[date, datetime]) -> float:
        """Return the continuous rate for a given date

        Args:
            refdate (Union[date, datetime]): The reference date. Must not be before the curves reference date.
            d (Union[date, datetime]): The date for which the rate will be returned

        Returns:
            float: continuous rate

        Raises:
            Exception: If the given reference date is before the curves reference date.
        """
        refdate = self._ensure_datetime(refdate)
        d = self._ensure_datetime(d)
        self._check_refdate(refdate)
        yf = self._dc.yf(refdate, d)
        return self.rate_parametrization(yf, refdate, d)

    def value_fwd_rate(self, refdate: Union[date, datetime], d1: Union[date, datetime], d2: Union[date, datetime]) -> float:
        """Return forward continuously compounded zero rate for a given period

        Args:
            refdate (Union[date, datetime]): The reference date. Must not be before the curves reference date.
            d1 (Union[date, datetime]): The start date of the period for which the forward continuously compounded zero rate will be returned.
            d2 (Union[date, datetime]): The end date of the period for which the forward continuously compounded zero rate will be returned.

        Returns:
            float: forward continuously compounded zero rate

        Raises:
            Exception: If the given reference date is before the curves reference date.
        """
        refdate = self._ensure_datetime(refdate)
        d1 = self._ensure_datetime(d1)
        d2 = self._ensure_datetime(d2)
        self._check_refdate(refdate)
        r = -math.log(self.value_fwd(refdate, d1, d2)) / self._dc.yf(d1, d2)
        return r

    @staticmethod
    def _create_sample(n_samples: int, seed: int = None, refdate: Union[datetime, date] = None, parametrization_type=NelsonSiegel) -> list:
        """Create sample curves from sample parametrizations.

        Args:
            n_samples (int): Number of samples requested from ``parametrization_type._create_sample``.
            seed (int, optional): If given, numpy's global random generator is seeded with it. Defaults to None.
            refdate (Union[datetime, date], optional): Reference date of the curves. Defaults to ``datetime.now()``.
            parametrization_type: Class providing ``_create_sample``. Defaults to NelsonSiegel.

        Returns:
            list: Curves with the ids ``DCP_0``, ``DCP_1``, ...
        """
        if seed is not None:
            np.random.seed(seed)
        if refdate is None:
            refdate = datetime.now()
        parametrizations = parametrization_type._create_sample(n_samples)
        return [DiscountCurveParametrized("DCP_" + str(i), refdate, p) for i, p in enumerate(parametrizations)]

    def __mul__(self, other):
        """Return the composition built from ``(self, other, 0.0)``."""
        return DiscountCurveComposition(self, other, 0.0)

    def __rmul__(self, other):
        """Return the composition built from ``(self, other, 0.0)``."""
        return DiscountCurveComposition(self, other, 0.0)

    def __add__(self, other):
        """Return the composition built from ``(self, 1.0, other)``."""
        return DiscountCurveComposition(self, 1.0, other)

    def __radd__(self, other):
        """Return the composition built from ``(self, 1.0, other)``."""
        return DiscountCurveComposition(self, 1.0, other)

903 

904 

class EquityForwardCurve:
    def __init__(self, spot: float, funding_curve: DiscountCurve, borrow_curve: DiscountCurve, div_table):
        """Equity Forward Curve

        The reference date of the curve is the latest reference date found on
        the funding curve, the borrow curve (if given) and the dividend table
        (if given and if it has a ``refdate`` attribute).

        Args:
            spot (float): Current spot
            funding_curve (DiscountCurve): Funding curve
            borrow_curve (DiscountCurve): Borrow curve, may be None
            div_table (:class:`rivapy.marketdata.DividendTable`): Dividend table, may be None
        """
        self.spot = spot

        self.bc = borrow_curve
        self.fc = funding_curve
        self.div = div_table
        self._pyvacon_obj = None

        latest = self.fc.refdate
        if self.bc is not None and latest < self.bc.refdate:
            latest = self.bc.refdate
        if self.div is not None and hasattr(self.div, "refdate") and latest < self.div.refdate:
            latest = self.div.refdate
        self.refdate = latest

    def _get_pyvacon_obj(self):
        """Return the underlying pyvacon forward curve, creating and caching it on first use."""
        if self._pyvacon_obj is not None:
            return self._pyvacon_obj

        def _unwrap(obj):
            # objects wrapping a pyvacon object are replaced by that object, everything else is passed through
            return obj._get_pyvacon_obj() if hasattr(obj, "_get_pyvacon_obj") else obj

        fc = _unwrap(self.fc)
        bc = _unwrap(self.bc)
        div = _unwrap(self.div)
        self._pyvacon_obj = _EquityForwardCurve(self.refdate, self.spot, fc, bc, div)
        return self._pyvacon_obj

    def value(self, refdate, expiry):
        """Return the value of the underlying pyvacon forward curve for ``refdate`` and ``expiry``."""
        pyvacon_curve = self._get_pyvacon_obj()
        return pyvacon_curve.value(refdate, expiry)

    def plot(self, days: int = 10, days_end: int = 10 * 365, **kwargs):
        """Plots the forward curve using matplotlibs plot function.

        Args:
            days (int, optional): The number of days between two plotted forward values. Defaults to 10.
            days_end (int, optional): Number of days when plotting will end. Defaults to 10*365 (10yr)
            **kwargs: optional arguments that will be directly passed to the matplotlib plot function
        """
        offsets = range(0, days_end, days)
        dates = [self.refdate + timedelta(days=offset) for offset in offsets]
        values = [self.value(self.refdate, expiry) for expiry in dates]
        plt.plot(dates, values, **kwargs)
        plt.xlabel("expiry")
        plt.ylabel("forward value")

969 

970 

class BootstrapHazardCurve:
    def __init__(
        self,
        ref_date: datetime,
        trade_date: datetime,
        dc: "DiscountCurve",
        RR: float,
        payment_dates: List[List[datetime]],
        market_spreads: List[float],
    ):
        """Bootstrapper of hazard rates from market par spreads.

        Args:
            ref_date (datetime): Reference date used for all discount factor and survival probability lookups.
            trade_date (datetime): Trade date. It is stored but not used by the methods of this class.
            dc (DiscountCurve): Discount curve; it must provide ``value(refdate, d)``.
            RR (float): Recovery rate; the protection leg is scaled by ``1 - RR``.
            payment_dates (List[List[datetime]]): One list of premium payment dates per calibration instrument.
                The last date of each list is used as maturity of the respective instrument.
            market_spreads (List[float]): Market par spread of each calibration instrument (same order as ``payment_dates``).
        """

        self.ref_date = ref_date
        self.trade_date = trade_date
        self.dc = dc
        self.RR = RR
        self.payment_dates_bootstrapp = payment_dates
        self.market_spreads = market_spreads
        self._pyvacon_obj = None

    def par_spread(self, dc_survival, maturity_date, payment_dates: List[datetime]):
        """Compute the par spread for a given survival curve.

        Args:
            dc_survival: Survival curve; it must provide ``value(refdate, d)`` returning the survival probability.
            maturity_date: Maturity of the protection leg.
            payment_dates (List[datetime]): Premium payment dates. Dates before the reference date are ignored.

        Returns:
            float: Present value of the protection leg divided by the present value of premium and accrued legs
            (both per unit of spread).
        """
        # timedelta(days=365) shifts a date exactly like relativedelta(days=365)
        integration_step = timedelta(days=365)
        premium_period_start = self.ref_date
        prev_date = self.ref_date
        current_date = min(prev_date + integration_step, maturity_date)
        risk_adj_factor_protection = 0
        risk_adj_factor_premium = 0
        risk_adj_factor_accrued = 0

        # protection leg: discounted default probability per integration step
        while current_date <= maturity_date:
            default_prob = dc_survival.value(self.ref_date, prev_date) - dc_survival.value(self.ref_date, current_date)
            risk_adj_factor_protection += self.dc.value(self.ref_date, current_date) * default_prob
            prev_date = current_date
            current_date += integration_step

        # remaining stub period between the last full step and the maturity
        if prev_date < maturity_date and current_date > maturity_date:
            default_prob = dc_survival.value(self.ref_date, prev_date) - dc_survival.value(self.ref_date, maturity_date)
            risk_adj_factor_protection += self.dc.value(self.ref_date, maturity_date) * default_prob

        # premium and accrued legs; the period length is measured in days / 360
        for premium_payment in payment_dates:
            if premium_payment >= self.ref_date:
                period_length = ((premium_payment - premium_period_start).days) / 360
                survival_start = dc_survival.value(self.ref_date, premium_period_start)
                survival_end = dc_survival.value(self.ref_date, premium_payment)
                df = self.dc.value(self.ref_date, premium_payment)
                risk_adj_factor_premium += period_length * (survival_start + survival_end) / 2 * df
                risk_adj_factor_accrued += period_length * (survival_start - survival_end) * df
                premium_period_start = premium_payment

        PV_accrued = (1 / 2) * risk_adj_factor_accrued
        PV_premium = risk_adj_factor_premium
        PV_protection = (1 - self.RR) * risk_adj_factor_protection

        return PV_protection / (PV_premium + PV_accrued)

    def create_survival(self, dates: List[datetime], hazard_rates: List[float]):
        """Create a pyvacon survival curve with the reference date of this object.

        Args:
            dates (List[datetime]): Dates of the survival curve.
            hazard_rates (List[float]): Hazard rates belonging to ``dates``.

        Returns:
            The pyvacon ``SurvivalCurve`` object.
        """
        # the attribute is called ref_date (see __init__); self.refdate does not exist
        return _SurvivalCurve("survival_curve", self.ref_date, dates, hazard_rates)

    def calibration_error(self, x, mkt_par_spread, ref_date, payment_dates, dates, hazard_rates):
        """Return the difference between market par spread and model par spread for the trial hazard rate ``x``.

        The method is used as target function of the root search in ``calibrate_hazard_rate``. ``self`` must be the
        first parameter so that the bound method can be called as ``f(x, *args)`` by the root finder.

        Args:
            x (float): Trial value for the last hazard rate.
            mkt_par_spread (float): Market par spread that shall be matched.
            ref_date: Not used; kept so that existing call signatures remain valid. The survival curve is
                always created with the reference date of this object.
            payment_dates (List[datetime]): Premium payment dates of the instrument.
            dates (List[datetime]): Dates of the survival curve; the last one is used as maturity.
            hazard_rates (List[float]): Hazard rates; the last entry is overwritten in place with ``x``.

        Returns:
            float: ``mkt_par_spread`` minus the par spread implied by the hazard rates.
        """
        hazard_rates[-1] = x
        maturity_date = dates[-1]
        dc_surv = self.create_survival(dates, hazard_rates)
        return mkt_par_spread - self.par_spread(dc_surv, maturity_date, payment_dates)

    def calibrate_hazard_rate(self):
        """Bootstrap the hazard rates instrument by instrument.

        For each instrument a new hazard rate is appended and solved for with Brent's method in the
        bracket ``[0, 3]`` such that the model par spread matches the market par spread.

        Returns:
            Tuple[List[float], List[datetime]]: The hazard rates and the respective dates. The first entries are
            ``0.0`` and the reference date.
        """
        sc_dates = [self.ref_date]
        hazard_rates = [0.0]
        for i, payment_dates_iter in enumerate(self.payment_dates_bootstrapp):
            mkt_par_spread_iter = self.market_spreads[i]
            sc_dates.append(payment_dates_iter[-1])
            # start value; it is overwritten by the root finder through calibration_error
            hazard_rates.append(hazard_rates[-1])
            sol = scipy.optimize.root_scalar(
                self.calibration_error,
                args=(mkt_par_spread_iter, self.ref_date, payment_dates_iter, sc_dates, hazard_rates),
                method="brentq",
                bracket=[0, 3],
                xtol=1e-8,
                rtol=1e-8,
            )
            hazard_rates[-1] = sol.root
        return hazard_rates, sc_dates

1058 

1059 # def hazard_rates(self): 

1060 # #hazard_rates_value=[] 

1061 # hazard_rates_value=self.calibrate_hazard_rate() 

1062 # return self.hazard_rates_value 

1063 

1064 # def value(self, refdate: Union[date, datetime], d: Union[date, datetime])->float: 

1065 # """Return discount factor for a given date 

1066 

1067 # Args: 

1068 # refdate (Union[date, datetime]): The reference date. If the reference date is in the future (compared to the curves reference date), the forward discount factor will be returned. 

1069 # d (Union[date, datetime]): The date for which the discount factor will be returned 

1070 

1071 # Returns: 

1072 # float: discount factor 

1073 # """ 

1074 # #if not isinstance(refdate, datetime): 

1075 # # refdate = datetime(refdate,0,0,0) 

1076 # #if not isinstance(d, datetime): 

1077 # # d = datetime(d,0,0,0) 

1078 # #if refdate < self.refdate: 

1079 # # raise Exception('The given reference date is before the curves reference date.') 

1080 # return self._get_pyvacon_obj().value(refdate, d) 

1081 

1082 # def _get_pyvacon_obj(self): 

1083 # if self._pyvacon_obj is None: 

1084 # self._pyvacon_obj = _SurvivalCurve('survival_curve', self.refdate, 

1085 # self.calibrate_hazard_rate[1], self.calibrate_hazard_rate[0]) 

1086 # return self._pyvacon_obj 

1087 

1088 

1089# class PowerPriceForwardCurve: 

1090# def __init__( 

1091# self, refdate: Union[datetime, date], start: datetime, end: datetime, values: np.ndarray, freq: str = "1H", tz: str = None, id: str = None 

1092# ): 

1093# """Simple forward curve for power. 

1094 

1095# Args: 

1096# refdate (Union[datetime, date]): Reference date of curve 

1097# start (dt.datetime): Start of forward curve datetimepoints (including this timepoint). 

1098# end (dt.datetime): End of forward curve datetimepoints (excluding this timepoint). 

1099# values (np.ndarray): One dimensional array holding the price for each datetimepoint in the curve. The method value will raise an exception if the number of values is not equal to the number of datetimepoints. 

1100# freq (str, optional): Frequency of timepoints. Defaults to '1H'. See documentation for pandas.date_range for further details on freq. 

1101# tz (str or tzinfo): Time zone name for returning localized datetime points, for example ‘Asia/Hong_Kong’. 

1102# By default, the resulting datetime points are timezone-naive. See documentation for pandas.date_range for further details on tz. 

1103# id (str): Identifier for the curve. It has no impact on the valuation functionality. If None, a uuid will be generated. Defaults to None. 

1104# """ 

1105# self.id = id 

1106# if id is None: 

1107# self.id = "PFC/" + str(datetime.now()) 

1108# self.refdate = refdate 

1109# self.start = start 

1110# self.end = end 

1111# self.freq = freq 

1112# self.tz = tz 

1113# self.values = values 

1114# # timegrid used to compute prices for a certain schedule 

1115# self._tg = None 

1116# self._df = ( 

1117# pd.DataFrame( 

1118# {"dates": pd.date_range(self.start, self.end, freq=self.freq, tz=self.tz, inclusive="left").to_pydatetime(), "values": self.values} 

1119# ) 

1120# .set_index(["dates"]) 

1121# .sort_index() 

1122# ) 

1123 

1124# def value(self, refdate: Union[date, datetime], schedule) -> np.ndarray: 

1125# if self._tg is None: 

1126# self._tg = pd.DataFrame( 

1127# {"dates": pd.date_range(self.start, self.end, freq=self.freq, tz=self.tz, inclusive="left").to_pydatetime(), "values": self.values} 

1128# ).reset_index() 

1129# if self._tg.shape[0] != self.values.shape[0]: 

1130# raise Exception( 

1131# "The number of dates (" 

1132# + str(self._tg.shape[0]) 

1133# + ") does not equal number of values (" 

1134# + str(self.values.shape[0]) 

1135# + ") in forward curve." 

1136# ) 

1137# tg = self._tg[(self._tg.dates >= schedule.start) & (self._tg.dates < schedule.end)].set_index("dates") 

1138# _schedule = pd.DataFrame({"dates": schedule.get_schedule(refdate)}) 

1139# tg = _schedule.join(tg, on="dates") 

1140# # tg = tg[tg['dates']>=refdate] 

1141# if tg["index"].isna().sum() > 0: 

1142# raise Exception("There are " + str(tg["index"].isna().sum()) + " dates in the schedule not covered by the forward curve.") 

1143# return self.values[tg["index"].values] 

1144 

1145# def get_df(self) -> pd.DataFrame: 

1146# return self._df 

1147 

1148 

1149class EnergyPriceForwardCurve: 

1150 """Energy Price Forward Curve object. 

1151 It is recommended to initialize this object via the class methods ``from_existing_pfc``, ``from_existing_shape`` or ``from_scratch``. 

1152 

1153 Args: 

1154 id (_type_): ID for the PFC object 

1155 refdate (Union[datetime, date]): Reference date 

1156 pfc (pd.DataFrame, optional): This object can be initialized with an existing pfc. Defaults to None. 

1157 """ 

1158 

def __init__(self, id, refdate: Union[datetime, date], pfc: pd.DataFrame = None, **kwargs):
    """Store the given data and validate that at least one source for the PFC is available.

    Args:
        id (_type_): ID for the PFC object. If None, an id of the form ``"PFC/<current datetime>"`` is generated.
        refdate (Union[datetime, date]): Reference date
        pfc (pd.DataFrame, optional): Existing PFC. Defaults to None.
        **kwargs: Optional entries ``pfc_shape``, ``apply_schedule``, ``pfc_shaper`` and ``future_contracts``.

    Raises:
        ValueError: If none of ``pfc``, ``pfc_shape`` and ``pfc_shaper`` is provided.
    """
    self.id = "PFC/" + str(datetime.now()) if id is None else id
    self.refdate = refdate

    self._pfc = pfc
    self._pfc_shape: pd.DataFrame = kwargs.get("pfc_shape", None)
    self._apply_schedule: SimpleSchedule = kwargs.get("apply_schedule", None)
    self._pfc_shaper: PFCShaper = kwargs.get("pfc_shaper", None)

    # both dataframes (if given) are passed through the dataframe validation
    for frame in (self._pfc, self._pfc_shape):
        EnergyPriceForwardCurve._validate_dataframes(frame)

    self._future_contracts: List[EnergyFutureSpecifications] = kwargs.get("future_contracts", None)

    if all(source is None for source in (self._pfc, self._pfc_shape, self._pfc_shaper)):
        raise ValueError("No values provided for the arguments pfc, pfc_shape and pfc_shaper!")

1178 

1179 @staticmethod 

1180 def _validate_dataframes(dataframe: Optional[pd.DataFrame]): 

1181 if dataframe is not None: 

1182 validators._check_pandas_index_for_datetime(dataframe) 

1183 

@classmethod
def from_existing_pfc(cls, id, refdate: Union[datetime, date], pfc: pd.DataFrame) -> "EnergyPriceForwardCurve":
    """Create an ``EnergyPriceForwardCurve`` from an already existing PFC.

    Args:
        id (_type_): ID for the PFC object
        refdate (Union[datetime, date]): Reference Date
        pfc (pd.DataFrame): Existing PFC

    Returns:
        EnergyPriceForwardCurve: ``EnergyPriceForwardCurve`` object
    """
    return cls(id=id, refdate=refdate, pfc=pfc)

1198 

@classmethod
def from_existing_shape(
    cls, id, refdate: Union[datetime, date], pfc_shape: pd.DataFrame, future_contracts: List[EnergyFutureSpecifications]
) -> "EnergyPriceForwardCurve":
    """Create an ``EnergyPriceForwardCurve`` from an existing PFC shape.

    After construction the shape is shifted so that it matches the future contracts given in ``future_contracts``.

    Args:
        id (_type_): ID for the PFC object
        refdate (Union[datetime, date]): Reference Date
        pfc_shape (pd.DataFrame): Existing PFC shape
        future_contracts (List[EnergyFutureSpecifications]): List of future contracts (``EnergyFutureSpecifications`` objects)

    Returns:
        EnergyPriceForwardCurve: ``EnergyPriceForwardCurve`` object
    """
    curve = cls(id=id, refdate=refdate, pfc_shape=pfc_shape, future_contracts=future_contracts)
    curve._shift_shape()
    return curve

1218 

@classmethod
def from_scratch(
    cls,
    id,
    refdate: Union[datetime, date],
    apply_schedule: SimpleSchedule,
    pfc_shaper: PFCShaper,
    future_contracts: List[EnergyFutureSpecifications],
) -> "EnergyPriceForwardCurve":
    """Create an ``EnergyPriceForwardCurve`` from scratch.

    In a first step a shape is created with the ``pfc_shaper``. In a second step this shape is shifted so that it
    matches the future contracts given in ``future_contracts``.

    Args:
        id (_type_): ID for the PFC object
        refdate (Union[datetime, date]): Reference Date
        apply_schedule (SimpleSchedule): Schedule the ``pfc_shaper`` is applied on to obtain the shape values
        pfc_shaper (PFCShaper): PFC shaper
        future_contracts (List[EnergyFutureSpecifications]): List of future contracts (``EnergyFutureSpecifications`` objects)

    Returns:
        EnergyPriceForwardCurve: ``EnergyPriceForwardCurve`` object
    """
    curve = cls(id=id, refdate=refdate, pfc_shaper=pfc_shaper, future_contracts=future_contracts, apply_schedule=apply_schedule)
    curve._create_shape()
    curve._shift_shape()
    return curve

1244 

def __validate_contracts_frequency(self):
    """Ensure that all contracts in ``self._future_contracts`` share the same schedule frequency.

    Raises:
        ValueError: If more than one distinct schedule frequency is found.
    """
    # frequency -> list of (schedule class name, contract name)
    frequencies_contracts = defaultdict(list)
    for contract in self._future_contracts:
        schedule = contract.schedule
        frequencies_contracts[schedule.freq].append((schedule.__class__.__name__, contract.name))

    if len(frequencies_contracts) <= 1:
        return
    raise ValueError(
        f"Found different contract frequencies: {frequencies_contracts}.\n Please provide uniform frequencies for the elements in the `future_contract` dictionary!"
    )

1255 

def __get_offpeak_contracts(
    self, base_contracts: List[EnergyFutureSpecifications], peak_contracts: List[EnergyFutureSpecifications]
) -> List[EnergyFutureSpecifications]:
    """Derive offpeak contracts from matching pairs of base and peak contracts.

    A base contract is paired with the first peak contract having the same result of ``get_start_end()``.
    Base contracts without such a peak contract do not produce an offpeak contract.

    Args:
        base_contracts (List[EnergyFutureSpecifications]): List of base contracts
        peak_contracts (List[EnergyFutureSpecifications]): List of peak contracts

    Returns:
        List[EnergyFutureSpecifications]: List of offpeak contracts
    """
    offpeak_contracts = []

    for base in base_contracts:
        n_base = len(base.get_schedule())
        # first peak contract covering the same delivery period as the base contract
        peak = next((candidate for candidate in peak_contracts if base.get_start_end() == candidate.get_start_end()), None)
        if peak is None:
            continue

        offpeak_name = f"offpeak_{base.name}&{peak.name}"
        n_peak = len(peak.get_schedule())
        # the base price is the schedule-length-weighted mean of peak and offpeak price; solved for the offpeak price
        # (raises ZeroDivisionError if both schedules have the same length)
        offpeak_price = n_base / (n_base - n_peak) * base.get_price() - n_peak / (n_base - n_peak) * peak.get_price()
        offpeak_schedule = OffPeakSchedule(start=base.get_start(), end=base.get_end())
        offpeak_contracts.append(EnergyFutureSpecifications(schedule=offpeak_schedule, price=offpeak_price, name=offpeak_name))

    return offpeak_contracts

1292 

def _shift_shape(self):
    """Shift ``self._pfc_shape`` to match the contracts in ``self._future_contracts`` and store the result in ``self._pfc``."""
    self.__validate_contracts_frequency()

    base_contracts = [fc for fc in self._future_contracts if fc.schedule.__class__._name == etgs.BASE]
    peak_contracts = [fc for fc in self._future_contracts if fc.schedule.__class__._name == etgs.PEAK]

    if not base_contracts or not peak_contracts:
        # if only base or only peak contracts exist, shifting can be performed directly
        pfc_shifter = PFCShifter(shape=self._pfc_shape, contracts=self._future_contracts)
        self._pfc = pfc_shifter.compute()
        return

    # base and peak contracts both exist: offpeak contracts are computed and
    # offpeak and peak are shifted separately
    offpeak_contracts = self.__get_offpeak_contracts(base_contracts=base_contracts, peak_contracts=peak_contracts)
    shifted_parts = []
    for contracts in [offpeak_contracts, peak_contracts]:
        schedules = [contract.get_schedule() for contract in contracts]
        shifting_datetimes = np.sort(np.unique(np.concatenate(schedules)))
        shape_part = self._pfc_shape.loc[shifting_datetimes, :]
        shifted_parts.append(PFCShifter(shape=shape_part, contracts=contracts).compute())

    # combine offpeak and peak shifts
    combined = pd.concat(shifted_parts, axis=0)
    self._pfc = combined.sort_index(ascending=True)

1321 

def _create_shape(self):
    """Calibrate ``self._pfc_shaper`` and store the shape it yields for the apply schedule in ``self._pfc_shape``."""
    self._pfc_shaper.calibrate()
    schedule = self._apply_schedule.get_schedule()
    self._pfc_shape = self._pfc_shaper.apply(schedule)

1326 

def get_pfc(self) -> pd.DataFrame:
    """Return the price forward curve stored in ``self._pfc``.

    Returns:
        pd.DataFrame: PFC
    """
    pfc = self._pfc
    return pfc