import numpy as np
import time
from synctoolbox.dtw.anchor import derive_anchors_from_projected_alignment, derive_neighboring_anchors, \
project_alignment_on_a_new_feature_rate
from synctoolbox.dtw.utils import build_path_from_warping_paths, compute_cost_matrices_between_anchors, \
compute_warping_paths_from_cost_matrices, find_anchor_indices_in_warping_path
from synctoolbox.dtw.visualization import sync_visualize_step1, sync_visualize_step2
from synctoolbox.feature.utils import smooth_downsample_feature, normalize_feature
def sync_via_mrmsdtw(f_chroma1: np.ndarray,
                     f_chroma2: np.ndarray,
                     f_onset1: np.ndarray = None,
                     f_onset2: np.ndarray = None,
                     input_feature_rate: float = 50,
                     step_sizes: np.ndarray = np.array([[1, 0], [0, 1], [1, 1]], np.int32),
                     step_weights: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64),
                     threshold_rec: int = 10000,
                     win_len_smooth: np.ndarray = np.array([201, 101, 21, 1]),
                     downsamp_smooth: np.ndarray = np.array([50, 25, 5, 1]),
                     verbose: bool = False,
                     dtw_implementation: str = 'synctoolbox',
                     normalize_chroma: bool = True,
                     chroma_norm_ord: int = 2,
                     chroma_norm_threshold: float = 0.001) -> np.ndarray:
    """Compute memory-restricted multi-scale DTW (MrMsDTW) using chroma and (optionally) onset features.

    MrMsDTW is performed on multiple levels that get progressively finer, with rectangular constraint
    regions defined by the alignment found on the previous, coarser level.
    If onset features are provided, these are used on the finest level in addition to chroma
    to provide higher synchronization accuracy.

    Parameters
    ----------
    f_chroma1 : np.ndarray [shape=(12, N)]
        Chroma feature matrix of the first sequence

    f_chroma2 : np.ndarray [shape=(12, M)]
        Chroma feature matrix of the second sequence

    f_onset1 : np.ndarray [shape=(L, N)]
        Onset feature matrix of the first sequence (optional, default: None).
        Onset features are only used if both ``f_onset1`` and ``f_onset2`` are given.

    f_onset2 : np.ndarray [shape=(L, M)]
        Onset feature matrix of the second sequence (optional, default: None)

    input_feature_rate: float
        Input feature rate of the chroma features (default: 50)

    step_sizes: np.ndarray
        DTW step sizes (default: np.array([[1, 0], [0, 1], [1, 1]]))

    step_weights: np.ndarray
        DTW step weights (default: np.array([1.0, 1.0, 1.0]))

    threshold_rec: int
        Defines the maximum area that is spanned by the rectangle of two
        consecutive elements in the alignment (default: 10000)

    win_len_smooth : np.ndarray
        Window lengths for chroma feature smoothing, one entry per level
        (default: np.array([201, 101, 21, 1])). The last entry must be 1.

    downsamp_smooth : np.ndarray
        Downsampling factors, one entry per level
        (default: np.array([50, 25, 5, 1])). The last entry must be 1.

    verbose : bool
        Set `True` for visualization (default: False)

    dtw_implementation : str
        DTW implementation, librosa or synctoolbox (default: synctoolbox)

    normalize_chroma : bool
        Set `True` to normalize input chroma features after each downsampling
        and smoothing operation.

    chroma_norm_ord: int
        Order of chroma normalization, relevant if ``normalize_chroma`` is True.
        (default: 2)

    chroma_norm_threshold: float
        If the norm falls below threshold for a feature vector, then the
        normalized feature vector is set to be the unit vector. Relevant, if
        ``normalize_chroma`` is True (default: 0.001)

    Returns
    -------
    alignment : np.ndarray [shape=(2, T)]
        Resulting warping path

    Raises
    ------
    ValueError
        If onset features are given for both sequences but their lengths differ
        from the corresponding chroma features, if the last entry of
        ``win_len_smooth`` or ``downsamp_smooth`` is not 1, or if
        ``win_len_smooth`` and ``downsamp_smooth`` differ in length.
    """
    # If onset features are given for both sequences, high resolution MrMsDTW is activated.
    high_res = f_onset1 is not None and f_onset2 is not None

    if high_res and (f_chroma1.shape[1] != f_onset1.shape[1] or f_chroma2.shape[1] != f_onset2.shape[1]):
        raise ValueError('Chroma and onset features must be of the same length.')

    if downsamp_smooth[-1] != 1 or win_len_smooth[-1] != 1:
        raise ValueError('The downsampling factor of the last iteration must be equal to 1, i.e. '
                         'at the last iteration, it is computed at the input feature rate!')

    # Both arrays are indexed with the same level counter below. Without this check, a longer
    # ``downsamp_smooth`` would silently end the loop on a level whose factor is not 1, and a
    # shorter one would fail with an IndexError in the middle of the computation.
    if win_len_smooth.shape[0] != downsamp_smooth.shape[0]:
        raise ValueError('win_len_smooth and downsamp_smooth must have the same number of elements.')

    num_iterations = win_len_smooth.shape[0]
    cost_matrix_size_old = tuple()
    feature_rate_old = input_feature_rate / downsamp_smooth[0]
    alignment = None
    total_computation_time = 0.0

    for it in range(num_iterations):
        is_last_iteration = it == num_iterations - 1
        tic1 = time.perf_counter()

        # Smooth and downsample given raw features
        f_chroma1_cur, _ = smooth_downsample_feature(f_chroma1,
                                                     input_feature_rate=input_feature_rate,
                                                     win_len_smooth=win_len_smooth[it],
                                                     downsamp_smooth=downsamp_smooth[it])

        f_chroma2_cur, feature_rate_new = smooth_downsample_feature(f_chroma2,
                                                                    input_feature_rate=input_feature_rate,
                                                                    win_len_smooth=win_len_smooth[it],
                                                                    downsamp_smooth=downsamp_smooth[it])

        if normalize_chroma:
            f_chroma1_cur = normalize_feature(f_chroma1_cur,
                                              norm_ord=chroma_norm_ord,
                                              threshold=chroma_norm_threshold)

            f_chroma2_cur = normalize_feature(f_chroma2_cur,
                                              norm_ord=chroma_norm_ord,
                                              threshold=chroma_norm_threshold)

        # Project path onto new resolution
        cost_matrix_size_new = (f_chroma1_cur.shape[1], f_chroma2_cur.shape[1])

        if alignment is None:
            # Initialize the alignment with the start and end frames of the feature sequence
            anchors = np.array([[0, f_chroma1_cur.shape[1] - 1], [0, f_chroma2_cur.shape[1] - 1]])
        else:
            projected_alignment = project_alignment_on_a_new_feature_rate(alignment=alignment,
                                                                          feature_rate_old=feature_rate_old,
                                                                          feature_rate_new=feature_rate_new,
                                                                          cost_matrix_size_old=cost_matrix_size_old,
                                                                          cost_matrix_size_new=cost_matrix_size_new)

            anchors = derive_anchors_from_projected_alignment(projected_alignment=projected_alignment,
                                                              threshold=threshold_rec)

        # Cost matrix and warping path computation
        if high_res and is_last_iteration:
            # Compute cost considering chroma and pitch onset features and alignment only in the last iteration,
            # where the features are at the finest level.
            cost_matrices_step1 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur,
                                                                        f_chroma2=f_chroma2_cur,
                                                                        f_onset1=f_onset1,
                                                                        f_onset2=f_onset2,
                                                                        anchors=anchors)
        else:
            cost_matrices_step1 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur,
                                                                        f_chroma2=f_chroma2_cur,
                                                                        anchors=anchors)

        wp_list = compute_warping_paths_from_cost_matrices(cost_matrices_step1,
                                                           step_sizes=step_sizes,
                                                           step_weights=step_weights,
                                                           implementation=dtw_implementation)

        # Concatenate warping paths
        wp = build_path_from_warping_paths(warping_paths=wp_list,
                                           anchors=anchors)

        # Placeholders for the step-1 state; they are only filled in when visualizing.
        anchors_step1 = None
        wp_step1 = None
        num_rows_step1 = 0
        num_cols_step1 = 0
        ax = None

        toc1 = time.perf_counter()

        # Visualization happens between toc1 and tic2, so it is excluded from the reported time.
        if verbose and cost_matrices_step1 is not None:
            anchors_step1 = np.array(anchors, copy=True)
            wp_step1 = np.array(wp, copy=True)
            num_rows_step1, num_cols_step1 = np.sum(np.array([dtw_mat.shape for dtw_mat in cost_matrices_step1], int),
                                                    axis=0)
            _, ax = sync_visualize_step1(cost_matrices_step1,
                                         num_rows_step1,
                                         num_cols_step1,
                                         anchors,
                                         wp)
        tic2 = time.perf_counter()

        # Compute neighboring anchors and refine alignment using local path between neighboring anchors
        anchor_indices_in_warping_path = find_anchor_indices_in_warping_path(wp, anchors=anchors)

        # Compute neighboring anchors for refinement
        neighboring_anchors, neighboring_anchor_indices = \
            derive_neighboring_anchors(wp, anchor_indices=anchor_indices_in_warping_path)

        if neighboring_anchor_indices.shape[0] > 1 and is_last_iteration and high_res:
            cost_matrices_step2 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur,
                                                                        f_chroma2=f_chroma2_cur,
                                                                        f_onset1=f_onset1,
                                                                        f_onset2=f_onset2,
                                                                        anchors=neighboring_anchors)
        else:
            cost_matrices_step2 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur,
                                                                        f_chroma2=f_chroma2_cur,
                                                                        anchors=neighboring_anchors)

        wp_list_refine = compute_warping_paths_from_cost_matrices(cost_matrices=cost_matrices_step2,
                                                                  step_sizes=step_sizes,
                                                                  step_weights=step_weights,
                                                                  implementation=dtw_implementation)

        wp = __refine_wp(wp, anchors, wp_list_refine, neighboring_anchors, neighboring_anchor_indices)

        toc2 = time.perf_counter()
        computation_time_it = toc2 - tic2 + toc1 - tic1
        total_computation_time += computation_time_it

        # Carry the state of this level over to the next, finer level.
        alignment = wp
        feature_rate_old = feature_rate_new
        cost_matrix_size_old = cost_matrix_size_new

        if verbose and cost_matrices_step2 is not None:
            sync_visualize_step2(ax,
                                 cost_matrices_step2,
                                 wp,
                                 wp_step1,
                                 num_rows_step1,
                                 num_cols_step1,
                                 anchors_step1,
                                 neighboring_anchors)
            print('Level {} computation time: {:.2f} seconds'.format(it, computation_time_it))

    if verbose:
        print('Computation time of MrMsDTW: {:.2f} seconds'.format(total_computation_time))

    return alignment
def __refine_wp(wp: np.ndarray,
                anchors: np.ndarray,
                wp_list_refine: list,
                neighboring_anchors: np.ndarray,
                neighboring_anchor_indices: np.ndarray):
    """Rebuild a warping path from its untouched ends and the refined middle segments.

    The path is assembled from three parts: the columns of ``wp`` up to and including
    the first neighboring anchor index, the refined segments in ``wp_list_refine``,
    and the columns of ``wp`` from the last neighboring anchor index onwards. The
    last part is shifted so that it starts at zero before being handed to
    ``build_path_from_warping_paths``.

    Parameters
    ----------
    wp : np.ndarray
        Warping path to refine; indexed as ``wp[:, k]``
        (presumably shape (2, T) -- confirm against caller)

    anchors : np.ndarray
        Anchors of the current level; only the first and last column are used

    wp_list_refine : list
        Warping paths computed between the neighboring anchors

    neighboring_anchors : np.ndarray
        Neighboring anchors, placed between the first and last column of ``anchors``

    neighboring_anchor_indices : np.ndarray
        Column indices into ``wp``; only the first and last entry are used

    Returns
    -------
    wp_res : np.ndarray
        Result of ``build_path_from_warping_paths`` for the assembled segments
    """
    first_idx = neighboring_anchor_indices[0]
    last_idx = neighboring_anchor_indices[-1]

    # Untouched start of the path, up to and including the first neighboring anchor.
    head_segment = wp[:, :first_idx + 1]

    # Untouched end of the path, expressed relative to its own first column.
    # Subtracting the (rows, 1) column broadcasts it across all columns of the tail.
    tail_segment = wp[:, last_idx:] - wp[:, [last_idx]]

    segments = [head_segment] + wp_list_refine + [tail_segment]

    # Anchor matrix: first anchor, all neighboring anchors, last anchor.
    segment_anchors = np.concatenate(
        [anchors[:, [0]], neighboring_anchors, anchors[:, [-1]]],
        axis=1,
    )

    return build_path_from_warping_paths(warping_paths=segments,
                                         anchors=segment_anchors)