Coverage for tests/test_models.py: 75%
210 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-05 14:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-05 14:27 +0000
1import unittest
2import numpy as np
3import datetime as dt
5import rivapy.models as models
6import rivapy.marketdata as mktdata
7import rivapy.pricing.analytics as analytics
8import rivapy.tools.enums as enums
9from rivapy import _pyvacon_available
11class LocalVolModelTest(unittest.TestCase):
13 def test_local_vol_mc_with_ssvi(self):
14 """Simple test where call price from Local Vol MC simulation is compared against BS price
15 """
16 if not _pyvacon_available:
17 self.assertAlmostEqual(1,1)
18 return
19 ssvi = mktdata.VolatilityParametrizationSSVI(expiries=[1.0/365, 30/365, 0.5, 1.0], fwd_atm_vols=[0.25, 0.3, 0.28, 0.25], rho=-0.9, eta=0.5, gamma=0.5)
20 x_strikes = np.linspace(0.5,1.5,100)
21 time_grid = np.linspace(0.0,1.0,100)
22 lv = models.LocalVol(ssvi, x_strikes, time_grid)
23 n_sims = 100_000
24 S = np.ones((n_sims,1))
25 np.random.seed(42)
26 for i in range(1,time_grid.shape[0]):
27 rnd = np.random.normal(size=(n_sims, 1))
28 lv.apply_mc_step(S,time_grid[i-1], time_grid[i],rnd, inplace=True)
29 strike = 1.0
30 call_price = np.mean(np.maximum(S-strike, 0))
31 call_price_ref = analytics.compute_european_price_Buehler(strike = strike, maturity=1.0, volatility=ssvi.calc_implied_vol(1.0,strike))
32 self.assertAlmostEqual(call_price, call_price_ref, places=2)
34 def test_local_vol_with_flat_input_vol(self):
35 """Simple test where the flat input volatility is compared with the local volatility calculated with call prices from black scholes
36 """
37 x_strikes = np.linspace(0.5,1.5,100)
38 time_grid = np.linspace(0.0,1.0,365)
39 input_vol = 0.3
40 call_prices = np.array([[analytics.compute_european_price_Buehler(strike=k, maturity=t, volatility=input_vol) for k in x_strikes] for t in time_grid])
41 lv_model = models.LocalVol(vol_param=None, x_strikes=x_strikes, time_grid=time_grid, call_prices=call_prices)
42 var = lv_model.compute_local_var(vol_param=None, call_param = call_prices, x_strikes = x_strikes, time_grid = time_grid)
43 vol = np.sqrt(np.abs(var))
45 for i,t in enumerate(time_grid):
46 if t < 1/365:
47 continue
48 elif t < 2/365:
49 factor = 1.0
50 else:
51 factor = 2.0
52 range_low = np.exp(-factor * input_vol * np.sqrt(t))
53 range_up = np.exp(factor * input_vol * np.sqrt(t))
54 val_in_range = vol[i,(x_strikes>range_low)&(x_strikes<range_up)]
56 self.assertAlmostEqual(input_vol, np.mean(val_in_range), places=2)
58 def test_compare_local_var_implied_and_call(self):
59 """Simple test where the local volatility of a volatility surface is compared with the local volatility of the corresponding call price surface
60 """
61 if not _pyvacon_available:
62 self.assertAlmostEqual(1,1)
63 return
64 x_strikes = np.linspace(0.5,1.5,100)
65 time_grid = np.linspace(0.0,1.0,100)
67 ssvi = mktdata.VolatilityParametrizationSSVI(expiries=[1.0/365, 30/365, 0.5, 1.0], fwd_atm_vols=[0.25, 0.3, 0.28, 0.25], rho=-0.9, eta=0.5, gamma=0.5)
68 var_vol = np.sqrt(models.LocalVol.compute_local_var(ssvi, x_strikes, time_grid))
70 input_vol = np.array([[ssvi.calc_implied_vol(ttm=t, strike=k) for k in x_strikes] for t in time_grid])
71 call_prices = np.array([[analytics.compute_european_price_Buehler(strike=k, maturity=t, volatility=input_vol[i,j]) for j,k in enumerate(x_strikes)] for i,t in enumerate(time_grid)])
73 lv_model = models.LocalVol(vol_param=None, x_strikes=x_strikes, time_grid=time_grid, call_prices=call_prices)
74 var_call = np.sqrt(lv_model.compute_local_var(vol_param=None, call_param = call_prices, x_strikes = x_strikes, time_grid = time_grid))
76 for i,t in enumerate(time_grid):
77 if t < 1/365:
78 continue
79 range_low = np.exp(-input_vol[i,:] * np.sqrt(t))
80 range_up = np.exp(input_vol[i,:] * np.sqrt(t))
81 var_vol_range = var_vol[i,(x_strikes>range_low)&(x_strikes<range_up)]
82 var_call_range = var_call[i,(x_strikes>range_low)&(x_strikes<range_up)]
83 error = np.mean(var_vol_range) - np.mean(var_call_range)
84 self.assertLess(error, 2E-2)
87class HestonModelTest(unittest.TestCase):
89 def test_callprice_formula(self):
90 """Test analytic call price formula by comparing with MC simulated values
91 """
92 heston = models.HestonModel(long_run_variance=0.3**2,
93 mean_reversion_speed=0.5 ,
94 vol_of_vol=0.2,
95 initial_variance=0.1**2,
96 correlation = -0.9)
98 n_sims = 40_000
99 np.random.seed(42)
100 strikes = np.array([0.9,1.0,1.1])
101 timegrid = np.linspace(0.0,1.0,365)
103 simulated_values = np.empty((n_sims, 2))
104 simulated_values[:,0] = 1.0
105 simulated_values[:,1] = heston._initial_variance
108 cp_anayltic = heston.call_price(1.0, heston._initial_variance, K = strikes, ttm = 1.0)
110 for i in range(1, timegrid.shape[0]):
111 rnd = np.random.normal(size=(n_sims,2))
112 heston.apply_mc_step(simulated_values, timegrid[i-1], timegrid[i], rnd, inplace=True)
114 for i in range(strikes.shape[0]):
115 cp_mc = np.mean(np.maximum(simulated_values[:,0]-strikes[i], 0.0))
116 self.assertAlmostEqual(cp_anayltic[i], cp_mc, delta=1e-3)
118class HestonLocalVolModelTest(unittest.TestCase):
119 @staticmethod
120 def calc_imlied_vol_grid(expiries, strikes, call_prices):
121 vols = np.zeros((expiries.shape[0],strikes.shape[0]))
122 for i in range(expiries.shape[0]):
123 for j in range(strikes.shape[0]):
124 try:
125 vols[i,j] = analytics.compute_implied_vol_Buehler(strikes[j], maturity=expiries[i],
126 price=call_prices[i, j], min_vol=0.01)
127 except:
128 pass
130 for i in range(expiries.shape[0]):
131 extrapolation = False
132 for j in range(int(strikes.shape[0]/2),strikes.shape[0]):
133 if vols[i,j] <1e-6:
134 vols[i,j] = vols[i,j-1]
135 extrapolation = True
136 for j in range(23,-1,-1):
137 if vols[i,j] <1e-6:
138 vols[i,j] = vols[i,j+1]
139 extrapolation = True
140 if extrapolation:
141 print('Extrapolation necessary for expiry ' + str(i))
142 return vols
144 @staticmethod
145 def calc_callprice_MC(time_grid, strikes, n_sims, model):
146 if time_grid[0] > 1e-7:
147 raise Exception('The time_grid must include 0.0 as first point.')
148 call_prices = np.empty((time_grid.shape[0], strikes.shape[0]))
149 x = np.empty((n_sims,2))
150 x[:,0] = model.get_initial_value()[0]
151 x[:,1] = model.get_initial_value()[1]
152 for j in range(strikes.shape[0]):
153 call_prices[0][j] = np.maximum(x[0,0]-strikes[j], 0.0)
154 np.random.seed(42)
155 t0 = 0
156 for i in range(1, time_grid.shape[0]):
157 rnd = np.random.normal(size=(n_sims, 2))
158 model.apply_mc_step(x, t0, time_grid[i], rnd, inplace=True)
159 for j in range(strikes.shape[0]):
160 call_prices[i][j] = np.mean(np.maximum(x[:,0]-strikes[j], 0.0) )
161 t0 = time_grid[i]
162 return call_prices
164 def test_simple(self):
165 """Simple test: The given implied volatility surface equals the heston surface->stoch local variance must equal 1
166 """
167 heston = models.HestonModel(long_run_variance=0.3**2,
168 mean_reversion_speed=0.5,
169 vol_of_vol=0.2,
170 initial_variance=0.3**2,
171 correlation = -0.95)
172 x_strikes = np.linspace(0.5, 1.5, num=240)
173 time_grid = np.linspace(0.0, 1.0, num=240)
174 call_prices = heston.call_price(1.0, heston._initial_variance, x_strikes, time_grid)
175 heston_lv = models.StochasticLocalVol(heston)
176 heston_lv.calibrate_MC(None, x_strikes, time_grid, n_sims=10_000, call_prices=call_prices)
177 call_prices_sim = HestonLocalVolModelTest.calc_callprice_MC(time_grid, x_strikes, 10_000, heston_lv)
178 for t in [80,120,180,239]:
179 for k in [80, 100, 120, 140, 160]:
180 vol = analytics.compute_implied_vol_Buehler(x_strikes[k], maturity=time_grid[t],
181 price=call_prices[t, k], min_vol=0.01)
182 vol_sim = analytics.compute_implied_vol_Buehler(x_strikes[k], maturity=time_grid[t],
183 price=call_prices_sim[t, k], min_vol=0.01)
184 self.assertTrue(np.abs(vol-vol_sim)< 0.02,
185 'Vol from calibrated model ('+str(vol_sim)+') is not close enough to reference vol('+str(vol)+') for strike/expiry: '
186 + str(x_strikes[k])+'/'+str(time_grid[t]))
188 def test_simple_2(self):
189 """Simple test 2: The given implied volatility surface equals a surface from a heston model. Try to calibrate Heston Local Vol with other heson parameters to fit the surface
190 """
191 heston = models.HestonModel(long_run_variance=0.3**2,
192 mean_reversion_speed=0.5,
193 vol_of_vol=0.2,
194 initial_variance=0.3**2,
195 correlation = -0.95)
196 heston_2 = models.HestonModel(long_run_variance=0.2**2,
197 mean_reversion_speed=0.5,
198 vol_of_vol=0.2,
199 initial_variance=0.3**2,
200 correlation = -0.95)
201 x_strikes = np.linspace(0.5,1.5, num=240)
202 time_grid = np.linspace(0.0, 1.0, num=240)
203 call_prices = heston.call_price(1.0, heston._initial_variance, x_strikes, time_grid)
204 heston_lv = models.StochasticLocalVol(heston_2)
205 heston_lv.calibrate_MC(None, x_strikes, time_grid, n_sims=10_000, call_prices=call_prices)
206 call_prices_sim = HestonLocalVolModelTest.calc_callprice_MC(time_grid, x_strikes, 10_000, heston_lv)
207 for t in [80,120,180,239]:
208 for k in [80, 100, 120, 140, 160]:
209 vol = analytics.compute_implied_vol_Buehler(x_strikes[k], maturity=time_grid[t],
210 price=call_prices[t, k], min_vol=0.01)
211 vol_sim = analytics.compute_implied_vol_Buehler(x_strikes[k], maturity=time_grid[t],
212 price=call_prices_sim[t, k], min_vol=0.01)
213 self.assertTrue(np.abs(vol-vol_sim)< 0.02,
214 'Vol from calibrated model ('+str(vol_sim)+') is not close enough to reference vol('+str(vol)+') for strike/expiry: '
215 + str(x_strikes[k])+'/'+str(time_grid[t]))
218class OrnsteinUhlenbeckTest(unittest.TestCase):
219 def test_calibration(self):
220 """Simple test for calibration: Simulate a path of a model with fixed params and calibrate new model. Test if parameters are equal (up to MC error).
221 """
222 np.random.seed(42)
223 timegrid = np.arange(0.0,30.0,1.0/365.0) # simulate on daily timegrid over 30 yrs horizon
224 ou_model = models.OrnsteinUhlenbeck(speed_of_mean_reversion = 5.0, volatility=0.1)
225 sim = ou_model.simulate(timegrid, start_value=0.2,rnd=np.random.normal(size=(timegrid.shape[0],1)))
226 ou_model.calibrate(sim.reshape((-1)),dt=1.0/365.0, method = 'minimum_least_square')
227 self.assertAlmostEqual(0.1, ou_model.volatility, places=3)
228 self.assertAlmostEqual(0.0, ou_model.mean_reversion_level, places=2)
229 self.assertAlmostEqual(5.0, ou_model.speed_of_mean_reversion, delta=0.5)
231 def test_expectation(self):
232 """Simple test for analytical expectation: Compare analytical expectation with MC expectation
233 """
234 np.random.seed(42)
235 timegrid = np.linspace(0.0,1.0,365) # simulate on daily timegrid over 1 yr horizon
236 ou_model = models.OrnsteinUhlenbeck(speed_of_mean_reversion = 5.0, volatility=0.1)
237 n_sims = 1000
238 sim = ou_model.simulate(timegrid, start_value=0.2,rnd=np.random.normal(size=(timegrid.shape[0],n_sims)))
239 self.assertAlmostEqual(sim[-1,:].mean(), ou_model.compute_expected_value(0.2,1.0), places=3)
241 def test_call_price(self):
242 """Simple test for call price: Compare analytical call price with MC call price
243 """
244 K = 1.0
245 T=1.0
246 timegrid = np.linspace(0.0,T,num=240)
247 ou_model = models.OrnsteinUhlenbeck(speed_of_mean_reversion = 2.0, volatility=0.4)
248 n_sims = 10_000
249 sim = ou_model.simulate(timegrid, start_value=0.2,rnd=np.random.normal(size=(timegrid.shape[0],n_sims)))
250 mc_call_price = np.mean(np.maximum(sim[-1,:]-K,0.0))
251 analytical_call_price = ou_model.compute_call_price(0.2, K, T)
252 self.assertAlmostEqual(mc_call_price, analytical_call_price, places=2)
253class LuciaSchwartzTest(unittest.TestCase):
254 def test_expectation(self):
255 """Simple test for analytical expectation: Compare analytical expectation with MC expectation
256 """
257 np.random.seed(42)
258 timegrid = np.linspace(0.0,1.0,365) # simulate on daily timegrid over 1 yr horizon
259 ls_model = models.LuciaSchwartz(rho=-0.9, kappa=1.0, sigma1=0.5, mu=0.15, sigma2=0.3)
260 n_sims = 4_000
261 sim = ls_model.simulate(timegrid, start_value=np.array([0.0,0.0]),rnd=np.random.normal(size=ls_model.rnd_shape(n_sims,timegrid.shape[0])))
262 self.assertAlmostEqual(sim[-1,:].mean(), ls_model.compute_expected_value(x0=np.array([0.0,0.0]), T=1.0), places=2)
264 def test_forward_simulation(self):
265 """Simple test for forward simulation: Compare forward simulation with initial value (martingale)
266 """
267 np.random.seed(42)
268 timegrid = np.linspace(0.0,1.0,365)
269 ls_model = models.LuciaSchwartz(rho=-0.9, kappa=1.0, sigma1=0.5, mu=0.15, sigma2=0.3)
270 n_sims = 40_000
271 sim = ls_model.simulate(timegrid, start_value=np.array([0.0,0.0]),
272 forwards=[(1.0,1.0+1.0/365), (1.0,2.0)],
273 rnd=np.random.normal(size=ls_model.rnd_shape(n_sims,timegrid.shape[0])))
274 initial_fwd_value = ls_model.compute_fwd_value(x0=np.array([0.0,0.0]), T1=1.0, T2=2.0)
275 self.assertAlmostEqual(sim[-1,:,2].mean(), initial_fwd_value, places=2)
276 #check that final spot value and forward value is close
277 self.assertAlmostEqual(sim[-1,:,0].mean(), sim[-1,:,1].mean(), places=2)
279class WindPowerForecastModel(unittest.TestCase):
280 def test_initial_forecast(self):
281 """Simple test for initial forecast: Compare initial forecast with initial value (martingale)
282 and the mean at the first expiry of MC simulation
283 """
284 params = models.WindPowerForecastModelParameter(n_call_strikes=40, min_strike=-7.0, max_strike=7.0)
285 model = models.WindPowerForecastModel('Onshore', speed_of_mean_reversion=0.5, volatility=1.5, params=params)
286 timegrid = np.linspace(0.0,1.0, 365)
287 np.random.seed(42)
288 rnd = np.random.normal(size=model.rnd_shape(1_000, timegrid.shape[0]))
289 forecast = 0.8
290 results = model.simulate(timegrid, rnd, expiries=[1.0], initial_forecasts=[forecast], startvalue=0.0)
291 sim_fwd = results.get('Onshore_FWD0')
292 self.assertAlmostEqual(sim_fwd[-1,:].mean(), forecast, places=2)
293 self.assertAlmostEqual(sim_fwd[0,0], forecast, places=2)
295if __name__ == '__main__':
296 unittest.main()