Logs
The log files were recorded using AccelRecorder in Processing.

arduino_accel_lefthoodiepocket_3sets_15steps_nodelay_115200baud.csv was recorded with the ESP32 and LIS3DH sensor in my left hoodie pocket. I took three sets of 15 steps. The Arduino program had no delays (sampling the sensor as fast as possible) and communicated over serial at a baud rate of 115200.

A more challenging recording is arduino_accel_leftwrist_15steps_3hops_3halfspin_3touchceiling_thenshake_nodelay_115200.csv, which has 15 steps followed by 3 jumps (hops), 3 half spins, 3 ceiling touches, and then some massive shakes. Can you detect the steps here and ignore the other activities?

You can record, visualize, and analyze your own step tracker accelerometer data using AccelRecorder. You must also be running LIS3DHSerialWriter.ino on your Arduino. See the video below. Note that my screen recording software was impacting my real-time visualization frame rates (but the underlying recorded data is not impacted).
from IPython.lib.display import YouTubeVideo
# https://www.youtube.com/watch?v=k1HvoYhnwXE
YouTubeVideo('k1HvoYhnwXE', width=800, height=450)
# Import the primary data analysis libraries
import matplotlib.pyplot as plt
import numpy as np
import scipy as sp
from scipy import signal
import os
log_path = 'Logs'
files_in_log_path = [f for f in os.listdir(log_path) if os.path.isfile(os.path.join(log_path, f))]
print("The following files are available to load in the '{}' dir: ".format(log_path))
for filename in files_in_log_path:
    print(filename)
# Load in the data from one of the files
#log_filename = 'arduino_accel_leftwrist_15steps_3hops_3halfspin_3touchceiling_thenshake_nodelay_115200.csv'
#log_filename = 'arduino_accel_lefthoodiepocket_3sets_15steps_nodelay_115200baud_subset.csv'
#log_filename = 'arduino_accel_leftwrist_3sets_15steps_leftwrist_nodelay_115200.csv'
log_filename = 'arduino_accel_righthoodiepocket_3sets_15steps_delay10_9600baud_subset.csv'
#log_filename = 'arduino_accel_leftwrist_15steps_3hops_3halfspin_3touchceiling_thenshake_nodelay_115200.csv'
log_filename_with_path = os.path.join(log_path, log_filename)
# Use loadtxt to load and parse the csv data (the logs have no header row)
# https://numpy.org/doc/stable/reference/generated/numpy.loadtxt.html
raw_data = np.loadtxt(log_filename_with_path, delimiter=',', unpack=True, skiprows=0)
(processing_time, arduino_time, accel_x, accel_y, accel_z) = raw_data
# Calculate the magnitude of the x, y, z acceleration values
# to get the overall "strength" of acceleration at each time step
# Magnitude is a particularly helpful value when working with accelerometers
# because it is orientation agnostic
accel_mag = np.sqrt(accel_x**2 + accel_y**2 + accel_z**2)
accel_time = arduino_time
print("Read in {} samples from {}".format(len(accel_x), log_filename))
# Print some stats about the loaded data
import math
sample_time_in_ms = accel_time[-1] - accel_time[0]
sample_time_in_s = sample_time_in_ms / 1000.0
print("Num samples: ", accel_time.size)
print("First timestamp: {} ms".format(int(accel_time[0])))
print("Last timestamp: {} ms".format(int(accel_time[-1])))
# Dive into timestamps a bit to analyze sampling rate over time
map_sec_to_sample_count = dict()
for timestamp_ms in accel_time:
    sec_floor = math.floor(timestamp_ms / 1000.0)
    if sec_floor not in map_sec_to_sample_count:
        map_sec_to_sample_count[sec_floor] = 0
    map_sec_to_sample_count[sec_floor] += 1
print("Length (raw diff): {} secs".format(sample_time_in_s))
print("Length (rounded ceiling): {} secs".format(len(map_sec_to_sample_count)))
counts = list(map_sec_to_sample_count.values())
np_counts = np.array(counts)
sampling_rate_imprecise = accel_time.size / sample_time_in_s
print("Coarse sampling rate: {:.2f} samples/sec".format(sampling_rate_imprecise))
sampling_rate = np.average(np_counts)
print("More precise sampling rate calc: {:.2f} samples/sec (SD={:.2f} samples/sec)".
format(sampling_rate, np.std(np_counts)))
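# An added aside: a third way to estimate the sampling rate is from the
# timestamp deltas directly (np.diff gives the ms between consecutive samples)
sampling_rate_from_diffs = 1000.0 / np.mean(np.diff(accel_time))
print("Sampling rate from timestamp diffs: {:.2f} samples/sec".format(sampling_rate_from_diffs))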
print()
print("**Accel stats**")
print("Avg x: {:.2f} (SD={:.2f})".format(np.mean(accel_x), np.std(accel_x)))
print("Avg y: {:.2f} (SD={:.2f})".format(np.mean(accel_y), np.std(accel_y)))
print("Avg z: {:.2f} (SD={:.2f})".format(np.mean(accel_z), np.std(accel_z)))
print("Avg mag: {:.2f} (SD={:.2f})".format(np.mean(accel_mag), np.std(accel_mag)))
# What other descriptive stats may be useful to better understand and assess your signal?
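As one added answer to that question, min, max, median, and percentiles can help reveal the signal's range and outliers:
# Added example: a few more descriptive stats for the magnitude signal
print("Min mag: {:.2f} | Max mag: {:.2f}".format(np.min(accel_mag), np.max(accel_mag)))
print("Median mag: {:.2f}".format(np.median(accel_mag)))
print("25th/75th percentiles: {:.2f} / {:.2f}".format(
    np.percentile(accel_mag, 25), np.percentile(accel_mag, 75)))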
# We can also use pandas to analyze our data. I don't use Pandas that much (I should use it more!)
# but it's incredibly popular in data science. It definitely simplifies things like computing
# descriptive stats.
import pandas as pd
# Make a dataframe (the key data object in pandas)
# We could also use Pandas to load and parse our CSV directly
# https://pandas.pydata.org/pandas-docs/stable/getting_started/10min.html#getting-data-in-out
df = pd.DataFrame(list(zip(processing_time, arduino_time, accel_x, accel_y, accel_z, accel_mag)),
                  columns=['Processing Time', 'Time', 'X', 'Y', 'Z', 'Mag'])
df.describe()
# We won't be using Pandas anymore below but feel free to explore it more on your own!
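For example, here is a minimal added sketch of loading the CSV directly with pandas, assuming (consistent with the skiprows=0 loadtxt call above) that the log has no header row:
# Added sketch: load the log directly with pandas (no header row assumed)
df_direct = pd.read_csv(log_filename_with_path, header=None,
                        names=['Processing Time', 'Time', 'X', 'Y', 'Z'])
df_direct['Mag'] = np.sqrt(df_direct['X']**2 + df_direct['Y']**2 + df_direct['Z']**2)
df_direct.describe()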
# Now, let's plot the raw x, y, z, and magnitude values
fig, axes = plt.subplots(1, figsize=(15, 5)) #figsize is width, height
axes.plot(accel_x, alpha=0.7, label="X")
axes.plot(accel_y, alpha=0.7, label="Y")
axes.plot(accel_z, alpha=0.7, label="Z")
axes.plot(accel_mag, alpha=0.7, label="Mag")
# set the title and show the legend
axes.set_title("Accelerometer Signal")
axes.legend()
# What do you observe?
# Let's make another version but this time emphasize the magnitude stream
# Try collecting your own data and switching the orientation of the accelerometer
# between data collection sets. What happens to x,y,z? What happens to the magnitude?
fig, axes = plt.subplots(1, figsize=(15, 5)) #figsize is width, height
axes.plot(accel_x, alpha=0.3, label="X")
axes.plot(accel_y, alpha=0.3, label="Y")
axes.plot(accel_z, alpha=0.3, label="Z")
axes.plot(accel_mag, alpha=1, label="Mag")
# set the title and show the legend
axes.set_title("Accelerometer Signal")
axes.legend()
# Often it's useful to show a zoomed in view. We'll select a random 2-sec segment to highlight
import random
n_rows = 1
n_cols = 2
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15,4), gridspec_kw={'width_ratios': [2, 1]}, sharey='all')
axes[0].plot(accel_x, alpha=0.7, label="X")
axes[0].plot(accel_y, alpha=0.7, label="Y")
axes[0].plot(accel_z, alpha=0.7, label="Z")
axes[0].plot(accel_mag, alpha=0.7, label="Mag")
# show a zoomed-in view of a ~2 second window
zoom_window_secs = 2
zoom_samples = math.floor(sampling_rate * zoom_window_secs)
zoom_start_index = random.randint(0, len(accel_x) - zoom_samples)
zoom_end_index = zoom_start_index + zoom_samples
zoom_x = list(range(zoom_start_index, zoom_end_index))
axes[1].plot(zoom_x, accel_x[zoom_start_index : zoom_end_index], alpha=0.7, label="X")
axes[1].plot(zoom_x, accel_y[zoom_start_index : zoom_end_index], alpha=0.7, label="Y")
axes[1].plot(zoom_x, accel_z[zoom_start_index : zoom_end_index], alpha=0.7, label="Z")
axes[1].plot(zoom_x, accel_mag[zoom_start_index : zoom_end_index], alpha=0.7, label="Mag")
# highlight where this zoomed view is occurring on main plot
axes[0].axvline(zoom_start_index, linestyle="dashed", linewidth=1, color="black")
axes[0].axvline(zoom_end_index, linestyle="dashed", linewidth=1, color="black")
# Perform some simple frequency analysis. What is the "frequency" of a walk?
# This isn't necessary for the assignment but is interesting to investigate
# And we'll talk more about time vs. frequency domain analysis in the future
# fourier transformations and frequency-based features
num_samples = len(accel_mag)
# the mean subtraction removes the y offset (DC component) before performing the FFT
fft = np.abs(np.fft.fft(accel_mag - np.mean(accel_mag)))
freqs = np.fft.fftfreq(len(fft))
freqs = freqs * sampling_rate # convert normalized freq bins to our freq bins
print("We have {} frequency bins of size {}Hz from {} to {}Hz".\
format(len(freqs), (freqs[1]-freqs[0]), freqs[0], np.abs(freqs[num_samples//2])))
freqs = freqs[range(num_samples//2)] # one side freq range, integer division
abs_fft = np.abs(fft)[range(num_samples//2)] # one side freq range, integer division
abs_fft_normalized = abs_fft / num_samples # normalize to values 0 - 1
# find the max frequency
max_freq_idx = np.argmax(abs_fft_normalized)
max_freq = freqs[max_freq_idx]
print("The max frequency is: {} Hz".format(max_freq))
# find the top N frequencies
# see: https://stackoverflow.com/questions/10337533/a-fast-way-to-find-the-largest-n-elements-in-an-numpy-array?lq=1
top_n_freq = 3
ind = np.argpartition(abs_fft_normalized, -top_n_freq)[-top_n_freq:] # from https://stackoverflow.com/a/23734295
ind_sorted_by_coef = ind[np.argsort(-abs_fft_normalized[ind])] # reverse sort indices
# I appear to walk at ~1.6 Hz :)
print("Top 3 frequencies (freq, coef):", list(zip(freqs[ind_sorted_by_coef], abs_fft_normalized[ind_sorted_by_coef]))) # print top freq
top1_freq = freqs[ind_sorted_by_coef][0]
top2_freq = freqs[ind_sorted_by_coef][1]
top3_freq = freqs[ind_sorted_by_coef][2]
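As an added, rough sanity check (and assuming walking dominates the recording), the dominant frequency in steps/sec times the recording length should roughly approximate the number of steps:
# Added estimate: steps ≈ dominant frequency (steps/sec) * recording length (secs)
estimated_steps = top1_freq * sample_time_in_s
print("Estimated step count from dominant frequency: {:.1f}".format(estimated_steps))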
# Visualize the frequency data. In this spectrogram plot, the y-axis is frequency
# and the x-axis is time. We'll talk more about this in the future
# You'll note that most of the frequency is located around 2Hz or less
# See:
# - https://matplotlib.org/api/_as_gen/matplotlib.pyplot.specgram.html
# - https://matplotlib.org/gallery/images_contours_and_fields/specgram_demo.html
print("Sampling rate: {:0.2f} Hz".format(sampling_rate))
print("Maximum detectable frequency in our signal is: {:.2f} Hz (due to Nyquist theorem)".format(sampling_rate/2.0))
NFFT = 64 # the length of the windowing segments
fig, (axes1, axes2) = plt.subplots(2, 1, figsize=(15, 5)) # create two rows, one col of plots
axes1.plot(accel_mag, 'b-', label="netMag Accel")
Pxx, freqs, bins, im = axes2.specgram(accel_mag, NFFT=NFFT, Fs=sampling_rate, noverlap=math.ceil(NFFT/2.0))
Here, we process the data (e.g., detrend it, smooth it, etc.)
# Apply some basic processing. Here, we'll subtract out the mean so that the
# average of the signal is zero (this is helpful to, for example, count zero crossings)
mag_after_mean_subtraction = accel_mag - np.average(accel_mag)
fig, axes = plt.subplots(1, figsize=(15, 5)) #figsize is width, height
axes.plot(accel_mag, alpha=0.7, label="Original Mag")
axes.plot(mag_after_mean_subtraction, alpha=0.7, label="Mag After Mean Subtraction")
# set the title and show the legend
axes.set_title("Accelerometer Magnitude Signal")
axes.legend()
# We can also use scipy's built in detrend algorithm, which is the result
# of a linear least-squares fit to the data
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.detrend.html
mag_detrended = sp.signal.detrend(accel_mag)
fig, axes = plt.subplots(1, figsize=(15, 5)) #figsize is width, height
# axes.plot(accel_mag, alpha=0.7, label="Original Mag")
axes.plot(mag_after_mean_subtraction, alpha=0.9, label="Mag After Mean Subtraction")
axes.plot(mag_detrended, alpha=1, linestyle="-.", label="Mag detrended")
# set the title and show the legend
axes.set_title("Accelerometer Signal")
axes.legend()
# Now, let's play around with various smoothing methods
# Here are different mean smoothing filters
# Note that the filter window size should really be a function of your sampling rate
def mean_filter(sig, N):
    # Simple causal mean filter: each output sample is the mean of the next N
    # input samples. Note the final N-1 outputs average fewer than N real samples
    # because the slice runs off the end of the array.
    # (Parameter renamed from 'signal' to 'sig' to avoid shadowing scipy's signal module.)
    filtered_signal = np.zeros((len(sig),))
    for ctr in range(len(sig)):
        filtered_signal[ctr] = np.sum(sig[ctr:(ctr + N)])
    return filtered_signal / N
filter_window_size = 5
# Convolve using a mean filter
# https://numpy.org/doc/stable/reference/generated/numpy.convolve.html?highlight=convolve#numpy.convolve
# or just use our own mean_filter code
mag_mean_convolve5 = np.convolve(mag_detrended,
np.ones((filter_window_size,))/filter_window_size,
mode='valid')
mag_mean_filter5 = mean_filter(mag_detrended, filter_window_size)
fig, axes = plt.subplots(1, figsize=(15, 5)) #figsize is width, height
# axes.plot(accel_mag, alpha=0.7, label="Original Mag")
axes.plot(mag_detrended, alpha=0.3, label="Original")
axes.plot(mag_mean_convolve5, alpha=0.6, label="mag_mean_convolve5")
axes.plot(mag_mean_filter5, alpha=0.6, label="mag_mean_filter5")
# set the title and show the legend
axes.set_title("Accelerometer Signal (Processed)")
axes.legend()
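One thing to notice in the plot above: with mode='valid', the convolved signals are slightly shorter than the original, so they shift visually relative to it. Here is a minimal added alternative that keeps the output aligned and the same length:
# Added sketch: mode='same' pads at the edges so output length == input length
mag_mean_same5 = np.convolve(mag_detrended,
                             np.ones((filter_window_size,)) / filter_window_size,
                             mode='same')
print("valid length: {}, same length: {}, original length: {}".format(
    len(mag_mean_convolve5), len(mag_mean_same5), len(mag_detrended)))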
# Effect of various window sizes on mean filter
filter_window_size = 5
# Convolve using a mean filter
# https://numpy.org/doc/stable/reference/generated/numpy.convolve.html?highlight=convolve#numpy.convolve
# or just use our own mean_filter code
mag_mean_filter_kernel5 = np.convolve(mag_detrended,
np.ones((filter_window_size,))/filter_window_size,
mode='valid')
filter_window_size = 10
mag_mean_filter_kernel10 = np.convolve(mag_detrended,
np.ones((filter_window_size,))/filter_window_size,
mode='valid')
filter_window_size = 100
mag_mean_filter_kernel100 = np.convolve(mag_detrended,
np.ones((filter_window_size,))/filter_window_size,
mode='valid')
fig, axes = plt.subplots(1, figsize=(15, 5)) #figsize is width, height
# axes.plot(accel_mag, alpha=0.7, label="Original Mag")
axes.plot(mag_detrended, alpha=0.3, label="Original")
axes.plot(mag_mean_filter_kernel5, alpha=0.8, label="mag_mean_filter_kernel5")
axes.plot(mag_mean_filter_kernel10, alpha=0.8, label="mag_mean_filter_kernel10")
axes.plot(mag_mean_filter_kernel100, alpha=0.8, label="mag_mean_filter_kernel100")
# set the title and show the legend
axes.set_title("Accelerometer Signal (Processed)")
axes.legend()
# median filters are easy to implement but fairly destructive
filter_window_size = 5 # should be odd
mag_med_filter_kernel5 = sp.signal.medfilt(mag_detrended, filter_window_size)
filter_window_size = 11
mag_med_filter_kernel11 = sp.signal.medfilt(mag_detrended, filter_window_size)
filter_window_size = 99
mag_med_filter_kernel99 = sp.signal.medfilt(mag_detrended, filter_window_size)
fig, axes = plt.subplots(1, figsize=(15, 5)) #figsize is width, height
# axes.plot(accel_mag, alpha=0.7, label="Original Mag")
axes.plot(mag_detrended, alpha=0.3, label="Original")
axes.plot(mag_med_filter_kernel5, alpha=0.8, label="mag_med_filter_kernel5")
axes.plot(mag_med_filter_kernel11, alpha=0.8, label="mag_med_filter_kernel11")
axes.plot(mag_med_filter_kernel99, alpha=0.8, label="mag_med_filter_kernel99")
# set the title and show the legend
axes.set_title("Accelerometer Signal (Filtered)")
axes.legend()
# The Mladenov paper uses a Butterworth filter but a mean filter works well for this problem as well
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.butter.html
# There are many other filters available in scipy
print("Sampling rate: {:0.2f} Hz".format(sampling_rate))
print("Maximum detectable frequency in our signal is: {:.2f} Hz (due to Nyquist theorem)".format(sampling_rate/2.0))
filter_order = 1
nyquist_freq = 0.5 * sampling_rate  # for reference; signal.butter derives this from fs

low_pass_cutoff_freq = 5
sos = signal.butter(filter_order, low_pass_cutoff_freq, 'lowpass', fs=sampling_rate, output='sos')
mag_butter_filter5 = signal.sosfilt(sos, mag_detrended)

low_pass_cutoff_freq = 3
sos = signal.butter(filter_order, low_pass_cutoff_freq, 'lowpass', fs=sampling_rate, output='sos')
mag_butter_filter3 = signal.sosfilt(sos, mag_detrended)

low_pass_cutoff_freq = 2
sos = signal.butter(filter_order, low_pass_cutoff_freq, 'lowpass', fs=sampling_rate, output='sos')
mag_butter_filter2 = signal.sosfilt(sos, mag_detrended)
fig, axes = plt.subplots(1, figsize=(15, 5)) #figsize is width, height
# axes.plot(accel_mag, alpha=0.7, label="Original Mag")
axes.plot(mag_detrended, alpha=0.3, label="Original")
axes.plot(mag_butter_filter5, alpha=0.8, label="mag_butter_filter5")
axes.plot(mag_butter_filter3, alpha=0.8, label="mag_butter_filter3")
axes.plot(mag_butter_filter2, alpha=0.8, label="mag_butter_filter2")
# set the title and show the legend
axes.set_title("Accelerometer Signal (Filtered)")
axes.legend()
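One added caveat: sosfilt is causal and introduces a phase delay, so the filtered peaks lag the raw ones slightly. For offline analysis, scipy's sosfiltfilt runs the filter forward and backward for zero phase delay. A minimal sketch:
# Added sketch: zero-phase version of the 3 Hz low-pass filter
sos = signal.butter(filter_order, 3, 'lowpass', fs=sampling_rate, output='sos')
mag_butter_zero_phase3 = signal.sosfiltfilt(sos, mag_detrended)
fig, axes = plt.subplots(1, figsize=(15, 5))
axes.plot(mag_butter_filter3, alpha=0.8, label="sosfilt (causal, delayed)")
axes.plot(mag_butter_zero_phase3, alpha=0.8, label="sosfiltfilt (zero phase)")
axes.set_title("Causal vs. Zero-Phase Butterworth Filtering")
axes.legend()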
Now, brainstorm and discuss step tracker approaches with your small group: what might you think about (and possibly try)?
The screenshot below shows a heuristic-based step tracking algorithm correctly identifying all steps in arduino_accel_righthoodiepocket_3sets_15steps_delay10_9600baud_subset.csv. Start here and then move to the more complicated cases such as arduino_accel_leftwrist_15steps_3hops_3halfspin_3touchceiling_thenshake_nodelay_115200.csv.
# Write your step tracking code here!
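As a starting point, here is one possible sketch of a heuristic step detector built on scipy's find_peaks. This is not the official solution; the window size and thresholds are assumptions you should tune against your own recordings:
# Added sketch: one possible offline step detector (thresholds are guesses to tune)
from scipy.signal import find_peaks

# Smooth the detrended magnitude with a mean filter sized to the sampling rate
window = max(3, int(sampling_rate // 10))
smoothed = np.convolve(mag_detrended, np.ones(window) / window, mode='same')

# Require peaks to be sufficiently tall and at least ~0.3 secs apart
# (a fast walk is ~3 steps/sec, so this guards against double counting)
peak_indices, _ = find_peaks(smoothed,
                             height=0.3 * np.max(smoothed),
                             distance=max(1, int(0.3 * sampling_rate)))
print("Detected {} candidate steps".format(len(peak_indices)))

fig, axes = plt.subplots(1, figsize=(15, 5))
axes.plot(smoothed, alpha=0.8, label="Smoothed mag")
axes.plot(peak_indices, smoothed[peak_indices], 'o', color="red", label="Detected steps")
axes.legend()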
Once you've finished with the exercise above, focus on adapting your algorithm to a "real-time" version. That is, to make this more realistic, we should read in and buffer samples (perhaps in windows ~1 sec long) and analyze the signal that way.
print("Sampling rate: {:0.2f} Hz".format(sampling_rate))
buffer_window_size = math.floor(sampling_rate)
peak_locations = []
min_distance_between_peaks_ms = 300
min_peak_height = 10000
last_peak_timestamp = -1
for i in range(0, len(accel_mag), buffer_window_size - 1):
    # overlap our buffer by one sample between windows
    buffer = accel_mag[i:i + buffer_window_size]

    # TODO add in simple signal processing here
    # How would you detrend the signal? Perhaps an ongoing average?
    # How would you smooth the signal in an ongoing fashion?

    # Here's a simple peak detection algorithm
    # (start j at 1 so buffer[j - 1] doesn't wrap around to the end of the array)
    for j in range(1, len(buffer) - 1):
        forward_slope = buffer[j + 1] - buffer[j]
        backward_slope = buffer[j] - buffer[j - 1]

        if forward_slope < 0 and backward_slope > 0:
            peak_index = i + j
            peak_val = buffer[j]
            # print("Found peak at index={} with val={}".format(peak_index, peak_val))

            if peak_val >= min_peak_height:
                # print("Peak at index={} passes minimum height value ({})".format(peak_index, min_peak_height))
                time_since_last_detection_ms = accel_time[peak_index] - last_peak_timestamp
                if last_peak_timestamp == -1 or time_since_last_detection_ms >= min_distance_between_peaks_ms:
                    last_peak_timestamp = accel_time[peak_index]
                    peak_locations.append(peak_index)
print("Found {} peaks".format(len(peak_locations)))
# Plot the peaks
fig, axes = plt.subplots(1, figsize=(15, 5)) #figsize is width, height
axes.plot(accel_mag, alpha=1, label="Raw mag")
axes.axhline(y=min_peak_height, linewidth=1, linestyle=":", alpha=0.6, color='gray')
axes.plot(peak_locations, accel_mag[peak_locations], '+', color="red", label="Peak Locations")
axes.legend()
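Since this particular log contains three sets of 15 steps, a quick added sanity check is to compare the detected peak count against that ground truth:
# Added check: this log has 3 sets of 15 steps = 45 ground-truth steps
expected_steps = 45
print("Detected {} peaks vs. {} actual steps ({:+d} error)".format(
    len(peak_locations), expected_steps, len(peak_locations) - expected_steps))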
This Notebook was designed and written by Professor Jon E. Froehlich at the University of Washington along with feedback from students. It is made available freely online as an open educational resource at the teaching website: https://makeabilitylab.github.io/physcomp/.
The website, Notebook code, and Arduino code are all open source using the MIT license.
Please file a GitHub Issue or Pull Request for changes/comments or email me directly.