Coverage for rivapy/models/residual_demand_model.py: 29%

202 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-05 14:27 +0000

1import numpy as np 

2import pandas as pd 

3import datetime as dt 

4import matplotlib.pyplot as plt 

5import warnings 

6from typing import Set, Callable 

7from scipy.special import comb 

8from rivapy.tools.datetime_grid import DateTimeGrid, InterpolatedFunction, PeriodicFunction 

9from rivapy.models.ornstein_uhlenbeck import OrnsteinUhlenbeck 

10from rivapy.tools.interfaces import DateTimeFunction, FactoryObject 

11from rivapy.models.base_model import BaseModel 

12 

13def _logit(x): 

14 return np.log(x/(1-x)) 

15 

16def _inv_logit(x): 

17 return 1.0/(1+np.exp(-x)) 

18 

19class CosinusSeasonality: 

20 def __init__(self, x: np.ndarray = np.array([0,1,0,1,0])): 

21 self.x = x 

22 

23 def __call__(self, x): 

24 return self.x[0]*np.cos(2*np.pi*x+self.x[1]) + self.x[2]*np.cos(4*np.pi*x+self.x[3]) + self.x[4] 

25 

26 

27class SolarProfile: 

28 def __init__(self, profile:Callable): 

29 self._profile = profile 

30 

31 def get_profile(self, timegrid: DateTimeGrid): 

32 result = np.empty(timegrid.timegrid.shape[0]) 

33 for i in range(result.shape[0]): 

34 result[i] = self._profile(timegrid.dates[i]) 

35 return result 

36 

37class MonthlySolarProfile(SolarProfile): 

38 def __init__(self, monthly_profiles: np.ndarray): 

39 self._monthly_profiles = monthly_profiles 

40 super().__init__(self.__monthly_solar_profile) 

41 if monthly_profiles is not None: 

42 if monthly_profiles.shape != (12,24): 

43 raise ValueError('Monthly profiles must have shape (12,24)') 

44 

45 def __monthly_solar_profile(self, d): 

46 return self._monthly_profiles[d.month-1, d.hour] 

47 

48 

49class SolarPowerModel(BaseModel): 

50 def _eval_grid(f, timegrid): 

51 return f(timegrid) 

52 try: 

53 return f(timegrid) 

54 except: 

55 result = np.full(timegrid.shape, f) 

56 return result 

57 

58 def __init__(self, 

59 daily_maximum_process, 

60 profile, 

61 mean_level, 

62 name:str = 'Solar_Germany'): 

63 self.name = name 

64 self._daily_maximum_process = daily_maximum_process 

65 self._profile = profile 

66 self.mean_level = mean_level 

67 

68 def rnd_shape(self, n_sims: int, n_timepoints: int)->tuple: 

69 return self._daily_maximum_process.rnd_shape(n_sims, n_timepoints) 

70 

71 def simulate(self, timegrid: DateTimeGrid, start_value: float, rnd): 

72 # daily timegrid for daily maximum simulation 

73 tg_ = timegrid.get_daily_subgrid() 

74 if hasattr(self.mean_level, 'compute'): 

75 ml = self.mean_level.compute(timegrid)[:, np.newaxis] 

76 else: 

77 ml = self.mean_level(timegrid)[:, np.newaxis] 

78 start_value_ = _logit(start_value) - ml[0,0] 

79 daily_maximum = self._daily_maximum_process.simulate(tg_.timegrid, start_value_, rnd) 

80 profile = self._profile.get_profile(timegrid) 

81 result = np.empty((timegrid.shape[0], rnd.shape[1])) 

82 day = 0 

83 d = tg_.dates[0].date() 

84 for i in range(timegrid.timegrid.shape[0]): 

85 if d != timegrid.dates[i].date(): 

86 day += 1 

87 d = timegrid.dates[i].date() 

88 result[i,:] = _inv_logit(daily_maximum[day,:] + ml[i,0])* profile[i] 

89 return result 

90 

91 def udls(self)->Set[str]: 

92 """Return the name of all underlyings modeled 

93 

94 Returns: 

95 Set[str]: Set of the modeled underlyings. 

96 """ 

97 return set([self.name]) 

98 

99 def _to_dict(self) -> dict: 

100 raise NotImplementedError() 

101 

102 

103class WindPowerModel(BaseModel): 

104 

105 def _eval_grid(f, timegrid): 

106 try: 

107 return f(timegrid) 

108 except: 

109 result = np.full(timegrid.shape, f) 

110 return result 

111 

112 def __init__(self, deviation_process: object, 

113 seasonal_function: object, 

114 name:str = 'Wind_Germany'): 

115 """Wind Power Model to model the efficiency of wind power production. 

116 

117 Args: 

118 speed_of_mean_reversion (Union[float, Callable]): _description_ 

119 volatility (Union[float, Callable]): _description_ 

120 mean_level (Union[float, Callable]): _description_ 

121 """ 

122 self.deviation_process = deviation_process 

123 self.seasonal_function = seasonal_function 

124 # self.speed_of_mean_reversion = speed_of_mean_reversion 

125 # self.volatility = volatility 

126 # self.mean_level = mean_level 

127 self.name = name 

128 self._timegrid = None 

129 

130 def udls(self)->Set[str]: 

131 """Return the name of all underlyings modeled 

132 

133 Returns: 

134 Set[str]: Set of the modeled underlyings. 

135 """ 

136 return set([self.name]) 

137 

138 def rnd_shape(self, n_sims: int, n_timepoints: int)->tuple: 

139 return self.deviation_process.rnd_shape(n_sims, n_timepoints) 

140 

141 def simulate(self, timegrid: DateTimeGrid, start_value, rnd): 

142 if hasattr(self.seasonal_function, 'compute'): 

143 mean = self.seasonal_function.compute(timegrid)[:, np.newaxis] 

144 else: 

145 mean = self.seasonal_function(timegrid)[:, np.newaxis] 

146 start_value_ = _logit(start_value) - mean[0,0] 

147 deviation = self.deviation_process.simulate(timegrid.timegrid, start_value_, rnd) 

148 return _inv_logit(mean + deviation) 

149 

150 @staticmethod 

151 def calibrate(deviation_model, capacities:pd.DataFrame, data: pd.DataFrame, seasonality_function, min_efficiency=0.001, max_efficiency=0.99, **kwargs): 

152 if capacities is not None: 

153 if 'efficiency' in data.columns: 

154 warnings.warn('Capacities are defined but the data already contains a column efficiency with productions transformed by capacity!') 

155 capacities_interp = InterpolatedFunction(capacities.index, capacities['capacity']) 

156 data['efficiency'] = data['production']/capacities_interp.compute(data.index) 

157 data['logit_efficiency'] = _logit(np.minimum(np.maximum(data['efficiency'], min_efficiency), max_efficiency)) 

158 f = CosinusSeasonality(x=np.array([1.0, 1, 0.9, 1.1, 0.5, -1.0])) 

159 pf_target = PeriodicFunction(f, frequency='Y', granularity=pd.infer_freq(data.index), ignore_leap_day=True) 

160 pf_target.calibrate(data.index, data['logit_efficiency'].values) 

161 data['des_logit_efficiency'] = data['logit_efficiency']-pf_target.compute(DateTimeGrid(data.index)) 

162 deviation_model.calibrate(data['des_logit_efficiency'].values,dt=1.0/(24.0*365.0),**kwargs) 

163 return WindPowerModel(deviation_model, pf_target) 

164 

165 def _to_dict(self) -> dict: 

166 raise NotImplementedError() 

167 

168class SmoothstepSupplyCurve(FactoryObject): 

169 def __init__(self, s, N): 

170 self.s = s 

171 self.N = N 

172 

173 def _to_dict(self): 

174 return {'s': self.s, 'N': self.N} 

175 

176 @staticmethod 

177 def smoothstep(x, x_min=0, x_max=1, N=1): 

178 x = np.clip((x - x_min) / (x_max - x_min), 0, 1) 

179 result = 0 

180 for n in range(0, N + 1): 

181 result += comb(N + n, n) * comb(2 * N + 1, N - n) * (-x) ** n 

182 result *= x ** (N + 1) 

183 return result 

184 

185 def __call__(self, residual): 

186 #wind_production = wind_production#np.maximum(np.minimum(wind_production, 0.99), 0.01) 

187 #residual = (1.0-wind_production) 

188 residual = np.power(residual, self.s) 

189 return SmoothstepSupplyCurve.smoothstep(residual, N=self.N) 

190 

191class SupplyFunction: 

192 def __init__(self, floor:tuple, cap:tuple, peak:tuple, offpeak:tuple, peak_hours: set): 

193 self.floor = floor 

194 self.cap = cap 

195 self.peak = peak 

196 self.offpeak = offpeak 

197 self.peak_hours = peak_hours 

198 

199 def compute(self, q, d:dt.datetime): 

200 def cutoff(x): 

201 return np.minimum(self.cap[1], np.maximum(self.floor[1], x)) 

202 if q<=self.floor[0]: 

203 return self.floor[1] 

204 elif q>=self.cap[0]: 

205 return self.cap[1] 

206 if d.hour not in self.peak_hours: 

207 return cutoff(self.offpeak[0]+self.offpeak[1]/(q-self.floor[0])+self.offpeak[2]*q) 

208 return cutoff(self.peak[0]+self.peak[1]/(self.cap[0]-q)) 

209 

210 def plot(self, d:dt.datetime, res_demand_low = None, res_demand_high = None): 

211 if res_demand_low is None: 

212 res_demand_low = self.floor[0] 

213 if res_demand_high is None: 

214 res_demand_high = self.cap[0] 

215 q = np.linspace(res_demand_low, res_demand_high, 50) 

216 f = [self.compute(x, d) for x in q] 

217 plt.plot(q,f,'-', label=str(d)) 

218 plt.xlabel('residual demand') 

219 plt.ylabel('price') 

220 

221class LoadModel: 

222 def __init__(self,deviation_process: object, load_profile: object): 

223 """Model the power load.  

224 

225 Args: 

226 deviation_process (object): _description_ 

227 load_profile (object): _description_ 

228 """ 

229 self.load_profile = load_profile 

230 self.deviation_process = deviation_process 

231 

232 def rnd_shape(self, n_sims: int, n_timepoints: int)->tuple: 

233 return self.deviation_process.rnd_shape(n_sims, n_timepoints) 

234 

235 def simulate(self, timegrid: DateTimeGrid, start_value: float, rnd:np.ndarray): 

236 result = np.empty((timegrid.shape[0], rnd.shape[0])) 

237 result[0,:] = start_value 

238 deviation = self.deviation_process.simulate(timegrid.timegrid, start_value, rnd) 

239 return self.load_profile.get_profile(timegrid)[:, np.newaxis] + deviation 

240 

241class ResidualDemandModel(BaseModel): 

242 def __init__(self, wind_model: object, 

243 capacity_wind: float, 

244 solar_model: object, 

245 capacity_solar: float, 

246 load_model: object, 

247 supply_curve: object, 

248 power_name:str, 

249 ): 

250 """Residual demand model to model power prices. 

251 

252 This model is based on the paper by :footcite:t:`Wagner2012` and models power (spot) prices :math:`p_t` depending (by a deterministic function :math:`f`) on the residual demand 

253 :math:`R_t`, 

254 

255 .. math:: 

256 p_t = f(R_t) = f(L_t - IC^w\cdot E_t^w - IC_t^s\cdot E_t^s) 

257 

258 where 

259 

260 - :math:`L_t` is the demand (load) at time :math:`t`, 

261 - :math:`IC^w` denotes the installed capacity of wind (in contrast to the paper this is not time dependent), 

262 - :math:`E_t^w` is the wind efficiency at time :math:`t`, 

263 - :math:`IC^s` denotes the installed capacity of solar (in contrast to the paper this is not time dependent), 

264 - :math:`E_t^s` is the solar efficiency at time :math:`t`. 

265 

266 

267 

268 Args: 

269 wind_model (object): Model for wind efficiency (needs to implement a method simulate in order to work with this model).  

270 See :func:`rivapy.models.WindPowerModel` as an example for a wind model. 

271 capacity_wind (object): The capacity of wind power. This is multiplied with the simulated efficiency to obtain the simulated absolute amount of wind. 

272 solar_model (object): Model for solar efficiency (needs to implement a method simulate in order to work with this model).  

273 See :func:`rivapy.models.SolarPowerModel` as an example for a solar model. 

274 capacity_solar (object): The capacity of solar power. This is multiplied with the simulated efficiency to obtain the simulated absolute amount of solar. 

275 load_model (object): Model for load. See :func:`rivapy.models.LoadModel` as an example for a load model. 

276 supply_curve (object): The total demand, see :func:`rivapy.models.SupplyFunction` for an example. 

277 power_name (str): Name of the simulated power. This is used within pricing to identify the simulated power. 

278 wind_name (str): Name of the simulated wind. This is used within pricing to identify the simulated wind. 

279 """ 

280 self.wind_model = wind_model 

281 self.capacity_wind = capacity_wind 

282 self.solar_model = solar_model 

283 self.capacity_solar = capacity_solar 

284 self.load_model = load_model 

285 self.supply_curve = supply_curve 

286 self.power_name = power_name 

287 

288 def udls(self)->Set[str]: 

289 """Return the name of all underlyings modeled 

290 

291 Returns: 

292 Set[str]: Set of the modeled underlyings. 

293 """ 

294 result = set([self.power_name]) 

295 result.update(self.wind_model.udls()) 

296 result.update(self.solar_model.udls()) 

297 return result 

298 

299 def simulate(self, timegrid: DateTimeGrid, 

300 start_value_wind: float, 

301 start_value_solar: float, 

302 start_value_load: float, 

303 n_sims: int, 

304 rnd_wind: np.ndarray=None, 

305 rnd_solar: np.ndarray=None, 

306 rnd_load: float=None, 

307 seed = None): 

308 """Simulate the residual demand model on a given datetimegrid. 

309 

310 Args: 

311 timegrid (DateTimeGrid): _description_ 

312 start_value_wind (float): _description_ 

313 start_value_solar (float): _description_ 

314 start_value_load (float): _description_ 

315 n_sims (int): _description_ 

316 rnd_wind (np.ndarray, optional): _description_. Defaults to None. 

317 rnd_solar (np.ndarray, optional): _description_. Defaults to None. 

318 rnd_load (float, optional): _description_. Defaults to None. 

319 rnd_state (_type_, optional): _description_. Defaults to None. 

320 

321 Returns: 

322 _type_: _description_ 

323 """ 

324 np.random.seed(seed) 

325 if rnd_wind is None: 

326 rnd_wind = np.random.normal(size=self.wind_model.rnd_shape(n_sims,timegrid.shape[0])) 

327 if rnd_solar is None: 

328 rnd_solar = np.random.normal(size=self.solar_model.rnd_shape(n_sims, timegrid.get_daily_subgrid().shape[0])) 

329 if rnd_load is None: 

330 rnd_load = np.random.normal(size=self.load_model.rnd_shape(n_sims,timegrid.shape[0])) 

331 

332 lm = self.load_model.simulate(timegrid, start_value_load, rnd_load) 

333 sm = self.capacity_solar*self.solar_model.simulate(timegrid, start_value_solar, rnd_solar) 

334 wm = self.capacity_wind*self.wind_model.simulate(timegrid, start_value_wind, rnd_wind) 

335 residual_demand = lm - sm - wm 

336 power_price = np.zeros(shape=( timegrid.shape[0], n_sims)) 

337 for i in range(timegrid.shape[0]): 

338 for j in range(n_sims): 

339 power_price[i,j] = self.supply_curve.compute(residual_demand[i,j],timegrid.dates[i] ) 

340 result = {} 

341 result['load'] = lm 

342 result['solar'] = sm 

343 result['wind'] = wm 

344 result['price'] = power_price 

345 return result 

346 

347 def _to_dict(self) -> dict: 

348 raise NotImplementedError() 

349 

350if __name__=='__main__': 

351 pass