# -*- coding: utf-8 -*-
# This source code is licensed under the BSD-style license found in the
# LICENSE.rst file in the root directory of this source tree.
# pyatsyn Copyright (c) <2023>, <Johnathan G Lyon>
# All rights reserved.
# Except where otherwise noted, ATSA and ATSH is Copyright (c) <2002-2004>
# <Oscar Pablo Di Liscia, Pete Moss, and Juan Pampin>
""" Main ATS Analysis Function
The analysis tracker is responsible for driving the analysis of an audio file
into the .ats format. The system uses a Short Time Fourier Transform (STFT) as
is core analysis tool. Sound is analyzed using overlapping time windows and by
taking the STFT on each window.
After converting to polar coordinates, a peak detection algorithm (:obj:`~pyatsyn.atsa.peak_detection`)
determines relevant spectral peaks in the data. At this point, pyschoacoustics
are considered in the form of masking curve evaluation and computation of the
Signal-to-Mask ratio (SMR) for each candidate peak. SMR data is store together with
a corrected frequency, magnitude and phase.
The next step involves frame-to-frame tracking of peaks (:obj:`~pyatsyn.atsa.peak_tracking`) to connect peaks that
follow a similar spectral trajectory using both frequency and SMR data. The system
uses a stable matching algorithm to pair candidate peaks, and is capable of interpolating
gaps in the tracks.
Once valid tracks are assembled, the results can be modeled with sinusoids (:obj:`~pyatsyn.atsa_synth`) and subtracted
from the origin source sound to compute a residual. NOTE: This part of the ATS system is currently under
active research. For now, the residual analysis (:obj:`~pyatsyn.atsa.residual`) is modeled using a
25 time-varying critical noise band energy model (consistent with the critical bands used during SMR evaluation).
These noise bands can then be resynthesized using 25 correspoding banks of time-enveloped, band-limited noise.
Analysis is finally stored and abstracted as an :obj:`~pyatsyn.ats_structure.AtsSound` object.
"""
from numpy import zeros, roll, absolute, angle
from numpy.fft import fft, fftfreq
import soundfile as sf
import argparse
from pyatsyn.ats_structure import AtsSound
from pyatsyn.atsa.utils import db_to_amp, next_power_of_2, compute_frames, optimize_tracks
from pyatsyn.atsa.windows import make_fft_window, normalize_window, window_norm, VALID_FFT_WINDOW_DEFINITIONS
from pyatsyn.atsa.peak_detect import peak_detection
from pyatsyn.atsa.critical_bands import evaluate_smr
from pyatsyn.atsa.peak_tracking import update_track_averages, peak_tracking
from pyatsyn.atsa.residual import compute_residual, residual_analysis
from pyatsyn.ats_io import ats_save
[docs]def tracker ( in_file,
start = 0.0, # analysis start point (in seconds)
duration = None, # max duration to analyze (in seconds) or 'None' if analyze to end
lowest_frequency = 20, # must be > 0
highest_frequency = 20000.0,
frequency_deviation = 0.1,
window_cycles = 4,
window_type = 'blackman-harris-4-1',
hop_size = 0.25, # in fraction of window size
fft_size = None, # None, or force an fft size
amp_threshold = db_to_amp(-60),
track_length = 3,
min_gap_length = 3,
min_segment_length = 3,
last_peak_contribution = 0.0,
SMR_continuity = 0.0,
residual_file = None,
optimize = True,
optimize_amp_threshold = None, # in amplitude
force_M = None, # None, or a forced window length in samples
force_window = None, # None, or a numpy.ndarray of floats
window_alpha = 0.5,
window_beta = 1.0,
verbose = False,
):
"""Function to generates an Analysis-Transformation-Synthesis :obj:`~pyatsyn.ats_structure.AtsSound` from an audio file
Parameters
----------
in_file : str
path to the audio file to analyze (must be single channel/mono)
start : float
timepoint (in s) in audiofile to begin analysis (default: 0.0)
duration : float
max duration (in s) in audiofile from start to end analysis or 'None' if analyze to end (default: None)
lowest_frequency
lowest frequency to analyze (must be > 0) (default: 20)
highest_frequency : float
highest frequency to analyze (capped to nyquist frequency and must be greater than `lowest_frequency`) (default: 20000.0)
frequency_deviation : float
maximum relative frequency deviation used to constrain peak tracking matches (default: 0.1)
window_cycles : int
lowest frequency to fit in analysis window; used to determine window size (default: 4)
window_type : str
type of window to use for FFT analysis (default: 'blackman-harris-4-1'). See :obj:`~pyatsyn.atsa.windows.VALID_FFT_WINDOW_DEFINITIONS`
hop_size : float
fraction of window size to shift from frame-to-frame (default: 0.25)
fft_size : int
None, or force an fft size (default: None)
amp_threshold : float
lowest amplitude used for peak detection (default: 0.001)
track_length : int
number of frames used to smooth frequency trajectories (default: 3)
min_gap_length : int
tracked partial gaps longer than this (in frames) will not be interpolated (default: 3)
min_segment_length : int
minimal size (in frames) of a track segment, otherwise it is pruned (default: 3)
last_peak_contribution : float
additional bias for the immediately prior frames values when calculating smoothing trajectories (default: 0.0)
SMR_continuity : float
percentage of SMR to use in cost calculations during peak tracking (default: 0.0)
residual_file : str
path to the audio file used to store residual analysis.
NOTE: noise calculation will not be performed in .ats file without this (default: None)
optimize : bool
whether to perform the post-peak tracking optimization on the :obj:`~pyatsyn.ats_structure.AtsSound` object (default: True)
optimize_amp_threshold : float
additional amplitude threshold used during optimization to prune tracks (default: None)
force_M : int
None, or a forced window length in samples (default: None)
force_window : ndarray[float]
None, or a 1D array describing a windowing curve (default: None)
window_alpha : float
parameter used for calculating tukey windows (default: 0.5)
window_beta : float
parameter used for calculating certain window types (default: 1.0)
verbose : bool
increase verbosity (default: False)
Returns
-------
:obj:`~pyatsyn.ats_structure.AtsSound`
the ats object that represents the analysis of the input audio file
Raises
------
ValueError
if input file is not single channel/mono
ValueError
if `lowest_frequency` is < 0.0
ValueError
if `highest_frequency` is < `lowest_frequency`
"""
# read input audio file
in_sound, sample_rate = sf.read(in_file)
if in_sound.ndim > 1:
raise ValueError("Input audio file must be mono")
# get first and last sample indices
st = int(start * sample_rate)
nd = in_sound.size
if duration is not None:
nd = st + int(duration * sample_rate)
# calculate windowing parameters
total_samps = nd - st
analysis_duration = total_samps / sample_rate
cycle_samps = int((1 / lowest_frequency) * window_cycles * sample_rate)
M = force_M
if M is None:
# by default we want an odd length window centered at time 0.0
if ( (cycle_samps % 2) == 0):
M = cycle_samps + 1
else:
M = cycle_samps
N = fft_size
if N is None:
# default fft size is next power of 2
N = next_power_of_2(M)
# instantiate window
window = force_window
if window is None:
if window_type is None:
window_type='blackman-harris-4-1'
window = make_fft_window(window_type, M, beta=window_beta, alpha=window_alpha)
window = normalize_window(window)
hop = int(M * hop_size)
# central point of the window
M_over_2 = (M - 1) // 2
frames = compute_frames(total_samps, hop)
# magic number for fft frequencies (frequency resolution)
fft_mag = sample_rate / N
l_frq = lowest_frequency
if l_frq <= 0.0:
raise ValueError('Lowest frequency must be greater than 0.0')
h_frq = highest_frequency
if h_frq > (sample_rate / 2.0):
if verbose: print('WARNING: Capping highest frequency to Nyquist Frequency')
h_frq = int(sample_rate / 2.0)
if h_frq < l_frq:
raise ValueError('Highest frequency must be greater than lowest frequency')
# lowest/highest bins to read
lowest_bin = int(l_frq / fft_mag)
highest_bin = int(h_frq / fft_mag)
# used to store central points of the windows
win_samps = zeros(frames, "int64")
# storage for lists of peaks
analysis_frames = [None for _ in range(frames)]
# set file pointer half a window from the first sample
fil_ptr = st - M_over_2
# guarantee that we have a valid minimum gap length
if min_gap_length < 1:
min_gap_length = 1
fft_mags = None
tracks = []
if verbose:
print(f"frames = {frames}")
print(f"M = {M}; N = {N}")
print(f"Beginning FFT -> peak tracking analysis...")
report_every_x_percent = 10
report_flag = 0
for frame_n in range(frames):
if verbose:
done = frame_n * 100.0 / frames
if done > report_flag:
print(f"\t{done:.2f}% complete (tracking)")
report_flag += report_every_x_percent
# store in_sound sample at the middle of the window
win_samps[frame_n] = fil_ptr + M_over_2
###################
# WINDOWING + FFT #
###################
# padding for window ranges that are out of the input file
front_pad = 0
back_pad = 0
if fil_ptr < 0:
front_pad = -fil_ptr
if fil_ptr + M >= in_sound.size:
back_pad = fil_ptr + M - in_sound.size
data = zeros(N, "float64")
if not front_pad and not back_pad:
data[:M] = window * in_sound[fil_ptr:fil_ptr+M]
else:
data[front_pad:M-back_pad] = window[front_pad:M-back_pad] * in_sound[fil_ptr+front_pad:fil_ptr+M-back_pad]
# shift window by half of M so that phases in `data` are relatively accurate to midpoints
data = roll(data, -M_over_2)
# update file pointer
fil_ptr += hop
# get the DFT
fd = fft(data)
##################
# PEAK DETECTION #
##################
fftfreqs = fftfreq(fd.size, 1 / sample_rate)
if front_pad or back_pad:
# apply correction for frames that hang off the edge of the input file, thus downscaling the actual amplitude
# multiply by additional 2.0 to account for symmetric negative frequencies
fft_mags = absolute(fd) * 2.0 * window_norm(window[front_pad:M-back_pad])
else:
# multiply by additional 2.0 to account for symmetric negative frequencies
fft_mags = absolute(fd) * 2.0
fftphases = angle(fd)
peaks = peak_detection(fftfreqs, fft_mags, fftphases, lowest_bin, highest_bin, amp_threshold)
if peaks:
# masking curve evaluation using a critical band based model
evaluate_smr(peaks)
#################
# PEAK TRACKING #
#################
if len(tracks) > 0:
update_track_averages(tracks, track_length, frame_n, analysis_frames, last_peak_contribution)
peak_tracking(tracks, peaks, frame_n, analysis_frames, sample_rate, hop, frequency_deviation, SMR_continuity, min_gap_length)
else:
# otherwise instantiate tracks with the current frame's peaks, if any
for pk_ind, pk in enumerate(peaks):
pk.track = pk_ind
tracks = [pk.clone() for pk in peaks]
# store peaks for this current frame
analysis_frames[frame_n] = peaks
########################
# INITIALIZE ATS SOUND #
########################
pre_opt_track_len = len(tracks)
if verbose:
print(f"Tracker found {pre_opt_track_len} partial(s)")
if optimize:
if verbose:
print("Optimizing...")
tracks = optimize_tracks(tracks, analysis_frames, min_segment_length, optimize_amp_threshold, highest_frequency, lowest_frequency)
if verbose:
print(f"Optimization removed {pre_opt_track_len - len(tracks)} partial(s)")
if verbose:
print("Initializing AtsSound object...")
ats_snd = AtsSound(sample_rate, hop, M, len(tracks), frames, analysis_duration, has_phase = True)
if optimize:
ats_snd.optimized = True
amp_max = 0.0
frq_max = 0.0
for tk in tracks:
ats_snd.frq_av[tk.track] = tk.frq
ats_snd.amp_av[tk.track] = tk.amp
amp_max = max(amp_max, tk.amp_max)
frq_max = max(frq_max, tk.frq_max)
ats_snd.amp_max = amp_max
ats_snd.frq_max = frq_max
# fill up with data
for frame_n in range(frames):
frame_time = (win_samps[frame_n] - st) / sample_rate
ats_snd.time[frame_n] = frame_time
for pk in analysis_frames[frame_n]:
ats_snd.frq[pk.track][frame_n] = pk.frq
ats_snd.amp[pk.track][frame_n] = pk.amp
ats_snd.pha[pk.track][frame_n] = pk.pha
#####################
# RESIDUAL ANALYSIS #
#####################
if residual_file:
if verbose:
print("Computing Residual...")
residual = compute_residual(ats_snd, in_sound, st, nd, residual_file)
if verbose:
print("Analyzing Residual...")
residual_analysis(residual, ats_snd, equalize=True, verbose=verbose)
if verbose:
print("Tracking analysis complete")
return ats_snd
[docs]def tracker_CLI():
"""Command line wrapper for :obj:`~pyatsyn.atsa.tracker.tracker`
Example
-------
Display usage details with help flag
::
$ pyatsyn-atsa -h
Analyze a wav file
::
$ pyatsyn-atsa example.wav example.ats
Analyze a wav file and compute the residual and increase verbosity
::
$ pyatsyn-atsa example.wav example.ats -v -r example-residual.wav
"""
parser = argparse.ArgumentParser(
description = "Generates an Analysis-Transformation-Synthesis (.ats) file from an audio file"
)
parser.add_argument("audio_file_in", help="path to the audio file to analyze")
parser.add_argument("ats_file_out", help=".ats file path to output to")
parser.add_argument("-v", "--verbose", help="increase verbosity", action="store_true")
parser.add_argument("-r","--residual_file", help="path to the audio file used to store residual analysis. \
NOTE: noise calculation will not be performed in .ats file without this", default=None)
parser.add_argument("-s", "--start", type=float, help="timepoint (in s) in audiofile to begin analysis (default 0.0)", default=0.0)
parser.add_argument("-d", "--duration", type=float, help="max duration (in s) in audiofile from start to end analysis (default duration of file)", default=None)
parser.add_argument("--low_freq", type=float, help="lowest frequency to analyze (must be > 0) (default 20)", default=20)
parser.add_argument("--hi_freq", type=float, help="highest frequency to analyze (default 20000)", default=20000)
parser.add_argument("--amp_threshold", type=float, help="lowest amplitude used for peak detection (default 0.001)", default=0.001)
parser.add_argument("--freq_dev", type=float, help="frequency deviation; used to constrain peak tracking matches (default 0.1)", default=0.1)
parser.add_argument("--SMR_continuity", type=float, help="percentage of SMR to use in cost calculations during peak tracking (default 0.0)", default=0.0)
valid_fft_win_str = "[ " + ' | '.join(VALID_FFT_WINDOW_DEFINITIONS) + " ]"
parser.add_argument("--win_type", type=ascii,
help=f"type of window to use for FFT analysis (default: blackman-harris-4-1); Supported types: {valid_fft_win_str}", default=None)
parser.add_argument("--win_alpha", type=float, help="parameter used for calculating tukey windows (default 0.5)", default=0.5)
parser.add_argument("--win_beta", type=float, help="parameter used for calculating certain window types (default 1.0)", default=1.0)
parser.add_argument("--hop_size", type=float, help="fraction of window size to shift from frame-to-frame (default 0.25)", default=0.25)
parser.add_argument("--fft_size", type=int, help="force an fft_size", default=None)
parser.add_argument("--win_cycles", type=float, help="lowest frequency to fit in analysis window; used to determine window size (default 4)", default=4)
parser.add_argument("--force_M", type=int, help="forced window length in samples", default=None)
parser.add_argument("--track_length", type=int, help="number of frames used to smooth frequency trajectories (default 3)", default=3)
parser.add_argument("--last_peak_contribution", type=float, help="aadditional bias for the immediately prior frames values when calculating smoothing trajectories (default 0.0)", default=0.0)
parser.add_argument("--min_gap_length", type=int, help="tracked partial gaps longer than this (in frames) will not be interpolated (default 3)", default=3)
parser.add_argument("--min_segment_length", type=int, help="minimize size (in frames) of a track segment, otherwise it is pruned (default 3)", default=3)
parser.add_argument("--opt_amp_threshold", type=float, help="additional amplitude threshold used during optimization to prune tracks (default 0.001)", default=None)
parser.add_argument("--no_optimize", help="skip pre-output optimization of .ats file", action="store_true")
args = parser.parse_args()
ats_save(tracker( args.audio_file_in,
start = args.start,
duration = args.duration,
lowest_frequency = args.low_freq,
highest_frequency = args.hi_freq,
frequency_deviation = args.freq_dev,
window_cycles = args.win_cycles,
window_type = args.win_type,
hop_size = args.hop_size,
fft_size = args.fft_size,
amp_threshold = args.amp_threshold,
track_length = args.track_length,
min_gap_length = args.min_gap_length,
min_segment_length = args.min_segment_length,
last_peak_contribution = args.last_peak_contribution,
SMR_continuity = args.SMR_continuity,
optimize_amp_threshold = args.opt_amp_threshold,
residual_file = args.residual_file,
optimize = not args.no_optimize,
force_M = args.force_M,
window_alpha = args.win_alpha,
window_beta = args.win_beta,
verbose = args.verbose,
),
args.ats_file_out)