Source code for rivapy.marketdata

# from pyvacon.marketdata.analytics_classes import *  # TODO: Clarify why this is necessary for imports in pricing_data.
# from pyvacon.marketdata import analytics_classes
# __all__ = ['analytics_classes', 'bootstrapping']
import abc
import numpy as np

# from pyvacon.pyvacon_swig import EquityOptionQuoteTable
from rivapy import enums
from typing import List, Union, Tuple
from rivapy import _pyvacon_available
from scipy.optimize import least_squares
from rivapy.marketdata.curves import *
from rivapy.marketdata.factory import _factory

if _pyvacon_available:
    # pyvacon is installed: import its modules once (the original imported
    # pyvacon.finance.marketdata twice) and re-export the native market data
    # classes under this module's namespace.
    import pyvacon.finance.marketdata as _mkt_data
    import pyvacon.finance.utils as _utils
    import pyvacon.finance.pricing as _pricing

    InflationIndexForwardCurve = _mkt_data.InflationIndexForwardCurve
    SurvivalCurve = _mkt_data.SurvivalCurve
    DatedCurve = _mkt_data.DatedCurve
    EquityOptionQuoteTable = _mkt_data.EquityOptionQuoteTable

    # DividendTable = _mkt_data.DividendTable
else:

    class SurvivalCurve:
        """Stand-in used when pyvacon is not installed.

        Keeps the name ``SurvivalCurve`` importable; instantiating it always
        raises.

        Raises:
            Exception: Always, on construction.
        """

        def __init__(self):
            raise Exception("Up to now only implemented in pyvacon that has not been installed.")


class DividendTable:
    def __init__(
        self,
        id: str,
        refdate: datetime,
        ex_dates: List[datetime],
        pay_dates: List[datetime],
        div_yield: List[float],
        div_cash: List[float],
        tax_factors: List[float],
    ):
        """Plain data holder for a dividend table.

        The constructor only stores its arguments. The pyvacon object is
        built lazily by :meth:`_get_pyvacon_obj`.

        Args:
            id (str): Identifier forwarded to the pyvacon dividend table.
            refdate (datetime): Reference date forwarded to pyvacon.
            ex_dates (List[datetime]): Ex-dividend dates.
            pay_dates (List[datetime]): Payment dates.
            div_yield (List[float]): Dividend yields
                (presumably one entry per ex-date - confirm against pyvacon).
            div_cash (List[float]): Cash dividends
                (presumably one entry per ex-date - confirm against pyvacon).
            tax_factors (List[float]): Tax factors
                (presumably one entry per ex-date - confirm against pyvacon).
        """
        self._pyvacon_obj = None
        self.id = id
        self.refdate = refdate
        self.ex_dates = ex_dates
        self.pay_dates = pay_dates
        self.div_yield = div_yield
        self.div_cash = div_cash
        self.tax_factors = tax_factors

    def _get_pyvacon_obj(self):
        """Return the cached pyvacon ``DividendTable``, creating it on first use."""
        if self._pyvacon_obj is not None:
            return self._pyvacon_obj
        # The pyvacon constructor takes pay_dates as its LAST argument, which
        # differs from the argument order of this class.
        self._pyvacon_obj = _mkt_data.DividendTable(
            self.id,
            self.refdate,
            self.ex_dates,
            self.div_yield,
            self.div_cash,
            self.tax_factors,
            self.pay_dates,
        )
        return self._pyvacon_obj
class _VolatilityParametrizationExpiry:
    """Base class for parametrizations that hold one parameter tuple per expiry.

    All parameter tuples are stored in one flat vector ``self._x`` (expiry
    after expiry), which is also the vector handed to the optimiser in
    :meth:`calibrate_params`. Subclasses implement
    :meth:`_calc_implied_vol_at_expiry`.
    """

    def __init__(self, expiries: List[float], params_at_expiry: List[Tuple]):
        """Store the expiries and flatten the parameter tuples.

        Args:
            expiries (List[float]): Expiries; ``calc_implied_vol`` uses
                ``np.searchsorted`` on them, so they must be sorted ascending.
            params_at_expiry (List[Tuple]): One parameter tuple per expiry.
                The length of the first tuple defines the parameter count.
        """
        self.n_params = len(params_at_expiry[0])
        self.expiries = np.array(expiries)
        self._x = self._get_x(params_at_expiry)

    def get_params_at_expiry(self, expiry: int) -> np.ndarray:
        """Get parameters for given expiry.

        Args:
            expiry (int): Position in expiry list.

        Returns:
            np.ndarray: Parameter tuple for given expiry (a slice of the flat vector).
        """
        start = self.n_params * expiry
        return self._x[start : start + self.n_params]

    def calc_implied_vol(self, ttm, strike):
        """Calculate implied volatility for given expiry and strike.

        Args:
            ttm ([float]): Expiry.
            strike ([float]): Strike.

        Returns:
            [float]: Implied volatility.
        """
        i = np.searchsorted(self.expiries, ttm)
        n_expiries = self.expiries.shape[0]
        if i == 0 or i == n_expiries:
            # ttm is at/before the first expiry or after the last one:
            # evaluate the nearest slice directly, without interpolation.
            if i == n_expiries:
                i -= 1
            # NOTE(review): this branch takes the square root of the slice value
            # as is, while the interpolation branch below divides by ttm first
            # (i.e. treats slice values as total variance). Confirm which
            # convention _calc_implied_vol_at_expiry is meant to follow.
            return np.sqrt(self._calc_implied_vol_at_expiry(self.get_params_at_expiry(i), ttm, strike))
        w0 = self._calc_implied_vol_at_expiry(self.get_params_at_expiry(i - 1), self.expiries[i - 1], strike)
        w1 = self._calc_implied_vol_at_expiry(self.get_params_at_expiry(i), self.expiries[i], strike)
        # linear interpolation between the two neighbouring expiry slices
        delta_t = self.expiries[i] - self.expiries[i - 1]
        w = ((self.expiries[i] - ttm) * w0 + (ttm - self.expiries[i - 1]) * w1) / delta_t
        return np.sqrt(w / ttm)

    @abc.abstractmethod
    def _calc_implied_vol_at_expiry(self, params, ttm: float, strike: float):
        """Evaluate one expiry slice; to be implemented by subclasses.

        Args:
            params: Parameter tuple of the slice (see ``get_params_at_expiry``).
            ttm (float): Time to maturity.
            strike (float): Strike.
        """

    def _get_x(self, params) -> np.ndarray:
        """Flatten the per-expiry tuples into one float vector.

        Only the first ``n_params`` entries of each tuple are used.
        """
        return np.array(
            [expiry_params[k] for expiry_params in params for k in range(self.n_params)],
            dtype=float,
        )

    def _set_param(self, x) -> None:
        """Replace the flat parameter vector."""
        self._x = x

    def calibrate_params(self, quotes: pd.DataFrame, **kwargs):
        """Calibrate parameters to given implied volatility quotes.

        The residual per quote is the distance of the model volatility to the
        bid/ask interval (zero inside the interval). After the fit the
        calibrated vector is stored on the object.

        Note:
            ``quotes`` is modified in place: the helper columns VOLS,
            DIST_ASK, DIST_BID and DIST_TOTAL are (over)written.

        Args:
            quotes (pd.DataFrame): pd.DataFrame with columns EXPIRY as year
                fraction, STRIKE as moneyness, BID_IV, ASK_IV.
            **kwargs: Forwarded to ``scipy.optimize.least_squares``. If none
                are given, ``method="lm"`` is used.

        Returns:
            np.ndarray: The calibrated flat parameter vector.
        """

        def cost_function(x):
            self._set_param(x)
            quotes["VOLS"] = [self.calc_implied_vol(expiry, strike) for expiry, strike in zip(quotes["EXPIRY"], quotes["STRIKE"])]
            quotes["DIST_ASK"] = [max(vol - ask, 0) for ask, vol in zip(quotes["ASK_IV"], quotes["VOLS"])]
            quotes["DIST_BID"] = [max(bid - vol, 0) for bid, vol in zip(quotes["BID_IV"], quotes["VOLS"])]
            quotes["DIST_TOTAL"] = quotes["DIST_ASK"] + quotes["DIST_BID"]
            return np.copy(quotes["DIST_TOTAL"].values)

        # **kwargs is an empty dict (never None) when nothing is passed, so the
        # default has to be keyed on emptiness.
        if not kwargs:
            kwargs = {"method": "lm"}
        result = least_squares(cost_function, self._x, **kwargs)
        # The optimiser's last cost evaluation is generally not at the solution
        # (e.g. finite-difference probes), so store the solution explicitly.
        self._set_param(result.x)
        return result.x
class VolatilityParametrizationFlat:
    def __init__(self, vol: float):
        """Flat volatility parametrization.

        Args:
            vol (float): Constant volatility.
        """
        self._pyvacon_obj = None
        self.vol = vol

    def _get_pyvacon_obj(self):
        """Return the cached pyvacon flat parametrization, creating it on first use."""
        if self._pyvacon_obj is not None:
            return self._pyvacon_obj
        self._pyvacon_obj = _mkt_data.VolatilityParametrizationFlat(self.vol)
        return self._pyvacon_obj
class VolatilityParametrizationTerm:
    def __init__(self, expiries: List[float], fwd_atm_vols: List[float]):
        """Term volatility parametrization.

        Args:
            expiries (List[float]): List of expiries (sorted from nearest to farthest).
            fwd_atm_vols (List[float]): List of at-the-money volatilities.
        """
        self._pyvacon_obj = None
        self.expiries = expiries
        self.fwd_atm_vols = fwd_atm_vols

    def _get_pyvacon_obj(self):
        """Return the cached pyvacon term parametrization, creating it on first use."""
        if self._pyvacon_obj is not None:
            return self._pyvacon_obj
        self._pyvacon_obj = _mkt_data.VolatilityParametrizationTerm(self.expiries, self.fwd_atm_vols)
        return self._pyvacon_obj
class VolatilityParametrizationSVI(_VolatilityParametrizationExpiry):
    def __init__(self, expiries: List[float], svi_params: List[Tuple]):
        r"""Raw SVI parametrization (definition 3.1 in https://papers.ssrn.com/sol3/papers.cfm?abstract_id=2033323)

        .. math::
            w(k) = a + b(\rho (k-m) + \sqrt{(k-m)^2+\sigma^2 })

        Args:
            expiries (List[float]): List of expiries (sorted from nearest to farthest).
            svi_params (List[Tuple]): List of SVI parameters (one Tuple for each expiry). Tuple in the order (a, b, rho, m, sigma).
        """
        super().__init__(expiries, svi_params)

    def _calc_implied_vol_at_expiry(self, params: List[float], ttm: float, k: float):
        """Evaluate the raw SVI formula for one expiry slice.

        Args:
            params (List[float]): Slice parameters in the order (a, b, rho, m, sigma).
            ttm (float): Time to maturity (not used by the formula).
            k (float): Strike; its natural logarithm is the ``k`` of the formula.

        Returns:
            float: The value ``w`` of the raw SVI formula.
        """
        a, b, rho, m, sigma = params[0], params[1], params[2], params[3], params[4]
        shifted_log_strike = np.log(k) - m
        return a + b * (rho * shifted_log_strike + np.sqrt(shifted_log_strike**2 + sigma**2))
class VolatilityParametrizationSSVI:
    def __init__(self, expiries: List[float], fwd_atm_vols: List[float], rho: float, eta: float, gamma: float):
        """SSVI volatility parametrization
        https://papers.ssrn.com/sol3/papers.cfm?abstract_id=2033323

        The parameters are kept in one flat vector ``self._x`` laid out as
        ``[fwd_atm_vols..., rho, eta, gamma]``.

        Args:
            expiries (List[float]): List of expiries (sorted from nearest to farthest).
            fwd_atm_vols (List[float]): List of at-the-money volatilities.
            rho (float): Responsible for the skewness of the volatility surface.
            eta (float): Responsible for the curvature.
            gamma (float): Responsible for the "rate of decay".
        """
        self.expiries = expiries
        self._pyvacon_obj = None
        self.n_fwd_atm_vols = len(fwd_atm_vols)
        self._x = self._get_x(fwd_atm_vols, rho, eta, gamma)

    def calc_implied_vol(self, ttm, strike):
        """Calculate implied volatility for given expiry and strike.

        Delegates to the pyvacon object.

        Args:
            ttm ([float]): Expiry.
            strike ([float]): Strike.

        Returns:
            [float]: Implied volatility.
        """
        return self._get_pyvacon_obj().calcImpliedVol(ttm, strike)

    def _get_pyvacon_obj(self):
        """Return the cached pyvacon SSVI parametrization, creating it from ``self._x`` on first use."""
        if self._pyvacon_obj is None:
            self._pyvacon_obj = _mkt_data.VolatilityParametrizationSSVI(
                self.expiries, self._x[: self.n_fwd_atm_vols], self._x[-3], self._x[-2], self._x[-1]
            )
        return self._pyvacon_obj

    def _get_x(self, fwd_atm_vols, rho, eta, gamma) -> np.ndarray:
        """Pack the parameters into the flat vector ``[fwd_atm_vols..., rho, eta, gamma]``.

        Works for an empty ``fwd_atm_vols`` as well. The previous
        implementation indexed with the loop variable after the loop, which
        fails when the loop body never runs.
        """
        n_vols = len(fwd_atm_vols)
        x = np.empty(n_vols + 3)
        x[:n_vols] = fwd_atm_vols
        x[n_vols:] = (rho, eta, gamma)
        return x

    def _set_param(self, x) -> None:
        """Replace the flat parameter vector and drop the cached pyvacon object."""
        self._x = x
        self._pyvacon_obj = None

    def get_rho(self):
        """Return rho (third-last entry of the parameter vector)."""
        return self._x[-3]

    def get_eta(self):
        """Return eta (second-last entry of the parameter vector)."""
        return self._x[-2]

    def get_gamma(self):
        """Return gamma (last entry of the parameter vector)."""
        return self._x[-1]

    def get_fwd_atm_vols(self):
        """Return the at-the-money volatilities (leading entries of the parameter vector)."""
        return self._x[: self.n_fwd_atm_vols]
class VolatilityParametrizationSABR(_VolatilityParametrizationExpiry):
    def __init__(self, expiries: List[float], sabr_params: List[Tuple]):
        r"""SABR parametrization
        https://bsic.it/sabr-stochastic-volatility-model-volatility-smile/

        The SABR model assumes that the forward rate and the instantaneous volatility are driven by two correlated Brownian motions:

        .. math::
            df_t = \alpha_t f_t^\beta dW_t^1

        .. math::
            d\alpha_t = \nu\alpha_t dW_t^2

        .. math::
            E\bigl[dW_t^1 dW_t^2\bigr] = \rho dt

        The expression that the implied volatility must satisfy is

        .. math::
            \sigma_B(K,f) = \frac{\alpha\biggl\{1+\biggl[\frac{(1-\beta)^2}{24}\frac{\alpha^2}{(fK)^{1-\beta}}+\frac{1}{4}\frac{\rho\beta\nu\alpha}{(fK)^{(1-\beta)/2}}+\frac{2-3\rho^2}{24}\nu^2\biggr]T\biggr\}}{(fK)^{(1-\beta)/2}\biggl[1+\frac{(1-\beta)^2}{24}\ln^2\frac{f}{K}+\frac{(1-\beta)^4}{1920}\ln^4\frac{f}{K}\biggr]} \frac{z}{\chi(z)}

        .. math::
            z = \frac{\nu}{\alpha}(fK)^{(1-\beta)/2} \ln\frac{f}{K}

        .. math::
            \chi(z) = \ln\biggl[\frac{\sqrt{1-2\rho z+z^2}+z-\rho}{1-\rho}\biggr]

        When :math:`f = K` (for ATM options), the above formula for implied volatility simplifies to:

        .. math::
            \sigma_{ATM} = \sigma_B(f,f)=\frac{\alpha\biggl\{1+\biggl[\frac{(1-\beta)^2}{24}\frac{\alpha^2}{f^{2-2\beta}}+\frac{1}{4}\frac{\rho\beta\nu\alpha}{f^{1-\beta}}+\frac{2-3\rho^2}{24}\nu^2\biggr]T\biggr\}}{f^{1-\beta}}

        where

        > :math:`\alpha` is the instantaneous vol;

        > :math:`\nu` is the vol of vol;

        > :math:`\rho` is the correlation between the Brownian motions driving the forward rate and the instantaneous vol;

        > :math:`\beta` is the CEV component for forward rate (determines shape of forward rates, leverage effect and backbone of ATM vol).

        Args:
            expiries (List[float]): List of expiries (sorted from nearest to farthest).
            sabr_params (List[Tuple]): List of SABR parameters (one Tuple for each expiry). Tuple in the order (alpha, nu, beta, rho).
        """
        super().__init__(expiries, sabr_params)

    def _calc_implied_vol_at_expiry(self, params: List[float], ttm: float, strike: float):
        """Evaluate the SABR implied-volatility formula for one expiry slice.

        Args:
            params (List[float]): Slice parameters in the order (alpha, nu, beta, rho).
            ttm (float): Time to maturity ``T`` of the formula.
            strike (float): Strike ``K``. The forward is fixed at 1 here, so
                strikes are presumably expected as moneyness - confirm
                against callers.

        Returns:
            float: The squared SABR implied volatility.
        """
        K = strike
        alpha, nu, beta, rho = params[0], params[1], params[2], params[3]
        f = 1
        fk_root = (f * K) ** ((1 - beta) / 2)
        # Curly-brace term of the formula: 1 + [...] * T
        time_term = (
            1
            + (
                (1 - beta) ** 2 / 24 * alpha**2 / (f * K) ** (1 - beta)
                + 1 / 4 * rho * beta * nu * alpha / fk_root
                + (2 - 3 * rho**2) / 24 * nu**2
            )
            * ttm
        )
        if f == K:
            # At the money ln(f/K) = 0: the log bracket is 1 and z/chi(z) -> 1.
            sigma = alpha * time_term / fk_root
            return sigma**2

        log_moneyness = np.log(f / K)
        zeta = nu / alpha * fk_root * log_moneyness
        chi_zeta = np.log((np.sqrt(1 - 2 * rho * zeta + zeta**2) + zeta - rho) / (1 - rho))
        # z/chi(z) tends to 1 for z -> 0 (e.g. nu == 0); avoid evaluating 0/0.
        z_over_chi = 1.0 if zeta == 0 else zeta / chi_zeta
        log_term = 1 + (1 - beta) ** 2 / 24 * log_moneyness**2 + (1 - beta) ** 4 / 1920 * log_moneyness**4
        # The log bracket belongs in the DENOMINATOR. The previous expression
        # `a / b * c` multiplied by it because of operator precedence; the two
        # only coincide for beta == 1.
        sigma = alpha * time_term / (fk_root * log_term) * z_over_chi
        return sigma**2
class VolatilityGridParametrization:
    def __init__(self, expiries: np.array, strikes: np.ndarray, vols: np.ndarray):
        """Grid parametrization

        This parametrization stores a set of strike-vol grids for a given list of expiries and computes a volatility by

        - searching for the neighboring expiries
        - applying a spline interpolation in each expiry to get the respective volatility
        - applying a linear interpolation (in total variance)

        Args:
            expiries (np.array): An array of the expiries.
            strikes (np.ndarray): Strike grid. A one-dimensional array is reused for every expiry;
                anything else is stored unchanged.
            vols (np.ndarray): Two dimensional array of volatilities where each row i contains the values for expiry i
        """
        self._pyvacon_obj = None
        self.expiries = expiries
        self.vols = vols
        if strikes.ndim == 1:
            # one common strike grid: repeat (the same array object) per expiry
            strikes = [strikes for _ in range(expiries.shape[0])]
        self.strikes = strikes

    def calc_implied_vol(self, ttm: float, strike: float):
        """Calculate implied volatility for given expiry and strike.

        Delegates to the pyvacon object.

        Args:
            ttm ([float]): Expiry.
            strike ([float]): Strike.

        Returns:
            [float]: Implied volatility.
        """
        return self._get_pyvacon_obj().calcImpliedVol(ttm, strike)

    def _get_pyvacon_obj(self):
        """Return the cached pyvacon time-slice parametrization, creating it on first use."""
        if self._pyvacon_obj is not None:
            return self._pyvacon_obj
        self._pyvacon_obj = _mkt_data.VolatilityParametrizationTimeSlice(self.expiries, self.strikes, self.vols)
        return self._pyvacon_obj
class VolatilitySurface:
    @staticmethod
    def load(filename: str):
        """Load a volatility surface via ``pyvacon``'s ``VolatilitySurface.load``.

        Args:
            filename (str): File to load from.

        Returns:
            The object returned by pyvacon (not a rivapy ``VolatilitySurface``).
        """
        return _mkt_data.VolatilitySurface.load(filename)

    @staticmethod
    def _create_param_pyvacon_obj(vol_param):
        """Return a pyvacon parametrization for ``vol_param``.

        Parametrizations that provide ``_get_pyvacon_obj`` are converted
        directly. Any other object must provide ``calc_implied_vol(ttm, strike)``;
        it is sampled on a grid (its own ``expiries`` if present, otherwise 13
        equidistant expiries in [0, 4]; 100 strikes in [0.4, 1.6]) and wrapped
        into a ``VolatilityGridParametrization``.
        """
        if hasattr(vol_param, "_get_pyvacon_obj"):
            return vol_param._get_pyvacon_obj()
        if hasattr(vol_param, "expiries"):
            # accept plain lists as well as arrays
            expiries = np.asarray(vol_param.expiries)
        else:
            expiries = np.linspace(0.0, 4.0, 13, endpoint=True)
        strikes = np.linspace(0.4, 1.6, num=100)
        vols = np.empty((expiries.size, strikes.size))
        for i, expiry in enumerate(expiries):
            for j, strike in enumerate(strikes):
                vols[i, j] = vol_param.calc_implied_vol(expiry, strike)
        return VolatilityGridParametrization(expiries, strikes, vols)._get_pyvacon_obj()

    def __init__(self, id: str, refdate: datetime, forward_curve, daycounter, vol_param):
        """Volatility surface

        Args:
            id (str): Identifier (name) of the volatility surface.
            refdate (datetime): Valuation date.
            forward_curve (rivapy.market_data.EquityForwardCurve): Forward curve.
            daycounter (enums.DayCounterType): Day counter; its ``name`` is passed to pyvacon.
            vol_param ([VolatilityParametrizationFlat,VolatilityParametrizationTerm,VolatilityParametrizationSSVI]): Volatility parametrization.
        """
        self.id = id
        self.refdate = refdate
        self.forward_curve = forward_curve
        self.daycounter = daycounter
        self.vol_param = vol_param
        self._pyvacon_obj = None

    def _get_pyvacon_obj(self, fwd_curve=None):
        """Return the cached pyvacon ``VolatilitySurface``, creating it on first use.

        Args:
            fwd_curve: Forward curve used to build the surface; defaults to
                ``self.forward_curve``. Only used on the first call - once the
                object is cached this argument is ignored.
        """
        if self._pyvacon_obj is None:
            if fwd_curve is None:
                fwd_curve = self.forward_curve
            try:
                _py_fwd_curve = fwd_curve._get_pyvacon_obj()
            except Exception:
                # Best effort: anything that cannot be converted is passed
                # through unchanged (presumably already a pyvacon curve).
                _py_fwd_curve = fwd_curve
            self._pyvacon_obj = _mkt_data.VolatilitySurface(
                self.id, self.refdate, _py_fwd_curve, self.daycounter.name, VolatilitySurface._create_param_pyvacon_obj(self.vol_param)
            )
        return self._pyvacon_obj

    def calc_implied_vol(self, expiry: datetime, strike: float, refdate: datetime = None, forward_curve=None) -> float:
        """Calculate implied volatility

        Args:
            expiry (datetime): Expiration date.
            strike (float): Strike price.
            refdate (datetime): Valuation date. Defaults to the reference date of the
                surface's forward curve (or of ``forward_curve`` if the surface has none).
            forward_curve: Optional forward curve. If given and the surface has its own
                forward curve, the surface is shifted to the given curve before evaluation.

        Raises:
            Exception: If neither the surface nor the call provides a forward curve, or if
                the strike is implausible compared to the discounted future cash dividends.

        Returns:
            float: Implied volatility.
        """
        # Validate before touching self.forward_curve, so that a missing curve
        # yields the intended message instead of an AttributeError.
        if forward_curve is None and self.forward_curve is None:
            raise Exception("Please specify a forward curve")
        if refdate is None:
            refdate_source = self.forward_curve if self.forward_curve is not None else forward_curve
            refdate = refdate_source.refdate
        vol = self._get_pyvacon_obj()
        if forward_curve is None:
            forward_curve = self.forward_curve
        elif self.forward_curve is not None:
            vol = _mkt_data.VolatilitySurface.createVolatilitySurfaceShiftedFwd(vol, forward_curve._get_pyvacon_obj())
        forward_curve_obj = forward_curve._get_pyvacon_obj()
        # convert strike into x_strike
        x_strike = _utils.computeXStrike(
            strike, forward_curve_obj.value(refdate, expiry), forward_curve_obj.discountedFutureCashDivs(refdate, expiry)
        )
        if x_strike < 0:
            raise Exception(
                "The given strike value seems implausible compared to the discounted future cash dividends "
                f"({forward_curve_obj.discountedFutureCashDivs(refdate, expiry)})."
            )
        return vol.calcImpliedVol(refdate, expiry, x_strike)

    @staticmethod
    def set_stickyness(vol_stickyness: "enums.VolatilityStickyness"):
        """Set pyvacon's global forward-stickyness setting for volatility surfaces.

        Args:
            vol_stickyness (enums.VolatilityStickyness): Requested stickyness.

        Raises:
            Exception: If ``vol_stickyness`` is not one of the handled enum members.
        """
        if vol_stickyness is enums.VolatilityStickyness.StickyXStrike:
            _pricing.GlobalSettings.setVolatilitySurfaceFwdStickyness(_pricing.VolatilitySurfaceFwdStickyness.Type.StickyXStrike)
        elif vol_stickyness is enums.VolatilityStickyness.StickyStrike:
            # Map to the pyvacon type like every other branch; the rivapy enum
            # itself was passed here before.
            _pricing.GlobalSettings.setVolatilitySurfaceFwdStickyness(_pricing.VolatilitySurfaceFwdStickyness.Type.StickyStrike)
        elif vol_stickyness is enums.VolatilityStickyness.StickyFwdMoneyness:
            _pricing.GlobalSettings.setVolatilitySurfaceFwdStickyness(_pricing.VolatilitySurfaceFwdStickyness.Type.StickyFwdMoneyness)
        elif vol_stickyness is enums.VolatilityStickyness.NONE:
            _pricing.GlobalSettings.setVolatilitySurfaceFwdStickyness(_pricing.VolatilitySurfaceFwdStickyness.Type.NONE)
        else:
            raise Exception("Error")
def _add_to_factory(cls):
    """Register ``cls`` under its class name in the object returned by ``_factory()``."""
    _factory()[cls.__name__] = cls


# Register the curve classes with the factory.
_add_to_factory(NelsonSiegel)
_add_to_factory(NelsonSiegelSvensson)
_add_to_factory(ConstantRate)
_add_to_factory(LinearRate)
_add_to_factory(DiscountCurveParametrized)
_add_to_factory(DiscountCurveComposition)

if __name__ == "__main__":
    # Small manual check: evaluate a two-expiry SVI surface at its first expiry.
    svi = VolatilityParametrizationSVI(
        expiries=np.array([1.0 / 365.0, 1.0]),
        svi_params=[
            (0.0001, 0.1, -0.5, 0.0, 0.0001),
            (0.2, 0.1, -0.5, 0.0, 0.4),
        ],
    )
    expiry = 1.0 / 365.0
    x_strike = 1.0
    svi.calc_implied_vol(expiry, x_strike)