# Source code for mighty.utils.signal
"""
Signal processing and statistics
--------------------------------
.. autosummary::
:toctree: toctree/utils/
exponential_moving_average
peak_to_signal_noise_ratio
compute_sparsity
"""
import torch
import torch.nn.functional as F
__all__ = [
"compute_distance",
"exponential_moving_average",
"to_onehot",
"peak_to_signal_noise_ratio",
"compute_sparsity"
]
def compute_distance(input1, input2, metric, dim=1):
    """
    Compute an elementwise-batched distance between two tensors.

    Parameters
    ----------
    input1, input2 : torch.Tensor
        Input tensors of the same (broadcastable) shape.
    metric : str
        One of:
          * ``'cosine'`` — ``1 - cosine_similarity`` along `dim`;
          * ``'l1'`` — sum of absolute differences along `dim`;
          * ``'l2'`` — sum of squared differences along `dim`
            (note: this is the *squared* Euclidean distance, no sqrt).
    dim : int, default: 1
        The dimension to reduce.

    Returns
    -------
    dist : torch.Tensor
        Distances with dimension `dim` reduced away.

    Raises
    ------
    NotImplementedError
        If `metric` is not one of the supported values.
    """
    if metric == 'cosine':
        dist = 1 - F.cosine_similarity(input1, input2, dim=dim)
    elif metric == 'l1':
        dist = F.l1_loss(input1, input2, reduction='none').sum(dim=dim)
    elif metric == 'l2':
        dist = F.mse_loss(input1, input2, reduction='none').sum(dim=dim)
    else:
        # Name the offending value so the caller can see what went wrong.
        raise NotImplementedError(
            f"Unsupported metric {metric!r}; expected 'cosine', 'l1', or 'l2'.")
    return dist
# [docs]
def exponential_moving_average(tensor, window: int):
    """
    Exponential moving average in a sliding window.

    Implements the recursion ``out[i] = alpha * x[i] + (1 - alpha) * out[i-1]``
    (with ``out[0] = x[0]`` and ``alpha = 2 / (window + 1)``) in a fully
    vectorized form, without a Python loop.

    Parameters
    ----------
    tensor : (N,) torch.Tensor
        Input tensor.
    window : int
        Sliding window width.

    Returns
    -------
    out : (N,) torch.Tensor
        Filtered array of the same length.
    """
    data = torch.as_tensor(tensor)
    alpha = 2 / (window + 1.0)
    decay = 1 - alpha
    n = data.shape[0]
    # decay^0 .. decay^n, used both for scaling and the initial-value term.
    decay_pows = decay ** torch.arange(n + 1)
    inv_pows = 1 / decay_pows[:-1]
    # Contribution of the first sample: x[0] * decay^(i+1) at position i.
    init_term = data[0] * decay_pows[1:]
    # NOTE(review): decay^(n-1) underflows to 0 for very long inputs —
    # this vectorized trick assumes n is moderate; confirm for large N.
    head = alpha * decay ** (n - 1)
    weighted = data * head * inv_pows
    return init_term + weighted.cumsum(dim=0) * torch.flip(inv_pows, dims=(0,))
def to_onehot(y_labels, n_classes=None):
    """
    Convert integer class labels to a one-hot encoded matrix.

    Parameters
    ----------
    y_labels : (N,) array_like
        Integer class labels in ``[0, n_classes)``.
    n_classes : int or None, default: None
        The total number of classes. If None, inferred as
        ``y_labels.max() + 1``.

    Returns
    -------
    y_onehot : (N, n_classes) torch.LongTensor
        One-hot encoded labels.
    """
    y_labels = torch.as_tensor(y_labels, dtype=torch.long)
    if n_classes is None:
        # Infer from the largest label, not the number of distinct labels:
        # len(unique()) under-counts when labels are sparse (e.g. [0, 2]
        # has 2 distinct values but needs 3 columns) and would make the
        # scatter below index out of bounds.
        n_classes = int(y_labels.max().item()) + 1 if y_labels.numel() else 0
    y_onehot = torch.zeros(y_labels.shape[0], n_classes, dtype=torch.int64)
    y_onehot[torch.arange(y_onehot.shape[0]), y_labels] = 1
    return y_onehot
# [docs]
def peak_to_signal_noise_ratio(signal_orig, signal_estimated):
"""
Computes the Peak signal-to-noise ratio between two signals.
Parameters
----------
signal_orig, signal_estimated : torch.Tensor
A vector or a batch of vectors or images.
Returns
-------
psnr : torch.Tensor
A scalar tensor that holds (mean) PSNR value.
"""
signal_orig = signal_orig.detach()
signal_estimated = signal_estimated.detach()
signal_orig = torch.atleast_2d(signal_orig)
signal_estimated = torch.atleast_2d(signal_estimated)
signal_orig = signal_orig.flatten(start_dim=1)
signal_estimated = signal_estimated.flatten(start_dim=1)
if signal_orig.shape != signal_estimated.shape:
raise ValueError("Input signals must have the same shape.")
dynamic_range = signal_orig.max(dim=1).values - \
signal_orig.min(dim=1).values
# filter out pairs with zero dynamic range
mask_valid = (dynamic_range != 0.)
if not mask_valid.any():
return torch.tensor(float('NaN'), device=signal_orig.device)
signal_orig = signal_orig[mask_valid]
signal_estimated = signal_estimated[mask_valid]
dynamic_range = dynamic_range[mask_valid]
mse_val = F.mse_loss(signal_orig, signal_estimated,
reduction='none').mean(dim=1)
psnr = 10 * torch.log10(dynamic_range ** 2 / mse_val).mean()
return psnr.squeeze()
# [docs]
def compute_sparsity(tensor):
    """
    Compute L1 sparsity of the input tensor.

    The sparsity is the mean (over the batch) of each sample's L1 norm,
    divided by the number of features per sample.

    Parameters
    ----------
    tensor : torch.Tensor
        (N,) or (B, N) tensor array.

    Returns
    -------
    sparsity : torch.Tensor
        Mean L1 sparsity of `tensor`, scalar.
    """
    # Promote a single vector to a batch of one.
    batch = tensor if tensor.ndim != 1 else tensor.unsqueeze(dim=0)
    l1_per_sample = batch.norm(p=1, dim=1)
    sparsity = l1_per_sample.mean() / batch.shape[1]
    return sparsity.squeeze()