Basic Data Loading

This commit is contained in:
Cal Wing 2024-10-15 15:11:10 +10:00
parent 0f089db409
commit 86ca28948e
5 changed files with 569 additions and 93 deletions

View File

@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
""" """
Canny_shock_finder.py Canny_shock_finder.py
A little shock finding code which I [Chris] wrote using a cut down one-dimensional Canny edge detector. A little shock finding code which I wrote using a cut down one-dimensional Canny edge detector.
Seems to work quite well with the appropriate selection of input parameters. Seems to work quite well with the appropriate selection of input parameters.
The original paper is: The original paper is:
@ -10,19 +11,17 @@ Canny, J. (1986). A computational approach to edge detection.
IEEE Transactions on pattern analysis and machine intelligence, (6), 679-698. IEEE Transactions on pattern analysis and machine intelligence, (6), 679-698.
Chris James (c.james4@uq.edu.au) - 04/07/17 Chris James (c.james4@uq.edu.au) - 04/07/17
Updated in 2024 by Cal Wing to use Python 3
This refactor *REMOVED* some features, namely graphing
""" """
__version__ = "0.1.0" VERSION_STRING = "15-Oct-2024"
__date__ = "15/10/2024"
def canny_shock_finder(time_list, pressure_list, sigma = 4, derivative_threshold = 0.001, auto_derivative = False, post_suppression_threshold = 0.05, def canny_shock_finder(time_list, pressure_list, sigma = 4, derivative_threshold = 0.001, auto_derivative = False, post_suppression_threshold = 0.05,
post_shock_pressure = None, start_time = None, find_second_peak = None, plot = True, plot_scale_factor = None, post_shock_pressure = None, start_time = None, find_second_peak = None, plot = True, plot_scale_factor = None,
amount_of_values_before_and_after = 100, time_unit = 's', y_label = 'Pressure (kPa)', plot_title = 'canny shock finding result', amount_of_values_before_and_after = 100, time_unit = 's', y_label = 'Pressure (kPa)', plot_title = 'canny shock finding result',
return_processing_lists = False, calculate_automatic_derivative_threshold = False, return_processing_lists = False, calculate_automatic_derivative_threshold = False,
check_canny_runtime = False, plot_time_unit = 's', plot_time_scale = 1.0, print_diag_info: bool = False): check_canny_runtime = False, plot_time_unit = 's', plot_time_scale = 1.0):
""" """
This is a basic function with uses a cut down one-dimensional Canny edge detector to find shock arrival. This is a basic function with uses a cut down one-dimensional Canny edge detector to find shock arrival.
It seems to work quite well with the appropriate selection of input parameters It seems to work quite well with the appropriate selection of input parameters
@ -117,4 +116,388 @@ def canny_shock_finder(time_list, pressure_list, sigma = 4, derivative_threshold
time units. Defaults to 1.0 (so no change if time units are seconds). time units. Defaults to 1.0 (so no change if time units are seconds).
""" """
if check_canny_runtime:
import time
run_start_time = time.time()
# do any required imports
import scipy.ndimage.filters
import numpy as np
import copy
print ('-'*60)
print ("Running Canny shock finder version {0}".format(VERSION_STRING))
# do some basic input checking
if not isinstance(sigma, (float, int)):
raise Exception("canny_shock_finder(): The user specified 'sigma' value ({0}) is not a float or an int.".format(sigma))
if post_shock_pressure and not isinstance(post_shock_pressure, (float, int)):
raise Exception("canny_shock_finder(): The user specified 'post_shock_pressure' value ({0}) is not a float or an int.".format(post_shock_pressure))
elif not post_shock_pressure:
if not isinstance(post_suppression_threshold, (float, int)):
raise Exception("canny_shock_finder(): The user specified 'post_suppression_threshold' value ({0}) is not a float or an int.".format(post_suppression_threshold))
if not auto_derivative:
if calculate_automatic_derivative_threshold and not post_shock_pressure:
raise Exception("canny_shock_finder(): 'calculate_automatic_derivative_threshold' cannot be used without a specified post-shock pressure value.")
elif not calculate_automatic_derivative_threshold:
if not isinstance(derivative_threshold, (float, int)):
raise Exception("canny_shock_finder(): The user specified 'derivative_threshold' value ({0}) is not a float or an int.".format(derivative_threshold))
if not start_time:
start_time = time_list[0]
if post_shock_pressure:
print ("Using post-shock pressure scaling so the post_suppression_threshold will be calculated using a post-shock pressure estimate.")
# we need to calculate our post_suppression_threshold here based on the expected post-shock pressure and the
# scaling caused by the first order gaussian data based on the maximum of the Gaussian
# we'll take it to be half of the pressure rise, and will divide out Gaussian max by 2 as the first derivative
# of a gaussian has a maximum which is about half that of a normal Gaussian...
# make a Gaussian with a mean of zero and our sigma
from scipy.stats import norm
gaussian = norm(loc=0., scale=sigma)
# tehn get the value of the probability density function when it is 0 (which is the max)
gaussian_max = gaussian.pdf(0)
x = np.linspace(-5*sigma, 5*sigma, 1000)
y = scipy.stats.norm.pdf(x, 0, sigma)
dx = x[1] - x[0]
dydx = np.gradient(y, dx)
gaussian_first_derivative_max = max(dydx)
if sigma < 1.0:
post_suppression_threshold = 0.1*post_shock_pressure * gaussian_first_derivative_max
elif 1.0 <= sigma <= 2.0:
post_suppression_threshold = 0.5*post_shock_pressure * gaussian_first_derivative_max
else:
post_suppression_threshold = post_shock_pressure * gaussian_first_derivative_max
#post_suppression_threshold = 0.5 * post_shock_pressure * gaussian_max/2.0
print ("Calculated post_suppression_threshold is {0}".format(post_suppression_threshold))
if calculate_automatic_derivative_threshold:
print ("Calculating automatic derivative threshold as the user has asked for this.")
# this commented out code here was my original model, based on the actual second derivative of the Gaussian,
# but it didn't seem to work too well (it got too small at very high sigma values, i.e. above 6 or so)
#dy2d2x = np.gradient(dydx, dx)
#gaussian_second_derivative_max = max(dy2d2x)
#derivative_threshold = gaussian_second_derivative_max / 10.0
# so I basically took some data for a case with a good step change rise of 5 kPa and worked out
# that it was exponential decay and could be pretty well approximated with the function below
# testing shows that it works alright
# I made it stop at a sigma of 6 as, like above, the value was getting too big...
if sigma < 6:
derivative_threshold = post_shock_pressure / 2.5 * np.exp(-sigma) / 10.0
else:
derivative_threshold = post_shock_pressure / 2.5 * np.exp(-6) / 10.0
print ("Calculated derivative_threshold is {0}.".format(derivative_threshold))
# make the input data arrays incase they didn't come in that way...
time_list = np.array(time_list)
pressure_list = np.array(pressure_list)
# convolve the input data with both the first and second derivatives of a Gaussian
# order = 1 gaussian filter (convolution with first derivative of a Gaussian)
first_order_data = scipy.ndimage.filters.gaussian_filter1d(pressure_list, sigma = sigma, order = 1)
#order = 2 gaussian fitler (convolution with second derivative of a Gaussian)
second_order_data = scipy.ndimage.filters.gaussian_filter1d(pressure_list, sigma = sigma, order = 2)
# now we start doing the non-maximum suppression
# copy the original first_order_data list and then change values to zero as we need to
suppressed_data = copy.copy(first_order_data)
have_found_first_value = False # flag to tell the code when we have passed the first value
# set all values to None, so None will be returned if nothing is found.
first_value = None
first_value_uncertainty = None
if auto_derivative:
print ("Doing auto-derivative!")
# remove points which have the same gradient on either side
for i in range(0,len(first_order_data)-1):
if np.sign(second_order_data[i-1]) == np.sign(second_order_data[i+1]):
suppressed_data[i] = 0
else:
print ("Not doing auto-derivative!")
for i in range(0,len(first_order_data)-1):
# check the gradients on either side using the second order data
# if they are the same, set the value to 0
# the derivative_threshold is also used here to set what is actually zero (that is the elif statement)
# if they are NOT the same, we keep the value, so don't change anything...
if np.sign(second_order_data[i-1]) == np.sign(second_order_data[i+1]):
suppressed_data[i] = 0
# other condition, which is to try to stop any numerical noise in the point where the second order gaussian crosses zero
elif np.sign(second_order_data[i-1]) != np.sign(second_order_data[i+1]) and abs(second_order_data[i-1]) < derivative_threshold or \
np.sign(second_order_data[i-1]) != np.sign(second_order_data[i+1]) and abs(second_order_data[i+1]) < derivative_threshold:
suppressed_data[i] = 0
# now we do the final threshold and go through the non-maximum suppressed data and
# remove any values below the user specified post_suppression_threshold
post_suppression_thresholded_data = copy.copy(suppressed_data)
for i in range(0, len(suppressed_data)-1):
if abs(post_suppression_thresholded_data[i]) < post_suppression_threshold:
post_suppression_thresholded_data[i] = 0
# now loop through again and find the first peak (and the second peak if the user has asked for it)
if find_second_peak:
second_value = None
second_value_uncertainty = None
for i in range(0,len(post_suppression_thresholded_data)-1):
# first check that we are past our start time, this will just be the first value is the user hasn't specified something else...
if time_list[i] >= start_time:
# we want the first peak!
if post_suppression_thresholded_data[i] != 0 and not have_found_first_value:
have_found_first_value = True
print('-'*60)
print("First value found at {0} seconds, {1}th value in the data.".format(time_list[i], i))
print("Magnitude of first value found in first order Gaussian is {0}.".format(post_suppression_thresholded_data[i]))
print("Magnitude of first value found in second order Gaussian is {0}.".format(second_order_data[i]))
# store the value too
# if the value after the value found isn't 0, the codes the next value as well
# and takes the trigger time to be a midpoint with a +- uncertainty
# if not, it just takes the single value
if auto_derivative:
# reducing derivative
if second_order_data[i] > second_order_data[i+1]:
print("Computing intersection for reducing derivative.")
delta_derivative = (second_order_data[i+1] - second_order_data[i])
delta_time = time_list[i+1] - time_list[i]
first_value = time_list[i] + (second_order_data[i]/-delta_derivative)*delta_time
first_value_uncertainty = (time_list[i+1]-time_list[i])/2.0
break
# increasing derivative
elif second_order_data[i] < second_order_data[i+1]:
print("Computing intersection for increasing derivative.")
delta_derivative = (second_order_data[i+1] - second_order_data[i])
delta_time = time_list[i+1] - time_list[i]
first_value = time_list[i] + (second_order_data[i]/-delta_derivative)*delta_time
first_value_uncertainty = (time_list[i]-time_list[i-1])/2.0
break
else:
# could not interpolate to find the second derivative zero crossing
print("Could not interpolate to find the second derivative zero crossing.")
print("Value of second derivative at 'i' is {0} and 'i+1' is {1}".format(second_order_data[i], second_order_data[i+1]))
break
elif post_suppression_thresholded_data[i+1] == 0:
first_value = time_list[i]
first_value_uncertainty = 0.0
if not find_second_peak:
break
else:
# flag that is used to stop the code picking up the value next to it if the peak found has two values
ignore_next_value = False
else:
first_value = (time_list[i] + (time_list[i+1] - time_list[i])/2.0)
first_value_uncertainty = ((time_list[i+1] - time_list[i])/2.0)
if not find_second_peak:
break
else:
# flag that is used to stop the code picking up the value next to it if the peak found has two values
ignore_next_value = True
elif post_suppression_thresholded_data[i] != 0 and have_found_first_value and ignore_next_value:
# change the flag back after the next value has been passed...
ignore_next_value = False
elif post_suppression_thresholded_data[i] != 0 and have_found_first_value and not ignore_next_value:
print("Second value found at {0} seconds, {1}th value in the data.".format(time_list[i], i))
print("Magnitude of second value found in first order Gaussian is {0}.".format(post_suppression_thresholded_data[i]))
print("Magnitude of first value found in second order Gaussian is {0}.".format(second_order_data[i]))
# store the value too
# if the value after the value found isn't 0, the codes the next value as well
# and takes the trigger time to be a midpoint with a +- uncertainty
# if not, it just takes the single value
if post_suppression_thresholded_data[i+1] == 0:
second_value = time_list[i]
second_value_uncertainty = 0.0
else:
second_value = (time_list[i] + (time_list[i+1] - time_list[i])/2.0)
second_value_uncertainty = ((time_list[i+1] - time_list[i])/2.0)
break
if check_canny_runtime:
run_finish_time = time.time()
total_run_time = run_finish_time - run_start_time
print('-'*60)
print("Total Canny run time was {0} milliseconds".format(total_run_time*1000.0))
print('-'*60)
if plot:
try: # this is mainly so the code doesn't bail out if one closes a window before it has loaded properly
import matplotlib.pyplot as mplt
figure, (data_ax, convolution_ax) = mplt.subplots(2,1, sharex=True, figsize = (14,8))
data_ax.plot(time_list*plot_time_scale, pressure_list, '-o', label = 'original data')
convolution_ax.plot(time_list*plot_time_scale, first_order_data, '-o', label='first order gaussian (sigma = {0})'.format(sigma))
convolution_ax.plot(time_list*plot_time_scale, second_order_data, '-o', label='second order gaussian (sigma = {0})'.format(sigma))
convolution_ax.plot(time_list*plot_time_scale, suppressed_data, '-o', label='first order with non-max suppression')
convolution_ax.plot(time_list*plot_time_scale, post_suppression_thresholded_data, '-o', label='final result')
if first_value:
if find_second_peak and second_value:
first_shock_arrival_label = 'first shock arrival time'
else:
first_shock_arrival_label = 'shock arrival time'
if first_value_uncertainty:
for ax in [data_ax, convolution_ax]:
ax.axvline((first_value - first_value_uncertainty)*plot_time_scale,
ymin=0.0, ymax = 1.0, linestyle = '--',
color = 'k', label = first_shock_arrival_label)
ax.axvline((first_value + first_value_uncertainty)*plot_time_scale,
ymin=0.0, ymax = 1.0, linestyle = '--', color = 'k')
else:
for ax in [data_ax, convolution_ax]:
ax.axvline(first_value*plot_time_scale, ymin=0.0, ymax = 1.0,
linestyle = '--', color = 'k', label = first_shock_arrival_label)
if find_second_peak and second_value:
if second_value_uncertainty:
for ax in [data_ax, convolution_ax]:
ax.axvline((second_value - second_value_uncertainty)*plot_time_scale,
ymin=0.0, ymax = 1.0, linestyle = '--', color = 'k',
label = "second shock arrival time")
ax.axvline((second_value + second_value_uncertainty)*plot_time_scale,
ymin=0.0, ymax = 1.0, linestyle = '--', color = 'k')
else:
for ax in [data_ax, convolution_ax]:
ax.axvline(second_value, ymin=0.0, ymax = 1.0,
linestyle = '--', color = 'k', label = "second shock arrival time")
font_sizes = {'title_size':16, 'label_size':18,'annotation_size':11,
'legend_text_size':11, 'tick_size':17, 'marker_size':12,
'main_title_size':20}
convolution_ax.set_xlabel(r'Time ({0})'.format(plot_time_unit), fontsize=font_sizes['label_size'])
data_ax.set_ylabel(y_label, fontsize=font_sizes['label_size'])
data_ax.set_title(plot_title, fontsize=font_sizes['title_size'])
# get the x and y limits by going through the data if we can...
if first_value and amount_of_values_before_and_after:
cut_down_lists = {}
cut_down_lists['pressure'] = []
cut_down_lists['first_order'] = []
cut_down_lists['second_order'] = []
cut_down_lists['suppressed'] = []
cut_down_lists['post_suppression'] = []
# get our sample size by taking two time values from each other...
sample_size = time_list[1] - time_list[0]
cut_down_time = sample_size*amount_of_values_before_and_after
if 'second_value' not in locals() or not second_value:
start_time = first_value - cut_down_time
end_time = first_value + cut_down_time
else:
# we need to get between two values if we have them...
start_time = first_value - cut_down_time
end_time = second_value + cut_down_time
for i, time in enumerate(time_list):
if time >= start_time and time <= end_time:
cut_down_lists['pressure'].append(pressure_list[i])
cut_down_lists['first_order'].append(first_order_data[i])
cut_down_lists['second_order'].append(second_order_data[i])
cut_down_lists['suppressed'].append(suppressed_data[i])
cut_down_lists['post_suppression'].append(post_suppression_thresholded_data[i])
elif time > end_time:
break
data_ax.set_xlim(start_time*plot_time_scale, end_time*plot_time_scale)
convolution_ax.set_xlim(start_time*plot_time_scale, end_time*plot_time_scale)
# y-axis for the pressure plot is fine, just take the min and max...
pressure_y_min = min(cut_down_lists['pressure'])
pressure_y_max = max(cut_down_lists['pressure'])
data_ax.set_ylim(pressure_y_min, pressure_y_max)
# y is a bit more complicated for the non-pressure values, as we have multiple things to plot on the y axis....
# we'll go through the dictionary keys...
y_min = None
y_max = None
for key in cut_down_lists.keys():
if key not in ['pressure', 'post_suppression']: # skip these two here...
for value in cut_down_lists[key]:
if not y_min:
y_min = value
else:
if value < y_min:
y_min = value
if not y_max:
y_max = value
else:
if value > y_max:
y_max = value
convolution_ax.set_ylim(y_min, y_max)
for ax in [data_ax, convolution_ax]:
# add grid and change ticks
ax.spines["right"].set_visible(False)
ax.spines["top"].set_visible(False)
#figure_dict[plot].grid(True)
ax.minorticks_on()
ax.tick_params(which='both', direction='out')
ax.yaxis.set_ticks_position('left')
ax.xaxis.set_ticks_position('bottom')
for tick in ax.yaxis.get_major_ticks():
tick.label.set_fontsize(font_sizes['tick_size'])
for tick in ax.xaxis.get_major_ticks():
tick.label.set_fontsize(font_sizes['tick_size'])
ax.legend(prop={'size':font_sizes['legend_text_size']}, loc = 'best')
ax.legend(loc = 'best')
mplt.show()
except Exception as e:
print ("{0}: {1}".format(type(e).__name__, e.message))
print ("There was an issue plotting the result.")
mplt.close('all')
if not return_processing_lists:
# just return the found arrival time/times
if find_second_peak:
return first_value, first_value_uncertainty, second_value, second_value_uncertainty
else:
return first_value, first_value_uncertainty, None, None
else:
# put all of teh results in a dictionary and return it too
results_dict = {}
results_dict['time_list'] = time_list
results_dict['pressure_list'] = pressure_list
results_dict['first_order_data'] = first_order_data
results_dict['second_order_data'] = second_order_data
results_dict['suppressed_data'] = suppressed_data
results_dict['post_suppression_thresholded_data'] = post_suppression_thresholded_data
if find_second_peak:
return first_value, first_value_uncertainty, second_value, second_value_uncertainty, results_dict
else:
return first_value, first_value_uncertainty, None, None, results_dict

View File

@ -1,9 +1,17 @@
long_name: "Shot 1 - Fat Probe - 2024-10-15" # Data Info File
# Cal Wing - Oct 24
long_name: "Shot 1 (x2s5823) - Fat Probe - 2024-10-15"
name: "Shot 1" name: "Shot 1"
date: "2024-10-15" date: "2024-10-15"
time: "13:02"
shot-info: shot-info:
name: "x2s5823" name: "x2s5823"
tdms: "x2s5823.tdms"
config: "x2s5823.config"
info: "x2s5823.txt"
probe-info: probe-info:
type: "Fat" type: "Fat"
@ -13,8 +21,10 @@ probe-info:
data-record: data-record:
type: "scope" type: "scope"
config: "eProbe-Scope.txt" config: "eProbe-Scope.txt"
data-file: "eProbe-Scope.csv" data: "eProbe-Scope.csv"
trigger:
type: "scope" trigger: # Redundant?
type: "channel"
channel: 4
delay: 0 delay: 0

103
main-comp.py Normal file
View File

@ -0,0 +1,103 @@
# Cal Wing
# Sep 2024
import os
import numpy as np
from nptdms import TdmsFile
from makeGraph import makeGraph, pltKeyClose, UQ_COLOURS as UQC
# Folder correction
# Make sure the relevant folders folder exists
folders = ["./images"]
for folder in folders:
if not os.path.isdir(folder): os.mkdir(folder)
# Photo Diode Things
SHOT_PD = "x2s4111", 1, "PD", np.timedelta64(1146800, 'ns')
# PCB Things
SHOT_PCB_1 = "x2s3667", 0.0148 * 1000, "PCB", np.timedelta64(1643, 'us') # st1 - np.timedelta64(1608, 'us')
SHOT_PCB_2 = "x2s3668", 0.0148 * 1000, "PCB", np.timedelta64(1648600, 'ns') # st1 - np.timedelta64(1614, 'us')
# Shot DATA FILE
DATA_FILE_FMT = './data/{0}/databox/{0}.tdms'
TIME_OFFSET = np.timedelta64(-22, 'us'), np.timedelta64(33, 'us') # us
def main():
graphData = {
"title": "Shock response time of PCBs\nAt position st2",
"xLabel": "Time ($\\mu$s)",
"yLabel": "PCB Voltage Reading (mV)",
"grid": True,
"xLim": (-20, 20),
"plots": []
}
for df, scale, name, zero_p in [SHOT_PD, SHOT_PCB_1, SHOT_PCB_2]:
file_path = DATA_FILE_FMT.format(df)
data = TdmsFile.read(file_path)
channels = data.groups()[0].channels()
for channel in channels:
time = (channels[0][:] - channels[0][0]) # Convert to sec
time_range_i = [0, 0]
for i, value in enumerate(time):
if value >= zero_p + TIME_OFFSET[0]:
time_range_i[0] = i
break
for i, value in enumerate(time):
if value >= zero_p + TIME_OFFSET[1]:
time_range_i[1] = i
break
a = time > time[time_range_i[0]]
b = time < time[time_range_i[1]]
time_range = np.logical_and(a, b)
time_2 = time[time_range][:] - time[time_range][0] + TIME_OFFSET[0]
#print(time_range, a, b, time_range_i)
if channel.name == "st2" and df is not SHOT_PD[0]:
graphData["plots"].append({
"x": time_2,
"y": channel[time_range] * scale,
"label": f"{df} - {name}"
})
if channel.name == "st2" and df is SHOT_PD[0] and False:
graphData["plots"].append({
"x": time,
"y": channel[:] * scale,
"label": f"{df} - {name}"
})
graphData['plots'].append(
{
"x":0,
"type":"axvLine",
#"label":"Shock",
"color": UQC["dark_grey"],
"args": {
"linestyle": "--",
"zorder": -2
}
}
)
makeGraph(graphData, figSavePath="./images/{0}.png", showPlot=False, doProgramBlock=False)
if __name__ == '__main__':
main()
#pltKeyClose()

142
main.py
View File

@ -1,9 +1,12 @@
# Cal Wing # Cal Wing (c.wing@uq.net.au) - Oct 2024
# Sep 2024 # Thesis Graphing
import os import os
import numpy as np import numpy as np
import pandas as pd
import yaml
from nptdms import TdmsFile from nptdms import TdmsFile
from makeGraph import makeGraph, pltKeyClose, UQ_COLOURS as UQC from makeGraph import makeGraph, pltKeyClose, UQ_COLOURS as UQC
@ -14,90 +17,67 @@ folders = ["./images"]
for folder in folders: for folder in folders:
if not os.path.isdir(folder): os.mkdir(folder) if not os.path.isdir(folder): os.mkdir(folder)
# Photo Diode Things # Load Data
SHOT_PD = "x2s4111", 1, "PD", np.timedelta64(1146800, 'ns') DATA_PATH = "./data"
DATA_INFO = "_info.yaml"
# PCB Things data_to_load = [
SHOT_PCB_1 = "x2s3667", 0.0148 * 1000, "PCB", np.timedelta64(1643, 'us') # st1 - np.timedelta64(1608, 'us') "x2s5823"
SHOT_PCB_2 = "x2s3668", 0.0148 * 1000, "PCB", np.timedelta64(1648600, 'ns') # st1 - np.timedelta64(1614, 'us') ]
# Shot DATA FILE data = {}
DATA_FILE_FMT = './data/{0}/databox/{0}.tdms'
TIME_OFFSET = np.timedelta64(-22, 'us'), np.timedelta64(33, 'us') # us for dp in data_to_load:
data_path = f"{DATA_PATH}/{dp}/"
def main(): data_info_path = data_path + DATA_INFO
if not os.path.exists(data_info_path):
graphData = { print(f"[ERR] Could not find data info file: '{data_info_path}'")
"title": "Shock response time of PCBs\nAt position st2", print(f"[WARN] Not Loading Data '{dp}'")
"xLabel": "Time ($\\mu$s)", continue
"yLabel": "PCB Voltage Reading (mV)",
"grid": True,
"xLim": (-20, 20),
"plots": []
}
for df, scale, name, zero_p in [SHOT_PD, SHOT_PCB_1, SHOT_PCB_2]:
file_path = DATA_FILE_FMT.format(df)
data = TdmsFile.read(file_path)
channels = data.groups()[0].channels()
for channel in channels:
time = (channels[0][:] - channels[0][0]) # Convert to sec
time_range_i = [0, 0]
for i, value in enumerate(time):
if value >= zero_p + TIME_OFFSET[0]:
time_range_i[0] = i
break
for i, value in enumerate(time):
if value >= zero_p + TIME_OFFSET[1]:
time_range_i[1] = i
break
a = time > time[time_range_i[0]]
b = time < time[time_range_i[1]]
time_range = np.logical_and(a, b)
time_2 = time[time_range][:] - time[time_range][0] + TIME_OFFSET[0]
#print(time_range, a, b, time_range_i)
if channel.name == "st2" and df is not SHOT_PD[0]:
graphData["plots"].append({
"x": time_2,
"y": channel[time_range] * scale,
"label": f"{df} - {name}"
})
if channel.name == "st2" and df is SHOT_PD[0] and False:
graphData["plots"].append({
"x": time,
"y": channel[:] * scale,
"label": f"{df} - {name}"
})
graphData['plots'].append( with open(data_info_path, 'r') as file:
{ # Load data info (Cal)
"x":0, dataInfo = yaml.safe_load(file)
"type":"axvLine", x2_shot = dataInfo["shot-info"]["name"]
#"label":"Shock",
"color": UQC["dark_grey"], x2_tdms_data = TdmsFile.read(data_path + dataInfo["shot-info"]['tdms'])
"args": { x2_channels = x2_tdms_data.groups()[0].channels()
"linestyle": "--",
"zorder": -2 if dataInfo["probe-info"]["data-record"]["type"] == "scope":
} scope_data_path = data_path + dataInfo["probe-info"]["data-record"]["data"]
scope_config_path = data_path + dataInfo["probe-info"]["data-record"]["config"]
# Generate Headers
with open(scope_data_path, 'r') as dfile:
scope_header = []
header_lines = []
for i, line in enumerate(dfile):
if i > 1: break
header_lines.append(line.strip().split(","))
for i, name in enumerate(header_lines[0]):
if name == "x-axis":
name = "Time"
if header_lines[1][i] in ["second", "Volt"]:
outStr = f"{name} [{header_lines[1][i][0]}]"
else:
outStr = f"{name} [{header_lines[1][i]}]"
scope_header.append(outStr)
scope_data = pd.read_csv(scope_data_path, names=scope_header, skiprows=1)
data[x2_shot] = {
"info": dataInfo,
"probes": scope_data,
"x2": x2_channels,
"x2-tdms": x2_tdms_data
} }
)
loaded_data = data.keys()
makeGraph(graphData, figSavePath="./images/{0}.png", showPlot=False, doProgramBlock=False) print("Loaded Data")
if __name__ == '__main__':
main()
#pltKeyClose()

Binary file not shown.