Source code for audiomate.feeding.partitioning

"""
This module provides functionality to load data
from a container into memory in chunks.
"""

import gc
import random

import numpy as np

import audiomate
from audiomate import containers
from audiomate.utils import units


class PartitioningContainerLoader:
    """
    Load data from one or more containers in partitions.
    It computes a scheme to load the data of as many utterances as possible in one partition.

    A scheme is initially computed on creation of the loader.
    To compute a new one the ``reload()`` method can be used.
    This only has an effect if ``shuffle == True``,
    otherwise the utterances are always loaded in the same order.

    With a given scheme, data of a partition can be retrieved via ``load_partition_data()``.
    It loads all data of the partition with the given index into memory.

    Args:
        corpus_or_utt_ids (Corpus, list): Either a corpus or a list of utterances.
                                          This defines which utterances are considered for loading.
        feature_containers (container.Container, list): Either a single or a list of Container objects.
                                                         From the given containers data is loaded.
        partition_size (str): Size of the partitions in bytes. The units ``k`` (kibibytes),
                              ``m`` (mebibytes) and ``g`` (gibibytes) are supported,
                              i.e. a ``partition_size`` of ``1g`` equates to :math:`2^{30}` bytes.
        shuffle (bool): Indicates whether the utterances should be returned in
                        random order (``True``) or not (``False``).
        seed (int): Seed to be used for the random number generator.

    Example:
        >>> corpus = audiomate.Corpus.load('/path/to/corpus')
        >>> container_inputs = containers.FeatureContainer('/path/to/feat.hdf5')
        >>> container_outputs = containers.Container('/path/to/targets.hdf5')
        >>>
        >>> lo = PartitioningContainerLoader(
        >>>     corpus,
        >>>     [container_inputs, container_outputs],
        >>>     '1G',
        >>>     shuffle=True,
        >>>     seed=23
        >>> )
        >>> len(lo.partitions)  # Number of partitions
        5
        >>> lo.partitions[0].utt_ids  # Utterances in the partition with index 0
        ['utt-1', 'utt-2', ...]
        >>> p0 = lo.load_partition_data(0)  # Load partition 0 into memory
        >>> p0.info.utt_ids[0]  # First utterance in the partition
        'utt-1'
        >>> p0.utt_data[0]  # Data of the first utterance
        (
            array([[0.58843831, 0.18128443, 0.19718328, 0.25284105], ...]),
            array([[0.0, 1.0], ...])
        )
    """

    def __init__(self, corpus_or_utt_ids, feature_containers, partition_size, shuffle=True, seed=None):
        if isinstance(corpus_or_utt_ids, audiomate.Corpus):
            self.utt_ids = list(corpus_or_utt_ids.utterances.keys())
        else:
            self.utt_ids = corpus_or_utt_ids

        if isinstance(feature_containers, containers.Container):
            self.containers = [feature_containers]
        else:
            self.containers = feature_containers

        if len(self.containers) == 0:
            raise ValueError('At least one container has to be provided!')

        self.partitions = []
        self.partition_size = units.parse_storage_size(partition_size)
        self.shuffle = shuffle

        # init random state
        self.rand = random.Random()
        self.rand.seed(a=seed)

        # check that every container has data for every utterance
        self._raise_error_if_container_is_missing_an_utterance()

        # Compute utterance size and length
        self.utt_sizes = self._scan()
        self.utt_lengths = self._get_all_lengths()

        self.reload()

    def reload(self):
        """
        Create a new partition scheme. A scheme defines which utterances are in which partition.

        The scheme only changes on every call if ``self.shuffle == True``.

        Returns:
            list: List of PartitionInfo objects, defining the new partitions (same as ``self.partitions``).
        """
        # Create the order in which utterances will be loaded
        utt_ids = sorted(self.utt_ids)

        if self.shuffle:
            self.rand.shuffle(utt_ids)

        partitions = []
        current_partition = PartitionInfo()

        for utt_id in utt_ids:
            utt_size = self.utt_sizes[utt_id]
            utt_lengths = self.utt_lengths[utt_id]

            # We add the utterance to the partition as long as the partition size is not exceeded.
            # Otherwise we start a new partition.
            if current_partition.size + utt_size > self.partition_size:
                partitions.append(current_partition)
                current_partition = PartitionInfo()

            current_partition.utt_ids.append(utt_id)
            current_partition.utt_lengths.append(utt_lengths)
            current_partition.size += utt_size

        if current_partition.size > 0:
            partitions.append(current_partition)

        self.partitions = partitions

        return self.partitions

    def load_partition_data(self, index):
        """
        Load and return the partition with the given index.

        Args:
            index (int): The index of the partition, referring to the index in ``self.partitions``.

        Returns:
            PartitionData: A PartitionData object containing the data
            for the partition with the given index.
        """
        info = self.partitions[index]
        data = PartitionData(info)

        for utt_id in info.utt_ids:
            utt_data = [c._file[utt_id][:] for c in self.containers]  # skipcq: PYL-W0212
            data.utt_data.append(utt_data)

        return data

    def _raise_error_if_container_is_missing_an_utterance(self):
        """ Check if there is a dataset for every utterance in every container, otherwise raise an error. """
        expected_keys = frozenset(self.utt_ids)

        for cnt in self.containers:
            keys = set(cnt.keys())

            if not keys.issuperset(expected_keys):
                raise ValueError('Container is missing data for some utterances!')

    def _scan(self):
        """ For every utterance, calculate the size it will need in memory. """
        utt_sizes = {}

        for dset_name in self.utt_ids:
            per_container = []

            for cnt in self.containers:
                dset = cnt._file[dset_name]  # skipcq: PYL-W0212
                dtype_size = dset.dtype.itemsize
                record_size = dtype_size * dset.size
                per_container.append(record_size)

            utt_size = sum(per_container)

            if utt_size > self.partition_size:
                raise ValueError('Records in "{0}" are larger than the partition size'.format(dset_name))

            utt_sizes[dset_name] = utt_size

        return utt_sizes

    def _get_all_lengths(self):
        """
        For every utterance, get the length of the data in every container.
        Return a dict that maps every utterance-id to a tuple of lengths (one per container).
        """
        utt_lengths = {}

        for utt_idx in self.utt_ids:
            per_container = [c._file[utt_idx].shape[0] for c in self.containers]  # skipcq: PYL-W0212
            utt_lengths[utt_idx] = tuple(per_container)

        return utt_lengths
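

# A minimal usage sketch (not part of the original module): iterate over all
# partitions for several training epochs. The corpus path, container file and
# epoch count are assumptions; only the API documented above
# (``reload``, ``partitions``, ``load_partition_data``) is used.
#
#     corpus = audiomate.Corpus.load('/path/to/corpus')
#     feats = containers.FeatureContainer('/path/to/feat.hdf5')
#
#     loader = PartitioningContainerLoader(corpus, feats, '512m', shuffle=True, seed=7)
#
#     for epoch in range(5):
#         loader.reload()  # compute a fresh shuffled partition scheme per epoch
#         for part_index in range(len(loader.partitions)):
#             part = loader.load_partition_data(part_index)
#             for utt_id, utt_arrays in zip(part.info.utt_ids, part.utt_data):
#                 pass  # utt_arrays holds one numpy array per container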


class PartitionInfo:
    """
    Class for holding the info of a partition.

    Attributes:
        utt_ids (list): A list of utterance-ids in the partition.
        utt_lengths (list): List with the lengths of the utterances
                            (outermost dimension in the dataset of the container).
                            Since there may be multiple containers, every item is a tuple of lengths.
                            They correspond to the length of the utterance in every container,
                            in the order of the containers passed to the PartitioningContainerLoader.
        size (int): The number of bytes the partition will allocate, when loaded.
    """

    def __init__(self):
        self.utt_ids = []
        self.utt_lengths = []
        self.size = 0

    def total_lengths(self):
        """ Return the total length of all utterances for every container. """
        return tuple(sum(x) for x in zip(*self.utt_lengths))
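

# Sketch of what ``total_lengths()`` computes, using assumed values (not part of
# the original module): with two containers and three utterances, the lengths
# are summed per container.
#
#     info = PartitionInfo()
#     info.utt_lengths = [(142, 142), (97, 97), (310, 310)]
#     info.total_lengths()  # -> (549, 549)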


class PartitionData:
    """
    Class for holding the loaded data of a partition.

    Args:
        info (PartitionInfo): The info about the partition.

    Attributes:
        utt_data (list): A list holding the data-objects for every utterance,
                         in the order of ``info.utt_ids``.
                         The entries are also lists or tuples containing the
                         array for every container.
    """

    def __init__(self, info):
        self.info = info
        self.utt_data = []


class PartitioningFeatureIterator:
    """
    Iterates over all features in the given HDF5 file.

    Before iterating over the features, the iterator slices the file into one or more partitions
    and loads the data into memory. This leads to significant speed-ups even with moderate
    partition sizes, regardless of the type of disk (spinning or flash). Pseudo random access
    is supported with a negligible impact on performance and randomness: The data is randomly
    sampled (without replacement) within each partition and the partitions are loaded in random
    order, too.

    The features are emitted as triplets in the form of
    ``(utterance name, index of the feature within the utterance, feature)``.

    When calculating the partition sizes, only the size of the features itself is factored in;
    the overhead of data storage is ignored. This overhead is usually negligible even with
    partition sizes of multiple gigabytes, because the data is stored as numpy ndarrays in
    memory (one per utterance). The overhead of a single ndarray is 96 bytes, regardless of its
    size. Nonetheless the partition size should be chosen to be lower than the total available
    memory.

    Args:
        hdf5file (h5py.File): HDF5 file containing the features.
        partition_size (str): Size of the partitions in bytes. The units ``k`` (kibibytes),
                              ``m`` (mebibytes) and ``g`` (gibibytes) are supported,
                              i.e. a ``partition_size`` of ``1g`` equates to :math:`2^{30}` bytes.
        shuffle (bool): Indicates whether the features should be returned in
                        random order (``True``) or not (``False``).
        seed (int): Seed to be used for the random number generator.
        includes (iterable): Iterable of names of data sets that should be included when
                             iterating over the feature container.
                             Mutually exclusive with ``excludes``.
                             If both are specified, only ``includes`` will be considered.
        excludes (iterable): Iterable of names of data sets to skip when iterating over
                             the feature container. Mutually exclusive with ``includes``.
                             If both are specified, only ``includes`` will be considered.

    Example:
        >>> import h5py
        >>> from audiomate.feeding import PartitioningFeatureIterator
        >>> hdf5 = h5py.File('features.h5', 'r')
        >>> iterator = PartitioningFeatureIterator(hdf5, '12g', shuffle=True)
        >>> next(iterator)
        ('music-fma-0100', 227, array([-0.15004082, -0.30246958, -0.38708138, ...,
               -0.93471956, -0.94194776, -0.90878332], dtype=float32))
        >>> next(iterator)
        ('music-fma-0081', 2196, array([-0.00207647, -0.00101351, -0.00058832, ...,
               -0.00207647, -0.00292684, -0.00292684], dtype=float32))
        >>> next(iterator)
        ('music-hd-0050', 1026, array([-0.57352495, -0.63049972, -0.63049972, ...,
                0.82490814,  0.84680521,  0.75517786], dtype=float32))
    """

    def __init__(self, hdf5file, partition_size, shuffle=True, seed=None, includes=None, excludes=None):
        self._file = hdf5file
        self._partition_size = units.parse_storage_size(partition_size)
        self._shuffle = shuffle
        self._seed = seed

        data_sets = self._filter_data_sets(hdf5file.keys(), includes=includes, excludes=excludes)
        if shuffle:
            _random_state(self._seed).shuffle(data_sets)
        self._data_sets = tuple(data_sets)

        self._partitions = []
        self._partition_idx = 0
        self._partition_data = None

        self._partition()

    def __iter__(self):
        return self

    def __next__(self):
        if self._partition_data is None or not self._partition_data.has_next():
            if self._partition_data is not None:
                self._partition_data = None
                gc.collect()  # signal gc that it's time to get rid of the obsolete data

            self._partition_data = self._load_next_partition()

            if self._partition_data is None:
                raise StopIteration

        return next(self._partition_data)

    def _load_next_partition(self):
        if len(self._partitions) == self._partition_idx:
            return None

        start, end = self._partitions[self._partition_idx]
        self._partition_idx += 1

        start_dset_name, start_idx = start
        end_dset_name, end_idx = end

        start_dset_idx = self._data_sets.index(start_dset_name)
        end_dset_idx = self._data_sets.index(end_dset_name)

        # Partition lies within a single data set
        if start_dset_name == end_dset_name:
            slices = [DataSetSlice(start_dset_name, start_idx, self._file[start_dset_name][start_idx:end_idx])]
            return Partition(slices, shuffle=self._shuffle, seed=self._seed)

        # Partition spans multiple data sets: the rest of the first one,
        # all data sets in between and the beginning of the last one
        slices = [DataSetSlice(start_dset_name, start_idx, self._file[start_dset_name][start_idx:])]

        middle_dsets = self._data_sets[start_dset_idx + 1:end_dset_idx]
        for dset in middle_dsets:
            slices.append(DataSetSlice(dset, 0, self._file[dset][:]))

        slices.append(DataSetSlice(end_dset_name, 0, self._file[end_dset_name][:end_idx]))

        return Partition(slices, shuffle=self._shuffle, seed=self._seed)

    def _partition(self):
        dset_props = self._scan()

        start = None
        partition_free_space = self._partition_size

        for idx, props in enumerate(dset_props):
            dset_name = props.name
            num_records = props.num_of_records
            record_size = props.record_size

            remaining_records = props.num_of_records
            is_last = (idx == len(dset_props) - 1)
            next_record_size = None if is_last else dset_props[idx + 1].record_size

            if start is None:
                start = (dset_name, 0)

            while partition_free_space >= record_size and remaining_records >= 1:
                num_fitting_records = int(partition_free_space / record_size)
                num_records_taken = min(remaining_records, num_fitting_records)

                end_index = num_records_taken if dset_name != start[0] else start[1] + num_records_taken
                end = (dset_name, end_index)

                if num_records_taken == num_fitting_records:  # Partition is going to be full afterwards
                    self._partitions.append((start, end))
                    partition_free_space = self._partition_size

                    if end[1] == num_records:  # Data set is exhausted
                        start = None
                        break
                    else:  # Next partition starts within the same data set
                        start = end
                elif num_records_taken == remaining_records and is_last:  # All data sets are partitioned
                    self._partitions.append((start, end))
                    break
                else:
                    partition_free_space -= record_size * num_records_taken

                    if partition_free_space < next_record_size:
                        self._partitions.append((start, end))
                        start = None
                        partition_free_space = self._partition_size
                        break

                remaining_records -= num_records_taken

        if self._shuffle:
            _random_state(self._seed).shuffle(self._partitions)

    def _scan(self):
        dset_props = []

        for dset_name in self._data_sets:
            dtype_size = self._file[dset_name].dtype.itemsize

            if len(self._file[dset_name]) == 0:
                continue

            num_records, items_per_record = self._file[dset_name].shape
            record_size = dtype_size * items_per_record

            if record_size > self._partition_size:
                raise ValueError('Records in "{0}" are larger than the partition size'.format(dset_name))

            dset_props.append(DataSetProperties(dset_name, num_records, record_size))

        return dset_props

    @staticmethod
    def _filter_data_sets(data_sets, includes=None, excludes=None):
        if includes is None:
            includes = frozenset()
        else:
            includes = frozenset(includes)

        if excludes is None:
            excludes = frozenset()
        else:
            excludes = frozenset(excludes)

        if len(includes) > 0:
            return [data_set for data_set in data_sets if data_set in includes]

        return [data_set for data_set in data_sets if data_set not in excludes]
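

# A short usage sketch (assumed file name and data set keys, not part of the
# original module): restricting iteration to certain data sets via ``includes``,
# as documented in the class docstring above.
#
#     import h5py
#
#     hdf5 = h5py.File('features.h5', 'r')
#     it = PartitioningFeatureIterator(hdf5, '2g', shuffle=True, seed=42,
#                                      includes=['utt-1', 'utt-2'])
#
#     for utt_name, index_within_utt, feature in it:
#         pass  # one (utterance name, index within the utterance, feature) triplet per step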


class DataSetProperties:
    def __init__(self, name, num_of_records, record_size):
        self.name = name
        self.num_of_records = num_of_records
        self.record_size = record_size

    def __repr__(self):
        return 'DataSetProperties({0}, {1}, {2})'.format(self.name, self.num_of_records, self.record_size)


class Partition:
    def __init__(self, slices, shuffle=True, seed=None):
        self._slices = slices

        self._total_length = 0
        for item in slices:
            self._total_length += item.length

        self._index = 0

        if shuffle:
            self._elements = _random_state(seed).permutation(self._total_length)
        else:
            self._elements = np.arange(0, self._total_length)

    def __iter__(self):
        return self

    def __next__(self):
        if self._index == self._total_length:
            raise StopIteration()

        index = self._elements[self._index]
        for item in self._slices:
            if index >= item.length:
                index -= item.length
                continue

            self._index += 1

            # emits triplet (data set's name, original index of feature within data set, feature)
            return item.data_set_name, item.start_index + index, item.data[index]

    def has_next(self):
        return self._index < self._total_length


class DataSetSlice:
    def __init__(self, data_set_name, start_index, data):
        self.data_set_name = data_set_name
        self.start_index = start_index
        self.length = len(data)
        self.data = data


def _random_state(seed=None):
    random_state = np.random.RandomState()

    if seed is not None:
        random_state.seed(seed)

    return random_state
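

# Illustration of the accepted partition-size strings (a sketch based on the
# docstrings above; the parsing is done by ``units.parse_storage_size`` and the
# units are binary, so ``k``/``m``/``g`` mean kibi-/mebi-/gibibytes):
#
#     units.parse_storage_size('1g')    # -> 2 ** 30 bytes
#     units.parse_storage_size('512m')  # -> 512 * 2 ** 20 bytes
#     units.parse_storage_size('64k')   # -> 64 * 2 ** 10 bytes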