Source code for postprocessors.MBM

"""
    Member-By-Member postprocessor module
    =====================================

    This module contains :class:`.Data` postprocessors classes applying the postprocessing algorithm detailed in

    * Bert Van Schaeybroeck and Stéphane Vannitsem. Ensemble post-processing using member-by-member approaches: theoretical aspects. *Quarterly Journal
      of the Royal Meteorological Society*, **141** (688):807–818, 2015. URL: `<https://doi.org/10.1002/qj.2397>`_.

    Basically, it corrects a provided :class:`.Data` object :math:`\\mathcal{D}_{p,n,m,v} (t)` by applying the formula:

    .. math::

        \\mathcal{D}^C_{n,m,v} (t) = \\alpha_{v} (t) + \\sum_{p=1}^P \\beta_{p,v} (t) \\mu^{\\rm ens}_{p,n,v} (t) + \\tau_{v} (t) \\bar{\\mathcal{D}}^{\\rm ens}_{0,n,m,v} (t)

    where :math:`P` is the total number of predictors (given by the attribute :attr:`~.Data.number_of_predictors` of :math:`\\mathcal{D}`).
    In the formula above:

    * The Data object :math:`\\mathcal{D}_{p,n,m,v} (t)` is assumed to contain all the predictors used to correct the
      variable needed, the first predictor being by convention the variable itself: :math:`\\mathcal{D}_{0,n,m,v} (t)`.
    * :math:`\\mu^{\\rm ens}_{p,n,v} (t)` is the :attr:`.Data.ensemble_mean` of the Data object :math:`\\mathcal{D}_{p,n,m,v} (t)`.
    * :math:`\\tau_{v} (t)` is a multiplicative correction applied to each member of the first predictor of the :attr:`Data.centered_ensemble` :math:`\\bar{\\mathcal{D}}^{\\rm ens}_{0,n,m,v} (t)`.
    * :math:`\\mathcal{D}^C_{n,m,v} (t)` is the corrected Data object, with an ensemble member-by-member correction.
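
    The correction formula above can be illustrated with a minimal NumPy sketch on plain arrays. Everything here is
    hypothetical (toy array shapes and hand-picked coefficients standing in for trained ones); the actual module
    works on :class:`.Data` objects instead:

    ```python
    import numpy as np

    rng = np.random.default_rng(0)

    # Hypothetical toy dimensions: P predictors, N forecast cases, M ensemble members
    P, N, M = 2, 100, 10
    D = rng.normal(size=(P, N, M))            # D[p, n, m]; D[0] is the variable to correct

    # Hypothetical correction coefficients (the postprocessors below obtain these by training)
    alpha = 0.5                               # additive correction
    beta = np.array([0.9, 0.1])               # one weight per predictor's ensemble mean
    tau = 1.2                                 # multiplicative spread scaling

    mu_ens = D.mean(axis=-1)                  # ensemble means mu[p, n]
    D_centered = D[0] - mu_ens[0][:, None]    # centered ensemble of the first predictor

    # Member-by-member correction: every member gets the same shift, deviations are scaled
    D_corrected = alpha + np.sum(beta[:, None] * mu_ens, axis=0)[:, None] + tau * D_centered
    ```

    Because the same shift is applied to every member, the corrected ensemble mean is
    :math:`\\alpha + \\sum_p \\beta_p \\mu^{\\rm ens}_p` while the ensemble spread is scaled by :math:`\\tau`.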

    Content
    -------

    Several different postprocessors are available, with different strategies to determine the coefficients :math:`\\alpha_{v} (t)`, :math:`\\beta_{p,v} (t)` and :math:`\\tau_{v} (t)`:

    * :class:`BiasCorrection`: A simple correction of the bias.
    * :class:`EnsembleMeanCorrection`: A correction of the ensemble mean.
    * :class:`EnsembleSpreadScalingCorrection`: A correction of the ensemble mean and a correction of the ensemble spread through a multiplicative scaling.
    * :class:`EnsembleSpreadScalingAbsCRPSCorrection`: A correction of the ensemble mean and a correction of the ensemble spread, with a tuning of the parameters to
      minimize the Absolute norm CRPS score :meth:`.Data.Abs_CRPS`.
    * :class:`EnsembleSpreadScalingNgrCRPSCorrection`: A correction of the ensemble mean and a correction of the ensemble spread, with a tuning of the parameters to
      minimize the Non-homogeneous Gaussian Regression (NGR) CRPS score :meth:`.Data.Ngr_CRPS`.
    * :class:`EnsembleAbsCRPSCorrection`: A correction of the ensemble mean and a correction of the ensemble spread with spread scaling and nudging.
      In addition, the parameters are tuned to minimize the Absolute norm CRPS score :meth:`.Data.Abs_CRPS`.
    * :class:`EnsembleNgrCRPSCorrection`: A correction of the ensemble mean and a correction of the ensemble spread with spread scaling and nudging.
      In addition, the parameters are tuned to minimize the Non-homogeneous Gaussian Regression (NGR) CRPS score :meth:`.Data.Ngr_CRPS`.
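
    For instance, the simplest strategy, implemented by :class:`BiasCorrection`, reduces to shifting every ensemble
    member by the mean difference between past observations and the reforecast ensemble mean
    (:math:`\\beta_{0,v} (t) = \\tau_{v} (t) = 1`). A self-contained sketch with synthetic data (the arrays and the
    +2 bias are purely illustrative; the actual classes operate on :class:`.Data` objects):

    ```python
    import numpy as np

    rng = np.random.default_rng(1)

    # Synthetic training set: N past observations and M-member reforecasts with a +2 bias
    N, M = 500, 10
    past_obs = rng.normal(size=N)
    reforecasts = past_obs[:, None] + 2.0 + rng.normal(scale=0.5, size=(N, M))

    # Training: alpha is the mean observation-minus-ensemble-mean difference
    alpha = np.mean(past_obs - reforecasts.mean(axis=-1))

    # Correction of a new forecast: shift every member by alpha (beta_0 = tau = 1)
    new_forecast = 1.0 + 2.0 + rng.normal(scale=0.5, size=M)
    corrected = new_forecast + alpha
    ```

    The other strategies listed above refine this scheme by regressing on the predictors' ensemble means and by
    scaling or nudging the ensemble spread.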

    Postprocessors
    --------------

    We now detail these postprocessors:

"""
# TODO : - parameters as Data struct, ?
#        - diagnostic tools + plot (DONE)
#        - timestamps in plot (DONE)
#        - 3d diagnostic plot (2 predictors)
#        - add timestamps option in plot_parameters

import numpy as np
import warnings
import datetime
import multiprocessing
from core.utils import map_times_to_int_array
from core.data import Data
from core.postprocessor import PostProcessor
import scipy.optimize as optimize
from scipy.interpolate import interp1d
from itertools import product

try:
    import matplotlib.pyplot as plt
except ModuleNotFoundError:
    warnings.warn('Unable to import matplotlib, plotting method will not work.', ImportWarning)
    plt = None

try:
    import pandas as pd
except ModuleNotFoundError:
    warnings.warn('Unable to import pandas, loading pandas DataFrame will not work.', ImportWarning)
    pd = None


class EnsembleMeanCorrection(PostProcessor):
    """A postprocessor to correct the ensemble mean. Correct according to the formula:

    .. math::

        \\mathcal{D}^C_{n,m,v} (t) = \\alpha_{v} (t) + \\sum_{p=1}^P \\beta_{p,v} (t) \\, \\mu^{\\rm ens}_{p,n,v} (t) + \\tau_{v} (t) \\bar{\\mathcal{D}}^{\\rm ens}_{0,n,m,v} (t)

    where :math:`\\alpha_{v} (t) = \\left\\langle \\mathcal{O}_{n,v} (t) - \\sum_{p=1}^P \\beta_{p,v} (t) \\, \\mu^{\\rm ens}_{p,n,v} (t) \\right\\rangle_n`
    with :math:`\\mathcal{O}_{n,v} (t)` the observation training set and :math:`\\mu^{\\rm ens}_{p,n,v} (t)` the ensemble mean of the
    predictors of the training set. :math:`\\bar{\\mathcal{D}}^{\\rm ens}_{0,n,m,v} (t)` is the :attr:`~.Data.centered_ensemble` of the
    first predictor. We have also

    .. math::

        \\beta_{p,v} (t) = \\sum_{p_1=1}^P \\, {\\rm Cov}^{\\rm obs}_{p_1, v} [\\bar{\\mathcal{O}}^{\\rm obs}, \\mu^{\\rm ens}] (t) \\, \\left({\\rm Cov}^{\\rm obs} [\\mu^{\\rm ens}, \\mu^{\\rm ens}]\\right)^{-1}_{p_1, p, v} (t)

    where :math:`{\\rm Cov}^{\\rm obs}_{p_1, v} [\\bar{\\mathcal{O}}^{\\rm obs}, \\mu^{\\rm ens}]` is the
    :meth:`~.Data.ensemble_mean_observational_covariance` with the observation training set :math:`\\mathcal{O}_{n,v} (t)`,
    and :math:`{\\rm Cov}^{\\rm obs}_{p_1, p_2, v} [\\mu^{\\rm ens}, \\mu^{\\rm ens}]` is the
    :attr:`~.Data.ensemble_mean_observational_self_covariance`.
    Finally, we have :math:`\\tau_{v} (t) = 1` for all :math:`v` and :math:`t`.

    Attributes
    ----------
    parameters_list: list(Data)
        The list of training parameters :math:`\\alpha_{v} (t)`, :math:`\\beta_{p,v} (t)`, :math:`\\tau_{v} (t)`, as
        :class:`.Data` objects. This list is empty until the first :meth:`~.EnsembleMeanCorrection.train` method call.

    Examples
    --------

    .. plot::
        :format: doctest
        :include-source: 1

        >>> from pp_test.test_data import test_sin
        >>> import postprocessors.MBM as MBM
        >>> import matplotlib.pyplot as plt
        >>>
        >>> # Loading random sinusoidal observations and reforecasts
        >>> past_observations, reforecasts = test_sin(60., 3., number_of_observation=200)
        >>> past_observations.index_shape
        (1, 200, 1, 1, 21)
        >>> reforecasts.index_shape
        (1, 200, 10, 1, 21)
        >>>
        >>> # Training the postprocessor
        >>> postprocessor = MBM.EnsembleMeanCorrection()
        >>> postprocessor.train(past_observations, reforecasts)
        >>>
        >>> # Loading new random sinusoidal observations and forecasts
        >>> observations, forecasts = test_sin(60., 3.)
        >>> corrected_forecasts = postprocessor(forecasts)  # Correcting the new forecasts
        >>>
        >>> # Plotting the forecasts
        >>> ax = observations.plot(global_label='observations', color='tab:green')
        >>> ax = forecasts.plot(ax=ax, global_label='raw forecasts', mfc=None, mec='tab:blue',
        ...                     ls='', marker='o', ms=3.0)
        >>> ax = corrected_forecasts.plot(ax=ax, global_label='corrected forecasts', mfc=None,
        ...                               mec='tab:orange', ls='', marker='o', ms=3.0)
        >>> t = ax.set_ylabel('temperature [C°]')
        >>> t = ax.set_xlabel('date')
        >>> t = ax.legend()
        >>> t = ax.set_title('Example of corrected forecasts')
        >>>
        >>> # Plotting the postprocessing parameters
        >>> ax = postprocessor.plot_parameters()
        >>> t = ax.set_title('Postprocessing parameters \\n'+r'($\\gamma_{1,0}$ is $\\tau_{0}$)')
        >>>
        >>> # Computing the CRPS score
        >>> raw_crps = forecasts.CRPS(observations)
        >>> corrected_crps = corrected_forecasts.CRPS(observations)
        >>>
        >>> # Plotting the CRPS score
        >>> ax = raw_crps.plot(timestamps=postprocessor.parameters_list[0].timestamps[0, 0],
        ...                    global_label='Raw forecast CRPS', color='tab:blue')
        >>> ax = corrected_crps.plot(timestamps=postprocessor.parameters_list[0].timestamps[0, 0], ax=ax,
        ...                          global_label='Corrected forecast CRPS', color='tab:orange')
        >>> t = ax.set_ylabel('CRPS [C°]')
        >>> t = ax.set_xlabel('time [hour]')
        >>> t = ax.set_title('CRPS score')
        >>> t = ax.legend()
        >>> plt.show()
    """

    def __init__(self):
        PostProcessor.__init__(self)

    @staticmethod
    def _compute_em_coefficients(observations, predictors, forecast_init_time=None):
        sigma_v = predictors.ensemble_mean_observational_self_covariance
        sigma_v = np.moveaxis(sigma_v, [0, 1], [-2, -1])
        sigma_ov = predictors.ensemble_mean_observational_covariance(observations)
        sigma_ov = np.moveaxis(sigma_ov, [0, 1], [-2, -1])
        beta = sigma_ov @ np.linalg.inv(sigma_v)
        beta = np.moveaxis(beta, -1, 0)
        index_shape = list(predictors.index_shape)
        index_shape[1] = index_shape[2] = 1
        beta = np.squeeze(beta).reshape(tuple(index_shape) + predictors.shape)
        alpha = observations.observational_mean.data
        alpha -= np.nanmean(np.sum(beta * predictors.ensemble_mean.data, axis=0), axis=0)[np.newaxis, np.newaxis, ...]
        if predictors.timestamps is not None:
            timestamps = predictors.timestamps[0, 0]
        elif observations.timestamps is not None:
            timestamps = observations.timestamps[0, 0]
        else:
            parameters_time = None
            return Data(alpha, timestamps=parameters_time), Data(beta, timestamps=parameters_time)
        timedeltas = np.diff(timestamps)
        if isinstance(forecast_init_time, int):
            first_dt = timestamps[0].hour - forecast_init_time
            ptime = np.concatenate(([first_dt], first_dt + map_times_to_int_array(timedeltas.cumsum())))
        elif isinstance(forecast_init_time, datetime.datetime):
            first_dt = timestamps[0] - forecast_init_time
            ptime = np.concatenate(([first_dt], map_times_to_int_array((first_dt + timedeltas).cumsum())))
        else:
            first_dt = timestamps[0].hour
            ptime = np.concatenate(([first_dt], first_dt + map_times_to_int_array(timedeltas.cumsum())))
        beta_parameters_time = np.empty((predictors.number_of_predictors, 1), dtype=object)
        alpha_parameters_time = np.empty((1, 1), dtype=object)
        alpha_parameters_time[0, 0] = ptime
        for p in range(predictors.number_of_predictors):
            beta_parameters_time[p, 0] = ptime
        return Data(alpha, timestamps=alpha_parameters_time), Data(beta, timestamps=beta_parameters_time)

    def _corrected_forecast(self, predictors):
        return self._corrected_forecast_mod(predictors, self.parameters_list)

    @staticmethod
    def _corrected_forecast_mod(predictors, parameters_list):
        cem = predictors.centered_ensemble.data[0]
        res = parameters_list[0].data
        res = res + np.sum(parameters_list[1].data * predictors.ensemble_mean.data, axis=0)[np.newaxis, ...]
        res = res + parameters_list[2].data * cem[np.newaxis, ...]
        if predictors.timestamps is not None:
            timestamps = predictors.timestamps[0][np.newaxis, ...]
        else:
            timestamps = None
        return Data(res, metadata=predictors.metadata, timestamps=timestamps, dtype=predictors.dtype)
    def train(self, observations, predictors, forecast_init_time=None, **kwargs):
        """Method to train the postprocessor with an observation and predictors training set.
        Once trained, the postprocessor parameters are stored in the :attr:`~.EnsembleMeanCorrection.parameters_list`.

        Parameters
        ----------
        observations: Data
            A `Data` object containing the observation training set.
        predictors: Data
            A `Data` object containing the predictors training set. Must be broadcastable with the `observations`.
        forecast_init_time: int or ~datetime.datetime, optional
            The time at which the predictors generation begins (the (re)forecasts initial time).
            If `None`, assumes that this time is zero (00Z).
        """
        obs, pred = self._sanitize_data(observations, predictors)
        alpha, beta = self._compute_em_coefficients(obs, pred, forecast_init_time)
        self.parameters_list.clear()
        self.parameters_list.append(alpha)
        self.parameters_list.append(beta)
        self.parameters_list.append(alpha.full_like(1.))
        self.parameters_list.append(alpha.zeros_like())
    def __call__(self, predictors, predictor_offset=0, proc_time_offset=0, shift_parameters=0,
                 interpolate_offset=None, init_params=None):
        """Actual postprocessing of new predictors. Returns the corrected first predictor.

        Parameters
        ----------
        predictors: Data
            Data object with the predictors to postprocess. The method will get their timestamps and
            interpolate the parameters for the timestamps that are missing.
        predictor_offset: int, optional
            Allows to skip the first `predictor_offset` values of the given predictors. Default to zero.
        proc_time_offset: int, optional
            Indicates at which time in hours the processor starts with respect to the start of the forecast.
            Default to zero.
        interpolate_offset: None or int, optional
            Allows to start the interpolation before the first data point, and specifies at which time
            (in hours after the start of the forecast). `None` to disable. Default to `None`.
        init_params: None or list(int) or list(Data), optional
            Specifies the initial parameters of the interpolation if `interpolate_offset` is set. `None` to disable.
            Must be set if `interpolate_offset` is set. Default to `None`.
        """
        if self.parameters_list:
            if predictors.timestamps is None:
                return self._corrected_forecast(predictors)
            else:
                parameters_list, ppt, t = self.get_interpolated_parameters(predictors, predictor_offset,
                                                                           proc_time_offset, interpolate_offset,
                                                                           init_params)
                if parameters_list is not None:
                    data_time = predictors.timestamps.copy()
                    for i in range(data_time.shape[0]):
                        for j in range(data_time.shape[1]):
                            data_time[i, j] = data_time[i, j][predictor_offset:]
                    offset_predictors = predictors.copy()
                    offset_predictors.data = offset_predictors.data[:, :, :, :, predictor_offset:, ...]
                    offset_predictors.timestamps = data_time
                    return self._corrected_forecast_mod(offset_predictors, parameters_list)
                else:
                    return self._corrected_forecast(predictors)
        else:
            warnings.warn('Postprocessor is not trained. Impossible to postprocess the predictors.', UserWarning)
            return None
    def plot_parameters(self, timestamps=None, variable='all', ax=None, grid_point=None, **kwargs):
        if plt is None:
            warnings.warn('Matplotlib not loaded, cannot plot!', UserWarning)
            return None
        if ax is None:
            fig = plt.figure()
            ax = fig.gca()
        if variable == 'all':
            selected_var = range(self.parameters_list[0].number_of_variables)
        elif isinstance(variable, int):
            selected_var = [variable]
        elif isinstance(variable, slice):
            selected_var = range(variable.stop)[variable]
        elif isinstance(variable, (list, tuple)):
            selected_var = variable
        else:
            warnings.warn('Wrong variable argument, cannot plot!', UserWarning)
            return None
        leg = list()
        for var in selected_var:
            self.parameters_list[0].plot(variable=var, timestamps=timestamps, grid_point=grid_point, ax=ax, **kwargs)
            leg.append(r'$\alpha_{'+str(var)+r'}$')
        for var in selected_var:
            for p in range(self.parameters_list[1].index_shape[0]):
                self.parameters_list[1].plot(predictor=p, variable=var, timestamps=timestamps,
                                             grid_point=grid_point, ax=ax, **kwargs)
                leg.append(r'$\beta_{'+str(p)+r','+str(var)+'}$')
        for var in selected_var:
            self.parameters_list[2].plot(variable=var, timestamps=timestamps, grid_point=grid_point, ax=ax, **kwargs)
            leg.append(r'$\gamma_{1,' + str(var) + r'}$')
        for var in selected_var:
            self.parameters_list[3].plot(variable=var, timestamps=timestamps, grid_point=grid_point, ax=ax, **kwargs)
            leg.append(r'$\gamma_{2,' + str(var) + r'}$')
        ax.legend(leg)
        ax.set_xlabel('Timestep')
        return ax

    def _plot_diagnostics(self, observations, predictors, variable=0, predictor=0, timesteps=None,
                          olabel=None, plabel=None, timestamps=None, scatter_kwargs=None, line_kwargs=None):
        if plt is None:
            warnings.warn('Matplotlib not loaded, cannot plot!', UserWarning)
            return None
        if scatter_kwargs is None:
            scatter_kwargs = dict()
        if line_kwargs is None:
            line_kwargs = dict()
        if olabel is None:
            olabel = ''
        else:
            olabel = '(' + olabel + ')'
        if plabel is None:
            plabel = ''
        else:
            plabel = '(' + plabel + ')'
        index = list(range(observations.number_of_time_steps))
        if timesteps is None:
            timesteps = slice(observations.number_of_time_steps)
            index = index[timesteps]
        elif isinstance(timesteps, int):
            timesteps = slice(timesteps, timesteps+1)
            index = index[timesteps]
        elif isinstance(timesteps, (list, tuple)):
            index = timesteps
        elif isinstance(timesteps, slice):
            index = index[timesteps]
        else:
            warnings.warn('Wrong timesteps argument, cannot plot!', UserWarning)
            return None
        if len(index) > 1:
            n_panels = len(observations.data[predictor, 0, 0, variable, index])
            nr = int(np.ceil(n_panels/2))
            fig, axs = plt.subplots(ncols=2, nrows=nr, figsize=(15, 5*nr))
            plt.subplots_adjust(hspace=0.4)
            axsl = axs.flatten()
        else:
            fig = plt.figure()
            axsl = [fig.gca()]
            n_panels = 1
        for i in range(n_panels):
            obs1 = (observations[0, :, 0, variable, index[i]])[..., np.newaxis]
            obs = obs1.copy()
            for m in range(predictors.number_of_members-1):
                obs = np.concatenate((obs, obs1), axis=-1)
            axsl[i].scatter(obs.flatten(), predictors[predictor, :, :, variable, index[i]].flatten(), **scatter_kwargs)
            maxi = np.nanmax(predictors[predictor, :, :, variable, index[i]])
            mini = np.nanmin(predictors[predictor, :, :, variable, index[i]])
            y = np.linspace(mini, maxi)
            axsl[i].plot(self.parameters_list[0][0, 0, 0, variable, index[i]]
                         + self.parameters_list[1][predictor, 0, 0, variable, index[i]] * y, y, **line_kwargs)
            if timestamps is not None:
                try:
                    time = str(timestamps[index[i]])
                except Exception:
                    time = str(index[i] + 1)
            else:
                time = str(index[i]+1)
            if i % 2 == 0:
                axsl[i].set_ylabel('Predictors '+str(predictor)+' '+plabel)
            axsl[i].set_title('time:'+time)
            axsl[i].legend()
        if len(axsl) > 1:
            axsl[-2].set_xlabel('Observations '+olabel)
            axsl[-1].set_xlabel('Observations '+olabel)
        return fig, axsl

    def _plot_interpolated_diagnostics(self, actual_predictors, past_observations, past_predictors, variable=0,
                                       predictor=0, timesteps=None, olabel=None, plabel=None,
                                       scatter_kwargs=None, line_kwargs=None, proc_kwargs=None):
        if plt is None:
            warnings.warn('Matplotlib not loaded, cannot plot!', UserWarning)
            return None
        if scatter_kwargs is None:
            scatter_kwargs = dict()
        if line_kwargs is None:
            line_kwargs = dict()
        if olabel is None:
            olabel = ''
        else:
            olabel = '(' + olabel + ')'
        if plabel is None:
            plabel = ''
        else:
            plabel = '(' + plabel + ')'
        parameters_list, pp_time, timestamps = self.get_interpolated_parameters(actual_predictors, **proc_kwargs)
        index = list(range(parameters_list[0].number_of_time_steps))
        if timesteps is None:
            timesteps = slice(parameters_list[0].number_of_time_steps)
            index = index[timesteps]
        elif isinstance(timesteps, int):
            timesteps = slice(timesteps, timesteps+1)
            index = index[timesteps]
        elif isinstance(timesteps, (list, tuple)):
            index = timesteps
        elif isinstance(timesteps, slice):
            index = index[timesteps]
        else:
            warnings.warn('Wrong timesteps argument, cannot plot!', UserWarning)
            return None
        if len(index) > 1:
            n_panels = len(parameters_list[0].data[predictor, 0, 0, variable, index])
            nr = int(np.ceil(n_panels/2))
            fig, axs = plt.subplots(ncols=2, nrows=nr, figsize=(15, 5*nr))
            plt.subplots_adjust(hspace=0.4)
            axsl = axs.flatten()
        else:
            fig = plt.figure()
            axsl = [fig.gca()]
            n_panels = 1
        for i in range(n_panels):
            if timestamps[index[i]] in pp_time:
                pp_index = np.where(pp_time == timestamps[index[i]])[0][0]
                obs1 = (past_observations[0, :, 0, variable, pp_index])[..., np.newaxis]
                obs = obs1.copy()
                for m in range(past_predictors.number_of_members - 1):
                    obs = np.concatenate((obs, obs1), axis=-1)
                axsl[i].scatter(obs.flatten(), past_predictors[predictor, :, :, variable, pp_index].flatten(),
                                **scatter_kwargs)
                maxi = np.nanmax(past_predictors[predictor, :, :, variable, pp_index])
                mini = np.nanmin(past_predictors[predictor, :, :, variable, pp_index])
            else:
                maxi = np.nanmax(actual_predictors.data)
                mini = np.nanmin(actual_predictors.data)
            y = np.linspace(mini, maxi)
            axsl[i].plot(parameters_list[0][0, 0, 0, variable, index[i]]
                         + parameters_list[1][predictor, 0, 0, variable, index[i]] * y, y, **line_kwargs)
            if timestamps is not None:
                try:
                    time = str(timestamps[index[i]])
                except Exception:
                    time = str(index[i] + 1)
            elif past_observations.timestamps is not None and not isinstance(past_observations.timestamps, list):
                try:
                    time = str(past_observations.timestamps[index[i]])
                except Exception:
                    time = str(index[i] + 1)
            elif past_predictors.timestamps is not None and not isinstance(past_predictors.timestamps, list):
                try:
                    time = str(past_predictors.timestamps[index[i]])
                except Exception:
                    time = str(index[i] + 1)
            else:
                time = str(index[i]+1)
            if i % 2 == 0:
                axsl[i].set_ylabel('Predictors '+str(predictor)+' '+plabel)
            axsl[i].set_title('time:'+time)
            axsl[i].legend()
        axsl[-2].set_xlabel('Observations '+olabel)
        axsl[-1].set_xlabel('Observations '+olabel)
        return fig, axsl
    def get_interpolated_parameters(self, predictors, predictor_offset=0, proc_time_offset=0,
                                    interpolate_offset=None, init_params=None):
        """Method to get the postprocessor parameters interpolated to fix missing timesteps.
        Assumes that the parameters evolve "smoothly" with the timesteps to fill the gaps.

        Parameters
        ----------
        predictors: Data
            Data object with the predictors to postprocess. The method will get their timestamps and
            interpolate the parameters for the timestamps that are missing.
        predictor_offset: int, optional
            Allows to skip the first `predictor_offset` values of the given predictors. Default to zero.
        proc_time_offset: int, optional
            Indicates at which time in hours the processor starts with respect to the start of the forecast.
            Default to zero.
        interpolate_offset: None or int, optional
            Allows to start the interpolation before the first data point, and specifies at which time
            (in hours after the start of the forecast). `None` to disable. Default to `None`.
        init_params: None or list(int) or list(Data), optional
            Specifies the initial parameters of the interpolation if `interpolate_offset` is set. `None` to disable.
            Must be set if `interpolate_offset` is set. Default to `None`.
        """
        if self.parameters_list:
            if predictors.timestamps is None:
                warnings.warn('Predictors data are not timestamped. Impossible to interpolate the parameters.',
                              UserWarning)
                return None, None, None
            elif self.parameters_list[0].timestamps is None:
                warnings.warn('Postprocessor was not trained with timestamped data. '
                              + 'Impossible to interpolate the parameters.', UserWarning)
                return None, None, None
            else:
                proc_time_offset_delta = datetime.timedelta(hours=proc_time_offset)
                if interpolate_offset is not None:
                    interpolate_offset_delta = datetime.timedelta(hours=interpolate_offset)
                td = np.diff(self.parameters_list[0].timestamps[0, 0])
                timedelta = np.array([datetime.timedelta(seconds=3600 * t) for t in td], dtype=object)
                pp_time = np.concatenate((np.array([predictors.timestamps[0, 0][0] + proc_time_offset_delta]),
                                          predictors.timestamps[0, 0][0] + proc_time_offset_delta
                                          + timedelta.cumsum()))
                data_time = predictors.timestamps[0, 0][predictor_offset:]
                parameters_list_array = list()
                for params in self.parameters_list:
                    shape = list(params.index_shape + params.shape)
                    shape[4] = len(data_time)
                    parameters_list_array.append(np.full(shape, np.nan))
                if interpolate_offset is not None:
                    interp_time = np.insert(pp_time, 0, predictors.timestamps[0, 0][0] + interpolate_offset_delta)
                else:
                    interp_time = pp_time.copy()
                interp_time_timestamp = np.array(list(map(lambda x: x.timestamp(), interp_time)))
                interp_reference_time = interp_time_timestamp[0]
                interp_time_timestamp = interp_time_timestamp - interp_reference_time
                data_time_timestamp = np.array(list(map(lambda x: x.timestamp(), data_time)))
                data_time_timestamp = data_time_timestamp - interp_reference_time
                for p in range(len(self.parameters_list)):
                    parameters = self.parameters_list[p]
                    shape = parameters.index_shape
                    for i, j, k, l in product(range(shape[0]), range(shape[1]), range(shape[2]), range(shape[3])):
                        if interpolate_offset is not None:
                            if hasattr(init_params[p], 'data'):
                                params = np.insert(parameters[i, j, k, l], 0, init_params[p][i, j, k, l, 0], axis=0)
                            else:
                                params = np.insert(parameters[i, j, k, l], 0, init_params[p], axis=0)
                        else:
                            params = parameters[i, j, k, l]
                        if len(params.shape) == 1:
                            f = interp1d(interp_time_timestamp, params, kind='cubic')
                            parameters_list_array[p][i, j, k, l] = f(data_time_timestamp)
                        else:
                            for ni in range(params.shape[1]):
                                for nj in range(params.shape[2]):
                                    par1d = params[:, ni, nj]
                                    f = interp1d(interp_time_timestamp, par1d, kind='cubic')
                                    parameters_list_array[p][i, j, k, l, :, ni, nj] = f(data_time_timestamp)
                parameters_list = list()
                timedelta = np.diff(np.insert(data_time, 0, predictors.timestamps[0, 0][0]))
                ptime = map_times_to_int_array(timedelta.cumsum())
                beta_parameters_time = np.empty((self.parameters_list[1].index_shape[0], 1), dtype=object)
                parameters_time = np.empty((1, 1), dtype=object)
                parameters_time[0, 0] = ptime
                for p in range(self.parameters_list[1].index_shape[0]):
                    beta_parameters_time[p, 0] = ptime
                for i, parameters in enumerate(parameters_list_array):
                    if i == 1:
                        parameters_list.append(Data(parameters, timestamps=beta_parameters_time))
                    else:
                        parameters_list.append(Data(parameters, timestamps=parameters_time))
                return parameters_list, pp_time, data_time
        else:
            warnings.warn('Postprocessor is not trained. Impossible to interpolate the parameters.', UserWarning)
            return None, None, None
    def plot_interpolated_parameters(self, predictors, proc_kwargs, timestamps=False, variable='all',
                                     raw_param_kwargs=None, ax=None, grid_point=None, **kwargs):
        """Method to plot the postprocessor parameters interpolated to fix missing timesteps,
        as obtained by the :meth:`get_interpolated_parameters` method.

        Parameters
        ----------
        predictors: Data
            Data object with the predictors to postprocess. The method will get their timestamps and
            interpolate the parameters for the timestamps that are missing.
        proc_kwargs: dict
            Dictionary of arguments to pass to the :meth:`get_interpolated_parameters` method.
        timestamps: array_like, optional
            Timestamps to pass to the plotting routine. Supersedes the timestamps of the `predictors`.
            If `False`, use the timestamps of the `predictors`. Default to `False`.
        variable: str, slice, list or int, optional
            Allows to select for which variable to plot the parameters. Default to `'all'`.
        raw_param_kwargs: dict
            Dictionary of arguments to pass to the `~.Data.plot` method for the raw (not interpolated) parameters.
        ax: ~matplotlib.axes.Axes, optional
            An axes on which to plot.
        grid_point: tuple(int, int), optional
            If the data are fields, specifies the parameters of which grid point to plot.
        kwargs: dict
            Dictionary of arguments to pass to the `~.Data.plot` method for the interpolated parameters.

        Returns
        -------
        ax: ~matplotlib.axes.Axes
            An axes where the parameters were plotted.
        """
        if plt is None:
            warnings.warn('Matplotlib not loaded, cannot plot!', UserWarning)
            return None
        if self.parameters_list:
            if ax is None:
                fig = plt.figure()
                ax = fig.gca()
            leg = list()
            if raw_param_kwargs is None:
                raw_param_kwargs = dict()
            parameters_list, ppt, t = self.get_interpolated_parameters(predictors, **proc_kwargs)
            if timestamps is False:
                timestamps = t
                pp_timestamps = ppt
            else:
                timestamps = None
                pp_timestamps = None
            if variable == 'all':
                selected_var = range(self.parameters_list[0].number_of_variables)
            elif isinstance(variable, int):
                selected_var = [variable]
            elif isinstance(variable, slice):
                selected_var = range(variable.stop)[variable]
            elif isinstance(variable, (list, tuple)):
                selected_var = variable
            else:
                warnings.warn('Wrong variable argument, cannot plot!', UserWarning)
                return None
            for var in selected_var:
                self.parameters_list[0].plot(variable=var, timestamps=pp_timestamps, ax=ax, grid_point=grid_point,
                                             **raw_param_kwargs)
                leg.append(r'$\alpha_{'+str(var)+r'}$')
            for var in selected_var:
                for p in range(self.parameters_list[1].index_shape[0]):
                    self.parameters_list[1].plot(predictor=p, variable=var, timestamps=pp_timestamps, ax=ax,
                                                 grid_point=grid_point, **raw_param_kwargs)
                    leg.append(r'$\beta_{'+str(p)+r','+str(var)+'}$')
            for var in selected_var:
                self.parameters_list[2].plot(variable=var, timestamps=pp_timestamps, ax=ax, grid_point=grid_point,
                                             **raw_param_kwargs)
                leg.append(r'$\gamma_{1,' + str(var) + r'}$')
            for var in selected_var:
                self.parameters_list[3].plot(variable=var, timestamps=pp_timestamps, ax=ax, grid_point=grid_point,
                                             **raw_param_kwargs)
                leg.append(r'$\gamma_{2,' + str(var) + r'}$')
            colors = list()
            for l in ax.lines:
                colors.append(l.get_color())
            colors = iter(colors)
            for var in selected_var:
                parameters_list[0].plot(variable=var, timestamps=timestamps, ax=ax, grid_point=grid_point,
                                        color=next(colors), **kwargs)
            for var in selected_var:
                for p in range(parameters_list[1].index_shape[0]):
                    parameters_list[1].plot(predictor=p, variable=var, timestamps=timestamps, ax=ax,
                                            grid_point=grid_point, color=next(colors), **kwargs)
            for var in selected_var:
                parameters_list[2].plot(variable=var, timestamps=timestamps, ax=ax, grid_point=grid_point,
                                        color=next(colors), **kwargs)
            for var in selected_var:
                parameters_list[3].plot(variable=var, timestamps=timestamps, ax=ax, grid_point=grid_point,
                                        color=next(colors), **kwargs)
            for i in range(len(leg)):
                leg.append(leg[i] + ' (interp1d)')
            ax.legend(leg)
            return ax
        else:
            return None
class BiasCorrection(EnsembleMeanCorrection):
    """A postprocessor to correct the bias. Correct according to the formula:

    .. math::

        \\mathcal{D}^C_{n,m,v} (t) = \\alpha_{v} (t) + \\sum_{p=1}^P \\beta_{p,v} (t) \\mu^{\\rm ens}_{p,n,v} (t) + \\tau_{v} (t) \\bar{\\mathcal{D}}^{\\rm ens}_{0,n,m,v} (t)

    where :math:`\\alpha_{v} (t) = \\left\\langle \\mathcal{O}_{n,v} (t) - \\mu^{\\rm ens}_{0,n,v} (t) \\right\\rangle_n`,
    with :math:`\\mathcal{O}_{n,v} (t)` the observation training set and :math:`\\mu^{\\rm ens}_{0,n,v} (t)` the ensemble mean
    of the first predictor of the training set, i.e. the :attr:`~.Data.ensemble_mean` of the predictor to correct.
    :math:`\\bar{\\mathcal{D}}^{\\rm ens}_{0,n,m,v} (t)` is the :attr:`~.Data.centered_ensemble` of the first predictor.
    We have also

    * :math:`\\beta_{0,v} (t) = 1`
    * :math:`\\beta_{p>0,v} (t) = 0`
    * :math:`\\tau_{v} (t) = 1`

    for all :math:`v` and :math:`t`.

    Attributes
    ----------
    parameters_list: list(Data)
        The list of training parameters :math:`\\alpha_{v} (t)`, :math:`\\beta_{p,v} (t)`, :math:`\\tau_{v} (t)`, as
        :class:`.Data` objects. This list is empty until the first :meth:`~.BiasCorrection.train` method call.

    Examples
    --------

    .. plot::
        :format: doctest
        :include-source: 1

        >>> from pp_test.test_data import test_sin
        >>> import postprocessors.MBM as MBM
        >>> import matplotlib.pyplot as plt
        >>>
        >>> # Loading random sinusoidal observations and reforecasts
        >>> past_observations, reforecasts = test_sin(60., 3., number_of_observation=200)
        >>> past_observations.index_shape
        (1, 200, 1, 1, 21)
        >>> reforecasts.index_shape
        (1, 200, 10, 1, 21)
        >>>
        >>> # Training the postprocessor
        >>> postprocessor = MBM.BiasCorrection()
        >>> postprocessor.train(past_observations, reforecasts)
        >>>
        >>> # Loading new random sinusoidal observations and forecasts
        >>> observations, forecasts = test_sin(60., 3.)
        >>> corrected_forecasts = postprocessor(forecasts)  # Correcting the new forecasts
        >>>
        >>> # Plotting the forecasts
        >>> ax = observations.plot(global_label='observations', color='tab:green')
        >>> ax = forecasts.plot(ax=ax, global_label='raw forecasts', mfc=None, mec='tab:blue',
        ...                     ls='', marker='o', ms=3.0)
        >>> ax = corrected_forecasts.plot(ax=ax, global_label='corrected forecasts', mfc=None,
        ...                               mec='tab:orange', ls='', marker='o', ms=3.0)
        >>> t = ax.set_ylabel('temperature [C°]')
        >>> t = ax.set_xlabel('date')
        >>> t = ax.legend()
        >>> t = ax.set_title('Example of corrected forecasts')
        >>>
        >>> # Plotting the bias
        >>> ax = postprocessor.plot_parameters()
        >>> t = ax.set_title('Postprocessing parameters \\n'+r'($\\gamma_{1,0}$ is $\\tau_{0}$)')
        >>>
        >>> # Computing the CRPS score
        >>> raw_crps = forecasts.CRPS(observations)
        >>> corrected_crps = corrected_forecasts.CRPS(observations)
        >>>
        >>> # Plotting the CRPS score
        >>> ax = raw_crps.plot(timestamps=postprocessor.parameters_list[0].timestamps[0, 0],
        ...                    global_label='Raw forecast CRPS', color='tab:blue')
        >>> ax = corrected_crps.plot(timestamps=postprocessor.parameters_list[0].timestamps[0, 0], ax=ax,
        ...                          global_label='Corrected forecast CRPS', color='tab:orange')
        >>> t = ax.set_ylabel('CRPS [C°]')
        >>> t = ax.set_xlabel('time [hour]')
        >>> t = ax.set_title('CRPS score')
        >>> t = ax.legend()
        >>> plt.show()
    """

    def __init__(self):
        EnsembleMeanCorrection.__init__(self)

    @staticmethod
    def _compute_bias(observations, predictors, forecast_init_time=None):
        alpha = observations.observational_mean.data
        alpha -= predictors.ensemble_mean.observational_mean.data[0][np.newaxis, ...]
        if predictors.timestamps is not None:
            timestamps = predictors.timestamps[0, 0]
        elif observations.timestamps is not None:
            timestamps = observations.timestamps[0, 0]
        else:
            parameters_time = None
            return Data(alpha, timestamps=parameters_time)
        timedeltas = np.diff(timestamps)
        if isinstance(forecast_init_time, int):
            first_dt = timestamps[0].hour - forecast_init_time
            ptime = np.concatenate(([first_dt], first_dt + map_times_to_int_array(timedeltas.cumsum())))
        elif isinstance(forecast_init_time, datetime.datetime):
            first_dt = timestamps[0] - forecast_init_time
            ptime = np.concatenate(([first_dt], map_times_to_int_array((first_dt + timedeltas).cumsum())))
        else:
            first_dt = timestamps[0].hour
            ptime = np.concatenate(([first_dt], first_dt + map_times_to_int_array(timedeltas.cumsum())))
        parameters_time = np.empty((1, 1), dtype=object)
        parameters_time[0, 0] = ptime
        return Data(alpha, timestamps=parameters_time)
[docs] def train(self, observations, predictors, forecast_init_time=None, **kwargs): """Method to train the postprocessor with an observation and predictors training set. Once trained, the postprocessor parameters are stored in the :attr:`~.BiasCorrection.parameter_list`. Parameters ---------- observations: Data A `Data` object containing the observation training set. predictors: Data A `Data` object containing the predictors training set. Must be broadcastable with the `observations`. forecast_init_time: int or ~datetime.datetime, optional The time at which the predictors generation begin (the (re)forecasts initial time). If None, assume that this time is zero (00Z). """ obs, pred = self._sanitize_data(observations, predictors) alpha = self._compute_bias(obs, pred, forecast_init_time=None) beta = alpha.full_like(1.) for _ in range(pred.number_of_predictors - 1): beta.append_predictors(alpha.zeros_like()) self.parameters_list.clear() self.parameters_list.append(alpha) self.parameters_list.append(beta) self.parameters_list.append(alpha.full_like(1.)) self.parameters_list.append(alpha.zeros_like())
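With :math:`\beta_{0,v} = \tau_v = 1`, the bias correction above amounts to adding the mean observation-minus-ensemble-mean difference to every member. A minimal standalone NumPy sketch of that computation on toy arrays (illustrative only; it does not use the module's `Data` API):

```python
import numpy as np

# Toy training set: 200 forecast cases, a 10-member ensemble with a +2.5 bias.
rng = np.random.default_rng(0)
truth = rng.normal(20.0, 3.0, size=200)                             # observations O_n
ensemble = truth[:, None] + 2.5 + rng.normal(0.0, 1.0, (200, 10))   # biased ensemble D_{n,m}

# alpha = < O_n - mu^ens_n >_n : observation minus ensemble mean,
# averaged over the training cases.
ens_mean = ensemble.mean(axis=1)
alpha = np.mean(truth - ens_mean)

# Member-by-member correction with beta_0 = tau = 1:
# D^C_{n,m} = alpha + mu^ens_n + (D_{n,m} - mu^ens_n) = alpha + D_{n,m}
corrected = alpha + ensemble

# On the training set, the corrected ensemble mean is unbiased by construction.
residual_bias = np.mean(truth - corrected.mean(axis=1))
```

Each member keeps its distance to the ensemble mean; only the mean is shifted.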
class EnsembleSpreadScalingCorrection(EnsembleMeanCorrection):
    """A postprocessor to correct the ensemble mean and scale the spread of the ensemble.
    Correct according to the formula:

    .. math::

        \\mathcal{D}^C_{n,m,v} (t) = \\alpha_{v} (t) + \\sum_{p=1}^P \\beta_{p,v} (t) \\, \\mu^{\\rm ens}_{p,n,v} (t) + \\tau_{v} (t) \\bar{\\mathcal{D}}^{\\rm ens}_{0,n,m,v} (t)

    where :math:`\\alpha_{v} (t) = \\left\\langle \\mathcal{O}_{n,v} (t) - \\sum_{p=1}^P \\beta_{p,v} (t) \\, \\mu^{\\rm ens}_{p,n,v} (t) \\right\\rangle_n`,
    with :math:`\\mathcal{O}_{n,v} (t)` the observation training set and :math:`\\mu^{\\rm ens}_{p,n,v} (t)` the ensemble mean
    of the predictors of the training set.
    :math:`\\bar{\\mathcal{D}}^{\\rm ens}_{0,n,m,v} (t)` is the :attr:`~.Data.centered_ensemble` of the first predictor.
    We also have

    .. math::

        \\beta_{p,v} (t) = \\sum_{p_1=1}^P \\, {\\rm Cov}^{\\rm obs}_{p_1, v} [\\bar{\\mathcal{O}}^{\\rm obs}, \\mu^{\\rm ens}] (t) \\, \\left({\\rm Cov}^{\\rm obs}\\right)^{-1}_{p_1, p, v} [\\mu^{\\rm ens}, \\mu^{\\rm ens}] (t)

    where :math:`{\\rm Cov}^{\\rm obs}_{p_1, v} [\\bar{\\mathcal{O}}^{\\rm obs}, \\mu^{\\rm ens}]` is the
    :meth:`~.Data.ensemble_mean_observational_covariance` with the observation training set :math:`\\mathcal{O}_{n,v} (t)`,
    and :math:`\\left({\\rm Cov}^{\\rm obs}\\right)^{-1}_{p_1, p, v} [\\mu^{\\rm ens}, \\mu^{\\rm ens}]` is the inverse of the
    :attr:`~.Data.ensemble_mean_observational_self_covariance`. Finally, we have

    .. math::

        \\tau_{v} (t)^2 = \\left\\langle \\sigma^{\\rm{ens}}_{n,v} (t)^2 \\right\\rangle_n^{-1} \\, \\left\\{ \\sigma^{\\rm obs}_{v} (t)^2 - \\sum_{p=1}^P \\, \\beta_{p,v} (t) \\, {\\rm Cov}^{\\rm obs}_{p, v} [\\bar{\\mathcal{O}}^{\\rm obs}, \\mu^{\\rm ens}] (t) \\right\\}

    for all :math:`n`, which scales the spread of the ensemble over the lead time :math:`t`.
    In this formula, :math:`\\sigma^{\\rm{ens}}_{n,v} (t)^2` and :math:`\\sigma^{\\rm obs}_{v} (t)^2` stand respectively
    for the ensemble variance (:attr:`~.Data.ensemble_var`) and for the observational variance
    (:attr:`~.Data.observational_var`) of :math:`\\mathcal{D}`.

    Attributes
    ----------
    parameters_list: list(Data)
        The list of training parameters :math:`\\alpha_{v} (t)`, :math:`\\beta_{p,v} (t)`, :math:`\\tau_{v} (t)`,
        as :class:`.Data` objects. This list is empty until the first :meth:`~.BiasCorrection.train` method call.

    Examples
    --------

    .. plot::
        :format: doctest
        :include-source: 1

        >>> from pp_test.test_data import test_sin
        >>> import postprocessors.MBM as MBM
        >>> import matplotlib.pyplot as plt
        >>>
        >>> # Loading random sinusoidal observations and reforecasts
        >>> past_observations, reforecasts = test_sin(60., 3., number_of_observation=200)
        >>> past_observations.index_shape
        (1, 200, 1, 1, 21)
        >>> reforecasts.index_shape
        (1, 200, 10, 1, 21)
        >>>
        >>> # Training the postprocessor
        >>> postprocessor = MBM.EnsembleSpreadScalingCorrection()
        >>> postprocessor.train(past_observations, reforecasts)
        >>>
        >>> # Loading new random sinusoidal observations and forecasts
        >>> observations, forecasts = test_sin(60., 3.)
        >>> corrected_forecasts = postprocessor(forecasts)  # Correcting the new forecasts
        >>>
        >>> # Plotting the forecasts
        >>> ax = observations.plot(global_label='observations', color='tab:green')
        >>> ax = forecasts.plot(ax=ax, global_label='raw forecasts', mfc=None, mec='tab:blue',
        ...                     ls='', marker='o', ms=3.0)
        >>> ax = corrected_forecasts.plot(ax=ax, global_label='corrected forecasts', mfc=None,
        ...                               mec='tab:orange', ls='', marker='o', ms=3.0)
        >>> t = ax.set_ylabel('temperature [C°]')
        >>> t = ax.set_xlabel('date')
        >>> t = ax.legend()
        >>> t = ax.set_title('Example of corrected forecasts')
        >>>
        >>> # Plotting the postprocessing parameters
        >>> ax = postprocessor.plot_parameters()
        >>> t = ax.set_title('Postprocessing parameters \\n'+r'($\\gamma_{1,0}$ is $\\tau_{0}$)')
        >>>
        >>> # Computing the CRPS score
        >>> raw_crps = forecasts.CRPS(observations)
        >>> corrected_crps = corrected_forecasts.CRPS(observations)
        >>>
        >>> # Plotting the CRPS score
        >>> ax = raw_crps.plot(timestamps=postprocessor.parameters_list[0].timestamps[0, 0],
        ...                    global_label='Raw forecast CRPS', color='tab:blue')
        >>> ax = corrected_crps.plot(timestamps=postprocessor.parameters_list[0].timestamps[0, 0], ax=ax,
        ...                          global_label='Corrected forecast CRPS', color='tab:orange')
        >>> t = ax.set_ylabel('CRPS [C°]')
        >>> t = ax.set_xlabel('time [hour]')
        >>> t = ax.set_title('CRPS score')
        >>> t = ax.legend()
        >>> plt.show()
    """

    def __init__(self):
        EnsembleMeanCorrection.__init__(self)

    @staticmethod
    def _compute_ss_coefficients(observations, predictors, forecast_init_time=None):
        alpha, beta = EnsembleSpreadScalingCorrection._compute_em_coefficients(observations, predictors,
                                                                               forecast_init_time)

        sigma_epsnn = predictors.ensemble_var.observational_mean.data[0]
        sigma_o = observations.observational_var.data
        sigma_ov = predictors.ensemble_mean_observational_covariance(observations)[0]

        gamma_1_sq = (sigma_o - np.sum(beta.data * sigma_ov, axis=0)[np.newaxis, ...]) / sigma_epsnn

        return alpha, beta, Data(np.sqrt(gamma_1_sq), metadata=alpha.metadata, timestamps=alpha.timestamps,
                                 dtype=alpha.dtype)

    def train(self, observations, predictors, forecast_init_time=None, **kwargs):
        """Method to train the postprocessor with an observation and predictors training set.
        Once trained, the postprocessor parameters are stored in the :attr:`~.BiasCorrection.parameters_list`.

        Parameters
        ----------
        observations: Data
            A `Data` object containing the observation training set.
        predictors: Data
            A `Data` object containing the predictors training set. Must be broadcastable with the `observations`.
        forecast_init_time: int or ~datetime.datetime, optional
            The time at which the predictors generation begins (the (re)forecasts initial time).
            If None, assumes that this time is zero (00Z).
        """
        obs, pred = self._sanitize_data(observations, predictors)
        alpha, beta, gamma_1 = self._compute_ss_coefficients(obs, pred, forecast_init_time)

        self.parameters_list.clear()
        self.parameters_list.append(alpha)
        self.parameters_list.append(beta)
        self.parameters_list.append(gamma_1)
        self.parameters_list.append(alpha.zeros_like())
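The coefficient computation above is a linear regression of the observations on the ensemble means, followed by a climatological variance matching for :math:`\tau`. A standalone NumPy sketch of the same algebra on toy data (illustrative only; array names and shapes are assumptions, not the module's `Data` API):

```python
import numpy as np

rng = np.random.default_rng(1)
ncases, nmembers = 500, 10
truth = rng.normal(0.0, 2.0, ncases)
# Two predictors: the variable itself plus a weaker correlated one.
ens0 = 0.8 * truth[:, None] + rng.normal(0.0, 0.5, (ncases, nmembers))
ens1 = 0.3 * truth[:, None] + rng.normal(0.0, 1.0, (ncases, nmembers))
mu = np.stack([ens0.mean(axis=1), ens1.mean(axis=1)])      # (P, n) ensemble means

# beta = Cov[O, mu] . Cov[mu, mu]^{-1} : regression of observations on the means.
cov_o_mu = np.array([np.cov(truth, m)[0, 1] for m in mu])
cov_mu_mu = np.cov(mu)
beta = np.linalg.solve(cov_mu_mu, cov_o_mu)

# tau^2 = (var(O) - beta . Cov[O, mu]) / <var_ens> : scale the residual spread
# so the corrected climatological variance matches the observed one.
var_ens = ens0.var(axis=1, ddof=1).mean()
tau = np.sqrt((truth.var(ddof=1) - beta @ cov_o_mu) / var_ens)

# alpha removes the remaining bias of the regression part.
alpha = truth.mean() - beta @ mu.mean(axis=1)

# Member-by-member correction of the first predictor's ensemble.
centered = ens0 - ens0.mean(axis=1, keepdims=True)          # (n, m)
corrected = alpha + (beta @ mu)[:, None] + tau * centered   # (n, m)
```

By construction, the variance of the corrected regression part plus :math:`\tau^2` times the mean ensemble variance reproduces the observed variance.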
class EnsembleSpreadScalingAbsCRPSCorrection(EnsembleSpreadScalingCorrection):
    """A postprocessor to correct the ensemble mean and scale the spread of the ensemble.
    Correct according to the formula:

    .. math::

        \\mathcal{D}^C_{n,m,v} (t) = \\alpha_{v} (t) + \\sum_{p=1}^P \\beta_{p,v} (t) \\, \\mu^{\\rm ens}_{p,n,v} (t) + \\tau_{v} (t) \\bar{\\mathcal{D}}^{\\rm ens}_{0,n,m,v} (t)

    The parameters :math:`\\alpha_{v} (t)`, :math:`\\beta_{p,v} (t)` and :math:`\\tau_{v} (t)` are optimized at each lead
    time to minimize the absolute norm CRPS (:meth:`~.Data.Abs_CRPS`) of :math:`\\mathcal{D}^C_{n,m,v} (t)` with the
    observations :math:`\\mathcal{O}`. This postprocessor must thus be trained with a training set composed of past
    ensembles :math:`\\mathcal{D}_{p,n,m,v}` and observations :math:`\\mathcal{O}`.
    If needed, the parameters initial conditions for the minimization are constructed using the values given by the
    :class:`EnsembleSpreadScalingCorrection`, depending on the minimizer and options being chosen.

    Attributes
    ----------
    parameters_list: list(Data)
        The list of training parameters :math:`\\alpha_{v} (t)`, :math:`\\beta_{p,v} (t)`, :math:`\\tau_{v} (t)`,
        as :class:`.Data` objects. This list is empty until the first :meth:`~.BiasCorrection.train` method call.

    Examples
    --------

    .. plot::
        :format: doctest
        :include-source: 1

        >>> from pp_test.test_data import test_sin
        >>> import postprocessors.MBM as MBM
        >>> import matplotlib.pyplot as plt
        >>>
        >>> # Loading random sinusoidal observations and reforecasts
        >>> past_observations, reforecasts = test_sin(60., 3., number_of_observation=200)
        >>> past_observations.index_shape
        (1, 200, 1, 1, 21)
        >>> reforecasts.index_shape
        (1, 200, 10, 1, 21)
        >>>
        >>> # Training the postprocessor
        >>> postprocessor = MBM.EnsembleSpreadScalingAbsCRPSCorrection()
        >>> postprocessor.train(past_observations, reforecasts, ntrial=10)
        >>>
        >>> # Loading new random sinusoidal observations and forecasts
        >>> observations, forecasts = test_sin(60., 3.)
        >>> corrected_forecasts = postprocessor(forecasts)  # Correcting the new forecasts
        >>>
        >>> # Plotting the forecasts
        >>> ax = observations.plot(global_label='observations', color='tab:green')
        >>> ax = forecasts.plot(ax=ax, global_label='raw forecasts', mfc=None, mec='tab:blue',
        ...                     ls='', marker='o', ms=3.0)
        >>> ax = corrected_forecasts.plot(ax=ax, global_label='corrected forecasts', mfc=None,
        ...                               mec='tab:orange', ls='', marker='o', ms=3.0)
        >>> t = ax.set_ylabel('temperature [C°]')
        >>> t = ax.set_xlabel('date')
        >>> t = ax.legend()
        >>> t = ax.set_title('Example of corrected forecasts')
        >>>
        >>> # Plotting the postprocessing parameters
        >>> ax = postprocessor.plot_parameters()
        >>> t = ax.set_title('Postprocessing parameters \\n'+r'($\\gamma_{1,0}$ is $\\tau_{0}$)')
        >>>
        >>> # Computing the CRPS score
        >>> raw_crps = forecasts.CRPS(observations)
        >>> corrected_crps = corrected_forecasts.CRPS(observations)
        >>>
        >>> # Plotting the CRPS score
        >>> ax = raw_crps.plot(timestamps=postprocessor.parameters_list[0].timestamps[0, 0],
        ...                    global_label='Raw forecast CRPS', color='tab:blue')
        >>> ax = corrected_crps.plot(timestamps=postprocessor.parameters_list[0].timestamps[0, 0], ax=ax,
        ...                          global_label='Corrected forecast CRPS', color='tab:orange')
        >>> t = ax.set_ylabel('CRPS [C°]')
        >>> t = ax.set_xlabel('time [hour]')
        >>> t = ax.set_title('CRPS score')
        >>> t = ax.legend()
        >>> plt.show()
    """

    def __init__(self):
        EnsembleSpreadScalingCorrection.__init__(self)
        self.min_result = None
        self.crps_result = None

    @staticmethod
    def _atomic_corrected_forecast(params, predictors):
        cem = predictors.centered_ensemble.data[0]
        res = params[0]
        res = res + np.sum((params[1:-1] * predictors.ensemble_mean.data.T).T, axis=0)[np.newaxis, ...]
        res = res + params[-1] * cem[np.newaxis, ...]
        return Data(res, metadata=False, timestamps=False, dtype=predictors.dtype)

    @staticmethod
    def _atomic_crps(params, observations, predictors):
        corrected = EnsembleSpreadScalingAbsCRPSCorrection._atomic_corrected_forecast(params, predictors)
        crps = corrected.Abs_CRPS(observations)
        return np.squeeze(crps.data)

    @staticmethod
    def _construct_params_ic(parameters_list):
        params = parameters_list[1]
        params = np.insert(params, 0, parameters_list[0])
        params = np.append(params, parameters_list[2])
        return params

    @staticmethod
    def _construct_params_bounds(parameters_list):
        params = list()
        params.append((-2 * np.abs(parameters_list[0]), 2 * np.abs(parameters_list[0])))
        for beta in parameters_list[1]:
            params.append((-2 * np.abs(beta), 2 * np.abs(beta)))
        params.append((0, 2 * np.abs(parameters_list[2])))
        return params

    def _minimize(self, observations, predictors, ntrial=1, num_threads=None, minimize_func=None, **kwargs):
        if 'method' not in kwargs.keys():
            method = 'Nelder-Mead'
            kwargs['method'] = method
        else:
            method = kwargs['method']

        if num_threads is None:
            pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
        else:
            pool = multiprocessing.Pool(processes=num_threads)

        if minimize_func is None:
            minimize_func = self._atomic_crps

        if predictors.is_scalar() and observations.is_scalar():
            nstep = predictors.number_of_time_steps
            nvar = predictors.number_of_variables
            alpha = self.parameters_list[0][0, 0, 0]
            beta = self.parameters_list[1][:, 0, 0, :, :]
            gamma_1 = self.parameters_list[2][0, 0, 0]
            min_result = np.empty((nvar, nstep, ntrial), dtype=object)
            crps_result = np.zeros((nvar, nstep, ntrial))
            self.min_result = np.empty((nvar, nstep), dtype=object)
            self.crps_result = np.zeros((nvar, nstep))
            jobs_list = list()
            for t in range(nstep):
                for n in range(nvar):
                    observations_atom = Data(observations[:, :, :, n, t], metadata=False, timestamps=False,
                                             dtype=observations.dtype)
                    predictors_atom = Data(predictors[:, :, :, n, t], metadata=False, timestamps=False,
                                           dtype=predictors.dtype)
                    if method in ['differential_evolution', 'shgo', 'dual_annealing']:
                        params = self._construct_params_bounds([alpha[n, t], beta[:, n, t], gamma_1[n, t]])
                        jobs_list.append([(n, t), self._minimize_atom,
                                          (minimize_func, observations_atom, predictors_atom, params, kwargs)])
                    elif method == 'basinhopping':
                        params = self._construct_params_ic([alpha[n, t], beta[:, n, t], gamma_1[n, t]])
                        jobs_list.append([(n, t), self._minimize_atom,
                                          (minimize_func, observations_atom, predictors_atom, params, kwargs)])
                    else:
                        for r in range(ntrial):
                            rand = np.random.random(2)
                            randa = rand[-1] - 0.5
                            rand_beta = np.random.random(len(beta[:, n, t])) - 0.5
                            params = self._construct_params_ic([4 * randa * alpha[n, t],
                                                                4 * rand_beta * beta[:, n, t],
                                                                2 * rand[0] * gamma_1[n, t]])
                            jobs_list.append([(n, t, r), self._minimize_atom,
                                              (minimize_func, observations_atom, predictors_atom, params, kwargs)])

            result = pool.map(_compute, jobs_list)

            if method in ['differential_evolution', 'shgo', 'dual_annealing', 'basinhopping']:
                for res in result:
                    n, t = res[0]
                    self.parameters_list[0][0, 0, 0, n, t] = res[1].x[0]
                    self.parameters_list[1][:, 0, 0, n, t] = res[1].x[1:-1]
                    self.parameters_list[2][0, 0, 0, n, t] = np.abs(res[1].x[-1])
                    self.min_result[n, t] = res[1]
                    self.crps_result[n, t] = res[1].fun
            else:
                for res in result:
                    n, t, r = res[0]
                    min_result[n, t, r] = res[1]
                    crps_result[n, t, r] = res[1].fun
                min_index = np.argmin(crps_result, axis=-1)
                for n in range(min_result.shape[0]):
                    for t in range(min_result.shape[1]):
                        res = min_result[n, t, min_index[n, t]]
                        self.parameters_list[0][0, 0, 0, n, t] = res.x[0]
                        self.parameters_list[1][:, 0, 0, n, t] = res.x[1:-1]
                        self.parameters_list[2][0, 0, 0, n, t] = np.abs(res.x[-1])
                        self.min_result[n, t] = res
                        self.crps_result[n, t] = res.fun

        elif predictors.is_field() and observations.is_field():
            nstep = predictors.number_of_time_steps
            nvar = predictors.number_of_variables
            ni, nj = predictors.shape
            alpha = self.parameters_list[0][0, 0, 0]
            beta = self.parameters_list[1][:, 0, 0, :, :]
            gamma_1 = self.parameters_list[2][0, 0, 0]
            min_result = np.empty((nvar, nstep, ni, nj, ntrial), dtype=object)
            crps_result = np.zeros((nvar, nstep, ni, nj, ntrial))
            self.min_result = np.empty((nvar, nstep, ni, nj), dtype=object)
            self.crps_result = np.zeros((nvar, nstep, ni, nj))
            jobs_list = list()
            for t, n, i, j in product(range(nstep), range(nvar), range(ni), range(nj)):
                observations_atom = Data(observations[:, :, :, n, t][..., i, j], metadata=False, timestamps=False,
                                         dtype=observations.dtype)
                predictors_atom = Data(predictors[:, :, :, n, t][..., i, j], metadata=False, timestamps=False,
                                       dtype=predictors.dtype)
                if method in ['differential_evolution', 'shgo', 'dual_annealing']:
                    params = self._construct_params_bounds([alpha[n, t, i, j], beta[:, n, t, i, j],
                                                            gamma_1[n, t, i, j]])
                    jobs_list.append([(n, t, i, j), self._minimize_atom,
                                      (minimize_func, observations_atom, predictors_atom, params, kwargs)])
                elif method == 'basinhopping':
                    params = self._construct_params_ic([alpha[n, t, i, j], beta[:, n, t, i, j],
                                                        gamma_1[n, t, i, j]])
                    jobs_list.append([(n, t, i, j), self._minimize_atom,
                                      (minimize_func, observations_atom, predictors_atom, params, kwargs)])
                else:
                    for r in range(ntrial):
                        rand = np.random.random(2)
                        randa = rand[-1] - 0.5
                        rand_beta = np.random.random(len(beta[:, n, t, i, j])) - 0.5
                        params = self._construct_params_ic([4 * randa * alpha[n, t, i, j],
                                                            4 * rand_beta * beta[:, n, t, i, j],
                                                            2 * rand[0] * gamma_1[n, t, i, j]])
                        jobs_list.append([(n, t, i, j, r), self._minimize_atom,
                                          (minimize_func, observations_atom, predictors_atom, params, kwargs)])

            result = pool.map(_compute, jobs_list)

            if method in ['differential_evolution', 'shgo', 'dual_annealing', 'basinhopping']:
                for res in result:
                    n, t, i, j = res[0]
                    self.parameters_list[0].data[0, 0, 0, n, t, i, j] = res[1].x[0]
                    self.parameters_list[1].data[:, 0, 0, n, t, i, j] = res[1].x[1:-1]
                    self.parameters_list[2].data[0, 0, 0, n, t, i, j] = np.abs(res[1].x[-1])
                    self.min_result[n, t, i, j] = res[1]
                    self.crps_result[n, t, i, j] = res[1].fun
            else:
                for res in result:
                    n, t, i, j, r = res[0]
                    min_result[n, t, i, j, r] = res[1]
                    crps_result[n, t, i, j, r] = res[1].fun
                min_index = np.argmin(crps_result, axis=-1)
                for t, n, i, j in product(range(nstep), range(nvar), range(ni), range(nj)):
                    res = min_result[n, t, i, j, min_index[n, t, i, j]]
                    self.parameters_list[0].data[0, 0, 0, n, t, i, j] = res.x[0]
                    self.parameters_list[1].data[:, 0, 0, n, t, i, j] = res.x[1:-1]
                    self.parameters_list[2].data[0, 0, 0, n, t, i, j] = np.abs(res.x[-1])
                    self.min_result[n, t, i, j] = res
                    self.crps_result[n, t, i, j] = res.fun

        pool.terminate()

    @staticmethod
    def _minimize_atom(minimize_func, observations_atom, predictors_atom, params, kwargs):
        if 'method' not in kwargs.keys():
            method = 'Nelder-Mead'
        else:
            method = kwargs['method']
        if method not in ['differential_evolution', 'shgo', 'dual_annealing', 'basinhopping']:
            optimizer = optimize.minimize
            opt_kwargs = kwargs
        else:
            optimizer = getattr(optimize, method)
            opt_kwargs = dict(kwargs)
            opt_kwargs.pop('method')
        res = optimizer(minimize_func, params, args=(observations_atom, predictors_atom), **opt_kwargs)
        return res

    def train(self, observations, predictors, forecast_init_time=None, num_threads=None, ntrial=1, **kwargs):
        """Method to train the postprocessor with an observation and predictors training set.
        Once trained, the postprocessor parameters are stored in the :attr:`~.BiasCorrection.parameters_list`.

        Parameters
        ----------
        observations: Data
            A `Data` object containing the observation training set.
        predictors: Data
            A `Data` object containing the predictors training set. Must be broadcastable with the `observations`.
        forecast_init_time: int or ~datetime.datetime, optional
            The time at which the predictors generation begins (the (re)forecasts initial time).
            If None, assumes that this time is zero (00Z).
        num_threads: None or int, optional
            The number of threads used to compute the parameters. One thread computes a chunk of the given grid
            points and lead times. If `None`, uses the maximum number of cpu cores available. Default to `None`.
        ntrial: int
            Number of parameters initial conditions to try to find the best solution.
            Not used if the minimizer being chosen is a global one. Default to 1.
        kwargs: dict
            Dictionary of options to pass to the :func:`scipy.optimize.minimize` function.
        """
        obs, pred = self._sanitize_data(observations, predictors)
        alpha, beta, gamma_1 = EnsembleSpreadScalingCorrection._compute_ss_coefficients(obs, pred, forecast_init_time)
        self.parameters_list = [alpha, beta, gamma_1, alpha.zeros_like()]
        self._minimize(obs, pred, num_threads=num_threads, ntrial=ntrial, **kwargs)
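Per variable and lead time, the `_minimize` machinery above reduces to a scipy minimization of the ensemble CRPS over the parameters :math:`(\alpha, \beta, \tau)`. A simplified single-lead-time sketch with plain arrays instead of `Data` objects and a basic empirical CRPS estimator (illustrative only, not the module's implementation):

```python
import numpy as np
from scipy import optimize

# Toy training set at one lead time: biased, over-dispersive ensemble.
rng = np.random.default_rng(2)
ncases, nmembers = 300, 10
truth = rng.normal(0.0, 2.0, ncases)
ensemble = 1.5 * truth[:, None] + 1.0 + rng.normal(0.0, 0.4, (ncases, nmembers))

def ensemble_crps(members, obs):
    """Empirical ensemble CRPS averaged over the cases:
    mean |x_m - o| - 0.5 * mean |x_m - x_m'|."""
    term1 = np.mean(np.abs(members - obs[:, None]))
    term2 = 0.5 * np.mean(np.abs(members[:, :, None] - members[:, None, :]))
    return term1 - term2

def cost(params):
    # Member-by-member correction: alpha + beta * mu + |tau| * (x - mu).
    alpha, beta, tau = params
    mu = ensemble.mean(axis=1, keepdims=True)
    corrected = alpha + beta * mu + np.abs(tau) * (ensemble - mu)
    return ensemble_crps(corrected, truth)

# Start from the identity correction, as a stand-in for the
# EnsembleSpreadScalingCorrection initial conditions.
res = optimize.minimize(cost, x0=[0.0, 1.0, 1.0], method='Nelder-Mead')
```

The `ntrial` option of the real postprocessor repeats such a local minimization from randomized initial conditions and keeps the best result; global methods (`differential_evolution`, etc.) replace the initial guess by bounds.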
class EnsembleSpreadScalingNgrCRPSCorrection(EnsembleSpreadScalingAbsCRPSCorrection):
    """A postprocessor to correct the ensemble mean and scale the spread of the ensemble.
    Correct according to the formula:

    .. math::

        \\mathcal{D}^C_{n,m,v} (t) = \\alpha_{v} (t) + \\sum_{p=1}^P \\beta_{p,v} (t) \\, \\mu^{\\rm ens}_{p,n,v} (t) + \\tau_{v} (t) \\bar{\\mathcal{D}}^{\\rm ens}_{0,n,m,v} (t)

    The parameters :math:`\\alpha_{v} (t)`, :math:`\\beta_{p,v} (t)` and :math:`\\tau_{v} (t)` are optimized at each lead
    time to minimize the Non-homogeneous Gaussian Regression (NGR) CRPS (:meth:`~.Data.Ngr_CRPS`) of
    :math:`\\mathcal{D}^C_{n,m,v} (t)` with the observations :math:`\\mathcal{O}`. This postprocessor must thus be
    trained with a training set composed of past ensembles :math:`\\mathcal{D}_{p,n,m,v}` and observations
    :math:`\\mathcal{O}`.
    If needed, the parameters initial conditions for the minimization are constructed using the values given by the
    :class:`EnsembleSpreadScalingCorrection`, depending on the minimizer and options being chosen.

    Attributes
    ----------
    parameters_list: list(Data)
        The list of training parameters :math:`\\alpha_{v} (t)`, :math:`\\beta_{p,v} (t)`, :math:`\\tau_{v} (t)`,
        as :class:`.Data` objects. This list is empty until the first :meth:`~.BiasCorrection.train` method call.

    Examples
    --------

    .. plot::
        :format: doctest
        :include-source: 1

        >>> from pp_test.test_data import test_sin
        >>> import postprocessors.MBM as MBM
        >>> import matplotlib.pyplot as plt
        >>>
        >>> # Loading random sinusoidal observations and reforecasts
        >>> past_observations, reforecasts = test_sin(60., 3., number_of_observation=200)
        >>> past_observations.index_shape
        (1, 200, 1, 1, 21)
        >>> reforecasts.index_shape
        (1, 200, 10, 1, 21)
        >>>
        >>> # Training the postprocessor
        >>> postprocessor = MBM.EnsembleSpreadScalingNgrCRPSCorrection()
        >>> postprocessor.train(past_observations, reforecasts, ntrial=10)
        >>>
        >>> # Loading new random sinusoidal observations and forecasts
        >>> observations, forecasts = test_sin(60., 3.)
        >>> corrected_forecasts = postprocessor(forecasts)  # Correcting the new forecasts
        >>>
        >>> # Plotting the forecasts
        >>> ax = observations.plot(global_label='observations', color='tab:green')
        >>> ax = forecasts.plot(ax=ax, global_label='raw forecasts', mfc=None, mec='tab:blue',
        ...                     ls='', marker='o', ms=3.0)
        >>> ax = corrected_forecasts.plot(ax=ax, global_label='corrected forecasts', mfc=None,
        ...                               mec='tab:orange', ls='', marker='o', ms=3.0)
        >>> t = ax.set_ylabel('temperature [C°]')
        >>> t = ax.set_xlabel('date')
        >>> t = ax.legend()
        >>> t = ax.set_title('Example of corrected forecasts')
        >>>
        >>> # Plotting the postprocessing parameters
        >>> ax = postprocessor.plot_parameters()
        >>> t = ax.set_title('Postprocessing parameters \\n'+r'($\\gamma_{1,0}$ is $\\tau_{0}$)')
        >>>
        >>> # Computing the CRPS score
        >>> raw_crps = forecasts.CRPS(observations)
        >>> corrected_crps = corrected_forecasts.CRPS(observations)
        >>>
        >>> # Plotting the CRPS score
        >>> ax = raw_crps.plot(timestamps=postprocessor.parameters_list[0].timestamps[0, 0],
        ...                    global_label='Raw forecast CRPS', color='tab:blue')
        >>> ax = corrected_crps.plot(timestamps=postprocessor.parameters_list[0].timestamps[0, 0], ax=ax,
        ...                          global_label='Corrected forecast CRPS', color='tab:orange')
        >>> t = ax.set_ylabel('CRPS [C°]')
        >>> t = ax.set_xlabel('time [hour]')
        >>> t = ax.set_title('CRPS score')
        >>> t = ax.legend()
        >>> plt.show()
    """

    def __init__(self):
        EnsembleSpreadScalingAbsCRPSCorrection.__init__(self)

    @staticmethod
    def _atomic_crps(params, observations, predictors):
        corrected = EnsembleSpreadScalingNgrCRPSCorrection._atomic_corrected_forecast(params, predictors)
        crps = corrected.Ngr_CRPS(observations)
        return np.squeeze(crps.data)
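The NGR variant replaces the empirical ensemble CRPS by the closed-form CRPS of a Gaussian distribution fitted to the corrected ensemble. A standalone sketch of that closed-form score (the standard formula for the CRPS of a normal forecast; the ensemble-fitting step is omitted):

```python
import numpy as np
from math import erf, exp, pi, sqrt

def gaussian_crps(mu, sigma, obs):
    """Closed-form CRPS of a N(mu, sigma^2) forecast against a scalar observation:
    CRPS = sigma * ( z*(2*Phi(z) - 1) + 2*phi(z) - 1/sqrt(pi) ), z = (obs - mu)/sigma."""
    z = (obs - mu) / sigma
    cdf = 0.5 * (1.0 + erf(z / sqrt(2.0)))           # standard normal CDF Phi(z)
    pdf = exp(-0.5 * z * z) / sqrt(2.0 * pi)         # standard normal PDF phi(z)
    return sigma * (z * (2.0 * cdf - 1.0) + 2.0 * pdf - 1.0 / sqrt(pi))
```

In an NGR setting, `mu` and `sigma` would come from the corrected ensemble mean and spread, so the score stays a smooth function of the correction parameters, which helps the minimizer.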
[docs]class EnsembleAbsCRPSCorrection(EnsembleSpreadScalingCorrection): """A postprocessor to correct the ensemble mean and scale the spread of the ensemble. Correct according to the formula: .. math:: \\mathcal{D}^C_{n,m,v} (t) = \\alpha_{v} (t) + \\sum_{p=1}^P \\beta_{p,v} (t) \\, \\mu^{\\rm ens}_{p,n,v} (t) + \\left(\\gamma_{1,v} (t) + \\gamma_{2,v} (t) / \\delta_{0, n, v} (t) \\right) \\, \\bar{\\mathcal{D}}^{\\rm ens}_{0,n,m,v} (t) where :math:`\\delta_{0, n, v} (t)` is the average over the ensemble members of the ensemble members distance (:attr:`~.Data.delta`). The parameters :math:`\\alpha_{v} (t)`, :math:`\\beta_{v} (t)`, :math:`\\gamma_{1,v} (t)` and :math:`\\gamma_{2, v} (t)` are optimized at each lead time to minimize the absolute norm CRPS (:meth:`~.Data.Abs_CRPS`) of :math:`\\mathcal{D}^C_{n,m,v} (t)` with the observations :math:`\\mathcal{O}`. This postprocessor must thus be trained with a training set composed of past ensembles :math:`\\mathcal{D}_{p,n,m,v}` and observations :math:`\\mathcal{O}`. If needed, the parameters initial conditions for the minimization are constructed using the values given by the :class:`EnsembleSpreadScalingCorrection`, dependding on the minimizer and options being chosen. The parameters :math:`\\gamma_{1,v} (t)` and :math:`\\gamma_{2,v} (t)` control respectively the scaling of the ensemble spread and its nudging toward the climatological variance for the long lead times. Attributes ---------- parameters_list: list(Data) The list of training parameters :math:`\\alpha_{v} (t)`, :math:`\\beta_{n,v} (t)`, :math:`\\tau_{v} (t)`, as :class:`.Data` objects. This list is empty until the first :meth:`~.BiasCorrection.train` method call. Examples -------- .. 
plot:: :format: doctest :include-source: 1 >>> from pp_test.test_data import test_sin >>> import postprocessors.MBM as MBM >>> import matplotlib.pyplot as plt >>> >>> # Loading random sinusoidal observations and reforecasts >>> past_observations, reforecasts = test_sin(60., 3., number_of_observation=200) >>> past_observations.index_shape (1, 200, 1, 1, 21) >>> reforecasts.index_shape (1, 200, 10, 1, 21) >>> >>> # Training the postprocessor >>> postprocessor = MBM.EnsembleAbsCRPSCorrection() >>> postprocessor.train(past_observations, reforecasts, ntrial=10) >>> >>> # Loading new random sinusoidal observations and forecasts >>> observations, forecasts = test_sin(60., 3.) >>> corrected_forecasts = postprocessor(forecasts) # Correcting the new forecasts >>> >>> # Plotting the forecasts >>> ax = observations.plot(global_label='observations', color='tab:green') >>> ax = forecasts.plot(ax=ax, global_label='raw forecasts', mfc=None, mec='tab:blue', ... ls='', marker='o', ms=3.0) >>> ax = corrected_forecasts.plot(ax=ax, global_label='corrected forecasts', mfc=None, ... mec='tab:orange', ls='', marker='o', ms=3.0) >>> t = ax.set_ylabel('temperature [C°]') >>> t = ax.set_xlabel('date') >>> t = ax.legend() >>> t = ax.set_title('Example of corrected forecasts') >>> >>> # Plotting the postprocessing parameters >>> ax = postprocessor.plot_parameters() >>> t = ax.set_title('Postprocessing parameters') >>> >>> # Computing the CRPS score >>> raw_crps = forecasts.CRPS(observations) >>> corrected_crps = corrected_forecasts.CRPS(observations) >>> >>> # Plotting the CRPS score >>> ax = raw_crps.plot(timestamps=postprocessor.parameters_list[0].timestamps[0, 0], ... global_label='Raw forecast CRPS', color='tab:blue') >>> ax = corrected_crps.plot(timestamps=postprocessor.parameters_list[0].timestamps[0, 0], ax=ax, ... 
global_label='Corrected forecast CRPS', color='tab:orange') >>> t = ax.set_ylabel('CRPS [C°]') >>> t = ax.set_xlabel('time [hour]') >>> t = ax.set_title('CRPS score') >>> t = ax.legend() >>> plt.show() """ def __init__(self): EnsembleSpreadScalingCorrection.__init__(self) self.min_result = None self.crps_result = None @staticmethod def _corrected_forecast_mod(predictors, parameters_list): cem = predictors.centered_ensemble.data[0] res = parameters_list[0].data res = res + np.sum(parameters_list[1].data * predictors.ensemble_mean.data, axis=0)[np.newaxis, ...] res = res + (parameters_list[2].data + parameters_list[3].data / predictors.delta.data[0]) * cem[np.newaxis, ...] if predictors.timestamps is not None: timestamps = predictors.timestamps[0][np.newaxis, ...] else: timestamps = None return Data(res, metadata=predictors.metadata, timestamps=timestamps, dtype=predictors.dtype) @staticmethod def _atomic_corrected_forecast(params, predictors): cem = predictors.centered_ensemble.data[0] res = params[0] res = res + np.sum((params[1:-2] * predictors.ensemble_mean.data.T).T, axis=0)[np.newaxis, ...] res = res + (np.abs(params[-2]) + np.abs(params[-1]) / predictors.delta.data[0]) * cem[np.newaxis, ...] 
return Data(res, metadata=False, timestamps=False, dtype=predictors.dtype) @staticmethod def _atomic_crps(params, observations, predictors): corrected = EnsembleAbsCRPSCorrection._atomic_corrected_forecast(params, predictors) crps = corrected.Abs_CRPS(observations) return np.squeeze(crps.data) @staticmethod def _construct_params_ic(parameters_list): params = parameters_list[1] params = np.insert(params, 0, parameters_list[0]) params = np.append(params, parameters_list[2:]) return params @staticmethod def _construct_params_bounds(parameters_list): params = list() params.append((-2 * np.abs(parameters_list[0]), 2 * np.abs(parameters_list[0]))) for beta in parameters_list[1]: params.append((-2 * np.abs(beta), 2 * np.abs(beta))) params.append((0, 2 * np.abs(parameters_list[2]))) params.append((0, 2 * np.abs(parameters_list[3]))) return params def _minimize(self, observations, predictors, ntrial=1, num_threads=None, minimize_func=None, **kwargs): if 'method' not in kwargs.keys(): method = 'Nelder-Mead' kwargs['method'] = method else: method = kwargs['method'] if num_threads is None: pool = multiprocessing.Pool(processes=multiprocessing.cpu_count()) else: pool = multiprocessing.Pool(processes=num_threads) if minimize_func is None: minimize_func = self._atomic_crps if predictors.is_scalar() and observations.is_scalar(): nstep = predictors.number_of_time_steps nvar = predictors.number_of_variables alpha = self.parameters_list[0][0, 0, 0] beta = self.parameters_list[1][:, 0, 0, :, :] gamma_1 = self.parameters_list[2][0, 0, 0] gamma_2 = self.parameters_list[3][0, 0, 0] min_result = np.empty((nvar, nstep, ntrial), dtype=object) crps_result = np.zeros((nvar, nstep, ntrial)) self.min_result = np.empty((nvar, nstep), dtype=object) self.crps_result = np.zeros((nvar, nstep)) jobs_list = list() for t in range(nstep): for n in range(nvar): observations_atom = Data(observations[:, :, :, n, t], metadata=False, timestamps=False, dtype=observations.dtype) predictors_atom = 
Data(predictors[:, :, :, n, t], metadata=False, timestamps=False, dtype=predictors.dtype) if method in ['differential_evolution', 'shgo', 'dual_annealing']: params = self._construct_params_bounds([alpha[n, t], beta[:, n, t], gamma_1[n, t], gamma_2[n, t]]) jobs_list.append([(n, t), self._minimize_atom, (minimize_func, observations_atom, predictors_atom, params, kwargs)]) elif method == 'basinhopping': params = self._construct_params_ic([alpha[n, t], beta[:, n, t], gamma_1[n, t], gamma_2[n, t]]) jobs_list.append([(n, t), self._minimize_atom, (minimize_func, observations_atom, predictors_atom, params, kwargs)]) else: for r in range(ntrial): rand = np.random.random(3) randa = rand[-1] - 0.5 rand_beta = np.random.random(len(beta[:, n, t])) - 0.5 params = self._construct_params_ic([4 * randa * alpha[n, t], 4 * rand_beta * beta[:, n, t], 2 * rand[0] * gamma_1[n, t], 2 * rand[1] * gamma_2[n, t]]) jobs_list.append([(n, t, r), self._minimize_atom, (minimize_func, observations_atom, predictors_atom, params, kwargs)]) result = pool.map(_compute, jobs_list) if method in ['differential_evolution', 'shgo', 'dual_annealing', 'basinhopping']: for res in result: n = res[0][0] t = res[0][1] self.parameters_list[0][0, 0, 0, n, t] = res[1].x[0] self.parameters_list[1][:, 0, 0, n, t] = res[1].x[1:-2] self.parameters_list[2][0, 0, 0, n, t] = np.abs(res[1].x[-2]) self.parameters_list[3][0, 0, 0, n, t] = np.abs(res[1].x[-1]) self.min_result[n, t] = res[1] self.crps_result[n, t] = res[1].fun else: for res in result: n = res[0][0] t = res[0][1] r = res[0][2] min_result[n, t, r] = res[1] crps_result[n, t, r] = res[1].fun min_index = np.argmin(crps_result, axis=-1) for n in range(min_result.shape[0]): for t in range(min_result.shape[1]): res = min_result[n, t, min_index[n, t]] self.parameters_list[0][0, 0, 0, n, t] = res.x[0] self.parameters_list[1][:, 0, 0, n, t] = res.x[1:-2] self.parameters_list[2][0, 0, 0, n, t] = np.abs(res.x[-2]) self.parameters_list[3][0, 0, 0, n, t] = np.abs(res.x[-1]) 
                        self.min_result[n, t] = res
                        self.crps_result[n, t] = res.fun

        elif predictors.is_field() and observations.is_field():
            nstep = predictors.number_of_time_steps
            nvar = predictors.number_of_variables
            ni, nj = predictors.shape
            alpha = self.parameters_list[0][0, 0, 0]
            beta = self.parameters_list[1][:, 0, 0, :, :]
            gamma_1 = self.parameters_list[2][0, 0, 0]
            gamma_2 = self.parameters_list[3][0, 0, 0]
            min_result = np.empty((nvar, nstep, ni, nj, ntrial), dtype=object)
            crps_result = np.zeros((nvar, nstep, ni, nj, ntrial))
            self.min_result = np.empty((nvar, nstep, ni, nj), dtype=object)
            self.crps_result = np.zeros((nvar, nstep, ni, nj))

            jobs_list = list()

            for t, n, i, j in product(range(nstep), range(nvar), range(ni), range(nj)):
                observations_atom = Data(observations[:, :, :, n, t][..., i, j], metadata=False, timestamps=False, dtype=observations.dtype)
                predictors_atom = Data(predictors[:, :, :, n, t][..., i, j], metadata=False, timestamps=False, dtype=predictors.dtype)

                if method in ['differential_evolution', 'shgo', 'dual_annealing']:
                    params = self._construct_params_bounds([alpha[n, t, i, j], beta[:, n, t, i, j], gamma_1[n, t, i, j], gamma_2[n, t, i, j]])
                    jobs_list.append([(n, t, i, j), self._minimize_atom, (minimize_func, observations_atom, predictors_atom, params, kwargs)])
                elif method == 'basinhopping':
                    params = self._construct_params_ic([alpha[n, t, i, j], beta[:, n, t, i, j], gamma_1[n, t, i, j], gamma_2[n, t, i, j]])
                    jobs_list.append([(n, t, i, j), self._minimize_atom, (minimize_func, observations_atom, predictors_atom, params, kwargs)])
                else:
                    for r in range(ntrial):
                        rand = np.random.random(3)
                        randa = rand[-1] - 0.5
                        rand_beta = np.random.random(len(beta[:, n, t, i, j])) - 0.5
                        params = self._construct_params_ic([4 * randa * alpha[n, t, i, j], 4 * rand_beta * beta[:, n, t, i, j],
                                                            2 * rand[0] * gamma_1[n, t, i, j], 2 * rand[1] * gamma_2[n, t, i, j]])
                        jobs_list.append([(n, t, i, j, r), self._minimize_atom, (minimize_func, observations_atom, predictors_atom, params, kwargs)])
            result = pool.map(_compute, jobs_list)

            if method in ['differential_evolution', 'shgo', 'dual_annealing', 'basinhopping']:
                for res in result:
                    n, t, i, j = res[0]
                    self.parameters_list[0].data[0, 0, 0, n, t, i, j] = res[1].x[0]
                    self.parameters_list[1].data[:, 0, 0, n, t, i, j] = res[1].x[1:-2]
                    self.parameters_list[2].data[0, 0, 0, n, t, i, j] = np.abs(res[1].x[-2])
                    self.parameters_list[3].data[0, 0, 0, n, t, i, j] = np.abs(res[1].x[-1])
                    self.min_result[n, t, i, j] = res[1]
                    self.crps_result[n, t, i, j] = res[1].fun
            else:
                for res in result:
                    n, t, i, j, r = res[0]
                    min_result[n, t, i, j, r] = res[1]
                    crps_result[n, t, i, j, r] = res[1].fun

                min_index = np.argmin(crps_result, axis=-1)

                for t, n, i, j in product(range(nstep), range(nvar), range(ni), range(nj)):
                    res = min_result[n, t, i, j, min_index[n, t, i, j]]
                    self.parameters_list[0].data[0, 0, 0, n, t, i, j] = res.x[0]
                    self.parameters_list[1].data[:, 0, 0, n, t, i, j] = res.x[1:-2]
                    self.parameters_list[2].data[0, 0, 0, n, t, i, j] = np.abs(res.x[-2])
                    self.parameters_list[3].data[0, 0, 0, n, t, i, j] = np.abs(res.x[-1])
                    self.min_result[n, t, i, j] = res
                    self.crps_result[n, t, i, j] = res.fun

        pool.terminate()

    @staticmethod
    def _minimize_atom(minimize_func, observations_atom, predictors_atom, params, kwargs):
        if 'method' not in kwargs:
            method = 'Nelder-Mead'
        else:
            method = kwargs['method']

        if method not in ['differential_evolution', 'shgo', 'dual_annealing', 'basinhopping']:
            optimizer = optimize.minimize
            opt_kwargs = kwargs
        else:
            optimizer = getattr(optimize, method)
            opt_kwargs = dict(kwargs)
            opt_kwargs.pop('method')

        res = optimizer(minimize_func, params, args=(observations_atom, predictors_atom), **opt_kwargs)
        return res
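The optimizer dispatch inside `_minimize_atom` can be illustrated in isolation. The sketch below is hypothetical: it uses a `SimpleNamespace` called `fake_optimize` as a stand-in for `scipy.optimize`, and a helper `pick_optimizer` that mirrors the branch logic above (local methods go through `minimize` and keep `'method'` in the options; global methods are looked up by name with `getattr` and must not receive `'method'`).

```python
from types import SimpleNamespace

# Hypothetical stand-in for scipy.optimize, for illustration only.
fake_optimize = SimpleNamespace(
    minimize=lambda func, x0, args=(), **kw: ('local', func(x0, *args)),
    dual_annealing=lambda func, x0, args=(), **kw: ('global', func(x0, *args)),
)

def pick_optimizer(kwargs, module=fake_optimize):
    # Mirror of _minimize_atom's dispatch: global optimizers are resolved
    # by name with getattr and 'method' is popped from their options.
    method = kwargs.get('method', 'Nelder-Mead')
    if method not in ['differential_evolution', 'shgo', 'dual_annealing', 'basinhopping']:
        return module.minimize, kwargs
    opt_kwargs = dict(kwargs)
    opt_kwargs.pop('method')
    return getattr(module, method), opt_kwargs
```

With the real `scipy.optimize`, the same lookup returns e.g. `optimize.dual_annealing` for `method='dual_annealing'`.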
[docs]    def train(self, observations, predictors, forecast_init_time=None, num_threads=None, ntrial=1, **kwargs):
        """Method to train the postprocessor with an observation and predictors training set.
        Once trained, the postprocessor parameters are stored in the :attr:`~.BiasCorrection.parameters_list` attribute.

        Parameters
        ----------
        observations: Data
            A `Data` object containing the observation training set.
        predictors: Data
            A `Data` object containing the predictors training set. Must be broadcastable with the `observations`.
        forecast_init_time: int or ~datetime.datetime, optional
            The time at which the predictors generation begins (the (re)forecasts initial time).
            If `None`, assumes that this time is zero (00Z).
        num_threads: None or int, optional
            The number of threads used to compute the parameters. One thread computes a chunk of the given
            grid points and lead times. If `None`, uses the maximum number of CPU cores available.
            Defaults to `None`.
        ntrial: int, optional
            Number of initial conditions of the parameters to try in order to find the best solution.
            Not used if the chosen minimizer is a global one. Defaults to 1.
        kwargs: dict
            Dictionary of options to pass to the :func:`scipy.optimize.minimize` function.
        """

        obs, pred = self._sanitize_data(observations, predictors)
        alpha, beta, gamma_1 = EnsembleSpreadScalingCorrection._compute_ss_coefficients(obs, pred, forecast_init_time)
        gamma_2 = gamma_1.copy()
        self.parameters_list = [alpha, beta, gamma_1, gamma_2]
        self._minimize(obs, pred, num_threads=num_threads, ntrial=ntrial, **kwargs)
[docs]class EnsembleNgrCRPSCorrection(EnsembleAbsCRPSCorrection):
    """A postprocessor to correct the ensemble mean and scale the spread of the ensemble.
    It corrects according to the formula:

    .. math::

        \\mathcal{D}^C_{n,m,v} (t) = \\alpha_{v} (t) + \\sum_{p=1}^P \\beta_{p,v} (t) \\, \\mu^{\\rm ens}_{p,n,v} (t)
        + \\left(\\gamma_{1,v} (t) + \\gamma_{2,v} (t) / \\delta_{0, n, v} (t) \\right) \\, \\bar{\\mathcal{D}}^{\\rm ens}_{0,n,m,v} (t)

    where :math:`\\delta_{0, n, v} (t)` is the average over the ensemble members of the ensemble members distance
    (:attr:`~.Data.delta`). The parameters :math:`\\alpha_{v} (t)`, :math:`\\beta_{v} (t)`, :math:`\\gamma_{1,v} (t)`
    and :math:`\\gamma_{2,v} (t)` are optimized at each lead time to minimize the Non-homogeneous Gaussian Regression
    (NGR) CRPS (:meth:`~.Data.Ngr_CRPS`) of :math:`\\mathcal{D}^C_{n,m,v} (t)` with the observations
    :math:`\\mathcal{O}`. This postprocessor must thus be trained with a training set composed of past ensembles
    :math:`\\mathcal{D}_{p,n,m,v}` and observations :math:`\\mathcal{O}`.
    If needed, the initial conditions of the parameters for the minimization are constructed using the values given
    by the :class:`EnsembleSpreadScalingCorrection`, depending on the chosen minimizer and options.

    The parameters :math:`\\gamma_{1,v} (t)` and :math:`\\gamma_{2,v} (t)` control respectively the scaling of the
    ensemble spread and its nudging toward the climatological variance for the long lead times.

    Attributes
    ----------
    parameters_list: list(Data)
        The list of training parameters :math:`\\alpha_{v} (t)`, :math:`\\beta_{p,v} (t)`, :math:`\\gamma_{1,v} (t)`
        and :math:`\\gamma_{2,v} (t)`, as :class:`.Data` objects. This list is empty until the first
        :meth:`~.BiasCorrection.train` method call.

    Examples
    --------

    .. plot::
        :format: doctest
        :include-source: 1

        >>> from pp_test.test_data import test_sin
        >>> import postprocessors.MBM as MBM
        >>> import matplotlib.pyplot as plt
        >>>
        >>> # Loading random sinusoidal observations and reforecasts
        >>> past_observations, reforecasts = test_sin(60., 3., number_of_observation=200)
        >>> past_observations.index_shape
        (1, 200, 1, 1, 21)
        >>> reforecasts.index_shape
        (1, 200, 10, 1, 21)
        >>>
        >>> # Training the postprocessor
        >>> postprocessor = MBM.EnsembleNgrCRPSCorrection()
        >>> postprocessor.train(past_observations, reforecasts, ntrial=10)
        >>>
        >>> # Loading new random sinusoidal observations and forecasts
        >>> observations, forecasts = test_sin(60., 3.)
        >>> corrected_forecasts = postprocessor(forecasts)  # Correcting the new forecasts
        >>>
        >>> # Plotting the forecasts
        >>> ax = observations.plot(global_label='observations', color='tab:green')
        >>> ax = forecasts.plot(ax=ax, global_label='raw forecasts', mfc=None, mec='tab:blue',
        ...                     ls='', marker='o', ms=3.0)
        >>> ax = corrected_forecasts.plot(ax=ax, global_label='corrected forecasts', mfc=None,
        ...                               mec='tab:orange', ls='', marker='o', ms=3.0)
        >>> t = ax.set_ylabel('temperature [C°]')
        >>> t = ax.set_xlabel('date')
        >>> t = ax.legend()
        >>> t = ax.set_title('Example of corrected forecasts')
        >>>
        >>> # Plotting the postprocessing parameters
        >>> ax = postprocessor.plot_parameters()
        >>> t = ax.set_title('Postprocessing parameters')
        >>>
        >>> # Computing the CRPS score
        >>> raw_crps = forecasts.CRPS(observations)
        >>> corrected_crps = corrected_forecasts.CRPS(observations)
        >>>
        >>> # Plotting the CRPS score
        >>> ax = raw_crps.plot(timestamps=postprocessor.parameters_list[0].timestamps[0, 0],
        ...                    global_label='Raw forecast CRPS', color='tab:blue')
        >>> ax = corrected_crps.plot(timestamps=postprocessor.parameters_list[0].timestamps[0, 0], ax=ax,
        ...                          global_label='Corrected forecast CRPS', color='tab:orange')
        >>> t = ax.set_ylabel('CRPS [C°]')
        >>> t = ax.set_xlabel('time [hour]')
        >>> t = ax.set_title('CRPS score')
        >>> t = ax.legend()
        >>> plt.show()
    """

    def __init__(self):
        EnsembleAbsCRPSCorrection.__init__(self)

    @staticmethod
    def _corrected_forecast_mod(predictors, parameters_list):
        cem = predictors.centered_ensemble.data[0]
        res = parameters_list[0].data
        res = res + np.sum(parameters_list[1].data * predictors.ensemble_mean.data, axis=0)[np.newaxis, ...]
        res = res + np.sqrt(parameters_list[2].data**2 + parameters_list[3].data**2 / predictors.ensemble_var.data[0]) * cem[np.newaxis, ...]

        if predictors.timestamps is not None:
            timestamps = predictors.timestamps[0][np.newaxis, ...]
        else:
            timestamps = None

        return Data(res, metadata=predictors.metadata, timestamps=timestamps, dtype=predictors.dtype)

    @staticmethod
    def _atomic_corrected_forecast(params, predictors):
        cem = predictors.centered_ensemble.data[0]
        res = params[0]
        res = res + np.sum((params[1:-2] * predictors.ensemble_mean.data.T).T, axis=0)[np.newaxis, ...]
        res = res + np.sqrt(params[-2]**2 + params[-1]**2 / predictors.ensemble_var.data[0]) * cem[np.newaxis, ...]
        return Data(res, metadata=False, timestamps=False, dtype=predictors.dtype)

    @staticmethod
    def _atomic_crps(params, observations, predictors):
        corrected = EnsembleNgrCRPSCorrection._atomic_corrected_forecast(params, predictors)
        crps = corrected.Ngr_CRPS(observations)
        return np.squeeze(crps.data)
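For a single scalar member and one predictor (:math:`P = 1`), the correction computed by `_atomic_corrected_forecast` reduces to a few arithmetic operations. The sketch below is a hypothetical scalar rewrite (the helper names `ngr_spread_factor` and `correct_member` are not part of the module), not the vectorized implementation:

```python
import math

def ngr_spread_factor(gamma_1, gamma_2, ensemble_var):
    # Spread scaling used in _atomic_corrected_forecast:
    # sqrt(gamma_1^2 + gamma_2^2 / var), which inflates the spread
    # more when the raw ensemble variance is small.
    return math.sqrt(gamma_1 ** 2 + gamma_2 ** 2 / ensemble_var)

def correct_member(member, ens_mean, ensemble_var, alpha, beta, gamma_1, gamma_2):
    # alpha + beta * ensemble mean + scaled deviation of the member
    # from the ensemble mean (the centered ensemble).
    deviation = member - ens_mean
    return alpha + beta * ens_mean + ngr_spread_factor(gamma_1, gamma_2, ensemble_var) * deviation
```

With `alpha = 0`, `beta = 1`, `gamma_1 = 1` and `gamma_2 = 0`, the member is returned unchanged, which makes the role of each parameter easy to check.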
def _compute(ls):
    return ls[0], ls[1](*ls[2])
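The module-level `_compute` helper unpacks one job from the `jobs_list` built in `_minimize` and tags the result so it can be routed back to the right indices. A minimal self-contained illustration of the pattern, with a hypothetical `add` task standing in for `_minimize_atom`:

```python
from multiprocessing.pool import ThreadPool

def _compute(ls):
    # A job is a [tag, callable, args] triple; return (tag, result) so the
    # caller can match each result to its (n, t, ...) indices.
    return ls[0], ls[1](*ls[2])

def add(a, b):  # hypothetical stand-in for _minimize_atom
    return a + b

jobs_list = [[(n, 0), add, (n, 10)] for n in range(3)]
pool = ThreadPool(2)
result = pool.map(_compute, jobs_list)
pool.terminate()
```

Because the tag travels with the job, `pool.map` can return results in any internal order of completion while still letting the caller index them deterministically.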