Source code for tsaug._augmenter.pool

from typing import List, Optional, Tuple, Union

import numpy as np

from .base import _Augmenter, _default_seed


[docs]class Pool(_Augmenter): """ Reduce the temporal resolution without changing the length. Parameters ---------- kind : str, optional Pooling function, one of 'max', 'min', and 'ave'. Default: 'ave'. size : str, tuple, or list, optional Size of pooling window. - If int, all series (all channels if `per_channel` is True) are pooled with the same pooling size. - If list, a series (a channel if `per_channel` is True) is pooled with a pooling size sampled from this list randomly. - If 2-tuple, a series (a channel if `per_channel` is True) is pooled with a pooling size sampled from this interval randomly. Default: 2. per_channel : bool, optional Whether to sample a pooling window for each channel in a time series or to use the same window for all channels in a time series. Only used if the pooling window is not deterministic. Default: False. repeats : int, optional The number of times a series is augmented. If greater than one, a series will be augmented so many times independently. This parameter can also be set by operator `*`. Default: 1. prob : float, optional The probability of a series is augmented. It must be in (0.0, 1.0]. This parameter can also be set by operator `@`. Default: 1.0. seed : int, optional The random seed. Default: None. """ def __init__( self, kind: str = "ave", size: Union[int, Tuple[int, int], List[int]] = 2, per_channel: bool = False, repeats: int = 1, prob: float = 1.0, seed: Optional[int] = _default_seed, ): self.kind = kind self.size = size self.per_channel = per_channel super().__init__(repeats=repeats, prob=prob, seed=seed) @classmethod def _get_param_name(cls) -> Tuple[str, ...]: return ("kind", "size", "per_channel") @property def kind(self) -> str: return self._kind @kind.setter def kind(self, k: str) -> None: if not isinstance(k, str): raise TypeError( "Parameter `kind` must be one of 'max', 'min', and 'ave'." ) if k not in ("max", "min", "ave"): raise ValueError( "Parameter `kind` must be one of 'max', 'min', and 'ave'." ) self._kind = k @property def size(self) -> Union[int, Tuple[int, int], List[int]]: return self._size @size.setter def size(self, n: Union[int, Tuple[int, int], List[int]]) -> None: SIZE_ERROR_MSG = ( "Parameter `size` must be a positive integer, " "a 2-tuple of positive integers representing an interval, " "or a list of positive integers." ) if not isinstance(n, int): if isinstance(n, list): if len(n) == 0: raise ValueError(SIZE_ERROR_MSG) if not all([isinstance(nn, int) for nn in n]): raise TypeError(SIZE_ERROR_MSG) if not all([nn > 0 for nn in n]): raise ValueError(SIZE_ERROR_MSG) elif isinstance(n, tuple): if len(n) != 2: raise ValueError(SIZE_ERROR_MSG) if (not isinstance(n[0], int)) or (not isinstance(n[1], int)): raise TypeError(SIZE_ERROR_MSG) if n[0] >= n[1]: raise ValueError(SIZE_ERROR_MSG) if (n[0] <= 0) or (n[1] <= 0): raise ValueError(SIZE_ERROR_MSG) else: raise TypeError(SIZE_ERROR_MSG) elif n <= 0: raise ValueError(SIZE_ERROR_MSG) self._size = n @property def per_channel(self) -> bool: return self._per_channel @per_channel.setter def per_channel(self, p: bool) -> None: if not isinstance(p, bool): raise TypeError("Paremeter `per_channel` must be boolean.") self._per_channel = p def _augment_core( self, X: np.ndarray, Y: Optional[np.ndarray] ) -> Tuple[np.ndarray, Optional[np.ndarray]]: rand = np.random.RandomState(self.seed) N, T, C = X.shape if isinstance(self.size, int): size = [self.size] elif isinstance(self.size, tuple): size = list(range(self.size[0], self.size[1])) else: size = self.size if self.per_channel: kernel = rand.choice(size, size=N * C) else: kernel = rand.choice(size, size=N) kernel = np.repeat(kernel, C) X_aug = X.copy() X_aug = X_aug.swapaxes(1, 2).reshape(N * C, T) if self.kind == "max": pool_func = np.max elif self.kind == "min": pool_func = np.min else: pool_func = np.mean for s in np.unique(kernel): X_aug[kernel == s, : (s * int(T / s))] = np.repeat( pool_func( X_aug[kernel == s, : (s * int(T / s))].reshape( -1, int(T / s), s ), axis=2, keepdims=True, ), s, axis=2, ).reshape(-1, s * int(T / s)) if T % s: X_aug[kernel == s, (s * int(T / s)) :] = pool_func( X_aug[kernel == s, (s * int(T / s)) :], axis=1, keepdims=True, ) X_aug = X_aug.reshape(N, C, T).swapaxes(1, 2) if Y is not None: Y_aug = Y.copy() else: Y_aug = None return X_aug, Y_aug