Coverage for rivapy / marketdata_tools / pfc_shaper.py: 91%
361 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-27 14:36 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-27 14:36 +0000
1import abc
2import holidays
3import numpy as np
4import pandas as pd
5import datetime as dt
6import rivapy.tools.interfaces as interfaces
7import rivapy.tools._validators as validator
8from rivapy.tools.scheduler import SimpleSchedule
9from typing import List, Dict, Literal, Optional
class PFCShaper(interfaces.FactoryObject):
    """PFCShaper interface. Each shaping model for energy price forward curves must inherit from this base class.

    Args:
        spot_prices (pd.DataFrame): Data used to calibrate the shaping model. The index must be a DatetimeIndex.
        holiday_calendar (holidays.HolidayBase): Calendar object to obtain country specific holidays.
        normalization_config (Optional[Dict[Literal["D", "W", "ME"], Optional[int]]], optional): A dictionary configuring the shape normalization periods.
            Here ``D`` defines the number of days at the beginning of the shape over which the individual mean is normalized to one.
            ``W`` defines the number of weeks at the beginning of the shape over which the individual mean is normalized to one.
            ``ME`` defines the number of months at the beginning of the shape over which the individual mean is normalized to one.
            The remaining shape is then normalized over the individual years. Defaults to None.
    """

    def __init__(
        self,
        spot_prices: pd.DataFrame,
        holiday_calendar: holidays.HolidayBase,
        normalization_config: Optional[Dict[Literal["D", "W", "ME"], Optional[int]]] = None,
    ):
        super().__init__()
        validator._check_pandas_index_for_datetime(spot_prices)
        self.spot_prices = spot_prices
        self.holiday_calendar = holiday_calendar
        self.normalization_config = normalization_config

        # Normalization order: each entry pairs a pandas resample frequency with the
        # strftime pattern labelling one resampling bucket (calendar day, ISO week,
        # calendar month). The order matters: days first, then weeks, then months.
        self.__normalization_order = [("D", "%Y-%m-%d"), ("W", "%G-%V"), ("ME", "%Y-%m")]

    @abc.abstractmethod
    def calibrate(self):
        """Calibration of the shaping model."""
        pass

    @abc.abstractmethod
    def apply(self, apply_schedule: List[dt.datetime]):
        """Applies the model on a schedule in order to generate a shape for future dates.

        Args:
            apply_schedule (List[dt.datetime]): List of datetimes for which a shape is generated.
        """
        pass

    def _preprocess(self, spot: pd.DataFrame, remove_outlier: bool, lower_quantile: float, upper_quantile: float) -> pd.DataFrame:
        """Preprocess spot price data by ensuring hourly continuity, interpolating missing
        values, and optionally removing outliers based on year-normalized values.

        Steps:
        1. Aggregate duplicate timestamps by taking the mean of their values.
        2. Reindex the time series to a continuous hourly frequency and linearly
           interpolate missing values.
        3. If ``remove_outlier`` is True, normalize the series per calendar year and
           drop points outside the specified quantile range.

        Args:
            spot (pd.DataFrame): Raw spot price data indexed by datetime. The first column
                is assumed to contain the price values.
            remove_outlier (bool): Whether to remove outliers after normalization.
            lower_quantile (float): Lower quantile threshold (e.g., 0.01) used for outlier removal.
            upper_quantile (float): Upper quantile threshold (e.g., 0.99) used for outlier removal.

        Returns:
            pd.DataFrame: A cleaned and time-continuous spot price time series, with
            optional outliers removed.
        """
        # remove duplicate timestamps by replacing them with their mean
        spot = spot.groupby(level=0).mean()

        # include missing hours and interpolate the resulting gaps
        full_idx = pd.date_range(start=spot.index.min(), end=spot.index.max(), freq="h")
        spot = spot.reindex(full_idx)
        spot.index.name = "date"
        spot.iloc[:, 0] = spot.iloc[:, 0].interpolate(method="linear")

        if remove_outlier:
            # quantiles are computed on year-normalized values so that a general price
            # level shift between years is not mistaken for outliers
            yearly_normalized = self._normalize_year(df=spot)

            q_lower = np.quantile(yearly_normalized.iloc[:, 0].to_numpy(), lower_quantile)
            q_upper = np.quantile(yearly_normalized.iloc[:, 0].to_numpy(), upper_quantile)

            remove_ids = yearly_normalized.loc[(yearly_normalized.iloc[:, 0] < q_lower) | (yearly_normalized.iloc[:, 0] > q_upper), :].index
            spot = spot.loc[~spot.index.isin(remove_ids), :]
        return spot

    def _normalize_year(self, df: pd.DataFrame) -> pd.DataFrame:
        """Normalize time series values by their yearly mean.

        Computes the mean value for each calendar year and divides all data points
        within that year by the corresponding annual mean, so each year averages to 1.

        Args:
            df (pd.DataFrame): A DataFrame indexed by datetime, containing one or more
                numeric columns to be normalized.

        Returns:
            pd.DataFrame: A DataFrame where the values of each year have been normalized
            relative to their annual mean.
        """
        yearly_mean = df.resample("YE").transform("mean")
        normalized = df / yearly_mean
        return normalized

    def normalize_shape(self, shape: pd.DataFrame) -> pd.DataFrame:
        """Normalizes the shape based on ``normalization_config``.\n
        ``D`` defines the number of days at the beginning of the shape over which the individual mean is normalized to one.\n
        ``W`` defines the number of weeks at the beginning of the shape over which the individual mean is normalized to one.\n
        ``ME`` defines the number of months at the beginning of the shape over which the individual mean is normalized to one.
        The remaining shape is then normalized over the individual years.\n

        Example:
            ``D`` is 2, ``W`` is 2 and ``ME`` is 1. The shape starts at 03.03.2025 (monday).
            Since ``D`` is 2, the shape is normalized for 03.03.2025 and 04.03.2025 individually.\n
            The weeks are normalized from 05.03.2025 to 09.03.2025 and from 10.03.2025 to 16.03.2025.\n
            The month is then normalized from 17.03.2025 to 31.03.2025.
            The remaining shape (starting from 01.04.2025) is normalized on a yearly level.

        Args:
            shape (pd.DataFrame): Shape which should be normalized

        Returns:
            pd.DataFrame: Normalized shape
        """
        # without a config the whole shape is simply normalized per calendar year
        if self.normalization_config is None:
            return self._normalize_year(df=shape)

        # with a config the normalization is done piecewise over the configured periods
        normalized_datetimes = []
        normalized_shapes = []

        # iterate in the fixed normalization order (days -> weeks -> months)
        for resample_freq, resample_format in self.__normalization_order:
            if self.normalization_config.get(resample_freq, None) is None:
                continue

            # if the whole shape is already covered by the previous normalization
            # steps, stop early
            if len(normalized_datetimes) == len(shape):
                return pd.concat(normalized_shapes, axis=0).sort_index(ascending=True)

            # part of the shape which was not covered by any previous normalization
            temp_shape = shape.loc[~shape.index.isin(normalized_datetimes), :]

            # normalize shape by the configured amount of days, weeks or months
            resampled_shape = temp_shape.resample(resample_freq).mean()
            resampled_shape = resampled_shape.iloc[: self.normalization_config[resample_freq], :]

            partially_normalized_shape = temp_shape.rename(index=lambda x: x.strftime(resample_format)).divide(
                resampled_shape.rename(index=lambda x: x.strftime(resample_format)), axis="index"
            )

            # The division above aligns on bucket labels (strftime strings) instead of the
            # original datetimes. Re-attach the original datetimes as index; rows outside
            # the first `normalization_config[resample_freq]` buckets have no divisor and
            # become NaN, so dropna() restricts the result to the configured periods.
            # This allows concatenating the partially normalized pieces later on.
            partially_normalized_shape["datetimes"] = list(temp_shape.index)
            partially_normalized_shape = partially_normalized_shape.reset_index(drop=True).set_index("datetimes").dropna()
            normalized_datetimes += list(partially_normalized_shape.index)
            normalized_shapes.append(partially_normalized_shape)

        if len(normalized_datetimes) == len(shape):
            return pd.concat(normalized_shapes, axis=0).sort_index(ascending=True)

        # the remaining shape is normalized on a yearly basis
        leftover_shape = shape.loc[~shape.index.isin(normalized_datetimes), :]
        yearly_normalized_shape = self._normalize_year(df=leftover_shape)

        return pd.concat(normalized_shapes + [yearly_normalized_shape], axis=0).sort_index(ascending=True)

    def _to_dict(self):
        return {"spot_prices": self.spot_prices, "holiday_calendar": self.holiday_calendar, "normalization_config": self.normalization_config}
class SimpleCategoricalRegression(PFCShaper):
    r"""Linear regression model using categorical predictor variables to construct a PFC shape.

    .. math::

        s(t) = s_0 + \sum^{23}_{i=1}\beta^h_i\cdot\mathbb{I}_{h(t)=i} + \beta^d\cdot\mathbb{I}_{d(t)=1} + \beta^H\cdot\mathbb{I}_{H(t)=1} + \sum^{12}_{i=2}\beta^m_i\cdot\mathbb{I}_{m(t)=i}

    where:

    :math:`s_0`: Shape level

    :math:`\mathbb{I}_x = \begin{cases} 1, & \text{if the } x \text{ expression renders true} \\ 0, & \text{if the } x \text{ expression renders false} \end{cases}`

    :math:`h(t)`: Hour of t

    :math:`d(t) = \begin{cases} 1, & \text{if t is a weekday} \\ 0, & \text{if t is a day on a weekend} \end{cases}`

    :math:`H(t) = \begin{cases} 1, & \text{if t is a public holiday} \\ 0, & \text{if t is not a public holiday} \end{cases}`

    :math:`m(t)`: Month of t

    Args:
        spot_prices (pd.DataFrame): Data used to calibrate the shaping model.
        holiday_calendar (holidays.HolidayBase): Calendar object to obtain country specific holidays.
        normalization_config (Optional[Dict[Literal["D", "W", "ME"], Optional[int]]], optional): A dictionary configuring the shape normalization periods.
            Here ``D`` defines the number of days at the beginning of the shape over which the individual mean is normalized to one.
            ``W`` defines the number of weeks at the beginning of the shape over which the individual mean is normalized to one.
            ``ME`` defines the number of months at the beginning of the shape over which the individual mean is normalized to one.
            The remaining shape is then normalized over the individual years. Defaults to None.
        remove_outlier (bool): Whether to remove outliers before the shape regression. Defaults to False.
        lower_quantile (float): Lower quantile for outlier detection. Defaults to 0.005.
        upper_quantile (float): Upper quantile for outlier detection. Defaults to 0.995.
    """

    def __init__(
        self,
        spot_prices: pd.DataFrame,
        holiday_calendar: holidays.HolidayBase,
        normalization_config: Optional[Dict[Literal["D", "W", "ME"], Optional[int]]] = None,
        remove_outlier: bool = False,
        lower_quantile: float = 0.005,
        upper_quantile: float = 0.995,
    ):
        super().__init__(spot_prices=spot_prices, holiday_calendar=holiday_calendar, normalization_config=normalization_config)
        self.remove_outlier = remove_outlier
        self.lower_quantile = lower_quantile
        self.upper_quantile = upper_quantile

    def _transform(self, datetimes_list: List[dt.datetime]) -> np.ndarray:
        """Transforms a list of datetimes into a design matrix for the linear regression.

        Column layout: intercept, weekday indicator, holiday indicator, hour dummies
        (only when the schedule has more than one distinct hour), month dummies.

        Args:
            datetimes_list (List[dt.datetime]): List of datetimes

        Returns:
            np.ndarray: Numpy array containing the transformed datetimes list
        """
        _datetime_series = pd.Series(datetimes_list)

        weekday = _datetime_series.dt.weekday.isin([0, 1, 2, 3, 4]).astype(int).to_numpy().reshape(-1, 1)
        holiday = _datetime_series.isin(pd.to_datetime(list(self.holiday_calendar.keys()))).astype(int).to_numpy().reshape(-1, 1)

        predictors = [weekday, holiday]

        # Hour dummies are only added for schedules with hourly resolution; a daily
        # schedule has a single distinct hour and the dummies would be degenerate.
        if len(_datetime_series.dt.hour.unique()) > 1:
            hours = (
                pd.get_dummies(_datetime_series.dt.hour, prefix="hour", drop_first=True)
                .astype(int)
                .to_numpy()
                .reshape(-1, len(_datetime_series.dt.hour.unique()) - 1)
            )
            predictors.append(hours)

        # NOTE(review): reshape(-1, 11) assumes all 12 months occur in datetimes_list —
        # shorter schedules would raise here; confirm against callers.
        month = pd.get_dummies(_datetime_series.dt.month, prefix="month", drop_first=True).astype(int).to_numpy().reshape(-1, 11)
        predictors.append(month)

        offset = np.ones(shape=(len(_datetime_series), 1))
        # Bugfix: the previous version referenced `hours` unconditionally, raising a
        # NameError for schedules without hourly resolution. Only the predictors that
        # were actually built are concatenated now; calibrate() and apply() both go
        # through this method, so the column order is consistent between fit and use.
        return np.concatenate([offset] + predictors, axis=1)

    def calibrate(self):
        """Fits the regression parameters on the preprocessed, year-normalized spot prices."""
        spot = self.spot_prices.copy()
        spot = self._preprocess(spot=spot, remove_outlier=self.remove_outlier, lower_quantile=self.lower_quantile, upper_quantile=self.upper_quantile)

        spot_normalized = self._normalize_year(spot)
        # Bugfix: build the design matrix from the preprocessed index (`spot.index`).
        # Using `self.spot_prices.index` desynchronizes predictor rows from the target
        # once preprocessing fills missing hours or removes outliers.
        data_array = self._transform(datetimes_list=spot.index)
        # Ordinary least squares via the normal equations; `solve` avoids forming an
        # explicit inverse and is numerically preferable to `inv`.
        self._regression_parameters = np.linalg.solve(data_array.T @ data_array, data_array.T @ spot_normalized.iloc[:, 0].to_numpy().reshape(-1, 1))

    def apply(self, apply_schedule: List[dt.datetime]) -> pd.DataFrame:
        """Evaluates the calibrated regression on ``apply_schedule`` and normalizes the result.

        Args:
            apply_schedule (List[dt.datetime]): Datetimes for which the shape is generated.
                Must have the same time resolution as the calibration data so the design
                matrix matches the fitted parameters.

        Returns:
            pd.DataFrame: Normalized shape with a single ``shape`` column.
        """
        data_array = self._transform(datetimes_list=apply_schedule)
        shape = data_array @ self._regression_parameters

        shape_df = pd.DataFrame({"shape": shape.squeeze()}, index=apply_schedule)
        shape_df = self.normalize_shape(shape=shape_df)
        return shape_df

    def _to_dict(self):
        return super()._to_dict()
class CategoricalRegression(PFCShaper):
    r"""Linear regression model using categorical predictor variables to construct a PFC shape.
    We follow the methodology in:

    https://cem-a.org/wp-content/uploads/2019/10/A-Structureal-Model-for-Electricity-Forward-Prices.pdf

    https://ieeexplore.ieee.org/document/6607349

    https://www.researchgate.net/publication/229051446_Robust_Calculation_and_Parameter_Estimation_of_the_Hourly_Price_Forward_Curve

    We create a regression model for both the seasonality shape and the intra day shape. For the regression model of the seasonality shape,
    the days are split into weekdays, Saturdays and Sundays. Public holidays are considered as Sundays while bridge days are expected to behave like Saturdays.
    Afterwards, weekdays are split into clusters representing the month they are in, while Saturdays and Sundays are assigned to clusters reaching over three months.
    For the regression model of the intra day shape we keep the seasonality clusters but add an hourly cluster for each individual hour such that the
    total number of intra day clusters becomes #Season Clusters * 24.

    .. math::
        \begin{aligned}
        y_\text{season}(d) &= \frac{\frac{1}{24}\sum_{i=1}^{24}h_i^d}{\frac{1}{N_y}\sum_{i=1}^{N_y} h_i^y} \\
        \hat{y}_\text{season}(d) & =\beta^{0}_\text{season} + \sum_{c \in C^\text{season}}\beta^c_{\text{season}}\cdot\mathbb{I}_{\text{Cluster}(d)=c} \\
        y_\text{id}(h_i,d) &= \frac{h_i^d}{\frac{1}{24}\sum_{i=1}^{24}h_i^d} \\
        \hat{y}_\text{id}(h,d) & =\beta^{0}_\text{id} + \sum_{c \in C^\text{id}}\beta^c\cdot\mathbb{I}_{\text{Cluster}(h_i^d)=c} \\
        s(h_i,d) &= \hat{y}_\text{id}(h_i,d)\cdot\hat{y}_\text{season}(d)
        \end{aligned}

    where:

    :math:`h_i^d`: i-th hour of d-th day

    :math:`h_i^y`: i-th hour of the year :math:`y`

    :math:`N_y`: number of days in year :math:`y`

    :math:`C^\text{season}`: set of all clusters for the seasonality shape

    :math:`C^\text{id}`: set of all clusters for the intra day shape

    :math:`\text{Cluster}(X)`: returns the cluster of X

    :math:`\mathbb{I}_x = \begin{cases}
    1, & \text{if the } x \text{ expression renders true}\\
    0, & \text{if the } x \text{ expression renders false}
    \end{cases}`

    Args:
        spot_prices (pd.DataFrame): Data used to calibrate the shaping model.
        holiday_calendar (holidays.HolidayBase): Calendar object to obtain country specific holidays.
        normalization_config (Optional[Dict[Literal["D", "W", "ME"], Optional[int]]], optional): A dictionary configuring the shape normalization periods.
            Here ``D`` defines the number of days at the beginning of the shape over which the individual mean is normalized to one.
            ``W`` defines the number of weeks at the beginning of the shape over which the individual mean is normalized to one.
            ``ME`` defines the number of months at the beginning of the shape over which the individual mean is normalized to one.
            The remaining shape is then normalized over the individual years. Defaults to None.
        remove_outlier_season (bool): Whether to remove outliers for the seasonality shape regression. Defaults to False.
        remove_outlier_id (bool): Whether to remove outliers for the intra day shape regression. Defaults to False.
        lower_quantile_season (float): Lower quantile for outlier detection of the seasonality shape. Defaults to 0.005.
        upper_quantile_season (float): Upper quantile for outlier detection of the seasonality shape. Defaults to 0.995.
        lower_quantile_id (float): Lower quantile for outlier detection of the intra day shape. Defaults to 0.005.
        upper_quantile_id (float): Upper quantile for outlier detection of the intra day shape. Defaults to 0.995.
    """

    def __init__(
        self,
        spot_prices: pd.DataFrame,
        holiday_calendar: holidays.HolidayBase,
        normalization_config: Optional[Dict[Literal["D", "W", "ME"], Optional[int]]] = None,
        remove_outlier_season: bool = False,
        remove_outlier_id: bool = False,
        lower_quantile_season: float = 0.005,
        upper_quantile_season: float = 0.995,
        lower_quantile_id: float = 0.005,
        upper_quantile_id: float = 0.995,
    ):
        super().__init__(spot_prices=spot_prices, holiday_calendar=holiday_calendar, normalization_config=normalization_config)
        self.remove_outlier_season = remove_outlier_season
        self.remove_outlier_id = remove_outlier_id
        self.lower_quantile_season = lower_quantile_season
        self.upper_quantile_season = upper_quantile_season
        self.lower_quantile_id = lower_quantile_id
        self.upper_quantile_id = upper_quantile_id

    def _create_cluster_df(self, day_list: List[dt.datetime], use_hours: bool = False):
        """Create a DataFrame containing the clusters for the regression models.

        Args:
            day_list (List[dt.datetime]): List of datetimes for which a clustering should be performed
            use_hours (bool, optional): Whether to extend the clustering to include hours. Defaults to False.

        Returns:
            pd.DataFrame: One row per entry of ``day_list`` with calendar columns
            (``year``/``month``/``day``/``weekday``, plus ``hour`` if ``use_hours``),
            the ``holiday``/``bridge`` flags, the day type in ``day_indicator``
            (1 = weekday, 2 = Saturday-like, 3 = Sunday-like), the seasonality
            ``cluster`` and, if ``use_hours``, the intra day ``cluster_hours``.
        """
        holidays_list = pd.to_datetime(list(self.holiday_calendar.keys()))
        cluster_df = pd.DataFrame(index=day_list)

        cluster_df["year"] = cluster_df.index.year
        cluster_df["month"] = cluster_df.index.month
        cluster_df["day"] = cluster_df.index.day
        cluster_df["weekday"] = cluster_df.index.weekday

        if use_hours:
            cluster_df["hour"] = cluster_df.index.hour

        # get holidays and bridge days on the (deduplicated) daily level
        temp_cluster_df = cluster_df[["year", "month", "day", "weekday"]].drop_duplicates().sort_index()
        temp_cluster_df["holiday"] = 0
        temp_cluster_df["bridge"] = 0
        temp_cluster_df.loc[temp_cluster_df.index.isin(holidays_list), "holiday"] = 1

        # bridge days: a Monday before a Tuesday holiday or a Friday after a Thursday holiday
        is_monday_bridge = (temp_cluster_df.index + pd.Timedelta(days=1)).isin(holidays_list) & (temp_cluster_df.index.weekday == 0)
        is_friday_bridge = (temp_cluster_df.index - pd.Timedelta(days=1)).isin(holidays_list) & (temp_cluster_df.index.weekday == 4)
        temp_cluster_df.loc[is_friday_bridge | is_monday_bridge, "bridge"] = 1

        # broadcast the daily flags back to all rows (hours) of the same day; the merge
        # drops the datetime index, so it is restored right afterwards
        cluster_df = pd.merge(cluster_df, temp_cluster_df, on=["year", "month", "day", "weekday"])
        cluster_df.index = day_list

        # day type: 1 = weekday, 2 = Saturday-like, 3 = Sunday-like
        cluster_df["day_indicator"] = 0
        cluster_df.loc[cluster_df.index.weekday < 5, "day_indicator"] = 1
        cluster_df.loc[cluster_df.index.weekday == 5, "day_indicator"] = 2
        cluster_df.loc[cluster_df.index.weekday == 6, "day_indicator"] = 3

        # public holidays behave like Sundays, bridge days like Saturdays
        cluster_df.loc[cluster_df["holiday"] == 1, "day_indicator"] = 3
        cluster_df.loc[cluster_df["bridge"] == 1, "day_indicator"] = 2

        # Christmas Eve and New Year's Eve on a weekday are treated as Saturdays
        cluster_df.loc[(cluster_df.index.month == 12) & (cluster_df.index.day.isin([24, 31])) & (cluster_df.index.weekday < 5), "day_indicator"] = 2

        # seasonality clusters: weekdays are clustered per month (cluster id = month number)
        cluster_df.loc[:, "cluster"] = 0
        cluster_df.loc[cluster_df["day_indicator"] == 1, "cluster"] = cluster_df.loc[cluster_df["day_indicator"] == 1, "month"]

        # Saturday-/Sunday-like days are clustered over three-month groups; their cluster
        # ids continue after the highest weekday cluster id
        weekend_cluster_month = [[1, 2, 12], [3, 4, 5], [6, 7, 8], [9, 10, 11]]
        count = cluster_df["month"].max()

        for day_indicator in [2, 3]:
            for month_lst in weekend_cluster_month:
                # only assign a new cluster id if the group actually occurs in day_list
                if not len(cluster_df.loc[(cluster_df["day_indicator"] == day_indicator) & (cluster_df["month"].isin(month_lst)), "cluster"]) == 0:
                    count += 1
                    cluster_df.loc[(cluster_df["day_indicator"] == day_indicator) & (cluster_df["month"].isin(month_lst)), "cluster"] = count

        if use_hours:
            # adds the 'cluster_hours' column (one cluster per (season cluster, hour) pair)
            self.__add_hours_cluster(
                df=cluster_df,
                clusters_clmn="cluster",
                hours_clmn="hour",
                unique_clusters=cluster_df["cluster"].unique(),
                unique_hours=cluster_df["hour"].unique(),
            )

        return cluster_df

    def __add_hours_cluster(self, df: pd.DataFrame, clusters_clmn: str, hours_clmn: str, unique_clusters: List[int], unique_hours: List[int]):
        """Add hourly clustering in the `cluster_hours` column of the provided DataFrame (in place).

        Each (seasonality cluster, hour) combination receives its own consecutive
        cluster id, starting at 1.

        Args:
            df (pd.DataFrame): DataFrame containing the infos needed for an hourly clustering.
            clusters_clmn (str): Column containing the seasonality clusters
            hours_clmn (str): Column containing the hours
            unique_clusters (List[int]): List of all seasonality clusters
            unique_hours (List[int]): List of all hours
        """
        df["cluster_hours"] = 0
        count = 1
        for cluster in unique_clusters:
            for hour in unique_hours:
                df.loc[(df[clusters_clmn] == cluster) & (df[hours_clmn] == hour), "cluster_hours"] = count
                count += 1

    def _create_one_hot_matrix(self, rows: int, clusters: pd.Series, max_clusters: int, adjust_clusters: bool, offset_col: bool):
        """Create a matrix for a one hot encoding of a clusters pandas Series.

        Args:
            rows (int): Number of data points
            clusters (pd.Series): Series containing the cluster for each data point
            max_clusters (int): Total number of individual clusters
            adjust_clusters (bool): Whether to shift cluster ids by -1 (for 1-based cluster ids).
            offset_col (bool): Whether to use the last column as an intercept for the regression model.

        Returns:
            np.ndarray: One-hot matrix of shape ``(rows, max_clusters)``. If ``offset_col``
            is True the last column is set to all ones (the highest cluster becomes the
            reference category absorbed by the intercept); otherwise the last column is
            dropped, giving shape ``(rows, max_clusters - 1)``.
        """
        one_hot = np.zeros(shape=(rows, max_clusters))
        if adjust_clusters:
            cluster_series = clusters - 1
        else:
            cluster_series = clusters

        # set the cluster membership indicator for every row
        one_hot[np.arange(rows), cluster_series] = 1

        if offset_col:
            one_hot[:, -1] = 1
        else:
            one_hot = one_hot[:, :-1]

        return one_hot

    def _preprocess(self, spot: pd.DataFrame) -> pd.DataFrame:
        """Ensure hourly continuity of the spot data (no outlier removal here; outliers
        are handled per cluster in :meth:`calibrate` via :meth:`_remove_outliers`).

        Args:
            spot (pd.DataFrame): Raw spot price data indexed by datetime; the first
                column is assumed to contain the price values.

        Returns:
            pd.DataFrame: Time-continuous hourly spot price series with interpolated gaps.
        """
        # remove duplicate hours by replacing these with their mean
        spot = spot.groupby(level=0).mean()

        # include missing hours
        full_idx = pd.date_range(start=spot.index.min(), end=spot.index.max(), freq="h")
        spot = spot.reindex(full_idx)
        spot.index.name = "date"
        spot.iloc[:, 0] = spot.iloc[:, 0].interpolate(method="linear")
        return spot

    @staticmethod
    def _remove_outliers(df: pd.DataFrame, value_clmn: str, grouping_clmn: str, lower_quantile: float, upper_quantile: float):
        """
        Remove outliers from a DataFrame based on quantile thresholds within groups.

        This function applies a quantile-based filter to the values in `value_clmn` for each
        unique category defined in `grouping_clmn`. For each group, values below the
        `lower_quantile` or above the `upper_quantile` are considered outliers and removed.
        The filtered rows are then returned as a cleaned DataFrame.

        Args:
            df (pd.DataFrame): Input DataFrame containing the data.
            value_clmn (str): Name of the column containing the numerical values to evaluate for outliers.
            grouping_clmn (str): Name of the column used to group the data before applying the quantile filter.
            lower_quantile (float): Lower quantile threshold (e.g., 0.05). Values below this quantile are removed.
            upper_quantile (float): Upper quantile threshold (e.g., 0.95). Values above this quantile are removed.

        Returns:
            pd.DataFrame: A DataFrame containing only the data points within the specified quantile bounds for each group.
        """

        # per-group filter: keep only values inside the [lower_quantile, upper_quantile] band
        def remove_outliers(series, lower_quantile=lower_quantile, upper_quantile=upper_quantile):
            lower_bound = series.quantile(lower_quantile)
            upper_bound = series.quantile(upper_quantile)
            return series[(series >= lower_bound) & (series <= upper_bound)]

        keep_ids = df.groupby(grouping_clmn, group_keys=False)[value_clmn].apply(remove_outliers).index
        df_clean = df.loc[df.index.isin(keep_ids), :]
        return df_clean

    def calibrate(
        self,
    ):
        """Fits the seasonality and intra day regressions on the preprocessed spot prices.

        Stores the fitted coefficient vectors in ``_season_regression_params`` and
        ``_id_regression_params`` and the cluster counts needed by :meth:`apply`.
        """
        spot = self.spot_prices.copy()
        spot = self._preprocess(spot=spot)

        cluster_df = self._create_cluster_df(spot.index, use_hours=True)

        # seasonality target: daily mean of the year-normalized prices
        season_shape = self._normalize_year(spot)
        season_shape = season_shape.resample("D").mean().dropna()

        cluster_df_daily = cluster_df[["year", "month", "day", "weekday", "day_indicator", "cluster"]].drop_duplicates().sort_index()
        calib_season_df = pd.merge(season_shape, cluster_df_daily, left_index=True, right_index=True)

        if self.remove_outlier_season:
            value_clmn = calib_season_df.columns[0]
            calib_season_df = self._remove_outliers(
                df=calib_season_df,
                value_clmn=value_clmn,
                grouping_clmn="cluster",
                lower_quantile=self.lower_quantile_season,
                upper_quantile=self.upper_quantile_season,
            )

        self.__max_cluster = calib_season_df["cluster"].max()

        season_one_hot = self._create_one_hot_matrix(
            rows=len(calib_season_df),
            clusters=calib_season_df["cluster"],
            max_clusters=self.__max_cluster,
            adjust_clusters=True,  # since clusters do not start at 0
            offset_col=True,  # since we would ignore the last column because it is obsolete due to our categorical variables,
            # we actually set it all to 1 to account for the offset in our regression model
        )

        # ordinary least squares via the normal equations
        self._season_regression_params = (
            np.linalg.inv(season_one_hot.T @ season_one_hot) @ season_one_hot.T @ calib_season_df.iloc[:, 0].to_numpy().reshape(-1, 1)
        )

        # intra day target: hourly price relative to the daily mean
        id_shape = spot / spot.resample("D").transform("mean").dropna()

        calib_id_df = pd.merge(id_shape, cluster_df, left_index=True, right_index=True)

        if self.remove_outlier_id:
            value_clmn = calib_id_df.columns[0]
            calib_id_df = self._remove_outliers(
                df=calib_id_df,
                grouping_clmn="cluster_hours",
                value_clmn=value_clmn,
                lower_quantile=self.lower_quantile_id,
                upper_quantile=self.upper_quantile_id,
            )

        self.__max_hour_clusters = calib_id_df["cluster_hours"].max()

        id_one_hot = self._create_one_hot_matrix(
            rows=len(calib_id_df),
            clusters=calib_id_df["cluster_hours"],
            max_clusters=self.__max_hour_clusters,
            adjust_clusters=True,
            offset_col=True,
        )

        self._id_regression_params = np.linalg.inv(id_one_hot.T @ id_one_hot) @ id_one_hot.T @ calib_id_df.iloc[:, 0].to_numpy().reshape(-1, 1)

    def apply(self, apply_schedule: List[dt.datetime]) -> pd.DataFrame:
        """Evaluates the calibrated regressions on ``apply_schedule``.

        The seasonality and intra day fits are multiplied per timestamp, normalized per
        year and finally normalized via :meth:`PFCShaper.normalize_shape`.

        Args:
            apply_schedule (List[dt.datetime]): Datetimes for which the shape is generated.

        Returns:
            pd.DataFrame: Normalized shape with a single ``shape`` column.
        """
        cluster_df = self._create_cluster_df(apply_schedule, use_hours=True)

        season_one_hot = self._create_one_hot_matrix(
            rows=len(cluster_df),
            clusters=cluster_df["cluster"],
            max_clusters=self.__max_cluster,
            adjust_clusters=True,
            offset_col=True,
        )

        id_one_hot = self._create_one_hot_matrix(
            rows=len(cluster_df),
            clusters=cluster_df["cluster_hours"],
            max_clusters=self.__max_hour_clusters,
            adjust_clusters=True,
            offset_col=True,
        )

        season_fit = season_one_hot @ self._season_regression_params
        id_fit = id_one_hot @ self._id_regression_params

        # combined shape = seasonality fit * intra day fit, then normalized per year
        cluster_df["shape"] = (season_fit * id_fit).squeeze()
        cluster_df["shape"] = self._normalize_year(df=cluster_df.loc[:, "shape"])
        shape_df = pd.DataFrame(cluster_df.loc[:, "shape"])
        shape_df = self.normalize_shape(shape=shape_df)
        return shape_df

    def _to_dict(self):
        return super()._to_dict()
625class CategoricalFourierShaper(PFCShaper):
626 r"""Linear regression model using categorical predictor variables to construct a PFC shape.
627 We follow the methodology in:
629 https://cem-a.org/wp-content/uploads/2019/10/A-Structureal-Model-for-Electricity-Forward-Prices.pdf
631 https://ieeexplore.ieee.org/document/6607349
633 https://www.researchgate.net/publication/229051446_Robust_Calculation_and_Parameter_Estimation_of_the_Hourly_Price_Forward_Curve
635 We create a regression model for bot the seasonality shape and the intra day shape. For the regression model of the seasonality shape,
636 the days are split into weekday, Saturdays and Sundays. Public holidays are considered as Sundays while bridge days are expected to behave like Saturdays.
637 Afterwards, weekdays are split into clusters representing the month they are in, while Saturdays and Sundays are assigned to clusters reaching over three months.
638 For the regression model of the intra day shape we use a fourier series to model periodicities over each hour in a year. This way we can model the solar dip over the year more reliably.
640 .. math::
641 \begin{aligned}
642 y_\text{season}(d) &= \frac{\frac{1}{24}\sum_{i=1}^{24}h_i^d}{\frac{1}{N_y}\sum_{i=1}^{N_y} h_i^y} \\
643 \hat{y}_\text{season}(d) & =\beta^{0}_\text{season} + \sum_{c \in C^\text{season}}\beta^c_{\text{season}}\cdot\mathbb{I}_{\text{Cluster}(d)=c} \\
644 y_\text{id}(h_i,d) &= \frac{h_i^d}{\frac{1}{N_y}\sum_{i=1}^{N_y} h_i^y} \\
645 \hat{y}_\text{id}(h_i,d) & =\beta^{0,H(h_i^d)}_\text{id} + \sum_{k=1}^{K}\beta^{k,H(h_i^d)}_\text{id}\cdot\left(\sin(2\pi k\cdot t(h_i^d)) + \cos(2\pi k\cdot t(h_i^d))\right)\\
646 s(h_i,d) &= \hat{y}_\text{id}(h_i,d)\cdot\hat{y}_\text{season}(d)
647 \end{aligned}
649 where:
651 :math:`h_i^d`: i-th hour of d-th day
653 :math:`h_i^y`: i-th hour of the year :math:`y`
655 :math:`N_y`: number of days in year :math:`y`
657 :math:`H(h_i^d)`: hour (0-23) of :math:`h_i^d` independent of the day
659 :math:`t(h_i^d)`: function which returns a number between [0,1] depending on the position of :math:`h_i^d` in the respecitve year
661 :math:`C^\text{season}`: set of all clusters for the seasonality shape
663 :math:`K`: number of fourier partials
665 :math:`\text{Cluster}(X)`: returns the cluster of X
667 :math:`\mathbb{I}_x = \begin{cases}
668 1, & \text{if the } x \text{ expression renders true}\\
669 0, & \text{if the } x \text{ expression renders false}
670 \end{cases}`
672 Args:
673 spot_prices (pd.DataFrame): Data used to calibrate the shaping model.
674 holiday_calendar (holidays.HolidayBase): Calendar object to obtain country specific holidays.
675 normalization_config (Optional[Dict[Literal["D", "W", "ME"], Optional[int]]], optional): A dictionary configurating the shape normalization periods.
676 Here ``D`` defines the number of days at the beginning of the shape over which the individual mean is normalized to one.
677 ``W`` defines the number of weeks at the beginning of the shape over which the individual mean is normalized to one.
678 ``ME`` defines the number of months at the beginning of the shape over which the individual mean is normalized to one. The remaining shape is then normalized over the individual years. Defaults to None.
679 k_fourier (int): Number of partial sums for the fourier series .Defaults to 2.
680 remove_outlier_season (bool): Wether to remove outliers for the seasonality shape regression. Defaults to False.
681 remove_outlier_id (bool): Wether to remove outliers for the intra day shape regression. Defaults to False.
682 lower_quantile_season (float): Lower quantile for outlier detection. Defauls to 0.005.
683 upper_quantile_season (float): Upper quantile for outlier detection. Defaults to 0.995.
lower_quantile_id (float): Lower quantile for outlier detection. Defaults to 0.005.
685 upper_quantile_id (float): Upper quantile for outlier detection. Defaults to 0.995.
686 """
688 def __init__(
689 self,
690 spot_prices: pd.DataFrame,
691 holiday_calendar: holidays.HolidayBase,
692 normalization_config: Optional[Dict[Literal["D", "W", "M"], Optional[int]]] = None,
693 k_fourier: int = 2,
694 remove_outlier_season: bool = True,
695 remove_outlier_id: bool = True,
696 lower_quantile_season: float = 0.005,
697 upper_quantile_season: float = 0.995,
698 lower_quantile_id: float = 0.005,
699 upper_quantile_id: float = 0.995,
700 ):
701 super().__init__(spot_prices=spot_prices, holiday_calendar=holiday_calendar, normalization_config=normalization_config)
702 self.k_fourier = k_fourier
704 self.remove_outlier_season = remove_outlier_season
705 self.remove_outlier_id = remove_outlier_id
706 self.lower_quantile_season = lower_quantile_season
707 self.upper_quantile_season = upper_quantile_season
708 self.lower_quantile_id = lower_quantile_id
709 self.upper_quantile_id = upper_quantile_id
    def _create_cluster_df(self, day_list: List[dt.datetime], use_hours: bool = False):
        """Assign every timestamp in ``day_list`` to a day-type cluster.

        Regular weekdays are clustered by their month (cluster label = month number);
        Saturday-like days (Saturdays, bridge days, Dec 24/31 on weekdays) and
        Sunday-like days (Sundays, holidays) each receive one cluster per quarter-like
        season (Dec-Feb, Mar-May, Jun-Aug, Sep-Nov), numbered after the month clusters.
        If ``use_hours`` is True, an additional ``cluster_hours`` column enumerates
        every (cluster, hour) combination.

        Args:
            day_list (List[dt.datetime]): Timestamps; also used as the result's index.
            use_hours (bool): Whether to also build hour-level clusters. Defaults to False.

        Returns:
            pd.DataFrame: One row per timestamp with calendar features
            (``year``/``month``/``day``/``weekday``, optionally ``hour``), holiday and
            bridge-day flags, ``day_indicator`` and ``cluster`` labels.
        """
        holidays_list = pd.to_datetime(list(self.holiday_calendar.keys()))
        cluster_df = pd.DataFrame(index=day_list)

        cluster_df["year"] = cluster_df.index.year
        cluster_df["month"] = cluster_df.index.month
        cluster_df["day"] = cluster_df.index.day
        cluster_df["weekday"] = cluster_df.index.weekday

        if use_hours:
            cluster_df["hour"] = cluster_df.index.hour

        # get holidays and bridge days on the unique calendar days only
        temp_cluster_df = cluster_df[["year", "month", "day", "weekday"]].drop_duplicates().sort_index()
        temp_cluster_df["holiday"] = 0
        temp_cluster_df["bridge"] = 0
        temp_cluster_df.loc[temp_cluster_df.index.isin(holidays_list), "holiday"] = 1

        # a bridge day is a Monday directly before or a Friday directly after a holiday
        is_monday_bridge = (temp_cluster_df.index + pd.Timedelta(days=1)).isin(holidays_list) & (temp_cluster_df.index.weekday == 0)
        is_friday_bridge = (temp_cluster_df.index - pd.Timedelta(days=1)).isin(holidays_list) & (temp_cluster_df.index.weekday == 4)
        temp_cluster_df.loc[is_friday_bridge | is_monday_bridge, "bridge"] = 1

        # broadcast the day-level flags back onto the (possibly hourly) rows;
        # merge drops the datetime index, so it is restored explicitly afterwards
        cluster_df = pd.merge(cluster_df, temp_cluster_df, on=["year", "month", "day", "weekday"])
        cluster_df.index = day_list

        # day_indicator: 1 = regular weekday, 2 = Saturday-like, 3 = Sunday-like
        cluster_df["day_indicator"] = 0
        cluster_df.loc[cluster_df.index.weekday < 5, "day_indicator"] = 1
        cluster_df.loc[cluster_df.index.weekday == 5, "day_indicator"] = 2
        cluster_df.loc[cluster_df.index.weekday == 6, "day_indicator"] = 3

        # holidays count as Sundays, bridge days as Saturdays
        cluster_df.loc[cluster_df["holiday"] == 1, "day_indicator"] = 3
        cluster_df.loc[cluster_df["bridge"] == 1, "day_indicator"] = 2

        # Christmas Eve / New Year's Eve on a weekday are treated as Saturday-like
        cluster_df.loc[(cluster_df.index.month == 12) & (cluster_df.index.day.isin([24, 31])) & (cluster_df.index.weekday < 5), "day_indicator"] = 2

        # regular weekdays: cluster label equals the month number
        cluster_df.loc[:, "cluster"] = 0
        cluster_df.loc[cluster_df["day_indicator"] == 1, "cluster"] = cluster_df.loc[cluster_df["day_indicator"] == 1, "month"]

        # Saturday-/Sunday-like days: one cluster per season, numbered after the
        # largest month label present in the data
        weekend_cluster_month = [[1, 2, 12], [3, 4, 5], [6, 7, 8], [9, 10, 11]]
        count = cluster_df["month"].max()

        for day_indicator in [2, 3]:
            for month_lst in weekend_cluster_month:
                # only allocate a new cluster label if the combination actually occurs
                if not len(cluster_df.loc[(cluster_df["day_indicator"] == day_indicator) & (cluster_df["month"].isin(month_lst)), "cluster"]) == 0:
                    count += 1
                    cluster_df.loc[(cluster_df["day_indicator"] == day_indicator) & (cluster_df["month"].isin(month_lst)), "cluster"] = count

        if use_hours:
            self.__add_hours_cluster(
                df=cluster_df,
                clusters_clmn="cluster",
                hours_clmn="hour",
                unique_clusters=cluster_df["cluster"].unique(),
                unique_hours=cluster_df["hour"].unique(),
            )

        return cluster_df
770 def __add_hours_cluster(self, df: pd.DataFrame, clusters_clmn: str, hours_clmn: str, unique_clusters: List[int], unique_hours: List[int]):
771 df["cluster_hours"] = 0
772 count = 1
773 for cluster in unique_clusters:
774 for hour in unique_hours:
775 df.loc[(df[clusters_clmn] == cluster) & (df[hours_clmn] == hour), "cluster_hours"] = count
776 count += 1
778 def _create_one_hot_matrix(self, rows: int, clusters: pd.Series, max_clusters: int, adjust_clusters: bool, offset_col: bool):
779 one_hot = np.zeros(shape=(rows, max_clusters))
780 if adjust_clusters:
781 cluster_series = clusters - 1
782 else:
783 cluster_series = clusters
785 one_hot[np.arange(rows), cluster_series] = 1
787 if offset_col:
788 one_hot[:, -1] = 1
789 else:
790 one_hot = one_hot[:, :-1]
792 return one_hot
794 def times_to_zero_one(self, h: pd.DatetimeIndex):
795 """
796 Convert any time point into the corresponding fraction w.r.t. the
797 beginning of the year it belongs to.
798 1st of January are always zeros, while 31th of December can be
799 1 / 365 or 1/366 according to the year.
800 The idea is to map any year to [0,1), on which the seasonality
801 curve, periodic on [0,1], is fitted.
803 """
804 if not isinstance(h, pd.DatetimeIndex):
805 raise TypeError("index must be of type pd.DatetimeIndex")
806 if len(h) < 1:
807 raise ValueError("index must contain at least one value!")
808 # Build a DataFrame where each point is the start of the year
809 # w.r.t. each date in h
810 start_of_years = pd.to_datetime(h.year.astype(str) + "-01-01 00:00:00").tz_localize(h.tz)
811 # Build a DataFrame where each point is the start of the year
812 # w.r.t. each date in h
813 end_of_years = pd.to_datetime((h.year + 1).astype(str) + "-01-01 00:00:00").tz_localize(h.tz)
814 # Compute then the fractions, using pandas vectorization
815 result = np.array((h - start_of_years) / (end_of_years - start_of_years))
816 # Internal sanity check: all points must lie in [0, 1)
817 assert all(result < 1.0) and all(result >= 0.0)
818 return result
820 def _preprocess(self, spot: pd.DataFrame) -> pd.DataFrame:
821 # remove duplicate hours by replacing these with their mean
822 spot = spot.groupby(level=0).mean()
824 # include missing hours
825 full_idx = pd.date_range(start=spot.index.min(), end=spot.index.max(), freq="h")
826 spot = spot.reindex(full_idx)
827 spot.index.name = "date"
828 spot.iloc[:, 0] = spot.iloc[:, 0].interpolate(method="linear")
829 return spot
831 @staticmethod
832 def _remove_outliers(df: pd.DataFrame, value_clmn: str, grouping_clmn: str, lower_quantile: float, upper_quantile: float):
833 def remove_outliers(series, lower_quantile=lower_quantile, upper_quantile=upper_quantile):
834 lower_bound = series.quantile(lower_quantile)
835 upper_bound = series.quantile(upper_quantile)
836 return series[(series >= lower_bound) & (series <= upper_bound)]
838 keep_ids = df.groupby(grouping_clmn, group_keys=False)[value_clmn].apply(remove_outliers).index
839 df_clean = df.loc[df.index.isin(keep_ids), :]
840 return df_clean
    def calibrate(
        self,
    ):
        """Fit the seasonality and intra-day shape regressions on the spot history.

        Two models are fitted and stored on the instance:

        * Seasonality model: daily means of the yearly-normalized spot prices are
          regressed via ordinary least squares on one-hot encoded day clusters
          (``_season_regression_params``).
        * Intra-day model: for every hour of the day, a truncated Fourier series in
          the within-year time ``t`` is fitted to the yearly-normalized hourly
          prices (``_id_params``, one parameter vector per hour).

        Optionally removes per-cluster (seasonality) and per-hour (intra-day)
        quantile outliers before fitting.
        """
        spot = self.spot_prices.copy()
        # average duplicate hours and interpolate onto a full hourly grid
        spot = self._preprocess(spot=spot)

        cluster_df = self._create_cluster_df(spot.index, use_hours=True)

        # NOTE(review): _normalize_year is defined outside this view — presumably it
        # normalizes prices by their yearly mean; confirm against the base class.
        season_shape = self._normalize_year(spot)
        # daily resolution for the seasonality regression
        season_shape = season_shape.resample("D").mean().dropna()

        cluster_df_daily = cluster_df[["year", "month", "day", "weekday", "day_indicator", "cluster"]].drop_duplicates().sort_index()
        calib_season_df = pd.merge(season_shape, cluster_df_daily, left_index=True, right_index=True)

        if self.remove_outlier_season:
            # first column holds the normalized price values
            value_clmn = calib_season_df.columns[0]
            calib_season_df = self._remove_outliers(
                df=calib_season_df,
                value_clmn=value_clmn,
                grouping_clmn="cluster",
                lower_quantile=self.lower_quantile_season,
                upper_quantile=self.upper_quantile_season,
            )

        # remembered so that apply() builds a design matrix of matching width
        self.__max_cluster = calib_season_df["cluster"].max()

        season_one_hot = self._create_one_hot_matrix(
            rows=len(calib_season_df),
            clusters=calib_season_df["cluster"],
            max_clusters=self.__max_cluster,
            adjust_clusters=True,  # since clusters do not start at 0
            offset_col=True,  # since we would ignore the last column because it is obsolete due to our categorical variables,
            # we actually set it all to 1 to account for the offset in our regression model
        )

        # ordinary least squares via the normal equations: (X'X)^-1 X'y
        self._season_regression_params = (
            np.linalg.inv(season_one_hot.T @ season_one_hot) @ season_one_hot.T @ calib_season_df.iloc[:, 0].to_numpy().reshape(-1, 1)
        )

        # intra-day model works on the hourly (non-resampled) normalized prices
        id_shape = self._normalize_year(spot)

        calib_id_df = pd.merge(id_shape, cluster_df, left_index=True, right_index=True)
        # fractional position of each timestamp within its year
        calib_id_df["t"] = self.times_to_zero_one(calib_id_df.index)

        if self.remove_outlier_id:
            value_clmn = calib_id_df.columns[0]
            calib_id_df = self._remove_outliers(
                df=calib_id_df,
                grouping_clmn="hour",
                value_clmn=value_clmn,
                lower_quantile=self.lower_quantile_id,
                upper_quantile=self.upper_quantile_id,
            )

        # one Fourier regression per hour of the day
        self._id_params = {}
        for h in calib_id_df["hour"].unique():
            x = calib_id_df.loc[calib_id_df["hour"] == h, "t"].to_numpy().reshape(-1, 1)
            y = calib_id_df.loc[calib_id_df["hour"] == h, :].iloc[:, 0].to_numpy().reshape(-1, 1)
            # design matrix: sin/cos pair per partial sum plus a constant offset column
            m = self.k_fourier * 2 + 1
            x_fit = np.zeros((x.shape[0], m))
            fourier_parts = []
            for k in np.arange(start=1, stop=self.k_fourier + 1):
                fourier_parts.append(np.sin(2 * k * np.pi * x))
                fourier_parts.append(np.cos(2 * k * np.pi * x))

            x_fit[:, :-1] = np.concatenate(fourier_parts, axis=1)
            x_fit[:, -1] = 1.0
            # least-squares fit via the Moore-Penrose pseudoinverse
            self._id_params[h] = np.linalg.pinv(x_fit) @ y
911 def apply(self, apply_schedule: List[dt.datetime]) -> pd.DataFrame:
912 cluster_df = self._create_cluster_df(apply_schedule, use_hours=True)
914 cluster_df["t"] = self.times_to_zero_one(cluster_df.index)
916 season_one_hot = self._create_one_hot_matrix(
917 rows=len(cluster_df),
918 clusters=cluster_df["cluster"],
919 max_clusters=self.__max_cluster,
920 adjust_clusters=True,
921 offset_col=True,
922 )
924 season_fit = season_one_hot @ self._season_regression_params
925 self._season_fit = season_fit
927 cluster_df["id_fit"] = 0.0
928 for h in cluster_df["hour"].unique():
929 x = cluster_df.loc[cluster_df["hour"] == h, "t"].to_numpy().reshape(-1, 1)
930 x_fit = np.zeros((x.shape[0], 5))
931 m = self.k_fourier * 2 + 1
932 x_fit = np.zeros((x.shape[0], m))
933 fourier_parts = []
934 for k in np.arange(start=1, stop=self.k_fourier + 1):
935 fourier_parts.append(np.sin(2 * k * np.pi * x))
936 fourier_parts.append(np.cos(2 * k * np.pi * x))
937 x_fit[:, :-1] = np.concatenate(fourier_parts, axis=1)
938 x_fit[:, -1] = 1.0
939 cluster_df.loc[cluster_df["hour"] == h, "id_fit"] = (x_fit @ self._id_params[h]).squeeze()
941 cluster_df["shape"] = season_fit.squeeze() * cluster_df["id_fit"].to_numpy()
942 shape_df = pd.DataFrame(cluster_df.loc[:, "shape"])
943 shape_df = self.normalize_shape(shape=shape_df)
944 return shape_df
    def _to_dict(self):
        """Serialize the construction parameters by delegating to the base-class implementation."""
        return super()._to_dict()