# SPDX-FileCopyrightText: 2021 Bo Yang <b.yang@campus.tu-berlin.de>
# SPDX-FileCopyrightText: 2021 Dustin Lehmann <dustin.lehmann@tu-berlin.de>
#
# SPDX-License-Identifier: MIT
import numpy as np
import scipy.signal as signal
from scipy.linalg import lstsq
from scipy import interpolate
import qmt
from qmt.utils.misc import setDefaults
from qmt.utils.plot import AutoFigure
[docs]
def headingCorrection(gyr1, gyr2, quat1, quat2, t, joint, jointInfo, estSettings=None, verbose=False,
debug=False, plot=False):
"""
This function corrects the heading of a kinematic chain of two segments whose orientations are estimated without
the use of magnetometers. It corrects the heading of the second segment in a way that its orientation is estimated
in the reference of the first segment. It uses kinematic constraints for rotational joints to estimate the heading
offset of both segments based on the limited set of possible relative orientations induced by the rotational joint.
There are methods for 1D, 2D and 3D joints, based on different constraints.
Equivalent Matlab function: :mat:func:`+qmt.headingCorrection`.
:param gyr1: Nx3 array of angular velocities in rad/s
:param gyr2: Nx3 array of angular velocities in rad/s
:param quat1: Nx4 array of orientation quaternions
:param quat2: Nx4 array of orientation quaternions
:param t: Nx1 vector of the equidistant sampled time signal
:param joint: Dofx3 array containing the axes of the joint.
:param jointInfo: Dictionary containing additional joint information. Only needed for 3D-joints. Keys:
- **convention**: String of the chosen euler angles convention, e.g. 'xyz'
- **angle_ranges**: 3x2 array with the minimum and maximum value for each joint angle in radians
:param verbose: show all logs in function. Disabled by Default.
:param estSettings: structure containing the needed settings for the estimation:
- **window_time**: width of the window over which the estimation should be performed in seconds
- **estimation_rate**: rate in Hz at which the estimation should be performed. typically below 1Hz is
sufficient
- **data_rate**: rate at which new data is fed into the estimation. Is used to down sample the data for a
faster estimation. Typically values around 5Hz are sufficient
- **alignment**: String of the chosen type of alignment of the estimation window. It describes how the
samples around the current sample at which the estimation takes place are chosen. Possible values:
'backward', 'center', 'forward'
- **enable_stillness**: boolean value if the stillness detection should be enabled
- **stillness_time**: time in seconds which should pass for the stillness detection to be triggered
- **stillness_threshold**: threshold for the angular velocity under which the body is assumed to be at rest.
In rad/s
- **tauDelta**: time constant for the filter in seconds
- **tauBias**: time constant for the filter in seconds
- **delta_range**: array of values that are supposed to be tested for the method for 3D joints.
- **constraint**: type of contraint used for the estimation.
- Constraints for 1D: (proj, euler, euler_2d, 1d_corr).
- Constraints for 2D: (euler, gyro, combined).
- Constraints for 3D: (default)
- **optimizer_steps**: Number of Gauss-Newton steps during optimization.
:param debug: enables debug output
:param plot: enables debug plot
:return:
- **quat2Corr**: Corrected orientation of the second segment in the reference frame of the first segment
- **delta**: value of the estimated heading offset
- **deltaFilt**: value of the filtered heading offset
- **rating**: value of the certainty of the estimation
- **state**: state of the estimation. 1: regular, 2: startup, 3: stillness
- **debug**: dict with debug values (only if debug==True)
"""
# Input Checking
joint = np.array(joint).copy()
if len(joint.shape) < 2:
assert joint.shape == (3,), 'Wrong dimension of joint matrix'
dof = 1
else:
N = joint.shape[0]
assert N <= 3, 'Wrong dimension of joint matrix'
assert joint.shape == (N, 3), 'Wrong dimension of joint matrix'
dof = N
# Parameter loading
# if no settings are given load a set of standard settings
defaults = dict(
useRomConstraints=False,
windowTime=8.0,
estimationRate=1.0,
dataRate=5.0,
tauDelta=5.0,
tauBias=5.0,
ratingMin=0.4,
alignment='backward',
enableStillness=True,
optimizerSteps=5,
stillnessTime=3.0,
stillnessThreshold=np.deg2rad(4),
stillnessRating=1,
startRating=1,
deltaRange=np.deg2rad(np.linspace(0, 359, 360)),
angleRanges=np.ones((3, 1)) * np.array([[-np.pi, np.pi]]),
convention='xyz',
constraint=None,
)
estSettings = setDefaults(estSettings, defaults)
if estSettings['constraint'] is None:
del estSettings['constraint'] # default values are later handled depending on DoF
useRomConstraints = estSettings['useRomConstraints']
windowTime = estSettings['windowTime']
estimationRate = estSettings['estimationRate']
dataRate = estSettings['dataRate']
tauDelta = estSettings['tauDelta']
tauBias = estSettings['tauBias']
ratingMin = estSettings['ratingMin']
alignment = estSettings['alignment']
enableStillness = estSettings['enableStillness']
optimizerSteps = estSettings['optimizerSteps']
stillnessTime = estSettings['stillnessTime']
stillnessThreshold = estSettings['stillnessThreshold']
stillnessRating = estSettings['stillnessRating']
startRating = estSettings['startRating']
deltaRange = estSettings['deltaRange']
angleRanges = np.asarray(estSettings['angleRanges'], float)
convention = estSettings['convention']
if dof == 3:
angleRanges = np.asarray(jointInfo['angle_ranges'], float)
convention = jointInfo['convention']
constraint = estSettings.get('constraint', 'default')
elif dof == 2:
constraint = estSettings.get('constraint', 'euler')
else:
constraint = estSettings.get('constraint', 'euler_1d')
if verbose:
print('parameters: ')
for key, val in estSettings.items():
print(f'{key}: {val}')
# Determine data rate from one of the time signals.Assumption: constant
timeDiff = np.diff(t)
rate = 1 / timeDiff[1]
assert np.allclose(timeDiff, 1 / rate) # make sure the sampling time is constant to avoid surprises
assert np.isclose(rate % dataRate, 0) or np.isclose(rate % dataRate, dataRate), f'rate should be divisible by ' \
f'given dataRate: {dataRate}'
windowSteps = round(windowTime * rate)
estimationSteps = round(1 / estimationRate * rate)
dataSteps = round(1 / dataRate * rate)
stillnessSteps = round(stillnessTime * rate)
# number of total time steps in the time series
N = len(t)
if alignment == 'backward':
starts = np.arange(windowSteps, N, estimationSteps, dtype=int)
elif alignment == 'center':
starts = np.arange(windowSteps / 2, N - windowSteps / 2, estimationSteps, dtype=int)
elif alignment == 'forward':
starts = np.arange(1, N - windowSteps, estimationSteps, dtype=int)
else:
raise ValueError('Wrong alignment type')
# to have a smoother start up,
# add estimations at each data step in the beginning until a complete time window has passed
regularStart = starts[0] # start of the regular estimation without smooth startup
starts = np.concatenate((np.arange(dataSteps, starts[0], dataSteps, dtype=int), starts))
estimations = len(starts) # number of performed estimations
# Initialize the result vectors
delta = np.zeros((estimations, 1)) # delta is the heading offset between the first and second segment
rating = np.zeros((estimations, 1)) # the rating indicates the quality of the estimation
stateOut = np.zeros((estimations, 1))
cost = np.zeros((estimations, 1))
# Initialize the filtering time constants
tauDelta = np.ones((estimations, 1)) * tauDelta
tauBias = np.ones((estimations, 1)) * tauBias
stillnessCorrector = stillnessCorrection()
# Estimation
for k in range(1, estimations):
# set default values to variables
stillnessTrigger = False
state = 'none'
# get the current index in the data
index = starts[k]
# check whether the smooth startup is active
# If so, the used data, for the estimation is from index 1 to the current index
if index < regularStart:
state = 'startup'
indexStart = 0
indexEnd = index
else:
# during regular estimation, the start and end index are determined based on
# the current index and the chosen aligment type
if alignment == 'center':
indexStart = index - int(windowSteps / 2)
indexEnd = index + int(windowSteps / 2)
elif alignment == 'forward':
indexStart = index
indexEnd = index + windowSteps
else: # alignment == 'backward'
indexStart = index - windowSteps
indexEnd = index
# Stillness detection
# Check if the two segments are at rest. If they are at rest, the
# current value of delta can be calculated from the last estimated
# relative orientation. This ensures that both segments do not move in
# a resting phase. However, this does not ensure correct estimation
# since it is only based on the last estimated orientation
#
# only do the stillness detection of the setting is set to true and do
# not do stillness detection during startup
if enableStillness and state != 'startup':
# check whether the number of passed samples is larger than the detection peroiod for the stillness
# detection
if index > stillnessSteps:
stillness = checkStillness(gyr1[index - stillnessSteps:index + 1, :],
gyr2[index - stillnessSteps:index + 1, :], stillnessThreshold)
if stillness:
if stateOut[k-1] == 1: # regular
stillnessTrigger = True
state = 'stillness'
else:
state = 'regular'
else:
state = 'regular'
else:
if state != 'startup':
state = 'regular'
# Stillness correction
if state == 'stillness':
delta[k, :] = stillnessCorrector.stillnessCorrection(quat1[index, :], quat2[index, :], delta[k - 1, :],
stillnessTrigger)
# Estimation
# the estimation will only be performed in startup and regular state,
# not in stillness state
if state == 'startup' or state == 'regular':
# % for convenience extract the necessary data windows from both
# % segments
q1 = quat1[indexStart:indexEnd + 1:dataSteps]
q2 = quat2[indexStart:indexEnd + 1:dataSteps]
g1 = gyr1[indexStart:indexEnd + 1:dataSteps]
g2 = gyr2[indexStart:indexEnd + 1:dataSteps]
time = t[indexStart:indexEnd + 1:dataSteps]
# execute a dedicated estimation algorithm for each type of joint.
# for more detailed explanation look in the descriptions of the
# corresponding method
if dof == 1:
delta[k, :], rating[k, :], cost[k, :] = estimateDelta1d(q1, q2, joint, delta[k - 1, :], constraint,
optimizerSteps)
elif dof == 2:
delta[k, :], rating[k, :], cost[k, :] = estimateDelta2d(q1, q2, g1, g2, time, joint, delta[k - 1, :],
constraint, optimizerSteps)
else:
delta[k, :], rating[k, :], cost[k, :] = estimateDelta3d(q1, q2, angleRanges, convention, deltaRange,
delta[k - 1, :])
# during startup estimation a second estimation is performed from a
# different starting value to ensure that the global minimum is found
if state == 'startup':
if dof == 1:
delta2, _, cost2 = estimateDelta1d(q1, q2, joint, delta[k - 1, :] + np.pi, constraint,
optimizerSteps)
elif dof == 2:
delta2, _, cost2 = estimateDelta2d(q1, q2, g1, g2, time, joint, delta[k - 1, :] + np.pi,
constraint, optimizerSteps)
else:
delta2, _, cost2 = estimateDelta3d(q1, q2, angleRanges, convention, deltaRange,
delta[k - 1, :] + np.pi)
if useRomConstraints:
costRom = romCost(angleRanges, convention, q1, q2, delta[k, :])
costRom2 = romCost(angleRanges, convention, q1, q2, delta2)
if costRom2 < costRom:
delta[k] = delta2
elif cost2 < cost[k, :]:
delta[k] = delta2
# adapt the rating in the two special states startup and stillness and set it to 1 in order
# to enable fast convergence
if state == 'startup':
rating[k, :] = startRating
tauDelta[k, :] = 0.4
tauBias[k, :] = 1
if state == 'stillness':
rating[k, :] = stillnessRating
if state == 'regular':
stateOut[k, :] = 1
elif state == 'startup':
stateOut[k, :] = 2
elif state == 'stillness':
stateOut[k, :] = 3
else:
raise ValueError('Wrong state')
# Filtering
# use a filter to smooth the trajectory of the estimated delta.
# The filter tries to eliminate phase lag introduced by low pass filtering by estimating the slope of
# the current trajectory. Furthermore, if rating < rating_min,
# the filter only extrapolates the estimated slope and dismisses new estimates until rating >= rating_min
# shape(Nx1) for delta, rating tauDelta, tauBias inputs
deltaFilt, bias = headingFilter(delta, rating, stateOut, estimationRate, tauDelta, tauBias, ratingMin, windowTime,
alignment)
if debug or plot:
# uninterpolated debug data
uninterpolated = {'delta': delta, 'deltaFilt': deltaFilt, 'rating': rating, 'stateOut': stateOut}
# Interpolation
# Interpolate the data to the given time signal
fDelta = interpolate.interp1d(t[starts], np.unwrap(delta.ravel()), kind='linear', fill_value='extrapolate')
fDeltaFilt = interpolate.interp1d(t[starts], deltaFilt.ravel(), kind='linear', fill_value='extrapolate')
fRating = interpolate.interp1d(t[starts], rating.ravel(), kind='linear', fill_value='extrapolate')
fStateOut = interpolate.interp1d(t[starts], stateOut.ravel(), kind='linear', fill_value='extrapolate')
delta = fDelta(t)
deltaFilt = fDeltaFilt(t)
rating = fRating(t)
stateOut = fStateOut(t)
# Heading correction
quat2Corr = qmt.qmult(qmt.quatFromAngleAxis(deltaFilt, [0, 0, 1]), quat2)
if debug or plot:
debugData = qmt.Struct(
uninterpolated=uninterpolated,
delta=delta,
delta_filt=deltaFilt,
rating=rating,
state_out=stateOut,
fdelta=fDelta,
fdelta_filt=fDeltaFilt,
frating=fRating,
fstate_out=fStateOut,
starts=starts,
bias=bias,
params=dict(
windowTime=windowTime,
estimationRate=estimationRate,
dataRate=dataRate,
tauDelta=tauDelta,
tauBias=tauBias,
ratingMin=ratingMin,
alignment=alignment,
enableStillness=enableStillness,
optimizerSteps=optimizerSteps,
stillnessTime=stillnessTime,
stillnessThreshold=stillnessThreshold,
stillnessRating=stillnessRating,
startRating=startRating,
deltaRange=deltaRange,
constraint=constraint,
angleRanges=angleRanges,
convention=convention,
joint=joint,
)
)
if plot:
headingCorrection_debugPlot(debugData, plot)
if debug:
return quat2Corr, delta, deltaFilt, rating, stateOut, debugData
return quat2Corr, delta, deltaFilt, rating, stateOut
def headingCorrection_debugPlot(debug, fig=None):
with AutoFigure(fig) as fig:
(ax1, ax2), (ax3, ax4) = fig.subplots(2, 2, sharex=True)
fig.suptitle(AutoFigure.title('headingCorrection'))
ax1.plot(np.rad2deg(debug['delta'])[:, np.newaxis])
ax1.set_ylabel('[°]')
ax1.set_xlabel('index')
ax1.set_title(f'delta, {debug["delta"].shape} {debug["delta"].dtype}')
ax2.plot(np.rad2deg(debug['delta_filt'])[:, np.newaxis])
ax2.set_ylabel('[°]')
ax2.set_xlabel('index')
ax2.set_title(f'delta_filtered, {debug["delta_filt"].shape} {debug["delta_filt"].dtype}')
ax3.plot(debug['rating'][:, np.newaxis])
ax3.set_title(f'rating, {debug["rating"].shape} {debug["rating"].dtype}')
ax4.plot(debug['state_out'][:, np.newaxis])
ax4.set_ylabel('[1: regular, 2: startup, 3: stillness]')
ax4.set_title(f'state_out, {debug["state_out"].shape} {debug["state_out"].dtype}')
for ax in (ax1, ax2, ax3, ax4):
ax.grid()
fig.tight_layout()
# % check whether the mean of both gyroscope signals is smaller than the
# % defined threshold over the complete period
def checkStillness(gyr1, gyr2, still_threshold):
# based on Matlab implementation in matlab/lib/HeadingCorrection/heading_corection.m
if np.mean(np.linalg.norm(gyr1, axis=1)) < still_threshold and \
np.mean(np.linalg.norm(gyr2, axis=1)) < still_threshold:
stillness = True
else:
stillness = False
return stillness
class stillnessCorrection:
# based on Matlab implementation in matlab/lib/HeadingCorrection/heading_corection.m
def __init__(self):
# % initialize memory variable
self.quatRelRef = None
self.deltaStill = None
def stillnessCorrection(self, quat1, quat2, lastDelta, stillnessTrigger):
# % set the reference relative orientation which should be held during
# % stillness phase. It is reset after each rising edge of the stillness
# % detection.
if self.quatRelRef is None:
self.quatRelRef = qmt.qmult(qmt.qmult(qmt.qinv(quat1), qmt.quatFromAngleAxis(
lastDelta, [0, 0, 1])), quat2)
self.deltaStill = lastDelta
else:
if stillnessTrigger:
self.quatRelRef = qmt.qmult(
qmt.qmult(qmt.qinv(quat1), qmt.quatFromAngleAxis(lastDelta, [0, 0, 1])),
quat2)
self.deltaStill = lastDelta
qE2E1 = qmt.qmult(qmt.qmult(quat1, self.quatRelRef), qmt.qinv(quat2))
qRel = qmt.qrel(qmt.quatFromAngleAxis(self.deltaStill, [0, 0, 1]), qE2E1)
# deltaInc = 2 * np.arctan2(np.dot(qRel[1:], np.array([0, 0, 1])), qRel[0])
deltaInc = 2 * np.arctan2(np.dot(qRel[0, 1:], np.array([0, 0, 1])), qRel[0, 0])
delta = self.deltaStill + deltaInc
return delta
def headingFilter(delta, rating, state, estimationRate, tauBias, tauDelta, minRating, windowTime, alignment):
# based on Matlab implementation in matlab/lib/HeadingCorrection/heading_corection.m
N = delta.shape[0]
assert delta.shape == (N, 1)
assert rating.shape == (N, 1)
Ts = 1 / estimationRate
out = np.zeros((N, 1))
bias = np.zeros((N, 1))
windowSize = windowTime * estimationRate
kBias = 1 - np.exp(-Ts * np.log(2) / tauBias)
kDelta = 1 - np.exp(-Ts * np.log(2) / tauDelta)
if isinstance(tauDelta, (int, float)):
kDelta = np.ones((N, 1)) * kDelta
if isinstance(tauBias, (int, float)):
kBias = np.ones((N, 1)) * kBias
rating = rating.copy()
rating[rating < minRating] = 0
out[0, :] = delta[0, :]
biasClip = np.deg2rad(2) * Ts # limit bias estimate to 2°/s
j = 0
for i in range(1, N):
if state[i] == 2: # startup
kBiasEff = 0.0
kDeltaEff = max(rating[i, :] * kDelta[i, :], 1 / (i + 1))
else:
kBiasEff = rating[i, :] * kBias[i, :]
kDeltaEff = max(rating[i, :] * kDelta[i, :], 1 / (j + 1))
j += 1
deltaDiff = np.clip(qmt.wrapToPi(delta[i, :] - delta[i - 1, :]), -biasClip, biasClip)
bias[i, :] = np.clip(bias[i - 1, :] + kBiasEff * (deltaDiff - bias[i - 1, :]), -biasClip, biasClip)
out[i, :] = out[i - 1, :] + bias[i, :] + kDeltaEff * (qmt.wrapToPi(delta[i, :] - out[i - 1, :]) - bias[i, :])
if alignment == 'backward':
delta_out = out + windowSize / 2 * bias
elif alignment == 'forward':
delta_out = out - windowSize / 2 * bias
else: # center
delta_out = out
return delta_out, bias
def estimateDelta1d(quat1, quat2, joint, deltaStart, constraint, optimizationSteps):
# based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m
j = joint
delta = deltaStart
cost = 0.0
for _ in range(optimizationSteps):
deltaInc, cost = gaussNewtonStep(quat1, quat2, j, j, delta, constraint)
delta = delta + deltaInc
delta = qmt.wrapTo2Pi(delta)
# Rating calculation
v1 = qmt.rotate(quat1, j)
v2 = qmt.rotate(quat2, j)
weight = np.sqrt(v1[:, 0] ** 2 + v1[:, 1] ** 2) * np.sqrt(v2[:, 0] ** 2 + v2[:, 1] ** 2)
rating = qmt.rms(weight)
return delta, rating, cost
def gaussNewtonStep(quat1, quat2, jB1, jB2, delta, constraint):
# based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m
errors, jacobi = errorAndJac1D(quat1, quat2, jB1, jB2, delta, constraint)
deltaParams = -1 * lstsq(np.atleast_2d(jacobi @ jacobi), np.atleast_2d(jacobi @ errors))[0]
deltaParams = np.squeeze(deltaParams)
totalError = np.linalg.norm(errors)
return deltaParams, totalError
def errorAndJac1D(q1, q2, jB1, jB2, delta, constraint):
# based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m
if constraint == 'proj':
errors, jacobi = getErrorAndJacProj(delta, q1, q2, jB1)
elif constraint == '1d_corr':
errors, jacobi = getErrorAndJac1dCorr(delta, q1, q2, jB1, jB2)
elif constraint == 'euler_1d':
errors, jacobi = getErrorAndJacEuler1D(delta, q1, q2, jB1, jB2)
elif constraint == 'euler_2d':
errors, jacobi = getErrorAndJacEuler2D(delta, q1, q2, jB1, jB2)
else:
raise ValueError('Wrong constrain')
return errors, jacobi
def getErrorAndJacEuler1D(delta, q1, q2, j, jAlt):
# based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m
errors = CostEuler1D(delta, q1, q2, j, jAlt)
eps = 1e-8
errors_eps = CostEuler1D(delta + eps, q1, q2, j, jAlt)
jacobi = (errors_eps - errors) / eps
return errors, jacobi
def CostEuler1D(delta, q1, q2, jB1, jB2):
# based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m
qE2E1 = qmt.quatFromAngleAxis(delta, [0, 0, 1])
qRot1 = qmt.quatFromAngleAxis(np.arccos(np.array([0, 0, 1]) @ jB1), np.cross(np.array([0, 0, 1]), jB1))
qRot2 = qmt.quatFromAngleAxis(np.arccos(np.array([0, 0, 1]) @ jB2), np.cross(np.array([0, 0, 1]), jB2))
qB1E1 = qmt.qmult(q1, qRot1)
qB2E2 = qmt.qmult(q2, qRot2)
qB2B1 = qmt.qmult(qmt.qmult(qmt.qinv(qB1E1), qE2E1), qB2E2)
angles = qmt.eulerAngles(qB2B1, 'zxy', True)
secondAngle = angles[:, 1]
thirdAngle = angles[:, 2]
error = getWeight(q1, q2, jB1, jB2) * np.sqrt(secondAngle ** 2 + thirdAngle ** 2)
return error
def getWeight(q1, q2, jB1, jB2):
# based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m
v1 = qmt.rotate(q1, jB1)
v2 = qmt.rotate(q2, jB2)
weight = np.sqrt(v1[:, 0] ** 2 + v1[:, 1] ** 2) * np.sqrt(v2[:, 0] ** 2 + v2[:, 1] ** 2)
return weight
def getErrorAndJacProj(delta, q1, q2, j):
# based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m
errors = CostProj(delta, q1, q2, j)
eps = 1e-8
errorsEps = CostProj(delta + eps, q1, q2, j)
jacobi = (errorsEps - errors) / eps
return errors, jacobi
def CostProj(delta, q1, q2, j):
# based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m
qE2E1 = qmt.quatFromAngleAxis(delta, [0, 0, 1])
qhat = qmt.qmult(qmt.qinv(q1), qmt.qmult(qE2E1, q2))
alpha = 2 * np.arctan2(qhat[:, 1:] @ j, qhat[:, 0])
qProj = qmt.quatFromAngleAxis(alpha, j)
qRes = qmt.qmult(qmt.qinv(qProj), qhat)
v1 = qmt.rotate(q1, j)
v2 = qmt.rotate(q2, j)
# weight = sqrt(v1(:, 1).^ 2 + v1(:, 2).^ 2).*sqrt(v2(:, 1).^ 2 + v2(:, 2).^ 2);
weight = np.sqrt(v1[:, 0] ** 2 + v1[:, 1] ** 2) * np.sqrt(v2[:, 0] ** 2 + v2[:, 1] ** 2)
error = weight * 2. * np.arccos(qRes[:, 0])
return error
def getErrorAndJac1dCorr(delta, q1, q2, jB1, jB2):
# based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m
v1 = qmt.rotate(q1, jB1)
v2 = qmt.rotate(q2, jB2)
weight = getWeight(q1, q2, jB1, jB2)
error = qmt.wrapToPi(np.arctan2(v2[:, 1], v2[:, 0]) - np.arctan2(v1[:, 1], v1[:, 0]) + delta) * weight
jac = weight
return error, jac
def getErrorAndJacEuler2D(delta, q1, q2, j1, j2):
# based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_1D.m
# % Calculate the error
qE2E1 = qmt.quatFromAngleAxis(delta, [0, 0, 1])
qB2S2 = qmt.quatFromAngleAxis(np.arccos(np.dot([0, 1, 0], j2)), np.cross([0, 1, 0], j2))
qB1S1 = qmt.quatFromAngleAxis(np.arccos(np.dot([0, 0, 1], j1)), np.cross([0, 0, 1], j1))
qE1B1 = qmt.qinv(qmt.qmult(q1, qB1S1))
qE2B1 = qmt.qmult(qE1B1, qE2E1)
qS2B1 = qmt.qmult(qE2B1, q2)
qB2B1 = qmt.qmult(qS2B1, qB2S2)
arcsinArg = 2 * (qB2B1[:, 1] * qB2B1[:, 0] + qB2B1[:, 2] * qB2B1[:, 3])
secondAngle = np.arcsin(np.clip(arcsinArg, -1, 1))
error = secondAngle
# Jacobian
qB2E2 = qmt.qmult(q2, qB2S2)
dQ3Ba = np.zeros((1, 4))
dQ3Ba[:, 0] = -0.5 * np.sin(delta / 2)
dQ3Ba[:, 3] = 0.5 * np.cos(delta / 2)
dQ3Ba = qmt.normalized(dQ3Ba)
dQB = qmt.qmult(qmt.qmult(qE1B1, dQ3Ba), qB2E2)
dQ = dQB
jac = 2 * (dQ[:, 1] * qB2B1[:, 0] + qB2B1[:, 1] * dQ[:, 0] +
dQ[:, 2] * qB2B1[:, 3] + qB2B1[:, 2] * dQ[:, 3])
jac = jac / np.sqrt(1 - arcsinArg ** 2)
return error, jac
# 2D functions
def errorAndJac2D(q1, q2, gyr1, gyr2, t, j1, j2, deltaStart, constraint):
# based on Matlab implementation in matlab/lib/HeadingCorrection/estimate_delta_2d.m
if constraint == 'euler':
errors, jacobi = getJacEulerCons(deltaStart[0], q1, q2, j1, j2)
elif constraint == 'gyro':
errors, jacobi = getJacGyroCons(j1, j2, q1, q2, gyr1, gyr2, deltaStart)
elif constraint == 'combined':
errors, jacobi = getJacCombCons(j1, j2, q1, q2, gyr1, gyr2, deltaStart)
else:
raise ValueError(f'Wrong constraint: {constraint}')
return errors, jacobi
def getJacEulerCons(delta, q1, q2, j1, j2):
# based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_2D.m
# % Calculate the error
qE2E1 = qmt.quatFromAngleAxis(delta, [0, 0, 1])
qB2S2 = qmt.quatFromAngleAxis(np.arccos(np.dot([0, 1, 0], j2)), np.cross([0, 1, 0], j2))
qB1S1 = qmt.quatFromAngleAxis(np.arccos(np.dot([0, 0, 1], j1)), np.cross([0, 0, 1], j1))
qE1B1 = qmt.qinv(qmt.qmult(q1, qB1S1))
qE2B1 = qmt.qmult(qE1B1, qE2E1)
qS2B1 = qmt.qmult(qE2B1, q2)
qB2B1 = qmt.qmult(qS2B1, qB2S2)
arcsinArg = 2 * (qB2B1[:, 1] * qB2B1[:, 0] + qB2B1[:, 2] * qB2B1[:, 3])
secondAngle = np.arcsin(np.clip(arcsinArg, -1, 1))
error = get2DStaticWeight([j1, j2], q1, q2) * secondAngle
# Jacobian
qB2E2 = qmt.qmult(q2, qB2S2)
dQ3Ba = np.zeros((1, 4))
dQ3Ba[:, 0] = -0.5 * np.sin(delta / 2)
dQ3Ba[:, 3] = 0.5 * np.cos(delta / 2)
dQ3Ba = qmt.normalized(dQ3Ba)
dQB = qmt.qmult(qmt.qmult(qE1B1, dQ3Ba), qB2E2)
dQ = dQB
#
jac = 2 * (dQ[:, 1] * qB2B1[:, 0] + qB2B1[:, 1] * dQ[:, 0] +
dQ[:, 2] * qB2B1[:, 3] + qB2B1[:, 2] * dQ[:, 3])
jac = jac / np.sqrt(1 - arcsinArg ** 2)
return error, jac
def get2DStaticWeight(jointAxes, quat1, quat2):
# based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_2D.m
axis1 = jointAxes[0]
axis2 = jointAxes[1]
j1 = qmt.rotate(quat1, axis1)
j2 = qmt.rotate(quat2, axis2)
weight = np.sqrt(j1[:, 0] ** 2 + j1[:, 1] ** 2) * np.sqrt(j2[:, 0] ** 2 + j2[:, 1] ** 2)
return weight
def estimateDelta2d(quat1, quat2, gyr1, gyr2, time, joint, deltaStart, constraint, optimizationSteps):
# based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_2D.m
j1 = joint[0, :]
j2 = joint[1, :]
delta = deltaStart
cost = 0
for _ in range(optimizationSteps):
deltaInc, cost = gaussNewtonStep2d(quat1, quat2, gyr1, gyr2, time, j1, j2, delta, constraint)
delta = delta + deltaInc
if constraint == 'euler_lin':
delta = delta[0] * time + delta[1]
delta = qmt.wrapTo2Pi(delta)
# %% Rating calculation
j1Global = qmt.rotate(quat1, j1)
j2Global = qmt.rotate(quat2, j2)
rating = qmt.rms(np.sqrt(j1Global[:, 0] ** 2 + j1Global[:, 1] ** 2)
* np.sqrt(j2Global[:, 0] ** 2 + j2Global[:, 1] ** 2))
return delta, rating, cost
def gaussNewtonStep2d(quat1, quat2, gyr1, gyr2, time, j1, j2, delta, constraint):
# based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_2D.m
errors, jacobi = errorAndJac2D(quat1, quat2, gyr1, gyr2, time, j1, j2, delta, constraint)
deltaParams = -1 * np.linalg.lstsq(np.atleast_2d(jacobi @ jacobi), np.atleast_2d(jacobi @ errors), rcond=None)[0]
deltaParams = np.squeeze(deltaParams)
totalError = np.linalg.norm(errors)
return deltaParams, totalError
def getJacGyroCons(j1, j2, q1, q2, gyr1, gyr2, delta):
# based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_2D.m
steps = q1.shape[0]
eZ = np.zeros((steps, 3)) + np.array([0, 0, 1])
quatE2E1 = np.array([np.cos(delta / 2), 0, 0, np.sin(delta / 2)])
j1E1 = qmt.rotate(q1, j1)
j2E2 = qmt.rotate(q2, j2)
j2E1 = qmt.rotate(quatE2E1, j2E2)
w1E1 = qmt.rotate(q1, gyr1)
w2E2 = qmt.rotate(q2, gyr2)
w2E1 = qmt.rotate(quatE2E1, w2E2)
jN = np.cross(j1E1, j2E1)
wD = w1E1 - w2E1
errors = np.sum(wD * jN, axis=1)
# % Calculate one Row of the Jacobian
dj2B = -j2E2 * np.sin(delta) + np.cross(eZ, j2E2, axis=1) * np.cos(delta) \
+ eZ * (np.sum(eZ * j2E2, axis=1) * np.sin(delta))[:, np.newaxis]
dwdB = -1 * (-w2E2 * np.sin(delta) + np.cross(eZ, w2E2, axis=1) * np.cos(delta)
+ eZ * (np.sum(eZ, w2E2, axis=1) * np.sin(delta))[:, np.newaxis])
dB = np.sum(dwdB * np.cross(j1E1, j2E1, axis=1), axis=1) + np.sum(wD * np.cross(j1E1, dj2B, axis=1), axis=1)
jac = dB
return errors, jac
def getJacCombCons(j1, j2, q1, q2, gyr1, gyr2, delta):
# based on Matlab implementation in matlab/lib/HeadingCorrection/errorAndJac_2D.m
steps = q1.shape[0]
eE = np.zeros((steps, 3)) + np.array([0, 0, 1])
quatE2E1 = np.array([np.cos(delta / 2), 0, 0, np.sin(delta / 2)], dtype=float)
j1E1 = qmt.rotate(q1, j1)
j2E2 = qmt.rotate(q2, j2)
j2E1 = qmt.rotate(quatE2E1, j2E2)
w1E1 = qmt.rotate(q1, gyr1)
w2E2 = qmt.rotate(q2, gyr2)
w2E1 = qmt.rotate(quatE2E1, w2E2)
jN = np.cross(j1E1, j2E1, axis=1)
wD = w1E1 - w2E1
errorsGyro = np.sum(wD * jN, axis=1)
# % Calculate one Row of the Jacobian
dj2B = -j2E2 * np.sin(delta) + np.cross(eE, j2E2, axis=1) * np.cos(delta) + \
eE * (np.sum(eE * j2E2, axis=1) * np.sin(delta))[:, np.newaxis]
dwdB = -(-w2E2 * np.sin(delta) + np.cross(eE, w2E2, axis=1) * np.cos(delta) +
eE * (np.sum(eE * w2E2, axis=1) * np.sin(delta))[:, np.newaxis])
dB = np.sum(dwdB * np.cross(j1E1, j2E1, axis=1), axis=1) + np.sum(wD * np.cross(j1E1, dj2B, axis=1), axis=1)
jacGyro = dB
# % Calculate the error
qE2E1 = qmt.quatFromAngleAxis(delta, [0, 0, 1])
qB2S2 = qmt.quatFromAngleAxis(np.arccos(np.dot([0, 1, 0], j2)), np.cross([0, 1, 0], j2))
qB1S1 = qmt.quatFromAngleAxis(np.arccos(np.dot([0, 0, 1], j1)), np.cross([0, 0, 1], j1))
qE1B1 = qmt.qinv(qmt.qmult(q1, qB1S1))
qE2B1 = qmt.qmult(qE1B1, qE2E1)
qS2B1 = qmt.qmult(qE2B1, q2)
qB2B1 = qmt.qmult(qS2B1, qB2S2)
arcsinArg = 2 * (qB2B1[:, 1] * qB2B1[:, 0] + qB2B1[:, 2] * qB2B1[:, 3])
secondAngle = np.arcsin(np.clip(arcsinArg, -1, 1))
errorsEuler = get2DStaticWeight([j1, j2], q1, q2) * secondAngle
# % Jacobian
qB2E2 = qmt.qmult(q2, qB2S2)
dQ3Ba = np.zeros((1, 4))
dQ3Ba[:, 0] = -0.5 * np.sin(delta / 2)
dQ3Ba[:, 3] = 0.5 * np.cos(delta / 2)
dQ3Ba = qmt.normalized(dQ3Ba)
dQB = qmt.qmult(qmt.qmult(qE1B1, dQ3Ba), qB2E2)
dQ = dQB
jac = 2 * (dQ[:, 1] * qB2B1[:, 0] + qB2B1[:, 1] * dQ[:, 0] +
dQ[:, 2] * qB2B1[:, 3] + qB2B1[:, 2] * dQ[:, 3])
jacEuler = jac / np.sqrt(1 - arcsinArg ** 2)
errors = errorsGyro + errorsEuler
jac = jacEuler + jacGyro
return errors, jac
# 3-D joint
def estimateDelta3d(quat1, quat2, angleRanges, convention, deltaRange, deltaStart):
# based on Matlab implementation in matlab/lib/HeadingCorrection/estimate_delta_3d.m
# generate the distribution of values for delta that produce a valid relative orientation
deltaProbability = getPossibleAngles(quat1, quat2, angleRanges[0, :], angleRanges[1, :], angleRanges[2, :],
convention, deltaRange, 'max')
# get the maximum
maxVal = max(deltaProbability)
distributionLastDelta = quat1.shape[0] / np.pi * abs(qmt.wrapToPi(deltaRange - deltaStart))
deltaProbabilityMin = getPossibleAngles(quat1, quat2, angleRanges[0, :], angleRanges[1, :], angleRanges[2, :],
convention, deltaRange, 'min')
distribNewMin = deltaProbabilityMin + distributionLastDelta
# % get the minimum
minVal = min(distribNewMin)
delta = np.mean(np.unwrap(deltaRange[distribNewMin == minVal]))
cost = minVal
delta = qmt.wrapTo2Pi(delta)
# %% Rating calculation
# % calculate the standard deviation of the distribution in order to quantify the quality of the estimation
_, stdConstraint = stdProbAngles(deltaRange[deltaProbability > maxVal / 2],
deltaProbability[deltaProbability > maxVal / 2])
# % scale the standard deviation to a range of [0.1]
rating = map_(stdConstraint, 0, np.deg2rad(20), 1, 0)
# % clip the window rating to values of [0,1]
rating = min(max(rating, 0), 1)
return delta, rating, cost
def getPossibleAngles(quat1, quat2, angle1Range, angle2Range, angle3Range, convention, deltaRange, mode):
# based on Matlab implementation in matlab/lib/HeadingCorrection/estimate_delta_3d.m
qHand = quat1[~np.isnan(quat1[:, 0]), :]
qMeta = quat2[~np.isnan(quat2[:, 0]), :]
assert qHand.shape == qMeta.shape
# qMeta: Mx4
# qHand: Mx4
qE1E2 = qmt.quatFromAngleAxis(deltaRange, [0, 0, 1])
qE1E2Broadcast, qMetaBroadcast, qHandBroadcast = np.broadcast_arrays(qE1E2[None], qMeta[:, None], qHand[:, None])
N = qE1E2.shape[0]
M = qMeta.shape[0]
qMetaCorr = qmt.qmult(qE1E2Broadcast.reshape(M * N, 4), qMetaBroadcast.reshape(M * N, 4)).reshape(M, N, 4)
# Mx360x4
qRel = qmt.qrel(qHandBroadcast.reshape(M * N, 4), qMetaCorr.reshape(M * N, 4)).reshape(M, N, 4)
angles = qmt.eulerAngles(qRel.reshape(M * N, 4), convention, True).reshape(M * N, 3)
angleDiff = getAngleDiff(angles.reshape(M * N, 3), angle1Range, angle2Range, angle3Range)
angleDiff[angleDiff > 0] = 1
angleDiff = angleDiff.astype(bool).reshape(M, N)
if mode == 'min':
possibleAngles = angleDiff
else:
possibleAngles = ~angleDiff
out = np.sum(possibleAngles, axis=0)
return out
def getAngleDiff(angles, angle1Range, angle2Range, angle3Range):
# based on Matlab implementation in matlab/lib/HeadingCorrection/estimate_delta_3d.m
d1 = angle1Range[np.newaxis, :] - angles[:, 0][:, np.newaxis]
out = abs(np.minimum(-np.sign(d1[:, 0] * d1[:, 1]) * np.min(np.abs(d1), axis=1), 0))
d2 = angle2Range[np.newaxis, :] - angles[:, 1][:, np.newaxis]
out = out + abs(np.minimum(-np.sign(d2[:, 0] * d2[:, 1]) * np.min(np.abs(d2), axis=1), 0))
d3 = angle3Range[np.newaxis, :] - angles[:, 2][:, np.newaxis]
out = out + abs(np.minimum(-np.sign(d3[:, 0] * d3[:, 1]) * np.min(np.abs(d3), axis=1), 0))
return out
def stdProbAngles(val=None, prob=None):
# based on Matlab implementation in matlab/lib/HeadingCorrection/estimate_delta_3d.m
val = np.array(val)
if val is None or len(val) < 2:
mean = 0
stdDev = 0
return mean, stdDev
area = sum(prob) * (val[1] - val[0])
mean = angularMean(val, prob)
temp = sum(qmt.wrapToPi(val - mean) ** 2 * prob / area)
temp = temp * (val[1] - val[0])
stdDev = np.sqrt(temp)
return mean, stdDev
def angularMean(angles, probability):
# based on Matlab implementation in matlab/lib/HeadingCorrection/estimate_delta_3d.m
N = len(angles)
mean = np.arctan2(1 / N * sum(probability * np.sin(angles)), 1 / N * sum(probability * np.cos(angles)))
mean = qmt.wrapTo2Pi(mean)
return mean
def map_(val, inMin, inMax, outMin, outMax):
# based on Matlab implementation in matlab/lib/HeadingCorrection/estimate_delta_3d.m
out = (val - inMin) * (outMax - outMin) / (inMax - inMin) + outMin
return out
def movMean1D(x, windowSize):
windowSize = np.ones(windowSize) / windowSize
x = np.array(x, dtype=float)
xFilt = signal.correlate(x, windowSize, mode='same')
return xFilt
def romCost(ROM, convention, q1, q2, delta):
qParent = q1[~np.isnan(q1[:, 0]), :]
qChild = q2[~np.isnan(q2[:, 0]), :]
qE2E1 = qmt.quatFromAngleAxis(delta, [0, 0, 1])
qChildCorr = qmt.qmult(qE2E1, qChild)
qRel = qmt.qrel(qParent, qChildCorr)
angles = qmt.eulerAngles(qRel, convention, True)
N = angles.shape[0]
angleRang1 = ROM[0, :]
angleRang2 = ROM[1, :]
angleRang3 = ROM[2, :]
rating = np.zeros((N, 3))
d1 = angleRang1[np.newaxis, :] - angles[:, [0]]
rating[:, 0] = np.sign(d1[:, 0] * d1[:, 1])
d2 = angleRang2[np.newaxis, :] - angles[:, [1]]
rating[:, 1] = np.sign(d2[:, 0] * d2[:, 1])
d3 = angleRang3[np.newaxis, :] - angles[:, [2]]
rating[:, 2] = np.sign(d3[:, 0] * d3[:, 1])
rating = np.sum(rating)
return rating
[docs]
def removeHeadingDriftForStraightWalk(t, quat, winlength, stepthreshold, movmeanwidth, plot=False, debug=False):
"""
A function that takes a quaternion with slow heading drift and removes that heading drift by assumimg that
the heading remains constant except for a few large steps (turns) by multiples of pi.
The function removes the heading steps, then low-pass filters heading and uses it to correct the given quaternion.
:param t: time vector with a fixed sampling rate
:param quat: Nx4 input quaternion array
:param winlength: winlength is the time in which the turns happen (choose e.g. 5 sec).
:param stepthreshold: stepthreshold defines which heading steps are ignored (not removed).
:param movmeanwidth: movmeanwidth is the width of the moving average low-pass filter.
:param plot: enables debug plot
:param debug: enables debug output
:return:
- output: Nx3 or 1x3 vector output array
- debug: dict with debug values (only if debug==True)
"""
timediff = np.diff(t)
rate = 1 / timediff[1]
assert np.allclose(timediff, 1 / rate)
qrel = qmt.qmult(quat, qmt.qinv(quat[199, :]))
h = qmt.eulerAngles(qrel, 'zyx', intrinsic=True)
h = np.unwrap(h, axis=0)
h = h[:, 0]
hsteps1 = np.zeros(h.shape)
hsteps2 = np.zeros(h.shape)
hsteps3 = np.zeros(h.shape)
for i in range(winlength, (h.shape[0] - winlength)):
hsteps1[i] = np.mean(h[i:i + winlength]) - np.mean(h[i - winlength:i])
hsteps1 = hsteps1 - np.mean(hsteps1[np.abs(hsteps1) < np.pi / 4])
for i in range(winlength, (h.shape[0] - winlength)):
if np.abs(hsteps1[i]) < stepthreshold:
hsteps2[i] = 0
else:
hsteps2[i] = np.round(hsteps1[i] / np.pi) * np.pi
if hsteps2[i] == 0 and hsteps2[i - 1] != 0:
# (nonzero rows, cols)
startind = np.nonzero(hsteps2[:i] == 0)[0][-1]
ind = np.argmax(np.abs(hsteps1[startind:i]))
hsteps3[startind + ind] = hsteps2[startind + ind]
hsteps4 = np.cumsum(hsteps3)
hstepsfree = h - hsteps4
drift = movMean1D(hstepsfree, 1000)
hnice = np.zeros(h.shape)
driInd = np.abs(hstepsfree - drift) > np.pi / 6
hnice[driInd] = drift[driInd]
hnice[~driInd] = hstepsfree[~driInd]
drift = movMean1D(hnice, movmeanwidth)
corrquat = np.concatenate((np.cos(-drift / 2)[:, None],
np.sin(-drift / 2)[:, None] * np.array([0, 0, 1])), axis=1)
quatCorrected = qmt.qmult(corrquat, quat)
qrelCorrected = qmt.qmult(quatCorrected, qmt.qinv(quatCorrected[200, :]))
hCorrected = np.unwrap(qmt.eulerAngles(qrelCorrected, 'zyx', 1), 30.0 * np.pi)
hOld = np.unwrap(qmt.eulerAngles(qrel, 'zyx', 1), 3.0 * np.pi)
if debug or plot:
debugData = dict(h=h,
hsteps1=hsteps1,
hsteps2=hsteps2,
hsteps3=hsteps3,
hsteps4=hsteps4,
hstepsfree=hstepsfree,
drift=drift,
hnice=hnice,
t=t,
h_corrected=hCorrected[:, 0],
h_old=hOld[:, 0])
if plot:
removeHeadingDriftForStraightWalk_debugPlot(debugData, plot)
if debug:
return quatCorrected, debugData
return quatCorrected
def removeHeadingDriftForStraightWalk_debugPlot(debug, fig=None):
with AutoFigure(fig) as fig:
(ax1, ax2), (ax3, ax4) = fig.subplots(2, 2)
fig.suptitle('removeHeadDrift debug plot')
ax1.plot(debug['t'], np.rad2deg(debug['h']), '.-')
ax1.plot(debug['t'], np.rad2deg(debug['hsteps1']), '.-')
ax1.plot(debug['t'], np.rad2deg(debug['hsteps2']), '.-')
ax1.plot(debug['t'], np.rad2deg(debug['hsteps3']), '.-')
ax1.legend(['h0', 'h1', 'h2', 'h3'])
ax1.set_ylabel('[°]')
ax1.set_title('h0~h3')
ax2.plot(debug['t'], np.rad2deg(debug['h']), '.-')
ax2.plot(debug['t'], np.rad2deg(debug['hstepsfree']), '.-')
ax2.plot(debug['t'], np.rad2deg(debug['drift']), '.-')
ax2.legend(['h', 'hstepsfree', 'drift'])
ax2.set_title('h , hstepsfree, drift')
ax2.set_ylabel('[°]')
ax3.plot(debug['t'], np.rad2deg(debug['hstepsfree']), '.-')
ax3.plot(debug['t'], np.rad2deg(debug['hnice']), '.-')
ax3.plot(debug['t'], np.rad2deg(debug['drift']), '.-')
ax3.legend(['hstepsfree', 'hnice', 'drift'])
ax3.set_title('hstepsfree, hnice, drift')
ax3.set_ylabel('[°]')
ax4.plot(debug['t'], np.rad2deg(debug['h_old']), '.-')
ax4.plot(debug['t'], np.rad2deg(debug['h_corrected']), '.-')
ax4.legend(['h_old', 'h_corrected'])
ax4.set_title('eulerAngle z-axis')
ax4.set_ylabel('[°]')
for ax in (ax1, ax2, ax3, ax4):
ax.grid()