# SPDX-FileCopyrightText: 2021 Daniel Laidig <laidig@control.tu-berlin.de>
#
# SPDX-License-Identifier: MIT
import math
import numpy as np
from qmt.utils.plot import AutoFigure
[docs]
def wrapToPi(angles, debug=False, plot=False):
"""
Wraps angles to interval -π... π by adding/subtracting multiples of 2π.
:param angles: input angles in rad, numpy array or scalar
:param debug: enables debug output
:param plot: enables debug plot
:return:
- **output**: output angles with same shape as input angles
- **debugData**: debug: dict with debug values (only if debug==True)
"""
angles = np.asarray(angles, float)
with np.errstate(invalid='ignore'):
output = (angles + np.pi) % (2 * np.pi) - np.pi
if debug or plot:
debugData = dict(angles=angles, output=output)
if plot:
wrapToPi_debugPlot(debugData, plot)
if debug:
return output, debugData
return output
def wrapToPi_debugPlot(debug, fig=None):
with AutoFigure(fig) as fig:
fig.suptitle(AutoFigure.title('wrapToPi'))
ax1, ax2 = fig.subplots(2, 1, sharex=True)
angles = debug['angles']
output = debug['output']
shape = angles.shape if angles.ndim < 2 else (-1, angles.shape[-1])
style = '.-' if (angles.shape[0] if angles.ndim > 0 else angles.size) <= 100 else '-'
ax1.set_title(f'input angle in °, {angles.shape}')
ax1.plot(np.rad2deg(angles.reshape(shape)), style)
ax2.set_title(f'output angle in °, {output.shape}')
ax2.plot(np.rad2deg(output.reshape(shape)), style)
ax2.plot(np.rad2deg(angles.reshape(shape)), style, color='k', alpha=0.3, lw=1)
for ax in (ax1, ax2):
ax.grid()
fig.tight_layout()
[docs]
def wrapTo2Pi(angles, debug=False, plot=False):
"""
Wraps angles to interval 0...2π by adding/subtracting multiples of 2π.
:param angles: input angles in rad, numpy array or scalar
:param debug: enables debug output
:param plot: enables debug plot
:return:
- **output**: output angles with same shape as input angles
- **debugData**: debug: dict with debug values (only if debug==True)
"""
angles = np.asarray(angles, float)
with np.errstate(invalid='ignore'):
positiveInput = angles > 0
output = np.mod(angles, 2 * np.pi)
if output.ndim == 0: # does not support item assignment
if output == 0 and positiveInput:
output = np.array(2 * np.pi, float)
else:
output[np.logical_and((output == 0), positiveInput)] = 2 * np.pi
if debug or plot:
debugData = dict(angles=angles, output=output)
if plot:
wrapTo2Pi_debugPlot(debugData, plot)
if debug:
return output, debugData
return output
def wrapTo2Pi_debugPlot(debug, fig=None):
with AutoFigure(fig) as fig:
fig.suptitle(AutoFigure.title('wrapTo2Pi'))
ax1, ax2 = fig.subplots(2, 1, sharex=True)
angles = debug['angles']
output = debug['output']
shape = angles.shape if angles.ndim < 2 else (-1, angles.shape[-1])
style = '.-' if (angles.shape[0] if angles.ndim > 0 else angles.size) <= 100 else '-'
ax1.set_title(f'input angle in °, {angles.shape}')
ax1.plot(np.rad2deg(angles.reshape(shape)), style)
ax2.set_title(f'output angle in °, {output.shape}')
ax2.plot(np.rad2deg(output.reshape(shape)), style)
ax2.plot(np.rad2deg(angles.reshape(shape)), style, color='k', alpha=0.3, lw=1)
for ax in (ax1, ax2):
ax.grid()
fig.tight_layout()
[docs]
def nanUnwrap(angle, debug=False, plot=False):
"""
Unwraps a signal that might contain NaNs. The angle is also shifted so that the mean is in the range -pi...pi.
The input can be a 1D or 2D array. Unwrapping is performed along the first axis. In the 2D case, it is expected
that all angles in the same row will be NaN at the same time.
:param angle: input angle signal in rad, shape (N, 3) or (N,)
:param debug: enables debug output
:param plot: enables debug plot
:return:
- **output**: unwrapped output angles with same shape as input angles
- **debugData**: debug: dict with debug values (only if debug==True)
"""
angle = np.asarray(angle, float)
assert angle.ndim <= 2
is1D = angle.ndim == 1
if is1D:
angle = angle.reshape(-1, 1)
output = np.copy(angle)
ind = ~np.isnan(output[:, 0])
output[ind] = np.unwrap(output[ind], axis=0)
for i in range(output.shape[1]):
while np.nanmean(output[:, i]) > np.pi:
output[:, i] -= 2 * np.pi
while np.nanmean(output[:, i]) < -np.pi:
output[:, i] += 2 * np.pi
if is1D:
angle = angle.flatten()
output = output.flatten()
if debug or plot:
debugData = dict(angle=angle, output=output)
if plot:
nanUnwrap_debugPlot(debugData, plot)
if debug:
return output, debugData
return output
def nanUnwrap_debugPlot(debug, fig=None):
with AutoFigure(fig) as fig:
fig.suptitle(AutoFigure.title('nanUnwrap'))
ax1, ax2 = fig.subplots(2, 1, sharex=True)
angle = debug['angle']
output = debug['output']
shape = angle.shape if angle.ndim < 2 else (-1, angle.shape[-1])
style = '.-' if (angle.shape[0] if angle.ndim > 0 else angle.size) <= 100 else '-'
ax1.set_title(f'input angle in °, {angle.shape}')
ax1.plot(np.rad2deg(angle.reshape(shape)), style)
ax2.set_title(f'output angle in °, {output.shape}')
ax2.plot(np.rad2deg(output.reshape(shape)), style)
ax2.plot(np.rad2deg(angle.reshape(shape)), style, color='k', alpha=0.3, lw=1)
for ax in (ax1, ax2):
ax.grid()
fig.tight_layout()
[docs]
def angleBetween2Vecs(vec1, vec2, debug=False, plot=False):
"""
Calculates angle between two 3D vectors in rad.
Multiples vectors can be passed with an array with the last dimension having length 3, and numpy broadcasting is
supported.
>>> qmt.angleBetween2Vecs([1, 0, 0], [0, 1, 0])
angle = array([1.57079633])
:param vec1: first input vector, (..., 3)
:param vec2: second input vector, (..., 3)
:param debug: enables debug output
:param plot: enables debug plot
:return:
- **angle**: angle output array, shape (...,)
- **debug**: dict with debug values (only if debug==True)
"""
vec1 = np.asarray(vec1, float)
vec2 = np.asarray(vec2, float)
angle = np.arccos(np.sum(normalized(vec1)*normalized(vec2), axis=-1))
if debug or plot:
debugData = dict(
vec1=vec1,
vec2=vec2,
angle=angle,
)
if plot:
angleBetween2Vecs_debugPlot(debugData, plot)
if debug:
return angle, debugData
return angle
def angleBetween2Vecs_debugPlot(debug, fig=None):
with AutoFigure(fig) as fig:
fig.suptitle(AutoFigure.title('angleBeween2Vecs'))
ax1, ax2, ax3 = fig.subplots(3, 1, sharex=True)
vec1 = debug['vec1']
vec2 = debug['vec2']
angle = debug['angle']
style = '.-' if angle.size <= 100 else '-'
ax1.plot(vec1.reshape(-1, 3), style)
ax1.set_title(f'first input vector, {debug["vec1"].shape}')
ax1.legend('xyz')
ax2.plot(vec2.reshape(-1, 3), style)
ax2.set_title(f'second input vector, {debug["vec2"].shape}')
ax2.legend('xyz')
ax3.plot(np.rad2deg(angle.flatten()), '.-')
ax3.set_title(f'angle in degree between the vectors in °, {debug["angle"].shape}')
for ax in (ax1, ax2, ax3):
ax.grid()
fig.tight_layout()
[docs]
def timeVec(N=None, T=None, rate=None, Ts=None, debug=False, plot=False):
"""
Creates a time vector with a fixed sampling rate based on two parameters.
Valid combinations of parameters:
- N and rate
- N and Ts
- T and rate
- T and Ts
- N and T
>>> qmt.timeVec(N=10, rate=100)
array([0. , 0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09])
>>> qmt.timeVec(N=10, Ts=0.01)
array([0. , 0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09])
>>> qmt.timeVec(T=0.05, Ts=0.01)
array([0. , 0.01, 0.02, 0.03, 0.04, 0.05])
:param N: number of samples
:param T: end time (the start time is always 0)
:param rate: sampling rate
:param Ts: sampling time, i.e. 1/rate
:param debug: enables debug output
:param plot: enables debug plot
:return:
- **t**: time vector as (N,) array
- **debug**: dict with debug values (only if debug==True)
"""
if N is not None and rate is not None:
assert T is None and Ts is None
t = np.arange(0, N, 1) / rate
elif N is not None and Ts is not None:
assert T is None and rate is None
t = np.arange(0, N, 1) * Ts
elif T is not None and rate is not None:
assert N is None and Ts is None
N = math.floor(T * rate) + 1
t = np.arange(0, N, 1) / rate
elif T is not None and Ts is not None:
assert N is None and rate is None
N = math.floor(T / Ts) + 1
t = np.arange(0, N, 1) * Ts
elif N is not None and T is not None:
assert rate is None and Ts is None
t = np.linspace(0, T, N)
else:
raise RuntimeError('invalid combination of parameters')
if debug or plot:
debugData = dict(t=t, N=N, T=T, rate=rate, Ts=Ts)
if plot:
timeVec_debugPlot(debugData, plot)
if debug:
return t, debugData
return t
def timeVec_debugPlot(debug, fig=None):
t = debug['t']
N = debug['N']
T = debug['T']
rate = debug['rate']
Ts = debug['Ts']
with AutoFigure(fig) as fig:
fig.suptitle(AutoFigure.title('timeVec'))
ax = fig.add_subplot(111)
ax.plot(t, '.-')
ax.set_title(f'output time vector, {t.shape}')
text = f'N={N}\nT={T}\nrate={rate}\nTs={Ts}\nt[0]={t[0]}\nt[1]={t[1]}\nt[{N-1}]={t[N-1]}'
ax.text(0.05, 0.95, text, va='top', transform=ax.transAxes)
ax.grid()
fig.tight_layout()
[docs]
def vecnorm(vec, debug=False, plot=False):
"""
Calculates the norm along the last axis.
:param vec: input array, shape (..., M)
:param debug: enables debug output
:param plot: enables debug plot
:return:
- **norm**: norm output array, shape (...,)
- **debug**: dict with debug values (only if debug==True)
"""
vec = np.asarray(vec, float)
norm = np.linalg.norm(vec, axis=-1)
if debug or plot:
debugData = dict(
vec=vec,
norm=norm,
)
if plot:
vecnorm_debugPlot(debugData, plot)
if debug:
return norm, debugData
return norm
def vecnorm_debugPlot(debug, fig=None):
with AutoFigure(fig) as fig:
fig.suptitle(AutoFigure.title('vecnorm'))
ax1, ax2 = fig.subplots(2, 1, sharex=True)
vec = debug['vec']
norm = debug['norm']
style = '.-' if norm.size <= 100 else '-'
ax1.set_title(f'input vector, {vec.shape}')
ax1.plot(vec.reshape(-1, vec.shape[-1]), style)
ax2.set_title(f'output norm, {norm.shape}')
ax2.plot(norm.flatten(), style)
for ax in (ax1, ax2):
ax.grid()
fig.tight_layout()
[docs]
def normalized(vec, debug=False, plot=False):
"""
Divides each vector (along the last axis) by its norm.
>>> qmt.normalized([1, 1, 1])
v_norm = array([0.57735027, 0.57735027, 0.57735027])
:param vec: input array, shape (..., M)
:param debug: enables debug output
:param plot: enables debug plot
:return:
- **out**: normalized output array, shape (..., M)
- **debug**: dict with debug values (only if debug==True)
"""
vec = np.asarray(vec, float)
out = vec / np.linalg.norm(vec, axis=-1)[..., None]
if debug or plot:
debugData = dict(
vec=vec,
vec_norm=vecnorm(vec),
out=out,
out_norm=vecnorm(out),
)
if plot:
normalized_debugPlot(debugData, plot)
if debug:
return out, debugData
return out
def normalized_debugPlot(debug, fig=None):
with AutoFigure(fig) as fig:
fig.suptitle(AutoFigure.title('normalized'))
ax1, ax2 = fig.subplots(2, 1, sharex=True)
vec = debug['vec']
out = debug['out']
style = '.-' if vec.size <= 3*100 else '-'
ax1.set_title(f'input vector (gray: norm), {vec.shape}')
ax1.plot(vec.reshape(-1, vec.shape[-1]), style)
ax1.plot(debug['vec_norm'].flatten(), style, color='k', alpha=0.5, lw=1)
ax1.grid()
ax2.set_title(f'normalized output vector (gray: norm), {out.shape}')
ax2.plot(out.reshape(-1, out.shape[-1]), style)
ax2.plot(debug['out_norm'].flatten(), style, color='k', alpha=0.5, lw=1)
ax2.grid()
fig.tight_layout()
[docs]
def allUnitNorm(vec, debug=False, plot=False):
"""
Checks if all elements have unit norm.
Calculation of the norm is performed along the last axis (see :func:`qmt.vecnorm`) and True is returned if all
entries have a norm that is close to one. Entries with NaNs are ignored.
:param vec: input vector or quaternion array
:param debug: enables debug output
:param plot: enables debug plot
:return:
- **output**: True if all elements have unit norm, else False
- **debug**: dict with debug values (only if debug==True)
"""
norm = vecnorm(vec)
res = np.allclose(norm[~np.isnan(norm)], 1)
if debug or plot:
isnan = np.isnan(norm)
isclose = np.isclose(norm, 1)
debugData = dict(
vec=vec,
norm=norm,
isnan=isnan,
isclose=isclose,
notclose=~(isnan | isclose),
res=res,
)
if plot:
allUnitNorm_debugPlot(debugData, plot)
if debug:
return res, debugData
return res
def allUnitNorm_debugPlot(debug, fig=None):
with AutoFigure(fig) as fig:
fig.suptitle(AutoFigure.title('allUnitNorm'))
ax1, ax2, ax3 = fig.subplots(3, 1, sharex=True)
N = debug['norm'].size
style = '.-' if N <= 100 else '-'
vec = debug['vec'].reshape((N, -1))
norm = debug['norm'].flatten()
ax1.plot(vec, style)
ax1.plot(norm, style, color='k', lw=1, label='norm')
ax1.set_title(f'input vector, {debug["vec"].shape}, and norm')
ax1.legend()
ax2.axhline(0, color='r')
ax2.plot(norm - 1, style)
maxdiff = np.nanmax(np.abs(norm - 1))
ax2.set_title(f'deviation of norm from 1 (norm - 1), max diff: {maxdiff}')
x = np.arange(N)
y = np.ones_like(x)
ax3.plot(x[debug['notclose'].flatten()], 2*y[debug['notclose'].flatten()], '.C3', label='not close')
ax3.plot(x[debug['isnan'].flatten()], 1*y[debug['isnan'].flatten()], '.C1', label='is NaN')
ax3.plot(x[debug['isclose'].flatten()], 0*y[debug['isclose'].flatten()], '.C2', label='is close')
ax3.set_title(f'unit quaternion check result: {debug["res"]}')
ax3.set_yticks([0, 1, 2])
ax3.set_yticklabels(['close', 'NaN', 'NOT close'])
ax3.set_ylim(-0.2, 2.2)
ax3.legend()
for ax in (ax1, ax2, ax3):
ax.grid()
[docs]
def randomUnitVec(N=None, M=3, debug=False, plot=False):
"""
Generate random unit vectors.
M denotes the number of elements in the vector (default: 3).
If N is None (default), a single random quaternion is returned as an array with shape (M,).
If N is an integer, N random quaternions are returned as an (N, M) array.
If N is a tuple, it denotes the shape of the output vectors, e.g. N=(5, 20) returns 100 random vectors as a
(5, 20, 3) array.
See also :meth:`qmt.randomQuat`.
:param N: number of vectors to generate
:param M: number of elements of each unit vector (default: 3)
:return:
- **vec**: vector output array, (..., M)
- **debug**: dict with debug values (only if debug==True)
"""
assert isinstance(M, int)
if N is None:
shape = (M,)
elif isinstance(N, tuple):
shape = N + (M,)
else:
shape = (N, M)
# Use normal distribution (instead of uniform distribution) since normalizing the resulting vectors will
# distribute them equally on the sphere surface.
vec = normalized(np.random.standard_normal(shape))
if debug or plot:
debugData = dict(
vec=vec,
norm=vecnorm(vec),
N=N,
M=M,
)
if plot:
randomUnitVec_debugPlot(debugData, plot)
if debug:
return vec, debugData
return vec
def randomUnitVec_debugPlot(debug, fig=None):
with AutoFigure(fig) as fig:
fig.suptitle(AutoFigure.title('randomUnitVec'))
ax1, ax2 = fig.subplots(2, 1, sharex=True)
vec = debug['vec'].reshape(-1, debug['vec'].shape[-1])
norm = debug['norm'].flatten()
style = '.-' if vec.shape[0] <= 100 else '-'
ax1.plot(vec, style)
ax2.plot(norm, style, color='k', lw=1, label='norm')
ax1.set_title(f'output vector, {debug["vec"].shape}')
ax2.set_title('norm')
ax1.legend('xyz')
ax1.grid()
ax2.grid()
fig.tight_layout()
[docs]
def vecInterp(vec, ind, extend=True, debug=False, plot=False):
"""
Interpolates an array at (non-integer) indices using linear interpolation.
Sampling indices are in the range 0..N-1. For values outside of this range, depending on "extend", the first/last
element or NaN is returned.
See also :meth:`qmt.quatInterp`.
:param vec: array of input data, (N, P)
:param ind: vector containing the sampling indices, shape (M,)
:param extend: if true, the input data is virtually extended by the first/last value
:param debug: enables debug output
:param plot: enables debug plot
:return:
- **out**: interpolated values, (M, P) or (P,) if ind is scalar
- **debug**: dict with debug values (only if debug==True)
"""
vec = np.asarray(vec, float)
ind = np.asarray(ind, float)
isScalar = ind.ndim == 0
ind = np.atleast_1d(ind)
N = vec.shape[0]
M = ind.shape[0]
P = vec.shape[1]
assert vec.shape == (N, P)
assert ind.shape == (M,)
ind0 = np.clip(np.floor(ind).astype(int), 0, N - 1)
ind1 = np.clip(np.ceil(ind).astype(int), 0, N - 1)
v0 = vec[ind0]
v1 = vec[ind1]
v_1_0 = v1 - v0
t01 = ind - ind0
out = v0 + t01[:, None] * v_1_0
if not extend:
out[ind < 0] = np.nan
out[ind > N - 1] = np.nan
if isScalar:
out = out.reshape((P,))
if debug or plot:
debugData = dict(
vec=vec,
N=N,
M=M,
P=P,
ind=ind,
out=out,
)
if plot:
vecInterp_debugPlot(debugData, plot)
if debug:
return out, debugData
return out
def vecInterp_debugPlot(debug, fig):
with AutoFigure(fig) as fig:
fig.suptitle(AutoFigure.title('vecInterp'))
ax = fig.subplots(1, 1)
style = '.-' if debug['N'] <= 100 else '-'
ax.plot(debug['vec'], style)
ax.set_prop_cycle(None)
ax.plot(debug['ind'], debug['out'].reshape(-1, debug['P']), 'x-')
ax.set_title(f'input vec [{debug["vec"].shape}, circles]; interpolated vec: [{debug["out"].shape}, crosses]')
ax.grid()
fig.tight_layout()
[docs]
def nanInterp(signal, quatdetect=True, debug=False, plot=False):
"""
Fills NaN samples by linear interpolation or quaternion slerp based on the previous and next non-NaN sample.
NaN samples at the beginning and end are set to the first/last non-NaN sample. A sample is defined as a NaN sample
if at least one element is NaN.
:param signal: input signal, shape (N, M) or (N,)
:param quatdetect: if True and M==4, use slerp instead of linear interpolation
:param debug: enables returning debug data
:param plot: enables the debug plot
:return:
- **out**: interpolated signal with the same shape
- **debug**: dict with debug values (only if debug==True)
"""
signal = np.asarray(signal, float)
out = signal.copy()
is1D = out.ndim == 1
if is1D:
out = out[:, None]
nanind = np.any(np.isnan(out), axis=1)
validind = ~nanind
# cf https://stackoverflow.com/questions/6518811/interpolate-nan-values-in-a-numpy-array
xp = validind.nonzero()[0]
ind = np.interp(nanind.nonzero()[0], xp, np.arange(xp.shape[0]))
if quatdetect and out.shape[1] == 4:
from qmt.functions.quaternion import quatInterp
out[nanind] = quatInterp(out[validind], ind)
mode = 'quatInterp'
else:
out[nanind] = vecInterp(out[validind], ind)
mode = 'vecInterp'
if is1D:
out.shape = signal.shape
if debug or plot:
debugData = dict(signal=signal, out=out, nanind=nanind, mode=mode)
if plot:
nanInterp_debugPlot(debugData, plot)
if debug:
return out, debugData
return out
def nanInterp_debugPlot(debug, fig):
signal = debug['signal']
out = debug['out']
nanind = debug['nanind']
style = '.-' if debug['nanind'].size <= 100 else '-'
with AutoFigure(fig) as fig:
fig.suptitle(AutoFigure.title('nanInterp'))
ax1, ax2 = fig.subplots(2, 1, sharex=True)
ax1.plot(signal, style, label='in')
ax1.legend(loc='upper left')
ax1b = ax1.twinx()
ax1b.plot(nanind, 'r', label='nan')
ax1b.legend(loc='upper right')
ax1.set_title(f'input signal, shape {signal.shape}, and nan indices (red)')
ax2.plot(out, style, label='out')
ax2.set_title(f'out (interpolation mode: {debug["mode"]}), shape {out.shape}')
for ax in ax1, ax2:
ax.grid()
fig.tight_layout()
[docs]
def rms(x, axis=0, debug=False, plot=False):
"""
Calculates root-mean-square (RMS) values.
For a 1D array, the RMS over all elements is calculated.
For arrays with more dimensions, the RMS is calculated along a specified axis (default: 0).
NaNs in input array will be ignored.
:param x: input array
:param axis: axis along which the means are computed
:param debug: enables debug output
:param plot: enables debug plot
:return:
- **out**: root mean square
- **debug**: dict with debug values (only if debug==True)
"""
x = np.asarray(x, float)
out = np.sqrt(np.nanmean(x ** 2, axis=axis))
if debug or plot:
debugData = dict(x=x, out=out, axis=axis)
if plot:
rms_debugPlot(debugData, plot)
if debug:
return out, debugData
return out
def rms_debugPlot(debug, fig=None):
with AutoFigure(fig) as fig:
fig.suptitle(AutoFigure.title('rms'))
ax1, ax2 = fig.subplots(2, 1, sharex=True)
style = '.-' if max(debug['x'].size, debug['out'].size) <= 400 else '-'
ax1.plot(debug['x'], style)
ax1.set_title(f'input x, {debug["x"].shape}')
ax1.grid()
ax2.plot(debug['out'], style)
ax2.set_title(f'out, {debug["out"].shape}')
ax2.grid()
fig.tight_layout()