Source code for rivapy.models.local_vol

import bisect
import numpy as np
from scipy import interpolate

def _interpolate_2D(time_grid, strikes, f, x, t):
    t_index = bisect.bisect_left(time_grid, t)
    if t_index == 0:# or t0_index == self._time_grid.shape[0]:
        result = f[0]
    elif t_index == time_grid.shape[0]-1:
        result = f[-1]
    else:
        dt = time_grid[t_index] - time_grid[t_index-1]
        w1 = (t-time_grid[t_index-1])/dt
        w2 = (time_grid[t_index] - t)/dt
        result = w1*f[t_index] + w2*f[t_index-1]
    return np.interp(x, strikes, result)

[docs] class LocalVol: def __init__(self, vol_param, x_strikes: np.array, time_grid: np.array, call_prices: np.ndarray=None, local_vol_grid: np.ndarray=None): """Local Volatility Class Args: vol_param: a grid or a parametrisation of the volatility x_strikes (np.array): strikes time_grid (np.array): time_grid call_param (np.ndarray, optional): A grid of call prices. Not compatible with vol_param. Defaults to None. """ if (vol_param is None) and (call_prices is None) and (local_vol_grid is None): raise Exception('Set vol_params, call_params or local_vol_grid!') if (vol_param is not None) and (call_prices is not None) and (local_vol_grid is not None): raise Exception('Set either vol_params or call_params or local_vol_grid, not all!') if (vol_param is not None) and (call_prices is not None): raise Exception('Set either vol_params or call_params, not both!') if (vol_param is not None) and (local_vol_grid is not None): raise Exception('Set either vol_params or local_vol_grid, not both!') if (local_vol_grid is not None) and (call_prices is not None): raise Exception('Set either local_vol_grid or call_params, not both!') self._x_strikes = x_strikes self._time_grid = time_grid if local_vol_grid is not None: self._local_variance = local_vol_grid**2 else: self._local_variance = LocalVol.compute_local_var(vol_param, x_strikes, time_grid, call_prices) self._variance = interpolate.RectBivariateSpline(time_grid, x_strikes, self._local_variance, bbox=[None, None, None, None], kx=1, ky=1, s=0) #interpolation.interp2d(time_grid, x_strikes, self._local_variance.T) @staticmethod def _compute_local_var_from_vol(vol_param, x_strikes: np.array, time_grid: np.array): # setup grids eps = 1e-8 log_x_strikes = np.log(x_strikes) if isinstance(vol_param, np.ndarray): iv = np.copy(vol_param) else: iv = np.empty(shape=(time_grid.shape[0], x_strikes.shape[0])) #implied variance grid for i in range(time_grid.shape[0]): for j in range(x_strikes.shape[0]): iv[i,j] = vol_param.calc_implied_vol(time_grid[i], x_strikes[j]) iv *= iv tiv = np.maximum((time_grid*iv.T).T, eps) h = log_x_strikes[1:] - log_x_strikes[:-1] hm = h[:-1] hp = h[1:] fd1a = -1.0 / (2 * hm) fd1c = 1.0 / (2 * hp) fd1b = -(fd1c + fd1a) fd2a = 2.0 / (hm*(hm + hp)) fd2c = 2.0 / (hp*(hm + hp)) fd2b = -(fd2a + fd2c) min_lv = 0.01 max_lv = 1.5 inv_dt = 1.0/(time_grid[1:]-time_grid[:-1]) dyw = fd1a*tiv[:,:-2] + fd1b*tiv[:,1:-1] + fd1c*tiv[:,2:] dyyw = fd2a*tiv[:,:-2] + fd2b*tiv[:,1:-1] + fd2c*tiv[:,2:] dtw = np.maximum((inv_dt*(tiv[1:,:]-tiv[:-1,:]).T).T,eps) p = log_x_strikes[1:-1] / tiv[:,1:-1] q = np.maximum(1 - p*dyw + 0.25*(-0.25 - 1.0 / tiv[:,1:-1] + p*p)*dyw*dyw + 0.5*dyyw, eps) local_var = np.empty(shape=(time_grid.shape[0], x_strikes.shape[0])) local_var[1:-1,1:-1] = np.minimum(np.maximum(min_lv*min_lv, dtw[:-1,1:-1] / q[1:-1,:]), max_lv*max_lv) local_var[:,-1] = local_var[:,-2] local_var[:,0] = local_var[:, 1] local_var[0,:] = local_var[1,:] local_var[-1,:] = local_var[-2,:] return local_var @staticmethod def _compute_local_var_from_call(call_param: np.ndarray, x_strikes:np.ndarray, time_grid:np.ndarray): """ Calculate the local volatility from a call price surface with Dupire's equation sigma^2(K,T) = d_T(C) / (1/2*K^2 * d_KK(C)) with dx1 = x_i - x_i-1 dx2 = x_i+1 - x_i d_T[i,:] = 1/(d_t1+d_t2) * [(d_t1/d_t2)*(c[i+1,:]-c[i,:]) + (d_t2/d_t1)*(c[i,:]-c[i-1,:])] which simplifies on a uniform grid to: d_T[i,:] = (c[i+1,:]-c[i-1,:])/(2*d_t) for i = 1, ..., len(time_grid)-1 d_KK[:,i] = 2.0*(c[:,i-1]/(d_k1*(d_k1+d_k2)) - (c[:,i]/(d_k1*d_k2)) + (c[:,i+1]/(d_k2*(d_k1+d_k2)))) which simplifies on a uniform grid to: d_KK[:,i] = (c[:,i-1]-2*c[:,i]+c[:,i+1])/(d_k**2) for i = 1, ..., len(strikes)-1 The formula can be interpreted as an infinitesimal calendar / butterfly. The square root yields the local volatility. Args: call_param (np.ndarray): array of call prices (2D) of the form (n_expiries, n_strikes) x_strikes (np.ndarray): timegrid of x_strikes (1D) time_grid (np.ndarray): timegrid of expiries (1D) Returns: local variance as a 2D grid of expiries and x_strikes """ d_T = np.zeros((len(time_grid),len(x_strikes))) deltas_t = np.diff(time_grid) #time_grid[1:]-time_grid[:-1] if all(deltas_t-deltas_t[0]) < 1E-15: #uniform grid d_T[1:-1,:] = 1/(2*deltas_t[0]) * (call_param[2:,:] - call_param[:-2,:]) else: d_t1 = time_grid[1:-1] - time_grid[:-2] #time_grid[i] - time_grid[i-1] d_t2 = time_grid[2:] - time_grid[1:-1] #time_grid[i+1] - time_grid[i] d_T[1:-1,:] = np.multiply( np.multiply((call_param[2:,:]-call_param[1:-1,:]).T, (d_t1/d_t2)) + np.multiply((call_param[1:-1,:]-call_param[:-2,:]).T, (d_t2/d_t1)), 1./(d_t1+d_t2)).T d_KK = np.zeros((len(time_grid),len(x_strikes))) deltas_k = np.diff(x_strikes) if all(deltas_k-deltas_k[0]) < 1E-15: #uniform grid d_KK[:,1:-1] = (1/(deltas_k[0]**2))*(call_param[:,:-2]-2*call_param[:,1:-1]+call_param[:,2:]) else: d_k1 = x_strikes[1:-1] - x_strikes[:-2] # x_strikes[i] - x_strikes[i-1] d_k2 = x_strikes[2:] - x_strikes[1:-1] # x_strikes[i+1] - x_strikes[i] d_KK[:,1:-1] = 2.0 * (np.multiply((call_param[:,:-2]), 1/(d_k1*(d_k1+d_k2))) - np.multiply((call_param[:,1:-1]), 1/(d_k1*d_k2)) + np.multiply((call_param[:,2:]), 1/(d_k2*(d_k1+d_k2)))) # remove extreme cases (numerical inconsistencies) d_KK = np.maximum(d_KK, 1E-8) var = d_T / (1/2*(x_strikes**2)*d_KK) # boundary cases var[0,:] = var[1,:] var[-1,:] = var[-2,:] var[:,0] = var[:,1] var[:,-1] = var[:,-2] # corner cases var[0,0] = var[1,1] var[-1,-1] = var[-2,-2] var[0,-1] = var[1,-2] var[-1,0] = var[-2,1] # remove extreme cases (numerical inconsistencies) var[var>2.5] = 2.5 return var
[docs] @staticmethod def compute_local_var(vol_param, x_strikes: np.array, time_grid: np.array, call_param: np.ndarray=None, min_lv = 0.01, max_lv = 1.5): """Calculate the local variance from vol_param or call_param for x_strikes on time_grid Args: vol_param: a grid or a parametrisation of the volatility x_strikes (np.array): strikes time_grid (np.array): time_grid call_param (np.ndarray, optional): A grid of call prices. Not compatible with vol_param. Defaults to None. Returns: local volatility surface on the grid """ if (vol_param is None) and (call_param is None): raise Exception('Set vol_params or call_params!') if (vol_param is not None) and (call_param is not None): raise Exception('Set either vol_params or call_params, not both!') if vol_param is not None: local_var = LocalVol._compute_local_var_from_vol(vol_param, x_strikes, time_grid) elif call_param is not None: local_var = LocalVol._compute_local_var_from_call(call_param, x_strikes, time_grid) return np.minimum(np.maximum(min_lv*min_lv, local_var), max_lv*max_lv)
[docs] def apply_mc_step(self, x: np.ndarray, t0: float, t1: float, rnd: np.ndarray, inplace: bool = True): """Apply a MC-Euler step for the LV Model for n different paths. Args: x (np.ndarray): 2-d array containing the start values for the spot and variance. The first column contains the spot, the second the variance values. t0 ([type]): [description] t1 ([type]): [description] rnd ([type]): [description] """ if not inplace: x_ = x.copy() else: x_ = x S = x_[:,0] lv = _interpolate_2D(self._time_grid, self._x_strikes, self._local_variance, S, t0) #self._variance(t0, S).reshape((-1,)) dt = t1-t0 sqrt_dt = np.sqrt(dt) S *= np.exp(- 0.5*lv*dt + np.sqrt(lv)*rnd[:,0]*sqrt_dt) return x_