Coverage for rivapy / credit / creditmetrics.py: 100%
121 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-27 14:36 +0000
1from __future__ import division
2import pandas as pd
3import numpy as np
4from scipy.stats import norm
5from typing import List, Optional as _Optional
6from rivapy.instruments.components import Issuer
class CreditMetricsModel:
    def __init__(
        self,
        n_simulation: int,
        transition_matrix: np.matrix,
        position_data: pd.DataFrame,
        issuer_data: List["Issuer"],
        stock_data: pd.DataFrame,
        r: float,
        t: float,
        confidencelevel: float,
        seed: _Optional[int] = None,
        list_of_indices: _Optional[List[str]] = None,
        mapping_countries_on_indices: _Optional[dict] = None,
    ):
        """CreditMetrics model initializer.

        Args:
            n_simulation (int): Number of simulations.
            transition_matrix (np.matrix): Transition matrix (format np.matrix).
                S&P 8x8 matrix is integrated.
            position_data (pd.DataFrame): DataFrame with position data. Specific format is needed.
            issuer_data (List[Issuer]): List of Issuer objects containing issuer metadata.
            stock_data (pd.DataFrame): DataFrame with stock data. Must include closing
                prices of the different issuers as well as reference indices (e.g. DAX).
            r (float): Risk-free rate. Needed to compute expected value of positions and
                state valuations during the transition process.
            t (float): Time horizon for the credit risk calculation.
            confidencelevel (float): Confidence level used in VaR calculation (percentage).
            seed (int, optional): Seed for random number generator. Defaults to None.
            list_of_indices (List[str], optional): Names of the reference index columns in
                ``stock_data``. Defaults to ["DAX", "SP"].
            mapping_countries_on_indices (dict, optional): Maps issuer country codes to
                reference index names. Defaults to {"DE": "DAX", "US": "SP"}.
        """
        self.n_simulation = n_simulation
        self.transition_matrix = transition_matrix
        self.position_data = position_data
        self.issuer_data = issuer_data
        self.stock_data = stock_data
        self.r = r
        self.t = t
        self.confidencelevel = confidencelevel
        self.seed = seed
        # Mutable containers must not be used as default argument values: a single
        # list/dict object would be shared (and mutable) across all instances.
        # Use a None sentinel and build a fresh default per instance instead.
        self.list_of_indices = ["DAX", "SP"] if list_of_indices is None else list_of_indices
        self.mapping_countries_on_indices = (
            {"DE": "DAX", "US": "SP"} if mapping_countries_on_indices is None else mapping_countries_on_indices
        )
56 def merge_positions_issuer(self):
57 """
58 Merges position dataframe with issuer dataframe to obtain rating-data for each position.
59 Maps all +/- Rating variants to the same RatingID.
60 Returns:
61 DataFrame: Returns adjusted position dataframe.
62 """
63 # Map all rating variants (including +/-) to a single RatingID
64 rating_map = pd.DataFrame(
65 {
66 "Rating": [
67 "AAA",
68 "AA+",
69 "AA",
70 "AA-",
71 "A+",
72 "A",
73 "A-",
74 "BBB+",
75 "BBB",
76 "BBB-",
77 "BB+",
78 "BB",
79 "BB-",
80 "B+",
81 "B",
82 "B-",
83 "CCC+",
84 "CCC",
85 "CCC-",
86 "D",
87 ],
88 "RatingID": [0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7],
89 }
90 )
92 # issuer_data is now a list of Issuer objects
93 issuer_df = pd.DataFrame(
94 [
95 {
96 "IssuerID": issuer.obj_id,
97 "IssuerName": issuer.name,
98 "Rating": str(issuer.rating),
99 "Land": issuer.country,
100 }
101 for issuer in self.issuer_data
102 ]
103 )
105 # Apply mapping
106 issuer_adj = issuer_df.merge(rating_map, on="Rating", how="left")
107 positions_adj = self.position_data.merge(issuer_adj[["IssuerID", "IssuerName", "Rating", "Land", "RatingID"]], on="IssuerID", how="left")
109 return positions_adj
111 def get_correlation(self):
112 """Calculates correlation pairs for issuer with a specific reference time series.
114 Returns:
115 DataFrame: Dataframe with correlation coefficient for each issuer.
116 """
118 mergedData = self.stock_data.drop(["Date"], axis=1)
119 returns = mergedData.pct_change()
121 # Split returns into indices and stocks
122 returns_indices = returns[self.list_of_indices]
123 returns_stocks = returns.drop(self.list_of_indices, axis=1)
124 # Compute correlations with different indices
125 indices_correlation = returns_indices.corr()
127 # Create an empty DataFrame for the results
128 corr_pairs = pd.DataFrame()
130 # Loop over the index columns and compute correlations with stocks
131 for col_name in self.list_of_indices:
132 # Compute the correlation of the current index series with all stock series
133 correlation = returns_stocks.corrwith(returns[col_name])
134 # Add the results as a new column to the result DataFrame
135 corr_pairs[f"{col_name}"] = correlation
136 return indices_correlation, corr_pairs
138 def get_cutoffs_rating(self):
139 """Compute cutoffs for each initial rating based on the transition matrix.
141 The inverse cumulative distribution function of the standard normal is used to
142 obtain thresholds corresponding to cumulative transition probabilities.
144 Returns:
145 np.ndarray: Array with threshold values for each target rating per initial rating.
146 """
147 Z = np.cumsum(np.flipud(self.transition_matrix.T), 0)
148 Z[Z >= (1 - 1 / 1e12)] = 1 - 1 / 1e12
149 Z[Z <= (0 + 1 / 1e12)] = 0 + 1 / 1e12
151 CutOffs = norm.ppf(Z, 0, 1) # compute cut offes by inverting normal distribution
152 return CutOffs
154 def get_credit_spreads(self, LGD, idx):
155 """Compute credit spreads implied by the transition matrix.
157 Formula used (per time horizon t): spread = -log(1 - LGD * PD_t) / t
159 Args:
160 LGD (pd.Series): Instrument-specific loss-given-default values (0-1).
161 idx (pd.Series or list): RatingID index array aligning instruments to rows of PD_t.
163 Returns:
164 np.ndarray: Credit spread per instrument (shape: n_instruments x 1).
165 """
166 # credit spread implied by transmat
167 PD_t = self.transition_matrix[:, -1]
168 PD_vec = PD_t[idx]
169 LGD_np = LGD.to_numpy().reshape(-1, 1)
170 credit_spread = -np.log(1 - np.multiply(LGD_np, PD_vec)) / self.t
171 return credit_spread
173 def get_expected_value(self):
174 """Calculate the expected present value of each instrument specified by issuer and recovery rate.
176 Uses the instrument exposure, the risk-free rate and the credit spread implied
177 by the transition matrix to discount the expected payoff over the time horizon.
179 Returns:
180 DataFrame: DataFrame including expected values (grouped by issuer).
181 """
182 positions = self.get_issuer_groups()
183 exposure = np.matrix(positions["Exposure"]).T
184 idx = positions["RatingID"]
185 LGD = 1 - positions["RecoveryRate"]
186 credit_spread = self.get_credit_spreads(LGD, idx)
187 EV = np.multiply(exposure, np.exp(-(self.r + credit_spread) * self.t))
188 EV = pd.DataFrame(EV, columns=["EV"]) # keep in same order as credit cutoff
189 EV["issuer"] = positions["IssuerID"].to_list()
190 EV = EV.groupby("issuer").sum() # group by issuer to sum up expected values
191 return EV
    def get_states(self):
        """Compute matrix of present values for each issuer's positions under all possible future ratings.

        Each column corresponds to a target rating (including default). The values are the
        discounted present values of the issuer's positions assuming the given rating outcome.

        Returns:
            DataFrame: DataFrame with present values by issuer and rating state.
        """
        positions = self.get_issuer_groups()
        LGD = 1 - np.array(positions["RecoveryRate"])
        PD_t = self.transition_matrix[:, -1]  # default probability at t
        # NOTE(review): relies on np.matrix semantics — `PD_t * LGD.T` is a matrix
        # product of the (ratings x 1) PD column with the position LGD row, giving a
        # (ratings x n_positions) spread matrix. Assumes transition_matrix is np.matrix;
        # a plain ndarray would broadcast elementwise instead — confirm.
        credit_spread = -np.log(1 - PD_t * LGD.T)
        exposure = np.matrix(positions["Exposure"])
        # Discounted value of every position under each non-default rating outcome.
        state = np.multiply(exposure, np.exp(-(self.r + credit_spread) * self.t)).T
        state = np.append(state, np.multiply(exposure, np.matrix(positions["RecoveryRate"])).T, axis=1)  # last column is default case
        # Flip left/right so the default column comes first; the labels assume an
        # 8-state rating scale — TODO(review): confirm the number of state columns
        # produced above matches this label list for the supplied transition matrix.
        states = pd.DataFrame(np.fliplr(state), columns=["D", "C", "B", "BB", "BBB", "A", "AA", "AAA"])  # keep in same order as credit cutoff
        states["issuer"] = positions["IssuerID"].to_list()
        states = states.groupby("issuer").sum()
        return states
214 def get_issuer_groups(self):
215 df_positions_grouped = self.merge_positions_issuer()
216 df_positions_grouped = df_positions_grouped[["IssuerID", "IssuerName", "RecoveryRate", "Rating", "RatingID", "Land", "Exposure"]]
217 df_positions_grouped = df_positions_grouped.groupby(
218 ["IssuerID", "IssuerName", "RecoveryRate", "Rating", "RatingID", "Land"], as_index=False
219 ).sum()
220 df_positions_grouped["Position_Index"] = [self.mapping_countries_on_indices.get(country) for country in df_positions_grouped["Land"]]
222 return df_positions_grouped
    def mc_calculation(self):
        """
        Monte-Carlo simulation of portfolio based on positions, issuer, correlation and transition matrix.

        For each simulation step, the return of each issuer is simulated:
        - The return of the benchmark (Y) is simulated and multiplied with the issuer-specific correlation.
          This random number is consistent for every issuer during one simulation step.
        - Afterwards, the idiosyncratic return of each issuer is simulated and multiplied with the idiosyncratic
          risk factor sqrt(1-p^2).
        - This results in the simulated return for every issuer in every simulation step:
          r_k = rho * Y + sqrt(1 - rho^2) * Z_k

        For each issuer, the new rating is determined and the loss is calculated as the difference between the
        new value and the expected value.

        Returns:
            tuple:
                Loss (pd.DataFrame): Frame of shape (n_simulation, n_issuer) with losses per scenario and issuer.
                rr_scenarios (pd.DataFrame): Simulated issuer returns per scenario, same shape/column order as Loss.
                issuer_ids (np.ndarray): Array of issuer IDs, order matches Loss columns.
                issuer_names (list): List of issuer names, order matches Loss columns.
        """
        positions = self.get_issuer_groups()
        indices_correlation, corr_pairs = self.get_correlation()
        # Cholesky factor of the index correlation matrix; used to turn i.i.d. normals
        # into correlated benchmark returns.
        indices_cholesky = np.linalg.cholesky(indices_correlation)
        cutOffs = self.get_cutoffs_rating()
        states = self.get_states()
        EV = self.get_expected_value()
        issuer_info = positions[["IssuerName", "IssuerID", "Rating", "RatingID", "Position_Index"]].drop_duplicates()
        issuer_ids = issuer_info["IssuerID"].to_numpy()
        issuer_names = issuer_info["IssuerName"].to_list()
        # One row per scenario, one column per issuer.
        Loss = pd.DataFrame(np.zeros((self.n_simulation, len(issuer_ids))), columns=issuer_ids, index=range(self.n_simulation))
        rr_scenarios = pd.DataFrame(np.zeros((self.n_simulation, len(issuer_ids))), columns=issuer_ids, index=range(self.n_simulation))
        np.random.seed(self.seed)

        # random numbers for indices
        normal_random_indices = np.random.randn(len(self.list_of_indices), self.n_simulation)
        # Correlated benchmark returns: one row per index, one column per scenario.
        YY = np.matmul(indices_cholesky, normal_random_indices)
        # Idiosyncratic draws: one row per issuer.
        YY_idio = np.random.randn(len(issuer_ids), self.n_simulation)

        # Calculate Losses for each issuer
        # Iterate
        i = 0
        for idx in issuer_ids:
            # correlation between issuer and its assigned index
            issuer_name = issuer_info.loc[issuer_info["IssuerID"] == idx, "IssuerName"].iloc[0]
            index_name = issuer_info.loc[issuer_info["IssuerID"] == idx, "Position_Index"].iloc[0]
            correlation = corr_pairs.loc[issuer_name, index_name]
            rating_id = issuer_info.loc[issuer_info["IssuerID"] == idx, "RatingID"].iloc[0]
            # Cutoff column for this issuer's initial rating, reshaped to a column vector
            # so the comparison below broadcasts over all scenarios.
            cutoffs_vec = np.matrix(cutOffs[:, rating_id]).T
            # Systematic part: the issuer's benchmark return scaled by its correlation.
            rr = YY[self.list_of_indices.index(index_name), :] * correlation
            # Idiosyncratic part, weighted so the total variance of rr_all stays 1.
            rr_idio = np.sqrt(1 - (correlation**2)) * YY_idio[i, :]
            rr_all = rr + rr_idio
            # Determine new rating by comparing the simulated score to cutoffs
            new_ratings = np.array(rr_all < cutoffs_vec)
            # Number of cutoffs the score is NOT below -> positional index of the new
            # rating state for each scenario.
            new_ratings_idx = len(new_ratings) - np.sum(new_ratings, 0)
            col_idx = new_ratings_idx.astype(int)
            # Present value of the issuer's positions under each scenario's new rating.
            V_t = states.loc[idx].iloc[col_idx]
            # Loss = simulated value minus the issuer's expected value.
            Loss_t = V_t.to_numpy() - EV.loc[idx].iloc[0]
            Loss.loc[:, idx] = Loss_t
            rr_scenarios.loc[:, idx] = rr_all
            i += 1

        return Loss, rr_scenarios, issuer_ids, issuer_names
288 def get_loss_distribution(self, mc_scenario_values: np.ndarray):
289 """Computes loss distribution for portfolio after monte-carlo-simulation.
291 Returns:
292 Array: Portfolio loss distribution.
293 """
294 loss_distribution = np.sum(mc_scenario_values, 1)
296 return loss_distribution
298 def get_portfolio_VaR(self, loss_distribution: np.ndarray):
299 """Computes Credit Value at Risk for specific portfolio and confidence level.
301 Returns:
302 Float: Portfolio Value at Risk of specific confidence level.
303 """
304 Port_Var = -1.0 * np.percentile(loss_distribution, self.confidencelevel)
306 return Port_Var
308 def get_portfolio_ES(self, loss_distribution: np.ndarray):
309 """Computes expected shortfall for specific portfolio and confidence level.
311 Returns:
312 Float: Expected shorfall of porfolio.
313 """
315 expectedShortfall = -1.0 * np.mean(loss_distribution[loss_distribution <= np.percentile(loss_distribution, self.confidencelevel)])
317 return expectedShortfall