Coverage for rivapy/models/ornstein_uhlenbeck.py: 63%

109 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-05 14:27 +0000

1from typing import Union, Callable 

2import numpy as np 

3import scipy 

4from rivapy.tools.interfaces import FactoryObject 

5 

6class OrnsteinUhlenbeck(FactoryObject): 

7 

8 def _eval_grid(f, timegrid): 

9 try: 

10 return f(timegrid) 

11 except: 

12 result = np.full(timegrid.shape, f) 

13 return result 

14 

15 def __init__(self, speed_of_mean_reversion: Union[float, Callable], 

16 volatility: Union[float, Callable], 

17 mean_reversion_level: Union[float, Callable] = 0): 

18 """Ornstein Uhlenbeck stochastic process. 

19 

20 .. math:: dX = \\lambda(t) (\\theta(t)-X)dt + \\sigma(t) dW_t 

21  

22 where :math:`\\lambda(t)` is the speed of mean reversion that determines how fast the process returns to the 

23 so-called mean reversion level :math:`\\theta(t)` and :math:`\sigma` is the volatility of the process. The higher 

24 :math:`\\lambda`, the faster the process return to the mean level, which can be seen in the following figure 

25 

26  

27 Args: 

28 speed_of_mean_reversion (Union[float, Callable]): The  

29 volatility (Union[float, Callable]): _description_ 

30 mean_reversion_level (Union[float, Callable], optional): _description_. Defaults to 0. 

31 """ 

32 self.speed_of_mean_reversion = speed_of_mean_reversion 

33 self.mean_reversion_level = mean_reversion_level 

34 self.volatility = volatility 

35 self._timegrid = None 

36 

37 def _to_dict(self) -> dict: 

38 return {'speed_of_mean_reversion': self.speed_of_mean_reversion, 'volatility': self.volatility, 

39 'mean_reversion_level': self.mean_reversion_level} 

40 

41 def _set_timegrid(self, timegrid): 

42 self._timegrid = np.copy(timegrid) 

43 self._delta_t = self._timegrid[1:]-self._timegrid[:-1] 

44 self._sqrt_delta_t = np.sqrt(self._delta_t) 

45 

46 self._speed_of_mean_reversion_grid = OrnsteinUhlenbeck._eval_grid(self.speed_of_mean_reversion, timegrid) 

47 self._volatility_grid = OrnsteinUhlenbeck._eval_grid(self.volatility, timegrid) 

48 self._mean_reversion_level_grid = OrnsteinUhlenbeck._eval_grid(self.mean_reversion_level, timegrid) 

49 

50 def rnd_shape(self, n_sims: int, n_timepoints: int)->tuple: 

51 return (n_timepoints-1, n_sims) 

52 

53 

54 def simulate(self, timegrid, start_value, rnd): 

55 """ Simulate the Ornstein Uhlenbeck process on the given timegrid using simple explicit euler scheme: 

56  

57 .. math::  

58  

59 X_{t+\\delta t} = X_t + \\theta (\\mu(t) - X_t )\\delta t +\\sigma(t) \\varepsilon \\sqrt{\delta t} 

60 

61 where :math:`\\varepsilon` is a (0,1)-normal random variate. 

62  

63 Args: 

64 timegrid (np.ndarray): One dimensional array containing the time points where the process will be simulated (containing 0.0 as the first timepoint). 

65 start_value (Union[float, np.ndarray]): Either a float or an array (for each path) with the start value of the simulation. 

66 rnd (np.ndarray): Array of random normal (variance equal to one) variates used within the discretization (:math:`\varepsilon` in the above description). Here, shape[0] equals the number of timestes and shape[1] teh number of simulations. 

67 

68 Returns: 

69 np.ndarray: Array r containing the simulations where r[:,i] is the path of the i-th simulation (r.shape[0] equals number of timepoints, r.shape[1] the number of simulations).  

70 """ 

71 self._set_timegrid(timegrid) 

72 result = np.empty((self._timegrid.shape[0], rnd.shape[1])) 

73 result[0,:] = start_value 

74 

75 for i in range(self._timegrid.shape[0]-1): 

76 result[i+1,:] = (result[i, :] * np.exp(-self._speed_of_mean_reversion_grid[i]*self._delta_t[i]) 

77 + self._mean_reversion_level_grid[i]* (1 - np.exp(-self._speed_of_mean_reversion_grid[i]*self._delta_t[i])) 

78 + self._volatility_grid[i]* np.sqrt((1 - np.exp(-2*self._speed_of_mean_reversion_grid[i]*self._delta_t[i])) / (2*self._speed_of_mean_reversion_grid[i])) * rnd[i,:] 

79 ) 

80 return result 

81 

82 def compute_expected_value(self, x0: Union[float, np.ndarray], T: float): 

83 if callable(self.speed_of_mean_reversion): 

84 raise NotImplementedError("Expected value is only implemented for constant speed of mean reversion") 

85 if callable(self.volatility): 

86 raise NotImplementedError("Expected value is only implemented for constant volatility") 

87 if callable(self.mean_reversion_level): 

88 raise NotImplementedError("Expected value is only implemented for constant mean reversion level") 

89 return x0*np.exp(-self.speed_of_mean_reversion*T) + self.mean_reversion_level*(1.0-np.exp(-self.speed_of_mean_reversion*T)) 

90 

91 def compute_call_price(self, X0: Union[float, np.ndarray], K: float, ttm: float): 

92 """Computes the price of a call option with strike K and time to maturity ttm for a spot following the Ornstein-Uhlenbeck process. 

93 

94 It works only for constant speed of mean reversion, volatility and mean reversion level and throws a NotImplementedError otherwise. It does not encounter dividends or interest rates. 

95 

96 Args: 

97 X0 (Union[float, np.ndarray]): Start value of the process. 

98 K (float): Strike of the call option. 

99 ttm (float): Time to maturity of the call option. 

100 

101 Raises: 

102 NotImplementedError: If speed of mean reversion, volatility or mean reversion level are not constant. 

103  

104 Returns: 

105 float: Price of the call option. 

106 """ 

107 if callable(self.speed_of_mean_reversion): 

108 raise NotImplementedError("Expected value is only implemented for constant speed of mean reversion") 

109 if callable(self.volatility): 

110 raise NotImplementedError("Expected value is only implemented for constant volatility") 

111 if callable(self.mean_reversion_level): 

112 raise NotImplementedError("Expected value is only implemented for constant mean reversion level") 

113 if ttm < 1e-5: 

114 return np.maximum(X0 - K, 0.0) 

115 g = X0 * np.exp(-self.speed_of_mean_reversion * ttm) + self.mean_reversion_level * (1.0 - np.exp(-self.speed_of_mean_reversion * ttm)) 

116 sigma_bar = self.volatility * self.volatility * (1.0 / ( 2.0 * self.speed_of_mean_reversion)) 

117 sigma_bar = sigma_bar * (1.0 - np.exp(-2.0 * self.speed_of_mean_reversion * ttm)) 

118 sigma_bar = np.sqrt(sigma_bar) 

119 d = (g - K) / sigma_bar 

120 return (g - K) * scipy.stats.norm.cdf(d) + sigma_bar * scipy.stats.norm.pdf(d) 

121 

122 def apply_mc_step(self, x: np.ndarray, 

123 t0: float, t1: float, 

124 rnd: np.ndarray, 

125 inplace: bool = True, 

126 slv: np.ndarray= None): 

127 if not inplace: 

128 x_ = x.copy() 

129 else: 

130 x_ = x 

131 dt = t1-t0 

132 sqrt_dt = np.sqrt(dt) 

133 try: 

134 mu = self.speed_of_mean_reversion(t0) 

135 except: 

136 mu = self.speed_of_mean_reversion 

137 try: 

138 sigma = self.volatility(t0) 

139 except: 

140 sigma = self.volatility 

141 x_ = (1.0 - mu*dt)*x + sigma*sqrt_dt*rnd 

142 return x_ 

143 

144 def conditional_probability_density(self, X_delta_t, delta_t, X0, 

145 volatility=None, 

146 speed_of_mean_reversion=None, 

147 mean_reversion_level = None): 

148 if volatility is None: 

149 volatility = self.volatility 

150 if speed_of_mean_reversion is None: 

151 speed_of_mean_reversion = self.speed_of_mean_reversion 

152 if mean_reversion_level is None: 

153 mean_reversion_level = self.mean_reversion_level 

154 volatility_2_ = volatility**2*(1.0-np.exp(-2.0*speed_of_mean_reversion*delta_t))/(2.0*speed_of_mean_reversion) 

155 result = 1.0/(2.0*np.pi*volatility_2_)*np.exp( 

156 -(X_delta_t-X0*np.exp(-speed_of_mean_reversion*delta_t) 

157 -mean_reversion_level*(1.0-np.exp(-speed_of_mean_reversion*delta_t))) 

158 /(2.0*volatility_2_)) 

159 return result 

160 

161 def calibrate(self, data: np.ndarray, dt: float, method: str='maximum_likelihood'): 

162 """Calibrate the Ornstein Uhlenbeck model with constant parameters to the given data. 

163 

164 Args: 

165 data (np.ndarray): Array of values the model is fitted to (uniform timegrid is assumed). 

166 dt (float): Time step size between two datapoints from data (uniform timegrid is assumed. 

167 method (str, optional): Determines if maximum likelihood ('maximum_likelihood') or minimum least square ('minimum_least_square') is used for calibration. Defaults to 'maximum_likelihood'. 

168 """ 

169 Sx = (data[:-1]).sum() 

170 Sy = (data[1:]).sum() 

171 Sxx = (data[:-1]**2).sum() 

172 Syy = (data[1:]**2).sum() 

173 Sxy = (data[:-1] * data[1:]).sum() 

174 n = data[:-1].shape[0] 

175 if method == 'maximum_likelihood': 

176 mu = (Sy*Sxx - Sx*Sxy) / (n*(Sxx-Sxy) - (Sx**2-Sx*Sy)) 

177 if ((Sxy-mu*(Sx+Sy-n*mu))/(Sxx-2*mu*Sx+n*mu**2)) <= 0: 

178 raise Exception('Calibration failed.') 

179 speed_mr = -1/dt * np.log((Sxy-mu*(Sx+Sy-n*mu))/(Sxx-2*mu*Sx+n*mu**2)) 

180 alpha = np.exp(-speed_mr*dt) 

181 sigma_2 = 1/n * (Syy-2*alpha*Sxy+alpha**2*Sxx-2*mu*(1-alpha)*(Sy-alpha*Sx)+n*mu**2*(1-alpha)**2) 

182 sigma = np.sqrt(sigma_2 * (2*speed_mr) / (1-alpha**2)) 

183 elif method == 'minimum_least_square': 

184 a = (n*Sxy - Sx*Sy) / (n*Sxx - Sx**2) 

185 b = (Sy - a*Sx) / n 

186 sd = np.sqrt((n*Syy-Sy**2 - a*(n*Sxy-Sx*Sy)) / (n*(n-2))) 

187 speed_mr = -np.log(a) / dt 

188 mu = b / (1-a) 

189 sigma = sd * np.sqrt((-2*np.log(a)) / (dt*(1-a**2))) 

190 else: 

191 raise ValueError('Fitting method not defined ('+method+')') 

192 self.speed_of_mean_reversion = speed_mr 

193 self.volatility = sigma 

194 self.mean_reversion_level = mu