Coverage for rivapy/models/lucia_schwartz.py: 86%
90 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-05 14:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-05 14:27 +0000
1from typing import Union, Callable, Tuple, List
2import numpy as np
3import scipy
4import scipy.integrate
5from rivapy.tools.interfaces import FactoryObject
6from rivapy.models.ornstein_uhlenbeck import OrnsteinUhlenbeck
8class LuciaSchwartz(FactoryObject):
10 def _eval_grid(f, timegrid):
11 if f is None:
12 return np.zeros(timegrid.shape)
13 try:
14 return f(timegrid)
15 except:
16 result = np.full(timegrid.shape, f)
17 return result
19 def __init__(self,
20 rho: float,
21 kappa: Union[float, Callable]=None,
22 sigma1: Union[float, Callable]=None,
23 mu: Union[float, Callable]=None,
24 sigma2:Union[float, Callable]=None,
26 f: Callable[[Union[float, np.ndarray]],Union[float, np.ndarray]]=None):
27 """Lucia Schwartz two factor model.
29 The model may be used to simulate spot/forward prices via
31 .. math::
33 S(t) = f(t) + X_1(t) + X_2(t)
35 dX_1(t) = -\\kappa X_1(t)+\sigma_1dW_1(t)
37 dX_2(t) = \\mu dt + \sigma_2 dW_2
39 where :math:`f(t)` is a deterministic function, :math:`\\kappa` the speed of mean reversion for
40 the first process that may be interpreted as the long-term factor and :math:`\\sigma_1` the respective volatility.
41 The second factor :math:`X_2` may be interpreted as a short-term factor that is influenced by :math:`W_2`
42 and has drift :math:`\\mu`. :math:`X_1` and :math:`X_2` may be correlated with correlation :math:`\\rho`. Note that this class just simulates
45 Args:
46 kappa (Union[float, Callable]): The speed of mean reversion for the first factor :math:`X_1`. Can be either constant or time dependent.
47 sigma1 (Union[float, Callable]): The volatility of the first factor :math:`X_1`. Can be either constant or time dependent.
48 mu (Union[float, Callable]): The drift of teh second factor :math:`X_2`. Can be either constant or time dependent.
49 sigma2 (Union[float, Callable]): The volatility of the second factor :math:`X_2`. Can be either constant or time dependent.
50 rho (float): Correlation between X1 and X2.
51 f (Union[float, Callable], optional): Deterministic function of time. Defaults to 0.
52 """
53 self.X1 = OrnsteinUhlenbeck(kappa, sigma1, 0.0)
54 self.mu = mu
55 self.sigma2 = sigma2
56 self.rho = rho
57 self._timegrid = None
58 self.f = f
60 def _to_dict(self) -> dict:
61 return {'kappa': self.X1.speed_of_mean_reversion, 'sigma1': self.X1.volatility,
62 'mu': self.mu, 'sigma2': self.sigma2, 'f': self.f}
64 def _set_timegrid(self, timegrid: np.ndarray):
65 """
66 Sets the timegrid for simulation.
68 Args:
69 timegrid (np.ndarray): Timegrid for simulation.
70 """
71 self._mu_grid = LuciaSchwartz._eval_grid(self.mu, timegrid)
72 self._sigma2_grid = LuciaSchwartz._eval_grid(self.sigma2, timegrid)
73 self._f_grid = LuciaSchwartz._eval_grid(self.f, timegrid)
75 def rnd_shape(self, n_sims: int, n_timepoints: int)->tuple:
76 return (n_timepoints-1, n_sims, 2)
79 def compute_expected_value(self, x0: np.ndarray, T: Union[float, np.ndarray])->Union[float, np.ndarray]:
80 """
81 Computes the expected value of the model.
83 Args:
84 x0 (Union[float, np.ndarray]): Start value, either onedimensional (containing just the start value for X0 or X1) or twodimensional (different start values for X0 and X1 from e.g. a MC simulation).
85 T (Union[float, np.ndarray]): Terminal time.
87 Returns:
88 Union[float, np.ndarray]: Expected value.
89 """
90 if callable(self.mu):
91 raise NotImplementedError("Only implemented for fixed value of mu.")
92 if callable(self.sigma2):
93 raise NotImplementedError("Only implemented for fixed value of sigma2.")
94 if len(x0.shape)==1:
95 return self.X1.compute_expected_value(x0[0], T) + x0[1]+self.mu*T
96 return self.X1.compute_expected_value(x0[:,0], T) + x0[:,1]+self.mu*T
98 def compute_fwd_value(self, x0: np.ndarray, T1:float, T2:float,
99 qm: Callable[[Callable,float],float]=None,
100 **qm_kwargs)->float:
101 """Compute the forward value for a forward (swap) with continuos delivery between two time points.
103 In more detail, the forward value is computed as
105 .. math::
107 F(t,t_1,t_2) = \\frac{E_t[\\int_{T_1}^{T_2} F(t,s) ds]}{T_2-T_1}
109 where :math:`F(t,s)` is the expected spot price at time :math:`T`. The expectation is
110 taken under the risk neutral measure :math:`Q^M`.
112 Args:
113 T1 (float): Start point of period.
114 T2 (float): End point of period. If None, the expected value of the spot price :math:`F(t,T_1)` at T1 is returned.
115 qm (Callable[[Callable,float,float]], optional): Quadrature method used for the integral. If None, scipy.integrate.romberg will be used. Defaults to None.
116 **qm_kwargs: Keyword arguments for the quadrature method.
118 Returns:
119 float: Forward value.
120 """
121 if T2 is None:
122 return self.compute_expected_value(x0,T1)
123 if T2<=T1:
124 raise ValueError("T2 must be larger than T1.")
125 if qm is None:
126 qm = scipy.integrate.quad
127 result, err = qm(lambda t: self.compute_expected_value(x0,t), T1, T2, **qm_kwargs)
128 return result/(T2-T1)
130 def simulate(self, timegrid, start_value, rnd,
131 forwards:List[Tuple[float,float]]=None,
132 forward_start_values:np.ndarray=None,):
133 """
134 Simulates the model.
136 Args:
137 timegrid (np.ndarray): Timegrid for simulation.
138 start_value (Union[float, np.ndarray]): Start value for simulation.
139 rnd (np.ndarray): Random numbers for simulation.
140 forwards (List[Tuple[float,float]], optional): Forwards for simulation. Defaults to None.
141 forward_start_values (np.ndarray, optional): Initial values for forwards.
142 Defaults to None. If forwards is specified and this argument is None,
143 the initial values will be computed with the method compute_fwd_value.
144 """
145 n_assets = 1
146 if forwards is not None:
147 n_assets = len(forwards)+1
148 self._set_timegrid(timegrid)
149 rnd_ = np.copy(rnd)
150 rnd_[:,:,1] = self.rho*rnd[:,:,0] + np.sqrt(1.0-self.rho**2)*rnd[:,:,1]
151 X2 = np.empty((timegrid.shape[0],rnd.shape[1],))
152 if len(start_value.shape)==2:
153 start_X1 = start_value[:,0]
154 X2[0,:] = start_value[:,1]
155 else:
156 start_X1 = start_value[0]
157 X2[0,:] = start_value[1]
158 X1 = self.X1.simulate(timegrid, start_value=start_X1, rnd=rnd_[:,:,0])
159 tmp = self._mu_grid[:-1]*self.X1._delta_t
160 tmp2 = self._sigma2_grid[:-1]*self.X1._sqrt_delta_t
161 X2[1:,:] = tmp[:,np.newaxis] + tmp2[:,np.newaxis]*rnd_[:,:,1]
162 X2 = X2.cumsum(axis=0)
163 #for i in range(timegrid.shape[0]-1):
164 # X2[i+1,:] = X2[i,:] + self._mu_grid[i]*self.X1._delta_t[i] + self._sigma2_grid[i]*rnd[i,:,1]*self.X1._sqrt_delta_t[i]
165 if forwards is not None:
166 if forward_start_values is None:
167 forward_start_values = np.empty((len(forwards),))
168 for i in range(len(forwards)):
169 forward_start_values[i] = self.compute_fwd_value(start_value, forwards[i][0], forwards[i][1])
171 result = np.full((timegrid.shape[0], rnd.shape[1], n_assets), np.nan)
172 result[:,:,0] = X1 + X2 + self._f_grid[:,np.newaxis]
173 dW1 = self.X1._volatility_grid[:1,np.newaxis]*rnd_[:,:,0]*self.X1._sqrt_delta_t[:,np.newaxis]
174 dW2 = self._sigma2_grid[:1,np.newaxis]*rnd_[:,:,0]*self.X1._sqrt_delta_t[:,np.newaxis]
175 for fwd in range(len(forwards)):
176 tt_T2 = forwards[fwd][1]-timegrid
177 tt_T1 = forwards[fwd][0]-timegrid
178 kappa = self.X1._speed_of_mean_reversion_grid
179 tmp = -(np.exp(-kappa*tt_T2) - np.exp(-kappa*tt_T1))/(kappa*(forwards[fwd][1]-forwards[fwd][0]))
180 result[0,:,fwd+1] = forward_start_values[fwd]
181 result[1:,:,fwd+1] = tmp[:1,np.newaxis]*dW1 + dW2
182 result[:,:,fwd+1] = result[:,:,fwd+1].cumsum(axis=0)
183 else:
184 return X1 + X2 + self._f_grid[:,np.newaxis]
185 return result
187if __name__=='__main__':
188 ls_model = LuciaSchwartz(rho=-.81, kappa = 0.077, sigma1 = 0.1, mu=-0.29, sigma2=0.1)
189 n_sims = 10_000
190 timegrid = np.linspace(0.0,1.0,365)
191 rnd = np.random.normal(size=ls_model.rnd_shape(n_sims, timegrid.shape[0]))
192 paths = ls_model.simulate(timegrid, start_value=np.array([0.0,0.0]), rnd=rnd, forwards=[(1.0,2.0)])