Source code for rivapy.marketdata_tools.pfc_shaper

import abc
import holidays
import numpy as np
import pandas as pd
import datetime as dt
import rivapy.tools.interfaces as interfaces
import rivapy.tools._validators as validator
from rivapy.tools.scheduler import SimpleSchedule
from typing import List, Dict, Literal, Optional


class PFCShaper(interfaces.FactoryObject):
    """PFCShaper interface. Each shaping model for energy price forward curves must inherit from this base class.

    Args:
        spot_prices (pd.DataFrame): Data used to calibrate the shaping model.
        holiday_calendar (holidays.HolidayBase): Calendar object to obtain country specific holidays.
        normalization_config (Optional[Dict[Literal["D", "W", "ME"], Optional[int]]], optional): A dictionary configuring the shape normalization periods.
            Here ``D`` defines the number of days at the beginning of the shape over which the individual mean is normalized to one.
            ``W`` defines the number of weeks at the beginning of the shape over which the individual mean is normalized to one.
            ``ME`` defines the number of months at the beginning of the shape over which the individual mean is normalized to one.
            The remaining shape is then normalized over the individual years. Defaults to None.
    """

    def __init__(
        self,
        spot_prices: pd.DataFrame,
        holiday_calendar: holidays.HolidayBase,
        normalization_config: Optional[Dict[Literal["D", "W", "ME"], Optional[int]]] = None,
    ):
        super().__init__()
        validator._check_pandas_index_for_datetime(spot_prices)
        self.spot_prices = spot_prices
        self.holiday_calendar = holiday_calendar
        self.normalization_config = normalization_config
        # normalization order, including the resampling string pattern for the pandas resample method
        self.__normalization_order = [("D", "%Y-%m-%d"), ("W", "%G-%V"), ("ME", "%Y-%m")]

    @abc.abstractmethod
    def calibrate(self):
        """Calibration of the shaping model."""
        pass

    @abc.abstractmethod
    def apply(self, apply_schedule: List[dt.datetime]):
        """Applies the model to a schedule in order to generate a shape for future dates.

        Args:
            apply_schedule (List[dt.datetime]): List of datetimes for which a shape should be generated.
        """
        pass

    def _preprocess(self, spot: pd.DataFrame, remove_outlier: bool, lower_quantile: float, upper_quantile: float) -> pd.DataFrame:
        """Preprocess spot price data by ensuring hourly continuity, interpolating missing values,
        and optionally removing outliers based on normalized yearly values.

        This method performs the following steps:

        1. Aggregates duplicate timestamps by taking the mean of their values.
        2. Reindexes the time series to ensure a continuous hourly frequency and linearly interpolates missing values.
        3. If ``remove_outlier=True``, normalizes the time series on a yearly basis and removes data points outside the specified quantile range.

        Args:
            spot (pd.DataFrame): Raw spot price data indexed by datetime. The first column is assumed to contain the price values.
            remove_outlier (bool): Whether to remove outliers after normalization.
            lower_quantile (float): Lower quantile threshold (e.g., 0.01) used for outlier removal.
            upper_quantile (float): Upper quantile threshold (e.g., 0.99) used for outlier removal.

        Returns:
            pd.DataFrame: A cleaned and time-continuous spot price time series, with outliers optionally removed.
        """
        # remove duplicate hours by replacing them with their mean
        spot = spot.groupby(level=0).mean()
        # include missing hours
        full_idx = pd.date_range(start=spot.index.min(), end=spot.index.max(), freq="h")
        spot = spot.reindex(full_idx)
        spot.index.name = "date"
        spot.iloc[:, 0] = spot.iloc[:, 0].interpolate(method="linear")

        if remove_outlier:
            yearly_normalized = self._normalize_year(df=spot)
            q_lower = np.quantile(yearly_normalized.iloc[:, 0].to_numpy(), lower_quantile)
            q_upper = np.quantile(yearly_normalized.iloc[:, 0].to_numpy(), upper_quantile)
            remove_ids = yearly_normalized.loc[(yearly_normalized.iloc[:, 0] < q_lower) | (yearly_normalized.iloc[:, 0] > q_upper), :].index
            spot = spot.loc[~spot.index.isin(remove_ids), :]
        return spot

    def _normalize_year(self, df: pd.DataFrame) -> pd.DataFrame:
        """Normalize time series values by their yearly mean.

        This method computes the mean value for each calendar year and divides all data points within
        that year by the corresponding annual mean. The result is a year-normalized time series where
        each year has an average value of 1.

        Args:
            df (pd.DataFrame): A DataFrame indexed by datetime, containing one or more numeric columns to be normalized.

        Returns:
            pd.DataFrame: A DataFrame where the values of each year have been normalized relative to their annual mean.
        """
        yearly_mean = df.resample("YE").transform("mean")
        normalized = df / yearly_mean
        return normalized

    def normalize_shape(self, shape: pd.DataFrame) -> pd.DataFrame:
        """Normalizes the shape based on ``normalization_config``.

        ``D`` defines the number of days at the beginning of the shape over which the individual mean is normalized to one.
        ``W`` defines the number of weeks at the beginning of the shape over which the individual mean is normalized to one.
        ``ME`` defines the number of months at the beginning of the shape over which the individual mean is normalized to one.
        The remaining shape is then normalized over the individual years.

        Example: ``D`` is 2, ``W`` is 2 and ``ME`` is 1. The shape starts at 03.03.2025 (a Monday). Since ``D`` is 2, the shape is normalized for 03.03.2025 and 04.03.2025 individually.
        The weeks are normalized from 05.03.2025 to 09.03.2025 and from 10.03.2025 to 16.03.2025.
        The month is then normalized from 17.03.2025 to 31.03.2025. The remaining shape (starting from 01.04.2025) is normalized on a yearly level.
        A runnable sketch of this example follows the class definition below.

        Args:
            shape (pd.DataFrame): Shape which should be normalized.

        Returns:
            pd.DataFrame: Normalized shape.
        """
        # yearly normalization
        if self.normalization_config is None:
            shape_df = self._normalize_year(df=shape)
            return shape_df
        else:
            # the normalization through the normalization_config is done in different parts
            normalized_datetimes = []
            normalized_shapes = []

            # iterate over the correct normalization order
            for resample_freq, resample_format in self.__normalization_order:
                if self.normalization_config.get(resample_freq, None) is None:
                    continue
                else:
                    # if the whole shape has already been normalized by the previous steps, stop the loop
                    if len(normalized_datetimes) == len(shape):
                        return pd.concat(normalized_shapes, axis=0).sort_index(ascending=True)

                    # get the part of the shape which was not part of any previous normalizations
                    temp_shape = shape.loc[~shape.index.isin(normalized_datetimes), :]

                    # normalize the shape by the configured number of days, weeks or months
                    resampled_shape = temp_shape.resample(resample_freq).mean()
                    resampled_shape = resampled_shape.iloc[: self.normalization_config[resample_freq], :]

                    partially_normalized_shape = temp_shape.rename(index=lambda x: x.strftime(resample_format)).divide(
                        resampled_shape.rename(index=lambda x: x.strftime(resample_format)), axis="index"
                    )
                    # Due to the operations in the previous lines, partially_normalized_shape does not contain the exact datetimes but rather
                    # datetimes corresponding to the resampled frequency. Hence, the correct datetimes are added to the DataFrame and set as the index.
                    # This allows the partially normalized shapes to be concatenated more easily at a later stage.
                    partially_normalized_shape["datetimes"] = list(temp_shape.index)
                    partially_normalized_shape = partially_normalized_shape.reset_index(drop=True).set_index("datetimes").dropna()

                    normalized_datetimes += list(partially_normalized_shape.index)
                    normalized_shapes.append(partially_normalized_shape)

            if len(normalized_datetimes) == len(shape):
                return pd.concat(normalized_shapes, axis=0).sort_index(ascending=True)

            # the remaining shape is normalized on a yearly basis
            leftover_shape = shape.loc[~shape.index.isin(normalized_datetimes), :]
            yearly_normalized_shape = self._normalize_year(df=leftover_shape)
            return pd.concat(normalized_shapes + [yearly_normalized_shape], axis=0).sort_index(ascending=True)

    def _to_dict(self):
        return {"spot_prices": self.spot_prices, "holiday_calendar": self.holiday_calendar, "normalization_config": self.normalization_config}
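

# The two helpers below are illustrative sketches, not part of the original module. The first
# shows, on four synthetic data points, what the ``_preprocess`` steps do; the second runs the
# ``normalization_config`` example from the ``normalize_shape`` docstring. They use
# ``SimpleCategoricalRegression`` (defined further below, since ``PFCShaper`` is abstract) and
# an arbitrarily chosen German holiday calendar.
def _example_preprocessing_sketch() -> pd.DataFrame:
    """Sketch: duplicate hours are averaged, missing hours are re-inserted and interpolated."""
    raw = pd.DataFrame(
        {"price": [50.0, 60.0, 40.0, 44.0]},
        index=pd.to_datetime(
            [
                "2025-01-01 00:00",
                "2025-01-01 00:00",  # duplicate hour (e.g. from a DST switch)
                "2025-01-01 01:00",
                "2025-01-01 03:00",  # 02:00 is missing
            ]
        ),
    )
    clean = raw.groupby(level=0).mean()  # 00:00 becomes 55.0
    full_idx = pd.date_range(start=clean.index.min(), end=clean.index.max(), freq="h")
    clean = clean.reindex(full_idx)
    clean["price"] = clean["price"].interpolate(method="linear")  # 02:00 becomes 42.0
    return clean


def _example_normalize_shape_sketch() -> pd.DataFrame:
    """Sketch of the docstring example: D=2, W=2, ME=1 on a shape starting Monday, 2025-03-03."""
    idx = pd.date_range(start="2025-03-03", end="2025-12-31 23:00", freq="h")
    rng = np.random.default_rng(42)
    shape = pd.DataFrame({"shape": 1.0 + 0.1 * rng.standard_normal(len(idx))}, index=idx)
    shaper = SimpleCategoricalRegression(
        spot_prices=shape,  # dummy data, only needed to construct the object
        holiday_calendar=holidays.Germany(years=[2025]),
        normalization_config={"D": 2, "W": 2, "ME": 1},
    )
    normalized = shaper.normalize_shape(shape=shape)
    # each configured period now averages to one, e.g. the first day:
    assert abs(normalized.loc["2025-03-03"].mean().item() - 1.0) < 1e-8
    return normalized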

class SimpleCategoricalRegression(PFCShaper):
    r"""Linear regression model using categorical predictor variables to construct a PFC shape.

    .. math::

        s(t) = s_0 + \sum^{23}_{i=1}\beta^h_i\cdot\mathbb{I}_{h(t)=i} + \beta^d\cdot\mathbb{I}_{d(t)=1} + \beta^H\cdot\mathbb{I}_{H(t)=1} + \sum^{12}_{i=2}\beta^m_i\cdot\mathbb{I}_{m(t)=i}

    where:

    :math:`s_0`: Shape level

    :math:`\mathbb{I}_x = \begin{cases} 1, & \text{if the } x \text{ expression renders true} \\ 0, & \text{if the } x \text{ expression renders false} \end{cases}`

    :math:`h(t)`: Hour of t

    :math:`d(t) = \begin{cases} 1, & \text{if t is a weekday} \\ 0, & \text{if t is a day on a weekend} \end{cases}`

    :math:`H(t) = \begin{cases} 1, & \text{if t is a public holiday} \\ 0, & \text{if t is not a public holiday} \end{cases}`

    :math:`m(t)`: Month of t

    Args:
        spot_prices (pd.DataFrame): Data used to calibrate the shaping model.
        holiday_calendar (holidays.HolidayBase): Calendar object to obtain country specific holidays.
        normalization_config (Optional[Dict[Literal["D", "W", "ME"], Optional[int]]], optional): A dictionary configuring the shape normalization periods.
            Here ``D`` defines the number of days at the beginning of the shape over which the individual mean is normalized to one.
            ``W`` defines the number of weeks at the beginning of the shape over which the individual mean is normalized to one.
            ``ME`` defines the number of months at the beginning of the shape over which the individual mean is normalized to one.
            The remaining shape is then normalized over the individual years. Defaults to None.
        remove_outlier (bool): Whether to remove outliers for the seasonality shape regression. Defaults to False.
        lower_quantile (float): Lower quantile for outlier detection. Defaults to 0.005.
        upper_quantile (float): Upper quantile for outlier detection. Defaults to 0.995.
    """

    def __init__(
        self,
        spot_prices: pd.DataFrame,
        holiday_calendar: holidays.HolidayBase,
        normalization_config: Optional[Dict[Literal["D", "W", "ME"], Optional[int]]] = None,
        remove_outlier: bool = False,
        lower_quantile: float = 0.005,
        upper_quantile: float = 0.995,
    ):
        super().__init__(spot_prices=spot_prices, holiday_calendar=holiday_calendar, normalization_config=normalization_config)
        self.remove_outlier = remove_outlier
        self.lower_quantile = lower_quantile
        self.upper_quantile = upper_quantile

    def _transform(self, datetimes_list: List[dt.datetime]) -> np.ndarray:
        """Transforms a list of datetimes into a numpy array which can then be used for the linear regression.

        Args:
            datetimes_list (List[dt.datetime]): List of datetimes.

        Returns:
            np.ndarray: Numpy array containing the transformed datetimes list.
        """
        _datetime_series = pd.Series(datetimes_list)
        weekday = _datetime_series.dt.weekday.isin([0, 1, 2, 3, 4]).astype(int).to_numpy().reshape(-1, 1)
        # compare on day level so that hours other than midnight are also flagged as holidays
        holiday = _datetime_series.dt.normalize().isin(pd.to_datetime(list(self.holiday_calendar.keys()))).astype(int).to_numpy().reshape(-1, 1)
        # assumes all 12 months are present in the data
        month = pd.get_dummies(_datetime_series.dt.month, prefix="month", drop_first=True).astype(int).to_numpy().reshape(-1, 11)
        offset = np.ones(shape=(len(_datetime_series), 1))
        predictors = [offset, weekday, holiday, month]
        # hourly dummies are only added if the data has an intraday resolution
        if len(_datetime_series.dt.hour.unique()) > 1:
            hours = (
                pd.get_dummies(_datetime_series.dt.hour, prefix="hour", drop_first=True)
                .astype(int)
                .to_numpy()
                .reshape(-1, len(_datetime_series.dt.hour.unique()) - 1)
            )
            predictors.append(hours)
        return np.concatenate(predictors, axis=1)

    def calibrate(self):
        spot = self.spot_prices.copy()
        spot = self._preprocess(spot=spot, remove_outlier=self.remove_outlier, lower_quantile=self.lower_quantile, upper_quantile=self.upper_quantile)
        spot_normalized = self._normalize_year(spot)
        # use the preprocessed index here: preprocessing may add or remove timestamps,
        # so self.spot_prices.index would no longer match the target vector
        data_array = self._transform(datetimes_list=spot_normalized.index)
        self._regression_parameters = np.linalg.inv(data_array.T @ data_array) @ data_array.T @ spot_normalized.iloc[:, 0].to_numpy().reshape(-1, 1)
        # fit = data_array @ self._regression_parameters
        # df = pd.DataFrame(fit.squeeze(), index=spot.index, columns=["shape"])
        # return self._normalize_year(df)

    def apply(self, apply_schedule: List[dt.datetime]) -> pd.DataFrame:
        data_array = self._transform(datetimes_list=apply_schedule)
        shape = data_array @ self._regression_parameters
        shape_df = pd.DataFrame({"shape": shape.squeeze()}, index=apply_schedule)
        shape_df = self.normalize_shape(shape=shape_df)
        return shape_df

    def _to_dict(self):
        return super()._to_dict()
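

# Hypothetical end-to-end usage of SimpleCategoricalRegression (illustration only): synthetic
# hourly prices with a daily sine pattern and an arbitrarily chosen German holiday calendar.
# Note that the month dummy encoding in ``_transform`` expects all 12 months to be present in
# both the calibration data and the apply schedule.
def _example_simple_categorical_regression() -> pd.DataFrame:
    hist_idx = pd.date_range(start="2023-01-01", end="2024-12-31 23:00", freq="h")
    rng = np.random.default_rng(0)
    prices = 50.0 + 10.0 * np.sin(2.0 * np.pi * hist_idx.hour / 24.0) + rng.normal(0.0, 2.0, len(hist_idx))
    spot = pd.DataFrame({"price": prices}, index=hist_idx)

    shaper = SimpleCategoricalRegression(
        spot_prices=spot,
        holiday_calendar=holidays.Germany(years=[2023, 2024, 2025]),
        remove_outlier=True,
    )
    shaper.calibrate()
    # hourly shape for the following year; with normalization_config=None the result is normalized per year
    apply_schedule = list(pd.date_range(start="2025-01-01", end="2025-12-31 23:00", freq="h"))
    return shaper.apply(apply_schedule=apply_schedule)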

class CategoricalRegression(PFCShaper):
    r"""Linear regression model using categorical predictor variables to construct a PFC shape.

    We follow the methodology in:

    https://cem-a.org/wp-content/uploads/2019/10/A-Structureal-Model-for-Electricity-Forward-Prices.pdf

    https://ieeexplore.ieee.org/document/6607349

    https://www.researchgate.net/publication/229051446_Robust_Calculation_and_Parameter_Estimation_of_the_Hourly_Price_Forward_Curve

    We create one regression model for the seasonality shape and one for the intra day shape. For the regression model of the seasonality shape,
    the days are split into weekdays, Saturdays and Sundays. Public holidays are treated as Sundays, while bridge days are expected to behave like Saturdays.
    Afterwards, weekdays are split into clusters representing the month they are in, while Saturdays and Sundays are assigned to clusters reaching over three months.
    For the regression model of the intra day shape we keep the seasonality clusters but add an hourly cluster for each individual hour, such that the total number
    of intra day clusters becomes #Season Clusters * 24.

    .. math::

        \begin{aligned}
            y_\text{season}(d) &= \frac{\frac{1}{24}\sum_{i=1}^{24}h_i^d}{\frac{1}{N_y}\sum_{i=1}^{N_y} h_i^y} \\
            \hat{y}_\text{season}(d) &= \beta^{0}_\text{season} + \sum_{c \in C^\text{season}}\beta^c_{\text{season}}\cdot\mathbb{I}_{\text{Cluster}(d)=c} \\
            y_\text{id}(h_i,d) &= \frac{h_i^d}{\frac{1}{24}\sum_{i=1}^{24}h_i^d} \\
            \hat{y}_\text{id}(h_i,d) &= \beta^{0}_\text{id} + \sum_{c \in C^\text{id}}\beta^c\cdot\mathbb{I}_{\text{Cluster}(h_i^d)=c} \\
            s(h_i,d) &= \hat{y}_\text{id}(h_i,d)\cdot\hat{y}_\text{season}(d)
        \end{aligned}

    where:

    :math:`h_i^d`: i-th hour of the d-th day

    :math:`h_i^y`: i-th hour of the year :math:`y`

    :math:`N_y`: number of hours in the year :math:`y`

    :math:`C^\text{season}`: set of all clusters for the seasonality shape

    :math:`C^\text{id}`: set of all clusters for the intra day shape

    :math:`\text{Cluster}(X)`: returns the cluster of X

    :math:`\mathbb{I}_x = \begin{cases} 1, & \text{if the } x \text{ expression renders true}\\ 0, & \text{if the } x \text{ expression renders false} \end{cases}`

    Args:
        spot_prices (pd.DataFrame): Data used to calibrate the shaping model.
        holiday_calendar (holidays.HolidayBase): Calendar object to obtain country specific holidays.
        normalization_config (Optional[Dict[Literal["D", "W", "ME"], Optional[int]]], optional): A dictionary configuring the shape normalization periods.
            Here ``D`` defines the number of days at the beginning of the shape over which the individual mean is normalized to one.
            ``W`` defines the number of weeks at the beginning of the shape over which the individual mean is normalized to one.
            ``ME`` defines the number of months at the beginning of the shape over which the individual mean is normalized to one.
            The remaining shape is then normalized over the individual years. Defaults to None.
        remove_outlier_season (bool): Whether to remove outliers for the seasonality shape regression. Defaults to False.
        remove_outlier_id (bool): Whether to remove outliers for the intra day shape regression. Defaults to False.
        lower_quantile_season (float): Lower quantile for outlier detection on the seasonality shape. Defaults to 0.005.
        upper_quantile_season (float): Upper quantile for outlier detection on the seasonality shape. Defaults to 0.995.
        lower_quantile_id (float): Lower quantile for outlier detection on the intra day shape. Defaults to 0.005.
        upper_quantile_id (float): Upper quantile for outlier detection on the intra day shape. Defaults to 0.995.
""" def __init__( self, spot_prices: pd.DataFrame, holiday_calendar: holidays.HolidayBase, normalization_config: Optional[Dict[Literal["D", "W", "M"], Optional[int]]] = None, remove_outlier_season: bool = False, remove_outlier_id: bool = False, lower_quantile_season: float = 0.005, upper_quantile_season: float = 0.995, lower_quantile_id: float = 0.005, upper_quantile_id: float = 0.995, ): super().__init__(spot_prices=spot_prices, holiday_calendar=holiday_calendar, normalization_config=normalization_config) self.remove_outlier_season = remove_outlier_season self.remove_outlier_id = remove_outlier_id self.lower_quantile_season = lower_quantile_season self.upper_quantile_season = upper_quantile_season self.lower_quantile_id = lower_quantile_id self.upper_quantile_id = upper_quantile_id def _create_cluster_df(self, day_list: List[dt.datetime], use_hours: bool = False): """Create a DataFrame containing the clusters for the regression models. Args: day_list (List[dt.datetime]): List of datetimes for which a clustering should be performed use_hours (bool, optional): Wether to extend the clustering to include hours. Defaults to False. Returns: None """ holidays_list = pd.to_datetime(list(self.holiday_calendar.keys())) cluster_df = pd.DataFrame(index=day_list) cluster_df["year"] = cluster_df.index.year cluster_df["month"] = cluster_df.index.month cluster_df["day"] = cluster_df.index.day cluster_df["weekday"] = cluster_df.index.weekday if use_hours: cluster_df["hour"] = cluster_df.index.hour # get holidays and bridge days temp_cluster_df = cluster_df[["year", "month", "day", "weekday"]].drop_duplicates().sort_index() temp_cluster_df["holiday"] = 0 temp_cluster_df["bridge"] = 0 temp_cluster_df.loc[temp_cluster_df.index.isin(holidays_list), "holiday"] = 1 is_monday_bridge = (temp_cluster_df.index + pd.Timedelta(days=1)).isin(holidays_list) & (temp_cluster_df.index.weekday == 0) is_friday_bridge = (temp_cluster_df.index - pd.Timedelta(days=1)).isin(holidays_list) & (temp_cluster_df.index.weekday == 4) temp_cluster_df.loc[is_friday_bridge | is_monday_bridge, "bridge"] = 1 cluster_df = pd.merge(cluster_df, temp_cluster_df, on=["year", "month", "day", "weekday"]) # cluster_df.set_index(day_list, inplace=True) cluster_df.index = day_list cluster_df["day_indicator"] = 0 cluster_df.loc[cluster_df.index.weekday < 5, "day_indicator"] = 1 cluster_df.loc[cluster_df.index.weekday == 5, "day_indicator"] = 2 cluster_df.loc[cluster_df.index.weekday == 6, "day_indicator"] = 3 cluster_df.loc[cluster_df["holiday"] == 1, "day_indicator"] = 3 cluster_df.loc[cluster_df["bridge"] == 1, "day_indicator"] = 2 cluster_df.loc[(cluster_df.index.month == 12) & (cluster_df.index.day.isin([24, 31])) & (cluster_df.index.weekday < 5), "day_indicator"] = 2 cluster_df.loc[:, "cluster"] = 0 cluster_df.loc[cluster_df["day_indicator"] == 1, "cluster"] = cluster_df.loc[cluster_df["day_indicator"] == 1, "month"] weekend_cluster_month = [[1, 2, 12], [3, 4, 5], [6, 7, 8], [9, 10, 11]] # weekend_cluster_month = [[1], [2], [3], [4], [5], [6], [7], [8], [9], [10], [11], [12]] count = cluster_df["month"].max() for day_indicator in [2, 3]: for month_lst in weekend_cluster_month: if not len(cluster_df.loc[(cluster_df["day_indicator"] == day_indicator) & (cluster_df["month"].isin(month_lst)), "cluster"]) == 0: count += 1 cluster_df.loc[(cluster_df["day_indicator"] == day_indicator) & (cluster_df["month"].isin(month_lst)), "cluster"] = count if use_hours: self.__add_hours_cluster( df=cluster_df, clusters_clmn="cluster", hours_clmn="hour", 
unique_clusters=cluster_df["cluster"].unique(), unique_hours=cluster_df["hour"].unique(), ) return cluster_df def __add_hours_cluster(self, df: pd.DataFrame, clusters_clmn: str, hours_clmn: str, unique_clusters: List[int], unique_hours: List[int]): """Add hourly clustering in the `cluster_hours` column of the provided DataFrame Args: df (pd.DataFrame): DataFrame containing the infos needed for an hourly clustering. clusters_clmn (str): Column containing the seasonality clusters hours_clmn (str): Columns containing the hours unique_clusters (List[int]): List of all clusters unique_hours (List[int]): List of all hourly clusters """ df["cluster_hours"] = 0 count = 1 for cluster in unique_clusters: for hour in unique_hours: df.loc[(df[clusters_clmn] == cluster) & (df[hours_clmn] == hour), "cluster_hours"] = count count += 1 def _create_one_hot_matrix(self, rows: int, clusters: pd.Series, max_clusters: int, adjust_clusters: bool, offset_col: bool): """Create a matrix for a one hot encoding of a clusters pandas Series. Args: rows (int): Number of data points clusters (pd.Series): Series containing the cluster for each data point max_clusters (int): Total number of individual clusters adjust_clusters (bool): Wether to adjust cluster by subtracting each cluster by one. offset_col (bool): Wether to use the last column as an intercept for the regression model. Returns: _type_: _description_ """ one_hot = np.zeros(shape=(rows, max_clusters)) if adjust_clusters: cluster_series = clusters - 1 else: cluster_series = clusters one_hot[np.arange(rows), cluster_series] = 1 if offset_col: one_hot[:, -1] = 1 else: one_hot = one_hot[:, :-1] return one_hot def _preprocess(self, spot: pd.DataFrame) -> pd.DataFrame: # remove duplicate hours by replacing these with their mean spot = spot.groupby(level=0).mean() # include missing hours full_idx = pd.date_range(start=spot.index.min(), end=spot.index.max(), freq="h") spot = spot.reindex(full_idx) spot.index.name = "date" spot.iloc[:, 0] = spot.iloc[:, 0].interpolate(method="linear") return spot @staticmethod def _remove_outliers(df: pd.DataFrame, value_clmn: str, grouping_clmn: str, lower_quantile: float, upper_quantile: float): """ Remove outliers from a DataFrame based on quantile thresholds within groups. This function applies a quantile-based filter to the values in `value_clmn` for each unique category defined in `grouping_clmn`. For each group, values below the `lower_quantile` or above the `upper_quantile` are considered outliers and removed. The filtered rows are then returned as a cleaned DataFrame. Args: df (pd.DataFrame): Input DataFrame containing the data. value_clmn (str): Name of the column containing the numerical values to evaluate for outliers. grouping_clmn (str): Name of the column used to group the data before applying the quantile filter. lower_quantile (float): Lower quantile threshold (e.g., 0.05). Values below this quantile are removed. upper_quantile (float): Upper quantile threshold (e.g., 0.95). Values above this quantile are removed. Returns: pd.DataFrame: A DataFrame containing only the data points within the specified quantile bounds for each group. 
""" def remove_outliers(series, lower_quantile=lower_quantile, upper_quantile=upper_quantile): lower_bound = series.quantile(lower_quantile) upper_bound = series.quantile(upper_quantile) return series[(series >= lower_bound) & (series <= upper_bound)] keep_ids = df.groupby(grouping_clmn, group_keys=False)[value_clmn].apply(remove_outliers).index df_clean = df.loc[df.index.isin(keep_ids), :] return df_clean

    def calibrate(self):
        spot = self.spot_prices.copy()
        spot = self._preprocess(spot=spot)
        cluster_df = self._create_cluster_df(spot.index, use_hours=True)

        season_shape = self._normalize_year(spot)
        season_shape = season_shape.resample("D").mean().dropna()
        cluster_df_daily = cluster_df[["year", "month", "day", "weekday", "day_indicator", "cluster"]].drop_duplicates().sort_index()
        calib_season_df = pd.merge(season_shape, cluster_df_daily, left_index=True, right_index=True)
        if self.remove_outlier_season:
            value_clmn = calib_season_df.columns[0]
            calib_season_df = self._remove_outliers(
                df=calib_season_df,
                value_clmn=value_clmn,
                grouping_clmn="cluster",
                lower_quantile=self.lower_quantile_season,
                upper_quantile=self.upper_quantile_season,
            )

        self.__max_cluster = calib_season_df["cluster"].max()
        season_one_hot = self._create_one_hot_matrix(
            rows=len(calib_season_df),
            clusters=calib_season_df["cluster"],
            max_clusters=self.__max_cluster,
            adjust_clusters=True,  # since clusters do not start at 0
            offset_col=True,  # since we would ignore the last column because it is obsolete due to our categorical variables,
            # we actually set it all to 1 to account for the offset in our regression model
        )
        self._season_regression_params = (
            np.linalg.inv(season_one_hot.T @ season_one_hot) @ season_one_hot.T @ calib_season_df.iloc[:, 0].to_numpy().reshape(-1, 1)
        )

        id_shape = spot / spot.resample("D").transform("mean").dropna()
        calib_id_df = pd.merge(id_shape, cluster_df, left_index=True, right_index=True)
        if self.remove_outlier_id:
            value_clmn = calib_id_df.columns[0]
            calib_id_df = self._remove_outliers(
                df=calib_id_df,
                grouping_clmn="cluster_hours",
                value_clmn=value_clmn,
                lower_quantile=self.lower_quantile_id,
                upper_quantile=self.upper_quantile_id,
            )

        self.__max_hour_clusters = calib_id_df["cluster_hours"].max()
        id_one_hot = self._create_one_hot_matrix(
            rows=len(calib_id_df),
            clusters=calib_id_df["cluster_hours"],
            max_clusters=self.__max_hour_clusters,
            adjust_clusters=True,
            offset_col=True,
        )
        self._id_regression_params = np.linalg.inv(id_one_hot.T @ id_one_hot) @ id_one_hot.T @ calib_id_df.iloc[:, 0].to_numpy().reshape(-1, 1)

    def apply(self, apply_schedule: List[dt.datetime]) -> pd.DataFrame:
        cluster_df = self._create_cluster_df(apply_schedule, use_hours=True)
        season_one_hot = self._create_one_hot_matrix(
            rows=len(cluster_df),
            clusters=cluster_df["cluster"],
            max_clusters=self.__max_cluster,
            adjust_clusters=True,
            offset_col=True,
        )
        id_one_hot = self._create_one_hot_matrix(
            rows=len(cluster_df),
            clusters=cluster_df["cluster_hours"],
            max_clusters=self.__max_hour_clusters,
            adjust_clusters=True,
            offset_col=True,
        )
        season_fit = season_one_hot @ self._season_regression_params
        id_fit = id_one_hot @ self._id_regression_params
        cluster_df["shape"] = (season_fit * id_fit).squeeze()
        cluster_df["shape"] = self._normalize_year(df=cluster_df.loc[:, "shape"])
        shape_df = pd.DataFrame(cluster_df.loc[:, "shape"])
        shape_df = self.normalize_shape(shape=shape_df)
        return shape_df

    def _to_dict(self):
        return super()._to_dict()
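

# Two illustrative sketches, not part of the original module: a tiny demonstration of the
# group-wise quantile filter in ``_remove_outliers`` and a hypothetical end-to-end usage of
# CategoricalRegression on synthetic hourly prices (holiday calendar chosen arbitrarily).
def _example_remove_outliers_sketch() -> pd.DataFrame:
    """Within each cluster, values outside the group's quantile band are dropped."""
    df = pd.DataFrame(
        {
            "value": [1.0, 1.1, 0.9, 9.0, 2.0, 2.1, 1.9, -5.0],
            "cluster": [1, 1, 1, 1, 2, 2, 2, 2],
        }
    )
    # with quantiles 0.1/0.9 on only four points per group, the extremes on both ends are dropped
    return CategoricalRegression._remove_outliers(
        df=df, value_clmn="value", grouping_clmn="cluster", lower_quantile=0.1, upper_quantile=0.9
    )


def _example_categorical_regression() -> pd.DataFrame:
    hist_idx = pd.date_range(start="2023-01-01", end="2024-12-31 23:00", freq="h")
    rng = np.random.default_rng(1)
    prices = 50.0 + 10.0 * np.sin(2.0 * np.pi * hist_idx.hour / 24.0) + rng.normal(0.0, 2.0, len(hist_idx))
    spot = pd.DataFrame({"price": prices}, index=hist_idx)

    shaper = CategoricalRegression(
        spot_prices=spot,
        holiday_calendar=holidays.Germany(years=[2023, 2024, 2025]),
        normalization_config={"D": 5, "W": 4, "ME": 6},
        remove_outlier_season=True,
        remove_outlier_id=True,
    )
    shaper.calibrate()
    # the resulting shape is the product of the seasonality fit and the intra day fit
    apply_schedule = list(pd.date_range(start="2025-01-01", end="2025-12-31 23:00", freq="h"))
    return shaper.apply(apply_schedule=apply_schedule)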

class CategoricalFourierShaper(PFCShaper):
    r"""Linear regression model using categorical predictor variables to construct a PFC shape.

    We follow the methodology in:

    https://cem-a.org/wp-content/uploads/2019/10/A-Structureal-Model-for-Electricity-Forward-Prices.pdf

    https://ieeexplore.ieee.org/document/6607349

    https://www.researchgate.net/publication/229051446_Robust_Calculation_and_Parameter_Estimation_of_the_Hourly_Price_Forward_Curve

    We create one regression model for the seasonality shape and one for the intra day shape. For the regression model of the seasonality shape,
    the days are split into weekdays, Saturdays and Sundays. Public holidays are treated as Sundays, while bridge days are expected to behave like Saturdays.
    Afterwards, weekdays are split into clusters representing the month they are in, while Saturdays and Sundays are assigned to clusters reaching over three months.
    For the regression model of the intra day shape we use a Fourier series to model periodicities over each hour in a year. This way we can model the solar dip
    over the year more reliably.

    .. math::

        \begin{aligned}
            y_\text{season}(d) &= \frac{\frac{1}{24}\sum_{i=1}^{24}h_i^d}{\frac{1}{N_y}\sum_{i=1}^{N_y} h_i^y} \\
            \hat{y}_\text{season}(d) &= \beta^{0}_\text{season} + \sum_{c \in C^\text{season}}\beta^c_{\text{season}}\cdot\mathbb{I}_{\text{Cluster}(d)=c} \\
            y_\text{id}(h_i,d) &= \frac{h_i^d}{\frac{1}{N_y}\sum_{i=1}^{N_y} h_i^y} \\
            \hat{y}_\text{id}(h_i,d) &= \beta^{0,H(h_i^d)}_\text{id} + \sum_{k=1}^{K}\left(\beta^{k,H(h_i^d)}_{\text{id},\sin}\cdot\sin(2\pi k\cdot t(h_i^d)) + \beta^{k,H(h_i^d)}_{\text{id},\cos}\cdot\cos(2\pi k\cdot t(h_i^d))\right)\\
            s(h_i,d) &= \hat{y}_\text{id}(h_i,d)\cdot\hat{y}_\text{season}(d)
        \end{aligned}

    where:

    :math:`h_i^d`: i-th hour of the d-th day

    :math:`h_i^y`: i-th hour of the year :math:`y`

    :math:`N_y`: number of hours in the year :math:`y`

    :math:`H(h_i^d)`: hour (0-23) of :math:`h_i^d` independent of the day

    :math:`t(h_i^d)`: function which returns a number in [0,1) depending on the position of :math:`h_i^d` in the respective year

    :math:`C^\text{season}`: set of all clusters for the seasonality shape

    :math:`K`: number of Fourier partials

    :math:`\text{Cluster}(X)`: returns the cluster of X

    :math:`\mathbb{I}_x = \begin{cases} 1, & \text{if the } x \text{ expression renders true}\\ 0, & \text{if the } x \text{ expression renders false} \end{cases}`

    Args:
        spot_prices (pd.DataFrame): Data used to calibrate the shaping model.
        holiday_calendar (holidays.HolidayBase): Calendar object to obtain country specific holidays.
        normalization_config (Optional[Dict[Literal["D", "W", "ME"], Optional[int]]], optional): A dictionary configuring the shape normalization periods.
            Here ``D`` defines the number of days at the beginning of the shape over which the individual mean is normalized to one.
            ``W`` defines the number of weeks at the beginning of the shape over which the individual mean is normalized to one.
            ``ME`` defines the number of months at the beginning of the shape over which the individual mean is normalized to one.
            The remaining shape is then normalized over the individual years. Defaults to None.
        k_fourier (int): Number of partial sums for the Fourier series. Defaults to 2.
        remove_outlier_season (bool): Whether to remove outliers for the seasonality shape regression. Defaults to True.
        remove_outlier_id (bool): Whether to remove outliers for the intra day shape regression. Defaults to True.
        lower_quantile_season (float): Lower quantile for outlier detection on the seasonality shape. Defaults to 0.005.
        upper_quantile_season (float): Upper quantile for outlier detection on the seasonality shape. Defaults to 0.995.
        lower_quantile_id (float): Lower quantile for outlier detection on the intra day shape. Defaults to 0.005.
        upper_quantile_id (float): Upper quantile for outlier detection on the intra day shape. Defaults to 0.995.
    """

    def __init__(
        self,
        spot_prices: pd.DataFrame,
        holiday_calendar: holidays.HolidayBase,
        normalization_config: Optional[Dict[Literal["D", "W", "ME"], Optional[int]]] = None,
        k_fourier: int = 2,
        remove_outlier_season: bool = True,
        remove_outlier_id: bool = True,
        lower_quantile_season: float = 0.005,
        upper_quantile_season: float = 0.995,
        lower_quantile_id: float = 0.005,
        upper_quantile_id: float = 0.995,
    ):
        super().__init__(spot_prices=spot_prices, holiday_calendar=holiday_calendar, normalization_config=normalization_config)
        self.k_fourier = k_fourier
        self.remove_outlier_season = remove_outlier_season
        self.remove_outlier_id = remove_outlier_id
        self.lower_quantile_season = lower_quantile_season
        self.upper_quantile_season = upper_quantile_season
        self.lower_quantile_id = lower_quantile_id
        self.upper_quantile_id = upper_quantile_id

    def _create_cluster_df(self, day_list: List[dt.datetime], use_hours: bool = False):
        holidays_list = pd.to_datetime(list(self.holiday_calendar.keys()))
        cluster_df = pd.DataFrame(index=day_list)
        cluster_df["year"] = cluster_df.index.year
        cluster_df["month"] = cluster_df.index.month
        cluster_df["day"] = cluster_df.index.day
        cluster_df["weekday"] = cluster_df.index.weekday
        if use_hours:
            cluster_df["hour"] = cluster_df.index.hour

        # get holidays and bridge days
        temp_cluster_df = cluster_df[["year", "month", "day", "weekday"]].drop_duplicates().sort_index()
        temp_cluster_df["holiday"] = 0
        temp_cluster_df["bridge"] = 0
        temp_cluster_df.loc[temp_cluster_df.index.isin(holidays_list), "holiday"] = 1
        is_monday_bridge = (temp_cluster_df.index + pd.Timedelta(days=1)).isin(holidays_list) & (temp_cluster_df.index.weekday == 0)
        is_friday_bridge = (temp_cluster_df.index - pd.Timedelta(days=1)).isin(holidays_list) & (temp_cluster_df.index.weekday == 4)
        temp_cluster_df.loc[is_friday_bridge | is_monday_bridge, "bridge"] = 1

        cluster_df = pd.merge(cluster_df, temp_cluster_df, on=["year", "month", "day", "weekday"])
        # cluster_df.set_index(day_list, inplace=True)
        cluster_df.index = day_list

        cluster_df["day_indicator"] = 0
        cluster_df.loc[cluster_df.index.weekday < 5, "day_indicator"] = 1
        cluster_df.loc[cluster_df.index.weekday == 5, "day_indicator"] = 2
        cluster_df.loc[cluster_df.index.weekday == 6, "day_indicator"] = 3
        cluster_df.loc[cluster_df["holiday"] == 1, "day_indicator"] = 3
        cluster_df.loc[cluster_df["bridge"] == 1, "day_indicator"] = 2
        cluster_df.loc[(cluster_df.index.month == 12) & (cluster_df.index.day.isin([24, 31])) & (cluster_df.index.weekday < 5), "day_indicator"] = 2

        cluster_df.loc[:, "cluster"] = 0
        cluster_df.loc[cluster_df["day_indicator"] == 1, "cluster"] = cluster_df.loc[cluster_df["day_indicator"] == 1, "month"]

        weekend_cluster_month = [[1, 2, 12], [3, 4, 5], [6, 7, 8], [9, 10, 11]]
        count = cluster_df["month"].max()
        for day_indicator in [2, 3]:
            for month_lst in weekend_cluster_month:
                if not len(cluster_df.loc[(cluster_df["day_indicator"] == day_indicator) & (cluster_df["month"].isin(month_lst)), "cluster"]) == 0:
                    count += 1
                    cluster_df.loc[(cluster_df["day_indicator"] == day_indicator) & (cluster_df["month"].isin(month_lst)), "cluster"] = count

        if use_hours:
            self.__add_hours_cluster(
                df=cluster_df,
                clusters_clmn="cluster",
                hours_clmn="hour",
                unique_clusters=cluster_df["cluster"].unique(),
                unique_hours=cluster_df["hour"].unique(),
            )
        return cluster_df

    def __add_hours_cluster(self, df: pd.DataFrame, clusters_clmn: str, hours_clmn: str, unique_clusters: List[int], unique_hours: List[int]):
        df["cluster_hours"] = 0
        count = 1
        for cluster in unique_clusters:
            for hour in unique_hours:
                df.loc[(df[clusters_clmn] == cluster) & (df[hours_clmn] == hour), "cluster_hours"] = count
                count += 1

    def _create_one_hot_matrix(self, rows: int, clusters: pd.Series, max_clusters: int, adjust_clusters: bool, offset_col: bool):
        one_hot = np.zeros(shape=(rows, max_clusters))
        if adjust_clusters:
            cluster_series = clusters - 1
        else:
            cluster_series = clusters
        one_hot[np.arange(rows), cluster_series] = 1
        if offset_col:
            one_hot[:, -1] = 1
        else:
            one_hot = one_hot[:, :-1]
        return one_hot

    def times_to_zero_one(self, h: pd.DatetimeIndex):
        """Convert each time point into its fractional position within the year it belongs to.

        The 1st of January, 00:00, always maps to zero, while the last hour of a year maps close to one
        (e.g., the 31st of December, 23:00, maps to 1 - 1/(365*24) or 1 - 1/(366*24) depending on the year).
        The idea is to map any year to [0, 1), on which the seasonality curve, periodic on [0, 1], is fitted.
        """
        if not isinstance(h, pd.DatetimeIndex):
            raise TypeError("index must be of type pd.DatetimeIndex")
        if len(h) < 1:
            raise ValueError("index must contain at least one value!")
        # start of the year w.r.t. each date in h
        start_of_years = pd.to_datetime(h.year.astype(str) + "-01-01 00:00:00").tz_localize(h.tz)
        # start of the following year w.r.t. each date in h
        end_of_years = pd.to_datetime((h.year + 1).astype(str) + "-01-01 00:00:00").tz_localize(h.tz)
        # compute the fractions, using pandas vectorization
        result = np.array((h - start_of_years) / (end_of_years - start_of_years))
        # internal sanity check: all points must lie in [0, 1)
        assert all(result < 1.0) and all(result >= 0.0)
        return result

    def _preprocess(self, spot: pd.DataFrame) -> pd.DataFrame:
        # remove duplicate hours by replacing them with their mean
        spot = spot.groupby(level=0).mean()
        # include missing hours
        full_idx = pd.date_range(start=spot.index.min(), end=spot.index.max(), freq="h")
        spot = spot.reindex(full_idx)
        spot.index.name = "date"
        spot.iloc[:, 0] = spot.iloc[:, 0].interpolate(method="linear")
        return spot

    @staticmethod
    def _remove_outliers(df: pd.DataFrame, value_clmn: str, grouping_clmn: str, lower_quantile: float, upper_quantile: float):
        def remove_outliers(series, lower_quantile=lower_quantile, upper_quantile=upper_quantile):
            lower_bound = series.quantile(lower_quantile)
            upper_bound = series.quantile(upper_quantile)
            return series[(series >= lower_bound) & (series <= upper_bound)]

        keep_ids = df.groupby(grouping_clmn, group_keys=False)[value_clmn].apply(remove_outliers).index
        df_clean = df.loc[df.index.isin(keep_ids), :]
        return df_clean

    def calibrate(self):
        spot = self.spot_prices.copy()
        spot = self._preprocess(spot=spot)
        cluster_df = self._create_cluster_df(spot.index, use_hours=True)

        season_shape = self._normalize_year(spot)
        season_shape = season_shape.resample("D").mean().dropna()
        cluster_df_daily = cluster_df[["year", "month", "day", "weekday", "day_indicator", "cluster"]].drop_duplicates().sort_index()
        calib_season_df = pd.merge(season_shape, cluster_df_daily, left_index=True, right_index=True)
        if self.remove_outlier_season:
            value_clmn = calib_season_df.columns[0]
            calib_season_df = self._remove_outliers(
                df=calib_season_df,
                value_clmn=value_clmn,
                grouping_clmn="cluster",
                lower_quantile=self.lower_quantile_season,
                upper_quantile=self.upper_quantile_season,
            )

        self.__max_cluster = calib_season_df["cluster"].max()
        season_one_hot = self._create_one_hot_matrix(
            rows=len(calib_season_df),
            clusters=calib_season_df["cluster"],
            max_clusters=self.__max_cluster,
            adjust_clusters=True,  # since clusters do not start at 0
            offset_col=True,  # since we would ignore the last column because it is obsolete due to our categorical variables,
            # we actually set it all to 1 to account for the offset in our regression model
        )
        self._season_regression_params = (
            np.linalg.inv(season_one_hot.T @ season_one_hot) @ season_one_hot.T @ calib_season_df.iloc[:, 0].to_numpy().reshape(-1, 1)
        )

        id_shape = self._normalize_year(spot)
        calib_id_df = pd.merge(id_shape, cluster_df, left_index=True, right_index=True)
        calib_id_df["t"] = self.times_to_zero_one(calib_id_df.index)
        if self.remove_outlier_id:
            value_clmn = calib_id_df.columns[0]
            calib_id_df = self._remove_outliers(
                df=calib_id_df,
                grouping_clmn="hour",
                value_clmn=value_clmn,
                lower_quantile=self.lower_quantile_id,
                upper_quantile=self.upper_quantile_id,
            )

        self._id_params = {}
        for h in calib_id_df["hour"].unique():
            x = calib_id_df.loc[calib_id_df["hour"] == h, "t"].to_numpy().reshape(-1, 1)
            y = calib_id_df.loc[calib_id_df["hour"] == h, :].iloc[:, 0].to_numpy().reshape(-1, 1)
            m = self.k_fourier * 2 + 1
            x_fit = np.zeros((x.shape[0], m))
            fourier_parts = []
            for k in np.arange(start=1, stop=self.k_fourier + 1):
                fourier_parts.append(np.sin(2 * k * np.pi * x))
                fourier_parts.append(np.cos(2 * k * np.pi * x))
            x_fit[:, :-1] = np.concatenate(fourier_parts, axis=1)
            x_fit[:, -1] = 1.0
            self._id_params[h] = np.linalg.pinv(x_fit) @ y

    def apply(self, apply_schedule: List[dt.datetime]) -> pd.DataFrame:
        cluster_df = self._create_cluster_df(apply_schedule, use_hours=True)
        cluster_df["t"] = self.times_to_zero_one(cluster_df.index)
        season_one_hot = self._create_one_hot_matrix(
            rows=len(cluster_df),
            clusters=cluster_df["cluster"],
            max_clusters=self.__max_cluster,
            adjust_clusters=True,
            offset_col=True,
        )
        season_fit = season_one_hot @ self._season_regression_params
        self._season_fit = season_fit
        cluster_df["id_fit"] = 0.0
        for h in cluster_df["hour"].unique():
            x = cluster_df.loc[cluster_df["hour"] == h, "t"].to_numpy().reshape(-1, 1)
            m = self.k_fourier * 2 + 1
            x_fit = np.zeros((x.shape[0], m))
            fourier_parts = []
            for k in np.arange(start=1, stop=self.k_fourier + 1):
                fourier_parts.append(np.sin(2 * k * np.pi * x))
                fourier_parts.append(np.cos(2 * k * np.pi * x))
            x_fit[:, :-1] = np.concatenate(fourier_parts, axis=1)
            x_fit[:, -1] = 1.0
            cluster_df.loc[cluster_df["hour"] == h, "id_fit"] = (x_fit @ self._id_params[h]).squeeze()
        cluster_df["shape"] = season_fit.squeeze() * cluster_df["id_fit"].to_numpy()
        shape_df = pd.DataFrame(cluster_df.loc[:, "shape"])
        shape_df = self.normalize_shape(shape=shape_df)
        return shape_df

    def _to_dict(self):
        return super()._to_dict()
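

# Hypothetical usage of CategoricalFourierShaper (illustration only; synthetic data as in the
# sketches above, holiday calendar chosen arbitrarily). ``k_fourier`` controls how many
# sine/cosine pairs model the yearly periodicity of each hour-of-day profile; the year
# fractions produced by ``times_to_zero_one`` drive the Fourier terms.
def _example_categorical_fourier_shaper() -> pd.DataFrame:
    hist_idx = pd.date_range(start="2023-01-01", end="2024-12-31 23:00", freq="h")
    rng = np.random.default_rng(2)
    prices = 50.0 + 10.0 * np.sin(2.0 * np.pi * hist_idx.hour / 24.0) + rng.normal(0.0, 2.0, len(hist_idx))
    spot = pd.DataFrame({"price": prices}, index=hist_idx)

    shaper = CategoricalFourierShaper(
        spot_prices=spot,
        holiday_calendar=holidays.Germany(years=[2023, 2024, 2025]),
        k_fourier=2,
    )
    # year fractions: 2025-01-01 00:00 -> 0.0 and, in a non-leap year, 2025-07-02 12:00 -> exactly 0.5
    fractions = shaper.times_to_zero_one(pd.DatetimeIndex(["2025-01-01 00:00", "2025-07-02 12:00"]))
    assert abs(fractions[1] - 0.5) < 1e-12

    shaper.calibrate()
    apply_schedule = list(pd.date_range(start="2025-01-01", end="2025-12-31 23:00", freq="h"))
    return shaper.apply(apply_schedule=apply_schedule)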