Source code for synctoolbox.dtw.core

import librosa
from numba import jit
import numpy as np


@jit(nopython=True, cache=True)
def __C_to_DE(C: np.ndarray = None,
              dn: np.ndarray = np.array([1, 1, 0], np.int64),
              dm: np.ndarray = np.array([1, 0, 1], np.int64),
              dw: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64),
              sub_sequence: bool = False) -> (np.ndarray, np.ndarray):
    """This function computes the accumulated cost matrix D and the step index
    matrix E.

    Parameters
    ----------
    C : np.ndarray (np.float32 / np.float64) [shape=(N, M)]
        Cost matrix

    dn : np.ndarray (np.int64) [shape=(1, S)]
        Integer array defining valid steps (N direction of C), default: [1, 1, 0]

    dm : np.ndarray (np.int64) [shape=(1, S)]
        Integer array defining valid steps (M direction of C), default: [1, 0, 1]

    dw : np.ndarray (np.float64) [shape=(1, S)]
        Double array defining the weight of the each step, default: [1.0, 1.0, 1.0]

    sub_sequence : bool
        Set `True` for SubSequence DTW, default: False

    Returns
    -------
    D : np.ndarray (np.float64) [shape=(N, M)]
        Accumulated cost matrix of type double

    E : np.ndarray (np.int64) [shape=(N, M)]
        Step index matrix.
        E[n, m] holds the index of the step take to determine the value of D[n, m].
        If E[n, m] is zero, no valid step was possible.
        NaNs in the cost matrix are preserved, invalid fields in the cost matrix are NaNs.
    """
    if C is None:
        raise ValueError('C must be a 2D numpy array.')

    N, M = C.shape
    S = dn.size

    if S != dm.size or S != dw.size:
        raise ValueError('The parameters dn,dm, and dw must be of equal length.')

    # calc bounding box size of steps
    sbbn = np.max(dn)
    sbbm = np.max(dm)

    # initialize E
    E = np.zeros((N, M), np.int64) - 1

    # initialize extended D matrix
    D = np.ones((sbbn + N, sbbm + M), np.float64) * np.inf

    if sub_sequence:
        for m in range(M):
            D[sbbn, sbbm + m] = C[0, m]
    else:
        D[sbbn, sbbm] = C[0, 0]

    # accumulate
    for m in range(sbbm, M + sbbm):
        for n in range(sbbn, N + sbbn):
            for s in range(S):
                cost = D[n - dn[s], m - dm[s]] + C[n - sbbn, m - sbbm] * dw[s]
                if cost < D[n, m]:
                    D[n, m] = cost
                    E[n - sbbn, m - sbbm] = s

    D = D[sbbn: N + sbbn, sbbm: M + sbbm]

    return D, E


@jit(nopython=True, cache=True)
def __E_to_warping_path(E: np.ndarray,
                        dn: np.ndarray = np.array([1, 1, 0], np.int64),
                        dm: np.ndarray = np.array([1, 0, 1], np.int64),
                        sub_sequence: bool = False,
                        end_index: int = -1) -> np.ndarray:
    """This function computes a warping path based on the provided matrix E
    and the allowed steps.

    Parameters
    ----------
    E : np.ndarray (np.int64) [shape=(N, M)]
        Step index matrix

    dn : np.ndarray (np.int64) [shape=(1, S)]
        Integer array defining valid steps (N direction of C), default: [1, 1, 0]

    dm : np.ndarray (np.int64) [shape=(1, S)]
         Integer array defining valid steps (M direction of C), default: [1, 0, 1]

    sub_sequence : bool
        Set `True` for SubSequence DTW, default: False

    end_index : int
        In case of SubSequence DTW

    Returns
    -------
    warping_path : np.ndarray (np.int64) [shape=(2, M)]
        Resulting optimal warping path
    """
    N, M = E.shape

    if not sub_sequence and end_index == -1:
        end_index = M - 1

    m = end_index
    n = N - 1

    warping_path = np.zeros((2, n + m + 1))

    index = 0

    def _loop(m, n, index):
        warping_path[:, index] = np.array([n, m])
        step_index = E[n, m]
        m -= dm[step_index]
        n -= dn[step_index]
        index += 1
        return m, n, index

    if sub_sequence:
        while n > 0:
            m, n, index = _loop(m, n, index)
    else:
        while m > 0 or n > 0:
            m, n, index = _loop(m, n, index)

    warping_path[:, index] = np.array([n, m])
    warping_path = warping_path[:, index::-1]

    return warping_path


[docs]def compute_warping_path(C: np.ndarray, step_sizes: np.ndarray = np.array([[1, 0], [0, 1], [1, 1]], np.int64), step_weights: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64), implementation: str = 'synctoolbox'): """Applies DTW on cost matrix C. Parameters ---------- C : np.ndarray (np.float32 / np.float64) [shape=(N, M)] Cost matrix step_sizes : np.ndarray (np.int64) [shape=(2, S)] Array of step sizes step_weights : np.ndarray (np.float64) [shape=(2, S)] Array of step weights implementation: str Choose among ``synctoolbox`` and ``librosa``. (default: ``synctoolbox``) Returns ------- D : np.ndarray (np.float64) [shape=(N, M)] Accumulated cost matrix E : np.ndarray (np.int64) [shape=(N, M)] Step index matrix wp : np.ndarray (np.int64) [shape=(2, M)] Warping path """ if implementation == 'librosa': D, wp, E = librosa.sequence.dtw(C=C, step_sizes_sigma=step_sizes, weights_add=np.array([0, 0, 0]), weights_mul=step_weights, return_steps=True, subseq=False) wp = wp[::-1].T elif implementation == 'synctoolbox': dn = step_sizes[:, 0] dm = step_sizes[:, 1] D, E = __C_to_DE(C, dn=dn, dm=dm, dw=step_weights, sub_sequence=False) wp = __E_to_warping_path(E=E, dn=dn, dm=dm, sub_sequence=False) else: raise NotImplementedError(f'No implementation found called {implementation}') return D, E, wp