Source code for qmt.functions.opt_imu_alignment

# SPDX-FileCopyrightText: 2021 Daniel Laidig <laidig@control.tu-berlin.de>
#
# SPDX-License-Identifier: MIT

from itertools import product

from qmt.functions.quaternion import eulerAngles, quatFromAngleAxis, qmult, rotate, qinv, quatToGyrStrapdown, \
    quatFrom2Axes, headingInclinationAngle, quatProject
from qmt.functions.utils import timeVec, nanInterp, rms, nanUnwrap
from qmt.utils.misc import setDefaults

import numpy as np
import scipy.signal
import scipy.optimize

from qmt.utils.plot import AutoFigure


def _nanButterFiltfilt(signal, rate, fc, N=2):
    is1D = signal.ndim == 1
    if is1D:
        signal = signal[:, None]
    [b, a] = scipy.signal.butter(N, fc / (rate / 2))
    signal_nonan = nanInterp(signal)
    filtered = scipy.signal.filtfilt(b, a, signal_nonan, axis=0)
    nanind = np.any(np.isnan(signal), axis=1)
    filtered[nanind] = np.nan
    if is1D:
        filtered.shape = (filtered.shape[0], )
    return filtered


def _central2ndOrderFiniteDifference(data, rate=1):
    """
    Calculates central 2nd order finite difference.

    f''(x) = (f(x-h) - 2f(x) + f(x+h)) / h^2

    The first and last value are always 0. If the input is a matrix, the difference is calculated for each column
    independently.

    :param data: array containing the input data (NxM)
    :param rate: sampling frequency of the signal
    :return: 2nd order difference (NxM)
    """
    diff = np.zeros(data.shape)
    diff[1:-1] += data[:-2]  # f(x-h)
    diff[1:-1] += -2 * data[1:-1]  # -2f(x)
    diff[1:-1] += data[2:]  # f(x+h)

    return diff / ((1.0 / rate) ** 2)  # / h^2


def _detectMovement(opt_quat, rate, th=0.3, restLength=5.0, shift=0.5, fc1=10.0, fc2=0.2, plot=False):
    opt_gyr = quatToGyrStrapdown(opt_quat, rate)
    opt_gyr = _nanButterFiltfilt(opt_gyr, rate, fc1)
    opt_gyr_norm = np.linalg.norm(opt_gyr, axis=1)
    opt_gyr_norm_lp = _nanButterFiltfilt(opt_gyr_norm, rate, fc2)

    with np.errstate(invalid='ignore'):
        rest = opt_gyr_norm_lp < th
    rest_diff = np.diff(np.concatenate(([0.], rest, [0.])))
    starts = np.argwhere(rest_diff == 1)
    stops = np.argwhere(rest_diff == -1)
    assert len(starts) == len(stops)

    rest_sections = [(start.item() if start.item() == 0 else start.item() + int(shift * rate),
                      stop.item() if stop.item() == opt_gyr_norm_lp.shape[0] else stop.item() - int(shift * rate))
                     for start, stop in zip(starts, stops) if stop - start > restLength * rate]

    movement_ind = np.ones((opt_gyr_norm_lp.shape[0]), dtype=bool)
    for start, stop in rest_sections:
        movement_ind[start:stop] = 0

    if plot:
        import matplotlib.pyplot as plt
        t = timeVec(N=opt_gyr.shape[0], rate=rate)
        axes = plt.gcf().subplots(3, 1)
        axes[0].plot(t, np.rad2deg(opt_gyr), '-', '', lw=1.0)
        axes[0].set_title(f'opt_gyr [°/s, {opt_gyr.shape} @ {rate:.1f} Hz]')
        axes[0].grid()

        axes[1].plot(t, opt_gyr_norm, label='orig')
        axes[1].plot(t, opt_gyr_norm_lp, label='lp')
        axes[1].axhline(th, color='r')
        axes[1].set_ylim(0, 3 * th)
        axes[1].set_title('opt_gyr_norm before and after filtering, with threshold')
        axes[1].grid()
        axes[1].legend()

        axes[2].plot(t, movement_ind, 'r', lw=4)
        axes[2].set_title('movement_ind')
        axes[2].grid()

    return movement_ind, rest_sections


def _inclQuat(x0, x1):
    norm = np.sqrt(x0**2 + x1**2)
    return quatFromAngleAxis(norm, [x0, x1, 0])


def _quatFromX(q_init, x):
    assert x.shape == (3,), x.shape
    n = np.linalg.norm(x)
    return qmult(q_init, quatFromAngleAxis(n, x))


def _addEntry(alignment, name, quat, delta=None, verbose=False):
    assert quat.shape == (4,)
    euler_deg = np.round(np.rad2deg(eulerAngles(quat)), 3)
    total_deg = np.round(np.rad2deg(2 * np.arccos(np.abs(quat[0]))), 3)
    alignment[f'{name}'] = quat
    alignment[f'{name}_euler_deg'] = euler_deg
    alignment[f'{name}_total_deg'] = total_deg
    if delta is not None:
        alignment[f'{name}_delta'] = delta
        alignment[f'{name}_delta_deg'] = np.rad2deg(delta)
    if verbose:
        with np.printoptions(precision=3, suppress=True):
            deltatext = f', delta: {np.rad2deg(delta):.2f}°' if delta is not None else ''
            print(f'alignment: {name} zxy: {euler_deg}°, total: {total_deg:.3f}°{deltatext}')


[docs] def alignOptImu(imu_gyr, imu_acc, imu_mag, opt_quat, opt_pos, rate, params=None, names=None, debug=False, plot=False): """ Determines sensor-to-segment and imu-to-opt reference frame alignment quaternions. See Appendix C of https://doi.org/10.3390/data6070072 for more information about (a subset of) this algorithm. :param imu_gyr: gyroscope measurements in rad/s :param imu_acc: accelerometer measurements in m/s² :param imu_mag: magnetometer measurements :param opt_quat: orientation from optical motion capture :param opt_pos: position from optical motion capture :param rate: sampling rate in seconds :param params: additional parameters, defaults: fullEOpt2EImu=True, accbias=True, r=0.5, fc_gyr=10.0, fc_grav=0.1, fc_acc=10.0, fc_pos=10.0, fc_mag=1.0, fast=False, init_quats=None, verbose=False :param names: names for different segments :param debug: enables debug output :param plot: enables debug plot :return: - **alignment**: dict with alignment information - **debug**: dict with debug values (only if debug==True) """ defaults = dict(fullEOpt2EImu=True, accbias=True, r=0.5, fc_gyr=10.0, fc_grav=0.1, fc_acc=10.0, fc_pos=10.0, fc_mag=1.0, fast=False, init_quats=None, verbose=False) params = setDefaults(params, defaults) if isinstance(imu_gyr, np.ndarray): imu_gyr = [imu_gyr] if isinstance(imu_acc, np.ndarray): imu_acc = [imu_acc] if isinstance(imu_mag, np.ndarray): imu_mag = [imu_mag] if isinstance(opt_quat, np.ndarray): opt_quat = [opt_quat] if isinstance(opt_pos, np.ndarray): opt_pos = [opt_pos] N = len(imu_gyr) unique_names = list(set(names)) if names is not None else [None] M = len(unique_names) assert len(imu_acc) == N assert imu_mag is None or len(imu_mag) == N assert len(opt_quat) == N assert opt_pos is None or len(opt_pos) == N assert len(imu_gyr) == 1 or M <= N segInd = [unique_names.index(n) for n in names] if names is not None else list(range(N)) # print('segInd', segInd) alignment = dict() if len(imu_gyr) > 1: alignment['names'] = names def imu2segName(i): return f'qImu2Seg_{unique_names[i]}' if M > 1 else 'qImu2Seg' def alignHeading(qEOpt2EImu): fc = params['fc_mag'] imu_mag_EImu = [] for i in range(N): quat = qmult(opt_quat[i], alignment[imu2segName(i)]) quat = qmult(qEOpt2EImu, quat) imu_mag_EImu.append(_nanButterFiltfilt(rotate(quat, imu_mag[i]), rate, fc)) imu_mag_EImu = np.vstack(imu_mag_EImu) norm = np.linalg.norm(imu_mag_EImu, axis=1) dip = np.arccos(imu_mag_EImu[:, 2]/norm) median_norm = np.nanmedian(norm) median_dip = np.nanmedian(dip) delta = nanUnwrap(np.arctan2(-imu_mag_EImu[:, 0], imu_mag_EImu[:, 1])[:, None])[:, 0] norm_th = 0.05 dip_th = np.deg2rad(5) with np.errstate(invalid='ignore'): while True: valid = np.logical_and( np.logical_and(norm/median_norm > (1 - norm_th), norm/median_norm < (1 + norm_th)), np.logical_and(dip > median_dip - dip_th, dip < median_dip + dip_th)) if np.mean(valid) >= 0.5: break norm_th += 0.01 dip_th += np.deg2rad(1) if norm_th != 0.05: print(f'warning: mag disturbance thresholds increased to norm_th={norm_th:.2f} and ' f'dip_th={np.rad2deg(dip_th):.1f}°, valid: {100 * np.mean(valid):.1f}%') # assert np.mean(valid) > 0.4, f'magnetic field seems to be too disturbed to estimate heading ' \ # f'({100*np.mean(valid):.1f}% valid)' delta = np.nanmean(delta[valid]) qEOpt2EImu = qmult(quatFromAngleAxis(-delta, [0, 0, 1]), qEOpt2EImu) return qEOpt2EImu, delta, norm_th, dip_th initQuats = params['init_quats'] if params['fast']: assert initQuats is None initQuats = np.array([[1, 0, 0, 0]], float) if initQuats is None: initQuats = [] for ax1, sign1, ax2, sign2 in product(range(3), (1, -1), range(3), (1, -1)): if ax1 == ax2: continue x = np.zeros(3) x[ax1] = sign1 y = np.zeros(3) y[ax2] = sign2 initQuats.append(quatFrom2Axes(x=x, y=y, z=[0, 0, 0])) initQuats = np.array(initQuats) if opt_pos is not None: fc_pos = params['fc_pos'] fc_acc = params['fc_acc'] opt_acc_nograv_EOpt = [] for i in range(N): acc = _central2ndOrderFiniteDifference(_nanButterFiltfilt(opt_pos[i], rate, fc_pos), rate) acc = _nanButterFiltfilt(acc, rate, fc_acc) opt_acc_nograv_EOpt.append(acc) opt_acc_nograv_EOpt = np.vstack(opt_acc_nograv_EOpt) else: opt_acc_nograv_EOpt = None fc_acc = params['fc_grav'] opt_gyr_lp = [quatToGyrStrapdown(opt_quat[i], rate) for i in range(N)] # low pass filter the rotation rates with a cutoff frequency of 10 Hz to increase robustness if params['fc_gyr'] > 0: opt_gyr_lp = [_nanButterFiltfilt(g, rate, params['fc_gyr']) for g in opt_gyr_lp] imu_gyr_lp = [_nanButterFiltfilt(imu_gyr[i], rate, params['fc_gyr']) for i in range(N)] else: imu_gyr_lp = imu_gyr def costFn(x, r, qImu2SegInit, getInfo=False): qImu2Seg = [_quatFromX(qImu2SegInit[i], x[3*i:3*i+3]) for i in range(M)] bias = [x[3*M+3*i:3*M+3*i+3] for i in range(N)] # print(x, qImu2Seg, bias) opt_gyr_rot = [rotate(qinv(qImu2Seg[segInd[i]]), opt_gyr_lp[i]) for i in range(N)] imu_acc_EOpt = [] accbias = x[-4:-1] if params['accbias'] else np.zeros(3) for i in range(N): qImu2EOpt = qmult(opt_quat[i], qImu2Seg[segInd[i]]) acc = rotate(qImu2EOpt, imu_acc[i]-accbias) acc = _nanButterFiltfilt(acc, rate, fc_acc) imu_acc_EOpt.append(acc) imu_acc_EOpt = np.vstack(imu_acc_EOpt) qEOpt2EImu = (_inclQuat(x[-3-3], x[-2-3]) if params['accbias'] else _inclQuat(x[-3], x[-2])) \ if params['fullEOpt2EImu'] else _inclQuat(0, 0) g = x[-1] grav_EOpt = rotate(qinv(qEOpt2EImu), np.array([0, 0, g], float)) gyr_diff = [imu_gyr_lp[i] - bias[i][None] - opt_gyr_rot[i] for i in range(N)] gyr_cost = rms(np.linalg.norm(np.vstack(gyr_diff), axis=1)) if opt_acc_nograv_EOpt is None: acc_diff = imu_acc_EOpt - grav_EOpt else: acc_diff = imu_acc_EOpt - (opt_acc_nograv_EOpt + grav_EOpt) acc_cost = rms(np.linalg.norm(acc_diff, axis=1)) cost = r*gyr_cost + (1-r)*acc_cost if getInfo: return dict( cost=cost, gyr_cost=gyr_cost, acc_cost=acc_cost, qImu2Seg=qImu2Seg, qEOpt2EImu=qEOpt2EImu, bias=bias, g=g, accbias=accbias, ) return cost best = None log = [] for qImu2SegInit in initQuats: r = params['r'] assert 0.0 <= r <= 1.0 args = (r, M*[qImu2SegInit]) init = np.zeros((3*N+3*M+3+3 if params['accbias'] else 3*N+3*M+3) if params['fullEOpt2EImu'] else 3*N+3*M+1+3) init[-1] = 9.8 res = scipy.optimize.minimize(costFn, init, args=args, method='BFGS') info = costFn(res.x, *args, getInfo=True) entry = [res.x, info, res.fun] log.append(entry) if best is None or best[-1] > res.fun: best = entry info = best[1] success_th = 1.01 * best[-1] success_rate = len([e for e in log if e[-1] <= success_th]) / len(log) if success_rate < 0.5: print(f'warning: low success_rate: {success_rate*100:.1f} %') info['sucess_rate'] = success_rate info['init_vals'] = len(log) if params['verbose']: print(f'success rate: {success_rate*100:.1f} %') for i in range(M): _addEntry(alignment, imu2segName(i), info['qImu2Seg'][i], verbose=params['verbose']) qEOpt2EImu = best[1]['qEOpt2EImu'] delta = None if imu_mag is not None: _addEntry(alignment, 'qEOpt2EImu_6D', qEOpt2EImu, verbose=params['verbose']) qEOpt2EImu, delta, info['norm_th'], info['dip_th'] = alignHeading(qEOpt2EImu) _addEntry(alignment, 'qEOpt2EImu', qEOpt2EImu, delta, verbose=params['verbose']) info['opt_pos_available'] = opt_pos is not None del info['qImu2Seg'] del info['qEOpt2EImu'] alignment['info'] = info del params['verbose'] alignment['params'] = params if debug or plot: from qmt import oriEstIMU imu_quat = [ # calculate some IMU orientation estimate to be able to calculate errors oriEstIMU(imu_gyr[i], imu_acc[i], None if imu_mag is None else imu_mag[i], dict(Ts=1/rate)) for i in range(N) ] qRelEarth = qmult(np.vstack(imu_quat), qinv(np.vstack(opt_quat))) heading_error_before, inclination_error_before = headingInclinationAngle(qRelEarth) opt_quat_aligned = [qmult(qmult(qEOpt2EImu, opt_quat[i]), alignment[imu2segName(i)]) for i in range(N)] qRelEarth = qmult(np.vstack(imu_quat), qinv(np.vstack(opt_quat_aligned))) heading_error, inclination_error = headingInclinationAngle(qRelEarth) debugData = dict( alignment=alignment, heading_error_before=heading_error_before, inclination_error_before=inclination_error_before, heading_error=heading_error, inclination_error=inclination_error, ) if plot: alignOptImu_debugPlot(debugData, plot) if debug: return alignment, debugData return alignment
def alignOptImu_debugPlot(debug, fig=None): with AutoFigure(fig) as fig: fig.suptitle(AutoFigure.title('alignOptImu')) ax1 = fig.add_subplot(211) ax1.plot(np.rad2deg(np.abs(debug['heading_error_before'])), label='heading') ax1.plot(np.rad2deg(debug['inclination_error_before']), label='inclination') ax1.set_title('heading and inclination error before alignment [°]') ax1.legend() ax1.grid() ax2 = fig.add_subplot(212, sharex=ax1, sharey=ax1) ax2.plot(np.rad2deg(np.abs(debug['heading_error'])), label='heading') ax2.plot(np.rad2deg(debug['inclination_error']), label='inclination') ax2.set_title('heading and inclination error after alignment [°]') ax2.legend() ax2.grid() fig.tight_layout()
[docs] def alignOptImuByMinimizingRmse(imu_quat, opt_quat, params=None, debug=False, plot=False): """ Determines sensor-to-segment and imu-to-opt reference frame alignment quaternions that minimize the orientation difference. Warning: Use this function with care because systematic orientation estimation errors can easily be optimized away. The results are probably only trustworthy if the input orientations are quite accurate (slow movements, calibrated IMU, no magnetic disturbances, good time synchronization, ...) and if the performed movement contains a large variety of different orientations. :param imu_quat: IMU quaternion :param opt_quat: quaternion from optical motion capture :param params: optional parameters to control algorithm behavior, defaults: qImu2Seg=True, qEOpt2EImu=True, delta=True, initQuats=None, verbose=False :param debug: enables debug output :param plot: enables debug plot :return: - **alignent**: dict with alignment quaternions - **debug**: dict with debug values (only if debug==True) """ params = setDefaults(params, dict(qImu2Seg=True, qEOpt2EImu=True, delta=True, initQuats=None, verbose=False)) if params['initQuats'] is not None: assert len(params['initQuats']) == 2 assert params['initQuats'][0].shape == (4,) assert params['initQuats'][1].shape == (4,) qImu2Seg_init = params['initQuats'][0] proj = quatProject(params['initQuats'][1], [0, 0, 1]) delta_init = proj['projAngle'] qEOpt2EImu_6D_init = proj['resQuat'] assert np.isclose(qEOpt2EImu_6D_init[-1], 0) else: qImu2Seg_init = np.array([1, 0, 0, 0], float) qEOpt2EImu_6D_init = np.array([1, 0, 0, 0], float) delta_init = 0.0 def unpackX(x): if len(x) in (3, 5): qImu2Seg = _quatFromX(qImu2Seg_init, x[:3]) else: qImu2Seg = np.array([1, 0, 0, 0], float) if len(x) in (2, 5): qEOpt2EImu_6D = qmult(qEOpt2EImu_6D_init, _inclQuat(x[-2], x[-1])) else: qEOpt2EImu_6D = np.array([1, 0, 0, 0], float) return qImu2Seg, qEOpt2EImu_6D def costFn(x): qImu2Seg, qEOpt2EImu_6D = unpackX(x) opt_quat_aligned = qmult(qmult(qEOpt2EImu_6D, opt_quat), qImu2Seg) qRelEarth = qmult(imu_quat, qinv(opt_quat_aligned)) heading, inclination = headingInclinationAngle(qRelEarth) return rms(inclination) init = np.zeros((3 if params['qImu2Seg'] else 0) + (2 if params['qEOpt2EImu'] else 0), float) res = scipy.optimize.minimize(costFn, init, method='BFGS') inclRmse = res.fun qImu2Seg, qEOpt2EImu_6D = unpackX(res.x) def costFn(x): qEOpt2EImu = qmult(quatFromAngleAxis(x.item(), [0, 0, 1]), qEOpt2EImu_6D) opt_quat_aligned = qmult(qmult(qEOpt2EImu, opt_quat), qImu2Seg) qRelEarth = qmult(imu_quat, qinv(opt_quat_aligned)) heading, inclination = headingInclinationAngle(qRelEarth) return rms(heading) res = scipy.optimize.minimize(costFn, np.array([delta_init], float), method='BFGS') headingRmse = res.fun delta = res.x.item() qEOpt2EImu = qmult(quatFromAngleAxis(delta, [0, 0, 1]), qEOpt2EImu_6D) alignment = {} _addEntry(alignment, 'qImu2Seg', qImu2Seg, verbose=params['verbose']) _addEntry(alignment, 'qEOpt2EImu_6D', qEOpt2EImu_6D, verbose=params['verbose']) _addEntry(alignment, 'qEOpt2EImu', qEOpt2EImu, delta, verbose=params['verbose']) alignment.update(dict(delta=delta, inclRmse=inclRmse, headingRmse=headingRmse)) if debug or plot: qRelEarth = qmult(imu_quat, qinv(opt_quat)) heading_error_before, inclination_error_before = headingInclinationAngle(qRelEarth) opt_quat_aligned = qmult(qmult(qEOpt2EImu, opt_quat), qImu2Seg) qRelEarth = qmult(imu_quat, qinv(opt_quat_aligned)) heading_error, inclination_error = headingInclinationAngle(qRelEarth) debugData = dict( alignment=alignment, heading_error_before=heading_error_before, inclination_error_before=inclination_error_before, heading_error=heading_error, inclination_error=inclination_error, ) if plot: alignOptImuByMinimizingRmse_debugPlot(debugData, plot) if debug: return alignment, debugData return alignment
def alignOptImuByMinimizingRmse_debugPlot(debug, fig=None): with AutoFigure(fig) as fig: fig.suptitle(AutoFigure.title('alignOptImuByMinimizingRmse')) ax1 = fig.add_subplot(211) ax1.plot(np.rad2deg(np.abs(debug['heading_error_before'])), label='heading') ax1.plot(np.rad2deg(debug['inclination_error_before']), label='inclination') ax1.set_title('heading and inclination error before alignment [°]') ax1.legend() ax1.grid() ax2 = fig.add_subplot(212, sharex=ax1, sharey=ax1) ax2.plot(np.rad2deg(np.abs(debug['heading_error'])), label='heading') ax2.plot(np.rad2deg(debug['inclination_error']), label='inclination') ax2.set_title('heading and inclination error after alignment [°]') ax2.legend() ax2.grid() fig.tight_layout()