Source code for tsaug._augmenter.crop

from typing import List, Optional, Tuple, Union

import numpy as np

from .base import _Augmenter, _default_seed
from .resize import Resize


class Crop(_Augmenter):
    """
    Crop random sub-sequences from time series.

    To guarantee all output series have the same length, if the crop size is
    not deterministic, all crops must be resized to a fixed length.

    Parameters
    ----------
    size : int, tuple, list
        The length of random crops.

        - If int, all crops have the same length.
        - If list, a crop from a series has a length sampled from this list
          randomly.
        - If 2-tuple ``(low, high)``, a crop from a series has a length
          sampled randomly from the integers ``low, ..., high - 1`` (the
          upper bound is excluded).

    resize : int, optional
        The length that all crops are resized to. Only necessary if the crop
        size is not fixed.

    repeats : int, optional
        The number of times a series is augmented. If greater than one, a
        series will be augmented so many times independently. This parameter
        can also be set by operator `*`. Default: 1.

    prob : float, optional
        The probability that a series is augmented. It must be in
        (0.0, 1.0]. If multiple output is expected, this value must be 1.0,
        so that all output have the same length. This parameter can also be
        set by operator `@`. Default: 1.0.

    seed : int, optional
        The random seed. Default: None.

    """

    def __init__(
        self,
        size: Union[int, Tuple[int, int], List[int]],
        resize: Optional[int] = None,
        repeats: int = 1,
        prob: float = 1.0,
        seed: Optional[int] = _default_seed,
    ):
        # Both assignments go through the validating property setters below.
        self.size = size
        self.resize = resize
        super().__init__(repeats=repeats, prob=prob, seed=seed)

    @classmethod
    def _get_param_name(cls) -> Tuple[str, ...]:
        """Return the names of the parameters specific to this augmenter."""
        return ("size", "resize")

    @property
    def size(self) -> Union[int, Tuple[int, int], List[int]]:
        """The crop length specification (int, 2-tuple interval, or list)."""
        return self._size

    @size.setter
    def size(self, n: Union[int, Tuple[int, int], List[int]]) -> None:
        """
        Validate and store the crop length specification.

        Raises
        ------
        TypeError
            If `n` is not an int, list or tuple, or contains non-integers.
        ValueError
            If `n` is empty, has non-positive entries, is a tuple whose
            length is not 2, or is a tuple that is not strictly increasing.

        """
        SIZE_ERROR_MSG = (
            "Parameter `size` must be a positive integer, "
            "a 2-tuple of positive integers representing an interval, "
            "or a list of positive integers."
        )
        if isinstance(n, int):
            if n <= 0:
                raise ValueError(SIZE_ERROR_MSG)
        elif isinstance(n, list):
            if not n:
                raise ValueError(SIZE_ERROR_MSG)
            if not all(isinstance(nn, int) for nn in n):
                raise TypeError(SIZE_ERROR_MSG)
            if not all(nn > 0 for nn in n):
                raise ValueError(SIZE_ERROR_MSG)
        elif isinstance(n, tuple):
            if len(n) != 2:
                raise ValueError(SIZE_ERROR_MSG)
            if not (isinstance(n[0], int) and isinstance(n[1], int)):
                raise TypeError(SIZE_ERROR_MSG)
            # The interval must be non-empty and strictly positive.
            if (n[0] >= n[1]) or (n[0] <= 0) or (n[1] <= 0):
                raise ValueError(SIZE_ERROR_MSG)
        else:
            raise TypeError(SIZE_ERROR_MSG)
        self._size = n

    @property
    def resize(self) -> Optional[int]:
        """The common output length of all crops, or None if not set."""
        return self._resize

    @resize.setter
    def resize(self, s: Optional[int]) -> None:
        """
        Validate and store the output length.

        Raises
        ------
        TypeError
            If `s` is neither None nor an int.
        ValueError
            If `s` is an int that is not positive.

        """
        if s is not None:
            if not isinstance(s, int):
                raise TypeError(
                    "Parameter `resize` must be a positive integer."
                )
            if s <= 0:
                raise ValueError(
                    "Parameter `resize` must be a positive integer."
                )
        self._resize = s

    def _resolve_sizes(self) -> Tuple[List[int], int]:
        """
        Expand `size` into candidate crop lengths and pick the output length.

        Returns
        -------
        tuple (list of int, int)
            The candidate crop lengths, and the length every crop ends up
            with (`resize` if given, otherwise the single candidate length).

        Raises
        ------
        ValueError
            If `resize` is None while there is more than one candidate.

        """
        if isinstance(self.size, int):
            candidates = [self.size]
        elif isinstance(self.size, tuple):
            # Half-open interval: the upper bound itself is never sampled.
            candidates = list(range(self.size[0], self.size[1]))
        else:
            candidates = self.size

        if self.resize is not None:
            return candidates, self.resize
        if len(candidates) > 1:
            raise ValueError(
                "Parameter `resize` must be specified if parameter `size` "
                "is not a single value."
            )
        return candidates, candidates[0]

    def _augmented_series_length(self, T: int) -> int:
        """
        Return the length of augmented series.

        The input length `T` is not used: the output length is determined by
        `resize` and `size` only.
        """
        _, out_len = self._resolve_sizes()
        return out_len

    def _augment(
        self, X: np.ndarray, Y: Optional[np.ndarray]
    ) -> Tuple[np.ndarray, Optional[np.ndarray]]:
        """
        Overwrite the memory-expensive base method.

        Parameters
        ----------
        X : numpy array
            3-dimensional input; its shape is unpacked as (N, T, C).
        Y : numpy array, optional
            3-dimensional array cropped at the same positions as `X`; its
            last axis may have a different size than that of `X`.

        Returns
        -------
        tuple (numpy array, numpy array or None)
            Arrays with `N * repeats` rows and the resolved output length
            along the second axis. If the `prob` draw decides not to
            augment, copies of the inputs are returned instead.

        Raises
        ------
        ValueError
            If `resize` is required but missing, or if a sampled crop length
            exceeds the series length.

        """
        N, T, C = X.shape
        rand = np.random.RandomState(self.seed)

        if self.prob != 1.0:
            # it implies N == 1 and self.repeats == 1
            if rand.uniform() > self.prob:
                if Y is None:
                    return X.copy(), None
                return X.copy(), Y.copy()

        candidates, out_len = self._resolve_sizes()

        n_out = N * self.repeats
        X_aug = np.zeros((n_out, out_len, C))
        if Y is None:
            Y_aug = None
        else:
            L = Y.shape[2]
            Y_aug = np.zeros((n_out, out_len, L))

        crop_size = rand.choice(candidates, size=n_out)
        resizer = Resize(out_len)
        # Index of the input series that each output row is cropped from.
        source_row = np.repeat(np.arange(N), self.repeats)

        # Crops of equal length are extracted and resized together.
        for s in np.unique(crop_size):
            if s > T:
                raise ValueError(
                    f"Crop size {s} exceeds the series length {T}."
                )
            selected = crop_size == s
            n = selected.sum()
            crop_start = rand.choice(T - s + 1, size=n)

            # (n, 1) row indices broadcast against (n, s) time indices.
            rows = source_row[selected].reshape(n, 1)
            steps = crop_start.reshape(n, 1) + np.arange(s).reshape(1, s)

            X_aug[selected, :, :] = resizer.augment(X[rows, steps, :])
            if (Y is not None) and (Y_aug is not None):
                Y_aug[selected, :, :] = resizer.augment(Y[rows, steps, :])

        return X_aug, Y_aug

    def _augment_core(
        self, X: np.ndarray, Y: Optional[np.ndarray]
    ) -> Tuple[np.ndarray, Optional[np.ndarray]]:
        "Method _augment is overwritten, therefore this method is not needed."
        pass