Source code for rivapy.models.ornstein_uhlenbeck

from typing import Union, Callable
import numpy as np
import scipy
from rivapy.tools.interfaces import FactoryObject

[docs] class OrnsteinUhlenbeck(FactoryObject): def _eval_grid(f, timegrid): try: return f(timegrid) except: result = np.full(timegrid.shape, f) return result def __init__(self, speed_of_mean_reversion: Union[float, Callable], volatility: Union[float, Callable], mean_reversion_level: Union[float, Callable] = 0): """Ornstein Uhlenbeck stochastic process. .. math:: dX = \\lambda(t) (\\theta(t)-X)dt + \\sigma(t) dW_t where :math:`\\lambda(t)` is the speed of mean reversion that determines how fast the process returns to the so-called mean reversion level :math:`\\theta(t)` and :math:`\sigma` is the volatility of the process. The higher :math:`\\lambda`, the faster the process return to the mean level, which can be seen in the following figure Args: speed_of_mean_reversion (Union[float, Callable]): The volatility (Union[float, Callable]): _description_ mean_reversion_level (Union[float, Callable], optional): _description_. Defaults to 0. """ self.speed_of_mean_reversion = speed_of_mean_reversion self.mean_reversion_level = mean_reversion_level self.volatility = volatility self._timegrid = None def _to_dict(self) -> dict: return {'speed_of_mean_reversion': self.speed_of_mean_reversion, 'volatility': self.volatility, 'mean_reversion_level': self.mean_reversion_level} def _set_timegrid(self, timegrid): self._timegrid = np.copy(timegrid) self._delta_t = self._timegrid[1:]-self._timegrid[:-1] self._sqrt_delta_t = np.sqrt(self._delta_t) self._speed_of_mean_reversion_grid = OrnsteinUhlenbeck._eval_grid(self.speed_of_mean_reversion, timegrid) self._volatility_grid = OrnsteinUhlenbeck._eval_grid(self.volatility, timegrid) self._mean_reversion_level_grid = OrnsteinUhlenbeck._eval_grid(self.mean_reversion_level, timegrid)
[docs] def rnd_shape(self, n_sims: int, n_timepoints: int)->tuple: return (n_timepoints-1, n_sims)
[docs] def simulate(self, timegrid, start_value, rnd): """ Simulate the Ornstein Uhlenbeck process on the given timegrid using simple explicit euler scheme: .. math:: X_{t+\\delta t} = X_t + \\theta (\\mu(t) - X_t )\\delta t +\\sigma(t) \\varepsilon \\sqrt{\delta t} where :math:`\\varepsilon` is a (0,1)-normal random variate. Args: timegrid (np.ndarray): One dimensional array containing the time points where the process will be simulated (containing 0.0 as the first timepoint). start_value (Union[float, np.ndarray]): Either a float or an array (for each path) with the start value of the simulation. rnd (np.ndarray): Array of random normal (variance equal to one) variates used within the discretization (:math:`\varepsilon` in the above description). Here, shape[0] equals the number of timestes and shape[1] teh number of simulations. Returns: np.ndarray: Array r containing the simulations where r[:,i] is the path of the i-th simulation (r.shape[0] equals number of timepoints, r.shape[1] the number of simulations). """ self._set_timegrid(timegrid) result = np.empty((self._timegrid.shape[0], rnd.shape[1])) result[0,:] = start_value for i in range(self._timegrid.shape[0]-1): result[i+1,:] = (result[i, :] * np.exp(-self._speed_of_mean_reversion_grid[i]*self._delta_t[i]) + self._mean_reversion_level_grid[i]* (1 - np.exp(-self._speed_of_mean_reversion_grid[i]*self._delta_t[i])) + self._volatility_grid[i]* np.sqrt((1 - np.exp(-2*self._speed_of_mean_reversion_grid[i]*self._delta_t[i])) / (2*self._speed_of_mean_reversion_grid[i])) * rnd[i,:] ) return result
[docs] def compute_expected_value(self, x0: Union[float, np.ndarray], T: float): if callable(self.speed_of_mean_reversion): raise NotImplementedError("Expected value is only implemented for constant speed of mean reversion") if callable(self.volatility): raise NotImplementedError("Expected value is only implemented for constant volatility") if callable(self.mean_reversion_level): raise NotImplementedError("Expected value is only implemented for constant mean reversion level") return x0*np.exp(-self.speed_of_mean_reversion*T) + self.mean_reversion_level*(1.0-np.exp(-self.speed_of_mean_reversion*T))
[docs] def compute_call_price(self, X0: Union[float, np.ndarray], K: float, ttm: float): """Computes the price of a call option with strike K and time to maturity ttm for a spot following the Ornstein-Uhlenbeck process. It works only for constant speed of mean reversion, volatility and mean reversion level and throws a NotImplementedError otherwise. It does not encounter dividends or interest rates. Args: X0 (Union[float, np.ndarray]): Start value of the process. K (float): Strike of the call option. ttm (float): Time to maturity of the call option. Raises: NotImplementedError: If speed of mean reversion, volatility or mean reversion level are not constant. Returns: float: Price of the call option. """ if callable(self.speed_of_mean_reversion): raise NotImplementedError("Expected value is only implemented for constant speed of mean reversion") if callable(self.volatility): raise NotImplementedError("Expected value is only implemented for constant volatility") if callable(self.mean_reversion_level): raise NotImplementedError("Expected value is only implemented for constant mean reversion level") if ttm < 1e-5: return np.maximum(X0 - K, 0.0) g = X0 * np.exp(-self.speed_of_mean_reversion * ttm) + self.mean_reversion_level * (1.0 - np.exp(-self.speed_of_mean_reversion * ttm)) sigma_bar = self.volatility * self.volatility * (1.0 / ( 2.0 * self.speed_of_mean_reversion)) sigma_bar = sigma_bar * (1.0 - np.exp(-2.0 * self.speed_of_mean_reversion * ttm)) sigma_bar = np.sqrt(sigma_bar) d = (g - K) / sigma_bar return (g - K) * scipy.stats.norm.cdf(d) + sigma_bar * scipy.stats.norm.pdf(d)
[docs] def apply_mc_step(self, x: np.ndarray, t0: float, t1: float, rnd: np.ndarray, inplace: bool = True, slv: np.ndarray= None): if not inplace: x_ = x.copy() else: x_ = x dt = t1-t0 sqrt_dt = np.sqrt(dt) try: mu = self.speed_of_mean_reversion(t0) except: mu = self.speed_of_mean_reversion try: sigma = self.volatility(t0) except: sigma = self.volatility x_ = (1.0 - mu*dt)*x + sigma*sqrt_dt*rnd return x_
[docs] def conditional_probability_density(self, X_delta_t, delta_t, X0, volatility=None, speed_of_mean_reversion=None, mean_reversion_level = None): if volatility is None: volatility = self.volatility if speed_of_mean_reversion is None: speed_of_mean_reversion = self.speed_of_mean_reversion if mean_reversion_level is None: mean_reversion_level = self.mean_reversion_level volatility_2_ = volatility**2*(1.0-np.exp(-2.0*speed_of_mean_reversion*delta_t))/(2.0*speed_of_mean_reversion) result = 1.0/(2.0*np.pi*volatility_2_)*np.exp( -(X_delta_t-X0*np.exp(-speed_of_mean_reversion*delta_t) -mean_reversion_level*(1.0-np.exp(-speed_of_mean_reversion*delta_t))) /(2.0*volatility_2_)) return result
[docs] def calibrate(self, data: np.ndarray, dt: float, method: str='maximum_likelihood'): """Calibrate the Ornstein Uhlenbeck model with constant parameters to the given data. Args: data (np.ndarray): Array of values the model is fitted to (uniform timegrid is assumed). dt (float): Time step size between two datapoints from data (uniform timegrid is assumed. method (str, optional): Determines if maximum likelihood ('maximum_likelihood') or minimum least square ('minimum_least_square') is used for calibration. Defaults to 'maximum_likelihood'. """ Sx = (data[:-1]).sum() Sy = (data[1:]).sum() Sxx = (data[:-1]**2).sum() Syy = (data[1:]**2).sum() Sxy = (data[:-1] * data[1:]).sum() n = data[:-1].shape[0] if method == 'maximum_likelihood': mu = (Sy*Sxx - Sx*Sxy) / (n*(Sxx-Sxy) - (Sx**2-Sx*Sy)) if ((Sxy-mu*(Sx+Sy-n*mu))/(Sxx-2*mu*Sx+n*mu**2)) <= 0: raise Exception('Calibration failed.') speed_mr = -1/dt * np.log((Sxy-mu*(Sx+Sy-n*mu))/(Sxx-2*mu*Sx+n*mu**2)) alpha = np.exp(-speed_mr*dt) sigma_2 = 1/n * (Syy-2*alpha*Sxy+alpha**2*Sxx-2*mu*(1-alpha)*(Sy-alpha*Sx)+n*mu**2*(1-alpha)**2) sigma = np.sqrt(sigma_2 * (2*speed_mr) / (1-alpha**2)) elif method == 'minimum_least_square': a = (n*Sxy - Sx*Sy) / (n*Sxx - Sx**2) b = (Sy - a*Sx) / n sd = np.sqrt((n*Syy-Sy**2 - a*(n*Sxy-Sx*Sy)) / (n*(n-2))) speed_mr = -np.log(a) / dt mu = b / (1-a) sigma = sd * np.sqrt((-2*np.log(a)) / (dt*(1-a**2))) else: raise ValueError('Fitting method not defined ('+method+')') self.speed_of_mean_reversion = speed_mr self.volatility = sigma self.mean_reversion_level = mu