Coverage for rivapy/models/residual_demand_model.py: 29%
202 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-05 14:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-05 14:27 +0000
1import numpy as np
2import pandas as pd
3import datetime as dt
4import matplotlib.pyplot as plt
5import warnings
6from typing import Set, Callable
7from scipy.special import comb
8from rivapy.tools.datetime_grid import DateTimeGrid, InterpolatedFunction, PeriodicFunction
9from rivapy.models.ornstein_uhlenbeck import OrnsteinUhlenbeck
10from rivapy.tools.interfaces import DateTimeFunction, FactoryObject
11from rivapy.models.base_model import BaseModel
13def _logit(x):
14 return np.log(x/(1-x))
16def _inv_logit(x):
17 return 1.0/(1+np.exp(-x))
19class CosinusSeasonality:
20 def __init__(self, x: np.ndarray = np.array([0,1,0,1,0])):
21 self.x = x
23 def __call__(self, x):
24 return self.x[0]*np.cos(2*np.pi*x+self.x[1]) + self.x[2]*np.cos(4*np.pi*x+self.x[3]) + self.x[4]
27class SolarProfile:
28 def __init__(self, profile:Callable):
29 self._profile = profile
31 def get_profile(self, timegrid: DateTimeGrid):
32 result = np.empty(timegrid.timegrid.shape[0])
33 for i in range(result.shape[0]):
34 result[i] = self._profile(timegrid.dates[i])
35 return result
37class MonthlySolarProfile(SolarProfile):
38 def __init__(self, monthly_profiles: np.ndarray):
39 self._monthly_profiles = monthly_profiles
40 super().__init__(self.__monthly_solar_profile)
41 if monthly_profiles is not None:
42 if monthly_profiles.shape != (12,24):
43 raise ValueError('Monthly profiles must have shape (12,24)')
45 def __monthly_solar_profile(self, d):
46 return self._monthly_profiles[d.month-1, d.hour]
49class SolarPowerModel(BaseModel):
50 def _eval_grid(f, timegrid):
51 return f(timegrid)
52 try:
53 return f(timegrid)
54 except:
55 result = np.full(timegrid.shape, f)
56 return result
58 def __init__(self,
59 daily_maximum_process,
60 profile,
61 mean_level,
62 name:str = 'Solar_Germany'):
63 self.name = name
64 self._daily_maximum_process = daily_maximum_process
65 self._profile = profile
66 self.mean_level = mean_level
68 def rnd_shape(self, n_sims: int, n_timepoints: int)->tuple:
69 return self._daily_maximum_process.rnd_shape(n_sims, n_timepoints)
71 def simulate(self, timegrid: DateTimeGrid, start_value: float, rnd):
72 # daily timegrid for daily maximum simulation
73 tg_ = timegrid.get_daily_subgrid()
74 if hasattr(self.mean_level, 'compute'):
75 ml = self.mean_level.compute(timegrid)[:, np.newaxis]
76 else:
77 ml = self.mean_level(timegrid)[:, np.newaxis]
78 start_value_ = _logit(start_value) - ml[0,0]
79 daily_maximum = self._daily_maximum_process.simulate(tg_.timegrid, start_value_, rnd)
80 profile = self._profile.get_profile(timegrid)
81 result = np.empty((timegrid.shape[0], rnd.shape[1]))
82 day = 0
83 d = tg_.dates[0].date()
84 for i in range(timegrid.timegrid.shape[0]):
85 if d != timegrid.dates[i].date():
86 day += 1
87 d = timegrid.dates[i].date()
88 result[i,:] = _inv_logit(daily_maximum[day,:] + ml[i,0])* profile[i]
89 return result
91 def udls(self)->Set[str]:
92 """Return the name of all underlyings modeled
94 Returns:
95 Set[str]: Set of the modeled underlyings.
96 """
97 return set([self.name])
99 def _to_dict(self) -> dict:
100 raise NotImplementedError()
103class WindPowerModel(BaseModel):
105 def _eval_grid(f, timegrid):
106 try:
107 return f(timegrid)
108 except:
109 result = np.full(timegrid.shape, f)
110 return result
112 def __init__(self, deviation_process: object,
113 seasonal_function: object,
114 name:str = 'Wind_Germany'):
115 """Wind Power Model to model the efficiency of wind power production.
117 Args:
118 speed_of_mean_reversion (Union[float, Callable]): _description_
119 volatility (Union[float, Callable]): _description_
120 mean_level (Union[float, Callable]): _description_
121 """
122 self.deviation_process = deviation_process
123 self.seasonal_function = seasonal_function
124 # self.speed_of_mean_reversion = speed_of_mean_reversion
125 # self.volatility = volatility
126 # self.mean_level = mean_level
127 self.name = name
128 self._timegrid = None
130 def udls(self)->Set[str]:
131 """Return the name of all underlyings modeled
133 Returns:
134 Set[str]: Set of the modeled underlyings.
135 """
136 return set([self.name])
138 def rnd_shape(self, n_sims: int, n_timepoints: int)->tuple:
139 return self.deviation_process.rnd_shape(n_sims, n_timepoints)
141 def simulate(self, timegrid: DateTimeGrid, start_value, rnd):
142 if hasattr(self.seasonal_function, 'compute'):
143 mean = self.seasonal_function.compute(timegrid)[:, np.newaxis]
144 else:
145 mean = self.seasonal_function(timegrid)[:, np.newaxis]
146 start_value_ = _logit(start_value) - mean[0,0]
147 deviation = self.deviation_process.simulate(timegrid.timegrid, start_value_, rnd)
148 return _inv_logit(mean + deviation)
150 @staticmethod
151 def calibrate(deviation_model, capacities:pd.DataFrame, data: pd.DataFrame, seasonality_function, min_efficiency=0.001, max_efficiency=0.99, **kwargs):
152 if capacities is not None:
153 if 'efficiency' in data.columns:
154 warnings.warn('Capacities are defined but the data already contains a column efficiency with productions transformed by capacity!')
155 capacities_interp = InterpolatedFunction(capacities.index, capacities['capacity'])
156 data['efficiency'] = data['production']/capacities_interp.compute(data.index)
157 data['logit_efficiency'] = _logit(np.minimum(np.maximum(data['efficiency'], min_efficiency), max_efficiency))
158 f = CosinusSeasonality(x=np.array([1.0, 1, 0.9, 1.1, 0.5, -1.0]))
159 pf_target = PeriodicFunction(f, frequency='Y', granularity=pd.infer_freq(data.index), ignore_leap_day=True)
160 pf_target.calibrate(data.index, data['logit_efficiency'].values)
161 data['des_logit_efficiency'] = data['logit_efficiency']-pf_target.compute(DateTimeGrid(data.index))
162 deviation_model.calibrate(data['des_logit_efficiency'].values,dt=1.0/(24.0*365.0),**kwargs)
163 return WindPowerModel(deviation_model, pf_target)
165 def _to_dict(self) -> dict:
166 raise NotImplementedError()
168class SmoothstepSupplyCurve(FactoryObject):
169 def __init__(self, s, N):
170 self.s = s
171 self.N = N
173 def _to_dict(self):
174 return {'s': self.s, 'N': self.N}
176 @staticmethod
177 def smoothstep(x, x_min=0, x_max=1, N=1):
178 x = np.clip((x - x_min) / (x_max - x_min), 0, 1)
179 result = 0
180 for n in range(0, N + 1):
181 result += comb(N + n, n) * comb(2 * N + 1, N - n) * (-x) ** n
182 result *= x ** (N + 1)
183 return result
185 def __call__(self, residual):
186 #wind_production = wind_production#np.maximum(np.minimum(wind_production, 0.99), 0.01)
187 #residual = (1.0-wind_production)
188 residual = np.power(residual, self.s)
189 return SmoothstepSupplyCurve.smoothstep(residual, N=self.N)
191class SupplyFunction:
192 def __init__(self, floor:tuple, cap:tuple, peak:tuple, offpeak:tuple, peak_hours: set):
193 self.floor = floor
194 self.cap = cap
195 self.peak = peak
196 self.offpeak = offpeak
197 self.peak_hours = peak_hours
199 def compute(self, q, d:dt.datetime):
200 def cutoff(x):
201 return np.minimum(self.cap[1], np.maximum(self.floor[1], x))
202 if q<=self.floor[0]:
203 return self.floor[1]
204 elif q>=self.cap[0]:
205 return self.cap[1]
206 if d.hour not in self.peak_hours:
207 return cutoff(self.offpeak[0]+self.offpeak[1]/(q-self.floor[0])+self.offpeak[2]*q)
208 return cutoff(self.peak[0]+self.peak[1]/(self.cap[0]-q))
210 def plot(self, d:dt.datetime, res_demand_low = None, res_demand_high = None):
211 if res_demand_low is None:
212 res_demand_low = self.floor[0]
213 if res_demand_high is None:
214 res_demand_high = self.cap[0]
215 q = np.linspace(res_demand_low, res_demand_high, 50)
216 f = [self.compute(x, d) for x in q]
217 plt.plot(q,f,'-', label=str(d))
218 plt.xlabel('residual demand')
219 plt.ylabel('price')
221class LoadModel:
222 def __init__(self,deviation_process: object, load_profile: object):
223 """Model the power load.
225 Args:
226 deviation_process (object): _description_
227 load_profile (object): _description_
228 """
229 self.load_profile = load_profile
230 self.deviation_process = deviation_process
232 def rnd_shape(self, n_sims: int, n_timepoints: int)->tuple:
233 return self.deviation_process.rnd_shape(n_sims, n_timepoints)
235 def simulate(self, timegrid: DateTimeGrid, start_value: float, rnd:np.ndarray):
236 result = np.empty((timegrid.shape[0], rnd.shape[0]))
237 result[0,:] = start_value
238 deviation = self.deviation_process.simulate(timegrid.timegrid, start_value, rnd)
239 return self.load_profile.get_profile(timegrid)[:, np.newaxis] + deviation
241class ResidualDemandModel(BaseModel):
242 def __init__(self, wind_model: object,
243 capacity_wind: float,
244 solar_model: object,
245 capacity_solar: float,
246 load_model: object,
247 supply_curve: object,
248 power_name:str,
249 ):
250 """Residual demand model to model power prices.
252 This model is based on the paper by :footcite:t:`Wagner2012` and models power (spot) prices :math:`p_t` depending (by a deterministic function :math:`f`) on the residual demand
253 :math:`R_t`,
255 .. math::
256 p_t = f(R_t) = f(L_t - IC^w\cdot E_t^w - IC_t^s\cdot E_t^s)
258 where
260 - :math:`L_t` is the demand (load) at time :math:`t`,
261 - :math:`IC^w` denotes the installed capacity of wind (in contrast to the paper this is not time dependent),
262 - :math:`E_t^w` is the wind efficiency at time :math:`t`,
263 - :math:`IC^s` denotes the installed capacity of solar (in contrast to the paper this is not time dependent),
264 - :math:`E_t^s` is the solar efficiency at time :math:`t`.
268 Args:
269 wind_model (object): Model for wind efficiency (needs to implement a method simulate in order to work with this model).
270 See :func:`rivapy.models.WindPowerModel` as an example for a wind model.
271 capacity_wind (object): The capacity of wind power. This is multiplied with the simulated efficiency to obtain the simulated absolute amount of wind.
272 solar_model (object): Model for solar efficiency (needs to implement a method simulate in order to work with this model).
273 See :func:`rivapy.models.SolarPowerModel` as an example for a solar model.
274 capacity_solar (object): The capacity of solar power. This is multiplied with the simulated efficiency to obtain the simulated absolute amount of solar.
275 load_model (object): Model for load. See :func:`rivapy.models.LoadModel` as an example for a load model.
276 supply_curve (object): The total demand, see :func:`rivapy.models.SupplyFunction` for an example.
277 power_name (str): Name of the simulated power. This is used within pricing to identify the simulated power.
278 wind_name (str): Name of the simulated wind. This is used within pricing to identify the simulated wind.
279 """
280 self.wind_model = wind_model
281 self.capacity_wind = capacity_wind
282 self.solar_model = solar_model
283 self.capacity_solar = capacity_solar
284 self.load_model = load_model
285 self.supply_curve = supply_curve
286 self.power_name = power_name
288 def udls(self)->Set[str]:
289 """Return the name of all underlyings modeled
291 Returns:
292 Set[str]: Set of the modeled underlyings.
293 """
294 result = set([self.power_name])
295 result.update(self.wind_model.udls())
296 result.update(self.solar_model.udls())
297 return result
299 def simulate(self, timegrid: DateTimeGrid,
300 start_value_wind: float,
301 start_value_solar: float,
302 start_value_load: float,
303 n_sims: int,
304 rnd_wind: np.ndarray=None,
305 rnd_solar: np.ndarray=None,
306 rnd_load: float=None,
307 seed = None):
308 """Simulate the residual demand model on a given datetimegrid.
310 Args:
311 timegrid (DateTimeGrid): _description_
312 start_value_wind (float): _description_
313 start_value_solar (float): _description_
314 start_value_load (float): _description_
315 n_sims (int): _description_
316 rnd_wind (np.ndarray, optional): _description_. Defaults to None.
317 rnd_solar (np.ndarray, optional): _description_. Defaults to None.
318 rnd_load (float, optional): _description_. Defaults to None.
319 rnd_state (_type_, optional): _description_. Defaults to None.
321 Returns:
322 _type_: _description_
323 """
324 np.random.seed(seed)
325 if rnd_wind is None:
326 rnd_wind = np.random.normal(size=self.wind_model.rnd_shape(n_sims,timegrid.shape[0]))
327 if rnd_solar is None:
328 rnd_solar = np.random.normal(size=self.solar_model.rnd_shape(n_sims, timegrid.get_daily_subgrid().shape[0]))
329 if rnd_load is None:
330 rnd_load = np.random.normal(size=self.load_model.rnd_shape(n_sims,timegrid.shape[0]))
332 lm = self.load_model.simulate(timegrid, start_value_load, rnd_load)
333 sm = self.capacity_solar*self.solar_model.simulate(timegrid, start_value_solar, rnd_solar)
334 wm = self.capacity_wind*self.wind_model.simulate(timegrid, start_value_wind, rnd_wind)
335 residual_demand = lm - sm - wm
336 power_price = np.zeros(shape=( timegrid.shape[0], n_sims))
337 for i in range(timegrid.shape[0]):
338 for j in range(n_sims):
339 power_price[i,j] = self.supply_curve.compute(residual_demand[i,j],timegrid.dates[i] )
340 result = {}
341 result['load'] = lm
342 result['solar'] = sm
343 result['wind'] = wm
344 result['price'] = power_price
345 return result
347 def _to_dict(self) -> dict:
348 raise NotImplementedError()
350if __name__=='__main__':
351 pass