Coverage for rivapy/models/lucia_schwartz.py: 86%

90 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-05 14:27 +0000

1from typing import Union, Callable, Tuple, List 

2import numpy as np 

3import scipy 

4import scipy.integrate 

5from rivapy.tools.interfaces import FactoryObject 

6from rivapy.models.ornstein_uhlenbeck import OrnsteinUhlenbeck 

7 

8class LuciaSchwartz(FactoryObject): 

9 

10 def _eval_grid(f, timegrid): 

11 if f is None: 

12 return np.zeros(timegrid.shape) 

13 try: 

14 return f(timegrid) 

15 except: 

16 result = np.full(timegrid.shape, f) 

17 return result 

18 

19 def __init__(self, 

20 rho: float, 

21 kappa: Union[float, Callable]=None, 

22 sigma1: Union[float, Callable]=None, 

23 mu: Union[float, Callable]=None, 

24 sigma2:Union[float, Callable]=None, 

25 

26 f: Callable[[Union[float, np.ndarray]],Union[float, np.ndarray]]=None): 

27 """Lucia Schwartz two factor model. 

28 

29 The model may be used to simulate spot/forward prices via 

30 

31 .. math::  

32 

33 S(t) = f(t) + X_1(t) + X_2(t) 

34 

35 dX_1(t) = -\\kappa X_1(t)+\sigma_1dW_1(t) 

36 

37 dX_2(t) = \\mu dt + \sigma_2 dW_2 

38  

39 where :math:`f(t)` is a deterministic function, :math:`\\kappa` the speed of mean reversion for  

40 the first process that may be interpreted as the long-term factor and :math:`\\sigma_1` the respective volatility. 

41 The second factor :math:`X_2` may be interpreted as a short-term factor that is influenced by :math:`W_2` 

42 and has drift :math:`\\mu`. :math:`X_1` and :math:`X_2` may be correlated with correlation :math:`\\rho`. Note that this class just simulates  

43 

44  

45 Args: 

46 kappa (Union[float, Callable]): The speed of mean reversion for the first factor :math:`X_1`. Can be either constant or time dependent. 

47 sigma1 (Union[float, Callable]): The volatility of the first factor :math:`X_1`. Can be either constant or time dependent. 

48 mu (Union[float, Callable]): The drift of teh second factor :math:`X_2`. Can be either constant or time dependent. 

49 sigma2 (Union[float, Callable]): The volatility of the second factor :math:`X_2`. Can be either constant or time dependent. 

50 rho (float): Correlation between X1 and X2. 

51 f (Union[float, Callable], optional): Deterministic function of time. Defaults to 0. 

52 """ 

53 self.X1 = OrnsteinUhlenbeck(kappa, sigma1, 0.0) 

54 self.mu = mu 

55 self.sigma2 = sigma2 

56 self.rho = rho 

57 self._timegrid = None 

58 self.f = f 

59 

60 def _to_dict(self) -> dict: 

61 return {'kappa': self.X1.speed_of_mean_reversion, 'sigma1': self.X1.volatility, 

62 'mu': self.mu, 'sigma2': self.sigma2, 'f': self.f} 

63 

64 def _set_timegrid(self, timegrid: np.ndarray): 

65 """ 

66 Sets the timegrid for simulation. 

67 

68 Args: 

69 timegrid (np.ndarray): Timegrid for simulation. 

70 """ 

71 self._mu_grid = LuciaSchwartz._eval_grid(self.mu, timegrid) 

72 self._sigma2_grid = LuciaSchwartz._eval_grid(self.sigma2, timegrid) 

73 self._f_grid = LuciaSchwartz._eval_grid(self.f, timegrid) 

74 

75 def rnd_shape(self, n_sims: int, n_timepoints: int)->tuple: 

76 return (n_timepoints-1, n_sims, 2) 

77 

78 

79 def compute_expected_value(self, x0: np.ndarray, T: Union[float, np.ndarray])->Union[float, np.ndarray]: 

80 """ 

81 Computes the expected value of the model. 

82 

83 Args: 

84 x0 (Union[float, np.ndarray]): Start value, either onedimensional (containing just the start value for X0 or X1) or twodimensional (different start values for X0 and X1 from e.g. a MC simulation). 

85 T (Union[float, np.ndarray]): Terminal time. 

86 

87 Returns: 

88 Union[float, np.ndarray]: Expected value. 

89 """ 

90 if callable(self.mu): 

91 raise NotImplementedError("Only implemented for fixed value of mu.") 

92 if callable(self.sigma2): 

93 raise NotImplementedError("Only implemented for fixed value of sigma2.") 

94 if len(x0.shape)==1: 

95 return self.X1.compute_expected_value(x0[0], T) + x0[1]+self.mu*T 

96 return self.X1.compute_expected_value(x0[:,0], T) + x0[:,1]+self.mu*T 

97 

98 def compute_fwd_value(self, x0: np.ndarray, T1:float, T2:float, 

99 qm: Callable[[Callable,float],float]=None, 

100 **qm_kwargs)->float: 

101 """Compute the forward value for a forward (swap) with continuos delivery between two time points. 

102  

103 In more detail, the forward value is computed as 

104 

105 .. math:: 

106 

107 F(t,t_1,t_2) = \\frac{E_t[\\int_{T_1}^{T_2} F(t,s) ds]}{T_2-T_1} 

108 

109 where :math:`F(t,s)` is the expected spot price at time :math:`T`. The expectation is 

110 taken under the risk neutral measure :math:`Q^M`. 

111 

112 Args: 

113 T1 (float): Start point of period. 

114 T2 (float): End point of period. If None, the expected value of the spot price :math:`F(t,T_1)` at T1 is returned. 

115 qm (Callable[[Callable,float,float]], optional): Quadrature method used for the integral. If None, scipy.integrate.romberg will be used. Defaults to None. 

116 **qm_kwargs: Keyword arguments for the quadrature method. 

117 

118 Returns: 

119 float: Forward value. 

120 """ 

121 if T2 is None: 

122 return self.compute_expected_value(x0,T1) 

123 if T2<=T1: 

124 raise ValueError("T2 must be larger than T1.") 

125 if qm is None: 

126 qm = scipy.integrate.quad 

127 result, err = qm(lambda t: self.compute_expected_value(x0,t), T1, T2, **qm_kwargs) 

128 return result/(T2-T1) 

129 

130 def simulate(self, timegrid, start_value, rnd, 

131 forwards:List[Tuple[float,float]]=None, 

132 forward_start_values:np.ndarray=None,): 

133 """ 

134 Simulates the model. 

135 

136 Args: 

137 timegrid (np.ndarray): Timegrid for simulation. 

138 start_value (Union[float, np.ndarray]): Start value for simulation. 

139 rnd (np.ndarray): Random numbers for simulation. 

140 forwards (List[Tuple[float,float]], optional): Forwards for simulation. Defaults to None. 

141 forward_start_values (np.ndarray, optional): Initial values for forwards.  

142 Defaults to None. If forwards is specified and this argument is None,  

143 the initial values will be computed with the method compute_fwd_value. 

144 """ 

145 n_assets = 1 

146 if forwards is not None: 

147 n_assets = len(forwards)+1 

148 self._set_timegrid(timegrid) 

149 rnd_ = np.copy(rnd) 

150 rnd_[:,:,1] = self.rho*rnd[:,:,0] + np.sqrt(1.0-self.rho**2)*rnd[:,:,1] 

151 X2 = np.empty((timegrid.shape[0],rnd.shape[1],)) 

152 if len(start_value.shape)==2: 

153 start_X1 = start_value[:,0] 

154 X2[0,:] = start_value[:,1] 

155 else: 

156 start_X1 = start_value[0] 

157 X2[0,:] = start_value[1] 

158 X1 = self.X1.simulate(timegrid, start_value=start_X1, rnd=rnd_[:,:,0]) 

159 tmp = self._mu_grid[:-1]*self.X1._delta_t 

160 tmp2 = self._sigma2_grid[:-1]*self.X1._sqrt_delta_t 

161 X2[1:,:] = tmp[:,np.newaxis] + tmp2[:,np.newaxis]*rnd_[:,:,1] 

162 X2 = X2.cumsum(axis=0) 

163 #for i in range(timegrid.shape[0]-1): 

164 # X2[i+1,:] = X2[i,:] + self._mu_grid[i]*self.X1._delta_t[i] + self._sigma2_grid[i]*rnd[i,:,1]*self.X1._sqrt_delta_t[i] 

165 if forwards is not None: 

166 if forward_start_values is None: 

167 forward_start_values = np.empty((len(forwards),)) 

168 for i in range(len(forwards)): 

169 forward_start_values[i] = self.compute_fwd_value(start_value, forwards[i][0], forwards[i][1]) 

170 

171 result = np.full((timegrid.shape[0], rnd.shape[1], n_assets), np.nan) 

172 result[:,:,0] = X1 + X2 + self._f_grid[:,np.newaxis] 

173 dW1 = self.X1._volatility_grid[:1,np.newaxis]*rnd_[:,:,0]*self.X1._sqrt_delta_t[:,np.newaxis] 

174 dW2 = self._sigma2_grid[:1,np.newaxis]*rnd_[:,:,0]*self.X1._sqrt_delta_t[:,np.newaxis] 

175 for fwd in range(len(forwards)): 

176 tt_T2 = forwards[fwd][1]-timegrid 

177 tt_T1 = forwards[fwd][0]-timegrid 

178 kappa = self.X1._speed_of_mean_reversion_grid 

179 tmp = -(np.exp(-kappa*tt_T2) - np.exp(-kappa*tt_T1))/(kappa*(forwards[fwd][1]-forwards[fwd][0])) 

180 result[0,:,fwd+1] = forward_start_values[fwd] 

181 result[1:,:,fwd+1] = tmp[:1,np.newaxis]*dW1 + dW2 

182 result[:,:,fwd+1] = result[:,:,fwd+1].cumsum(axis=0) 

183 else: 

184 return X1 + X2 + self._f_grid[:,np.newaxis] 

185 return result 

186 

187if __name__=='__main__': 

188 ls_model = LuciaSchwartz(rho=-.81, kappa = 0.077, sigma1 = 0.1, mu=-0.29, sigma2=0.1) 

189 n_sims = 10_000 

190 timegrid = np.linspace(0.0,1.0,365) 

191 rnd = np.random.normal(size=ls_model.rnd_shape(n_sims, timegrid.shape[0])) 

192 paths = ls_model.simulate(timegrid, start_value=np.array([0.0,0.0]), rnd=rnd, forwards=[(1.0,2.0)])