Coverage for rivapy / tools / interpolate.py: 81%

197 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-27 14:36 +0000

1# 2025.07.02 Hans Nguyen 

2# 

3# 

4 

5# ----------------------------------- 

6# #IMPORT Modules 

7from typing import List as _List, Union as _Union, Callable 

8import datetime as dt 

9import abc 

10import pandas as pd 

11import numpy as np 

12from bisect import bisect_left 

13 

14# from scipy.optimize import curve_fit as curve_fit 

15# from scipy.interpolate import interp1d 

16from bisect import bisect_left 

17from rivapy.tools.enums import DayCounterType, InterpolationType, ExtrapolationType 

18from rivapy.tools.datetools import DayCounter 

19 

20 

21# TODO list of interpolaters to implement 

22# ----------------------------------- 

23# FUNCTION def 

24 

25# class InterpolationType(_MyEnum): 

26# CONSTANT = "CONSTANT" 

27# LINEAR = "LINEAR" 

28# LINEAR_LOG = "LINEARLOG" 

29# CONSTRAINED_SPLINE = "CONSTRAINED_SPLINE" 

30# HAGAN = "HAGAN" 

31# HAGAN_DF = "HAGAN_DF" 

32 

33 

34class Interpolator: 

35 

36 def __init__(self, interpolation_type: _Union[str, InterpolationType], extrapolation_type: _Union[str, ExtrapolationType]): 

37 """Constructor for Interpolatr class object. 

38 

39 Args: 

40 interpolation_type (_Union[str, InterpolationType]): The interpolation method to be used 

41 extrapolation_type (_Union[str, ExtrapolationType]): the extrapolation method to be used 

42 """ 

43 self._interpolation_type = InterpolationType.to_string(interpolation_type) 

44 self._extrapolation_type = ExtrapolationType.to_string( 

45 extrapolation_type 

46 ) # TODO is this redundant as we feed extrapolation method into interp as argument? 

47 self._interp = Interpolator.get(self._interpolation_type) 

48 

49 def interp( 

50 self, x_list: list, y_list: list, target_x: _Union[float, _List[float]], extrapolation: _Union[str, ExtrapolationType] 

51 ) -> _Union[float, _List[float]]: 

52 """Wrapper method to execute desired interpolation method. If given a list of targets will return a list. 

53 

54 Args: 

55 x_list (_List[float]): x-values 

56 y_list (_List[float]): y-values 

57 target_x (float,_List[float]): x-value for which a desired y-value is to be determined 

58 

59 Returns: 

60 _Union[float, _List[float]]: return interpolation valuues 

61 """ 

62 

63 extrapolation_type = ExtrapolationType.to_string(extrapolation) 

64 

65 if isinstance(target_x, list): 

66 return [self._interp(x_list, y_list, target_x_, extrapolation_type) for target_x_ in target_x] 

67 else: 

68 return self._interp(x_list, y_list, target_x, extrapolation_type) 

69 

70 @staticmethod 

71 def get(interpolator: _Union[str, InterpolationType]) -> Callable[[list, list, _Union[float, _List[float]], str], float]: 

72 """Mapping function to determine which interpolator to use 

73 

74 Args: 

75 interpolator (_Union[str, InterpolationType]): _description_ 

76 

77 Raises: 

78 NotImplementedError: _description_ 

79 

80 Returns: 

81 Callable[[list, list, _Union[float, _List[float]], str], float]: _description_ 

82 """ 

83 interp = InterpolationType.to_string(interpolator) 

84 # extrap = ExtrapolationType.to_string(extrapolator) 

85 # the assumption at the moment is that for a given interpolation type, the extrapolation type must be the same or CONSTANT 

86 # this is a design choice for the moment 

87 

88 mapping = { 

89 InterpolationType.LINEAR.value: Interpolator.linear, 

90 InterpolationType.LINEAR_LOG.value: Interpolator.linear_log, 

91 InterpolationType.CONSTANT.value: Interpolator.constant, 

92 InterpolationType.HAGAN.value: Interpolator.hagan, 

93 InterpolationType.HAGAN_DF.value: Interpolator.hagan_df, 

94 } 

95 

96 if interp in mapping: 

97 return mapping[interp] 

98 else: 

99 raise NotImplementedError(f"{interp} not yet implemented.") 

100 

101 @staticmethod 

102 def linear(x_list: list, y_list: list, x: float, extrapolation: str) -> float: 

103 """Simple linear interpolation. TDDO : simply use scipy? No...match design structure 

104 

105 Args: 

106 x_list (_type_): values that are assumed to be sorted 

107 y_list (_type_): corresponding y values 

108 x (_type_): target x-value 

109 extrapolate (str): extrapolation method chosen for when x is outside x_list. 

110 

111 Returns: 

112 float: interpolated value 

113 """ 

114 # print("interpolation values") 

115 # print(x_list) # DEBUG TEST TODO REMOVE 

116 # print(y_list) 

117 if not x_list or not y_list or len(x_list) != len(y_list): 

118 raise ValueError("x_list and y_list must be non-empty and of the same length.") 

119 

120 if x <= x_list[0] or x_list[-1] <= x: 

121 

122 if extrapolation == "NONE": 

123 raise ValueError("Extrapolation chosen as NONE but target 'x' lies outside of range") 

124 elif extrapolation == "CONSTANT": 

125 if x <= x_list[0]: 

126 return y_list[0] 

127 elif x_list[-1] <= x: 

128 return y_list[-1] 

129 elif extrapolation == "LINEAR": 

130 if x <= x_list[0]: 

131 x0, x1 = x_list[0], x_list[1] 

132 y0, y1 = y_list[0], y_list[1] 

133 elif x_list[-1] <= x: 

134 x0, x1 = x_list[-2], x_list[-1] 

135 y0, y1 = y_list[-2], y_list[-1] 

136 

137 else: 

138 i = bisect_left(x_list, x) # insert target x next to 2 closest points 

139 x0, x1 = x_list[i - 1], x_list[i] 

140 y0, y1 = y_list[i - 1], y_list[i] 

141 

142 # Linear interpolation formula 

143 slope = (y1 - y0) / (x1 - x0) 

144 y = y0 + slope * (x - x0) 

145 

146 return y 

147 

148 @staticmethod 

149 def constant(x_list: list, y_list: list, x: float, extrapolation: str) -> float: 

150 """PLACEHOLDER #TODO implement""" 

151 

152 return -9999.999 

153 

154 @staticmethod 

155 def linear_log(x_list: list, y_list: list, x: float, extrapolation: str) -> float: 

156 # x_val = np.array(x_list) 

157 y_val = np.array(y_list) 

158 

159 if np.any(y_val <= 0): 

160 raise ValueError("All y-values must be positive for log-linear interpolation.") 

161 

162 log_y_val = np.log(y_val).tolist() 

163 

164 # handle the extrapolation properly TODO 

165 if extrapolation == "LINEAR_LOG": 

166 extr = "LINEAR" 

167 else: 

168 extr = extrapolation 

169 log_y_interp = Interpolator.linear(x_list, log_y_val, x, extr) 

170 y_interp = np.exp(log_y_interp) 

171 return y_interp 

172 

173 

174 

175 # Helper: compute Hagan piecewise polynomials 

176 @staticmethod 

177 def _hagan_polynomials(x_list: _List[float], y_list: _List[float]): 

178 """ 

179 Compute the piecewise quadratic coefficients for Hagan interpolation. 

180 

181 Args: 

182 x_list: grid of cell boundaries (length n+1) 

183 y_list: cell averages (length n) 

184  

185 Returns: 

186 x_vals: left boundaries of polynomial segments 

187 a0, a1, a2: quadratic coefficients for each segment 

188 """ 

189 x_cells = np.array(x_list, dtype=float) # cell boundaries 

190 u = np.array(y_list, dtype=float) # the cell averages, one per cell 

191 n = len(u) # numberr of cells 

192 

193 if len(x_cells) != n + 1: 

194 raise ValueError("x_list must have length len(y_list)+1") # i.e. for every 2 subsequent x points, there is one y point representing the cell average 

195 

196 dx = np.diff(x_cells) # the width of the cells used to compute the slope of scale polynomials 

197 

198 # Compute breakpoints (un) based on neighboring cell averages in a weighted way.  

199 # breakppoints = approximated value at cell boundaries 

200 un = np.zeros(n + 1) 

201 for i in range(1, n): 

202 un[i] = (dx[i]*u[i-1] + dx[i-1]*u[i]) / (dx[i] + dx[i-1]) 

203 un[0] = 2*u[0] - un[1] #first point extrapolated to close the system 

204 un[n] = 2*u[n-1] - un[n-1] #last popint extrapolated to close the system 

205 

206 # Store polynomial segments 

207 x_vals = [x_cells[0]] # Left boundaries of each polynomial segment. Segments will go from x_vals[i] -> x_vals[i+1]. 

208 a0, a1, a2 = [], [], [] # Coefficients of quadratic polynomials on segment i 

209 EPS = 1e-15 # epsilon, tiny value to avoid numerical issues (e.g., division by zero). 

210 

211 for i in range(n): # iterate through each segment 

212 # define shape of polynomial 

213 # = how far does the polynomial need to go from the cell average to match the boundary values 

214 g0 = un[i] - u[i] # difference between the left breakpoint of cell i and the cell average. 

215 g1 = un[i+1] - u[i] # difference between the right breakpoint of cell i and the cell average. 

216 dx_i = dx[i] # width of cell i 

217 

218 # NOTE: from what i can tell case i matches with case iii if g1 = -0.5*g0 

219 # NOTE case i matches case ii if g1 = -2*g0 

220 # the order of the cases defines the priority on the mathcing border cases 

221 

222 # CASE i: simple quadratic incl. g0=g1=0 

223 if ((g0 >= 0 and -2*g0 <= g1 <= -0.5*g0) or 

224 (g0 <= 0 and -0.5*g0 <= g1 <= -2*g0)): 

225 

226 a0.append(un[i]) # value at left boundary 

227 a1.append((-4*un[i] - 2*un[i+1] + 6*u[i])/dx_i) # slope 

228 a2.append((3*un[i] + 3*un[i+1] - 6*u[i])/dx_i**2) # curvature 

229 x_vals.append(x_cells[i+1]) 

230 continue 

231 

232 # CASE ii: constant + quadratic 

233 if ((g0 <= 0 and -2*g0 <= g1) or 

234 (g0 >= 0 and g1 <= -2*g0)): 

235 

236 eta = dx_i * ((2*g0 + g1) / (g1 - g0)) # point inside the cell where slope changes 

237 a0.append(un[i]); a1.append(0.0); a2.append(0.0) # first segment is constant, a1=a2=0,  

238 x_vals.append(x_cells[i] + eta) 

239 if abs(dx_i - eta) > EPS: # second segment is quadratic,provided greater than EPS 

240 a0.append(un[i]); a1.append(0.0) 

241 a2.append((un[i+1]-un[i]) / (dx_i - eta)**2) 

242 x_vals.append(x_cells[i+1]) 

243 continue 

244 

245 # CASE iii: negative/positive slope adjustments 

246 if ((g0 >= 0 and -0.5*g0 <= g1 <= 0) or 

247 (g0 <= 0 and 0 <= g1 <= -0.5*g0)): 

248 

249 eta = dx_i * (3*g1)/(g1 - g0) # eta= location inside cell for quadratic segment 

250 if abs(eta) > EPS: 

251 q = (un[i]-un[i+1])/eta**2 #q = curvature needed to go from left boundary to the slope at eta 

252 a0.append(un[i]); a1.append(-2*q*eta); a2.append(q) 

253 x_vals.append(x_cells[i] + eta) 

254 a0.append(un[i+1]); a1.append(0.0); a2.append(0.0) 

255 x_vals.append(x_cells[i+1]) 

256 continue 

257 

258 # CASE iv: monotone convex 

259 if (g0 > 0 and g1 > 0) or (g0 < 0 and g1 < 0): 

260 

261 r = g1 / (g1 + g0) 

262 eta = dx_i * r 

263 A = -g0 * r 

264 if abs(eta) > EPS: 

265 q = (g0 - A)/eta**2 

266 a0.append(g0+u[i]); a1.append(-2*(g0-A)/eta); a2.append(q) 

267 x_vals.append(x_cells[i] + eta) 

268 if abs(dx_i - eta) > EPS: 

269 a0.append(A+u[i]); a1.append(0.0) 

270 a2.append((g1-A)/(dx_i-eta)**2) 

271 x_vals.append(x_cells[i+1]) 

272 continue 

273 

274 return np.array(x_vals), np.array(a0), np.array(a1), np.array(a2) 

275 

276 

277 # Hagan interpolation for forward rate 

278 @staticmethod 

279 def hagan(x_list: _List[float], y_list: _List[float], x: float, extrapolation: str) -> float: 

280 """ 

281 Interpolate the forward rate f(x) using Hagan's method. 

282 Returns the **exact piecewise quadratic value** at x. 

283 

284 Args: 

285 x_list (List[float]): grid of cell boundaries (length n+1) 

286 y_list (List[float]): cell averages (length n) 

287 x (float): target point to interpolate 

288 extrapolation (str): determine extrapolation method to use if x is outside x_list 

289 

290 Returns: 

291 

292 float: interpolated forward rate at x 

293 """ 

294 x_vals, a0, a1, a2 = Interpolator._hagan_polynomials(x_list, y_list) 

295 

296 # Handle extrapolation 

297 if x <= x_vals[0]: 

298 if extrapolation.upper() in ("CONSTANT", "CONSTANT_DF"): 

299 return a0[0] 

300 else: 

301 raise ValueError("x below grid and extrapolation not allowed") 

302 if x >= x_vals[-1]: 

303 if extrapolation.upper() in ("CONSTANT", "CONSTANT_DF"): 

304 # last segment quadratic evaluation 

305 s = x_vals[-1] - x_vals[-2] 

306 return a0[-1] + (a1[-1] + a2[-1]*s)*s 

307 else: 

308 raise ValueError("x above grid and extrapolation not allowed") 

309 

310 # find correct segment 

311 idx = np.searchsorted(x_vals, x) - 1 

312 s = x - x_vals[idx] # distance from left boundary of the segment 

313 return a0[idx] + (a1[idx] + a2[idx]*s)*s 

314 

315 

316 # Hagan integration (cumulative integral of forward rate) 

317 @staticmethod 

318 def hagan_integrate(x_list: _List[float], y_list: _List[float], x: float) -> float: 

319 """ 

320 Integrate the Hagan forward rate piecewise polynomial from 0 to x. 

321 Returns the **exact integral**, used for computing DF(x) = exp(-integral) 

322 """ 

323 x_vals, a0, a1, a2 = Interpolator._hagan_polynomials(x_list, y_list) 

324 integral = 0.0 

325 

326 # Handle extrapolation implicitly: integrate only inside segments 

327 for i in range(len(a0)): # integrate oveer all segments up to x 

328 x_left = x_vals[i] 

329 x_right = x_vals[i+1] if i+1 < len(x_vals) else x_vals[-1] 

330 dx_seg = min(max(x - x_left, 0.0), x_right - x_left) # how much of this segment to integrate over. so if it is pst target x, dont integerate  

331 if dx_seg > 0: 

332 integral += (a0[i]*dx_seg + 0.5*a1[i]*dx_seg**2 + (1./3.)*a2[i]*dx_seg**3) 

333 if x <= x_right: 

334 break 

335 

336 return integral 

337 

338 

339 # Hagan discount factor DF(x) 

340 @staticmethod 

341 def hagan_df(x_list: _List[float], y_list: _List[float], x: float, extrapolation: str) -> float: 

342 """ 

343 Compute the discount factor DF(x) = exp(-INTEGRAL( f(s) ds) ) 

344 using Hagan's piecewise quadratic interpolation of the forward rate. 

345 

346 Args: 

347 x_list: time grid (e.g. [0.5, 1.0, 2.0, ...]) 

348 y_list: known discount factors DF(t_i) at those grid points 

349 x: target time where we want DF(x) 

350 extrapolation: defines behavior outside grid ("CONSTANT_DF" or error) 

351 

352 Returns: 

353 float: interpolated discount factor at time x 

354 """ 

355 

356 # Convert input lists to numpy arrays for numerical ops 

357 x_cells = np.array(x_list, dtype=float) 

358 df = np.array(y_list, dtype=float) 

359 

360 if len(df) < 2: 

361 raise ValueError("At least 2 discount factors required") 

362 

363 # STEP 1: Compute "forward rates" between grid points 

364 # f_i = (log DF_i - log DF_{i+1}) / (x_{i+1} - x_i) 

365 # This gives the *instantaneous forward rate* assumed constant 

366 # between each pair of discount factors. 

367 # 

368 # Recall: DF(x) = exp(-integral f(s) ds) 

369 # So ln DF_i - ln DF_{i+1} = integral_{x_i}^{x_{i+1}} f(s) ds 

370 # 

371 # Approximating f(s) as constant over each small interval 

372 # gives this finite-difference form. 

373 fwd = (np.log(df[:-1]) - np.log(df[1:])) / (x_cells[1:] - x_cells[:-1]) 

374 

375 

376 # STEP 2: Handle extrapolation (x outside the grid) 

377 # Case A: x < first grid point 

378 if x < x_cells[0]: 

379 if extrapolation.upper() == "CONSTANT_DF": 

380 

381 # Handle the special case where first grid point is 0 

382 if x_cells[0] == 0: 

383 # Use the forward rate of the first segment as the extrapolation rate 

384 r = fwd[0] 

385 else: 

386 # Integrate forward rates up to the first point 

387 # This gives the cumulative area integral f(s) ds = y 

388 y = Interpolator.hagan_integrate(x_cells, fwd, x_cells[0]) 

389 # Compute *average rate* up to the first knot: 

390 # r = (1/x_o) * integral(f(s) ds) 

391 r = y / x_cells[0] 

392 

393 

394 # Now assume that beyond this point, the discount factor 

395 # decays at that "constant average rate": 

396 # DF(x) = exp(-x * r) 

397 return np.exp(-x * r) 

398 else: 

399 raise ValueError("x below grid and extrapolation not allowed") 

400 

401 # Case B: x > last grid point 

402 elif x > x_cells[-1]: 

403 if extrapolation.upper() == "CONSTANT_DF": 

404 # Integrate up to the last available grid point 

405 y = Interpolator.hagan_integrate(x_cells, fwd, x_cells[-1]) 

406 

407 # Compute the average continuously-compounded rate up to that last point 

408 r = y / x_cells[-1] 

409 

410 # Apply constant extrapolation beyond that: 

411 return np.exp(-x * r) 

412 else: 

413 raise ValueError("x above grid and extrapolation not allowed") 

414 

415 

416 # STEP 3: Inside the grid — exact Hagan interpolation 

417 # Compute the "exact integral" of f(s) ds using the Hagan 

418 # piecewise quadratic polynomial representation of f(s) 

419 integral = Interpolator.hagan_integrate(x_cells, fwd, x) 

420 

421 # Finally compute DF(x) = exp(-∫₀ˣ f(s) ds) 

422 return np.exp(-integral) 

423 

424 

425 # Hagan discount factor derivative DF'(x) 

426 @staticmethod 

427 def hagan_df_derivative(x_list: _List[float], df_list: _List[float], x: float, extrapolation: str) -> float: 

428 """ 

429 Compute derivative of discount factor: DF'(x) = -f(x) * DF(x) 

430 Exact calculation, matching C++ InterpolationHagan1D_DF::computeDerivative 

431 """ 

432 x_cells = np.array(x_list, dtype=float) 

433 df = np.array(df_list, dtype=float) 

434 if len(df) < 2: 

435 raise ValueError("At least 2 discount factors required") 

436 

437 # Forward rates 

438 fwd = (np.log(df[:-1]) - np.log(df[1:])) / (x_cells[1:] - x_cells[:-1]) 

439 

440 # Extrapolation handling 

441 if x < x_cells[0]: 

442 if extrapolation.upper() == "CONSTANT_DF": 

443 y = Interpolator.hagan_integrate(x_cells, fwd, x_cells[0]) 

444 r = y / x_cells[0] 

445 return -r * np.exp(-x * r) 

446 else: 

447 raise ValueError("x below grid and extrapolation not allowed") 

448 elif x > x_cells[-1]: 

449 if extrapolation.upper() == "CONSTANT_DF": 

450 y = Interpolator.hagan_integrate(x_cells, fwd, x_cells[-1]) 

451 r = y / x_cells[-1] 

452 return -r * np.exp(-x * r) 

453 else: 

454 raise ValueError("x above grid and extrapolation not allowed") 

455 

456 # Inside grid: exact DF and f(x) 

457 DF_val = Interpolator.hagan_df(x_list, df_list, x, extrapolation) 

458 r_val = Interpolator.hagan(x_list, fwd, x, extrapolation) 

459 return -r_val * DF_val 

460 

461# if __name__ == "__main__": 

462 

463 

464# ----------------------------------- 

465# Unit tests 

466# can be found in rivapy/tests/ folder 

467# please use the python unittest framework.