Coverage for rivapy/models/ornstein_uhlenbeck.py: 63%
109 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-05 14:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-05 14:27 +0000
1from typing import Union, Callable
2import numpy as np
3import scipy
4from rivapy.tools.interfaces import FactoryObject
6class OrnsteinUhlenbeck(FactoryObject):
8 def _eval_grid(f, timegrid):
9 try:
10 return f(timegrid)
11 except:
12 result = np.full(timegrid.shape, f)
13 return result
15 def __init__(self, speed_of_mean_reversion: Union[float, Callable],
16 volatility: Union[float, Callable],
17 mean_reversion_level: Union[float, Callable] = 0):
18 """Ornstein Uhlenbeck stochastic process.
20 .. math:: dX = \\lambda(t) (\\theta(t)-X)dt + \\sigma(t) dW_t
22 where :math:`\\lambda(t)` is the speed of mean reversion that determines how fast the process returns to the
23 so-called mean reversion level :math:`\\theta(t)` and :math:`\sigma` is the volatility of the process. The higher
24 :math:`\\lambda`, the faster the process return to the mean level, which can be seen in the following figure
27 Args:
28 speed_of_mean_reversion (Union[float, Callable]): The
29 volatility (Union[float, Callable]): _description_
30 mean_reversion_level (Union[float, Callable], optional): _description_. Defaults to 0.
31 """
32 self.speed_of_mean_reversion = speed_of_mean_reversion
33 self.mean_reversion_level = mean_reversion_level
34 self.volatility = volatility
35 self._timegrid = None
37 def _to_dict(self) -> dict:
38 return {'speed_of_mean_reversion': self.speed_of_mean_reversion, 'volatility': self.volatility,
39 'mean_reversion_level': self.mean_reversion_level}
41 def _set_timegrid(self, timegrid):
42 self._timegrid = np.copy(timegrid)
43 self._delta_t = self._timegrid[1:]-self._timegrid[:-1]
44 self._sqrt_delta_t = np.sqrt(self._delta_t)
46 self._speed_of_mean_reversion_grid = OrnsteinUhlenbeck._eval_grid(self.speed_of_mean_reversion, timegrid)
47 self._volatility_grid = OrnsteinUhlenbeck._eval_grid(self.volatility, timegrid)
48 self._mean_reversion_level_grid = OrnsteinUhlenbeck._eval_grid(self.mean_reversion_level, timegrid)
50 def rnd_shape(self, n_sims: int, n_timepoints: int)->tuple:
51 return (n_timepoints-1, n_sims)
54 def simulate(self, timegrid, start_value, rnd):
55 """ Simulate the Ornstein Uhlenbeck process on the given timegrid using simple explicit euler scheme:
57 .. math::
59 X_{t+\\delta t} = X_t + \\theta (\\mu(t) - X_t )\\delta t +\\sigma(t) \\varepsilon \\sqrt{\delta t}
61 where :math:`\\varepsilon` is a (0,1)-normal random variate.
63 Args:
64 timegrid (np.ndarray): One dimensional array containing the time points where the process will be simulated (containing 0.0 as the first timepoint).
65 start_value (Union[float, np.ndarray]): Either a float or an array (for each path) with the start value of the simulation.
66 rnd (np.ndarray): Array of random normal (variance equal to one) variates used within the discretization (:math:`\varepsilon` in the above description). Here, shape[0] equals the number of timestes and shape[1] teh number of simulations.
68 Returns:
69 np.ndarray: Array r containing the simulations where r[:,i] is the path of the i-th simulation (r.shape[0] equals number of timepoints, r.shape[1] the number of simulations).
70 """
71 self._set_timegrid(timegrid)
72 result = np.empty((self._timegrid.shape[0], rnd.shape[1]))
73 result[0,:] = start_value
75 for i in range(self._timegrid.shape[0]-1):
76 result[i+1,:] = (result[i, :] * np.exp(-self._speed_of_mean_reversion_grid[i]*self._delta_t[i])
77 + self._mean_reversion_level_grid[i]* (1 - np.exp(-self._speed_of_mean_reversion_grid[i]*self._delta_t[i]))
78 + self._volatility_grid[i]* np.sqrt((1 - np.exp(-2*self._speed_of_mean_reversion_grid[i]*self._delta_t[i])) / (2*self._speed_of_mean_reversion_grid[i])) * rnd[i,:]
79 )
80 return result
82 def compute_expected_value(self, x0: Union[float, np.ndarray], T: float):
83 if callable(self.speed_of_mean_reversion):
84 raise NotImplementedError("Expected value is only implemented for constant speed of mean reversion")
85 if callable(self.volatility):
86 raise NotImplementedError("Expected value is only implemented for constant volatility")
87 if callable(self.mean_reversion_level):
88 raise NotImplementedError("Expected value is only implemented for constant mean reversion level")
89 return x0*np.exp(-self.speed_of_mean_reversion*T) + self.mean_reversion_level*(1.0-np.exp(-self.speed_of_mean_reversion*T))
91 def compute_call_price(self, X0: Union[float, np.ndarray], K: float, ttm: float):
92 """Computes the price of a call option with strike K and time to maturity ttm for a spot following the Ornstein-Uhlenbeck process.
94 It works only for constant speed of mean reversion, volatility and mean reversion level and throws a NotImplementedError otherwise. It does not encounter dividends or interest rates.
96 Args:
97 X0 (Union[float, np.ndarray]): Start value of the process.
98 K (float): Strike of the call option.
99 ttm (float): Time to maturity of the call option.
101 Raises:
102 NotImplementedError: If speed of mean reversion, volatility or mean reversion level are not constant.
104 Returns:
105 float: Price of the call option.
106 """
107 if callable(self.speed_of_mean_reversion):
108 raise NotImplementedError("Expected value is only implemented for constant speed of mean reversion")
109 if callable(self.volatility):
110 raise NotImplementedError("Expected value is only implemented for constant volatility")
111 if callable(self.mean_reversion_level):
112 raise NotImplementedError("Expected value is only implemented for constant mean reversion level")
113 if ttm < 1e-5:
114 return np.maximum(X0 - K, 0.0)
115 g = X0 * np.exp(-self.speed_of_mean_reversion * ttm) + self.mean_reversion_level * (1.0 - np.exp(-self.speed_of_mean_reversion * ttm))
116 sigma_bar = self.volatility * self.volatility * (1.0 / ( 2.0 * self.speed_of_mean_reversion))
117 sigma_bar = sigma_bar * (1.0 - np.exp(-2.0 * self.speed_of_mean_reversion * ttm))
118 sigma_bar = np.sqrt(sigma_bar)
119 d = (g - K) / sigma_bar
120 return (g - K) * scipy.stats.norm.cdf(d) + sigma_bar * scipy.stats.norm.pdf(d)
122 def apply_mc_step(self, x: np.ndarray,
123 t0: float, t1: float,
124 rnd: np.ndarray,
125 inplace: bool = True,
126 slv: np.ndarray= None):
127 if not inplace:
128 x_ = x.copy()
129 else:
130 x_ = x
131 dt = t1-t0
132 sqrt_dt = np.sqrt(dt)
133 try:
134 mu = self.speed_of_mean_reversion(t0)
135 except:
136 mu = self.speed_of_mean_reversion
137 try:
138 sigma = self.volatility(t0)
139 except:
140 sigma = self.volatility
141 x_ = (1.0 - mu*dt)*x + sigma*sqrt_dt*rnd
142 return x_
144 def conditional_probability_density(self, X_delta_t, delta_t, X0,
145 volatility=None,
146 speed_of_mean_reversion=None,
147 mean_reversion_level = None):
148 if volatility is None:
149 volatility = self.volatility
150 if speed_of_mean_reversion is None:
151 speed_of_mean_reversion = self.speed_of_mean_reversion
152 if mean_reversion_level is None:
153 mean_reversion_level = self.mean_reversion_level
154 volatility_2_ = volatility**2*(1.0-np.exp(-2.0*speed_of_mean_reversion*delta_t))/(2.0*speed_of_mean_reversion)
155 result = 1.0/(2.0*np.pi*volatility_2_)*np.exp(
156 -(X_delta_t-X0*np.exp(-speed_of_mean_reversion*delta_t)
157 -mean_reversion_level*(1.0-np.exp(-speed_of_mean_reversion*delta_t)))
158 /(2.0*volatility_2_))
159 return result
161 def calibrate(self, data: np.ndarray, dt: float, method: str='maximum_likelihood'):
162 """Calibrate the Ornstein Uhlenbeck model with constant parameters to the given data.
164 Args:
165 data (np.ndarray): Array of values the model is fitted to (uniform timegrid is assumed).
166 dt (float): Time step size between two datapoints from data (uniform timegrid is assumed.
167 method (str, optional): Determines if maximum likelihood ('maximum_likelihood') or minimum least square ('minimum_least_square') is used for calibration. Defaults to 'maximum_likelihood'.
168 """
169 Sx = (data[:-1]).sum()
170 Sy = (data[1:]).sum()
171 Sxx = (data[:-1]**2).sum()
172 Syy = (data[1:]**2).sum()
173 Sxy = (data[:-1] * data[1:]).sum()
174 n = data[:-1].shape[0]
175 if method == 'maximum_likelihood':
176 mu = (Sy*Sxx - Sx*Sxy) / (n*(Sxx-Sxy) - (Sx**2-Sx*Sy))
177 if ((Sxy-mu*(Sx+Sy-n*mu))/(Sxx-2*mu*Sx+n*mu**2)) <= 0:
178 raise Exception('Calibration failed.')
179 speed_mr = -1/dt * np.log((Sxy-mu*(Sx+Sy-n*mu))/(Sxx-2*mu*Sx+n*mu**2))
180 alpha = np.exp(-speed_mr*dt)
181 sigma_2 = 1/n * (Syy-2*alpha*Sxy+alpha**2*Sxx-2*mu*(1-alpha)*(Sy-alpha*Sx)+n*mu**2*(1-alpha)**2)
182 sigma = np.sqrt(sigma_2 * (2*speed_mr) / (1-alpha**2))
183 elif method == 'minimum_least_square':
184 a = (n*Sxy - Sx*Sy) / (n*Sxx - Sx**2)
185 b = (Sy - a*Sx) / n
186 sd = np.sqrt((n*Syy-Sy**2 - a*(n*Sxy-Sx*Sy)) / (n*(n-2)))
187 speed_mr = -np.log(a) / dt
188 mu = b / (1-a)
189 sigma = sd * np.sqrt((-2*np.log(a)) / (dt*(1-a**2)))
190 else:
191 raise ValueError('Fitting method not defined ('+method+')')
192 self.speed_of_mean_reversion = speed_mr
193 self.volatility = sigma
194 self.mean_reversion_level = mu