Module stempeg.read

Reading module to load stems into numpy tensors.

Expand source code Browse git
# flake8: noqa
"""
Reading module to load stems into numpy tensors.


"""
from stempeg.write import FilesWriter
import numpy as np
import warnings
import ffmpeg
import pprint
from multiprocessing import Pool
import atexit
from functools import partial
import datetime as dt

class Reader:
    """Common ancestor of all reader configurations.

    Subclasses hold the options of a particular reading method; the
    base class itself stores nothing.
    """

    def __init__(self):
        # No options to keep at this level.
        return None


class StreamsReader(Reader):
    """Reader configuration for stems stored as separate streams.

    This is the default reader. It has no options to hold.
    """

    def __init__(self):
        # Intentionally empty: nothing to configure.
        return None


class ChannelsReader(Reader):
    """Reader configuration for stems multiplexed into channels.

    Stems are extracted from groups of channels of one multichannel
    stream, e.g. 8 channels will be converted to 4 stereo pairs.

    Args:
        nb_channels: int
            number of channels per stem, defaults to `2`.
    """

    def __init__(self, nb_channels=2):
        # The base initializer stores nothing; it is called for form only.
        super().__init__()
        self.nb_channels = nb_channels


def _read_ffmpeg(
    filename,
    sample_rate,
    metadata,
    start,
    duration,
    dtype,
    stem_idx
):
    """Load one audio stream using ffmpeg and numpy.

    The stream is decoded by an ffmpeg subprocess to raw little-endian
    32-bit float samples, which are read from its stdout.

    Args:
        filename (str): filename path
        sample_rate (int): sample rate requested from ffmpeg
        metadata (Info): metadata info object needed to
            know the channel configuration in advance
        start (float): start position in seconds, or `None`
        duration (float): duration in seconds, or `None`
        dtype (numpy.dtype): Type of audio array to be casted into
        stem_idx (int): stream index, passed to ffmpeg as `-map 0:<idx>`

    Returns:
        (array_like): numpy audio array of shape `(samples, channels)`
    """
    # `stem_idx` is the stream index given to `-map`, whereas
    # `metadata.channels()` indexes the audio-only stream list by position.
    # Both agree only when the file holds nothing but audio streams, so
    # look the stream up by its own 'index' field first and fall back to
    # the positional lookup if no stream carries that index.
    channels = None
    for stream in getattr(metadata, 'audio_streams', ()):
        if stream.get('index') == stem_idx:
            channels = int(stream['channels'])
            break
    if channels is None:
        channels = metadata.channels(stem_idx)

    output_kwargs = {'format': 'f32le', 'ar': sample_rate}
    if duration is not None:
        output_kwargs['t'] = str(dt.timedelta(seconds=duration))
    if start is not None:
        output_kwargs['ss'] = str(dt.timedelta(seconds=start))

    output_kwargs['map'] = '0:' + str(stem_idx)
    process = (
        ffmpeg
        .input(filename)
        .output('pipe:', **output_kwargs)
        .run_async(pipe_stdout=True, pipe_stderr=True))
    buffer, stderr = process.communicate()
    if process.returncode:
        # Do not fail silently: without this, a failed decode is only
        # visible as an empty or truncated array.
        warnings.warn(
            'ffmpeg exited with status {} while reading stream {}:\n{}'
            .format(
                process.returncode,
                stem_idx,
                (stderr or b'').decode(errors='replace')
            )
        )

    # raw samples are interleaved, one row per sample frame
    waveform = np.frombuffer(buffer, dtype='<f4').reshape(-1, channels)
    if waveform.dtype != np.dtype(dtype):
        waveform = waveform.astype(dtype)
    return waveform


def read_stems(
    filename,
    start=None,
    duration=None,
    stem_id=None,
    always_3d=False,
    dtype=np.float32,
    info=None,
    sample_rate=None,
    reader=StreamsReader(),
    multiprocess=False
):
    """Read stems into numpy tensor

    This function can read both, multi-stream and single stream audio files.
    If used for reading normal audio, the output is a 1d or 2d (mono/stereo)
    array. When multiple streams are read, the output is a 3d array.

    Stems that are aggregated into multichannel audio (concatenation of
    pairs of stereo channels) can be loaded as well, see more info on
    audio `stempeg.write.write_stems`.

    By default `read_stems` assumes that multiple substreams were used to
    save the stem file (`reader=stempeg.StreamsReader()`). To support
    multistream files on audio formats that do not support multiple streams
    (e.g. WAV), streams can be mapped to multiple pairs of channels. In that
    case, `stempeg.ChannelsReader()`, can be passed. Also see:
    `stempeg.write.ChannelsWriter`.


    Args:
        filename (str): filename of the audio file to load data from.
        start (float): Start offset to load from in seconds.
        duration (float): Duration to load in seconds.
        stem_id: int, optional
            substream id, defaults to `None` (all substreams are loaded).
        always_3d: bool, optional
            By default, reading a single-stream audio file will return a
            two-dimensional array.  With ``always_3d=True``, audio data is
            always returned as a three-dimensional array, even if the audio
            file has only one stream.
        dtype: np.dtype, optional.
            Numpy data type to use, default to `np.float32`.
        info: Info, Optional
            Pass ffmpeg `Info` object to reduce number of os calls on file.
            This can be used when e.g. the sample rate and length of a
            track are already known in advance. Useful for ML training
            where the info objects can be pre-processed, so that audio
            loading can be sped up.
        sample_rate: float, optional
            Sample rate of returned audio. Defaults to `None` which results in
            the sample rate returned from the mixture.
        reader: Reader
            Holds parameters for the reading method. One of the following:
                `StreamsReader(...)`
                    Read from a single multistream audio (default).
                `ChannelsReader(...)`
                    Read/demultiplexed from multiple channels.
        multiprocess: bool
            Applies multi-processing for reading substreams in parallel to
            speed up reading. Defaults to `False`

    Returns:
        stems: array_like
            stems tensor of `shape=(stem x samples x channels)`
        rate: float
            sample rate

    Raises:
        Warning: if ffprobe fails on the file, if no audio stream is
            found, or if `ChannelsReader` is used on a file that has more
            than one audio stream or a channel count that is not a
            multiple of `reader.nb_channels`.

    Shape:
        - Output: `[S, T, C]`, where `S` is the number of streams, `T` the
            number of samples and `C` the number of channels. Unless
            `always_3d=True`, dimensions of size one are removed.

    >>> audio, sample_rate = stempeg.read_stems("test.stem.mp4")
    >>> audio.shape
    (5, 220500, 2)
    >>> sample_rate
    44100
    """
    if not isinstance(filename, str):
        filename = filename.decode()

    # use ffprobe to get info object (samplerate, lengths)
    try:
        if info is None:
            # Info() runs ffprobe itself, no second probe is required
            metadata = Info(filename)
        else:
            metadata = info
            # a prepared info object does not prove the file is readable
            ffmpeg.probe(filename)
    except ffmpeg._run.Error as e:
        raise Warning(
            'An error occurs with ffprobe (see ffprobe output below)\n\n{}'
            .format(e.stderr.decode()))

    # check number of audio streams in file
    if 'streams' not in metadata.info or metadata.nb_audio_streams == 0:
        raise Warning('No audio stream found.')

    # using ChannelReader would ignore substreams
    if isinstance(reader, ChannelsReader):
        if metadata.nb_audio_streams != 1:
            raise Warning(
                'stempeg.ChannelsReader() only processes the first substream.'
            )
        if metadata.audio_streams[0]['channels'] % reader.nb_channels != 0:
            raise Warning('Stems should be encoded as multi-channel.')
        substreams = 0
    elif stem_id is not None:
        substreams = stem_id
    else:
        substreams = metadata.audio_stream_idx()

    if not isinstance(substreams, list):
        substreams = [substreams]

    # if not, get sample rate from mixture
    if sample_rate is None:
        sample_rate = metadata.sample_rate(0)

    if multiprocess:
        read_one = partial(
            _read_ffmpeg,
            filename,
            sample_rate,
            metadata,
            start,
            duration,
            dtype
        )
        # The pool is created only after validation and is always torn
        # down, so no worker processes outlive this call. `map` keeps the
        # order of `substreams` and re-raises errors from the workers.
        pool = Pool()
        try:
            stems = pool.map(read_one, substreams)
        finally:
            pool.terminate()
    else:
        stems = [
            _read_ffmpeg(
                filename,
                sample_rate,
                metadata,
                start,
                duration,
                dtype,
                stem_idx
            )
            for stem_idx in substreams
        ]

    stem_durations = np.array([t.shape[0] for t in stems])
    if not (stem_durations == stem_durations[0]).all():
        # trim every stem to the shortest one so they can be stacked
        warnings.warn("Stems differ in length and were shortend")
        min_length = np.min(stem_durations)
        stems = [t[:min_length, :] for t in stems]

    # aggregate list of stems to numpy tensor
    stems = np.array(stems)

    # If ChannelsReader is used, demultiplex from channels
    if isinstance(reader, (ChannelsReader)) and stems.shape[-1] > 1:
        stems = stems.transpose(1, 0, 2)
        stems = stems.reshape(
            stems.shape[0], stems.shape[1], -1, reader.nb_channels
        )
        stems = stems.transpose(2, 0, 3, 1)[..., 0]

    if not always_3d:
        stems = np.squeeze(stems)
    return stems, sample_rate


class Info(object):
    """Audio properties that hold a number of metadata.

    The object wraps the ffprobe result of one file. It is created by
    `read_stems` when none is passed; a prepared instance can be passed
    to `read_stems` instead.

    Methods taking `idx` index into the list of audio streams by
    position.
    """

    def __init__(self, filename):
        super(Info, self).__init__()
        # raw ffprobe result
        self.info = ffmpeg.probe(filename)
        # audio streams only; a probe result without 'streams' yields an
        # empty list instead of a KeyError
        self.audio_streams = [
            stream for stream in self.info.get('streams', [])
            if stream.get('codec_type') == 'audio'
        ]

    @property
    def nb_audio_streams(self):
        """Returns the number of audio substreams"""
        return len(self.audio_streams)

    @property
    def nb_samples_streams(self):
        """Returns a list of number of samples for each substream"""
        return [self.samples(k) for k in range(self.nb_audio_streams)]

    @property
    def duration_streams(self):
        """Returns a list of durations (in s) for all substreams"""
        return [self.duration(k) for k in range(self.nb_audio_streams)]

    @property
    def title_streams(self):
        """Returns stream titles for all substreams

        A stream without tags or without a `handler_name` tag
        contributes `None`.
        """
        return [
            stream.get('tags', {}).get('handler_name')
            for stream in self.audio_streams
        ]

    def audio_stream_idx(self):
        """Returns audio substream indices (the streams' `index` fields)"""
        return [s['index'] for s in self.audio_streams]

    def samples(self, idx):
        """Returns the `duration_ts` field of a stream as int

        NOTE(review): this equals the number of samples only if the
        stream time base is one sample - confirm for the formats in use.
        """
        return int(self.audio_streams[idx]['duration_ts'])

    def duration(self, idx):
        """Returns the duration (in seconds) for a stream index"""
        return float(self.audio_streams[idx]['duration'])

    def title(self, idx):
        """Return the `handler_name` metadata for a given stream index

        Returns `None` if the stream has no tags or no `handler_name`
        tag, in line with `title_streams`.
        """
        return self.audio_streams[idx].get('tags', {}).get('handler_name')

    def rate(self, idx):
        """Alias of `sample_rate`, deprecated from older stempeg version"""
        return self.sample_rate(idx)

    def sample_rate(self, idx):
        """Return sample rate for a given substream"""
        return int(self.audio_streams[idx]['sample_rate'])

    def channels(self, idx):
        """Returns the number of channels for a given substream"""
        return int(self.audio_streams[idx]['channels'])

    def __repr__(self):
        """Print stream information"""
        return pprint.pformat(self.audio_streams)

Functions

def read_stems(filename, start=None, duration=None, stem_id=None, always_3d=False, dtype=numpy.float32, info=None, sample_rate=None, reader=<stempeg.read.StreamsReader object>, multiprocess=False)

Read stems into numpy tensor

This function can read both, multi-stream and single stream audio files. If used for reading normal audio, the output is a 1d or 2d (mono/stereo) array. When multiple streams are read, the output is a 3d array.

An option stems_from_multichannel was added to load stems that are aggregated into multichannel audio (concatenation of pairs of stereo channels), see more info on audio write_stems().

By default read_stems() assumes that multiple substreams were used to save the stem file (reader=stempeg.StreamsReader()). To support multistream files on audio formats that do not support multiple streams (e.g. WAV), streams can be mapped to multiple pairs of channels. In that case, stempeg.ChannelsReader(), can be passed. Also see: ChannelsWriter.

Args

filename : str
filename of the audio file to load data from.
start : float
Start offset to load from in seconds.
duration : float
Duration to load in seconds.
stem_id
int, optional substream id, defaults to None (all substreams are loaded).
always_3d
bool, optional By default, reading a single-stream audio file will return a two-dimensional array. With always_3d=True, audio data is always returned as a three-dimensional array, even if the audio file has only one stream.
dtype
np.dtype, optional. Numpy data type to use, default to np.float32.
info
Info, Optional Pass ffmpeg Info object to reduce number of os calls on file. This can be used when e.g. the sample rate and length of a track are already known in advance. Useful for ML training where the info objects can be pre-processed, so that audio loading can be sped up.
sample_rate
float, optional Sample rate of returned audio. Defaults to None which results in the sample rate returned from the mixture.
reader
Reader Holds parameters for the reading method. One of the following: StreamsReader(…) Read from a single multistream audio (default). ChannelsReader(…) Read/demultiplexed from multiple channels.
multiprocess
bool Applies multi-processing for reading substreams in parallel to speed up reading. Defaults to False

Returns

stems
array_like stems tensor of shape=(stem x samples x channels)
rate
float sample rate

Shape

  • Output: [S, T, C], where S is the number of streams, T the number of samples and C the number of channels. Unless always_3d=True, dimensions of size one are removed.
>>> audio, sample_rate = stempeg.read_stems("test.stem.mp4")
>>> audio.shape
[5, 220500, 2]
>>> sample_rate
44100
Expand source code Browse git
def read_stems(
    filename,
    start=None,
    duration=None,
    stem_id=None,
    always_3d=False,
    dtype=np.float32,
    info=None,
    sample_rate=None,
    reader=StreamsReader(),
    multiprocess=False
):
    """Read stems into numpy tensor

    This function can read both, multi-stream and single stream audio files.
    If used for reading normal audio, the output is a 1d or 2d (mono/stereo)
    array. When multiple streams are read, the output is a 3d array.

    Stems that are aggregated into multichannel audio (concatenation of
    pairs of stereo channels) can be loaded as well, see more info on
    audio `stempeg.write.write_stems`.

    By default `read_stems` assumes that multiple substreams were used to
    save the stem file (`reader=stempeg.StreamsReader()`). To support
    multistream files on audio formats that do not support multiple streams
    (e.g. WAV), streams can be mapped to multiple pairs of channels. In that
    case, `stempeg.ChannelsReader()`, can be passed. Also see:
    `stempeg.write.ChannelsWriter`.


    Args:
        filename (str): filename of the audio file to load data from.
        start (float): Start offset to load from in seconds.
        duration (float): Duration to load in seconds.
        stem_id: int, optional
            substream id, defaults to `None` (all substreams are loaded).
        always_3d: bool, optional
            By default, reading a single-stream audio file will return a
            two-dimensional array.  With ``always_3d=True``, audio data is
            always returned as a three-dimensional array, even if the audio
            file has only one stream.
        dtype: np.dtype, optional.
            Numpy data type to use, default to `np.float32`.
        info: Info, Optional
            Pass ffmpeg `Info` object to reduce number of os calls on file.
            This can be used when e.g. the sample rate and length of a
            track are already known in advance. Useful for ML training
            where the info objects can be pre-processed, so that audio
            loading can be sped up.
        sample_rate: float, optional
            Sample rate of returned audio. Defaults to `None` which results in
            the sample rate returned from the mixture.
        reader: Reader
            Holds parameters for the reading method. One of the following:
                `StreamsReader(...)`
                    Read from a single multistream audio (default).
                `ChannelsReader(...)`
                    Read/demultiplexed from multiple channels.
        multiprocess: bool
            Applies multi-processing for reading substreams in parallel to
            speed up reading. Defaults to `False`

    Returns:
        stems: array_like
            stems tensor of `shape=(stem x samples x channels)`
        rate: float
            sample rate

    Raises:
        Warning: if ffprobe fails on the file, if no audio stream is
            found, or if `ChannelsReader` is used on a file that has more
            than one audio stream or a channel count that is not a
            multiple of `reader.nb_channels`.

    Shape:
        - Output: `[S, T, C]`, where `S` is the number of streams, `T` the
            number of samples and `C` the number of channels. Unless
            `always_3d=True`, dimensions of size one are removed.

    >>> audio, sample_rate = stempeg.read_stems("test.stem.mp4")
    >>> audio.shape
    (5, 220500, 2)
    >>> sample_rate
    44100
    """
    if not isinstance(filename, str):
        filename = filename.decode()

    # use ffprobe to get info object (samplerate, lengths)
    try:
        if info is None:
            # Info() runs ffprobe itself, no second probe is required
            metadata = Info(filename)
        else:
            metadata = info
            # a prepared info object does not prove the file is readable
            ffmpeg.probe(filename)
    except ffmpeg._run.Error as e:
        raise Warning(
            'An error occurs with ffprobe (see ffprobe output below)\n\n{}'
            .format(e.stderr.decode()))

    # check number of audio streams in file
    if 'streams' not in metadata.info or metadata.nb_audio_streams == 0:
        raise Warning('No audio stream found.')

    # using ChannelReader would ignore substreams
    if isinstance(reader, ChannelsReader):
        if metadata.nb_audio_streams != 1:
            raise Warning(
                'stempeg.ChannelsReader() only processes the first substream.'
            )
        if metadata.audio_streams[0]['channels'] % reader.nb_channels != 0:
            raise Warning('Stems should be encoded as multi-channel.')
        substreams = 0
    elif stem_id is not None:
        substreams = stem_id
    else:
        substreams = metadata.audio_stream_idx()

    if not isinstance(substreams, list):
        substreams = [substreams]

    # if not, get sample rate from mixture
    if sample_rate is None:
        sample_rate = metadata.sample_rate(0)

    if multiprocess:
        read_one = partial(
            _read_ffmpeg,
            filename,
            sample_rate,
            metadata,
            start,
            duration,
            dtype
        )
        # The pool is created only after validation and is always torn
        # down, so no worker processes outlive this call. `map` keeps the
        # order of `substreams` and re-raises errors from the workers.
        pool = Pool()
        try:
            stems = pool.map(read_one, substreams)
        finally:
            pool.terminate()
    else:
        stems = [
            _read_ffmpeg(
                filename,
                sample_rate,
                metadata,
                start,
                duration,
                dtype,
                stem_idx
            )
            for stem_idx in substreams
        ]

    stem_durations = np.array([t.shape[0] for t in stems])
    if not (stem_durations == stem_durations[0]).all():
        # trim every stem to the shortest one so they can be stacked
        warnings.warn("Stems differ in length and were shortend")
        min_length = np.min(stem_durations)
        stems = [t[:min_length, :] for t in stems]

    # aggregate list of stems to numpy tensor
    stems = np.array(stems)

    # If ChannelsReader is used, demultiplex from channels
    if isinstance(reader, (ChannelsReader)) and stems.shape[-1] > 1:
        stems = stems.transpose(1, 0, 2)
        stems = stems.reshape(
            stems.shape[0], stems.shape[1], -1, reader.nb_channels
        )
        stems = stems.transpose(2, 0, 3, 1)[..., 0]

    if not always_3d:
        stems = np.squeeze(stems)
    return stems, sample_rate

Classes

class ChannelsReader (nb_channels=2)

Using multichannels to multiplex to stems

stems will be extracted from multichannel-pairs e.g. 8 channels will be converted to 4 stereo pairs

Args

nb_channels
int number of channels per stem, defaults to 2.
Expand source code Browse git
class ChannelsReader(Reader):
    """Reader configuration for stems multiplexed into channels.

    Stems are extracted from groups of channels of one multichannel
    stream, e.g. 8 channels will be converted to 4 stereo pairs.

    Args:
        nb_channels: int
            number of channels per stem, defaults to `2`.
    """

    def __init__(self, nb_channels=2):
        # The base initializer stores nothing; it is called for form only.
        super().__init__()
        self.nb_channels = nb_channels

Ancestors

class Info (filename)

Audio properties that hold a number of metadata.

The object is created by read_stems() when none is passed. It can also be created in advance and passed to read_stems() to reduce loading time.

Expand source code Browse git
class Info(object):
    """Audio properties that hold a number of metadata.

    The object wraps the ffprobe result of one file. It is created by
    `read_stems` when none is passed; a prepared instance can be passed
    to `read_stems` instead.

    Methods taking `idx` index into the list of audio streams by
    position.
    """

    def __init__(self, filename):
        super(Info, self).__init__()
        # raw ffprobe result
        self.info = ffmpeg.probe(filename)
        # audio streams only; a probe result without 'streams' yields an
        # empty list instead of a KeyError
        self.audio_streams = [
            stream for stream in self.info.get('streams', [])
            if stream.get('codec_type') == 'audio'
        ]

    @property
    def nb_audio_streams(self):
        """Returns the number of audio substreams"""
        return len(self.audio_streams)

    @property
    def nb_samples_streams(self):
        """Returns a list of number of samples for each substream"""
        return [self.samples(k) for k in range(self.nb_audio_streams)]

    @property
    def duration_streams(self):
        """Returns a list of durations (in s) for all substreams"""
        return [self.duration(k) for k in range(self.nb_audio_streams)]

    @property
    def title_streams(self):
        """Returns stream titles for all substreams

        A stream without tags or without a `handler_name` tag
        contributes `None`.
        """
        return [
            stream.get('tags', {}).get('handler_name')
            for stream in self.audio_streams
        ]

    def audio_stream_idx(self):
        """Returns audio substream indices (the streams' `index` fields)"""
        return [s['index'] for s in self.audio_streams]

    def samples(self, idx):
        """Returns the `duration_ts` field of a stream as int

        NOTE(review): this equals the number of samples only if the
        stream time base is one sample - confirm for the formats in use.
        """
        return int(self.audio_streams[idx]['duration_ts'])

    def duration(self, idx):
        """Returns the duration (in seconds) for a stream index"""
        return float(self.audio_streams[idx]['duration'])

    def title(self, idx):
        """Return the `handler_name` metadata for a given stream index

        Returns `None` if the stream has no tags or no `handler_name`
        tag, in line with `title_streams`.
        """
        return self.audio_streams[idx].get('tags', {}).get('handler_name')

    def rate(self, idx):
        """Alias of `sample_rate`, deprecated from older stempeg version"""
        return self.sample_rate(idx)

    def sample_rate(self, idx):
        """Return sample rate for a given substream"""
        return int(self.audio_streams[idx]['sample_rate'])

    def channels(self, idx):
        """Returns the number of channels for a given substream"""
        return int(self.audio_streams[idx]['channels'])

    def __repr__(self):
        """Print stream information"""
        return pprint.pformat(self.audio_streams)

Instance variables

var duration_streams

Returns a list of durations (in s) for all substreams

Expand source code Browse git
@property
def duration_streams(self):
    """List the duration, in seconds, of every audio substream."""
    positions = range(len(self.audio_streams))
    return [self.duration(position) for position in positions]
var nb_audio_streams

Returns the number of audio substreams

Expand source code Browse git
@property
def nb_audio_streams(self):
    """Count the audio substreams held by this object."""
    stream_count = len(self.audio_streams)
    return stream_count
var nb_samples_streams

Returns a list of number of samples for each substream

Expand source code Browse git
@property
def nb_samples_streams(self):
    """List the value of `samples()` for every audio substream."""
    positions = range(len(self.audio_streams))
    return [self.samples(position) for position in positions]
var title_streams

Returns stream titles for all substreams

Expand source code Browse git
@property
def title_streams(self):
    """Returns stream titles for all substreams

    A stream without tags or without a `handler_name` tag contributes
    `None` instead of raising `KeyError`.
    """
    return [
        stream.get('tags', {}).get('handler_name')
        for stream in self.audio_streams
    ]

Methods

def audio_stream_idx(self)

Returns audio substream indices

Expand source code Browse git
def audio_stream_idx(self):
    """Collect the `index` field of every audio substream."""
    indices = []
    for stream in self.audio_streams:
        indices.append(stream['index'])
    return indices
def channels(self, idx)

Returns the number of channels for a given substream

Expand source code Browse git
def channels(self, idx):
    """Return the channel count of the audio substream at `idx` as int."""
    stream = self.audio_streams[idx]
    return int(stream['channels'])
def duration(self, idx)

Returns the duration (in seconds) for a stream index

Expand source code Browse git
def duration(self, idx):
    """Return the `duration` field (seconds) of the substream at `idx`."""
    stream = self.audio_streams[idx]
    return float(stream['duration'])
def rate(self, idx)
Expand source code Browse git
def rate(self, idx):
    """Alias of `sample_rate`, deprecated from older stempeg version."""
    value = self.sample_rate(idx)
    return value
def sample_rate(self, idx)

Return sample rate for a given substream

Expand source code Browse git
def sample_rate(self, idx):
    """Return the `sample_rate` field of the substream at `idx` as int."""
    stream = self.audio_streams[idx]
    return int(stream['sample_rate'])
def samples(self, idx)

Returns the number of samples for a stream index

Expand source code Browse git
def samples(self, idx):
    """Return the `duration_ts` field of the substream at `idx` as int."""
    stream = self.audio_streams[idx]
    return int(stream['duration_ts'])
def title(self, idx)

Return the handler_name metadata for a given stream index

Expand source code Browse git
def title(self, idx):
    """Return the `handler_name` metadata for a given stream index

    Returns `None` if the stream has no tags or no `handler_name` tag,
    in line with `title_streams`, instead of raising `KeyError`.
    """
    return self.audio_streams[idx].get('tags', {}).get('handler_name')
class Reader

Base class for reader

Holds reader options

Expand source code Browse git
class Reader:
    """Common ancestor of all reader configurations.

    Subclasses hold the options of a particular reading method; the
    base class itself stores nothing.
    """

    def __init__(self):
        # No options to keep at this level.
        return None

Subclasses

class StreamsReader

Holding configuration for streams

This is the default reader. It holds no options.

Expand source code Browse git
class StreamsReader(Reader):
    """Reader configuration for stems stored as separate streams.

    This is the default reader. It has no options to hold.
    """

    def __init__(self):
        # Intentionally empty: nothing to configure.
        return None

Ancestors