Source code for pyatsyn.ats_structure

# -*- coding: utf-8 -*-

# This source code is licensed under the BSD-style license found in the
# LICENSE.rst file in the root directory of this source tree. 

# pyatsyn Copyright (c) <2023>, <Johnathan G Lyon>
# All rights reserved.

# Except where otherwise noted, ATSA and ATSH is Copyright (c) <2002-2004>
# <Oscar Pablo Di Liscia, Pete Moss, and Juan Pampin>


"""Data Abstraction for ATS

"""

from numpy import zeros, inf, copy, mean, any, arange

from pyatsyn.atsa.utils import ATS_MIN_SEGMENT_LENGTH
from pyatsyn.atsa.peak_tracking import phase_interp

[docs]class AtsPeak: """Data abstraction for storing single peak, single timepoint data for peak tracking Used primarily as a data-store during the peak tracking phase of analysis. Attributes ---------- amp : float the amplitude of the peak frq : float the frequency (in Hz) of the peak pha : float the phase (in radians) of the peak smr : float the signal-to-mask ratio (in dB SPL) of the peak track : int the corresponding tracked partial the peak is assigned to db_spl : float peak amplitude in dB SPL (used during SMR evaluation) bark_frq : float frequency in bark scale (used during SMR evaluation) slope_r : float right slope of masking curve (used during SMR evaluation) asleep_for : int sleep counter (in frames) (used during peak tracking) duration : float active counter (in frames) (used during peak tracking) frq_max : float maximum frequency (used in track data during optimization) amp_max : float maximum amplitude (used in track data during optimization) frq_min : float minimum frequency (used in track data during optimization) """ def __init__ (self, amp=0.0, frq=0.0, pha=0.0, smr=0.0, track=0, db_spl=0.0, barkfrq=0.0, slope_r=0.0, asleep_for=None, duration=1): self.amp = amp self.frq = frq self.pha = pha self.smr = smr self.track = track self.db_spl = db_spl self.barkfrq = barkfrq self.slope_r = slope_r self.asleep_for = asleep_for self.duration = duration self.frq_max = 0.0 self.amp_max = 0.0 self.frq_min = inf
[docs] def clone (self): """Function to return a copy of an :obj:`~pyatsyn.ats_structure.AtsSound` Returns ------- :obj:`~pyatsyn.ats_structure.AtsPeak` a copy of the calling :obj:`~pyatsyn.ats_structure.AtsPeak` object """ return AtsPeak(self.amp,self.frq,self.pha,self.smr,self.track,self.db_spl, self.barkfrq,self.slope_r,self.asleep_for, self.duration)
def __repr__(self): return f"PK: f_{self.frq} at mag_{self.amp} + {self.pha}"
[docs]class AtsSound: """Main data abstraction for ATS Parameters ---------- sampling_rate : int sampling rate (samples/sec) frame_size : int interframe distance (in samples) window_size : int size (in samples) of the FFT window used to analyze the sound partials : int number of partials/tracks stored frames : int number of frames of analysis dur : float duration (in s) of the sound has_phase : bool, optional whether to initial phase information data structure (default: True) Attributes ---------- sampling_rate : int sampling rate (samples/sec) frame_size : int interframe distance (in samples) window_size : int size (in samples) of the FFT window used to analyze the sound partials : int number of partials/tracks stored frames : int number of frames of analysis dur : float duration (in s) of the sound optimized : bool whether the object has been through optimization yet amp_max : float maximum amplitude of the sound frq_max : float maximum frequency (in Hz) of the sound frq_av : ndarray[float] 1D array of average frequency (in Hz) for each partial amp_av : ndarray[float] 1D array of average amplitude for each partial time : float 1D array of the time (in s) corresponding to each frame frq : ndarray[float] 2D array storing frequency (in Hz) for each partial at each frame amp : ndarray[float] 2D array storing amplitude for each partial at each frame pha : ndarray[float] 2D array storing phase (in radians) for each partial at each frame. None if no phase information is stored. energy : ndarray[float] 2D array for storing noise band energy into each partials at each frame. NOTE: Currently only implemented for legacy purposes. Empty list if no noise information is stored. band_energy : ndarray[float] 2D array of noise band energies for each band at each frame. Empty list if no noise information is stored. bands : ndarray[int] 1D array of unique indices to label each noise band. Empty list if no noise information is stored. """ def __init__ (self, sampling_rate, frame_size, window_size, partials, frames, dur, has_phase=True): self.sampling_rate = sampling_rate self.frame_size = frame_size self.window_size = window_size self.partials = partials self.frames = frames self.dur = dur self.optimized = False self.amp_max = 0.0 self.frq_max = 0.0 self.frq_av = zeros(partials, "float64") self.amp_av = zeros(partials, "float64") self.time = zeros(frames, "float64") self.frq = zeros([partials, frames], "float64") self.amp = zeros([partials, frames], "float64") self.pha = None if has_phase: self.pha = zeros([partials, frames], "float64") # Noise Data self.energy = [] self.band_energy = [] self.bands = []
[docs] def clone(self): """Function to return a deep copy of an :obj:`~pyatsyn.ats_structure.AtsSound` Returns ------- :obj:`~pyatsyn.ats_structure.AtsSound` a deep copy of the calling :obj:`~pyatsyn.ats_structure.AtsSound` object """ has_pha = True if self.pha is None: has_pha = False new_ats_snd = AtsSound(self.sampling_rate, self.frame_size, self.window_size, self.partials, self.frames, self.dur, has_phase=has_pha) new_ats_snd.optimized = self.optimized new_ats_snd.amp_max = self.amp_max new_ats_snd.frq_max = self.frq_max new_ats_snd.time = copy(self.time) new_ats_snd.frq = copy(self.frq) new_ats_snd.amp = copy(self.amp) new_ats_snd.frq_av = copy(self.frq_av) new_ats_snd.amp_av = copy(self.amp_av) if has_pha: new_ats_snd.pha = copy(self.pha) if self.bands: new_ats_snd.bands = copy(self.bands) new_ats_snd.band_energy = copy(self.band_energy) if self.energy: new_ats_snd.energy = copy(self.energy) return new_ats_snd
[docs] def optimize( self, min_gap_size = None, min_segment_length = None, amp_threshold = None, # in amplitude highest_frequency = None, lowest_frequency = None): """Function to run optimization routines on the frames of partial data stored in the object. The optimizations performed are: * fill gaps of min_gap_size or shorter * trim short partials * calculate and store maximum and average frq and amp * prune partials below amplitude threshold * prune partials outside frequency constraints * re-order partials according to average frq Parameters ---------- min_gap_size : int, optional partial gaps longer than this (in frames) will not be interpolated and filled in. If None, this sub-optimization will be skipped. (default: None) min_segment_length : int, optional minimal size (in frames) of a valid partial segment, otherwise it is pruned. If None, this sub-optimization will be skipped. (default: None) amp_threshold : float, optional amplitude threshold used to prune partials. If None, this sub-optimization will be skipped. (default: None) highest_frequency : float upper frequency threshold, tracks with maxima above this will be pruned. If None, this sub-optimization will be skipped. (default: None) lowest_frequency : float lower frequency threshold, tracks with minima below this will be pruned. If None, this sub-optimization will be skipped. (default: None) """ has_pha = True if self.pha is None: has_pha = False if min_gap_size is not None: # fill gaps of min_gap_size or shorter for partial in range(self.partials): asleep_for = 0 for frame_n in range(self.frames): if self.amp[partial][frame_n] == 0.0: asleep_for += 1 else: if asleep_for <= min_gap_size: fell_asleep_at = frame_n - asleep_for if fell_asleep_at != 0: # skip if waking up for the first time # interpolate the gap interp_range = asleep_for + 1 earlier_ind = fell_asleep_at - 1 frq_step = self.frq[partial][earlier_ind] - self.frq[partial][frame_n] amp_step = self.amp[partial][earlier_ind] - self.amp[partial][frame_n] for i in range(1, interp_range): # NOTE: we'll walk backward from frame_n mult = i / interp_range self.frq[partial][frame_n - i] = (frq_step * mult) + self.amp[partial][frame_n] self.amp[partial][frame_n - i] = (amp_step * mult) + self.amp[partial][frame_n] if has_pha: for i in range(1, interp_range): # NOTE: we'll walk backward from frame_n freq_now = self.frq[partial][frame_n - i] freq_then = self.frq[partial][earlier_ind] pha_then = self.pha[partial][earlier_ind] t = (interp_range - i) / self.sampling_rate self.pha[partial][frame_n - i] = phase_interp(freq_then, freq_now, pha_then, t) asleep_for = 0 keep_partials = set(arange(self.partials)) if min_segment_length is not None: # trim short partials if min_segment_length < 1: min_segment_length = ATS_MIN_SEGMENT_LENGTH for partial in range(self.partials): n_segments = 0 segment_dur = 0 for frame_n in range(self.frames): if self.amp[partial][frame_n] > 0.0: segment_dur += 1 elif segment_dur > 0: # we've reached the end of a segment if segment_dur >= min_segment_length: n_segments += 1 else: # remove the segment for ind in range(frame_n - segment_dur, frame_n): self.frq[partial][ind] = 0.0 self.amp[partial][ind] = 0.0 if has_pha: self.pha[partial][ind] = 0.0 # reset the segment counter segment_dur = 0 if n_segments == 0: keep_partials = keep_partials - {partial} # process amp and/or frequency thresholds if amp_threshold is not None: for partial in range(self.partials): if max(self.amp[partial,:]) < amp_threshold: keep_partials = keep_partials - {partial} if highest_frequency is not None: for partial in range(self.partials): if max(self.frq[partial,:]) > highest_frequency: keep_partials = keep_partials - {partial} if lowest_frequency is not None: for partial in range(self.partials): selection = self.frq[partial,:] > 0.0 if any(selection): if min(self.frq[partial][selection]) < lowest_frequency: keep_partials = keep_partials - {partial} else: keep_partials = keep_partials - {partial} # keep only unfiltered partials keep_partials = list(keep_partials) self.partials = len(keep_partials) if self.partials == 0: print("WARNING: optimization has removed all partials from AtsSound") self.frq_av = None self.amp_av = None self.frq = None self.amp = None self.pha = None self.amp_max = 0.0 self.frq_max = 0.0 return else: self.frq_av = self.frq_av[keep_partials] self.amp_av = self.amp_av[keep_partials] self.frq = self.frq[keep_partials,:] self.amp = self.amp[keep_partials,:] if has_pha: self.pha = self.pha[keep_partials,:] # reset amp_max & average amp_max = 0.0 frq_max = 0.0 for partial in range(self.partials): frq_selection = self.frq[partial,:] > 0.0 if any(frq_selection): self.frq_av[partial] = mean(self.frq[partial][self.frq[partial] > 0.0]) else: self.frq_av[partial] = 0.0 amp_selection = self.amp[partial,:] > 0.0 if any(amp_selection): self.amp_av[partial] = mean(self.amp[partial][self.amp[partial] > 0.0]) else: self.amp_av[partial] = 0.0 frq_max = max(frq_max, max(self.frq[partial,:])) amp_max = max(amp_max, max(self.amp[partial,:])) self.amp_max = amp_max self.frq_max = frq_max # re-sort tracks by frq_av partial_av_tuples = [(i, self.frq_av[i]) for i in range(self.partials)] sorted_tuples = sorted(partial_av_tuples, key=lambda tp: tp[0]) sorted_index = [tp[0] for tp in sorted_tuples] self.frq_av = self.frq_av[sorted_index] self.amp_av = self.amp_av[sorted_index] self.frq = self.frq[sorted_index,:] self.amp = self.amp[sorted_index,:] if has_pha: self.pha = self.pha[sorted_index,:]