Coverage for rivapy / credit / creditmetrics.py: 100%

121 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-27 14:36 +0000

1from __future__ import division 

2import pandas as pd 

3import numpy as np 

4from scipy.stats import norm 

5from typing import List, Optional as _Optional 

6from rivapy.instruments.components import Issuer 

7 

8 

class CreditMetricsModel:
    """CreditMetrics-style portfolio credit risk model.

    Simulates correlated rating migrations for a portfolio of issuer positions
    and derives loss distribution, Value-at-Risk and Expected Shortfall.
    """

    def __init__(
        self,
        n_simulation: int,
        transition_matrix: np.matrix,
        position_data: pd.DataFrame,
        issuer_data: "List[Issuer]",
        stock_data: pd.DataFrame,
        r: float,
        t: float,
        confidencelevel: float,
        seed: _Optional[int] = None,
        list_of_indices: _Optional[List[str]] = None,
        mapping_countries_on_indices: _Optional[dict] = None,
    ):
        """CreditMetrics model initializer.

        Args:
            n_simulation (int): Number of Monte-Carlo simulations.
            transition_matrix (np.matrix): Rating transition matrix (format np.matrix).
                S&P 8x8 matrix is integrated.
            position_data (pd.DataFrame): DataFrame with position data. Specific format
                is needed (must contain at least ``IssuerID``, ``RecoveryRate`` and
                ``Exposure`` columns — see ``merge_positions_issuer``/``get_issuer_groups``).
            issuer_data (List[Issuer]): List of Issuer objects containing issuer metadata.
            stock_data (pd.DataFrame): DataFrame with stock data. Must include closing
                prices of the different issuers as well as reference indices (e.g. DAX)
                and a ``Date`` column.
            r (float): Risk-free rate. Needed to compute expected value of positions and
                state valuations during the transition process.
            t (float): Time horizon for the credit risk calculation.
            confidencelevel (float): Confidence level used in VaR calculation (percentage).
            seed (int, optional): Seed for random number generator. Defaults to None.
            list_of_indices (List[str], optional): Names of the reference index columns
                in ``stock_data``. Defaults to ["DAX", "SP"].
            mapping_countries_on_indices (dict, optional): Mapping of issuer country code
                to reference index name. Defaults to {"DE": "DAX", "US": "SP"}.
        """

        self.n_simulation = n_simulation
        self.transition_matrix = transition_matrix
        self.position_data = position_data
        self.issuer_data = issuer_data
        self.stock_data = stock_data
        self.r = r
        self.t = t
        self.confidencelevel = confidencelevel
        self.seed = seed
        # Bug fix: the previous signature used mutable default arguments
        # (a list and a dict), which are created once at function definition
        # and shared across every instance. Build a fresh default per instance.
        self.list_of_indices = ["DAX", "SP"] if list_of_indices is None else list_of_indices
        self.mapping_countries_on_indices = (
            {"DE": "DAX", "US": "SP"} if mapping_countries_on_indices is None else mapping_countries_on_indices
        )

55 

56 def merge_positions_issuer(self): 

57 """ 

58 Merges position dataframe with issuer dataframe to obtain rating-data for each position. 

59 Maps all +/- Rating variants to the same RatingID. 

60 Returns: 

61 DataFrame: Returns adjusted position dataframe. 

62 """ 

63 # Map all rating variants (including +/-) to a single RatingID 

64 rating_map = pd.DataFrame( 

65 { 

66 "Rating": [ 

67 "AAA", 

68 "AA+", 

69 "AA", 

70 "AA-", 

71 "A+", 

72 "A", 

73 "A-", 

74 "BBB+", 

75 "BBB", 

76 "BBB-", 

77 "BB+", 

78 "BB", 

79 "BB-", 

80 "B+", 

81 "B", 

82 "B-", 

83 "CCC+", 

84 "CCC", 

85 "CCC-", 

86 "D", 

87 ], 

88 "RatingID": [0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7], 

89 } 

90 ) 

91 

92 # issuer_data is now a list of Issuer objects 

93 issuer_df = pd.DataFrame( 

94 [ 

95 { 

96 "IssuerID": issuer.obj_id, 

97 "IssuerName": issuer.name, 

98 "Rating": str(issuer.rating), 

99 "Land": issuer.country, 

100 } 

101 for issuer in self.issuer_data 

102 ] 

103 ) 

104 

105 # Apply mapping 

106 issuer_adj = issuer_df.merge(rating_map, on="Rating", how="left") 

107 positions_adj = self.position_data.merge(issuer_adj[["IssuerID", "IssuerName", "Rating", "Land", "RatingID"]], on="IssuerID", how="left") 

108 

109 return positions_adj 

110 

111 def get_correlation(self): 

112 """Calculates correlation pairs for issuer with a specific reference time series. 

113 

114 Returns: 

115 DataFrame: Dataframe with correlation coefficient for each issuer. 

116 """ 

117 

118 mergedData = self.stock_data.drop(["Date"], axis=1) 

119 returns = mergedData.pct_change() 

120 

121 # Split returns into indices and stocks 

122 returns_indices = returns[self.list_of_indices] 

123 returns_stocks = returns.drop(self.list_of_indices, axis=1) 

124 # Compute correlations with different indices 

125 indices_correlation = returns_indices.corr() 

126 

127 # Create an empty DataFrame for the results 

128 corr_pairs = pd.DataFrame() 

129 

130 # Loop over the index columns and compute correlations with stocks 

131 for col_name in self.list_of_indices: 

132 # Compute the correlation of the current index series with all stock series 

133 correlation = returns_stocks.corrwith(returns[col_name]) 

134 # Add the results as a new column to the result DataFrame 

135 corr_pairs[f"{col_name}"] = correlation 

136 return indices_correlation, corr_pairs 

137 

138 def get_cutoffs_rating(self): 

139 """Compute cutoffs for each initial rating based on the transition matrix. 

140 

141 The inverse cumulative distribution function of the standard normal is used to 

142 obtain thresholds corresponding to cumulative transition probabilities. 

143 

144 Returns: 

145 np.ndarray: Array with threshold values for each target rating per initial rating. 

146 """ 

147 Z = np.cumsum(np.flipud(self.transition_matrix.T), 0) 

148 Z[Z >= (1 - 1 / 1e12)] = 1 - 1 / 1e12 

149 Z[Z <= (0 + 1 / 1e12)] = 0 + 1 / 1e12 

150 

151 CutOffs = norm.ppf(Z, 0, 1) # compute cut offes by inverting normal distribution 

152 return CutOffs 

153 

154 def get_credit_spreads(self, LGD, idx): 

155 """Compute credit spreads implied by the transition matrix. 

156 

157 Formula used (per time horizon t): spread = -log(1 - LGD * PD_t) / t 

158 

159 Args: 

160 LGD (pd.Series): Instrument-specific loss-given-default values (0-1). 

161 idx (pd.Series or list): RatingID index array aligning instruments to rows of PD_t. 

162 

163 Returns: 

164 np.ndarray: Credit spread per instrument (shape: n_instruments x 1). 

165 """ 

166 # credit spread implied by transmat 

167 PD_t = self.transition_matrix[:, -1] 

168 PD_vec = PD_t[idx] 

169 LGD_np = LGD.to_numpy().reshape(-1, 1) 

170 credit_spread = -np.log(1 - np.multiply(LGD_np, PD_vec)) / self.t 

171 return credit_spread 

172 

173 def get_expected_value(self): 

174 """Calculate the expected present value of each instrument specified by issuer and recovery rate. 

175 

176 Uses the instrument exposure, the risk-free rate and the credit spread implied 

177 by the transition matrix to discount the expected payoff over the time horizon. 

178 

179 Returns: 

180 DataFrame: DataFrame including expected values (grouped by issuer). 

181 """ 

182 positions = self.get_issuer_groups() 

183 exposure = np.matrix(positions["Exposure"]).T 

184 idx = positions["RatingID"] 

185 LGD = 1 - positions["RecoveryRate"] 

186 credit_spread = self.get_credit_spreads(LGD, idx) 

187 EV = np.multiply(exposure, np.exp(-(self.r + credit_spread) * self.t)) 

188 EV = pd.DataFrame(EV, columns=["EV"]) # keep in same order as credit cutoff 

189 EV["issuer"] = positions["IssuerID"].to_list() 

190 EV = EV.groupby("issuer").sum() # group by issuer to sum up expected values 

191 return EV 

192 

    def get_states(self):
        """Compute matrix of present values for each issuer's positions under all possible future ratings.

        Each column corresponds to a target rating (including default). The values are the
        discounted present values of the issuer's positions assuming the given rating outcome,
        aggregated per issuer.

        Returns:
            DataFrame: DataFrame with present values by issuer and rating state
            (index: issuer id, columns ordered worst to best: D ... AAA).
        """
        positions = self.get_issuer_groups()
        # Loss given default per issuer group (1 - recovery rate), 1-d array.
        LGD = 1 - np.array(positions["RecoveryRate"])
        PD_t = self.transition_matrix[:, -1]  # default probability at t
        # np.matrix multiplication: outer product -> one spread per (rating row, position) pair.
        credit_spread = -np.log(1 - PD_t * LGD.T)
        exposure = np.matrix(positions["Exposure"])
        # Present value of each position under each target-rating spread; transpose to positions x ratings.
        state = np.multiply(exposure, np.exp(-(self.r + credit_spread) * self.t)).T
        state = np.append(state, np.multiply(exposure, np.matrix(positions["RecoveryRate"])).T, axis=1)  # last column is default case
        # NOTE(review): with an 8x8 transition matrix the spread block already has 8 columns,
        # so appending the recovery-based default column yields 9 columns against the 8 labels
        # below — confirm the expected transition-matrix shape against callers/tests.
        states = pd.DataFrame(np.fliplr(state), columns=["D", "C", "B", "BB", "BBB", "A", "AA", "AAA"])  # keep in same order as credit cutoff
        states["issuer"] = positions["IssuerID"].to_list()
        states = states.groupby("issuer").sum()
        return states

213 

214 def get_issuer_groups(self): 

215 df_positions_grouped = self.merge_positions_issuer() 

216 df_positions_grouped = df_positions_grouped[["IssuerID", "IssuerName", "RecoveryRate", "Rating", "RatingID", "Land", "Exposure"]] 

217 df_positions_grouped = df_positions_grouped.groupby( 

218 ["IssuerID", "IssuerName", "RecoveryRate", "Rating", "RatingID", "Land"], as_index=False 

219 ).sum() 

220 df_positions_grouped["Position_Index"] = [self.mapping_countries_on_indices.get(country) for country in df_positions_grouped["Land"]] 

221 

222 return df_positions_grouped 

223 

    def mc_calculation(self):
        """
        Monte-Carlo simulation of portfolio based on positions, issuer, correlation and transition matrix.

        For each simulation step, the return of each issuer is simulated:
        - The return of the benchmark (Y) is simulated and multiplied with the issuer-specific correlation.
          This random number is consistent for every issuer during one simulation step.
        - Afterwards, the idiosyncratic return of each issuer is simulated and multiplied with the idiosyncratic
          risk factor sqrt(1-p^2).
        - This results in the simulated return for every issuer in every simulation step:
          r_k = rho * Y + sqrt(1 - rho^2) * Z_k

        For each issuer, the new rating is determined and the loss is calculated as the difference between the
        new value and the expected value.

        Returns:
            tuple:
                Loss (pd.DataFrame): Shape (n_simulation, n_issuer) with losses for each scenario and issuer.
                rr_scenarios (pd.DataFrame): Simulated issuer returns per scenario, same shape as Loss.
                issuer_ids (np.ndarray): Array of issuer IDs, order matches Loss columns.
                issuer_names (list): List of issuer names, order matches Loss columns.
        """
        positions = self.get_issuer_groups()
        indices_correlation, corr_pairs = self.get_correlation()
        # Cholesky factor of the index correlation matrix, used to correlate the index shocks.
        indices_cholesky = np.linalg.cholesky(indices_correlation)
        cutOffs = self.get_cutoffs_rating()
        states = self.get_states()
        EV = self.get_expected_value()
        issuer_info = positions[["IssuerName", "IssuerID", "Rating", "RatingID", "Position_Index"]].drop_duplicates()
        issuer_ids = issuer_info["IssuerID"].to_numpy()
        issuer_names = issuer_info["IssuerName"].to_list()
        # Result containers: one column per issuer, one row per scenario.
        Loss = pd.DataFrame(np.zeros((self.n_simulation, len(issuer_ids))), columns=issuer_ids, index=range(self.n_simulation))
        rr_scenarios = pd.DataFrame(np.zeros((self.n_simulation, len(issuer_ids))), columns=issuer_ids, index=range(self.n_simulation))
        np.random.seed(self.seed)

        # random numbers for indices: systemic shocks, correlated via the Cholesky factor
        normal_random_indices = np.random.randn(len(self.list_of_indices), self.n_simulation)
        YY = np.matmul(indices_cholesky, normal_random_indices)
        # Independent idiosyncratic shocks, one row per issuer.
        YY_idio = np.random.randn(len(issuer_ids), self.n_simulation)

        # Calculate Losses for each issuer
        # Iterate
        i = 0
        for idx in issuer_ids:
            # correlation between issuer and its assigned index
            issuer_name = issuer_info.loc[issuer_info["IssuerID"] == idx, "IssuerName"].iloc[0]
            index_name = issuer_info.loc[issuer_info["IssuerID"] == idx, "Position_Index"].iloc[0]
            correlation = corr_pairs.loc[issuer_name, index_name]
            rating_id = issuer_info.loc[issuer_info["IssuerID"] == idx, "RatingID"].iloc[0]
            # Thresholds for the issuer's current rating (column of the cutoff matrix).
            cutoffs_vec = np.matrix(cutOffs[:, rating_id]).T
            # Systemic component: the issuer's index shock scaled by its correlation.
            rr = YY[self.list_of_indices.index(index_name), :] * correlation
            # Idiosyncratic component scaled by sqrt(1 - rho^2) so total variance stays 1.
            rr_idio = np.sqrt(1 - (correlation**2)) * YY_idio[i, :]
            rr_all = rr + rr_idio
            # Determine new rating by comparing the simulated score to cutoffs
            new_ratings = np.array(rr_all < cutoffs_vec)
            # Count how many cutoffs the score exceeds -> column index into the state table.
            new_ratings_idx = len(new_ratings) - np.sum(new_ratings, 0)
            col_idx = new_ratings_idx.astype(int)
            # Re-valued positions under the simulated rating, minus the expected value = loss.
            V_t = states.loc[idx].iloc[col_idx]
            Loss_t = V_t.to_numpy() - EV.loc[idx].iloc[0]
            Loss.loc[:, idx] = Loss_t
            rr_scenarios.loc[:, idx] = rr_all
            i += 1

        return Loss, rr_scenarios, issuer_ids, issuer_names

287 

288 def get_loss_distribution(self, mc_scenario_values: np.ndarray): 

289 """Computes loss distribution for portfolio after monte-carlo-simulation. 

290 

291 Returns: 

292 Array: Portfolio loss distribution. 

293 """ 

294 loss_distribution = np.sum(mc_scenario_values, 1) 

295 

296 return loss_distribution 

297 

298 def get_portfolio_VaR(self, loss_distribution: np.ndarray): 

299 """Computes Credit Value at Risk for specific portfolio and confidence level. 

300 

301 Returns: 

302 Float: Portfolio Value at Risk of specific confidence level. 

303 """ 

304 Port_Var = -1.0 * np.percentile(loss_distribution, self.confidencelevel) 

305 

306 return Port_Var 

307 

308 def get_portfolio_ES(self, loss_distribution: np.ndarray): 

309 """Computes expected shortfall for specific portfolio and confidence level. 

310 

311 Returns: 

312 Float: Expected shorfall of porfolio. 

313 """ 

314 

315 expectedShortfall = -1.0 * np.mean(loss_distribution[loss_distribution <= np.percentile(loss_distribution, self.confidencelevel)]) 

316 

317 return expectedShortfall