Coverage for rivapy/marketdata/__init__.py: 47%
236 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-05 14:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-05 14:27 +0000
1# from pyvacon.marketdata.analytics_classes import * # TODO: Clarify why this is necessary for imports in pricing_data.
2# from pyvacon.marketdata import analytics_classes
3# __all__ = ['analytics_classes', 'bootstrapping']
4import abc
5import numpy as np
7# from pyvacon.pyvacon_swig import EquityOptionQuoteTable
8from rivapy import enums
9from typing import List, Union, Tuple
10from rivapy import _pyvacon_available
11from scipy.optimize import least_squares
12from rivapy.marketdata.curves import *
13from rivapy.marketdata.factory import _factory
if _pyvacon_available:
    # Each pyvacon submodule is imported exactly once (the original imported
    # pyvacon.finance.marketdata twice).
    import pyvacon.finance.marketdata as _mkt_data
    import pyvacon.finance.pricing as _pricing
    import pyvacon.finance.utils as _utils

    # Expose the pyvacon market data classes under this package's namespace.
    InflationIndexForwardCurve = _mkt_data.InflationIndexForwardCurve
    SurvivalCurve = _mkt_data.SurvivalCurve
    DatedCurve = _mkt_data.DatedCurve
    EquityOptionQuoteTable = _mkt_data.EquityOptionQuoteTable
    # DividendTable is deliberately not aliased here: this module defines its
    # own DividendTable wrapper class further down.
else:

    class SurvivalCurve:
        """Placeholder used when pyvacon is not installed.

        Instantiation always fails, because the only implementation of the
        survival curve lives in pyvacon.

        Raises:
            Exception: Always, on construction.
        """

        def __init__(self):
            raise Exception("Up to now only implemented in pyvacon that has not been installed.")
class DividendTable:
    """Python-side holder for dividend data that lazily builds a pyvacon ``DividendTable``."""

    def __init__(
        self,
        id: str,
        refdate: datetime,
        ex_dates: List[datetime],
        pay_dates: List[datetime],
        div_yield: List[float],
        div_cash: List[float],
        tax_factors: List[float],
    ):
        """Store the dividend data; no pyvacon object is created yet.

        All arguments are kept unchanged as attributes of the same name and are
        only handed over to pyvacon when ``_get_pyvacon_obj`` is first called.

        Args:
            id (str): Identifier of the dividend table.
            refdate (datetime): Reference date of the table.
            ex_dates (List[datetime]): Ex-dividend dates.
            pay_dates (List[datetime]): Payment dates.
            div_yield (List[float]): Dividend yields (presumably one per ex-date; confirm against pyvacon).
            div_cash (List[float]): Cash dividends (presumably one per ex-date; confirm against pyvacon).
            tax_factors (List[float]): Tax factors (presumably one per ex-date; confirm against pyvacon).
        """
        # Cache slot for the lazily created pyvacon object.
        self._pyvacon_obj = None
        self.id, self.refdate = id, refdate
        self.ex_dates, self.pay_dates = ex_dates, pay_dates
        self.div_yield, self.div_cash = div_yield, div_cash
        self.tax_factors = tax_factors

    def _get_pyvacon_obj(self):
        """Return the cached pyvacon dividend table, creating it on first use."""
        if self._pyvacon_obj is not None:
            return self._pyvacon_obj
        # Note the argument order expected here: pay_dates goes last.
        self._pyvacon_obj = _mkt_data.DividendTable(
            self.id,
            self.refdate,
            self.ex_dates,
            self.div_yield,
            self.div_cash,
            self.tax_factors,
            self.pay_dates,
        )
        return self._pyvacon_obj
class _VolatilityParametrizationExpiry:
    """Base class for parametrizations holding one parameter tuple per expiry.

    The tuples are stored flattened in ``self._x`` (``n_params`` consecutive
    entries per expiry). Subclasses implement ``_calc_implied_vol_at_expiry``.
    """

    def __init__(self, expiries: List[float], params_at_expiry: List[Tuple]):
        """Store expiries and flatten the per-expiry parameters.

        Args:
            expiries (List[float]): Expiries; ``calc_implied_vol`` looks them up with
                ``np.searchsorted``, so they have to be sorted ascending.
            params_at_expiry (List[Tuple]): One parameter tuple per expiry. The length of
                the first tuple defines the number of parameters per expiry.
        """
        self.n_params = len(params_at_expiry[0])
        self.expiries = np.array(expiries)
        self._x = self._get_x(params_at_expiry)

    def get_params_at_expiry(self, expiry: int) -> np.ndarray:
        """Get parameters for given expiry.

        Args:
            expiry (int): Position in expiry list.

        Returns:
            np.ndarray: Parameter tuple for given expiry (a slice of the flat parameter vector).
        """
        start = self.n_params * expiry
        return self._x[start : start + self.n_params]

    def calc_implied_vol(self, ttm, strike):
        """Calculate implied volatility for given expiry and strike

        Args:
            ttm ([float]): Expiry.
            strike ([float]): Strike.

        Returns:
            [float]: Implied volatility.
        """
        i = np.searchsorted(self.expiries, ttm)
        n_expiries = self.expiries.shape[0]
        if i == 0 or i == n_expiries:
            # ttm is at/before the first or beyond the last expiry: use the
            # parameters of the nearest expiry, evaluated at ttm itself.
            if i == n_expiries:
                i -= 1
            # NOTE(review): unlike the interpolated branch below, this branch does
            # not divide by ttm before taking the square root -- confirm intended.
            return np.sqrt(self._calc_implied_vol_at_expiry(self.get_params_at_expiry(i), ttm, strike))
        w0 = self._calc_implied_vol_at_expiry(self.get_params_at_expiry(i - 1), self.expiries[i - 1], strike)
        w1 = self._calc_implied_vol_at_expiry(self.get_params_at_expiry(i), self.expiries[i], strike)
        # linear in total variance between the two neighbouring expiries
        delta_t = self.expiries[i] - self.expiries[i - 1]
        w = ((self.expiries[i] - ttm) * w0 + (ttm - self.expiries[i - 1]) * w1) / delta_t
        return np.sqrt(w / ttm)

    # NOTE: this class does not use abc.ABCMeta, so the decorator documents the
    # intent but does not prevent instantiation of the base class.
    @abc.abstractmethod
    def _calc_implied_vol_at_expiry(self, params, ttm: float, strike: float):
        """Return the quantity whose square root ``calc_implied_vol`` takes (to be implemented by subclasses)."""
        pass

    def _get_x(self, params) -> np.ndarray:
        """Flatten the per-expiry tuples into one float array (first ``n_params`` entries of each)."""
        return np.array([p[k] for p in params for k in range(self.n_params)], dtype=float)

    def _set_param(self, x) -> None:
        """Replace the flat parameter vector."""
        self._x = x

    def calibrate_params(self, quotes: pd.DataFrame, **kwargs):
        """Calibrate parameters to given implied volatility quotes.

        The residual per quote is the distance of the model volatility to the
        bid/ask band (zero inside the band). The helper columns VOLS, DIST_ASK,
        DIST_BID and DIST_TOTAL are written into ``quotes`` in place.

        Args:
            quotes (pd.DataFrame): pd.DataFrame with columns EXPIRY as year fraction, STRIKE as moneyness, BID_IV, ASK_IV.
            **kwargs: Passed on to ``scipy.optimize.least_squares``. If nothing is given,
                ``method="lm"`` is used.

        Returns:
            np.ndarray: The calibrated flat parameter vector, which is also stored on the object.
        """

        def cost_function(x):
            self._set_param(x)
            quotes["VOLS"] = [self.calc_implied_vol(expiry, strike) for expiry, strike in zip(quotes["EXPIRY"], quotes["STRIKE"])]
            quotes["DIST_ASK"] = [max(vol - ask, 0) for ask, vol in zip(quotes["ASK_IV"], quotes["VOLS"])]
            quotes["DIST_BID"] = [max(bid - vol, 0) for bid, vol in zip(quotes["BID_IV"], quotes["VOLS"])]
            quotes["DIST_TOTAL"] = quotes["DIST_ASK"] + quotes["DIST_BID"]
            return np.copy(quotes["DIST_TOTAL"].values)

        # ``**kwargs`` is never None (it is an empty dict when nothing is passed),
        # so the default has to be applied on emptiness.
        if not kwargs:
            kwargs = {"method": "lm"}
        result = least_squares(cost_function, self._x, **kwargs)
        # The optimizer's last evaluation of the cost function is not necessarily
        # at the solution, so store the solution explicitly.
        self._set_param(result.x)
        return result.x
class VolatilityParametrizationFlat:
    def __init__(self, vol: float):
        """Flat volatility parametrization

        Args:
            vol (float): Constant volatility.
        """
        # Cache slot for the lazily created pyvacon object.
        self._pyvacon_obj = None
        self.vol = vol

    def _get_pyvacon_obj(self):
        """Return the cached pyvacon parametrization, creating it on first use."""
        cached = self._pyvacon_obj
        if cached is None:
            cached = _mkt_data.VolatilityParametrizationFlat(self.vol)
            self._pyvacon_obj = cached
        return cached
class VolatilityParametrizationTerm:
    def __init__(self, expiries: List[float], fwd_atm_vols: List[float]):
        """Term volatility parametrization

        Args:
            expiries (List[float]): List of expiries (sorted from nearest to farthest).
            fwd_atm_vols (List[float]): List of at-the-money volatilities.
        """
        # Cache slot for the lazily created pyvacon object.
        self._pyvacon_obj = None
        self.expiries, self.fwd_atm_vols = expiries, fwd_atm_vols

    def _get_pyvacon_obj(self):
        """Return the cached pyvacon parametrization, creating it on first use."""
        cached = self._pyvacon_obj
        if cached is None:
            cached = _mkt_data.VolatilityParametrizationTerm(self.expiries, self.fwd_atm_vols)
            self._pyvacon_obj = cached
        return cached
class VolatilityParametrizationSVI(_VolatilityParametrizationExpiry):
    def __init__(self, expiries: List[float], svi_params: List[Tuple]):
        """Raw SVI parametrization (definition 3.1 in https://papers.ssrn.com/sol3/papers.cfm?abstract_id=2033323)

        .. math::
            w(k) = a + b(\\rho (k-m) + \\sqrt{(k-m)^2+\\sigma^2 })

        Args:
            expiries (List[float]): List of expiries (sorted from nearest to farthest).
            svi_params (List[Tuple]): List of SVI parameters (one Tuple for each expiry). Tuple in the order (a, b, rho, m, sigma).

        """
        super().__init__(expiries, svi_params)

    def _calc_implied_vol_at_expiry(self, params: List[float], ttm: float, k: float):
        """Evaluate the raw SVI formula for one expiry.

        Args:
            params (List[float]): SVI parameters in the order (a, b, rho, m, sigma).
            ttm (float): Time to maturity (not used by the formula).
            k (float): Strike; its natural logarithm is what enters the formula.

        Returns:
            float: Value of the SVI expression w.
        """
        a, b, rho, m, sigma = params[0], params[1], params[2], params[3], params[4]
        shifted = np.log(k) - m
        return a + b * (rho * shifted + np.sqrt(shifted**2 + sigma**2))
class VolatilityParametrizationSSVI:
    def __init__(self, expiries: List[float], fwd_atm_vols: List[float], rho: float, eta: float, gamma: float):
        """SSVI volatility parametrization
        https://papers.ssrn.com/sol3/papers.cfm?abstract_id=2033323

        The parameters are stored in one flat vector ``[fwd_atm_vols..., rho, eta, gamma]``
        and are accessible through the ``get_*`` methods.

        Args:
            expiries (List[float]): List of expiries (sorted from nearest to farthest).
            fwd_atm_vols (List[float]): List of at-the-money volatilities.
            rho (float): Responsible for the skewness of the volatility surface.
            eta (float): Responsible for the curvature.
            gamma (float): Responsible for the "rate of decay".

        """
        self.expiries = expiries
        # Cache slot for the lazily created pyvacon object.
        self._pyvacon_obj = None

        self._x = self._get_x(fwd_atm_vols, rho, eta, gamma)
        self.n_fwd_atm_vols = len(fwd_atm_vols)

    def calc_implied_vol(self, ttm, strike):
        """Calculate implied volatility for given expiry and strike

        Args:
            ttm ([float]): Expiry.
            strike ([float]): Strike.

        Returns:
            [float]: Implied volatility.
        """
        return self._get_pyvacon_obj().calcImpliedVol(ttm, strike)

    def _get_pyvacon_obj(self):
        """Return the cached pyvacon parametrization, building it from the flat parameter vector on first use."""
        if self._pyvacon_obj is None:
            self._pyvacon_obj = _mkt_data.VolatilityParametrizationSSVI(
                self.expiries, self._x[: self.n_fwd_atm_vols], self._x[-3], self._x[-2], self._x[-1]
            )
        return self._pyvacon_obj

    def _get_x(self, fwd_atm_vols, rho, eta, gamma) -> np.ndarray:
        """Build the flat parameter vector ``[fwd_atm_vols..., rho, eta, gamma]``.

        Also works for an empty ``fwd_atm_vols`` (the original relied on the loop
        variable after the loop and failed in that case).
        """
        return np.array([*fwd_atm_vols, rho, eta, gamma], dtype=float)

    def _set_param(self, x) -> None:
        """Replace the flat parameter vector and drop the cached pyvacon object so it gets rebuilt."""
        self._x = x
        self._pyvacon_obj = None

    def get_rho(self):
        """Return rho (third-last entry of the flat parameter vector)."""
        return self._x[-3]

    def get_eta(self):
        """Return eta (second-last entry of the flat parameter vector)."""
        return self._x[-2]

    def get_gamma(self):
        """Return gamma (last entry of the flat parameter vector)."""
        return self._x[-1]

    def get_fwd_atm_vols(self):
        """Return the at-the-money volatilities (leading entries of the flat parameter vector)."""
        return self._x[: self.n_fwd_atm_vols]
class VolatilityParametrizationSABR(_VolatilityParametrizationExpiry):
    def __init__(self, expiries: List[float], sabr_params: List[Tuple]):
        r"""SABR parametrization
        https://bsic.it/sabr-stochastic-volatility-model-volatility-smile/

        The SABR model assumes that the forward rate and the instantaneous volatility are driven by two correlated Brownian motions:

        .. math::
            df_t = \alpha_t f_t^\beta dW_t^1

        .. math::
            d\alpha_t = \nu\alpha_t dW_t^2

        .. math::
            E\bigl[dW_t^1 dW_t^2\bigr] = \rho dt

        The expression that the implied volatility must satisfy is

        .. math::
            \sigma_B(K,f) = \frac{\alpha\biggl\{1+\biggl[\frac{(1-\beta)^2}{24}\frac{\alpha^2}{(fK)^{1-\beta}}+\frac{1}{4}\frac{\rho\beta\nu\alpha}{(fK)^{(1-\beta)/2}}+\frac{2-3\rho^2}{24}\nu^2\biggr]T\biggr\}}{(fK)^{(1-\beta)/2}\biggl[1+\frac{(1-\beta)^2}{24}\ln^2\frac{f}{K}+\frac{(1-\beta)^4}{1920}\ln^4\frac{f}{K}\biggr]} \frac{z}{\chi(z)}

        .. math::
            z = \frac{\nu}{\alpha}(fK)^{(1-\beta)/2} \ln\frac{f}{K}

        .. math::
            \chi(z) = \ln\biggl[\frac{\sqrt{1-2\rho z+z^2}+z-\rho}{1-\rho}\biggr]

        When :math:`f = K` (for ATM options), the above formula for implied volatility simplifies to:

        .. math::
            \sigma_{ATM} = \sigma_B(f,f)=\frac{\alpha\biggl\{1+\biggl[\frac{(1-\beta)^2}{24}\frac{\alpha^2}{f^{2-2\beta}}+\frac{1}{4}\frac{\rho\beta\nu\alpha}{f^{1-\beta}}+\frac{2-3\rho^2}{24}\nu^2\biggr]T\biggr\}}{f^{1-\beta}}

        where

        > :math:`\alpha` is the instantaneous vol;

        > :math:`\nu` is the vol of vol;

        > :math:`\rho` is the correlation between the Brownian motions driving the forward rate and the instantaneous vol;

        > :math:`\beta` is the CEV component for forward rate (determines shape of forward rates, leverage effect and backbone of ATM vol).

        Args:
            expiries (List[float]): List of expiries (sorted from nearest to farthest).
            sabr_params (List[Tuple]): List of SABR parameters (one Tuple for each expiry). Tuple in the order (alpha, nu, beta, rho).
        """
        super().__init__(expiries, sabr_params)

    def _calc_implied_vol_at_expiry(self, params: List[float], ttm: float, strike: float):
        """Evaluate the SABR implied volatility formula for one expiry and return its square.

        The forward is fixed to 1 in this implementation, so ``strike`` is used
        relative to a forward of one.

        Args:
            params (List[float]): SABR parameters in the order (alpha, nu, beta, rho).
            ttm (float): Time to maturity.
            strike (float): Strike (scalar).

        Returns:
            float: Squared SABR implied volatility.
        """
        K = strike
        alpha = params[0]
        nu = params[1]
        beta = params[2]
        rho = params[3]
        f = 1

        if f == K:
            # At-the-money special case of the formula (z / chi(z) -> 1, log terms vanish).
            sigma = (
                alpha
                * (
                    1
                    + (
                        (1 - beta) ** 2 / 24 * alpha**2 / f ** (2 - 2 * beta)
                        + 1 / 4 * rho * beta * nu * alpha / f ** (1 - beta)
                        + (2 - 3 * rho**2) / 24 * nu**2
                    )
                    * ttm
                )
                / f ** (1 - beta)
            )
        else:
            log_fk = np.log(f / K)
            fk_mid = (f * K) ** ((1 - beta) / 2)
            zeta = nu / alpha * fk_mid * log_fk
            if zeta == 0:
                # Limit of z / chi(z) for z -> 0 (e.g. nu == 0); avoids 0 / 0.
                z_over_chi = 1.0
            else:
                chi_zeta = np.log((np.sqrt(1 - 2 * rho * zeta + zeta**2) + zeta - rho) / (1 - rho))
                z_over_chi = zeta / chi_zeta
            time_correction = 1 + (
                (1 - beta) ** 2 / 24 * alpha**2 / (f * K) ** (1 - beta)
                + 1 / 4 * rho * beta * nu * alpha / fk_mid
                + (2 - 3 * rho**2) / 24 * nu**2
            ) * ttm
            log_expansion = 1 + (1 - beta) ** 2 / 24 * log_fk**2 + (1 - beta) ** 4 / 1920 * log_fk**4
            # The log expansion belongs in the denominator, as in the formula in
            # the class docstring; the original multiplied by it.
            sigma = alpha * time_correction / (fk_mid * log_expansion) * z_over_chi

        return sigma**2
class VolatilityGridParametrization:
    def __init__(self, expiries: np.ndarray, strikes: np.ndarray, vols: np.ndarray):
        """Grid parametrization
        This parametrization stores a set of strike-vol grids for a given list of expiries and computes a volatility by
        - search for the neighboring expiries
        - apply a spline interpolation in each expiry to get the respective volatility
        - apply a linear interpolation (in total variance)

        Args:
            expiries (np.ndarray): An array of the expiries.
            strikes (np.ndarray): Strikes of the grid. A one-dimensional array is reused for every
                expiry (it is replicated into a list with one entry per expiry); otherwise the
                array is stored as given.
            vols (np.ndarray): Two dimensional array of volatilities where each row i contains the values for expiry i
        """
        self.expiries = expiries
        if len(strikes.shape) == 1:
            # Same strike grid for all expiries.
            strikes = [strikes] * expiries.shape[0]
        self.strikes = strikes
        self.vols = vols
        # Cache slot for the lazily created pyvacon object.
        self._pyvacon_obj = None

    def calc_implied_vol(self, ttm: float, strike: float):
        """Calculate implied volatility for given expiry and strike

        Args:
            ttm ([float]): Expiry.
            strike ([float]): Strike.

        Returns:
            [float]: Implied volatility.
        """
        return self._get_pyvacon_obj().calcImpliedVol(ttm, strike)

    def _get_pyvacon_obj(self):
        """Return the cached pyvacon time-slice parametrization, creating it on first use."""
        if self._pyvacon_obj is None:
            self._pyvacon_obj = _mkt_data.VolatilityParametrizationTimeSlice(self.expiries, self.strikes, self.vols)
        return self._pyvacon_obj
class VolatilitySurface:
    """Volatility surface wrapper that lazily builds the corresponding pyvacon object."""

    @staticmethod
    def load(filename: str):
        """Load a pyvacon volatility surface from the given file.

        Args:
            filename (str): File to load from.

        Returns:
            The object returned by pyvacon's ``VolatilitySurface.load``.
        """
        return _mkt_data.VolatilitySurface.load(filename)

    @staticmethod
    def _create_param_pyvacon_obj(vol_param):
        """Return a pyvacon parametrization for ``vol_param``.

        Parametrizations that provide ``_get_pyvacon_obj`` are converted directly.
        All others are sampled via ``calc_implied_vol`` on a grid (their own
        expiries if available, otherwise 13 expiries between 0 and 4; 100 strikes
        between 0.4 and 1.6) and wrapped into a grid parametrization.
        """
        if hasattr(vol_param, "_get_pyvacon_obj"):
            return vol_param._get_pyvacon_obj()
        if hasattr(vol_param, "expiries"):
            # asarray so that plain lists of expiries work as well.
            expiries = np.asarray(vol_param.expiries)
        else:
            expiries = np.linspace(0.0, 4.0, 13, endpoint=True)
        strikes = np.linspace(0.4, 1.6, num=100)
        vols = np.empty((expiries.size, strikes.size))
        for i in range(expiries.size):
            for j in range(strikes.size):
                vols[i, j] = vol_param.calc_implied_vol(expiries[i], strikes[j])
        return VolatilityGridParametrization(expiries, strikes, vols)._get_pyvacon_obj()

    def __init__(self, id: str, refdate: datetime, forward_curve, daycounter, vol_param):
        """Volatility surface

        Args:
            id (str): Identifier (name) of the volatility surface.
            refdate (datetime): Valuation date.
            forward_curve (rivapy.market_data.EquityForwardCurve): Forward curve.
            daycounter (enums.DayCounterType): Day counter; its ``name`` is passed on to pyvacon.
            vol_param ([VolatilityParametrizationFlat,VolatilityParametrizationTerm,VolatilityParametrizationSSVI]): Volatility parametrization.
        """
        self.id = id
        self.refdate = refdate
        self.forward_curve = forward_curve
        self.daycounter = daycounter
        self.vol_param = vol_param
        # Cache slot for the lazily created pyvacon object.
        self._pyvacon_obj = None

    def _get_pyvacon_obj(self, fwd_curve=None):
        """Return the cached pyvacon surface, creating it on first use.

        Args:
            fwd_curve: Forward curve to build the surface with; defaults to the
                surface's own forward curve. Only used on the first call, since
                the created object is cached.
        """
        if self._pyvacon_obj is None:
            if fwd_curve is None:
                fwd_curve = self.forward_curve
            # Convert wrapper curves; anything without the conversion hook is
            # passed through unchanged. Same convention as in
            # _create_param_pyvacon_obj; replaces a bare ``except:``.
            if hasattr(fwd_curve, "_get_pyvacon_obj"):
                _py_fwd_curve = fwd_curve._get_pyvacon_obj()
            else:
                _py_fwd_curve = fwd_curve
            self._pyvacon_obj = _mkt_data.VolatilitySurface(
                self.id, self.refdate, _py_fwd_curve, self.daycounter.name, VolatilitySurface._create_param_pyvacon_obj(self.vol_param)
            )
        return self._pyvacon_obj

    def calc_implied_vol(self, expiry: datetime, strike: float, refdate: datetime = None, forward_curve=None) -> float:
        """Calculate implied volatility

        Args:
            expiry (datetime): Expiration date.
            strike (float): Strike price.
            refdate (datetime, optional): Valuation date. Defaults to the reference date of the forward curve.
            forward_curve (optional): Forward curve to use instead of the surface's own one. If the
                surface has its own forward curve as well, the surface is shifted to the given curve.

        Raises:
            Exception: If neither the surface nor the call provides a forward curve, or if the
                strike converted to an x-strike is negative.

        Returns:
            float: Implied volatility.
        """
        # Validate first, so that a missing curve yields this message instead of
        # an AttributeError from the refdate lookup below.
        if forward_curve is None and self.forward_curve is None:
            raise Exception("Please specify a forward curve")
        if refdate is None:
            ref_curve = self.forward_curve if self.forward_curve is not None else forward_curve
            refdate = ref_curve.refdate
        vol = self._get_pyvacon_obj()
        if forward_curve is None:
            forward_curve = self.forward_curve
        elif self.forward_curve is not None:
            vol = _mkt_data.VolatilitySurface.createVolatilitySurfaceShiftedFwd(vol, forward_curve._get_pyvacon_obj())
        forward_curve_obj = forward_curve._get_pyvacon_obj()
        # convert strike into x_strike
        discounted_cash_divs = forward_curve_obj.discountedFutureCashDivs(refdate, expiry)
        x_strike = _utils.computeXStrike(strike, forward_curve_obj.value(refdate, expiry), discounted_cash_divs)
        if x_strike < 0:
            # Built from two adjacent literals so that no source indentation ends
            # up inside the message.
            raise Exception(
                "The given strike value seems implausible compared to the discounted future cash dividends "
                f"({discounted_cash_divs})."
            )
        return vol.calcImpliedVol(refdate, expiry, x_strike)

    @staticmethod
    def set_stickyness(vol_stickyness: enums.VolatilityStickyness):
        """Set pyvacon's global forward stickyness of volatility surfaces.

        Args:
            vol_stickyness (enums.VolatilityStickyness): Stickyness to apply.

        Raises:
            Exception: If the given value is none of the supported stickyness types.
        """
        if vol_stickyness is enums.VolatilityStickyness.StickyXStrike:
            _pricing.GlobalSettings.setVolatilitySurfaceFwdStickyness(_pricing.VolatilitySurfaceFwdStickyness.Type.StickyXStrike)
        elif vol_stickyness is enums.VolatilityStickyness.StickyStrike:
            # Translate to the pyvacon type like every other branch (the original
            # passed the rivapy enum through).
            _pricing.GlobalSettings.setVolatilitySurfaceFwdStickyness(_pricing.VolatilitySurfaceFwdStickyness.Type.StickyStrike)
        elif vol_stickyness is enums.VolatilityStickyness.StickyFwdMoneyness:
            _pricing.GlobalSettings.setVolatilitySurfaceFwdStickyness(_pricing.VolatilitySurfaceFwdStickyness.Type.StickyFwdMoneyness)
        elif vol_stickyness is enums.VolatilityStickyness.NONE:
            _pricing.GlobalSettings.setVolatilitySurfaceFwdStickyness(_pricing.VolatilitySurfaceFwdStickyness.Type.NONE)
        else:
            raise Exception("Error")
def _add_to_factory(cls):
    """Register ``cls`` under its class name in the mapping returned by ``_factory()``."""
    _factory()[cls.__name__] = cls
# Register the curve classes with the factory at import time, keyed by class name.
# NOTE(review): these classes are not defined in this module; presumably they come
# from the star import of rivapy.marketdata.curves -- confirm.
_add_to_factory(NelsonSiegel)
_add_to_factory(NelsonSiegelSvensson)
_add_to_factory(ConstantRate)
_add_to_factory(LinearRate)
_add_to_factory(DiscountCurveParametrized)
_add_to_factory(DiscountCurveComposition)
if __name__ == "__main__":
    # Manual smoke check: evaluate a two-expiry SVI parametrization at its
    # first expiry (one day) and a strike of 1.0. The result is not used.
    one_day = 1.0 / 365.0
    svi = VolatilityParametrizationSVI(
        expiries=np.array([one_day, 1.0]),
        svi_params=[
            (0.0001, 0.1, -0.5, 0.0, 0.0001),
            (0.2, 0.1, -0.5, 0.0, 0.4),
        ],
    )
    svi.calc_implied_vol(one_day, 1.0)