Coverage for rivapy/models/local_vol.py: 42%

117 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-05 14:27 +0000

1import bisect 

2import numpy as np 

3from scipy import interpolate 

4 

5def _interpolate_2D(time_grid, strikes, f, x, t): 

6 t_index = bisect.bisect_left(time_grid, t) 

7 if t_index == 0:# or t0_index == self._time_grid.shape[0]: 

8 result = f[0] 

9 elif t_index == time_grid.shape[0]-1: 

10 result = f[-1] 

11 else: 

12 dt = time_grid[t_index] - time_grid[t_index-1] 

13 w1 = (t-time_grid[t_index-1])/dt 

14 w2 = (time_grid[t_index] - t)/dt 

15 result = w1*f[t_index] + w2*f[t_index-1] 

16 return np.interp(x, strikes, result) 

17 

18class LocalVol: 

19 

20 def __init__(self, vol_param, x_strikes: np.array, time_grid: np.array, call_prices: np.ndarray=None, local_vol_grid: np.ndarray=None): 

21 """Local Volatility Class  

22 

23 Args: 

24 vol_param: a grid or a parametrisation of the volatility 

25 x_strikes (np.array): strikes 

26 time_grid (np.array): time_grid 

27 call_param (np.ndarray, optional): A grid of call prices. Not compatible with vol_param. Defaults to None. 

28 """ 

29 

30 if (vol_param is None) and (call_prices is None) and (local_vol_grid is None): 

31 raise Exception('Set vol_params, call_params or local_vol_grid!') 

32 

33 if (vol_param is not None) and (call_prices is not None) and (local_vol_grid is not None): 

34 raise Exception('Set either vol_params or call_params or local_vol_grid, not all!') 

35 

36 if (vol_param is not None) and (call_prices is not None): 

37 raise Exception('Set either vol_params or call_params, not both!') 

38 

39 if (vol_param is not None) and (local_vol_grid is not None): 

40 raise Exception('Set either vol_params or local_vol_grid, not both!') 

41 

42 if (local_vol_grid is not None) and (call_prices is not None): 

43 raise Exception('Set either local_vol_grid or call_params, not both!') 

44 

45 self._x_strikes = x_strikes 

46 self._time_grid = time_grid 

47 

48 if local_vol_grid is not None: 

49 self._local_variance = local_vol_grid**2 

50 else: 

51 self._local_variance = LocalVol.compute_local_var(vol_param, x_strikes, time_grid, call_prices) 

52 

53 self._variance = interpolate.RectBivariateSpline(time_grid, x_strikes, self._local_variance, bbox=[None, None, None, None], kx=1, ky=1, s=0) 

54 #interpolation.interp2d(time_grid, x_strikes, self._local_variance.T) 

55 

56 @staticmethod 

57 def _compute_local_var_from_vol(vol_param, x_strikes: np.array, time_grid: np.array): 

58 # setup grids  

59 eps = 1e-8 

60 log_x_strikes = np.log(x_strikes) 

61 if isinstance(vol_param, np.ndarray): 

62 iv = np.copy(vol_param) 

63 else: 

64 iv = np.empty(shape=(time_grid.shape[0], x_strikes.shape[0])) #implied variance grid 

65 for i in range(time_grid.shape[0]): 

66 for j in range(x_strikes.shape[0]): 

67 iv[i,j] = vol_param.calc_implied_vol(time_grid[i], x_strikes[j]) 

68 iv *= iv 

69 tiv = np.maximum((time_grid*iv.T).T, eps) 

70 h = log_x_strikes[1:] - log_x_strikes[:-1] 

71 hm = h[:-1] 

72 hp = h[1:] 

73 fd1a = -1.0 / (2 * hm) 

74 fd1c = 1.0 / (2 * hp) 

75 fd1b = -(fd1c + fd1a) 

76 fd2a = 2.0 / (hm*(hm + hp)) 

77 fd2c = 2.0 / (hp*(hm + hp)) 

78 fd2b = -(fd2a + fd2c) 

79 

80 min_lv = 0.01 

81 max_lv = 1.5 

82 inv_dt = 1.0/(time_grid[1:]-time_grid[:-1]) 

83 

84 dyw = fd1a*tiv[:,:-2] + fd1b*tiv[:,1:-1] + fd1c*tiv[:,2:] 

85 dyyw = fd2a*tiv[:,:-2] + fd2b*tiv[:,1:-1] + fd2c*tiv[:,2:] 

86 dtw = np.maximum((inv_dt*(tiv[1:,:]-tiv[:-1,:]).T).T,eps) 

87 

88 p = log_x_strikes[1:-1] / tiv[:,1:-1] 

89 q = np.maximum(1 - p*dyw + 0.25*(-0.25 - 1.0 / tiv[:,1:-1] + p*p)*dyw*dyw + 0.5*dyyw, eps) 

90 local_var = np.empty(shape=(time_grid.shape[0], x_strikes.shape[0])) 

91 local_var[1:-1,1:-1] = np.minimum(np.maximum(min_lv*min_lv, dtw[:-1,1:-1] / q[1:-1,:]), max_lv*max_lv) 

92 local_var[:,-1] = local_var[:,-2] 

93 local_var[:,0] = local_var[:, 1] 

94 local_var[0,:] = local_var[1,:] 

95 local_var[-1,:] = local_var[-2,:] 

96 return local_var 

97 

98 @staticmethod 

99 def _compute_local_var_from_call(call_param: np.ndarray, x_strikes:np.ndarray, time_grid:np.ndarray): 

100 """ 

101 Calculate the local volatility from a call price surface with Dupire's equation 

102  

103 sigma^2(K,T) = d_T(C) / (1/2*K^2 * d_KK(C)) 

104  

105 with  

106  

107 dx1 = x_i - x_i-1 

108 dx2 = x_i+1 - x_i 

109 

110 d_T[i,:] = 1/(d_t1+d_t2) * [(d_t1/d_t2)*(c[i+1,:]-c[i,:]) + (d_t2/d_t1)*(c[i,:]-c[i-1,:])] 

111 which simplifies on a uniform grid to: d_T[i,:] = (c[i+1,:]-c[i-1,:])/(2*d_t) 

112 for i = 1, ..., len(time_grid)-1 

113  

114 d_KK[:,i] = 2.0*(c[:,i-1]/(d_k1*(d_k1+d_k2)) - (c[:,i]/(d_k1*d_k2)) + (c[:,i+1]/(d_k2*(d_k1+d_k2))))  

115 which simplifies on a uniform grid to: d_KK[:,i] = (c[:,i-1]-2*c[:,i]+c[:,i+1])/(d_k**2) 

116 for i = 1, ..., len(strikes)-1 

117 

118 The formula can be interpreted as an infinitesimal calendar / butterfly. 

119 

120 The square root yields the local volatility. 

121 

122 Args: 

123 call_param (np.ndarray): array of call prices (2D) of the form (n_expiries, n_strikes) 

124 x_strikes (np.ndarray): timegrid of x_strikes (1D) 

125 time_grid (np.ndarray): timegrid of expiries (1D) 

126 

127 Returns: 

128 local variance as a 2D grid of expiries and x_strikes  

129 """ 

130 

131 d_T = np.zeros((len(time_grid),len(x_strikes))) 

132 deltas_t = np.diff(time_grid) #time_grid[1:]-time_grid[:-1] 

133 if all(deltas_t-deltas_t[0]) < 1E-15: #uniform grid 

134 d_T[1:-1,:] = 1/(2*deltas_t[0]) * (call_param[2:,:] - call_param[:-2,:]) 

135 else: 

136 d_t1 = time_grid[1:-1] - time_grid[:-2] #time_grid[i] - time_grid[i-1] 

137 d_t2 = time_grid[2:] - time_grid[1:-1] #time_grid[i+1] - time_grid[i]  

138 d_T[1:-1,:] = np.multiply( 

139 np.multiply((call_param[2:,:]-call_param[1:-1,:]).T, (d_t1/d_t2)) + 

140 np.multiply((call_param[1:-1,:]-call_param[:-2,:]).T, (d_t2/d_t1)), 

141 1./(d_t1+d_t2)).T 

142 

143 d_KK = np.zeros((len(time_grid),len(x_strikes))) 

144 deltas_k = np.diff(x_strikes) 

145 if all(deltas_k-deltas_k[0]) < 1E-15: #uniform grid 

146 d_KK[:,1:-1] = (1/(deltas_k[0]**2))*(call_param[:,:-2]-2*call_param[:,1:-1]+call_param[:,2:]) 

147 else: 

148 

149 d_k1 = x_strikes[1:-1] - x_strikes[:-2] # x_strikes[i] - x_strikes[i-1] 

150 d_k2 = x_strikes[2:] - x_strikes[1:-1] # x_strikes[i+1] - x_strikes[i]  

151 d_KK[:,1:-1] = 2.0 * (np.multiply((call_param[:,:-2]), 1/(d_k1*(d_k1+d_k2))) - 

152 np.multiply((call_param[:,1:-1]), 1/(d_k1*d_k2)) + 

153 np.multiply((call_param[:,2:]), 1/(d_k2*(d_k1+d_k2)))) 

154 

155 # remove extreme cases (numerical inconsistencies) 

156 d_KK = np.maximum(d_KK, 1E-8) 

157 

158 var = d_T / (1/2*(x_strikes**2)*d_KK) 

159 

160 # boundary cases  

161 var[0,:] = var[1,:] 

162 var[-1,:] = var[-2,:] 

163 var[:,0] = var[:,1] 

164 var[:,-1] = var[:,-2] 

165 

166 # corner cases 

167 var[0,0] = var[1,1] 

168 var[-1,-1] = var[-2,-2] 

169 var[0,-1] = var[1,-2] 

170 var[-1,0] = var[-2,1] 

171 

172 # remove extreme cases (numerical inconsistencies) 

173 var[var>2.5] = 2.5 

174 

175 return var 

176 

177 @staticmethod 

178 def compute_local_var(vol_param, x_strikes: np.array, time_grid: np.array, call_param: np.ndarray=None, min_lv = 0.01, max_lv = 1.5): 

179 """Calculate the local variance from vol_param or call_param for x_strikes on time_grid 

180 

181 Args: 

182 vol_param: a grid or a parametrisation of the volatility 

183 x_strikes (np.array): strikes 

184 time_grid (np.array): time_grid 

185 call_param (np.ndarray, optional): A grid of call prices. Not compatible with vol_param. Defaults to None. 

186 

187 Returns: 

188 local volatility surface on the grid 

189 """ 

190 

191 if (vol_param is None) and (call_param is None): 

192 raise Exception('Set vol_params or call_params!') 

193 

194 if (vol_param is not None) and (call_param is not None): 

195 raise Exception('Set either vol_params or call_params, not both!') 

196 

197 if vol_param is not None: 

198 local_var = LocalVol._compute_local_var_from_vol(vol_param, x_strikes, time_grid) 

199 elif call_param is not None: 

200 local_var = LocalVol._compute_local_var_from_call(call_param, x_strikes, time_grid) 

201 return np.minimum(np.maximum(min_lv*min_lv, local_var), max_lv*max_lv) 

202 

203 def apply_mc_step(self, x: np.ndarray, t0: float, t1: float, rnd: np.ndarray, inplace: bool = True): 

204 """Apply a MC-Euler step for the LV Model for n different paths. 

205 

206 Args: 

207 x (np.ndarray): 2-d array containing the start values for the spot and variance. The first column contains the spot, the second the variance values. 

208 t0 ([type]): [description] 

209 t1 ([type]): [description] 

210 rnd ([type]): [description] 

211 """ 

212 if not inplace: 

213 x_ = x.copy() 

214 else: 

215 x_ = x 

216 S = x_[:,0] 

217 lv = _interpolate_2D(self._time_grid, self._x_strikes, self._local_variance, S, t0) #self._variance(t0, S).reshape((-1,)) 

218 dt = t1-t0 

219 sqrt_dt = np.sqrt(dt) 

220 S *= np.exp(- 0.5*lv*dt + np.sqrt(lv)*rnd[:,0]*sqrt_dt) 

221 return x_ 

222