# -*- coding: utf-8 -*-
# This source code is licensed under the BSD-style license found in the
# LICENSE.rst file in the root directory of this source tree.
# pyatsyn Copyright (c) <2023>, <Johnathan G Lyon>
# All rights reserved.
# Except where otherwise noted, ATSA and ATSH is Copyright (c) <2002-2004>
# <Oscar Pablo Di Liscia, Pete Moss, and Juan Pampin>
"""
Peak Tracking algorithms to assemble spectral trajectories
Peaks issued by the peak detection algorithm need to be connected and
translated into spectral trajectories. This process involves the evaluation
of the possible candidates to continue trajectories on a frame-by-frame basis.
This is done using tracks that keep information of recent average values for each
of the trajectory parameters. The length of the tracks is adjustable and has to
be tuned depending on the characteristics of the analyzed sound.
A Gale-Shapley stable matching algorithm is used to determine the best candidate pair using a the cost criteria:
:math:`cost = \\frac{|P_{freq} - T_{freq}| + \\alpha * |P_{smr} - T_{smr}|}{1 + \\alpha}`
where :math:`P_{freq}` is the candidate peak frequency, and :math:`P_{smr}` its SMR, :math:`T_{freq}`
is the track frequency, and :math:`T_{smr}` its SMR, both averaged over the track length (typically 3 frames).
:math:`\\alpha` is a coefficient controlling how much the SMR deviation affects the cost.
The use of the SMR continuation as a parameter for the peak tracking process is based upon
psychoacoustic temporal masking phenomena. Conceptually, we assume that masking profiles of
stable sinusoidal trajectories can only evolve at slow rate (no sudden changes). This is true for
analysis performed with hop sizes between 10 and 50 milliseconds, which is comparable to the
average duration of pre- and post-making effects.
New tracks get created from orphan peaks (the ones that were not incorporated to any existing tracks),
and tracks which couldn't find continuing peaks are set to sleep.
"""
from heapq import heappop, heappush
from queue import SimpleQueue
from math import tau, remainder
from numpy import zeros, matmul
[docs]def update_track_averages(tracks, track_length, frame_n, analysis_frames, beta = 0.0):
"""Function to update running averages of recent peaks
Using the list of current tracks, we use `track_length` frames to look back and update,
the average amp, frq, and smr values for the tracks.
NOTE: Tracks are updated directly without return value.
Parameters
----------
tracks : Iterable[:obj:`~pyatsyn.ats_structure.AtsPeak`]
iterable of tracks to update
track_length : int
how far back in time (in frames) to start average calculations
frame_n : int
the current frame
analysis_frames : Iterable[Iterable[:obj:`~pyatsyn.ats_structure.AtsPeak`]]
a running collection storing the :obj:`~pyatsyn.ats_structure.AtsPeak` objects at each frame in time
beta : float, optional
TOadditional bias for the immediately prior frames values when calculating smoothing trajectories (default: 0.0)
"""
frames = min(frame_n, track_length)
first_frame = frame_n - frames
for tk in tracks:
track = tk.track
frq_acc = 0
f = 0
amp_acc = 0
a = 0
smr_acc = 0
s = 0
last_frq = 0
last_amp = 0
last_smr = 0
for i in range(first_frame, frame_n):
l_peaks = analysis_frames[i]
peak = find_track_in_peaks(track, l_peaks)
if peak is not None:
if peak.frq > 0.0:
last_frq = peak.frq
frq_acc += peak.frq
f += 1
if peak.amp > 0.0:
last_amp = peak.amp
amp_acc += peak.amp
a += 1
if peak.smr > 0.0:
last_smr = peak.smr
smr_acc += peak.smr
s += 1
if f > 0:
tk.frq = ((1 - beta) * (frq_acc / f)) + (beta * last_frq)
if a > 0:
tk.amp = ((1 - beta) * (amp_acc / a)) + (beta * last_amp)
if s > 0:
tk.smr = ((1 - beta) * (smr_acc / s)) + (beta * last_smr)
[docs]def find_track_in_peaks(track, peaks):
"""Function to search a the first peak found tagged a given track ind
Parameters
----------
track : int
the track index to search for
peaks : Iterable[:obj:`~pyatsyn.ats_structure.AtsPeak`]
a collection of :obj:`~pyatsyn.ats_structure.AtsPeak` to search in
Returns
-------
:obj:`~pyatsyn.ats_structure.AtsPeak`
the first :obj:`~pyatsyn.ats_structure.AtsPeak` found in `peaks` that has a .track attribute matching `track`.
If no matches are found, None is returned.
"""
for pk in peaks:
if track == pk.track:
return pk
return None
[docs]def peak_tracking(tracks, peaks, frame_n, analysis_frames, sample_rate, hop_size, frequency_deviation = 0.45, SMR_continuity = 0.0, min_gap_length = 1):
"""Core function to coordinate peak tracking
This function coordinates the matching of new peaks with existing tracks using
an adaptation of the Gale-Shapley algorithm for stable matching. The algorithm is
gap-size aware and will monitor 'slept' tracks within the minimum gap distance as
candidates. Linear interpolation is used to fill the gaps for frequency and amplitude,
and a cubic polynomial interpolation for phase.
NOTE: Tracks, peaks, and analysis_frames are updated directly.
Parameters
----------
tracks : Iterable[:obj:`~pyatsyn.ats_structure.AtsPeak`]
collection of established tracks
peaks : Iterable[:obj:`~pyatsyn.ats_structure.AtsPeak`]
collection of candidate peaks to match
frame_n : int
the current frame
analysis_frames : Iterable[Iterable[:obj:`~pyatsyn.ats_structure.AtsPeak`]]
a running collection storing the :obj:`~pyatsyn.ats_structure.AtsPeak` objects at each frame in time
sample_rate : int
the sampling rate (in samples / s)
hop_size : int
the inter-frame distance (in samples)
frequency_deviation : float, optional
maximum relative frequency deviation used to constrain peak tracking matches (default: 0.45)
SMR_continuity : float, optional
percentage of SMR to use in cost calculations during peak tracking (default: 0.0)
min_gap_length : int
tracked partial gaps longer than this (in frames) will not be interpolated (default: 1)
"""
# state for costs
peak_costs = [[] for _ in peaks]
# calculate costs for valid peak/track pairs
for tk_ind, tk in enumerate(tracks):
if tk.asleep_for is None or tk.asleep_for < min_gap_length:
for pk_ind, pk in enumerate(peaks):
if are_valid_candidates(tk, pk, frequency_deviation):
cost = peak_dist(tk, pk, SMR_continuity)
heappush(peak_costs[pk_ind], MatchCost(cost, tk_ind))
# perform Gale-Shapley stable matching
peak_queue = SimpleQueue()
for ind in range(len(peaks)):
peak_queue.put(ind)
unmatched_peaks = []
track_matches = [None for _ in tracks]
while(not peak_queue.empty()):
pk_ind = peak_queue.get()
made_match = False
while(len(peak_costs[pk_ind]) > 0):
tk = heappop(peak_costs[pk_ind])
# if the track is unmatched
if track_matches[tk.index] is None:
# update match
track_matches[tk.index] = MatchCost(tk.cost, pk_ind)
made_match = True
break
# prior match is more costly, re-match
elif track_matches[tk.index].cost > tk.cost:
# kick previous match back onto queue
peak_queue.put(track_matches[tk.index].index)
# update match
track_matches[tk.index] = MatchCost(tk.cost, pk_ind)
made_match = True
break
# if we ran out of candidates or no match was made, the peak is unmatched
if not made_match:
unmatched_peaks.append(pk_ind)
unmatched_tracks = []
# process matches, fixing gaps
for tk_ind, tk_match in enumerate(track_matches):
if tk_match is None:
unmatched_tracks.append(tk_ind)
else:
# update matched peak with track number
pk = peaks[tk_match.index]
pk.track = tk_ind
tk = tracks[tk_ind]
# handle gap if any, with linear interpolation
if tk.asleep_for is not None:
interp_range = tk.asleep_for + 1
tk.duration += interp_range
pk.duration = tk.duration
frq_step = tk.frq - pk.frq
amp_step = tk.amp - pk.amp
smr_step = tk.smr - pk.smr
for i in range(1, interp_range): # NOTE: we'll walk backward from frame_n
new_pk = pk.clone()
mult = i / interp_range
new_pk.frq = (frq_step * mult) + pk.frq
new_pk.amp = (amp_step * mult) + pk.amp
new_pk.smr = (smr_step * mult) + pk.smr
samps_from_0_to_t = (interp_range + 1) * hop_size
i_samps_from_0 = hop_size * (interp_range - i)
new_pk.pha = phase_interp_cubic(tk.frq, pk.frq, tk.pha, pk.pha, i_samps_from_0, samps_from_0_to_t, sample_rate)
new_pk.duration -= i
analysis_frames[frame_n - i].append(new_pk)
tk.asleep_for = None
else:
tk.duration += 1
pk.duration = tk.duration
# instantiate new tracks for unmatched peaks
for pk_ind in unmatched_peaks:
peaks[pk_ind].track = len(tracks)
tracks.append(peaks[pk_ind].clone())
# sleep or update sleep for unmatched tracks
for tk_ind in unmatched_tracks:
if tracks[tk_ind].asleep_for is None:
tracks[tk_ind].asleep_for = 1
else:
tracks[tk_ind].asleep_for += 1
[docs]def are_valid_candidates(candidate1, candidate2, deviation):
"""Function to determine if the distance between two peaks are within the relative deviation constraint
Peaks are valid candidates for pair if their absolute distance is smaller than the frequency deviation
multiplied by the lower of the candidate's frequencies.
Parameters
----------
candidate1 : :obj:`~pyatsyn.ats_structure.AtsPeak`
a candidate peak
candidate2 : :obj:`~pyatsyn.ats_structure.AtsPeak`
a candidate peak
deviation : float
relative frequency deviation
Returns
-------
bool
True if the candidates are within constrained range, False otherwise.
"""
min_frq = min(candidate1.frq, candidate2.frq)
return abs(candidate1.frq - candidate2.frq) <= 0.5 * min_frq * deviation
[docs]def peak_dist(pk1, pk2, alpha):
"""Function to calculate peak frequency distance
This function is used to calculate the cost for the peak matching algorithm
and allows for psychoacoustic biasing of the calculation:
:math:`dist = \\frac{|P1_{freq} - P2_{freq}| + \\alpha * |P1_{smr} - P2_{smr}|}{1 + \\alpha}`
where :math:`P\#_{freq}` is the peak's frequency, and :math:`P\#_{smr}` its SMR.
:math:`\\alpha` is a coefficient controlling how much the SMR deviation affects the distance.
Parameters
----------
pk1 : :obj:`~pyatsyn.ats_structure.AtsPeak`
a candidate peak
pk1 : :obj:`~pyatsyn.ats_structure.AtsPeak`
a candidate peak
alpha : float
percent of SMR to use to bias the result
Returns
-------
float
the frequency distance (in Hz) between the peaks
"""
return (abs(pk1.frq - pk2.frq) + (alpha * abs(pk1.smr - pk2.smr))) / (alpha + 1.0)
[docs]def phase_interp(freq_0, freq_t, pha_0, t):
"""Function to compute linear phase interpolation
NOTE: currently not used in peak tracking, but supplied for legacy purposes
Assumes smooth linear interpolation, where the average frequency dictates phase rate estimate
from the relative time 0 to time t.
Parameters
----------
freq_0 : float
initial frequency (in Hz)
freq_t : float
frequency at time t (in Hz)
pha_0 : float
initial phase (in radians)
t : float
time (in s) from freq_0
Returns
-------
float
the phase (in radians) at relative time t
"""
# assuming smooth linear interpolation the average frequency dictates phase rate estimate
freq_est = (freq_t + freq_0) / 2
new_phase = pha_0 + (tau * freq_est * t)
return remainder(new_phase, tau) # NOTE: IEEE remainder
[docs]def phase_interp_cubic(freq_0, freq_t, pha_0, pha_t, i_samps_from_0, samps_from_0_to_t, sample_rate):
"""Function to interpolate phase using cubic polynomial interpolation
Uses cubic interpolation to determine and intermediate phase within the curve linking
a particular frequency and phase at relative time 0, to a frequency and phase at time t.
The basis for this method is credited to:
MR. McAulay and T. Quatieri, "Speech analysis/Synthesis based on a
sinusoidal representation," in IEEE Transactions on Acoustics,
Speech, and Signal Processing, vol. 34, no. 4, pp. 744-754,
August 1986
`doi: 10.1109/TASSP.1986.1164910 <https://doi.org/10.1109/TASSP.1986.1164910>`_.
Parameters
----------
freq_0 : float
initial frequency (in Hz)
freq_t : float
frequency at time t (in Hz)
pha_0 : float
initial phase (in radians)
pha_t : float
phase at time t (in radians)
i_samps_from_0 : int
relative sample index `i` to interpolate at
samps_from_0_to_t : int
distance (in samples) from 0 to t
sample_rate : int
sampling rate (in samps/s)
Returns
-------
float
the modeled phase (in radians) at sample `i`
"""
freq_to_radians_per_sample = tau / sample_rate
alpha_beta_coeffs = zeros([2,2], "float64")
alpha_beta_coeffs[0][0] = 3 / (samps_from_0_to_t**2)
alpha_beta_coeffs[0][1] = -1 / samps_from_0_to_t
alpha_beta_coeffs[1][0] = -2 / (samps_from_0_to_t**3)
alpha_beta_coeffs[1][1] = 1 / (samps_from_0_to_t**2)
alpha_beta_terms = zeros([2,1],"float64")
half_T = samps_from_0_to_t / 2
w_0 = freq_0 * freq_to_radians_per_sample
w_t = freq_t * freq_to_radians_per_sample
M = round((((pha_0 + (w_0 * samps_from_0_to_t) - pha_t) + (half_T * (w_t - w_0))) / tau))
alpha_beta_terms[0] = pha_t - pha_0 - (w_0 * samps_from_0_to_t) + (tau * M)
alpha_beta_terms[1] = w_t - w_0
alpha, beta = matmul(alpha_beta_coeffs, alpha_beta_terms)
return pha_0 + (w_0 * i_samps_from_0) + (alpha * (i_samps_from_0**2)) + (beta * i_samps_from_0**3)
[docs]class MatchCost():
"""Object to abstract cost for comparisons
Attributes
----------
cost : float
the calculated cost to `index`
index : int
the index that indicates the track the cost was calculated against
"""
def __init__(self, cost, index):
self.cost = cost
self.index = index
def __repr__(self):
return f"to index: {self.index} at cost: {self.cost}"
def __lt__(self, other):
return self.cost < other.cost