Coverage for rivapy / tools / interpolate.py: 81%
197 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-27 14:36 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-27 14:36 +0000
1# 2025.07.02 Hans Nguyen
2#
3#
5# -----------------------------------
6# #IMPORT Modules
7from typing import List as _List, Union as _Union, Callable
8import datetime as dt
9import abc
10import pandas as pd
11import numpy as np
12from bisect import bisect_left
14# from scipy.optimize import curve_fit as curve_fit
15# from scipy.interpolate import interp1d
16from bisect import bisect_left
17from rivapy.tools.enums import DayCounterType, InterpolationType, ExtrapolationType
18from rivapy.tools.datetools import DayCounter
21# TODO list of interpolaters to implement
22# -----------------------------------
23# FUNCTION def
25# class InterpolationType(_MyEnum):
26# CONSTANT = "CONSTANT"
27# LINEAR = "LINEAR"
28# LINEAR_LOG = "LINEARLOG"
29# CONSTRAINED_SPLINE = "CONSTRAINED_SPLINE"
30# HAGAN = "HAGAN"
31# HAGAN_DF = "HAGAN_DF"
34class Interpolator:
36 def __init__(self, interpolation_type: _Union[str, InterpolationType], extrapolation_type: _Union[str, ExtrapolationType]):
37 """Constructor for Interpolatr class object.
39 Args:
40 interpolation_type (_Union[str, InterpolationType]): The interpolation method to be used
41 extrapolation_type (_Union[str, ExtrapolationType]): the extrapolation method to be used
42 """
43 self._interpolation_type = InterpolationType.to_string(interpolation_type)
44 self._extrapolation_type = ExtrapolationType.to_string(
45 extrapolation_type
46 ) # TODO is this redundant as we feed extrapolation method into interp as argument?
47 self._interp = Interpolator.get(self._interpolation_type)
49 def interp(
50 self, x_list: list, y_list: list, target_x: _Union[float, _List[float]], extrapolation: _Union[str, ExtrapolationType]
51 ) -> _Union[float, _List[float]]:
52 """Wrapper method to execute desired interpolation method. If given a list of targets will return a list.
54 Args:
55 x_list (_List[float]): x-values
56 y_list (_List[float]): y-values
57 target_x (float,_List[float]): x-value for which a desired y-value is to be determined
59 Returns:
60 _Union[float, _List[float]]: return interpolation valuues
61 """
63 extrapolation_type = ExtrapolationType.to_string(extrapolation)
65 if isinstance(target_x, list):
66 return [self._interp(x_list, y_list, target_x_, extrapolation_type) for target_x_ in target_x]
67 else:
68 return self._interp(x_list, y_list, target_x, extrapolation_type)
70 @staticmethod
71 def get(interpolator: _Union[str, InterpolationType]) -> Callable[[list, list, _Union[float, _List[float]], str], float]:
72 """Mapping function to determine which interpolator to use
74 Args:
75 interpolator (_Union[str, InterpolationType]): _description_
77 Raises:
78 NotImplementedError: _description_
80 Returns:
81 Callable[[list, list, _Union[float, _List[float]], str], float]: _description_
82 """
83 interp = InterpolationType.to_string(interpolator)
84 # extrap = ExtrapolationType.to_string(extrapolator)
85 # the assumption at the moment is that for a given interpolation type, the extrapolation type must be the same or CONSTANT
86 # this is a design choice for the moment
88 mapping = {
89 InterpolationType.LINEAR.value: Interpolator.linear,
90 InterpolationType.LINEAR_LOG.value: Interpolator.linear_log,
91 InterpolationType.CONSTANT.value: Interpolator.constant,
92 InterpolationType.HAGAN.value: Interpolator.hagan,
93 InterpolationType.HAGAN_DF.value: Interpolator.hagan_df,
94 }
96 if interp in mapping:
97 return mapping[interp]
98 else:
99 raise NotImplementedError(f"{interp} not yet implemented.")
101 @staticmethod
102 def linear(x_list: list, y_list: list, x: float, extrapolation: str) -> float:
103 """Simple linear interpolation. TDDO : simply use scipy? No...match design structure
105 Args:
106 x_list (_type_): values that are assumed to be sorted
107 y_list (_type_): corresponding y values
108 x (_type_): target x-value
109 extrapolate (str): extrapolation method chosen for when x is outside x_list.
111 Returns:
112 float: interpolated value
113 """
114 # print("interpolation values")
115 # print(x_list) # DEBUG TEST TODO REMOVE
116 # print(y_list)
117 if not x_list or not y_list or len(x_list) != len(y_list):
118 raise ValueError("x_list and y_list must be non-empty and of the same length.")
120 if x <= x_list[0] or x_list[-1] <= x:
122 if extrapolation == "NONE":
123 raise ValueError("Extrapolation chosen as NONE but target 'x' lies outside of range")
124 elif extrapolation == "CONSTANT":
125 if x <= x_list[0]:
126 return y_list[0]
127 elif x_list[-1] <= x:
128 return y_list[-1]
129 elif extrapolation == "LINEAR":
130 if x <= x_list[0]:
131 x0, x1 = x_list[0], x_list[1]
132 y0, y1 = y_list[0], y_list[1]
133 elif x_list[-1] <= x:
134 x0, x1 = x_list[-2], x_list[-1]
135 y0, y1 = y_list[-2], y_list[-1]
137 else:
138 i = bisect_left(x_list, x) # insert target x next to 2 closest points
139 x0, x1 = x_list[i - 1], x_list[i]
140 y0, y1 = y_list[i - 1], y_list[i]
142 # Linear interpolation formula
143 slope = (y1 - y0) / (x1 - x0)
144 y = y0 + slope * (x - x0)
146 return y
148 @staticmethod
149 def constant(x_list: list, y_list: list, x: float, extrapolation: str) -> float:
150 """PLACEHOLDER #TODO implement"""
152 return -9999.999
154 @staticmethod
155 def linear_log(x_list: list, y_list: list, x: float, extrapolation: str) -> float:
156 # x_val = np.array(x_list)
157 y_val = np.array(y_list)
159 if np.any(y_val <= 0):
160 raise ValueError("All y-values must be positive for log-linear interpolation.")
162 log_y_val = np.log(y_val).tolist()
164 # handle the extrapolation properly TODO
165 if extrapolation == "LINEAR_LOG":
166 extr = "LINEAR"
167 else:
168 extr = extrapolation
169 log_y_interp = Interpolator.linear(x_list, log_y_val, x, extr)
170 y_interp = np.exp(log_y_interp)
171 return y_interp
175 # Helper: compute Hagan piecewise polynomials
176 @staticmethod
177 def _hagan_polynomials(x_list: _List[float], y_list: _List[float]):
178 """
179 Compute the piecewise quadratic coefficients for Hagan interpolation.
181 Args:
182 x_list: grid of cell boundaries (length n+1)
183 y_list: cell averages (length n)
185 Returns:
186 x_vals: left boundaries of polynomial segments
187 a0, a1, a2: quadratic coefficients for each segment
188 """
189 x_cells = np.array(x_list, dtype=float) # cell boundaries
190 u = np.array(y_list, dtype=float) # the cell averages, one per cell
191 n = len(u) # numberr of cells
193 if len(x_cells) != n + 1:
194 raise ValueError("x_list must have length len(y_list)+1") # i.e. for every 2 subsequent x points, there is one y point representing the cell average
196 dx = np.diff(x_cells) # the width of the cells used to compute the slope of scale polynomials
198 # Compute breakpoints (un) based on neighboring cell averages in a weighted way.
199 # breakppoints = approximated value at cell boundaries
200 un = np.zeros(n + 1)
201 for i in range(1, n):
202 un[i] = (dx[i]*u[i-1] + dx[i-1]*u[i]) / (dx[i] + dx[i-1])
203 un[0] = 2*u[0] - un[1] #first point extrapolated to close the system
204 un[n] = 2*u[n-1] - un[n-1] #last popint extrapolated to close the system
206 # Store polynomial segments
207 x_vals = [x_cells[0]] # Left boundaries of each polynomial segment. Segments will go from x_vals[i] -> x_vals[i+1].
208 a0, a1, a2 = [], [], [] # Coefficients of quadratic polynomials on segment i
209 EPS = 1e-15 # epsilon, tiny value to avoid numerical issues (e.g., division by zero).
211 for i in range(n): # iterate through each segment
212 # define shape of polynomial
213 # = how far does the polynomial need to go from the cell average to match the boundary values
214 g0 = un[i] - u[i] # difference between the left breakpoint of cell i and the cell average.
215 g1 = un[i+1] - u[i] # difference between the right breakpoint of cell i and the cell average.
216 dx_i = dx[i] # width of cell i
218 # NOTE: from what i can tell case i matches with case iii if g1 = -0.5*g0
219 # NOTE case i matches case ii if g1 = -2*g0
220 # the order of the cases defines the priority on the mathcing border cases
222 # CASE i: simple quadratic incl. g0=g1=0
223 if ((g0 >= 0 and -2*g0 <= g1 <= -0.5*g0) or
224 (g0 <= 0 and -0.5*g0 <= g1 <= -2*g0)):
226 a0.append(un[i]) # value at left boundary
227 a1.append((-4*un[i] - 2*un[i+1] + 6*u[i])/dx_i) # slope
228 a2.append((3*un[i] + 3*un[i+1] - 6*u[i])/dx_i**2) # curvature
229 x_vals.append(x_cells[i+1])
230 continue
232 # CASE ii: constant + quadratic
233 if ((g0 <= 0 and -2*g0 <= g1) or
234 (g0 >= 0 and g1 <= -2*g0)):
236 eta = dx_i * ((2*g0 + g1) / (g1 - g0)) # point inside the cell where slope changes
237 a0.append(un[i]); a1.append(0.0); a2.append(0.0) # first segment is constant, a1=a2=0,
238 x_vals.append(x_cells[i] + eta)
239 if abs(dx_i - eta) > EPS: # second segment is quadratic,provided greater than EPS
240 a0.append(un[i]); a1.append(0.0)
241 a2.append((un[i+1]-un[i]) / (dx_i - eta)**2)
242 x_vals.append(x_cells[i+1])
243 continue
245 # CASE iii: negative/positive slope adjustments
246 if ((g0 >= 0 and -0.5*g0 <= g1 <= 0) or
247 (g0 <= 0 and 0 <= g1 <= -0.5*g0)):
249 eta = dx_i * (3*g1)/(g1 - g0) # eta= location inside cell for quadratic segment
250 if abs(eta) > EPS:
251 q = (un[i]-un[i+1])/eta**2 #q = curvature needed to go from left boundary to the slope at eta
252 a0.append(un[i]); a1.append(-2*q*eta); a2.append(q)
253 x_vals.append(x_cells[i] + eta)
254 a0.append(un[i+1]); a1.append(0.0); a2.append(0.0)
255 x_vals.append(x_cells[i+1])
256 continue
258 # CASE iv: monotone convex
259 if (g0 > 0 and g1 > 0) or (g0 < 0 and g1 < 0):
261 r = g1 / (g1 + g0)
262 eta = dx_i * r
263 A = -g0 * r
264 if abs(eta) > EPS:
265 q = (g0 - A)/eta**2
266 a0.append(g0+u[i]); a1.append(-2*(g0-A)/eta); a2.append(q)
267 x_vals.append(x_cells[i] + eta)
268 if abs(dx_i - eta) > EPS:
269 a0.append(A+u[i]); a1.append(0.0)
270 a2.append((g1-A)/(dx_i-eta)**2)
271 x_vals.append(x_cells[i+1])
272 continue
274 return np.array(x_vals), np.array(a0), np.array(a1), np.array(a2)
277 # Hagan interpolation for forward rate
278 @staticmethod
279 def hagan(x_list: _List[float], y_list: _List[float], x: float, extrapolation: str) -> float:
280 """
281 Interpolate the forward rate f(x) using Hagan's method.
282 Returns the **exact piecewise quadratic value** at x.
284 Args:
285 x_list (List[float]): grid of cell boundaries (length n+1)
286 y_list (List[float]): cell averages (length n)
287 x (float): target point to interpolate
288 extrapolation (str): determine extrapolation method to use if x is outside x_list
290 Returns:
292 float: interpolated forward rate at x
293 """
294 x_vals, a0, a1, a2 = Interpolator._hagan_polynomials(x_list, y_list)
296 # Handle extrapolation
297 if x <= x_vals[0]:
298 if extrapolation.upper() in ("CONSTANT", "CONSTANT_DF"):
299 return a0[0]
300 else:
301 raise ValueError("x below grid and extrapolation not allowed")
302 if x >= x_vals[-1]:
303 if extrapolation.upper() in ("CONSTANT", "CONSTANT_DF"):
304 # last segment quadratic evaluation
305 s = x_vals[-1] - x_vals[-2]
306 return a0[-1] + (a1[-1] + a2[-1]*s)*s
307 else:
308 raise ValueError("x above grid and extrapolation not allowed")
310 # find correct segment
311 idx = np.searchsorted(x_vals, x) - 1
312 s = x - x_vals[idx] # distance from left boundary of the segment
313 return a0[idx] + (a1[idx] + a2[idx]*s)*s
316 # Hagan integration (cumulative integral of forward rate)
317 @staticmethod
318 def hagan_integrate(x_list: _List[float], y_list: _List[float], x: float) -> float:
319 """
320 Integrate the Hagan forward rate piecewise polynomial from 0 to x.
321 Returns the **exact integral**, used for computing DF(x) = exp(-integral)
322 """
323 x_vals, a0, a1, a2 = Interpolator._hagan_polynomials(x_list, y_list)
324 integral = 0.0
326 # Handle extrapolation implicitly: integrate only inside segments
327 for i in range(len(a0)): # integrate oveer all segments up to x
328 x_left = x_vals[i]
329 x_right = x_vals[i+1] if i+1 < len(x_vals) else x_vals[-1]
330 dx_seg = min(max(x - x_left, 0.0), x_right - x_left) # how much of this segment to integrate over. so if it is pst target x, dont integerate
331 if dx_seg > 0:
332 integral += (a0[i]*dx_seg + 0.5*a1[i]*dx_seg**2 + (1./3.)*a2[i]*dx_seg**3)
333 if x <= x_right:
334 break
336 return integral
339 # Hagan discount factor DF(x)
340 @staticmethod
341 def hagan_df(x_list: _List[float], y_list: _List[float], x: float, extrapolation: str) -> float:
342 """
343 Compute the discount factor DF(x) = exp(-INTEGRAL( f(s) ds) )
344 using Hagan's piecewise quadratic interpolation of the forward rate.
346 Args:
347 x_list: time grid (e.g. [0.5, 1.0, 2.0, ...])
348 y_list: known discount factors DF(t_i) at those grid points
349 x: target time where we want DF(x)
350 extrapolation: defines behavior outside grid ("CONSTANT_DF" or error)
352 Returns:
353 float: interpolated discount factor at time x
354 """
356 # Convert input lists to numpy arrays for numerical ops
357 x_cells = np.array(x_list, dtype=float)
358 df = np.array(y_list, dtype=float)
360 if len(df) < 2:
361 raise ValueError("At least 2 discount factors required")
363 # STEP 1: Compute "forward rates" between grid points
364 # f_i = (log DF_i - log DF_{i+1}) / (x_{i+1} - x_i)
365 # This gives the *instantaneous forward rate* assumed constant
366 # between each pair of discount factors.
367 #
368 # Recall: DF(x) = exp(-integral f(s) ds)
369 # So ln DF_i - ln DF_{i+1} = integral_{x_i}^{x_{i+1}} f(s) ds
370 #
371 # Approximating f(s) as constant over each small interval
372 # gives this finite-difference form.
373 fwd = (np.log(df[:-1]) - np.log(df[1:])) / (x_cells[1:] - x_cells[:-1])
376 # STEP 2: Handle extrapolation (x outside the grid)
377 # Case A: x < first grid point
378 if x < x_cells[0]:
379 if extrapolation.upper() == "CONSTANT_DF":
381 # Handle the special case where first grid point is 0
382 if x_cells[0] == 0:
383 # Use the forward rate of the first segment as the extrapolation rate
384 r = fwd[0]
385 else:
386 # Integrate forward rates up to the first point
387 # This gives the cumulative area integral f(s) ds = y
388 y = Interpolator.hagan_integrate(x_cells, fwd, x_cells[0])
389 # Compute *average rate* up to the first knot:
390 # r = (1/x_o) * integral(f(s) ds)
391 r = y / x_cells[0]
394 # Now assume that beyond this point, the discount factor
395 # decays at that "constant average rate":
396 # DF(x) = exp(-x * r)
397 return np.exp(-x * r)
398 else:
399 raise ValueError("x below grid and extrapolation not allowed")
401 # Case B: x > last grid point
402 elif x > x_cells[-1]:
403 if extrapolation.upper() == "CONSTANT_DF":
404 # Integrate up to the last available grid point
405 y = Interpolator.hagan_integrate(x_cells, fwd, x_cells[-1])
407 # Compute the average continuously-compounded rate up to that last point
408 r = y / x_cells[-1]
410 # Apply constant extrapolation beyond that:
411 return np.exp(-x * r)
412 else:
413 raise ValueError("x above grid and extrapolation not allowed")
416 # STEP 3: Inside the grid — exact Hagan interpolation
417 # Compute the "exact integral" of f(s) ds using the Hagan
418 # piecewise quadratic polynomial representation of f(s)
419 integral = Interpolator.hagan_integrate(x_cells, fwd, x)
421 # Finally compute DF(x) = exp(-∫₀ˣ f(s) ds)
422 return np.exp(-integral)
425 # Hagan discount factor derivative DF'(x)
426 @staticmethod
427 def hagan_df_derivative(x_list: _List[float], df_list: _List[float], x: float, extrapolation: str) -> float:
428 """
429 Compute derivative of discount factor: DF'(x) = -f(x) * DF(x)
430 Exact calculation, matching C++ InterpolationHagan1D_DF::computeDerivative
431 """
432 x_cells = np.array(x_list, dtype=float)
433 df = np.array(df_list, dtype=float)
434 if len(df) < 2:
435 raise ValueError("At least 2 discount factors required")
437 # Forward rates
438 fwd = (np.log(df[:-1]) - np.log(df[1:])) / (x_cells[1:] - x_cells[:-1])
440 # Extrapolation handling
441 if x < x_cells[0]:
442 if extrapolation.upper() == "CONSTANT_DF":
443 y = Interpolator.hagan_integrate(x_cells, fwd, x_cells[0])
444 r = y / x_cells[0]
445 return -r * np.exp(-x * r)
446 else:
447 raise ValueError("x below grid and extrapolation not allowed")
448 elif x > x_cells[-1]:
449 if extrapolation.upper() == "CONSTANT_DF":
450 y = Interpolator.hagan_integrate(x_cells, fwd, x_cells[-1])
451 r = y / x_cells[-1]
452 return -r * np.exp(-x * r)
453 else:
454 raise ValueError("x above grid and extrapolation not allowed")
456 # Inside grid: exact DF and f(x)
457 DF_val = Interpolator.hagan_df(x_list, df_list, x, extrapolation)
458 r_val = Interpolator.hagan(x_list, fwd, x, extrapolation)
459 return -r_val * DF_val
461# if __name__ == "__main__":
464# -----------------------------------
465# Unit tests
466# can be found in rivapy/tests/ folder
467# please use the python unittest framework.