Compare commits

..

20 Commits

Author SHA1 Message Date
aeef8f9253 Finalise README 2024-10-25 16:21:42 +10:00
247c503885 Correct 2024-10-25 16:05:59 +10:00
c4b0dd1fef quick refactor and addition of README 2024-10-25 16:05:22 +10:00
3398414b55 Finalise Thesis Codebase 2024-10-25 15:55:16 +10:00
1b66ac1f24 fix title 2024-10-23 14:24:02 +10:00
7d6b3403f5 Make nice canning graph 2024-10-23 14:20:57 +10:00
313ef7d1de Use mean uncert not sqrt(sum) 2024-10-23 14:07:12 +10:00
9570111f61 Correct the shot numbers 2024-10-23 14:03:01 +10:00
f3bb6f870f Fix uncert % 2024-10-23 10:40:14 +10:00
df1c20f0c3 Generate Total Average Speed 2024-10-23 10:38:37 +10:00
46ef08874a Roll back commits 2024-10-23 08:34:27 +10:00
e59aa2d034 Fix ordering 2024-10-23 08:32:53 +10:00
5ffbf17ac6 Acually finish code? 2024-10-23 08:31:51 +10:00
ab3547b26c Correct for scope triggering 2024-10-23 03:59:15 +10:00
30376abeb9 Finalise Graphing please? 2024-10-23 01:51:25 +10:00
511995830c Add x2 data loading 2024-10-23 01:22:07 +10:00
8125efa26a Graph everything 2024-10-23 00:39:11 +10:00
e653885de1 Generate Referance Shot info 2024-10-22 23:18:40 +10:00
5b1bf0eedb Order Graphs 2024-10-22 21:42:38 +10:00
2e5dc27055 Finalise Graphing Steps 2024-10-22 21:37:28 +10:00
10 changed files with 824 additions and 76 deletions

39
README.md Normal file
View File

@ -0,0 +1,39 @@
# Ionization Probe Analysis Code
> Written by Cal Wing (<c.wing@uq.net.au> - 45811953) in 2024 for his Thesis
## Installation
Run `pip install -r requirements.txt` or equivilient for your envoroment.
## Running
Run `main.py` it will then generate all the output graphs and put them in (a git untracked) folder called `./images`.
By default all data graphs will be generated - you need to change the functions called at the end in the `if '__name__ == '__main__':` section.
## `./data` Explanation
I like YAML files to store information. All the data shot folders have a file call `_info.yaml` this file contained all the info about the shot so that when it would load & be graphed it would be correct.
I hope the values are self explanatory but they may not - sorry
## Changes to [canny_shock_finder.py](./canny_shock_finder.py)
Basically I hacked in an extra argument `print_func`, that is used to override the `print` function in the file. It assumes its either a callable reference (like print, the default) or None (it then uses a no-operation function to silence all output)
I also removed the error catching around lines ~497 so the function blindly continues if it can't graph.
I also enabled grid lines on the graphs.
The UQ colours being used on the graphs is due to some funky ~~abuse~~ *utlitly* I built into my [`makeGraph`](#makegraph) library.
## MakeGraph
> This refers to the [makeGraph.py](./makeGraph.py) file/version *in this code base*, I would not trust the exact specifics for versions found elsewhere, but the general gist is the same.
A *long* time ago I wrote a wrapper for matplotlib that has expanded into a formatting tool. The crux of it is that I can throw a dictionary at the `makeGraph` function and it will do all the hard work for me.
There are a few fun things that loading the lib will do - like overriding the default colour cyclers to use only the UQ colours - so if your going to liberate / adapt it be wary.
All the graphing done in the [`main.py`](./main.py) uses makeGraph, its not the most scary thing in the world as its basically ~~AI~~ *a big if conditional*.

View File

@ -15,7 +15,7 @@ Chris James (c.james4@uq.edu.au) - 04/07/17
# Amened by Cal Wing to make the function not print # Amened by Cal Wing to make the function not print
VERSION_STRING = "17-Oct-2024" VERSION_STRING = "20-Oct-2024"
def canny_shock_finder(time_list, pressure_list, sigma = 4, derivative_threshold = 0.001, auto_derivative = False, post_suppression_threshold = 0.05, def canny_shock_finder(time_list, pressure_list, sigma = 4, derivative_threshold = 0.001, auto_derivative = False, post_suppression_threshold = 0.05,
post_shock_pressure = None, start_time = None, find_second_peak = None, plot = True, plot_scale_factor = None, post_shock_pressure = None, start_time = None, find_second_peak = None, plot = True, plot_scale_factor = None,
@ -116,6 +116,8 @@ def canny_shock_finder(time_list, pressure_list, sigma = 4, derivative_threshold
:param plot_time_scale: See plot_time_unit above. These two inputs allow the results plot to be in a different :param plot_time_scale: See plot_time_unit above. These two inputs allow the results plot to be in a different
time unit to the original data. The user must specify this scale to connect the original and plotted time unit to the original data. The user must specify this scale to connect the original and plotted
time units. Defaults to 1.0 (so no change if time units are seconds). time units. Defaults to 1.0 (so no change if time units are seconds).
:param print_func: Callable Reference to call when print is called in this function. Defaults to 'print' but if 'None'
is passed it uses a no-operation function to silence all output.
""" """
# Make the function silent / have print overridable # Make the function silent / have print overridable
@ -360,6 +362,9 @@ def canny_shock_finder(time_list, pressure_list, sigma = 4, derivative_threshold
data_ax.plot(time_list*plot_time_scale, pressure_list, '-o', label = 'original data') data_ax.plot(time_list*plot_time_scale, pressure_list, '-o', label = 'original data')
data_ax.grid(True)
convolution_ax.grid(True)
convolution_ax.plot(time_list*plot_time_scale, first_order_data, '-o', label='first order gaussian (sigma = {0})'.format(sigma)) convolution_ax.plot(time_list*plot_time_scale, first_order_data, '-o', label='first order gaussian (sigma = {0})'.format(sigma))
convolution_ax.plot(time_list*plot_time_scale, second_order_data, '-o', label='second order gaussian (sigma = {0})'.format(sigma)) convolution_ax.plot(time_list*plot_time_scale, second_order_data, '-o', label='second order gaussian (sigma = {0})'.format(sigma))
convolution_ax.plot(time_list*plot_time_scale, suppressed_data, '-o', label='first order with non-max suppression') convolution_ax.plot(time_list*plot_time_scale, suppressed_data, '-o', label='first order with non-max suppression')

View File

@ -1,7 +1,7 @@
# Data Info File # Data Info File
# Cal Wing - Oct 24 # Cal Wing - Oct 24
long_name: "Shot 1 (x2s5823) - Fat Probe - 2024-10-15" long_name: "Shot 1 (x2s5823) - Thick Probe (ST2) - 2024-10-15\nProtruding ST1 - Mars Entry Conditions"
name: "Shot 1" name: "Shot 1"
date: "2024-10-15" date: "2024-10-15"
time: "13:02" time: "13:02"

View File

@ -1,7 +1,7 @@
# Data Info File # Data Info File
# Cal Wing - Oct 24 # Cal Wing - Oct 24
long_name: "Shot 2 (x2s5824) - Thin Probe - 2024-10-15" long_name: "Shot 2 (x2s5824) - Thin Probe (ST2) - 2024-10-15\nProtruding ST2 - Mars Entry Conditions"
name: "Shot 2" name: "Shot 2"
date: "2024-10-15" date: "2024-10-15"
time: "17:18" time: "17:18"
@ -56,7 +56,7 @@ probe-info:
- 2 - 2
locations: # In order of pulse locations: # In order of pulse
- "st2" - "st2"
- "st3" # - "st3"
data-records: data-records:
- type: "scope" - type: "scope"

View File

@ -1,7 +1,7 @@
# Data Info File # Data Info File
# Cal Wing - Oct 24 # Cal Wing - Oct 24
long_name: "Shot 2 (x2s5827) - Thin Probe Pair (ST2 & ST3) - 2024-10-16" long_name: "Shot 2 (x2s5827) - Thin Probe Pair (ST2 & ST3) - 2024-10-16\nProtruding ST2 & ST3 - Mars Entry Conditions"
name: "Shot 3" name: "Shot 3"
date: "2024-10-16" date: "2024-10-16"
time: "18:40" time: "18:40"

View File

@ -1,7 +1,7 @@
# Data Info File # Data Info File
# Cal Wing - Oct 24 # Cal Wing - Oct 24
long_name: "Shot 4 (x2s5829) - Thin Probe Pair (ST2 & ST3) - 2024-10-17" long_name: "Shot 4 (x2s5829) - Thin Probe Pair (ST2 & ST3) - 2024-10-17\nProtruding ST2 & ST3 - Mars Entry Conditions"
name: "Shot 4" name: "Shot 4"
date: "2024-10-17" date: "2024-10-17"
time: "17:00" time: "17:00"

View File

@ -1,7 +1,7 @@
# Data Info File # Data Info File
# Cal Wing - Oct 24 # Cal Wing - Oct 24
long_name: "Shot 5 (x2s5829) - Thin Probe Pair (ST2 & ST3) - 2024-10-17" long_name: "Shot 5 (x2s5830) - Thin Probe Pair (ST2 & ST3) - 2024-10-17\nProtruding ST2 & ST3 - Mars Entry Conditions"
name: "Shot 5" name: "Shot 5"
date: "2024-10-18" date: "2024-10-18"
time: "08:51" time: "08:51"

View File

@ -1,7 +1,7 @@
# Data Info File # Data Info File
# Cal Wing - Oct 24 # Cal Wing - Oct 24
long_name: "Shot 6 (x2s5829) - Thin Probe (ST1, ST2 & ST3) - 2024-10-18 - Flush ST2" long_name: "Shot 6 (x2s5831) - Thin Probe Set (ST1, ST2 & ST3) - 2024-10-18\nProtruding ST1 & ST3, Flush ST2 (Only Gauge 1) - Mars Entry Conditions"
comments: "G2 Pointless, Wasn't attached" comments: "G2 Pointless, Wasn't attached"
name: "Shot 6" name: "Shot 6"
date: "2024-10-18" date: "2024-10-18"
@ -25,6 +25,14 @@ canny-args:
- sigma: 1 - sigma: 1
post_pres: 0.05 post_pres: 0.05
x2-canny:
- sigma: 1
post_pres: 0.02
- sigma: 0.5
post_pres: 0.01
no-graph: no-graph:
- "at1" - "at1"
- "at2" - "at2"
@ -71,9 +79,13 @@ probe-info:
- type: "x2" - type: "x2"
channels: # Gauge Channel Name channels: # Gauge Channel Name
- "st2" - "st1"
- "st3" - "st3"
scaler:
st1: 1
st3: 10
trigger: trigger:
type: "x2" type: "x2"
# Info isn't really needed # Info isn't really needed

View File

@ -1,8 +1,8 @@
# Data Info File # Data Info File
# Cal Wing - Oct 24 # Cal Wing - Oct 24
long_name: "Shot 7 (x2s5829) - Thin Probe Set (ST1, ST2 & ST3) - 2024-10-18\nFlush ST3 - Low Pressure, 'Pure' Air" long_name: "Shot 7 (x2s5832) - Thin Probe Set (ST1, ST2 & ST3) - 2024-10-18\nProtruding ST1 & ST1, Flush ST3 (Only Gauge 1) - Low Pressure, 'Pure' Air Conditions"
name: "Shot 6" name: "Shot 7"
date: "2024-10-18" date: "2024-10-18"
time: "15:58" time: "15:58"
@ -27,6 +27,16 @@ canny-args:
- sigma: 1 - sigma: 1
post_pres: 0.05 post_pres: 0.05
x2-canny:
- sigma: 2
post_pres: 0.03
- sigma: 1
post_pres: 0.2
- sigma: 1
post_pres: 0.05
no-graph: no-graph:
- "None" - "None"
- "at1" - "at1"
@ -74,9 +84,13 @@ probe-info:
- type: "x2" - type: "x2"
channels: # Gauge Channel Name channels: # Gauge Channel Name
- "st2" - "st1"
- "st3" - "st3"
scaler:
st1: 1
st3: 10
trigger: trigger:
type: "x2" type: "x2"
# Info isn't really needed # Info isn't really needed

798
main.py
View File

@ -29,16 +29,6 @@ CANNY_TIME_OFFSET = 50 #us
with open(TUNNEL_INFO_FILE, 'r') as file: with open(TUNNEL_INFO_FILE, 'r') as file:
TUNNEL_INFO = yaml.safe_load(file) TUNNEL_INFO = yaml.safe_load(file)
data_to_load = [
"x2s5823",
"x2s5824",
"x2s5827",
"x2s5829",
"x2s5830",
"x2s5831",
"x2s5832"
]
# ==== Uncerts ==== # ==== Uncerts ====
# Taken from DOI: 10.1007/s00193-017-0763-3 (Implementation of a state-to-state analytical framework for the calculation of expansion tube flow properties) # Taken from DOI: 10.1007/s00193-017-0763-3 (Implementation of a state-to-state analytical framework for the calculation of expansion tube flow properties)
@ -130,6 +120,7 @@ def load_data(data_path: str, data={}) -> dict:
"probes": None, # This may be x2 but may not - ie a scope was used "probes": None, # This may be x2 but may not - ie a scope was used
"trigger_index": None, "trigger_index": None,
"probe_uncert": None, #s "probe_uncert": None, #s
"x2-dt": None, #
}, },
"data": { "data": {
"x2": {}, # Only pop channels with a voltage scale in ./tunnel-info.yaml "x2": {}, # Only pop channels with a voltage scale in ./tunnel-info.yaml
@ -145,6 +136,7 @@ def load_data(data_path: str, data={}) -> dict:
ns_time = time_data[:].as_datetime64('ns') ns_time = time_data[:].as_datetime64('ns')
x2_time_seconds = (ns_time - ns_time[0]) # timedelta64[ns] x2_time_seconds = (ns_time - ns_time[0]) # timedelta64[ns]
x2_time_us = x2_time_seconds.astype("float64") / 1000 # Scale to us x2_time_us = x2_time_seconds.astype("float64") / 1000 # Scale to us
x2_time_dt = np.diff(x2_time_us).mean() / 1000 # Scale to s
#second_fractions = np.array(time_data[:].second_fractions, dtype=int) # 2^-64 ths of a second #second_fractions = np.array(time_data[:].second_fractions, dtype=int) # 2^-64 ths of a second
#x2_time_seconds = (second_fractions - second_fractions[0]) / (2**(-64)) # 0 time data and convert to seconds #x2_time_seconds = (second_fractions - second_fractions[0]) / (2**(-64)) # 0 time data and convert to seconds
@ -171,8 +163,27 @@ def load_data(data_path: str, data={}) -> dict:
"x2": x2_time_us, "x2": x2_time_us,
"trigger_index": x2_trigger_index, "trigger_index": x2_trigger_index,
"probes": x2_time_us, # Until otherwise overridden - probe time is x2 time "probes": x2_time_us, # Until otherwise overridden - probe time is x2 time
"x2-dt": x2_time_dt
} }
data[x2_shot]["time"]["x2_uncert"] = UNCERTS["time"]["x2-daq"] #max(UNCERTS["time"]["x2-daq"], data[x2_shot]["time"]["x2-dt"])
# Setup custom scaling on the gauge values
if "x2" in data_locs:
for ch in dataInfo["probe-info"]["data-records"][data_locs.index("x2")]["channels"]:
if ch in dataInfo["probe-info"]["data-records"][data_locs.index("x2")]["scaler"]:
# Get the channel index from its name
chIndex = x2_channel_names.index(ch)
# Calculate the average noise offset
avg_noise = x2_channels[chIndex][0:SAMPLES_TO_AVG].mean()
# Save the channel data
data[x2_shot]["data"]["x2"][ch] = (x2_channels[chIndex][:] - avg_noise) * dataInfo["probe-info"]["data-records"][data_locs.index("x2")]["scaler"][ch]
#[TODO] This could be better
#if "x2" in data_locs:
# data[x2_shot]["data"]["probes"] = [data[x2_shot]["data"]["x2"]["st1"][:], data[x2_shot]["data"]["x2"]["st3"][:]]
# Scope timing _if it exists_ # Scope timing _if it exists_
if "scope" in data_locs: if "scope" in data_locs:
@ -198,6 +209,7 @@ def load_data(data_path: str, data={}) -> dict:
data[x2_shot]["time"]["scope"] = scope_time data[x2_shot]["time"]["scope"] = scope_time
data[x2_shot]["time"]["scope-offset"] = scope_alignment data[x2_shot]["time"]["scope-offset"] = scope_alignment
data[x2_shot]["time"]["scope-dt"] = np.diff(scope_time).mean() / 1e6
data[x2_shot]["data"]["scope"] = {} data[x2_shot]["data"]["scope"] = {}
for i, header in enumerate(scope_header): for i, header in enumerate(scope_header):
@ -212,15 +224,14 @@ def load_data(data_path: str, data={}) -> dict:
if "scope" in data_locs: if "scope" in data_locs:
data[x2_shot]["data"]["probes"] = [data[x2_shot]["data"]["scope"][1], data[x2_shot]["data"]["scope"][2]] data[x2_shot]["data"]["probes"] = [data[x2_shot]["data"]["scope"][1], data[x2_shot]["data"]["scope"][2]]
data[x2_shot]["time"]["probes"] = data[x2_shot]["time"]["scope"] data[x2_shot]["time"]["probes"] = data[x2_shot]["time"]["scope"]
data[x2_shot]["time"]["probe_uncert"] = scope_data_info["time-uncert"] data[x2_shot]["time"]["probe_uncert"] = max(scope_data_info["time-uncert"], data[x2_shot]["time"]["scope-dt"])
# Find Shock Times # Find Shock Times
# X2 - Canning Edge # X2 - Canning Edge
data[x2_shot]["shock-point"] = {} data[x2_shot]["shock-point"] = {}
cArgs = dataInfo["pcb-canny"] cArgs = dataInfo["pcb-canny"]
for i, ref in enumerate(dataInfo["pcb-refs"]): for i, ref in enumerate(dataInfo["pcb-refs"]):
refData = data[x2_shot]["data"]["x2"][ref] chData = data[x2_shot]["data"]["x2"][ref]
if i in range(len(cArgs)): if i in range(len(cArgs)):
sigma = cArgs[i]["sigma"] sigma = cArgs[i]["sigma"]
@ -229,20 +240,22 @@ def load_data(data_path: str, data={}) -> dict:
sigma = cArgs[-1]["sigma"] sigma = cArgs[-1]["sigma"]
post_sup_thresh = cArgs[-1]["post_pres"] post_sup_thresh = cArgs[-1]["post_pres"]
first_value, first_value_uncertainty, _, _ = canny_shock_finder(x2_time_us, refData, sigma=sigma, post_suppression_threshold=post_sup_thresh, plot=False, print_func=None) first_value, first_value_uncertainty, _, _ = canny_shock_finder(x2_time_us, chData, sigma=sigma, post_suppression_threshold=post_sup_thresh, plot=False, print_func=None)
shock_point = np.where(x2_time_us >= first_value)[0][0] # [BUG] Seems to give n+1 shock_point = np.where(x2_time_us >= first_value)[0][0] # [BUG] Seems to give n+1
data[x2_shot]["shock-point"][ref] = shock_point, first_value, first_value_uncertainty data[x2_shot]["shock-point"][ref] = shock_point, first_value, first_value_uncertainty
if "x2" in data_locs:
cArgs = dataInfo["x2-canny"]
data[x2_shot]["shock-point"]["x2"] = {}
# ---- Gauge Canning Edge ---- probeCh1 = data[x2_shot]["data"]["x2"]["st1"]
probeCh2 = data[x2_shot]["data"]["x2"]["st3"]
for i, probe in enumerate(dataInfo["probe-info"]["locations"]): for i, probe in enumerate(dataInfo["probe-info"]["locations"]):
probeCh1 = data[x2_shot]["data"]["probes"][0]
probeCh2 = data[x2_shot]["data"]["probes"][1]
# Get the canny-args # Get the canny-args
cArgs = dataInfo["canny-args"] cArgs = dataInfo["x2-canny"]
doCannyPlot = False doCannyPlot = False
if i in range(len(cArgs)): if i in range(len(cArgs)):
sigma = cArgs[i]["sigma"] sigma = cArgs[i]["sigma"]
@ -251,6 +264,58 @@ def load_data(data_path: str, data={}) -> dict:
sigma = cArgs[-1]["sigma"] sigma = cArgs[-1]["sigma"]
post_sup_thresh = cArgs[-1]["post_pres"] post_sup_thresh = cArgs[-1]["post_pres"]
# If this _isn't_ the first probe then apply a time offset
if i > 0:
privPoint = dataInfo["probe-info"]["locations"][i-1]
time_offset = data[x2_shot]["shock-point"]["x2"][f"{privPoint}-g1"][1] + CANNY_TIME_OFFSET
else:
time_offset = None
# Find G1 Shock Time
if 1 in dataInfo["probe-info"]["gauges"]:
first_value, first_value_uncertainty, _, _ = canny_shock_finder(x2_time_us, probeCh1, sigma=sigma, post_suppression_threshold=post_sup_thresh, plot=doCannyPlot, start_time=time_offset, print_func=None)
if first_value is None:
print(f"[ERROR] {x2_shot} - {probe}-g1 could not be detected using: Sigma = {sigma}, post_suppression_threshold = {post_sup_thresh}")
#raise ValueError(f"{probe}-g1 not detected"
else:
shock_point = np.where(x2_time_us >= first_value)[0][0] # [BUG] Seems to give n+1
data[x2_shot]["shock-point"]["x2"][f"{probe}-g1"] = shock_point, first_value, first_value_uncertainty
if 2 in dataInfo["probe-info"]["gauges"]:
# Do the same for G2
if i > 0:
time_offset = data[x2_shot]["shock-point"]["x2"][f"{privPoint}-g2"][1] + CANNY_TIME_OFFSET
# Find G2 Shock Time
first_value, first_value_uncertainty, _, _ = canny_shock_finder(x2_time_us, probeCh2, sigma=sigma, post_suppression_threshold=post_sup_thresh, plot=doCannyPlot, start_time=time_offset, print_func=None)
if first_value is None:
print(f"[ERROR] {x2_shot} - {probe}-g2 could not be detected using: Sigma = {sigma}, post_suppression_threshold = {post_sup_thresh}")
#raise ValueError(f"{probe}-g2 not detected")
else:
shock_point = np.where(x2_time_us >= first_value)[0][0] # [BUG] Seems to give n+1
data[x2_shot]["shock-point"]["x2"][f"{probe}-g2"] = shock_point, first_value, first_value_uncertainty
# ---- Gauge Canning Edge ----
probeCh1 = data[x2_shot]["data"]["probes"][0]
probeCh2 = data[x2_shot]["data"]["probes"][1]
for i, probe in enumerate(dataInfo["probe-info"]["locations"]):
# Get the canny-args
cArgs = dataInfo["canny-args"]
doCannyPlot = False # x2_shot == "x2s5829" and probe == "st2" # This condition was used to generate some graphs
plotValues = {
"plot_title": f"Canny Shock Finding Result for {x2_shot} - ST2 Gauge 1",
"plot_time_unit": "$\\mu$s",
"y_label": "Voltage Reading (V)",
}
if i in range(len(cArgs)):
sigma = cArgs[i]["sigma"]
post_sup_thresh = cArgs[i]["post_pres"]
else:
sigma = cArgs[-1]["sigma"]
post_sup_thresh = cArgs[-1]["post_pres"]
# If this _isn't_ the first probe then apply a time offset # If this _isn't_ the first probe then apply a time offset
if i > 0: if i > 0:
privPoint = dataInfo["probe-info"]["locations"][i-1] privPoint = dataInfo["probe-info"]["locations"][i-1]
@ -260,7 +325,7 @@ def load_data(data_path: str, data={}) -> dict:
# Find G1 Shock Time # Find G1 Shock Time
if 1 in dataInfo["probe-info"]["gauges"]: if 1 in dataInfo["probe-info"]["gauges"]:
first_value, first_value_uncertainty, _, _ = canny_shock_finder(scope_time, probeCh1, sigma=sigma, post_suppression_threshold=post_sup_thresh, plot=doCannyPlot, start_time=time_offset, print_func=None) first_value, first_value_uncertainty, _, _ = canny_shock_finder(scope_time, probeCh1, sigma=sigma, post_suppression_threshold=post_sup_thresh, plot=doCannyPlot, start_time=time_offset, print_func=None, **plotValues)
if first_value is None: if first_value is None:
print(f"[ERROR] {x2_shot} - {probe}-g1 could not be detected using: Sigma = {sigma}, post_suppression_threshold = {post_sup_thresh}") print(f"[ERROR] {x2_shot} - {probe}-g1 could not be detected using: Sigma = {sigma}, post_suppression_threshold = {post_sup_thresh}")
raise ValueError(f"{probe}-g1 not detected") raise ValueError(f"{probe}-g1 not detected")
@ -302,7 +367,7 @@ def load_data(data_path: str, data={}) -> dict:
p1_time_uncert = data[x2_shot]["shock-point"][dataInfo["pcb-refs"][i-1]][2] / 1e6 # Convert to seconds p1_time_uncert = data[x2_shot]["shock-point"][dataInfo["pcb-refs"][i-1]][2] / 1e6 # Convert to seconds
p2_time_uncert = data[x2_shot]["shock-point"][refProbe][2] / 1e6 # Convert to seconds p2_time_uncert = data[x2_shot]["shock-point"][refProbe][2] / 1e6 # Convert to seconds
uncert = deltaVs(probe_velocity, p2p_dist, p2p_time, (UNCERTS["probe-dist"][refProbe], UNCERTS["probe-dist"][dataInfo["pcb-refs"][i-1]]), (p1_time_uncert, p2_time_uncert, UNCERTS["time"]["x2-daq"])) uncert = deltaVs(probe_velocity, p2p_dist, p2p_time, (UNCERTS["probe-dist"][refProbe], UNCERTS["probe-dist"][dataInfo["pcb-refs"][i-1]]), (p1_time_uncert, p2_time_uncert, data[x2_shot]["time"]["x2_uncert"]))
print(f"{dataInfo['pcb-refs'][i-1]}-{refProbe} Measured a shock speed of {probe_velocity:.2f} +/- {uncert:.2f} m/s ({probe_velocity/1000:.2f} +/- {uncert/1000:.2f} km/s [{uncert/probe_velocity * 100 :.2f}%])") print(f"{dataInfo['pcb-refs'][i-1]}-{refProbe} Measured a shock speed of {probe_velocity:.2f} +/- {uncert:.2f} m/s ({probe_velocity/1000:.2f} +/- {uncert/1000:.2f} km/s [{uncert/probe_velocity * 100 :.2f}%])")
data[x2_shot]["shock-speed"][f"{dataInfo['pcb-refs'][i-1]}-{refProbe}"] = probe_velocity, uncert, True # Speed, Ref data[x2_shot]["shock-speed"][f"{dataInfo['pcb-refs'][i-1]}-{refProbe}"] = probe_velocity, uncert, True # Speed, Ref
@ -367,25 +432,159 @@ def load_data(data_path: str, data={}) -> dict:
# Return the data & the successfully loaded data keys # Return the data & the successfully loaded data keys
return data #, tuple(data.keys()) return data #, tuple(data.keys())
data = {} def load_ref_data(x2_shot: str, data_path: str, data={}) -> dict:
for dp in data_to_load: # Load Raw Data
pdp = f"{DATA_PATH}/{dp}/" # TDMS File (X2 DAQ Data)
load_data(pdp, data) x2_tdms_data = TdmsFile.read(data_path, raw_timestamps=True)
x2_channels = x2_tdms_data.groups()[0].channels()
x2_channel_names = tuple(c.name for c in x2_channels)
loaded_data = tuple(data.keys()) data[x2_shot] = {
"name": x2_shot,
"info": {
"name": x2_shot,
"pcb-canny": [
{
"sigma": 4,
"post_pres": 0.05
}
],
print("Loaded Data") "pcb-refs": [
"st1",
"st2",
"st3",
"at1",
"at2",
"at3",
"at4",
"at5",
"at6",
],
"no-graph": [
"at1",
"at2",
"at3",
"at4",
"at5",
"at6",
]
},
"x2": x2_channels,
"x2-channels": x2_channel_names,
"x2-tdms": x2_tdms_data,
"data": {
"x2": {}, # Only pop channels with a voltage scale in ./tunnel-info.yaml
},
"time": {
"x2": None,
"trigger_index": None,
},
"shock-speed": {} # Note all in us
}
# === Process the data ===
# Generate X2 time arrays
time_data = x2_channels[0]
ns_time = time_data[:].as_datetime64('ns')
x2_time_seconds = (ns_time - ns_time[0]) # timedelta64[ns]
x2_time_us = x2_time_seconds.astype("float64") / 1000 # Scale to us
x2_time_dt = np.diff(x2_time_us).mean() / 1000 # Scale to s
# --- Un Scale Data ---
for channel, vScale in TUNNEL_INFO["volt-scale"].items():
# Get the channel index from its name
chIndex = x2_channel_names.index(channel)
# Calculate the average noise offset
avg_noise = x2_channels[chIndex][0:SAMPLES_TO_AVG].mean()
# Save the channel data
data[x2_shot]["data"]["x2"][channel] = (x2_channels[chIndex][:] - avg_noise) * vScale
# Process Trigger Info
trigger_volts = data[x2_shot]["data"]["x2"]["trigbox"] # Use a mean to offset
x2_trigger_index = np.where(trigger_volts > 1)[0][0]
x2_trigger_time = x2_time_us[x2_trigger_index]
# Add the time data
data[x2_shot]["time"] = {
"x2": x2_time_us,
"trigger_index": x2_trigger_index,
"x2-dt": x2_time_dt
}
data[x2_shot]["time"]["x2_uncert"] = UNCERTS["time"]["x2-daq"] #max(UNCERTS["time"]["x2-daq"], data[x2_shot]["time"]["x2-dt"])
#[TODO] Refactor # Find Shock Times
# X2 - Canning Edge
# Default Values
dataInfo = data[x2_shot]["info"]
data[x2_shot]["shock-point"] = {}
cArgs = dataInfo["pcb-canny"]
for i, ref in enumerate(dataInfo["pcb-refs"]):
refData = data[x2_shot]["data"]["x2"][ref]
if i in range(len(cArgs)):
sigma = cArgs[i]["sigma"]
post_sup_thresh = cArgs[i]["post_pres"]
else:
sigma = cArgs[-1]["sigma"]
post_sup_thresh = cArgs[-1]["post_pres"]
first_value, first_value_uncertainty, _, _ = canny_shock_finder(x2_time_us, refData, sigma=sigma, post_suppression_threshold=post_sup_thresh, plot=False, print_func=None)
shock_point = np.where(x2_time_us >= first_value)[0][0] # [BUG] Seems to give n+1
data[x2_shot]["shock-point"][ref] = shock_point, first_value, first_value_uncertainty
# Calculate Shock Speeds
print("="*30, x2_shot, "="*30)
print(f"-- Reference Shot {int(x2_shot[-1]) + 1} --")
for i, refProbe in enumerate(dataInfo["pcb-refs"]):
if i == 0: continue
p1_time = data[x2_shot]["shock-point"][refProbe][1] / 1e6 # Convert to seconds
p2_time = data[x2_shot]["shock-point"][dataInfo["pcb-refs"][i-1]][1] / 1e6 # Convert to seconds
p2p_dist = abs(TUNNEL_INFO["distance"][refProbe] - TUNNEL_INFO["distance"][dataInfo["pcb-refs"][i-1]]) / 1000 # convert to m
p2p_time = abs(p2_time - p1_time)
probe_velocity = p2p_dist / p2p_time # m/s
p1_time_uncert = data[x2_shot]["shock-point"][dataInfo["pcb-refs"][i-1]][2] / 1e6 # Convert to seconds
p2_time_uncert = data[x2_shot]["shock-point"][refProbe][2] / 1e6 # Convert to seconds
uncert = deltaVs(probe_velocity, p2p_dist, p2p_time, (UNCERTS["probe-dist"][refProbe], UNCERTS["probe-dist"][dataInfo["pcb-refs"][i-1]]), (p1_time_uncert, p2_time_uncert, data[x2_shot]["time"]["x2_uncert"]))
print(f"{dataInfo['pcb-refs'][i-1]}-{refProbe} Measured a shock speed of {probe_velocity:.2f} +/- {uncert:.2f} m/s ({probe_velocity/1000:.2f} +/- {uncert/1000:.2f} km/s [{uncert/probe_velocity * 100 :.2f}%])")
data[x2_shot]["shock-speed"][f"{dataInfo['pcb-refs'][i-1]}-{refProbe}"] = probe_velocity, uncert, True # Speed, Ref
print()
return data
# ======= Graphing ========
def genGraph(gData: dict, showPlot: bool = True, doLimits: bool = True, forcePlots: bool = False, addShockInfo: bool = True): def genGraph(gData: dict, showPlot: bool = True, doLimits: bool = True, forcePlots: bool = False, addShockInfo: bool = True):
graphData = { graphData = {
"title": f"Shock response Time\nFor {gData['info']['long_name']}", "title": f"Shock Response Time\nFor {gData['info']['long_name']}",
"xLabel": "Time ($\\mu$s)", "xLabel": "Time ($\\mu$s)",
"yLabel": "Voltage Reading (V)", "yLabel": "Voltage Reading (V)",
"grid": True, "grid": True,
"figSize": (8,6.5), "figSize": (9, 6.8), #(8,6.5),
"ledgLoc": 'upper left', "ledgLoc": 'upper left',
"yLim": (-1.5, 11 if addShockInfo else 4),
"plots": [] "plots": []
} }
@ -395,34 +594,15 @@ def genGraph(gData: dict, showPlot: bool = True, doLimits: bool = True, forcePlo
lims = [] lims = []
for label in gData["info"]["pcb-refs"]: # + ["trigbox"]:
if not forcePlots and label in gData["info"]["no-graph"]: continue
graphData["plots"].append({
"x": gData["time"]["x2"],
"y": gData["data"]["x2"][label],
"label": label
})
if label in gData["info"]["pcb-refs"]:
graphData["plots"].append({
"type": "axvLine",
"x": gData["shock-point"][label][1],
"label": f"{label} - Shock Point {gData['shock-point'][label][1]:.2f}$\\mu$s",
"colour": "gray",
"args":{"zorder":2, "linestyle":"--", "alpha":0.5}
})
lims.append(gData["shock-point"][label][1]) # [TODO this but better]
for label, d in [("1 [V]", "Gauge 1"),("2 [V]", "Gauge 2")]: #, ("4 [V]", "Gauge Trigger")]: for label, d in [("1 [V]", "Gauge 1"),("2 [V]", "Gauge 2")]: #, ("4 [V]", "Gauge Trigger")]:
graphData["plots"].append({ graphData["plots"].append({
"x": gData["time"]["scope"], "x": gData["time"]["scope"],
"y": gData["data"]["scope"][label], "y": gData["data"]["scope"][label],
"label": d "label": d,
"args":{"zorder":1}
}) })
for i, probe in enumerate(gData["info"]["probe-info"]["locations"]): for _, probe in enumerate(gData["info"]["probe-info"]["locations"]):
if f"{probe}-g1" in gData["shock-point"]: if f"{probe}-g1" in gData["shock-point"]:
graphData["plots"].append({ graphData["plots"].append({
"type": "axvLine", "type": "axvLine",
@ -439,18 +619,42 @@ def genGraph(gData: dict, showPlot: bool = True, doLimits: bool = True, forcePlo
"x": gData["shock-point"][f"{probe}-g2"][1],#[i], "x": gData["shock-point"][f"{probe}-g2"][1],#[i],
"label": f"{probe}-Gauge 2 - Shock Point {gData['shock-point'][f'{probe}-g2'][1]:.2f}$\\mu$s", "label": f"{probe}-Gauge 2 - Shock Point {gData['shock-point'][f'{probe}-g2'][1]:.2f}$\\mu$s",
"colour": UQC["purple"].lighten(0.5), "colour": UQC["purple"].lighten(0.5),
"args":{"zorder":2, "linestyle":"--", "alpha":0.5} "args":{"zorder":2, "linestyle":":", "alpha":0.5}
}) })
lims.append(gData["shock-point"][f"{probe}-g2"][1]) lims.append(gData["shock-point"][f"{probe}-g2"][1])
for label in gData["info"]["pcb-refs"]: # + ["trigbox"]:
if not forcePlots and label in gData["info"]["no-graph"]: continue
graphData["plots"].append({
"x": gData["time"]["x2"],
"y": gData["data"]["x2"][label],
"label": label
})
if label in gData["info"]["pcb-refs"]:
graphData["plots"].append({
"type": "axvLine",
"x": gData["shock-point"][label][1],
"label": f"{label} - Shock Point {gData['shock-point'][label][1]:.2f}$\\mu$s",
"colour": "gray",
"args":{"zorder":2, "linestyle":"-.", "alpha":0.5}
})
lims.append(gData["shock-point"][label][1]) # [TODO this but better]
if addShockInfo: if addShockInfo:
probeText = "" probeText = ""
flag = False flag = False
avg_speed_lst = []
avg_uncert_lst = []
for shock_speed_loc in gData["shock-speed"]: for shock_speed_loc in gData["shock-speed"]:
avg_speed_lst.append(gData['shock-speed'][shock_speed_loc][0])
avg_uncert_lst.append(gData['shock-speed'][shock_speed_loc][1])
if not flag and not gData["shock-speed"][shock_speed_loc][2]: if not flag and not gData["shock-speed"][shock_speed_loc][2]:
flag = True flag = True
avg_speed = np.array(avg_speed_lst).mean()
avg_uncert = np.array(avg_uncert_lst).mean() # np.sqrt(np.sum(np.pow(np.array(avg_uncert_lst), 2)))
probeText += f"\nAverage Speed - {avg_speed/1000:.2f} $\\pm${avg_uncert/1000:.2f} [{avg_uncert/avg_speed * 100:.2f}%] km/s"
probeText += "\n" + "-"*50 probeText += "\n" + "-"*50
probeText += "\n" probeText += "\n"
@ -462,8 +666,8 @@ def genGraph(gData: dict, showPlot: bool = True, doLimits: bool = True, forcePlo
"type": "text", "type": "text",
"text": f"Measured Shock Speeds {probeText}", "text": f"Measured Shock Speeds {probeText}",
"align": ("top", "right"), "align": ("top", "right"),
"alpha": 0.75, "alpha": 0.8,
"x": 0.94 if len(gData["info"]["probe-info"]["locations"]) < 3 else 0.885, "x": 0.94, #if len(gData["info"]["probe-info"]["locations"]) < 3 else 0.885,
"y": 0.94 "y": 0.94
}) })
@ -475,14 +679,488 @@ def genGraph(gData: dict, showPlot: bool = True, doLimits: bool = True, forcePlo
makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath=f"./images/{gData['info']['shot-info']['name']}{'-all' if forcePlots else ''}{'-clipped' if doLimits else ''}.png") #figSavePath=f"./images/{{0}}{"-noLims" if not doLimits else ""}.png") makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath=f"./images/{gData['info']['shot-info']['name']}{'-all' if forcePlots else ''}{'-clipped' if doLimits else ''}.png") #figSavePath=f"./images/{{0}}{"-noLims" if not doLimits else ""}.png")
def genRefGraph(gData: dict, showPlot: bool = True, addShockInfo: bool = True, forcePlots: bool = False):
graphData = {
"title": f"Shock Response Time\nFor Reference Shot {int(gData['name'][-1]) + 1} ({gData['name']})",
"xLabel": "Time ($\\mu$s)",
"yLabel": "Voltage Reading (V)",
"grid": True,
"figSize": (9, 6.8), #(8,6.5),
"ledgLoc": 'upper left',
"yLim": (-1.5, 11),
"plots": []
}
print("Graphing Data") lims = []
for shot in loaded_data: for label in gData["info"]["pcb-refs"]:
#if shot != loaded_data[-2]: continue if not forcePlots and label in gData["info"]["no-graph"]: continue
graphData["plots"].append({
"x": gData["time"]["x2"],
"y": gData["data"]["x2"][label],
"label": label
})
if label in gData["info"]["pcb-refs"]:
graphData["plots"].append({
"type": "axvLine",
"x": gData["shock-point"][label][1],
"label": f"{label} - Shock Point {gData['shock-point'][label][1]:.2f}$\\mu$s",
"colour": "gray",
"args":{"zorder":2, "linestyle":"-.", "alpha":0.5}
})
lims.append(gData["shock-point"][label][1]) # [TODO this but better]
if addShockInfo:
probeText = ""
flag = False
avg_speed_lst = []
avg_uncert_lst = []
for shock_speed_loc in gData["shock-speed"]:
avg_speed_lst.append(gData['shock-speed'][shock_speed_loc][0])
avg_uncert_lst.append(gData['shock-speed'][shock_speed_loc][1])
if not flag and not gData["shock-speed"][shock_speed_loc][2]:
flag = True
avg_speed = np.array(avg_speed_lst).mean()
avg_uncert = np.array(avg_uncert_lst).mean() # np.sqrt(np.sum(np.pow(np.array(avg_uncert_lst), 2)))
probeText += f"\nAverage Speed - {avg_speed/1000:.2f} $\\pm${avg_uncert/1000:.2f} [{avg_uncert/avg_speed * 100:.2f}%] km/s"
probeText += "\n" + "-"*50
probeText += "\n"
probeText += f"{shock_speed_loc} - {gData['shock-speed'][shock_speed_loc][0]/1000:.2f} $\\pm${gData['shock-speed'][shock_speed_loc][1]/1000:.2f} [{gData['shock-speed'][shock_speed_loc][1]/gData['shock-speed'][shock_speed_loc][0]*100:.2f}%] km/s"
graphData["plots"].append({
"type": "text",
"text": f"Measured Shock Speeds {probeText}",
"align": ("top", "right"),
"alpha": 0.8,
"x": 0.94,
"y": 0.94
})
if len(lims) > 1:
OFFSET = 10 #if not forcePlots else 50
graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET))
makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath=f"./images/ref-{gData['name']}{'-all' if forcePlots else ''}.png")
def genComboRefGraph(data: dict, plotCh: list[str] = ["st1", "st2", "st3"], showPlot: bool = False, doShockLabels:bool = False, addShockInfo:bool = False):
graphData = {
"title": f"Shock Response Time\nFor Reference Shots 1, 2, & 3 (x2s5820, x2s5821 & x2s5822) - Mars Entry Conditions",
"xLabel": "Time ($\\mu$s)",
"yLabel": "Voltage Reading (V)",
"grid": True,
"figSize": (16, 6.8), #(8,6.5),
"ledgLoc": 'upper left',
"yLim": (-1.5, 11),
"plots": []
}
LINESTYLES = (
'solid',
'dotted',
'dashed',
'dashdot'
)
COLOURS = (
UQC["purple"],
UQC["blue"],
UQC["green"],
# Don't need these
UQC["red"],
UQC["light_purple"],
UQC["dark_grey"],
UQC["orange"],
UQC["yellow"],
UQC["aqua"],
UQC["gold"],
UQC["neutral"]
)
lims = []
for line_sty, shot in enumerate(data):
gData = data[shot]
for col, label in enumerate(plotCh):
graphData["plots"].append({
"x": gData["time"]["x2"],
"y": gData["data"]["x2"][label],
"colour": COLOURS[col % len(COLOURS)],
"args":{"zorder":2, "linestyle":LINESTYLES[line_sty % len(LINESTYLES)], "alpha":0.5}
})
if line_sty == 0:
graphData["plots"][-1]["label"] = f"{label}",
for line_sty, shot in enumerate(data):
gData = data[shot]
for col, label in enumerate(plotCh):
if label in plotCh:
graphData["plots"].append({
"type": "axvLine",
"x": gData["shock-point"][label][1],
"colour": "gray",
"args":{"zorder":2, "linestyle":"-.", "alpha":0.5}
})
if doShockLabels:
graphData["plots"][-1]["label"] = f"{label} - Ref Shot {line_sty + 1} - Shock Point {gData['shock-point'][label][1]:.2f}$\\mu$s"
lims.append(gData["shock-point"][label][1]) # [TODO this but better]
if addShockInfo:
print("============================== Reference Shots ==============================")
shock_speeds = {}
for shot_id, shot in enumerate(data):
shot_id += 1
gData = data[shot]
for shock_speed_loc in gData['shock-speed']:
shk_sps = shock_speeds.get(shock_speed_loc, [])
shk_sps.append((gData['shock-speed'][shock_speed_loc][0], gData['shock-speed'][shock_speed_loc][1]))
shock_speeds[shock_speed_loc] = shk_sps
probeText = ""
avg_speeds = []
avg_uncerts = []
for shock_speed_loc in shock_speeds:
shock_info = np.array(shock_speeds[shock_speed_loc])
speeds = shock_info[:, 0]
uncerts = shock_info[:, 1]
speed = speeds.mean()
uncert = np.sqrt(np.pow(uncerts, 2).sum())
avg_speeds.append(speed)
avg_uncerts.append(uncert)
print(f"{shock_speed_loc} Measured a mean shock speed of {speed:.2f} +/- {uncert:.2f} m/s ({speed/1000:.2f} +/- {uncert/1000:.2f} km/s [{uncert/speed * 100 :.2f}%])")
probeText += f"\n{shock_speed_loc} - {speed/1000:.2f} $\\pm${uncert/1000:.2f} [{uncert/speed*100:.2f}%] km/s"
avg_sp = np.array(avg_speeds).mean()
avg_unc = np.array(avg_uncerts).mean() #np.sqrt(np.pow(np.array(avg_uncerts), 2).sum())
probeText += f"\nAverage Speed - {avg_sp/1000:.2f} $\\pm${avg_unc/1000:.2f} [{avg_unc/avg_sp * 100:.2f}%] km/s"
graphData["plots"].append({
"type": "text",
"text": f"Average Measured Shock Speeds{probeText}",
"align": ("top", "right"),
"alpha": 0.8,
"x": 0.9,
"y": 0.9
})
if len(lims) > 1:
OFFSET = 10 #if not forcePlots else 50
graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET))
makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath=f"./images/ref-combo-{'_'.join(plotCh)}.png")
def genComboDataGraph(data: dict, showPlot: bool = False, doShockLabels:bool = False, addShockInfo:bool = False):
shots = ""
names = ""
for i, shot in enumerate(data):
shots += ('' if i == 0 else ', ') + f"{data[shot]['info']['name'][-1]}"
names += ('' if i == 0 else ', ') + shot
graphData = {
"title": f"Shock Response Time\nFor Shots {shots} ({names})\nVarious Probe Locations - Mars Entry Conditions",
"xLabel": "Time ($\\mu$s)",
"yLabel": "Voltage Reading (V)",
"grid": True,
"figSize": (16, 6.8), #(8,6.5),
"ledgLoc": (0.6, 0.075) if doShockLabels else 'upper left',
"yLim": (-1.5, 11),
"plots": []
}
LINESTYLES = (
'solid',
'dotted',
'dashed',
'dashdot'
)
COLOURS = (
#UQC["purple"],
#UQC["blue"],
UQC["green"],
UQC["red"],
UQC["light_purple"],
UQC["dark_grey"],
UQC["orange"],
UQC["yellow"],
UQC["aqua"],
UQC["gold"],
UQC["neutral"]
)
lims = []
for line_sty, shot in enumerate(data):
gData = data[shot]
for label, d in [("1 [V]", "Gauge 1"),("2 [V]", "Gauge 2")]:
graphData["plots"].append({
"x": gData["time"]["scope"],
"y": gData["data"]["scope"][label],
#"label": d,
"colour": UQC["purple"] if label[0] == "1" else UQC["blue"],
"args":{"zorder":1, "linestyle": LINESTYLES[line_sty % len(LINESTYLES)]}
})
if line_sty == 0:
graphData["plots"][-2]["label"] = "Gauge 1"
graphData["plots"][-1]["label"] = "Gauge 2"
for _, probe in enumerate(gData["info"]["probe-info"]["locations"]):
if f"{probe}-g1" in gData["shock-point"]:
graphData["plots"].append({
"type": "axvLine",
"x": gData["shock-point"][f"{probe}-g1"][1],
"colour": UQC["purple"].lighten(0.5),
"args":{"zorder":2, "linestyle":"--", "alpha":0.5}
})
lims.append(gData["shock-point"][f"{probe}-g1"][1])
if doShockLabels:
graphData["plots"][-1]["label"] = f"{probe}-Gauge 1 - {shot} - Shock Point {gData['shock-point'][f'{probe}-g1'][1]:.2f}$\\mu$s"
if f"{probe}-g2" in gData["shock-point"]:
graphData["plots"].append({
"type": "axvLine",
"x": gData["shock-point"][f"{probe}-g2"][1],
"colour": UQC["purple"].lighten(0.5),
"args":{"zorder":2, "linestyle":":", "alpha":0.5}
})
lims.append(gData["shock-point"][f"{probe}-g2"][1])
if doShockLabels:
graphData["plots"][-1]["label"] = f"{probe}-Gauge 2 - {shot} - Shock Point {gData['shock-point'][f'{probe}-g2'][1]:.2f}$\\mu$s"
if False:
for line_sty, shot in enumerate(data):
gData = data[shot]
plotCh = gData["info"]["pcb-refs"]
for col, label in enumerate(plotCh):
if label in plotCh:
graphData["plots"].append({
"type": "axvLine",
"x": gData["shock-point"][label][1],
"colour": "gray",
"args":{"zorder":2, "linestyle":"--", "alpha":0.5}
})
if doShockLabels:
graphData["plots"][-1]["label"] = f"{label} - Ref Shot {line_sty + 1} - Shock Point {gData['shock-point'][label][1]:.2f}$\\mu$s"
lims.append(gData["shock-point"][label][1]) # [TODO this but better]
if addShockInfo:
print("============================== Reference Shots ==============================")
shock_speeds = {}
for shot_id, shot in enumerate(data):
shot_id += 1
gData = data[shot]
for shock_speed_loc in gData['shock-speed']:
shk_sps = shock_speeds.get(shock_speed_loc, [])
shk_sps.append((gData['shock-speed'][shock_speed_loc][0], gData['shock-speed'][shock_speed_loc][1]))
shock_speeds[shock_speed_loc] = shk_sps
probeText = ""
for shock_speed_loc in shock_speeds:
shock_info = np.array(shock_speeds[shock_speed_loc])
speeds = shock_info[:, 0]
uncerts = shock_info[:, 1]
speed = speeds.mean()
uncert = np.sqrt(np.pow(uncerts, 2).sum())
print(f"{shock_speed_loc} Measured a mean shock speed of {speed:.2f} +/- {uncert:.2f} m/s ({speed/1000:.2f} +/- {uncert/1000:.2f} km/s [{uncert/speed * 100 :.2f}%])")
probeText += f"\n{shock_speed_loc} - {speed/1000:.2f} $\\pm${uncert/1000:.2f} [{uncert/speed*100:.2f}%] km/s"
graphData["plots"].append({
"type": "text",
"text": f"Average Measured Shock Speeds{probeText}",
"align": ("top", "right"),
"alpha": 0.8,
"x": 0.9,
"y": 0.9
})
if len(lims) > 1:
OFFSET = 10 #if not forcePlots else 50
graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET))
makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath=f"./images/combo-my-data.png")
def genX2CompGraphs(gData: dict, showPlot: bool = True):
graphData = {
"title": f"Signal Comparison for {gData['info']['name']} ({gData['info']['shot-info']['name']})",
"xLabel": "Time ($\\mu$s)",
"yLabel": "Voltage Reading (V)",
"grid": True,
"figSize": (16, 6.8), #(9, 6.8), #(8,6.5),
"ledgLoc": 'upper left',
"yLim": (-1.5, 11),
"plots": []
}
lims = []
for label, d in [("1 [V]", "Gauge 1 - Scope"),("2 [V]", "Gauge 2 - Scope")]: #, ("4 [V]", "Gauge Trigger")]:
graphData["plots"].append({
"x": gData["time"]["scope"],
"y": gData["data"]["scope"][label],
"label": d,
"args":{"zorder":1}
})
for label, d in [("st1", "Gauge 1 - X2 DAQ"),("st3", "Gauge 2 - X2 DAQ")]: #, ("4 [V]", "Gauge Trigger")]:
graphData["plots"].append({
"x": gData["time"]["x2"],
"y": gData["data"]["x2"][label],
"label": d,
"args":{"zorder":1}
})
for _, probe in enumerate(gData["info"]["probe-info"]["locations"]):
if f"{probe}-g1" in gData["shock-point"]:
graphData["plots"].append({
"type": "axvLine",
"x": gData["shock-point"][f"{probe}-g1"][1],#[i],
"label": f"{probe}-Gauge 1 - Scope - Shock Point {gData['shock-point'][f'{probe}-g1'][1]:.2f}$\\mu$s",
"colour": UQC["purple"].lighten(0.5),
"args":{"zorder":2, "linestyle":"--", "alpha":0.5}
})
lims.append(gData["shock-point"][f"{probe}-g1"][1])
if f"{probe}-g2" in gData["shock-point"]:
graphData["plots"].append({
"type": "axvLine",
"x": gData["shock-point"][f"{probe}-g2"][1],#[i],
"label": f"{probe}-Gauge 2 - Scope - Shock Point {gData['shock-point'][f'{probe}-g2'][1]:.2f}$\\mu$s",
"colour": UQC["purple"].lighten(0.5),
"args":{"zorder":2, "linestyle":":", "alpha":0.5}
})
lims.append(gData["shock-point"][f"{probe}-g2"][1])
for _, probe in enumerate(gData["info"]["probe-info"]["locations"]):
if f"{probe}-g1" in gData["shock-point"]["x2"]:
graphData["plots"].append({
"type": "axvLine",
"x": gData["shock-point"]["x2"][f"{probe}-g1"][1],#[i],
"label": f"{probe}-Gauge 1 - X2 DAQ - Shock Point {gData['shock-point']["x2"][f'{probe}-g1'][1]:.2f}$\\mu$s",
"colour": UQC["dark_grey"],
"args":{"zorder":2, "linestyle":"--", "alpha":0.5}
})
lims.append(gData["shock-point"]["x2"][f"{probe}-g1"][1])
if f"{probe}-g2" in gData["shock-point"]["x2"]:
graphData["plots"].append({
"type": "axvLine",
"x": gData["shock-point"]["x2"][f"{probe}-g2"][1],#[i],
"label": f"{probe}-Gauge 2 - X2 DAQ - Shock Point {gData['shock-point']["x2"][f'{probe}-g2'][1]:.2f}$\\mu$s",
"colour": UQC["dark_grey"],
"args":{"zorder":2, "linestyle":":", "alpha":0.5}
})
lims.append(gData["shock-point"]["x2"][f"{probe}-g2"][1])
if True and len(lims) > 1:
OFFSET = 10 #if not forcePlots else 50
graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET))
makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath=f"./images/signal_comp-{gData['info']['shot-info']['name']}.png")
print("-")
scopeTime = gData["time"]["scope"]
scopeData = gData["data"]["scope"]
x2Time = gData["time"]["x2"]
#x2Data = gData["data"]["x2"]
pass
if __name__ == "__main__":
data_to_load = [
"x2s5823",
"x2s5824",
"x2s5827",
"x2s5829",
"x2s5830",
"x2s5831",
"x2s5832"
]
ref_data_to_load = [
"x2s5820",
"x2s5821",
"x2s5822"
]
print("Loading Data")
# My Shot Data
data = {}
for dp in data_to_load:
pdp = f"{DATA_PATH}/{dp}/"
load_data(pdp, data)
loaded_data = tuple(data.keys())
# Reference Data from Mragank
ref_data = {}
for refShot in ref_data_to_load:
load_ref_data(refShot, f"./data/referance/{refShot}/{refShot}.tdms", ref_data)
print("Loaded Data")
print("Graphing Data")
# General Shot Graphing
for shot in loaded_data:
#print(data[shot]['info']['long_name'].rsplit("\n", 1)[-1])
genGraph(data[shot], showPlot=False, addShockInfo=False) genGraph(data[shot], showPlot=False, addShockInfo=False)
genGraph(data[shot], showPlot=False, forcePlots=True) genGraph(data[shot], showPlot=False, forcePlots=True)
# This forces matplotlib to hang until I tell it to close all windows combo_data = data.copy()
pltKeyClose() combo_data.pop(loaded_data[-2])
combo_data.pop(loaded_data[-1])
print("Done") genComboDataGraph(combo_data, doShockLabels=True)
genX2CompGraphs(data["x2s5831"], showPlot=False)
genX2CompGraphs(data["x2s5832"], showPlot=False)
# Reference Data
for shot in ref_data:
genRefGraph(ref_data[shot], showPlot=False, addShockInfo=False)
genRefGraph(ref_data[shot], showPlot=False, forcePlots=True)
genComboRefGraph(ref_data, doShockLabels=True)
genComboRefGraph(ref_data, ref_data[ref_data_to_load[0]]["info"]["pcb-refs"], addShockInfo=True)
# This forces matplotlib to hang until I tell it to close all windows
pltKeyClose()
print("Done")