Thesis/canny_shock_finder.py

508 lines
30 KiB
Python
Raw Normal View History

2024-10-15 15:11:10 +10:00
# -*- coding: utf-8 -*-
2024-10-15 14:12:11 +10:00
"""
Canny_shock_finder.py
2024-10-15 15:11:10 +10:00
A little shock finding code which I wrote using a cut down one-dimensional Canny edge detector.
2024-10-15 14:12:11 +10:00
Seems to work quite well with the appropriate selection of input parameters.
The original paper is:
Canny, J. (1986). A computational approach to edge detection.
IEEE Transactions on pattern analysis and machine intelligence, (6), 679-698.
Chris James (c.james4@uq.edu.au) - 04/07/17
"""
2024-10-15 15:11:10 +10:00
VERSION_STRING = "15-Oct-2024"
2024-10-15 14:12:11 +10:00
def canny_shock_finder(time_list, pressure_list, sigma = 4, derivative_threshold = 0.001, auto_derivative = False, post_suppression_threshold = 0.05,
post_shock_pressure = None, start_time = None, find_second_peak = None, plot = True, plot_scale_factor = None,
amount_of_values_before_and_after = 100, time_unit = 's', y_label = 'Pressure (kPa)', plot_title = 'canny shock finding result',
return_processing_lists = False, calculate_automatic_derivative_threshold = False,
2024-10-15 15:11:10 +10:00
check_canny_runtime = False, plot_time_unit = 's', plot_time_scale = 1.0):
2024-10-15 14:12:11 +10:00
"""
This is a basic function with uses a cut down one-dimensional Canny edge detector to find shock arrival.
It seems to work quite well with the appropriate selection of input parameters
(sigma, derivative_threshold, post_suppression_threshold).
The original paper is:
Canny, J. (1986). A computational approach to edge detection.
IEEE Transactions on pattern analysis and machine intelligence, (6), 679-698.
Canny edge detection is a multi-stage algorithm generally used to find edges for image detection.
It is a widely used and discussed, and it should not be hard for a user to find adequate
information about it. (The original paper by Canny is also quite a well presented and easy to read
resource as well.)
In two dimensions (i.e. for an image) Canny edge detection is a bit more complicated, but
a simplified one dimensioanl version has been implemented here. The first step is to filter
the image by convolving it with the derivative of a Gaussian (this is done by using the
gaussian_filter1d functions from scipy.ndimage.filters with an order of 1), and then find
gradients in the image in the x and y directions with generally the Sobel method. As our
implementation here is one-dimensional, this second step has been performed by convolving the
original signal with the second derivative of a Gaussian to get the derivative of the original
derivative of a Gaussian convolution (this is done using the gaussian_filter1d functions from
scipy.ndimage.filters with an order of 2). The standard deviation (sigma) of these Gaussians is
defined by the user as an input.
At this point, there are two arrays, which are the original data convolved with both the
first and second derivatives of a Gaussian. The first array has maximums where step changes occur,
and the other crosses zero where step changes occur. Here the derivative_threshold input is used by
the code to inform it what to call zero in the second order Gaussian, as due to floating point uncertainty the
values don't come out as zero, but very close to it. This value is somewhat dependent on the magnitude of the
original signal, and it would be expected to scale with the magnitude of the original signal.
The next step in the Canny edge detection process is "Non-maximum suppression" where any value
which does not have a different gradient on either side of it is set to zero, "thinning" the edges
to generally a single point.
(If the code ends up with two edges next to each other due to issues with the signal, a midpoint
with a +- uncertainty is taken).
Finally, a low and high threshold is used to define less important edges and what is certaintly an
edge. Here, as we want one strong edge for the shock arrival, any edges below the lower threshold
(the post_suppression_threshold) are set to 0, and a high threshold is not used. Similar to the
derivative_threshold discussed above, the post_suppression_threshold would also be expected to scale
with the magnitude of the input signal.
As the selection of input parameters do scale with the magnitude of the signal, they must be modified
as required, but it is hoped that they can hopefully be setup for one experimental campaign,
and then used for all analysis after that for each signal.
The default setting is for the code to find only one peak, but it can also find a second peak
for reflected shock tunnel usage if required.
Returns the time and uncertainty for both the first and second peaks in the data,
which should be the first and second shock arrival times if the code is configured correctly
for the data. If the second peak is not asked for, its value are returned as None
:param time_list: List or array of time values. Is turned into an array by the code either way.
:param pressure_list: List or array of pressure (or other quantity) values. Is turned into an array by the code either way.
:param sigma: standard deviation to be used with the Gaussian filters. Defaults to 4 but should be changed
to suit the data.
:param derivative_threshold: threshold below which the non-maximum supression part of the calculation
will take values to be zero. Used to counter numerical errors causing the second order Gaussian values
where it crosses zero to not be exactly zero. Defaults to 0.001 but should be changed to suit the data.
:param post_suppression_threshold: threshold applied to the data after the non-maximum supression
has been performed. Is basically tuned to ensure that any values left in the noise before
shock arrival are removed in favour of the larger peak when the shock arrives. Defaults to
0.05 but should be modified as required until the first peak found by the code is the first shock arrival time.
:param post_shock_pressure: A post shock pressure estimate can be entered instead of the post_supression_threshold.
It will then be used to calculate a new rough post_suppression threshold based on the post-shock pressure value
and the sigma value. Input should be in the same units as the data
:param start_time: start_time can be used to make the code start at a certain time. Any value below that will be ignored.
Defaults to None (i.e. not used)
:param find_second_peak: Input flag for reflected shock tunnels where there is expected to be a
second shock arrival. Defaults to None and will then be set to the first value in the time_list.
:param plot: flag to plot the result. Defaults to True.
:param plot_scale_factor: used to scale the results so they have a similar magnitude to the original data.
Defaults to None, which means that no scaling will be used.
:param amount_of_values_before_and_after: amount of values before and after the found shock arrival time
to use for calculating the y limits for the plot. Defaults to 100.
:param time_unit: time unit for the plotting. Defaults to 's (seconds)'.
:param pressure_unit: time unit for the y axis Defaults to 'kPa' (pressure in kilopascals).
:param plot_title: string title for the plot. Defaults to 'canny shock finding result'
:param return_processing_lists: returns a dictionary with the tine and pressure lists, as well as the gaussian filtered results,
and the non-maximum suppression and final result lists. Useful for making plots of how the process has worked.
Defaults to False.
:param calculate_automatic_derivative_threshold: Makes the program calculate an automatic derivative threshold basically using
an emprirical correlation which I worked out based on how the required derivative threshold changes with the sigma value.
:param check_canny_runtime: flag to make the program time how long the Canny edge detection procedure
actually takes. Mainly done for testing purposes.
:param plot_time_unit: Allows a different time unit to be specified for the results plot, so that the
original calculation can be performed in the original units, but then plotted in another. Relies on
the user using the correct plot_time_scale factor to connect the two time units. Defaults to 's' (seconds).
:param plot_time_scale: See plot_time_unit above. These two inputs allow the results plot to be in a different
time unit to the original data. The user must specify this scale to connect the original and plotted
time units. Defaults to 1.0 (so no change if time units are seconds).
"""
2024-10-15 15:11:10 +10:00
if check_canny_runtime:
import time
run_start_time = time.time()
# do any required imports
import scipy.ndimage.filters
import numpy as np
import copy
2024-10-15 21:22:40 +10:00
print('-'*60)
print("Running Canny shock finder version {0}".format(VERSION_STRING))
2024-10-15 15:11:10 +10:00
# do some basic input checking
if not isinstance(sigma, (float, int)):
raise Exception("canny_shock_finder(): The user specified 'sigma' value ({0}) is not a float or an int.".format(sigma))
if post_shock_pressure and not isinstance(post_shock_pressure, (float, int)):
raise Exception("canny_shock_finder(): The user specified 'post_shock_pressure' value ({0}) is not a float or an int.".format(post_shock_pressure))
elif not post_shock_pressure:
if not isinstance(post_suppression_threshold, (float, int)):
raise Exception("canny_shock_finder(): The user specified 'post_suppression_threshold' value ({0}) is not a float or an int.".format(post_suppression_threshold))
if not auto_derivative:
if calculate_automatic_derivative_threshold and not post_shock_pressure:
raise Exception("canny_shock_finder(): 'calculate_automatic_derivative_threshold' cannot be used without a specified post-shock pressure value.")
elif not calculate_automatic_derivative_threshold:
if not isinstance(derivative_threshold, (float, int)):
raise Exception("canny_shock_finder(): The user specified 'derivative_threshold' value ({0}) is not a float or an int.".format(derivative_threshold))
if not start_time:
start_time = time_list[0]
if post_shock_pressure:
2024-10-15 21:22:40 +10:00
print("Using post-shock pressure scaling so the post_suppression_threshold will be calculated using a post-shock pressure estimate.")
2024-10-15 15:11:10 +10:00
# we need to calculate our post_suppression_threshold here based on the expected post-shock pressure and the
# scaling caused by the first order gaussian data based on the maximum of the Gaussian
# we'll take it to be half of the pressure rise, and will divide out Gaussian max by 2 as the first derivative
# of a gaussian has a maximum which is about half that of a normal Gaussian...
# make a Gaussian with a mean of zero and our sigma
from scipy.stats import norm
gaussian = norm(loc=0., scale=sigma)
# tehn get the value of the probability density function when it is 0 (which is the max)
gaussian_max = gaussian.pdf(0)
x = np.linspace(-5*sigma, 5*sigma, 1000)
y = scipy.stats.norm.pdf(x, 0, sigma)
dx = x[1] - x[0]
dydx = np.gradient(y, dx)
gaussian_first_derivative_max = max(dydx)
if sigma < 1.0:
post_suppression_threshold = 0.1*post_shock_pressure * gaussian_first_derivative_max
elif 1.0 <= sigma <= 2.0:
post_suppression_threshold = 0.5*post_shock_pressure * gaussian_first_derivative_max
else:
post_suppression_threshold = post_shock_pressure * gaussian_first_derivative_max
#post_suppression_threshold = 0.5 * post_shock_pressure * gaussian_max/2.0
2024-10-15 21:22:40 +10:00
print("Calculated post_suppression_threshold is {0}".format(post_suppression_threshold))
2024-10-15 15:11:10 +10:00
if calculate_automatic_derivative_threshold:
2024-10-15 21:22:40 +10:00
print("Calculating automatic derivative threshold as the user has asked for this.")
2024-10-15 15:11:10 +10:00
# this commented out code here was my original model, based on the actual second derivative of the Gaussian,
# but it didn't seem to work too well (it got too small at very high sigma values, i.e. above 6 or so)
#dy2d2x = np.gradient(dydx, dx)
#gaussian_second_derivative_max = max(dy2d2x)
#derivative_threshold = gaussian_second_derivative_max / 10.0
# so I basically took some data for a case with a good step change rise of 5 kPa and worked out
# that it was exponential decay and could be pretty well approximated with the function below
# testing shows that it works alright
# I made it stop at a sigma of 6 as, like above, the value was getting too big...
if sigma < 6:
derivative_threshold = post_shock_pressure / 2.5 * np.exp(-sigma) / 10.0
else:
derivative_threshold = post_shock_pressure / 2.5 * np.exp(-6) / 10.0
2024-10-15 21:22:40 +10:00
print("Calculated derivative_threshold is {0}.".format(derivative_threshold))
2024-10-15 15:11:10 +10:00
# make the input data arrays incase they didn't come in that way...
time_list = np.array(time_list)
pressure_list = np.array(pressure_list)
# convolve the input data with both the first and second derivatives of a Gaussian
# order = 1 gaussian filter (convolution with first derivative of a Gaussian)
first_order_data = scipy.ndimage.filters.gaussian_filter1d(pressure_list, sigma = sigma, order = 1)
#order = 2 gaussian fitler (convolution with second derivative of a Gaussian)
second_order_data = scipy.ndimage.filters.gaussian_filter1d(pressure_list, sigma = sigma, order = 2)
# now we start doing the non-maximum suppression
# copy the original first_order_data list and then change values to zero as we need to
suppressed_data = copy.copy(first_order_data)
have_found_first_value = False # flag to tell the code when we have passed the first value
# set all values to None, so None will be returned if nothing is found.
first_value = None
first_value_uncertainty = None
if auto_derivative:
2024-10-15 21:22:40 +10:00
print("Doing auto-derivative!")
2024-10-15 15:11:10 +10:00
# remove points which have the same gradient on either side
for i in range(0,len(first_order_data)-1):
if np.sign(second_order_data[i-1]) == np.sign(second_order_data[i+1]):
suppressed_data[i] = 0
else:
2024-10-15 21:22:40 +10:00
print("Not doing auto-derivative!")
2024-10-15 15:11:10 +10:00
for i in range(0,len(first_order_data)-1):
# check the gradients on either side using the second order data
# if they are the same, set the value to 0
# the derivative_threshold is also used here to set what is actually zero (that is the elif statement)
# if they are NOT the same, we keep the value, so don't change anything...
if np.sign(second_order_data[i-1]) == np.sign(second_order_data[i+1]):
suppressed_data[i] = 0
# other condition, which is to try to stop any numerical noise in the point where the second order gaussian crosses zero
elif np.sign(second_order_data[i-1]) != np.sign(second_order_data[i+1]) and abs(second_order_data[i-1]) < derivative_threshold or \
np.sign(second_order_data[i-1]) != np.sign(second_order_data[i+1]) and abs(second_order_data[i+1]) < derivative_threshold:
suppressed_data[i] = 0
# now we do the final threshold and go through the non-maximum suppressed data and
# remove any values below the user specified post_suppression_threshold
post_suppression_thresholded_data = copy.copy(suppressed_data)
for i in range(0, len(suppressed_data)-1):
if abs(post_suppression_thresholded_data[i]) < post_suppression_threshold:
post_suppression_thresholded_data[i] = 0
# now loop through again and find the first peak (and the second peak if the user has asked for it)
if find_second_peak:
second_value = None
second_value_uncertainty = None
for i in range(0,len(post_suppression_thresholded_data)-1):
# first check that we are past our start time, this will just be the first value is the user hasn't specified something else...
if time_list[i] >= start_time:
# we want the first peak!
if post_suppression_thresholded_data[i] != 0 and not have_found_first_value:
have_found_first_value = True
print('-'*60)
print("First value found at {0} seconds, {1}th value in the data.".format(time_list[i], i))
print("Magnitude of first value found in first order Gaussian is {0}.".format(post_suppression_thresholded_data[i]))
print("Magnitude of first value found in second order Gaussian is {0}.".format(second_order_data[i]))
# store the value too
# if the value after the value found isn't 0, the codes the next value as well
# and takes the trigger time to be a midpoint with a +- uncertainty
# if not, it just takes the single value
if auto_derivative:
# reducing derivative
if second_order_data[i] > second_order_data[i+1]:
print("Computing intersection for reducing derivative.")
delta_derivative = (second_order_data[i+1] - second_order_data[i])
delta_time = time_list[i+1] - time_list[i]
first_value = time_list[i] + (second_order_data[i]/-delta_derivative)*delta_time
first_value_uncertainty = (time_list[i+1]-time_list[i])/2.0
break
# increasing derivative
elif second_order_data[i] < second_order_data[i+1]:
print("Computing intersection for increasing derivative.")
delta_derivative = (second_order_data[i+1] - second_order_data[i])
delta_time = time_list[i+1] - time_list[i]
first_value = time_list[i] + (second_order_data[i]/-delta_derivative)*delta_time
first_value_uncertainty = (time_list[i]-time_list[i-1])/2.0
break
else:
# could not interpolate to find the second derivative zero crossing
print("Could not interpolate to find the second derivative zero crossing.")
print("Value of second derivative at 'i' is {0} and 'i+1' is {1}".format(second_order_data[i], second_order_data[i+1]))
break
elif post_suppression_thresholded_data[i+1] == 0:
first_value = time_list[i]
first_value_uncertainty = 0.0
if not find_second_peak:
break
else:
# flag that is used to stop the code picking up the value next to it if the peak found has two values
ignore_next_value = False
else:
first_value = (time_list[i] + (time_list[i+1] - time_list[i])/2.0)
first_value_uncertainty = ((time_list[i+1] - time_list[i])/2.0)
if not find_second_peak:
break
else:
# flag that is used to stop the code picking up the value next to it if the peak found has two values
ignore_next_value = True
elif post_suppression_thresholded_data[i] != 0 and have_found_first_value and ignore_next_value:
# change the flag back after the next value has been passed...
ignore_next_value = False
elif post_suppression_thresholded_data[i] != 0 and have_found_first_value and not ignore_next_value:
print("Second value found at {0} seconds, {1}th value in the data.".format(time_list[i], i))
print("Magnitude of second value found in first order Gaussian is {0}.".format(post_suppression_thresholded_data[i]))
print("Magnitude of first value found in second order Gaussian is {0}.".format(second_order_data[i]))
# store the value too
# if the value after the value found isn't 0, the codes the next value as well
# and takes the trigger time to be a midpoint with a +- uncertainty
# if not, it just takes the single value
if post_suppression_thresholded_data[i+1] == 0:
second_value = time_list[i]
second_value_uncertainty = 0.0
else:
second_value = (time_list[i] + (time_list[i+1] - time_list[i])/2.0)
second_value_uncertainty = ((time_list[i+1] - time_list[i])/2.0)
break
if check_canny_runtime:
run_finish_time = time.time()
total_run_time = run_finish_time - run_start_time
print('-'*60)
print("Total Canny run time was {0} milliseconds".format(total_run_time*1000.0))
print('-'*60)
if plot:
try: # this is mainly so the code doesn't bail out if one closes a window before it has loaded properly
import matplotlib.pyplot as mplt
2024-10-15 21:22:40 +10:00
2024-10-15 15:11:10 +10:00
figure, (data_ax, convolution_ax) = mplt.subplots(2,1, sharex=True, figsize = (14,8))
data_ax.plot(time_list*plot_time_scale, pressure_list, '-o', label = 'original data')
convolution_ax.plot(time_list*plot_time_scale, first_order_data, '-o', label='first order gaussian (sigma = {0})'.format(sigma))
convolution_ax.plot(time_list*plot_time_scale, second_order_data, '-o', label='second order gaussian (sigma = {0})'.format(sigma))
convolution_ax.plot(time_list*plot_time_scale, suppressed_data, '-o', label='first order with non-max suppression')
convolution_ax.plot(time_list*plot_time_scale, post_suppression_thresholded_data, '-o', label='final result')
if first_value:
if find_second_peak and second_value:
first_shock_arrival_label = 'first shock arrival time'
else:
first_shock_arrival_label = 'shock arrival time'
if first_value_uncertainty:
for ax in [data_ax, convolution_ax]:
ax.axvline((first_value - first_value_uncertainty)*plot_time_scale,
ymin=0.0, ymax = 1.0, linestyle = '--',
color = 'k', label = first_shock_arrival_label)
ax.axvline((first_value + first_value_uncertainty)*plot_time_scale,
ymin=0.0, ymax = 1.0, linestyle = '--', color = 'k')
else:
for ax in [data_ax, convolution_ax]:
ax.axvline(first_value*plot_time_scale, ymin=0.0, ymax = 1.0,
linestyle = '--', color = 'k', label = first_shock_arrival_label)
if find_second_peak and second_value:
if second_value_uncertainty:
for ax in [data_ax, convolution_ax]:
ax.axvline((second_value - second_value_uncertainty)*plot_time_scale,
ymin=0.0, ymax = 1.0, linestyle = '--', color = 'k',
label = "second shock arrival time")
ax.axvline((second_value + second_value_uncertainty)*plot_time_scale,
ymin=0.0, ymax = 1.0, linestyle = '--', color = 'k')
else:
for ax in [data_ax, convolution_ax]:
ax.axvline(second_value, ymin=0.0, ymax = 1.0,
linestyle = '--', color = 'k', label = "second shock arrival time")
font_sizes = {'title_size':16, 'label_size':18,'annotation_size':11,
'legend_text_size':11, 'tick_size':17, 'marker_size':12,
'main_title_size':20}
convolution_ax.set_xlabel(r'Time ({0})'.format(plot_time_unit), fontsize=font_sizes['label_size'])
data_ax.set_ylabel(y_label, fontsize=font_sizes['label_size'])
data_ax.set_title(plot_title, fontsize=font_sizes['title_size'])
# get the x and y limits by going through the data if we can...
if first_value and amount_of_values_before_and_after:
cut_down_lists = {}
cut_down_lists['pressure'] = []
cut_down_lists['first_order'] = []
cut_down_lists['second_order'] = []
cut_down_lists['suppressed'] = []
cut_down_lists['post_suppression'] = []
# get our sample size by taking two time values from each other...
sample_size = time_list[1] - time_list[0]
cut_down_time = sample_size*amount_of_values_before_and_after
if 'second_value' not in locals() or not second_value:
start_time = first_value - cut_down_time
end_time = first_value + cut_down_time
else:
# we need to get between two values if we have them...
start_time = first_value - cut_down_time
end_time = second_value + cut_down_time
for i, time in enumerate(time_list):
if time >= start_time and time <= end_time:
cut_down_lists['pressure'].append(pressure_list[i])
cut_down_lists['first_order'].append(first_order_data[i])
cut_down_lists['second_order'].append(second_order_data[i])
cut_down_lists['suppressed'].append(suppressed_data[i])
cut_down_lists['post_suppression'].append(post_suppression_thresholded_data[i])
elif time > end_time:
break
data_ax.set_xlim(start_time*plot_time_scale, end_time*plot_time_scale)
convolution_ax.set_xlim(start_time*plot_time_scale, end_time*plot_time_scale)
# y-axis for the pressure plot is fine, just take the min and max...
pressure_y_min = min(cut_down_lists['pressure'])
pressure_y_max = max(cut_down_lists['pressure'])
data_ax.set_ylim(pressure_y_min, pressure_y_max)
# y is a bit more complicated for the non-pressure values, as we have multiple things to plot on the y axis....
# we'll go through the dictionary keys...
y_min = None
y_max = None
for key in cut_down_lists.keys():
if key not in ['pressure', 'post_suppression']: # skip these two here...
for value in cut_down_lists[key]:
if not y_min:
y_min = value
else:
if value < y_min:
y_min = value
if not y_max:
y_max = value
else:
if value > y_max:
y_max = value
convolution_ax.set_ylim(y_min, y_max)
for ax in [data_ax, convolution_ax]:
# add grid and change ticks
ax.spines["right"].set_visible(False)
ax.spines["top"].set_visible(False)
#figure_dict[plot].grid(True)
ax.minorticks_on()
ax.tick_params(which='both', direction='out')
ax.yaxis.set_ticks_position('left')
ax.xaxis.set_ticks_position('bottom')
2024-10-16 20:57:25 +10:00
#for tick in ax.yaxis.get_major_ticks():
# tick.label.set_fontsize(font_sizes['tick_size'])
#for tick in ax.xaxis.get_major_ticks():
# tick.label.set_fontsize(font_sizes['tick_size'])
2024-10-15 15:11:10 +10:00
ax.legend(prop={'size':font_sizes['legend_text_size']}, loc = 'best')
ax.legend(loc = 'best')
mplt.show()
2024-10-15 21:34:29 +10:00
finally: # Needed so the Try doesn't complain
pass
# [TODO] Renable this
#except Exception as e:
# print (e)
# print ("There was an issue plotting the result.")
# #mplt.close('all')
2024-10-15 15:11:10 +10:00
if not return_processing_lists:
# just return the found arrival time/times
if find_second_peak:
return first_value, first_value_uncertainty, second_value, second_value_uncertainty
else:
return first_value, first_value_uncertainty, None, None
else:
# put all of teh results in a dictionary and return it too
results_dict = {}
results_dict['time_list'] = time_list
results_dict['pressure_list'] = pressure_list
results_dict['first_order_data'] = first_order_data
results_dict['second_order_data'] = second_order_data
results_dict['suppressed_data'] = suppressed_data
results_dict['post_suppression_thresholded_data'] = post_suppression_thresholded_data
if find_second_peak:
return first_value, first_value_uncertainty, second_value, second_value_uncertainty, results_dict
else:
return first_value, first_value_uncertainty, None, None, results_dict