# SPDX-FileCopyrightText: 2021 Daniel Laidig <laidig@control.tu-berlin.de>
#
# SPDX-License-Identifier: MIT
import numpy as np
import scipy.signal
import scipy.optimize
from qmt.utils.struct import Struct
from qmt.functions.quaternion import quatInterp, quatToGyrStrapdown
from qmt.functions.utils import timeVec, vecInterp, rms
from qmt.utils.misc import setDefaults
from qmt.utils.plot import AutoFigure, extendYlim
[docs]
class SyncMapper:
"""
Converts timestamps and indices for different data streams based on specified time/index mappings.
"""
def __init__(self, syncInfo):
"""
syncInfo is a dict data structure specifying the available recordings and the sync mappings.
example using YAML syntax and only using the fields used by this class
more fields may be added for other applications):
.. code-block:: yaml
data:
imu_data:
rate: 200
video1_part1:
rate: 25
video1_part2:
rate: 25
video2:
rate: 50
sync:
- [video1_part1, t, 0, video2, t, 1.85]
- [video1_part2, t, 0, video2, t, 1145.2]
- [video2, t, 0, imu_data, t, 0]
The mapping should be an undirected connected graph without any cycles. An exception is thrown if the graph is
not connected, i.e. if one or more mappings are missing. Cycles (e.g. adding
[video1_part1, t, 0, imu_data, t, 0] to the example) are not detected and may lead to inconsistent data.
Currently, only one sync mapping can be provided for each data set and the rate has to be known. In the future,
support for estimating the rate based on multiple measurements may be added.
"""
if isinstance(syncInfo, Struct):
syncInfo = syncInfo.data
self.syncInfo = syncInfo
self._nameToIdx = {n: i for i, n in enumerate(syncInfo['data'].keys())}
self._mapA = None # matrices with index mappings; ind2 = a12*ind1 + b12
self._mapB = None
def _process(self):
N = len(self._nameToIdx)
self._mapA = np.full((N, N), np.nan)
self._mapB = np.full((N, N), np.nan)
for sync in self.syncInfo['sync']:
# fill A and B matrix with the given sync pairs
name1, mode1, val1, name2, mode2, val2 = sync
rate1 = self.syncInfo['data'][name1]['rate']
rate2 = self.syncInfo['data'][name2]['rate']
i = self._nameToIdx[name1]
j = self._nameToIdx[name2]
if mode1 == 'ind':
ind1 = val1
else:
assert mode1 == 't'
ind1 = val1*rate1
if mode2 == 'ind':
ind2 = val2
else:
assert mode2 == 't'
ind2 = val2*rate2
a_ij = float(rate2)/float(rate1)
b_ij = float(ind2) - a_ij*float(ind1)
self._mapA[i, j] = a_ij
self._mapB[i, j] = b_ij
self._mapA[j, i] = 1/a_ij # inverse mapping
self._mapB[j, i] = -b_ij/a_ij
# try to fill remaining mappings
while True:
foundNew = False
for i, k in np.ndindex(self._mapA.shape):
if i == k or not np.isnan(self._mapA[i, k]):
continue
for j in range(N):
if j == i or j == k:
continue
if np.isnan(self._mapA[i, j]) or np.isnan(self._mapA[j, k]):
continue
# there is a way from i to j and a way from j to k, add the concatenation to get from i to k
self._mapA[i, k] = self._mapA[i, j] * self._mapA[j, k]
self._mapB[i, k] = self._mapA[j, k] * self._mapB[i, j] + self._mapB[j, k]
foundNew = True
break
if not foundNew:
break
np.fill_diagonal(self._mapA, 1)
np.fill_diagonal(self._mapB, 0)
if np.isnan(self._mapA).any():
print(self._mapA)
print(self._mapB)
print(self._nameToIdx)
raise RuntimeError('sync mapping is incomplete')
return True
[docs]
def map(self, nameFrom, modeFrom, valFrom, nameTo, modeTo):
"""
Maps a timestamp/index from one data stream to a timestamp/index of another data stream.
Example call to get the index in imu_data corresponding to 25 s of video1_part2:
sync.map('video1_part2', 't', 25.0, 'imu_data', 'ind')
"""
assert modeFrom in ('t', 'ind')
assert modeTo in ('t', 'ind')
assert nameFrom in self._nameToIdx, f'{nameFrom!r} not in {list(self._nameToIdx.keys())}'
assert nameTo in self._nameToIdx, f'{nameTo!r} not in {list(self._nameToIdx.keys())}'
if self._mapA is None:
self._process()
if nameFrom == nameTo:
rate = self.syncInfo['data'][nameFrom]['rate']
if modeFrom == 'ind' and modeTo == 't':
return valFrom/rate
elif modeFrom == 't' and modeTo == 'ind':
return valFrom*rate
else:
assert modeFrom == modeTo
return valFrom
if modeFrom == 'ind':
indFrom = valFrom
else:
assert modeFrom == 't'
indFrom = valFrom * self.syncInfo['data'][nameFrom]['rate']
i = self._nameToIdx[nameFrom]
j = self._nameToIdx[nameTo]
indTo = self._mapA[i, j] * indFrom + self._mapB[i, j]
if modeTo == 'ind':
return indTo
else:
assert modeTo == 't'
return indTo / self.syncInfo['data'][nameTo]['rate']
def timeToInd(self, name, t):
return self.map(name, 't', t, name, 'ind')
def indToTime(self, name, ind):
return self.map(name, 'ind', ind, name, 't')
[docs]
def resample(self, nameFrom, signal, nameTo, N, method='auto'):
"""
Resamples data (``signal`` sampled in ``nameFrom``) to match data sampled in a different data stream
(``nameTo``, consisting of ``N`` samples). By default, slerp is used when the input is in shape (M, 4)
and linear interpolation is used otherwise.
:param nameFrom: name of the original data stream
:param signal: data, shape (M, P)
:param nameTo: name of the target data stream
:param N: number of samples of the target data
:param method: 'vecInterp'/'quatInterp'/'auto'
:return: (N, P) resampled signal
"""
assert signal.ndim == 2
if method == 'auto':
if signal.shape[1] == 4:
method = 'quatInterp'
else:
method = 'vecInterp'
assert method in ('vecInterp', 'quatInterp')
ind = self.map(nameTo, 'ind', np.arange(N), nameFrom, 'ind')
if method == 'vecInterp':
return vecInterp(signal, ind)
elif method == 'quatInterp':
return quatInterp(signal, ind)
else:
raise RuntimeError(f'invalid method "{method}"')
def _rmsecorrelate(longer, shorter):
outN = longer.shape[0] - shorter.shape[0] + 1
overlapN = shorter.shape[0]
out = np.empty(outN)
for shift in range(outN):
out[shift] = -rms(shorter - longer[shift:shift+overlapN])
return out
def _syncOptImuOffsetViaGyr(imuGyr, imuRate, optQuat, optRate, syncRate=1000.0, cut=0.15, fc=10, correlate='rmse',
debug=False):
"""
Syncs IMU and optical data by cross correlation of rotation rates.
The shorter signal is cut even more (by default to 70%) to increase the search range if the IMU recording does not
strictly lie within the optical recording or vice versa.
:param imuGyr: IMU gyroscope data in rad/s
:param imuRate: IMU sampling rate
:param optQuat: quaternion derived from optical data
:param optRate: sampling rate of optical data
:param syncRate: sampling rate used for syncing
:param cut: amount of data to cut away at start/end of the shorter signal (default: 15 %, i.e. 70 % of the shorter
signal are used)
:param fc: cutoff frequency for gyr low pass filters (<= 0 to disable)
:param correlate: correlation method ('rmse' or 'linear')
:param debug: returns additional debugging information
:return: time offset (i.e. when the optical recording started in IMU time)
"""
assert imuGyr.ndim == 2 and imuGyr.shape[1] == 3
assert optQuat.ndim == 2 and optQuat.shape[1] == 4
imuGyrInterp = vecInterp(imuGyr, np.arange(0, imuGyr.shape[0], imuRate / syncRate))
optQuatInterp = quatInterp(optQuat, np.arange(0, optQuat.shape[0], optRate / syncRate))
optGyrInterp = quatToGyrStrapdown(optQuatInterp, syncRate)
# low pass filter the rotation rates with a cutoff frequency of 10 Hz to increase robustness
if fc > 0:
[b, a] = scipy.signal.butter(2, fc / (syncRate / 2))
validInd = ~np.any(np.isnan(optGyrInterp), axis=1)
optGyrInterp[validInd] = scipy.signal.filtfilt(b, a, optGyrInterp[validInd], axis=0)
validInd = ~np.any(np.isnan(imuGyrInterp), axis=1)
imuGyrInterp[validInd] = scipy.signal.filtfilt(b, a, imuGyrInterp[validInd], axis=0)
optGyrNorm = np.linalg.norm(optGyrInterp, axis=1)
imuGyrNorm = np.linalg.norm(imuGyrInterp, axis=1)
if correlate == 'linear':
corrfn = np.correlate
optGyrNorm[np.isnan(optGyrNorm)] = 0
imuGyrNorm[np.isnan(imuGyrNorm)] = 0
elif correlate == 'rmse':
corrfn = _rmsecorrelate
else:
raise ValueError('invalid correlation method: '+correlate)
if imuGyrInterp.shape[0] < optGyrInterp.shape[0]:
cutN = int(cut*imuGyrInterp.shape[0])
xcorr = corrfn(optGyrNorm, imuGyrNorm[cutN:-cutN])
maxind = np.argmax(xcorr)
offset = -maxind + cutN
else:
cutN = int(cut*optGyrInterp.shape[0])
xcorr = corrfn(imuGyrNorm, optGyrNorm[cutN:-cutN])
maxind = np.argmax(xcorr)
offset = maxind - cutN
debugData = None
if debug:
debugData = dict(
offset=offset,
syncRate=syncRate,
xcorr=xcorr,
maxind=maxind,
optGyrNorm=optGyrNorm,
imuGyrNorm=imuGyrNorm,
)
return offset / syncRate, debugData
def _optimizeClockDriftUsingGyr(optGyr, imuGyr, imuInd, fast=False, x=None):
optGyrNorm = np.linalg.norm(optGyr, axis=1)
def costFn(x):
imuGyrInterp = vecInterp(imuGyr, imuInd + np.linspace(x[0], x[1], imuInd.shape[0]))
imuGyrInterp = imuGyrInterp - x[2:]
imuGyrNorm = np.linalg.norm(imuGyrInterp, axis=1)
return rms(imuGyrNorm - optGyrNorm)
if x is not None:
return costFn(x), x
optResult = None
log = []
startVals = [0] if fast else [-100, -10, -1, 0, 1, 10, 100]
for start1 in startVals:
for start2 in startVals:
init = np.array([start1, start2, 0, 0, 0], float)
res = scipy.optimize.minimize(costFn, init, method='BFGS')
log.append([start1, start2, res.x[0], res.x[1], res.x[2:], res.fun])
# print('start: [{start1}, {start2}], x: { x}, cost: {fun}'.format(start1=start1, start2=start2, x=res.x,
# fun=res.fun))
if optResult is None or optResult.fun > res.fun:
optResult = res
shiftStart, shiftEnd, *gyrBias = optResult.x
return shiftStart, shiftEnd, gyrBias, log
[docs]
def syncOptImu(opt_quat, opt_rate, imu_gyr, imu_rate, params, debug=False, plot=False):
"""
Determines time offsets between data recorded using opticial motion capture and IMU data.
The offsets are determined based on angular rates. For the optical data, the angular rates are derived from a
quaternion. Two synchronization steps are performed:
- First, a fixed offset between the data is determined via cross correlation (imu_offsetonly).
- Based on the result of this, both a time offset and an adjusted IMU sampling rate is adjusted to account for
clock drift. This is done by minimizing the RMSE of the gyr norms.
See Appendix B of https://doi.org/10.3390/data6070072 for more information about this algorithm.
:param opt_quat: orientation quaternion from optical mocap system, Nx4
:param opt_rate: sampling rate of opt_quat in Hz
:param imu_gyr: gyroscope measurements from IMU in rad/s, Nx3
:param imu_rate: sampling rate of imu_gyr in Hz
:param params: optional parameters, defaults: syncRate=1000.0, cut=0.15, fc=10.0, correlate='rmse', fast=False
:param debug: enables debug return value
:param plot: enables debug plots
:return: syncInfo dictionary that can be used by :class:`qmt.SyncMapper`
"""
defaults = dict(syncRate=1000.0, cut=0.15, fc=10.0, correlate='rmse', fast=False)
params = setDefaults(params, defaults)
fc = params['fc']
correlate = params['correlate']
offset, shiftDebug = _syncOptImuOffsetViaGyr(imu_gyr, imu_rate, opt_quat, opt_rate, syncRate=params['syncRate'],
cut=params['cut'], fc=fc, correlate=correlate, debug=debug or plot)
syncInfo = {
'data': {
'opt': {
'rate': opt_rate,
},
'imu_offsetonly': {
'rate': imu_rate
},
},
'sync': [
['opt', 't', 0, 'imu_offsetonly', 't', offset]
],
}
sync = SyncMapper(syncInfo)
opt_gyr = quatToGyrStrapdown(opt_quat, opt_rate)
imuInd = sync.map('opt', 'ind', np.arange(opt_quat.shape[0]), 'imu_offsetonly', 'ind')
# low pass filter the rotation rates with a cutoff frequency of 10 Hz to increase robustness
if fc > 0:
[b, a] = scipy.signal.butter(2, fc / (opt_rate / 2))
validInd = ~np.any(np.isnan(opt_gyr), axis=1)
opt_gyr[validInd] = scipy.signal.filtfilt(b, a, opt_gyr[validInd], axis=0)
imu_gyr = imu_gyr.copy()
[b, a] = scipy.signal.butter(2, fc / (imu_rate / 2))
validInd = ~np.any(np.isnan(imu_gyr), axis=1)
imu_gyr[validInd] = scipy.signal.filtfilt(b, a, imu_gyr[validInd], axis=0)
shiftStart, shiftEnd, _, log = _optimizeClockDriftUsingGyr(opt_gyr, imu_gyr, imuInd, fast=params['fast'])
newRate = (imuInd[-1] + shiftEnd - (imuInd[0] + shiftStart)) / (opt_quat.shape[0] - 1) * opt_rate
syncInfo['data']['imu'] = {'rate': newRate.item()}
syncInfo['sync'].append(['opt', 'ind', 0, 'imu', 'ind', imuInd[0] + shiftStart])
# sync = SyncMapper(syncInfo)
# imuInd2 = sync.map('opt', 'ind', np.arange(opt_quat.shape[0]), 'imu', 'ind')
# print(shiftStart, imuInd2[0] - imuInd[0], shiftStart - (imuInd2[0] - imuInd[0]))
# print(shiftEnd, imuInd2[-1] - imuInd[-1], shiftEnd - (imuInd2[-1] - imuInd[-1]))
if debug or plot:
debugData = dict(
syncInfo=syncInfo,
shift=shiftDebug,
opt_gyr=opt_gyr,
imu_gyr=imu_gyr,
opt_rate=opt_rate,
imu_rate=imu_rate,
log=log,
)
if plot:
syncOptImu_debugPlot(debugData, plot)
if debug:
return syncInfo, debugData
return syncInfo
def syncOptImu_debugPlot(debug, figs=None):
with AutoFigure(figs[0] if isinstance(figs, (list, tuple)) else None) as fig:
fig.suptitle(AutoFigure.title('syncOptImu (1)'))
from matplotlib import pyplot as plt
offset = debug['shift']['offset']
xcorr = debug['shift']['xcorr']
maxind = debug['shift']['maxind']
syncRate = debug['shift']['syncRate']
optGyrNorm = debug['shift']['optGyrNorm']
imuGyrNorm = debug['shift']['imuGyrNorm']
plt.subplot(221)
plt.plot(xcorr, '-')
plt.stem([maxind], [xcorr[maxind]], 'r', bottom=np.min(xcorr), use_line_collection=True)
plt.grid()
plt.title('cross correlation')
plt.subplot(222)
N = int(syncRate/10)
plt.plot(xcorr[maxind-N:maxind+N], '*-')
plt.stem([100], [xcorr[maxind]], 'r', bottom=np.min(xcorr[maxind-N:maxind+N]), use_line_collection=True)
plt.grid()
plt.title('cross correlation peak, 0.2 seconds zoom')
plt.subplot(223)
plt.plot(np.arange(optGyrNorm.shape[0]), optGyrNorm, label='opt')
plt.plot(np.arange(imuGyrNorm.shape[0]) - offset, imuGyrNorm, label='imu')
plt.legend()
plt.grid()
plt.title('filtered gyr norm')
plt.subplot(224)
plt.plot(np.arange(optGyrNorm.shape[0]), optGyrNorm, label='opt')
plt.plot(np.arange(imuGyrNorm.shape[0]) - offset, imuGyrNorm, label='imu')
N = int(2*syncRate)
plt.xlim(optGyrNorm.shape[0]/2-N, optGyrNorm.shape[0]/2+N)
plt.legend()
plt.grid()
plt.title('filtered gyr norm, 4 second zoom')
fig.tight_layout(rect=(0, 0, 1, 0.95))
with AutoFigure(figs[1] if isinstance(figs, (list, tuple)) else None) as fig:
plt.suptitle(AutoFigure.title('syncOptImu (2)'))
opt_gyr = debug['opt_gyr']
imu_gyr = debug['imu_gyr']
opt_rate = debug['opt_rate']
log = debug['log']
sync = SyncMapper(debug['syncInfo'])
optGyrNorm = np.linalg.norm(opt_gyr, axis=1)
imuIndNooffset = sync.map('opt', 'ind', np.arange(opt_gyr.shape[0]), 'imu_offsetonly', 'ind')
gyr_nooffset = vecInterp(imu_gyr, imuIndNooffset)
gyrNorm_nooffset = np.linalg.norm(gyr_nooffset, axis=1)
imuIndNodrift = sync.map('opt', 'ind', np.arange(opt_gyr.shape[0]), 'imu', 'ind')
gyr_nodrift = vecInterp(imu_gyr, imuIndNodrift)
gyrNorm_nodrift = np.linalg.norm(gyr_nodrift, axis=1)
t = timeVec(N=optGyrNorm.shape[0], rate=opt_rate)
plt.subplot(221)
plt.plot([entry[2] for entry in log], '.', label='shiftStart')
plt.plot([entry[3] for entry in log], '.', label='shiftEnd')
plt.legend()
plt.grid()
plt.title('shifts (in samples)')
plt.subplot(222)
cost = [entry[-1] for entry in log]
plt.plot(cost, 'C3x', label='cost')
extendYlim(plt.gca(), 0, 2 * np.min(cost))
plt.legend(loc='upper left')
plt.grid()
plt.gca().twinx()
plt.plot([entry[4] for entry in log], '.', label='bias')
plt.legend(loc='upper right')
plt.title('cost function value and gyr bias estimates')
plt.subplot(223)
plt.plot(t, optGyrNorm, label='opt')
plt.plot(t, gyrNorm_nooffset, label='no offset')
plt.plot(t, gyrNorm_nodrift, label='no drift')
plt.legend()
plt.grid()
plt.title('filtered gyr norm')
plt.subplot(224)
plt.plot(t, optGyrNorm, label='opt')
plt.plot(t, gyrNorm_nooffset, label='no offset')
plt.plot(t, gyrNorm_nodrift, label='no drift')
plt.xlim(t[-1] / 2 - 2, t[-1] / 2 + 2)
plt.legend()
plt.grid()
plt.title('filtered gyr norm, 4 second zoom')
fig.tight_layout(rect=(0, 0, 1, 0.95))