import gc
import re
import h5py
import numpy as np
from audiomate.utils import stats
class FeatureContainer(object):
"""
A feature-container holds matrix-like data. The data is stored in an HDF5 file.
The feature-container provides functionality to access this data. For every utterance that has
feature data, an HDF5 dataset is created within the file.
Args:
path (str): Path to where the HDF5 file is stored. If the file doesn't exist, one is
created.
Examples::
>>> fc = FeatureContainer('/path/to/hdf5file')
>>> with fc:
...     fc.set('utt-1', np.array([1, 2, 3, 4]))
...     fc.get('utt-1', mem_map=False)
array([1, 2, 3, 4])
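Frame-based feature containers additionally carry ``frame_size``, ``hop_size`` and
``sampling_rate`` as HDF5 attributes; the values below are purely illustrative::
>>> with fc:
...     fc.frame_size = 400
...     fc.hop_size = 160
...     fc.sampling_rate = 16000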
"""
def open(self):
"""
Open the feature container file in order to read/write to it.
"""
if self._file is None:
self._file = h5py.File(self.path, 'a')
def close(self):
"""
Close the feature container file if it's open.
"""
if self._file is not None:
self._file.close()
self._file = None
def __enter__(self):
self.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def __init__(self, path):
self.path = path
self._file = None
@property
def frame_size(self):
""" The number of samples used per frame. """
self._check_is_open()
return self._file.attrs['frame-size']
@frame_size.setter
def frame_size(self, frame_size):
self._check_is_open()
self._file.attrs['frame-size'] = frame_size
@property
def hop_size(self):
""" The number of samples between two frames. """
self._check_is_open()
return self._file.attrs['hop-size']
@hop_size.setter
def hop_size(self, hop_size):
self._check_is_open()
self._file.attrs['hop-size'] = hop_size
@property
def sampling_rate(self):
""" The sampling-rate of the signal these frames are based on. """
self._check_is_open()
return self._file.attrs['sampling-rate']
@sampling_rate.setter
def sampling_rate(self, sampling_rate):
self._check_is_open()
self._file.attrs['sampling-rate'] = sampling_rate
def keys(self):
"""
Return all keys available in the feature-container.
Returns:
keys (list): List of identifiers available in the feature-container.
Note:
The feature container has to be opened in advance.
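Example (a sketch; the path and utterance ids are hypothetical)::
>>> with FeatureContainer('/path/to/features.h5') as fc:
...     fc.keys()
['utt-1', 'utt-2']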
"""
self._check_is_open()
return list(self._file.keys())
def set(self, utterance_idx, features):
"""
Add the given feature matrix to the feature container for the utterance with the given id.
Any existing features of the utterance in this container are discarded/overwritten.
Args:
utterance_idx (str): The ID of the utterance to store the features for.
features (numpy.ndarray): A np.ndarray with the features.
Note:
The feature container has to be opened in advance.
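Example (a sketch; the path and utterance id are hypothetical)::
>>> with FeatureContainer('/path/to/features.h5') as fc:
...     fc.set('utt-1', np.arange(12).reshape(3, 4))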
"""
self._check_is_open()
if utterance_idx in self._file:
del self._file[utterance_idx]
self._file.create_dataset(utterance_idx, data=features)
def append(self, utterance_idx, features):
"""
Append the given features to the features that already exist for the given utterance, if any.
Args:
utterance_idx (str): The id of the utterance.
features (numpy.ndarray): A np.ndarray with the features.
They have to have the same dimensions as the existing ones in all but the first axis.
Note:
The feature container has to be opened in advance.
Appending requires the underlying h5py dataset to be chunked, which is not the case for datasets
created via ``set``. Features that were first added via ``set`` can therefore not be appended to.
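Example (a sketch; the path, utterance id and shapes are hypothetical)::
>>> with FeatureContainer('/path/to/features.h5') as fc:
...     fc.append('utt-1', np.zeros((10, 5)))
...     fc.append('utt-1', np.ones((4, 5)))
...     fc.get('utt-1', mem_map=False).shape
(14, 5)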
"""
existing = self.get(utterance_idx, mem_map=True)
if existing is not None:
num_existing = existing.shape[0]
if existing.shape[1:] != features.shape[1:]:
raise ValueError(
'The features to append need to have the same dimensions ({}).'.format(existing.shape[1:]))
existing.resize(num_existing + features.shape[0], 0)
existing[num_existing:] = features
else:
max_shape = list(features.shape)
max_shape[0] = None
self._file.create_dataset(utterance_idx, data=features, chunks=True, maxshape=max_shape)
def remove(self, utterance_idx):
"""
Remove the features stored for the given utterance-id.
Args:
utterance_idx (str): ID of the utterance.
Note:
The feature container has to be opened in advance.
"""
self._check_is_open()
if utterance_idx in self._file:
del self._file[utterance_idx]
def get(self, utterance_idx, mem_map=True):
"""
Read and return the features stored for the given utterance-id.
Args:
utterance_idx (str): The ID of the utterance to get the feature-matrix from.
mem_map (bool): If ``True``, the underlying h5py dataset is returned and the data is read lazily
from disk; otherwise the data is loaded into memory and a copy is returned.
Note:
The feature container has to be opened in advance.
Returns:
numpy.ndarray: The stored data.
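Example (a sketch; ``mem_map=False`` forces an in-memory copy)::
>>> with FeatureContainer('/path/to/features.h5') as fc:
...     fc.set('utt-1', np.array([[1, 2], [3, 4]]))
...     fc.get('utt-1', mem_map=False)
array([[1, 2],
       [3, 4]])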
"""
self._check_is_open()
if utterance_idx in self._file:
data = self._file[utterance_idx]
if not mem_map:
data = data[()]
return data
else:
return None
def stats(self):
"""
Return statistics calculated over all features in the container.
Note:
The feature container has to be opened in advance.
Returns:
DataStats: Statistics over all data points of all features.
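Example (a sketch; assumes the container already holds features)::
>>> with FeatureContainer('/path/to/features.h5') as fc:
...     overall = fc.stats()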
"""
self._check_is_open()
per_utt_stats = self.stats_per_utterance()
return stats.DataStats.concatenate(per_utt_stats.values())
def stats_per_utterance(self):
"""
Return statistics calculated for each utterance in the container.
Note:
The feature container has to be opened in advance.
Returns:
dict: A dictionary containing a DataStats object for each utterance.
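Example (a sketch; the utterance ids are hypothetical)::
>>> with FeatureContainer('/path/to/features.h5') as fc:
...     sorted(fc.stats_per_utterance().keys())
['utt-1', 'utt-2']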
"""
self._check_is_open()
all_stats = {}
for utt_id, data in self._file.items():
data = data[()]
all_stats[utt_id] = stats.DataStats(float(np.mean(data)),
float(np.var(data)),
np.min(data),
np.max(data),
data.size)
return all_stats
def _check_is_open(self):
if self._file is None:
raise ValueError('The feature container is not opened!')
class PartitioningFeatureIterator(object):
"""
Iterates over all features in the given HDF5 file.
Before iterating over the features, the iterator slices the file into one or more partitions and loads the data into
memory. This leads to significant speed-ups even with moderate partition sizes, regardless of the type of disk
(spinning or flash). Pseudo-random access is supported with a negligible impact on performance and randomness: the
data is randomly sampled (without replacement) within each partition, and the partitions are loaded in random order,
too.
The features are emitted as triplets in the form of
``(utterance name, index of the feature within the utterance, feature)``.
When calculating the partition sizes, only the size of the features themselves is factored in; the overhead of data
storage is ignored. This overhead is usually negligible, even with partition sizes of multiple gigabytes, because the
data is stored as numpy ndarrays in memory (one per utterance). The overhead of a single ndarray is 96 bytes,
regardless of its size. Nonetheless, the partition size should be chosen to be lower than the total available memory.
Args:
hdf5file(h5py.File): HDF5 file containing the features
partition_size(str): Size of the partitions in bytes. The units ``k`` (kibibytes), ``m`` (mebibytes) and ``g``
(gibibytes) are supported, i.e. a ``partition_size`` of ``1g`` equates to :math:`2^{30}`
bytes.
shuffle(bool): Indicates whether the features should be returned in random order (``True``) or not (``False``).
seed(int): Seed to be used for the random number generator.
includes(iterable): Iterable of names of data sets that should be included when iterating over the feature
container. Mutually exclusive with ``excludes``. If both are specified, only ``includes``
will be considered.
excludes(iterable): Iterable of names of data sets to skip when iterating over the feature container. Mutually
exclusive with ``includes``. If both are specified, only ``includes`` will be considered.
Example:
>>> import h5py
>>> from audiomate.corpus.assets import PartitioningFeatureIterator
>>> hdf5 = h5py.File('features.h5', 'r')
>>> iterator = PartitioningFeatureIterator(hdf5, '12g', shuffle=True)
>>> next(iterator)
('music-fma-0100', 227, array([-0.15004082, -0.30246958, -0.38708138, ..., -0.93471956,
-0.94194776, -0.90878332], dtype=float32))
>>> next(iterator)
('music-fma-0081', 2196, array([-0.00207647, -0.00101351, -0.00058832, ..., -0.00207647,
-0.00292684, -0.00292684], dtype=float32))
>>> next(iterator)
('music-hd-0050', 1026, array([-0.57352495, -0.63049972, -0.63049972, ..., 0.82490814,
0.84680521, 0.75517786], dtype=float32))
"""
PARTITION_SIZE_PATTERN = re.compile(r'^([0-9]+(\.[0-9]+)?)([gmk])?$', re.I)
def __init__(self, hdf5file, partition_size, shuffle=True, seed=None, includes=None, excludes=None):
self._file = hdf5file
self._partition_size = self._parse_partition_size(partition_size)
self._shuffle = shuffle
self._seed = seed
data_sets = self._filter_data_sets(hdf5file.keys(), includes=includes, excludes=excludes)
if shuffle:
_random_state(self._seed).shuffle(data_sets)
self._data_sets = tuple(data_sets)
self._partitions = []
self._partition_idx = 0
self._partition_data = None
self._partition()
def __iter__(self):
return self
def __next__(self):
if self._partition_data is None or not self._partition_data.has_next():
if self._partition_data is not None:
self._partition_data = None
gc.collect() # signal gc that it's time to get rid of the obsolete data
self._partition_data = self._load_next_partition()
if self._partition_data is None:
raise StopIteration
return next(self._partition_data)
def _load_next_partition(self):
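# Materialize the slices that make up the next partition: a partition may cover the tail of one
# data set, any number of complete data sets in between and the head of a final data set.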
if len(self._partitions) == self._partition_idx:
return None
start, end = self._partitions[self._partition_idx]
self._partition_idx += 1
start_dset_name, start_idx = start
end_dset_name, end_idx = end
start_dset_idx = self._data_sets.index(start_dset_name)
end_dset_idx = self._data_sets.index(end_dset_name)
if start_dset_name == end_dset_name:
slices = [DataSetSlice(start_dset_name, start_idx, self._file[start_dset_name][start_idx:end_idx])]
return Partition(slices, shuffle=self._shuffle, seed=self._seed)
slices = [DataSetSlice(start_dset_name, start_idx, self._file[start_dset_name][start_idx:])]
middle_dsets = self._data_sets[start_dset_idx + 1:end_dset_idx]
for dset in middle_dsets:
slices.append(DataSetSlice(dset, 0, self._file[dset][:]))
slices.append(DataSetSlice(end_dset_name, 0, self._file[end_dset_name][:end_idx]))
return Partition(slices, shuffle=self._shuffle, seed=self._seed)
def _partition(self):
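# Greedily fill partitions with whole records, walking over the data sets in order. Each partition
# is stored as a pair of (data set name, record index) boundaries, where the start index is
# inclusive and the end index is exclusive.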
dset_props = self._scan()
start = None
partition_free_space = self._partition_size
for idx, props in enumerate(dset_props):
dset_name = props.name
num_records = props.num_of_records
record_size = props.record_size
remaining_records = props.num_of_records
is_last = (idx == len(dset_props) - 1)
next_record_size = None if is_last else dset_props[idx + 1].record_size
if start is None:
start = (dset_name, 0)
while partition_free_space >= record_size and remaining_records >= 1:
num_fitting_records = int(partition_free_space / record_size)
num_records_taken = min(remaining_records, num_fitting_records)
end_index = num_records_taken if dset_name != start[0] else start[1] + num_records_taken
end = (dset_name, end_index)
if num_records_taken == num_fitting_records: # Partition is going to be full afterwards
self._partitions.append((start, end))
partition_free_space = self._partition_size
if end[1] == num_records: # Data set is exhausted
start = None
break
else: # Next partition starts within the same data set
start = end
elif num_records_taken == remaining_records and is_last: # All data sets are partitioned
self._partitions.append((start, end))
break
else:
partition_free_space -= record_size * num_records_taken
if partition_free_space < next_record_size:
self._partitions.append((start, end))
start = None
partition_free_space = self._partition_size
break
remaining_records -= num_records_taken
if self._shuffle:
_random_state(self._seed).shuffle(self._partitions)
def _scan(self):
dset_props = []
for dset_name in self._data_sets:
dtype_size = self._file[dset_name].dtype.itemsize
if len(self._file[dset_name]) == 0:
continue
num_records, items_per_record = self._file[dset_name].shape
record_size = dtype_size * items_per_record
if record_size > self._partition_size:
raise ValueError('Records in "{0}" are larger than the partition size'.format(dset_name))
dset_props.append(DataSetProperties(dset_name, num_records, record_size))
return dset_props
@staticmethod
def _parse_partition_size(partition_size):
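# For example '500k' -> 500 * 1024 bytes, '2.5m' -> 2621440 bytes, '1g' -> 2 ** 30 bytes;
# a plain number such as '4096' is interpreted as bytes.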
units = {
'k': 1024,
'm': 1024 * 1024,
'g': 1024 * 1024 * 1024
}
match = PartitioningFeatureIterator.PARTITION_SIZE_PATTERN.fullmatch(str(partition_size))
if match is None:
raise ValueError('Invalid partition size: {0}'.format(partition_size))
groups = match.groups()
if groups[2] is None:  # no unit given, the value is interpreted as bytes
return int(float(groups[0]))  # silently drop the fractional part, because a byte is the smallest unit
return int(float(groups[0]) * units[groups[2].lower()])
@staticmethod
def _filter_data_sets(data_sets, includes=None, excludes=None):
if includes is None:
includes = frozenset()
else:
includes = frozenset(includes)
if excludes is None:
excludes = frozenset()
else:
excludes = frozenset(excludes)
if len(includes) > 0:
return [data_set for data_set in data_sets if data_set in includes]
return [data_set for data_set in data_sets if data_set not in excludes]
class DataSetProperties:
def __init__(self, name, num_of_records, record_size):
self.name = name
self.num_of_records = num_of_records
self.record_size = record_size
def __repr__(self):
return 'DataSetProperties({0}, {1}, {2})'.format(self.name, self.num_of_records, self.record_size)
class Partition:
def __init__(self, slices, shuffle=True, seed=None):
self._slices = slices
self._total_length = 0
for item in slices:
self._total_length += item.length
self._index = 0
if shuffle:
self._elements = _random_state(seed).permutation(self._total_length)
else:
self._elements = np.arange(0, self._total_length)
def __iter__(self):
return self
def __next__(self):
if self._index == self._total_length:
raise StopIteration()
index = self._elements[self._index]
for item in self._slices:
if index >= item.length:
index -= item.length
continue
self._index += 1
# emits triplet (data set's name, original index of feature within data set, feature)
return item.data_set_name, item.start_index + index, item.data[index]
def has_next(self):
return self._index < self._total_length
class DataSetSlice:
def __init__(self, data_set_name, start_index, data):
self.data_set_name = data_set_name
self.start_index = start_index
self.length = len(data)
self.data = data
def _random_state(seed=None):
random_state = np.random.RandomState()
if seed is not None:
random_state.seed(seed)
return random_state