Coverage for tests/test_models.py: 75%

210 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-05 14:27 +0000

1import unittest 

2import numpy as np 

3import datetime as dt 

4 

5import rivapy.models as models 

6import rivapy.marketdata as mktdata 

7import rivapy.pricing.analytics as analytics 

8import rivapy.tools.enums as enums 

9from rivapy import _pyvacon_available 

10 

11class LocalVolModelTest(unittest.TestCase): 

12 

13 def test_local_vol_mc_with_ssvi(self): 

14 """Simple test where call price from Local Vol MC simulation is compared against BS price 

15 """ 

16 if not _pyvacon_available: 

17 self.assertAlmostEqual(1,1) 

18 return 

19 ssvi = mktdata.VolatilityParametrizationSSVI(expiries=[1.0/365, 30/365, 0.5, 1.0], fwd_atm_vols=[0.25, 0.3, 0.28, 0.25], rho=-0.9, eta=0.5, gamma=0.5) 

20 x_strikes = np.linspace(0.5,1.5,100) 

21 time_grid = np.linspace(0.0,1.0,100) 

22 lv = models.LocalVol(ssvi, x_strikes, time_grid) 

23 n_sims = 100_000 

24 S = np.ones((n_sims,1)) 

25 np.random.seed(42) 

26 for i in range(1,time_grid.shape[0]): 

27 rnd = np.random.normal(size=(n_sims, 1)) 

28 lv.apply_mc_step(S,time_grid[i-1], time_grid[i],rnd, inplace=True) 

29 strike = 1.0 

30 call_price = np.mean(np.maximum(S-strike, 0)) 

31 call_price_ref = analytics.compute_european_price_Buehler(strike = strike, maturity=1.0, volatility=ssvi.calc_implied_vol(1.0,strike)) 

32 self.assertAlmostEqual(call_price, call_price_ref, places=2) 

33 

34 def test_local_vol_with_flat_input_vol(self): 

35 """Simple test where the flat input volatility is compared with the local volatility calculated with call prices from black scholes 

36 """ 

37 x_strikes = np.linspace(0.5,1.5,100) 

38 time_grid = np.linspace(0.0,1.0,365) 

39 input_vol = 0.3 

40 call_prices = np.array([[analytics.compute_european_price_Buehler(strike=k, maturity=t, volatility=input_vol) for k in x_strikes] for t in time_grid]) 

41 lv_model = models.LocalVol(vol_param=None, x_strikes=x_strikes, time_grid=time_grid, call_prices=call_prices) 

42 var = lv_model.compute_local_var(vol_param=None, call_param = call_prices, x_strikes = x_strikes, time_grid = time_grid) 

43 vol = np.sqrt(np.abs(var)) 

44 

45 for i,t in enumerate(time_grid): 

46 if t < 1/365: 

47 continue 

48 elif t < 2/365: 

49 factor = 1.0 

50 else: 

51 factor = 2.0 

52 range_low = np.exp(-factor * input_vol * np.sqrt(t)) 

53 range_up = np.exp(factor * input_vol * np.sqrt(t)) 

54 val_in_range = vol[i,(x_strikes>range_low)&(x_strikes<range_up)] 

55 

56 self.assertAlmostEqual(input_vol, np.mean(val_in_range), places=2) 

57 

58 def test_compare_local_var_implied_and_call(self): 

59 """Simple test where the local volatility of a volatility surface is compared with the local volatility of the corresponding call price surface 

60 """ 

61 if not _pyvacon_available: 

62 self.assertAlmostEqual(1,1) 

63 return 

64 x_strikes = np.linspace(0.5,1.5,100) 

65 time_grid = np.linspace(0.0,1.0,100) 

66 

67 ssvi = mktdata.VolatilityParametrizationSSVI(expiries=[1.0/365, 30/365, 0.5, 1.0], fwd_atm_vols=[0.25, 0.3, 0.28, 0.25], rho=-0.9, eta=0.5, gamma=0.5) 

68 var_vol = np.sqrt(models.LocalVol.compute_local_var(ssvi, x_strikes, time_grid)) 

69 

70 input_vol = np.array([[ssvi.calc_implied_vol(ttm=t, strike=k) for k in x_strikes] for t in time_grid]) 

71 call_prices = np.array([[analytics.compute_european_price_Buehler(strike=k, maturity=t, volatility=input_vol[i,j]) for j,k in enumerate(x_strikes)] for i,t in enumerate(time_grid)]) 

72 

73 lv_model = models.LocalVol(vol_param=None, x_strikes=x_strikes, time_grid=time_grid, call_prices=call_prices) 

74 var_call = np.sqrt(lv_model.compute_local_var(vol_param=None, call_param = call_prices, x_strikes = x_strikes, time_grid = time_grid)) 

75 

76 for i,t in enumerate(time_grid): 

77 if t < 1/365: 

78 continue 

79 range_low = np.exp(-input_vol[i,:] * np.sqrt(t)) 

80 range_up = np.exp(input_vol[i,:] * np.sqrt(t)) 

81 var_vol_range = var_vol[i,(x_strikes>range_low)&(x_strikes<range_up)] 

82 var_call_range = var_call[i,(x_strikes>range_low)&(x_strikes<range_up)] 

83 error = np.mean(var_vol_range) - np.mean(var_call_range) 

84 self.assertLess(error, 2E-2) 

85 

86 

87class HestonModelTest(unittest.TestCase): 

88 

89 def test_callprice_formula(self): 

90 """Test analytic call price formula by comparing with MC simulated values 

91 """ 

92 heston = models.HestonModel(long_run_variance=0.3**2, 

93 mean_reversion_speed=0.5 , 

94 vol_of_vol=0.2, 

95 initial_variance=0.1**2, 

96 correlation = -0.9) 

97 

98 n_sims = 40_000 

99 np.random.seed(42) 

100 strikes = np.array([0.9,1.0,1.1]) 

101 timegrid = np.linspace(0.0,1.0,365) 

102 

103 simulated_values = np.empty((n_sims, 2)) 

104 simulated_values[:,0] = 1.0 

105 simulated_values[:,1] = heston._initial_variance 

106 

107 

108 cp_anayltic = heston.call_price(1.0, heston._initial_variance, K = strikes, ttm = 1.0) 

109 

110 for i in range(1, timegrid.shape[0]): 

111 rnd = np.random.normal(size=(n_sims,2)) 

112 heston.apply_mc_step(simulated_values, timegrid[i-1], timegrid[i], rnd, inplace=True) 

113 

114 for i in range(strikes.shape[0]): 

115 cp_mc = np.mean(np.maximum(simulated_values[:,0]-strikes[i], 0.0)) 

116 self.assertAlmostEqual(cp_anayltic[i], cp_mc, delta=1e-3) 

117 

118class HestonLocalVolModelTest(unittest.TestCase): 

119 @staticmethod 

120 def calc_imlied_vol_grid(expiries, strikes, call_prices): 

121 vols = np.zeros((expiries.shape[0],strikes.shape[0])) 

122 for i in range(expiries.shape[0]): 

123 for j in range(strikes.shape[0]): 

124 try: 

125 vols[i,j] = analytics.compute_implied_vol_Buehler(strikes[j], maturity=expiries[i], 

126 price=call_prices[i, j], min_vol=0.01) 

127 except: 

128 pass 

129 

130 for i in range(expiries.shape[0]): 

131 extrapolation = False 

132 for j in range(int(strikes.shape[0]/2),strikes.shape[0]): 

133 if vols[i,j] <1e-6: 

134 vols[i,j] = vols[i,j-1] 

135 extrapolation = True 

136 for j in range(23,-1,-1): 

137 if vols[i,j] <1e-6: 

138 vols[i,j] = vols[i,j+1] 

139 extrapolation = True 

140 if extrapolation: 

141 print('Extrapolation necessary for expiry ' + str(i)) 

142 return vols 

143 

144 @staticmethod 

145 def calc_callprice_MC(time_grid, strikes, n_sims, model): 

146 if time_grid[0] > 1e-7: 

147 raise Exception('The time_grid must include 0.0 as first point.') 

148 call_prices = np.empty((time_grid.shape[0], strikes.shape[0])) 

149 x = np.empty((n_sims,2)) 

150 x[:,0] = model.get_initial_value()[0] 

151 x[:,1] = model.get_initial_value()[1] 

152 for j in range(strikes.shape[0]): 

153 call_prices[0][j] = np.maximum(x[0,0]-strikes[j], 0.0) 

154 np.random.seed(42) 

155 t0 = 0 

156 for i in range(1, time_grid.shape[0]): 

157 rnd = np.random.normal(size=(n_sims, 2)) 

158 model.apply_mc_step(x, t0, time_grid[i], rnd, inplace=True) 

159 for j in range(strikes.shape[0]): 

160 call_prices[i][j] = np.mean(np.maximum(x[:,0]-strikes[j], 0.0) ) 

161 t0 = time_grid[i] 

162 return call_prices 

163 

164 def test_simple(self): 

165 """Simple test: The given implied volatility surface equals the heston surface->stoch local variance must equal 1 

166 """ 

167 heston = models.HestonModel(long_run_variance=0.3**2, 

168 mean_reversion_speed=0.5, 

169 vol_of_vol=0.2, 

170 initial_variance=0.3**2, 

171 correlation = -0.95) 

172 x_strikes = np.linspace(0.5, 1.5, num=240) 

173 time_grid = np.linspace(0.0, 1.0, num=240) 

174 call_prices = heston.call_price(1.0, heston._initial_variance, x_strikes, time_grid) 

175 heston_lv = models.StochasticLocalVol(heston) 

176 heston_lv.calibrate_MC(None, x_strikes, time_grid, n_sims=10_000, call_prices=call_prices) 

177 call_prices_sim = HestonLocalVolModelTest.calc_callprice_MC(time_grid, x_strikes, 10_000, heston_lv) 

178 for t in [80,120,180,239]: 

179 for k in [80, 100, 120, 140, 160]: 

180 vol = analytics.compute_implied_vol_Buehler(x_strikes[k], maturity=time_grid[t], 

181 price=call_prices[t, k], min_vol=0.01) 

182 vol_sim = analytics.compute_implied_vol_Buehler(x_strikes[k], maturity=time_grid[t], 

183 price=call_prices_sim[t, k], min_vol=0.01) 

184 self.assertTrue(np.abs(vol-vol_sim)< 0.02, 

185 'Vol from calibrated model ('+str(vol_sim)+') is not close enough to reference vol('+str(vol)+') for strike/expiry: ' 

186 + str(x_strikes[k])+'/'+str(time_grid[t])) 

187 

188 def test_simple_2(self): 

189 """Simple test 2: The given implied volatility surface equals a surface from a heston model. Try to calibrate Heston Local Vol with other heson parameters to fit the surface 

190 """ 

191 heston = models.HestonModel(long_run_variance=0.3**2, 

192 mean_reversion_speed=0.5, 

193 vol_of_vol=0.2, 

194 initial_variance=0.3**2, 

195 correlation = -0.95) 

196 heston_2 = models.HestonModel(long_run_variance=0.2**2, 

197 mean_reversion_speed=0.5, 

198 vol_of_vol=0.2, 

199 initial_variance=0.3**2, 

200 correlation = -0.95) 

201 x_strikes = np.linspace(0.5,1.5, num=240) 

202 time_grid = np.linspace(0.0, 1.0, num=240) 

203 call_prices = heston.call_price(1.0, heston._initial_variance, x_strikes, time_grid) 

204 heston_lv = models.StochasticLocalVol(heston_2) 

205 heston_lv.calibrate_MC(None, x_strikes, time_grid, n_sims=10_000, call_prices=call_prices) 

206 call_prices_sim = HestonLocalVolModelTest.calc_callprice_MC(time_grid, x_strikes, 10_000, heston_lv) 

207 for t in [80,120,180,239]: 

208 for k in [80, 100, 120, 140, 160]: 

209 vol = analytics.compute_implied_vol_Buehler(x_strikes[k], maturity=time_grid[t], 

210 price=call_prices[t, k], min_vol=0.01) 

211 vol_sim = analytics.compute_implied_vol_Buehler(x_strikes[k], maturity=time_grid[t], 

212 price=call_prices_sim[t, k], min_vol=0.01) 

213 self.assertTrue(np.abs(vol-vol_sim)< 0.02, 

214 'Vol from calibrated model ('+str(vol_sim)+') is not close enough to reference vol('+str(vol)+') for strike/expiry: ' 

215 + str(x_strikes[k])+'/'+str(time_grid[t])) 

216 

217 

218class OrnsteinUhlenbeckTest(unittest.TestCase): 

219 def test_calibration(self): 

220 """Simple test for calibration: Simulate a path of a model with fixed params and calibrate new model. Test if parameters are equal (up to MC error). 

221 """ 

222 np.random.seed(42) 

223 timegrid = np.arange(0.0,30.0,1.0/365.0) # simulate on daily timegrid over 30 yrs horizon 

224 ou_model = models.OrnsteinUhlenbeck(speed_of_mean_reversion = 5.0, volatility=0.1) 

225 sim = ou_model.simulate(timegrid, start_value=0.2,rnd=np.random.normal(size=(timegrid.shape[0],1))) 

226 ou_model.calibrate(sim.reshape((-1)),dt=1.0/365.0, method = 'minimum_least_square') 

227 self.assertAlmostEqual(0.1, ou_model.volatility, places=3) 

228 self.assertAlmostEqual(0.0, ou_model.mean_reversion_level, places=2) 

229 self.assertAlmostEqual(5.0, ou_model.speed_of_mean_reversion, delta=0.5) 

230 

231 def test_expectation(self): 

232 """Simple test for analytical expectation: Compare analytical expectation with MC expectation 

233 """ 

234 np.random.seed(42) 

235 timegrid = np.linspace(0.0,1.0,365) # simulate on daily timegrid over 1 yr horizon 

236 ou_model = models.OrnsteinUhlenbeck(speed_of_mean_reversion = 5.0, volatility=0.1) 

237 n_sims = 1000 

238 sim = ou_model.simulate(timegrid, start_value=0.2,rnd=np.random.normal(size=(timegrid.shape[0],n_sims))) 

239 self.assertAlmostEqual(sim[-1,:].mean(), ou_model.compute_expected_value(0.2,1.0), places=3) 

240 

241 def test_call_price(self): 

242 """Simple test for call price: Compare analytical call price with MC call price 

243 """ 

244 K = 1.0 

245 T=1.0 

246 timegrid = np.linspace(0.0,T,num=240) 

247 ou_model = models.OrnsteinUhlenbeck(speed_of_mean_reversion = 2.0, volatility=0.4) 

248 n_sims = 10_000 

249 sim = ou_model.simulate(timegrid, start_value=0.2,rnd=np.random.normal(size=(timegrid.shape[0],n_sims))) 

250 mc_call_price = np.mean(np.maximum(sim[-1,:]-K,0.0)) 

251 analytical_call_price = ou_model.compute_call_price(0.2, K, T) 

252 self.assertAlmostEqual(mc_call_price, analytical_call_price, places=2) 

253class LuciaSchwartzTest(unittest.TestCase): 

254 def test_expectation(self): 

255 """Simple test for analytical expectation: Compare analytical expectation with MC expectation 

256 """ 

257 np.random.seed(42) 

258 timegrid = np.linspace(0.0,1.0,365) # simulate on daily timegrid over 1 yr horizon 

259 ls_model = models.LuciaSchwartz(rho=-0.9, kappa=1.0, sigma1=0.5, mu=0.15, sigma2=0.3) 

260 n_sims = 4_000 

261 sim = ls_model.simulate(timegrid, start_value=np.array([0.0,0.0]),rnd=np.random.normal(size=ls_model.rnd_shape(n_sims,timegrid.shape[0]))) 

262 self.assertAlmostEqual(sim[-1,:].mean(), ls_model.compute_expected_value(x0=np.array([0.0,0.0]), T=1.0), places=2) 

263 

264 def test_forward_simulation(self): 

265 """Simple test for forward simulation: Compare forward simulation with initial value (martingale) 

266 """ 

267 np.random.seed(42) 

268 timegrid = np.linspace(0.0,1.0,365) 

269 ls_model = models.LuciaSchwartz(rho=-0.9, kappa=1.0, sigma1=0.5, mu=0.15, sigma2=0.3) 

270 n_sims = 40_000 

271 sim = ls_model.simulate(timegrid, start_value=np.array([0.0,0.0]), 

272 forwards=[(1.0,1.0+1.0/365), (1.0,2.0)], 

273 rnd=np.random.normal(size=ls_model.rnd_shape(n_sims,timegrid.shape[0]))) 

274 initial_fwd_value = ls_model.compute_fwd_value(x0=np.array([0.0,0.0]), T1=1.0, T2=2.0) 

275 self.assertAlmostEqual(sim[-1,:,2].mean(), initial_fwd_value, places=2) 

276 #check that final spot value and forward value is close 

277 self.assertAlmostEqual(sim[-1,:,0].mean(), sim[-1,:,1].mean(), places=2) 

278 

279class WindPowerForecastModel(unittest.TestCase): 

280 def test_initial_forecast(self): 

281 """Simple test for initial forecast: Compare initial forecast with initial value (martingale)  

282 and the mean at the first expiry of MC simulation 

283 """ 

284 params = models.WindPowerForecastModelParameter(n_call_strikes=40, min_strike=-7.0, max_strike=7.0) 

285 model = models.WindPowerForecastModel('Onshore', speed_of_mean_reversion=0.5, volatility=1.5, params=params) 

286 timegrid = np.linspace(0.0,1.0, 365) 

287 np.random.seed(42) 

288 rnd = np.random.normal(size=model.rnd_shape(1_000, timegrid.shape[0])) 

289 forecast = 0.8 

290 results = model.simulate(timegrid, rnd, expiries=[1.0], initial_forecasts=[forecast], startvalue=0.0) 

291 sim_fwd = results.get('Onshore_FWD0') 

292 self.assertAlmostEqual(sim_fwd[-1,:].mean(), forecast, places=2) 

293 self.assertAlmostEqual(sim_fwd[0,0], forecast, places=2) 

294 

295if __name__ == '__main__': 

296 unittest.main()