Thesis/main.py

1140 lines
46 KiB
Python
Raw Normal View History

2024-10-15 15:11:10 +10:00
# Cal Wing (c.wing@uq.net.au) - Oct 2024
# Thesis Graphing
2024-09-30 19:30:28 +10:00
import os
2024-09-30 19:13:09 +10:00
import numpy as np
2024-10-15 15:11:10 +10:00
import pandas as pd
import yaml
2024-09-30 19:13:09 +10:00
from nptdms import TdmsFile
from makeGraph import makeGraph, pltKeyClose, UQ_COLOURS as UQC
2024-10-15 20:33:26 +10:00
from canny_shock_finder import canny_shock_finder
2024-09-30 19:30:28 +10:00
# Output folders
# Create every folder the script writes into before anything tries to save there
folders = ["./images"]
for folder in folders:
    if os.path.isdir(folder):
        continue
    os.mkdir(folder)
2024-10-16 22:09:24 +10:00
# Data Paths
2024-10-15 15:11:10 +10:00
# Presumably the root folder holding the per-shot data - not referenced in the
# visible part of this file, confirm against the loader loop.
DATA_PATH = "./data"
# Suffix appended to a shot's data path to locate its info YAML (see load_data)
DATA_INFO = "_info.yaml"
2024-10-16 22:09:24 +10:00
# YAML file parsed into TUNNEL_INFO; the code reads its "volt-scale",
# "distance" and "uncertainties" sections.
TUNNEL_INFO_FILE = "./tunnel-info.yaml"
# Number of leading samples averaged to estimate a channel's noise/zero offset
SAMPLES_TO_AVG = 500
2024-10-17 22:58:30 +10:00
# Added to the previous probe's detected shock time to form the `start_time`
# handed to canny_shock_finder when searching for the next probe's shock.
CANNY_TIME_OFFSET = 50 #us
2024-10-16 20:57:25 +10:00
2024-10-16 22:09:24 +10:00
# Parse the tunnel description once at import time; the loaders below read it
with open(TUNNEL_INFO_FILE, mode="r") as file:
    TUNNEL_INFO = yaml.safe_load(file)
2024-09-30 16:45:52 +10:00
2024-10-15 15:11:10 +10:00
# Shot names to load
data_to_load = [
    "x2s5823",
    "x2s5824",
    "x2s5827",
    "x2s5829",
    "x2s5830",
    "x2s5831",
    "x2s5832",
]
2024-09-30 16:45:52 +10:00
2024-10-22 23:18:40 +10:00
# Reference shot names to load
ref_data_to_load = [
    "x2s5820",
    "x2s5821",
    "x2s5822",
]
2024-10-22 15:46:08 +10:00
# ==== Uncertainties ====
# Uncertainty propagation taken from DOI: 10.1007/s00193-017-0763-3 (Implementation of a state-to-state analytical framework for the calculation of expansion tube flow properties)
# The "uncertainties" section of the tunnel info file; the code below reads its
# "time" -> "x2-daq" and "probe-dist" entries.
UNCERTS = TUNNEL_INFO["uncertainties"]
def deltaX(delta_x_1: float, delta_x_2: float):
    """Combine two position uncertainties in quadrature.

    Uses ``np.square`` rather than ``np.pow``: the ``np.pow`` alias only
    exists in NumPy >= 2.0, so the original raised AttributeError on 1.x.
    """
    return np.sqrt(np.square(delta_x_1) + np.square(delta_x_2))


def deltaT(delta_t_1: float, delta_t_2: float, delta_t_sr: float):
    """Combine two timing uncertainties and a third term in quadrature.

    The callers in this file pass the per-channel timing uncertainty
    (``x2_uncert`` / ``probe_uncert``) as ``delta_t_sr``.
    """
    return np.sqrt(np.square(delta_t_1) + np.square(delta_t_2) + np.square(delta_t_sr))


def deltaVs(V: float, dx: float, dt: float, delta_x: tuple[float, float], delta_t: tuple[float, float, float]):
    """Return the absolute uncertainty of a speed ``V = dx / dt``.

    The relative distance and time uncertainties are summed in quadrature and
    scaled by ``V``.

    Args:
        V: Measured speed.
        dx: Distance the speed was measured over (same unit as ``delta_x``).
        dt: Transit time the speed was measured over (same unit as ``delta_t``).
        delta_x: The two position uncertainties passed to ``deltaX``.
        delta_t: The three timing uncertainties passed to ``deltaT``.
    """
    relative_x = deltaX(*delta_x) / dx
    relative_t = deltaT(*delta_t) / dt
    return V * np.sqrt(np.square(relative_x) + np.square(relative_t))
2024-10-16 22:09:24 +10:00
# ==== Data Loading & Processing ====
2024-10-17 22:58:30 +10:00
def _canny_settings(canny_args: list, index: int) -> tuple:
    """Return ``(sigma, post_pres)`` for the entry at *index*, reusing the last entry when the list is shorter."""
    entry = canny_args[index] if index < len(canny_args) else canny_args[-1]
    return entry["sigma"], entry["post_pres"]


def _read_scope_header(scope_data_path: str) -> list:
    """Build column labels such as ``"Time [s]"`` from the first two rows (names, units) of the scope CSV."""
    with open(scope_data_path, 'r') as dfile:
        # zip with range(2) stops after two rows without reading the samples
        header_lines = [line.strip().split(",") for _, line in zip(range(2), dfile)]

    scope_header = []
    for i, name in enumerate(header_lines[0]):
        if name == "x-axis":
            name = "Time"
        unit = header_lines[1][i]
        if unit in ("second", "Volt"):
            unit = unit[0]  # Abbreviate to "s" / "V"
        scope_header.append(f"{name} [{unit}]")
    return scope_header


def _scaled_channel(x2_channels, x2_channel_names: tuple, channel: str, scale: float):
    """Return the named X2 channel with its leading-sample mean removed, multiplied by *scale*."""
    samples = x2_channels[x2_channel_names.index(channel)]
    avg_noise = samples[0:SAMPLES_TO_AVG].mean()
    return (samples[:] - avg_noise) * scale


def _find_gauge_shocks(x2_shot: str, dataInfo: dict, time_us, gauge_channels, canny_args: list, shock_points: dict, raise_on_miss: bool) -> None:
    """Locate the shock on gauge 1 / gauge 2 for every probe location and store it in *shock_points*.

    Entries are stored as ``"<probe>-g<n>": (sample index, time, time uncertainty)``.
    For every probe after the first the search starts CANNY_TIME_OFFSET after
    the same gauge's shock at the previous location.
    """
    locations = dataInfo["probe-info"]["locations"]
    gauges = dataInfo["probe-info"]["gauges"]

    for i, probe in enumerate(locations):
        sigma, post_sup_thresh = _canny_settings(canny_args, i)

        for gauge_no, channel in zip((1, 2), gauge_channels):
            if gauge_no not in gauges:
                continue
            key = f"{probe}-g{gauge_no}"

            # The original always read the previous g1 entry first, which raised
            # KeyError when gauge 1 was not fitted or the previous detection failed.
            time_offset = None
            if i > 0:
                prev_key = f"{locations[i - 1]}-g{gauge_no}"
                if prev_key in shock_points:
                    time_offset = shock_points[prev_key][1] + CANNY_TIME_OFFSET
                else:
                    print(f"[WARN] {x2_shot} - no shock time for {prev_key}, searching for {key} from the start of the record")

            first_value, first_value_uncertainty, _, _ = canny_shock_finder(time_us, channel, sigma=sigma, post_suppression_threshold=post_sup_thresh, plot=False, start_time=time_offset, print_func=None)
            if first_value is None:
                print(f"[ERROR] {x2_shot} - {key} could not be detected using: Sigma = {sigma}, post_suppression_threshold = {post_sup_thresh}")
                if raise_on_miss:
                    raise ValueError(f"{key} not detected")
                continue

            shock_point = np.where(time_us >= first_value)[0][0]  # [BUG] Seems to give n+1
            shock_points[key] = shock_point, first_value, first_value_uncertainty


def _shock_speed(first_point: tuple, second_point: tuple, dist_m: float, dist_uncerts: tuple, sample_uncert: float) -> tuple:
    """Return ``(speed, uncertainty)`` in m/s between two shock-point records.

    Each record is ``(sample index, time in us, time uncertainty in us)``.
    """
    first_time = first_point[1] / 1e6  # Convert to seconds
    second_time = second_point[1] / 1e6  # Convert to seconds
    transit_time = abs(second_time - first_time)
    velocity = dist_m / transit_time  # m/s
    time_uncerts = (first_point[2] / 1e6, second_point[2] / 1e6, sample_uncert)
    return velocity, deltaVs(velocity, dist_m, transit_time, dist_uncerts, time_uncerts)


def load_data(data_path: str, data={}) -> dict:
    """Load one shot, align its time bases, locate the shock and compute shock speeds.

    Reads ``data_path + DATA_INFO`` (YAML), the TDMS file it names and, when a
    "scope" data record is listed, the scope CSV. The shot is stored in
    ``data[<shot name>]`` and the shock speeds are printed.

    Changes from the previous implementation:
      * the gauge-to-gauge uncertainty used the PCB loop's leftover distance/time
        instead of the c2c distance/time,
      * the probe-to-probe distance was always taken between the first two
        locations regardless of the pair being processed,
      * "x2-dt" was a factor of 1000 away from seconds,
      * a shot with no "scope" record crashed with NameError.

    Args:
        data_path: Prefix of the shot's files (the info file suffix is appended).
        data: Dict to add the shot to.
            NOTE(review): the default is a single dict shared between calls
            (mutable default). It is kept because callers may rely on shots
            accumulating in it - pass an explicit dict to be safe.

    Returns:
        ``data`` with the shot added, or ``None`` when the info file is missing.

    Raises:
        ValueError: when a scope gauge shock cannot be detected.
    """
    data_info_path = data_path + DATA_INFO
    if not os.path.exists(data_info_path):
        print(f"[ERR] Could not find data info file: '{data_info_path}'")
        print(f"[WARN] Not Loading Data '{data_path}'")
        return None

    # Load Shot Data Info YAML File (Cal)
    with open(data_info_path, 'r') as file:
        dataInfo = yaml.safe_load(file)

    # Grab the shot name & fill it into the file name templates
    shot_info = dataInfo["shot-info"]
    x2_shot = shot_info["name"]
    for key in ("tdms", "config", "info"):
        shot_info[key] = shot_info[key].format(x2_shot)

    # Load Raw Data
    # TDMS File (X2 DAQ Data)
    x2_tdms_data = TdmsFile.read(data_path + shot_info["tdms"], raw_timestamps=True)
    x2_channels = x2_tdms_data.groups()[0].channels()
    x2_channel_names = tuple(c.name for c in x2_channels)

    data_records = dataInfo["probe-info"]["data-records"]
    data_locs = [dr["type"] for dr in data_records]
    has_scope = "scope" in data_locs
    has_x2_probes = "x2" in data_locs

    # Scope info _if it exists_ - left as None otherwise so the shot still loads
    scope_data_info = None
    scope_header = None
    scope_data = None
    if has_scope:
        scope_data_info = data_records[data_locs.index("scope")]
        scope_data_path = data_path + scope_data_info["data"]
        # [TODO] Read the scope config file (scope_data_info["config"])
        scope_header = _read_scope_header(scope_data_path)
        scope_data = np.loadtxt(scope_data_path, delimiter=',', skiprows=2)

    # Build a data object (this could be cached - or partially cached if I was clever enough)
    # Raw Data is always added - processing comes after
    shot = {
        "info": dataInfo,
        "shot_time": np.datetime64(f"{dataInfo['date']}T{dataInfo['time']}"),
        "raw-data": {
            "probe_headers": scope_header,
            "probes": scope_data,
            "x2": x2_channels,
            "x2-channels": x2_channel_names,
            "x2-tdms": x2_tdms_data
        },
        "time": {
            "x2": None,
            "probes": None,  # This may be x2 but may not - ie a scope was used
            "trigger_index": None,
            "probe_uncert": None,  # s
            "x2-dt": None,  # s
        },
        "data": {
            "x2": {},  # Only pop channels with a voltage scale in ./tunnel-info.yaml
            "probes": [[None], [None]]  # Save probe data in volts - [G1, G2]
        },
        "shock-speed": {}  # m/s
    }
    data[x2_shot] = shot

    # === Process the data ===
    # Generate X2 time arrays
    ns_time = x2_channels[0][:].as_datetime64('ns')
    x2_time_us = (ns_time - ns_time[0]).astype("float64") / 1000  # timedelta64[ns] -> us
    x2_time_dt = np.diff(x2_time_us).mean() / 1e6  # us -> s (was / 1000, which gave ms)

    # --- Un Scale Data ---
    for channel, vScale in TUNNEL_INFO["volt-scale"].items():
        shot["data"]["x2"][channel] = _scaled_channel(x2_channels, x2_channel_names, channel, vScale)

    # Process Trigger Info - first sample where the (zeroed) trigger exceeds 1
    trigger_volts = shot["data"]["x2"]["trigbox"]
    x2_trigger_index = np.where(trigger_volts > 1)[0][0]
    x2_trigger_time = x2_time_us[x2_trigger_index]

    # Add the time data
    shot["time"] = {
        "x2": x2_time_us,
        "trigger_index": x2_trigger_index,
        "probes": x2_time_us,  # Until otherwise overridden - probe time is x2 time
        "x2-dt": x2_time_dt
    }
    shot["time"]["x2_uncert"] = UNCERTS["time"]["x2-daq"]

    # Setup custom scaling on the gauge values
    if has_x2_probes:
        x2_record = data_records[data_locs.index("x2")]
        for ch in x2_record["channels"]:
            if ch in x2_record["scaler"]:
                shot["data"]["x2"][ch] = _scaled_channel(x2_channels, x2_channel_names, ch, x2_record["scaler"][ch])

    # Scope timing _if it exists_
    scope_time = None
    if has_scope:
        trigger_info = scope_data_info["trigger"]  # Get the scope trigger info

        # Calc the scope time & apply any manual offsets
        scope_time = (scope_data[:, 0] - scope_data[0, 0]) * 1e6  # to us
        scope_time -= trigger_info["alignment-offset"]  # manual offset delay

        # Trigger Alignment - line the scope trigger edge up with the X2 one
        scope_trigger_volts = scope_data[:, 3] - scope_data[0:SAMPLES_TO_AVG, 3].mean()
        scope_trigger_index = np.where(scope_trigger_volts > 1)[0][0]
        scope_alignment = x2_trigger_time - scope_time[scope_trigger_index]
        scope_time += scope_alignment

        # Offset any trigger delays
        scope_time += trigger_info["delay"]  # us delay from the actual trigger signal to the scope received trigger

        shot["time"]["scope"] = scope_time
        shot["time"]["scope-offset"] = scope_alignment
        shot["time"]["scope-dt"] = np.diff(scope_time).mean() / 1e6

        shot["data"]["scope"] = {}
        for i, header in enumerate(scope_header):
            if i == 0:
                continue  # Don't record time
            # Stored under both the column index and the header (same array object)
            ref = scope_data[:, i]
            shot["data"]["scope"][i] = ref
            shot["data"]["scope"][header] = ref

        # Save Probe Data
        shot["data"]["probes"] = [shot["data"]["scope"][1], shot["data"]["scope"][2]]
        shot["time"]["probes"] = shot["time"]["scope"]
        shot["time"]["probe_uncert"] = max(scope_data_info["time-uncert"], shot["time"]["scope-dt"])

    # Find Shock Times
    # X2 - Canning Edge
    shock_points = {}
    shot["shock-point"] = shock_points
    pcb_canny = dataInfo["pcb-canny"]
    for i, ref in enumerate(dataInfo["pcb-refs"]):
        sigma, post_sup_thresh = _canny_settings(pcb_canny, i)
        first_value, first_value_uncertainty, _, _ = canny_shock_finder(x2_time_us, shot["data"]["x2"][ref], sigma=sigma, post_suppression_threshold=post_sup_thresh, plot=False, print_func=None)
        shock_point = np.where(x2_time_us >= first_value)[0][0]  # [BUG] Seems to give n+1
        shock_points[ref] = shock_point, first_value, first_value_uncertainty

    # Gauges recorded on the X2 DAQ - a failed detection is only reported
    if has_x2_probes:
        shock_points["x2"] = {}
        x2_gauges = (shot["data"]["x2"]["st1"], shot["data"]["x2"]["st3"])
        _find_gauge_shocks(x2_shot, dataInfo, x2_time_us, x2_gauges, dataInfo["x2-canny"], shock_points["x2"], raise_on_miss=False)

    # ---- Gauge Canning Edge ----
    # Gauges recorded on the scope - a failed detection raises
    if has_scope:
        _find_gauge_shocks(x2_shot, dataInfo, scope_time, shot["data"]["probes"], dataInfo["canny-args"], shock_points, raise_on_miss=True)

    # Calculate Shock Speeds
    print("="*30, x2_shot, "="*30)
    print("--", dataInfo["long_name"], "--")

    # Between consecutive PCB reference gauges
    pcb_refs = dataInfo["pcb-refs"]
    for prev_ref, refProbe in zip(pcb_refs, pcb_refs[1:]):
        p2p_dist = abs(TUNNEL_INFO["distance"][refProbe] - TUNNEL_INFO["distance"][prev_ref]) / 1000  # convert to m
        dist_uncerts = (UNCERTS["probe-dist"][refProbe], UNCERTS["probe-dist"][prev_ref])
        probe_velocity, uncert = _shock_speed(shock_points[prev_ref], shock_points[refProbe], p2p_dist, dist_uncerts, shot["time"]["x2_uncert"])

        print(f"{prev_ref}-{refProbe} Measured a shock speed of {probe_velocity:.2f} +/- {uncert:.2f} m/s ({probe_velocity/1000:.2f} +/- {uncert/1000:.2f} km/s [{uncert/probe_velocity * 100 :.2f}%])")
        shot["shock-speed"][f"{prev_ref}-{refProbe}"] = probe_velocity, uncert, True  # Speed, Uncert, Ref

    print()

    # Between the two gauges of each probe
    probe_locs = dataInfo["probe-info"]["locations"]
    for probe in probe_locs:
        g1_key = f"{probe}-g1"
        g2_key = f"{probe}-g2"
        if g1_key in shock_points and g2_key in shock_points:
            c2c_dist = dataInfo["probe-info"]["c2c"] / 1000  # convert to m
            # Uncertainty now uses the c2c distance/time (was the PCB loop's leftovers)
            probe_velocity, uncert = _shock_speed(shock_points[g1_key], shock_points[g2_key], c2c_dist, (0.05/1000, 0.05/1000), shot["time"]["probe_uncert"])

            print(f"{probe} Measured a shock speed of {probe_velocity:.2f} +/- {uncert:.2f} m/s ({probe_velocity/1000:.2f} +/- {uncert/1000:.2f} km/s)")
            shot["shock-speed"][probe] = probe_velocity, uncert, False  # Speed, Uncert, Ref # m/s
        else:
            only_gauge = g2_key if g2_key in shock_points else g1_key
            print(f"Unable to calculate probe velocity, only have one gauge: {only_gauge}")

    # Between the same gauge at consecutive probe locations
    for near, far in zip(probe_locs, probe_locs[1:]):
        # Distance of *this* pair (was hard-coded to the first two locations)
        p2p_dist = abs(TUNNEL_INFO["distance"][far] - TUNNEL_INFO["distance"][near]) / 1000  # convert to m

        for gauge in ("g1", "g2"):
            near_key = f"{near}-{gauge}"
            far_key = f"{far}-{gauge}"
            if near_key not in shock_points or far_key not in shock_points:
                continue

            dist_uncerts = (UNCERTS["probe-dist"][near], UNCERTS["probe-dist"][far])
            velocity, uncert = _shock_speed(shock_points[near_key], shock_points[far_key], p2p_dist, dist_uncerts, shot["time"]["probe_uncert"])

            print(f"{near}-{far} - {gauge.upper()} - Measured a shock speed of {velocity:.2f} +/- {uncert:.2f} m/s ({velocity/1000:.2f} +/- {uncert/1000:.2f} [{uncert/velocity * 100:.2f}%] km/s)")
            shot["shock-speed"][f"{near}-{far}-{gauge}"] = velocity, uncert, False  # Speed, Uncert, Ref

    print()

    # Return the data
    return data
2024-10-22 23:18:40 +10:00
def load_ref_data(x2_shot: str, data_path: str, data={}) -> dict:
    """Load a reference shot (X2 DAQ only), locate the shock and compute shock speeds.

    The shot is stored in ``data[x2_shot]`` with a fixed set of PCB reference
    gauges and canny settings, and the speeds between consecutive gauges are
    printed.

    Change from the previous implementation: "x2-dt" was a factor of 1000 away
    from seconds.

    Args:
        x2_shot: Shot name; its last character is used as a 0-based reference
            shot number in the printed header.
        data_path: Path to the shot's TDMS file.
        data: Dict to add the shot to.
            NOTE(review): the default is a single dict shared between calls
            (mutable default). It is kept because callers may rely on shots
            accumulating in it - pass an explicit dict to be safe.

    Returns:
        ``data`` with the shot added.
    """
    # Load Raw Data
    # TDMS File (X2 DAQ Data)
    x2_tdms_data = TdmsFile.read(data_path, raw_timestamps=True)
    x2_channels = x2_tdms_data.groups()[0].channels()
    x2_channel_names = tuple(c.name for c in x2_channels)

    shot = {
        "name": x2_shot,
        "info": {
            "name": x2_shot,
            "pcb-canny": [
                {
                    "sigma": 4,
                    "post_pres": 0.05
                }
            ],
            "pcb-refs": [
                "st1",
                "st2",
                "st3",
                "at1",
                "at2",
                "at3",
                "at4",
                "at5",
                "at6",
            ],
            "no-graph": [
                "at1",
                "at2",
                "at3",
                "at4",
                "at5",
                "at6",
            ]
        },
        "x2": x2_channels,
        "x2-channels": x2_channel_names,
        "x2-tdms": x2_tdms_data,
        "data": {
            "x2": {},  # Only pop channels with a voltage scale in ./tunnel-info.yaml
        },
        "time": {
            "x2": None,
            "trigger_index": None,
        },
        "shock-speed": {}  # m/s
    }
    data[x2_shot] = shot

    # === Process the data ===
    # Generate X2 time arrays
    ns_time = x2_channels[0][:].as_datetime64('ns')
    x2_time_us = (ns_time - ns_time[0]).astype("float64") / 1000  # timedelta64[ns] -> us
    x2_time_dt = np.diff(x2_time_us).mean() / 1e6  # us -> s (was / 1000, which gave ms)

    # --- Un Scale Data ---
    for channel, vScale in TUNNEL_INFO["volt-scale"].items():
        # Get the channel from its name
        samples = x2_channels[x2_channel_names.index(channel)]
        # Calculate the average noise offset from the leading samples
        avg_noise = samples[0:SAMPLES_TO_AVG].mean()
        # Save the channel data
        shot["data"]["x2"][channel] = (samples[:] - avg_noise) * vScale

    # Process Trigger Info - first sample where the (zeroed) trigger exceeds 1
    trigger_volts = shot["data"]["x2"]["trigbox"]
    x2_trigger_index = np.where(trigger_volts > 1)[0][0]

    # Add the time data
    shot["time"] = {
        "x2": x2_time_us,
        "trigger_index": x2_trigger_index,
        "x2-dt": x2_time_dt
    }
    shot["time"]["x2_uncert"] = UNCERTS["time"]["x2-daq"]

    # Find Shock Times
    # X2 - Canning Edge
    dataInfo = shot["info"]
    shock_points = {}
    shot["shock-point"] = shock_points
    cArgs = dataInfo["pcb-canny"]
    pcb_refs = dataInfo["pcb-refs"]
    for i, ref in enumerate(pcb_refs):
        # Gauges past the end of the settings list reuse the last entry
        settings = cArgs[i] if i < len(cArgs) else cArgs[-1]
        first_value, first_value_uncertainty, _, _ = canny_shock_finder(x2_time_us, shot["data"]["x2"][ref], sigma=settings["sigma"], post_suppression_threshold=settings["post_pres"], plot=False, print_func=None)
        shock_point = np.where(x2_time_us >= first_value)[0][0]  # [BUG] Seems to give n+1
        shock_points[ref] = shock_point, first_value, first_value_uncertainty

    # Calculate Shock Speeds
    print("="*30, x2_shot, "="*30)
    print(f"-- Reference Shot {int(x2_shot[-1]) + 1} --")

    for prev_ref, refProbe in zip(pcb_refs, pcb_refs[1:]):
        p1_time = shock_points[refProbe][1] / 1e6  # Convert to seconds
        p2_time = shock_points[prev_ref][1] / 1e6  # Convert to seconds

        p2p_dist = abs(TUNNEL_INFO["distance"][refProbe] - TUNNEL_INFO["distance"][prev_ref]) / 1000  # convert to m
        p2p_time = abs(p2_time - p1_time)
        probe_velocity = p2p_dist / p2p_time  # m/s

        p1_time_uncert = shock_points[prev_ref][2] / 1e6  # Convert to seconds
        p2_time_uncert = shock_points[refProbe][2] / 1e6  # Convert to seconds
        uncert = deltaVs(probe_velocity, p2p_dist, p2p_time, (UNCERTS["probe-dist"][refProbe], UNCERTS["probe-dist"][prev_ref]), (p1_time_uncert, p2_time_uncert, shot["time"]["x2_uncert"]))

        print(f"{prev_ref}-{refProbe} Measured a shock speed of {probe_velocity:.2f} +/- {uncert:.2f} m/s ({probe_velocity/1000:.2f} +/- {uncert/1000:.2f} km/s [{uncert/probe_velocity * 100 :.2f}%])")
        shot["shock-speed"][f"{prev_ref}-{refProbe}"] = probe_velocity, uncert, True  # Speed, Uncert, Ref

    print()

    return data
# ======= Graphing ========
2024-10-22 20:23:02 +10:00
def genGraph(gData: dict, showPlot: bool = True, doLimits: bool = True, forcePlots: bool = False, addShockInfo: bool = True):
    """Plot one loaded shot: scope gauge traces, PCB reference traces and shock markers.

    Args:
        gData: One shot entry as stored by ``load_data``.
        showPlot: Passed through to ``makeGraph``.
        doLimits: Clip the x axis to 10 us either side of the detected shock
            times (needs at least two of them).
        forcePlots: Also draw the PCB channels listed under "no-graph".
        addShockInfo: Add the measured shock speed text box (and use the taller
            y range that makes room for it).

    The figure is saved under ``./images/`` using the shot name.
    """
    info = gData["info"]
    shock_points = gData["shock-point"]
    plots = []
    lims = []  # Shock times, used to clip the x axis

    graphData = {
        "title": f"Shock Response Time\nFor {info['long_name']}",
        "xLabel": "Time ($\\mu$s)",
        "yLabel": "Voltage Reading (V)",
        "grid": True,
        "figSize": (9, 6.8),
        "ledgLoc": 'upper left',
        "yLim": (-1.5, 11 if addShockInfo else 4),
        "plots": plots
    }

    # Scope gauge traces
    for column, legend in (("1 [V]", "Gauge 1"), ("2 [V]", "Gauge 2")):
        plots.append({
            "x": gData["time"]["scope"],
            "y": gData["data"]["scope"][column],
            "label": legend,
            "args": {"zorder": 1}
        })

    # Shock markers for each probe gauge that has a detected shock
    gauge_styles = (("g1", "Gauge 1", "--"), ("g2", "Gauge 2", ":"))
    for probe in info["probe-info"]["locations"]:
        for suffix, gauge_name, line_style in gauge_styles:
            key = f"{probe}-{suffix}"
            if key not in shock_points:
                continue
            shock_time = shock_points[key][1]
            plots.append({
                "type": "axvLine",
                "x": shock_time,
                "label": f"{probe}-{gauge_name} - Shock Point {shock_time:.2f}$\\mu$s",
                "colour": UQC["purple"].lighten(0.5),
                "args": {"zorder": 2, "linestyle": line_style, "alpha": 0.5}
            })
            lims.append(shock_time)

    # PCB reference traces and their shock markers
    for label in info["pcb-refs"]:
        if forcePlots or label not in info["no-graph"]:
            shock_time = shock_points[label][1]
            plots.append({
                "x": gData["time"]["x2"],
                "y": gData["data"]["x2"][label],
                "label": label
            })
            plots.append({
                "type": "axvLine",
                "x": shock_time,
                "label": f"{label} - Shock Point {shock_time:.2f}$\\mu$s",
                "colour": "gray",
                "args": {"zorder": 2, "linestyle": "-.", "alpha": 0.5}
            })
            lims.append(shock_time)  # [TODO this but better]

    if addShockInfo:
        # One line per speed; a rule is inserted once, before the first
        # entry that is not flagged as a reference measurement.
        text_lines = []
        rule_added = False
        for shock_speed_loc, record in gData["shock-speed"].items():
            if not (rule_added or record[2]):
                rule_added = True
                text_lines.append("-"*50)
            text_lines.append(f"{shock_speed_loc} - {record[0]/1000:.2f} $\\pm${record[1]/1000:.2f} [{record[1]/record[0]*100:.2f}%] km/s")
        probeText = "".join("\n" + text_line for text_line in text_lines)

        plots.append({
            "type": "text",
            "text": f"Measured Shock Speeds {probeText}",
            "align": ("top", "right"),
            "alpha": 0.8,
            "x": 0.94,
            "y": 0.94
        })

    if doLimits and len(lims) > 1:
        OFFSET = 10
        graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET))

    all_suffix = '-all' if forcePlots else ''
    clipped_suffix = '-clipped' if doLimits else ''
    makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath=f"./images/{info['shot-info']['name']}{all_suffix}{clipped_suffix}.png")
2024-10-15 20:33:26 +10:00
2024-10-15 21:30:01 +10:00
2024-10-22 23:18:40 +10:00
def genRefGraph(gData: dict, showPlot: bool = True, addShockInfo: bool = True, forcePlots: bool = False):
    """Plot the PCB reference channels of a single reference shot.

    Each plotted channel gets its X2 DAQ trace plus a vertical marker at
    its detected shock point. The figure is written to
    ``./images/ref-<name>[-all].png`` via ``makeGraph``.

    Args:
        gData: Reference-shot record. Keys read here: ``name``,
            ``info`` (``pcb-refs``, ``no-graph``), ``time``/``data``
            (``x2``), ``shock-point`` and ``shock-speed``.
        showPlot: Forwarded to ``makeGraph``.
        addShockInfo: When true, add a text box listing every entry of
            ``gData["shock-speed"]``.
        forcePlots: When true, channels listed in ``info["no-graph"]``
            are plotted as well and ``-all`` is appended to the file name.
    """
    shot_name = gData["name"]
    shot_info = gData["info"]

    plots = []
    graphData = {
        "title": f"Shock Response Time\nFor Reference Shot {int(shot_name[-1]) + 1} ({shot_name})",
        "xLabel": "Time ($\\mu$s)",
        "yLabel": "Voltage Reading (V)",
        "grid": True,
        "figSize": (9, 6.8), #(8,6.5),
        "ledgLoc": 'upper left',
        "yLim": (-1.5, 11),
        "plots": plots
    }

    shock_times = []
    for channel in shot_info["pcb-refs"]:
        # Hidden channels are only drawn when explicitly forced.
        if not forcePlots and channel in shot_info["no-graph"]:
            continue

        plots.append({
            "x": gData["time"]["x2"],
            "y": gData["data"]["x2"][channel],
            "label": channel
        })

        # Index 1 of the shock-point entry is the value drawn on the time
        # axis (labelled in microseconds).
        shock_time = gData["shock-point"][channel][1]
        plots.append({
            "type": "axvLine",
            "x": shock_time,
            "label": f"{channel} - Shock Point {shock_time:.2f}$\\mu$s",
            "colour": "gray",
            "args": {"zorder": 2, "linestyle": "-.", "alpha": 0.5}
        })
        shock_times.append(shock_time)  # [TODO this but better]

    if addShockInfo:
        text_lines = []
        divider_added = False
        for location, entry in gData["shock-speed"].items():
            # A single divider is inserted before the first entry whose
            # third element is falsy.
            if not divider_added and not entry[2]:
                divider_added = True
                text_lines.append("\n" + "-" * 50)

            speed = entry[0]
            uncert = entry[1]
            text_lines.append(
                f"\n{location} - {speed/1000:.2f} $\\pm${uncert/1000:.2f} [{uncert/speed*100:.2f}%] km/s"
            )

        probeText = "".join(text_lines)
        plots.append({
            "type": "text",
            "text": f"Measured Shock Speeds {probeText}",
            "align": ("top", "right"),
            "alpha": 0.8,
            "x": 0.94,
            "y": 0.94
        })

    # Clip the x-axis around the shock arrivals once at least two are known.
    if len(shock_times) > 1:
        OFFSET = 10 #if not forcePlots else 50
        graphData["xLim"] = (float(min(shock_times) - OFFSET), float(max(shock_times) + OFFSET))

    suffix = '-all' if forcePlots else ''
    makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath=f"./images/ref-{shot_name}{suffix}.png")
def genComboRefGraph(data: dict, plotCh: list[str] = ["st1", "st2", "st3"], showPlot: bool = False, doShockLabels:bool = False, addShockInfo:bool = False):
    """Overlay the selected channels of every reference shot on one figure.

    Channels are distinguished by colour and shots by line style. A
    vertical marker is drawn at every channel's shock point and the x-axis
    is clipped around those markers. The figure is saved to
    ``./images/ref-combo-<channels joined by '_'>.png``.

    Args:
        data: Mapping of shot id -> reference-shot record. Keys read from
            each record: ``time``/``data`` (``x2``), ``shock-point`` and,
            when ``addShockInfo`` is set, ``shock-speed``.
        plotCh: Channel names to plot. The default list is only read,
            never mutated.
        showPlot: Forwarded to ``makeGraph``.
        doShockLabels: When true, every shock marker gets a legend label.
        addShockInfo: When true, print per-location mean shock speeds and
            add them to the figure as a text box.
    """
    plots = []
    graphData = {
        "title": "Shock Response Time\nFor Reference Shots 1, 2, & 3 (x2s5820, x2s5821 & x2s5822) - Mars Entry Conditions",
        "xLabel": "Time ($\\mu$s)",
        "yLabel": "Voltage Reading (V)",
        "grid": True,
        "figSize": (16, 6.8), #(8,6.5),
        "ledgLoc": 'upper left',
        "yLim": (-1.5, 11),
        "plots": plots
    }

    LINESTYLES = (
        'solid',
        'dotted',
        'dashed',
        'dashdot'
    )

    COLOURS = (
        UQC["purple"],
        UQC["blue"],
        UQC["green"],
        # Don't need these
        UQC["red"],
        UQC["light_purple"],
        UQC["dark_grey"],
        UQC["orange"],
        UQC["yellow"],
        UQC["aqua"],
        UQC["gold"],
        UQC["neutral"]
    )

    lims = []

    # Signal traces: colour follows the channel, line style follows the shot.
    for shot_idx, shot in enumerate(data):
        gData = data[shot]
        for ch_idx, label in enumerate(plotCh):
            trace = {
                "x": gData["time"]["x2"],
                "y": gData["data"]["x2"][label],
                "colour": COLOURS[ch_idx % len(COLOURS)],
                "args": {"zorder": 2, "linestyle": LINESTYLES[shot_idx % len(LINESTYLES)], "alpha": 0.5}
            }
            # Only the first shot contributes legend entries so that each
            # channel appears once. The label must be a plain string; the
            # previous code had a trailing comma that stored a 1-tuple.
            if shot_idx == 0:
                trace["label"] = f"{label}"
            plots.append(trace)

    # Shock-point markers for every shot/channel pair.
    for shot_idx, shot in enumerate(data):
        gData = data[shot]
        for label in plotCh:
            shock_time = gData["shock-point"][label][1]
            marker = {
                "type": "axvLine",
                "x": shock_time,
                "colour": "gray",
                "args": {"zorder": 2, "linestyle": "-.", "alpha": 0.5}
            }
            if doShockLabels:
                marker["label"] = f"{label} - Ref Shot {shot_idx + 1} - Shock Point {shock_time:.2f}$\\mu$s"
            plots.append(marker)
            lims.append(shock_time)  # [TODO this but better]

    if addShockInfo:
        print("============================== Reference Shots ==============================")

        # Group (speed, uncertainty) pairs by measurement location across shots.
        shock_speeds = {}
        for shot in data:
            for shock_speed_loc, entry in data[shot]['shock-speed'].items():
                shock_speeds.setdefault(shock_speed_loc, []).append((entry[0], entry[1]))

        probeText = ""
        for shock_speed_loc, pairs in shock_speeds.items():
            shock_info = np.array(pairs)
            speeds = shock_info[:, 0]
            uncerts = shock_info[:, 1]

            speed = speeds.mean()
            # NOTE(review): this is the root-sum-square of the per-shot
            # uncertainties and is not divided by the number of shots,
            # although it is reported next to a mean - confirm intended.
            uncert = np.sqrt(np.pow(uncerts, 2).sum())

            print(f"{shock_speed_loc} Measured a mean shock speed of {speed:.2f} +/- {uncert:.2f} m/s ({speed/1000:.2f} +/- {uncert/1000:.2f} km/s [{uncert/speed * 100 :.2f}%])")
            probeText += f"\n{shock_speed_loc} - {speed/1000:.2f} $\\pm${uncert/1000:.2f} [{uncert/speed*100:.2f}%] km/s"

        plots.append({
            "type": "text",
            "text": f"Average Measured Shock Speeds{probeText}",
            "align": ("top", "right"),
            "alpha": 0.8,
            "x": 0.9,
            "y": 0.9
        })

    # Clip the x-axis around the shock arrivals once at least two are known.
    if len(lims) > 1:
        OFFSET = 10 #if not forcePlots else 50
        graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET))

    makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath=f"./images/ref-combo-{'_'.join(plotCh)}.png")
2024-10-23 00:39:11 +10:00
def genComboDataGraph(data: dict, showPlot: bool = False, doShockLabels:bool = False, addShockInfo:bool = False):
    """Overlay the scope gauge traces of several shots on one figure.

    Gauge 1 and Gauge 2 scope channels are plotted for every shot (line
    style varies per shot), together with vertical markers at each probe's
    detected shock points. The figure is saved to
    ``./images/combo-my-data.png``.

    Args:
        data: Mapping of shot id -> shot record. Keys read from each
            record: ``info`` (``name``, ``probe-info`` -> ``locations``),
            ``time``/``data`` (``scope``), ``shock-point`` and, when
            ``addShockInfo`` is set, ``shock-speed``.
        showPlot: Forwarded to ``makeGraph``.
        doShockLabels: When true, every shock marker gets a legend label
            and the legend is moved to ``(0.6, 0.075)``.
        addShockInfo: When true, print per-location mean shock speeds and
            add them to the figure as a text box.
    """
    # The title uses the last character of each shot's display name and the
    # shot ids themselves.
    shots = ", ".join(f"{data[shot]['info']['name'][-1]}" for shot in data)
    names = ", ".join(data)

    plots = []
    graphData = {
        "title": f"Shock Response Time\nFor Shots {shots} ({names})\nVarious Probe Locations - Mars Entry Conditions",
        "xLabel": "Time ($\\mu$s)",
        "yLabel": "Voltage Reading (V)",
        "grid": True,
        "figSize": (16, 6.8), #(8,6.5),
        "ledgLoc": (0.6, 0.075) if doShockLabels else 'upper left',
        "yLim": (-1.5, 11),
        "plots": plots
    }

    LINESTYLES = (
        'solid',
        'dotted',
        'dashed',
        'dashdot'
    )

    COLOURS = (
        #UQC["purple"],
        #UQC["blue"],
        UQC["green"],
        UQC["red"],
        UQC["light_purple"],
        UQC["dark_grey"],
        UQC["orange"],
        UQC["yellow"],
        UQC["aqua"],
        UQC["gold"],
        UQC["neutral"]
    )

    # (gauge number, marker line style) for the two gauges of each probe.
    GAUGE_MARKERS = ((1, "--"), (2, ":"))

    lims = []

    for line_sty, shot in enumerate(data):
        gData = data[shot]

        # Scope traces: purple for channel "1 [V]", blue for "2 [V]".
        for column in ("1 [V]", "2 [V]"):
            plots.append({
                "x": gData["time"]["scope"],
                "y": gData["data"]["scope"][column],
                "colour": UQC["purple"] if column[0] == "1" else UQC["blue"],
                "args": {"zorder": 1, "linestyle": LINESTYLES[line_sty % len(LINESTYLES)]}
            })

        # Only the first shot contributes the two gauge legend entries.
        if line_sty == 0:
            plots[-2]["label"] = "Gauge 1"
            plots[-1]["label"] = "Gauge 2"

        # Shock-point markers for whichever gauges have a detection.
        for probe in gData["info"]["probe-info"]["locations"]:
            for gauge_no, marker_style in GAUGE_MARKERS:
                key = f"{probe}-g{gauge_no}"
                if key not in gData["shock-point"]:
                    continue

                shock_time = gData["shock-point"][key][1]
                marker = {
                    "type": "axvLine",
                    "x": shock_time,
                    "colour": UQC["purple"].lighten(0.5),
                    "args": {"zorder": 2, "linestyle": marker_style, "alpha": 0.5}
                }
                if doShockLabels:
                    marker["label"] = f"{probe}-Gauge {gauge_no} - {shot} - Shock Point {shock_time:.2f}$\\mu$s"
                plots.append(marker)
                lims.append(shock_time)

    if addShockInfo:
        # This banner previously read "Reference Shots", copied from the
        # reference-shot variant of this function.
        print("============================== Data Shots ==============================")

        # Group (speed, uncertainty) pairs by measurement location across shots.
        shock_speeds = {}
        for shot in data:
            for shock_speed_loc, entry in data[shot]['shock-speed'].items():
                shock_speeds.setdefault(shock_speed_loc, []).append((entry[0], entry[1]))

        probeText = ""
        for shock_speed_loc, pairs in shock_speeds.items():
            shock_info = np.array(pairs)
            speeds = shock_info[:, 0]
            uncerts = shock_info[:, 1]

            speed = speeds.mean()
            # NOTE(review): root-sum-square of the per-shot uncertainties,
            # not divided by the number of shots - confirm intended.
            uncert = np.sqrt(np.pow(uncerts, 2).sum())

            print(f"{shock_speed_loc} Measured a mean shock speed of {speed:.2f} +/- {uncert:.2f} m/s ({speed/1000:.2f} +/- {uncert/1000:.2f} km/s [{uncert/speed * 100 :.2f}%])")
            probeText += f"\n{shock_speed_loc} - {speed/1000:.2f} $\\pm${uncert/1000:.2f} [{uncert/speed*100:.2f}%] km/s"

        plots.append({
            "type": "text",
            "text": f"Average Measured Shock Speeds{probeText}",
            "align": ("top", "right"),
            "alpha": 0.8,
            "x": 0.9,
            "y": 0.9
        })

    # Clip the x-axis around the shock arrivals once at least two are known.
    if len(lims) > 1:
        OFFSET = 10 #if not forcePlots else 50
        graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET))

    makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath="./images/combo-my-data.png")
2024-10-22 23:18:40 +10:00
2024-10-23 01:51:25 +10:00
def genX2CompGraphs(gData: dict, showPlot: bool = True):
    """Compare the scope and X2 DAQ recordings of the two gauges for one shot.

    Plots scope channels ``"1 [V]"``/``"2 [V]"`` and X2 DAQ channels
    ``"st1"``/``"st3"``, adds vertical markers at the shock points detected
    in each data source, clips the x-axis around those markers and saves
    the figure to ``./images/signal_comp-<shot name>.png``.

    Args:
        gData: Shot record. Keys read here: ``info`` (``name``,
            ``shot-info`` -> ``name``, ``probe-info`` -> ``locations``),
            ``time``/``data`` (``scope`` and ``x2``) and ``shock-point``
            (scope entries at the top level, X2 DAQ entries under ``x2``).
        showPlot: Forwarded to ``makeGraph``.
    """
    plots = []
    graphData = {
        "title": f"Signal Comparison for {gData['info']['name']} ({gData['info']['shot-info']['name']})",
        "xLabel": "Time ($\\mu$s)",
        "yLabel": "Voltage Reading (V)",
        "grid": True,
        "figSize": (16, 6.8), #(9, 6.8), #(8,6.5),
        "ledgLoc": 'upper left',
        "yLim": (-1.5, 11),
        "plots": plots
    }

    lims = []

    # Raw traces from the scope.
    for column, legend in [("1 [V]", "Gauge 1 - Scope"), ("2 [V]", "Gauge 2 - Scope")]: #, ("4 [V]", "Gauge Trigger")]:
        plots.append({
            "x": gData["time"]["scope"],
            "y": gData["data"]["scope"][column],
            "label": legend,
            "args": {"zorder": 1}
        })

    # Raw traces from the X2 DAQ.
    for channel, legend in [("st1", "Gauge 1 - X2 DAQ"), ("st3", "Gauge 2 - X2 DAQ")]:
        plots.append({
            "x": gData["time"]["x2"],
            "y": gData["data"]["x2"][channel],
            "label": legend,
            "args": {"zorder": 1}
        })

    # (gauge number, marker line style) for the two gauges of each probe.
    GAUGE_MARKERS = ((1, "--"), (2, ":"))

    # Shock points detected on the scope signals.
    for probe in gData["info"]["probe-info"]["locations"]:
        for gauge_no, marker_style in GAUGE_MARKERS:
            key = f"{probe}-g{gauge_no}"
            if key not in gData["shock-point"]:
                continue

            shock_time = gData["shock-point"][key][1]
            plots.append({
                "type": "axvLine",
                "x": shock_time,
                "label": f"{probe}-Gauge {gauge_no} - Scope - Shock Point {shock_time:.2f}$\\mu$s",
                "colour": UQC["purple"].lighten(0.5),
                "args": {"zorder": 2, "linestyle": marker_style, "alpha": 0.5}
            })
            lims.append(shock_time)

    # Shock points detected on the X2 DAQ signals.
    for probe in gData["info"]["probe-info"]["locations"]:
        x2_points = gData["shock-point"]["x2"]
        for gauge_no, marker_style in GAUGE_MARKERS:
            key = f"{probe}-g{gauge_no}"
            if key not in x2_points:
                continue

            # The label reports the same X2 DAQ time that the marker is
            # drawn at. It previously printed the scope time for this key.
            shock_time = x2_points[key][1]
            plots.append({
                "type": "axvLine",
                "x": shock_time,
                "label": f"{probe}-Gauge {gauge_no} - X2 DAQ - Shock Point {shock_time:.2f}$\\mu$s",
                "colour": UQC["dark_grey"],
                "args": {"zorder": 2, "linestyle": marker_style, "alpha": 0.5}
            })
            lims.append(shock_time)

    # Clip the x-axis around the shock arrivals once at least two are known.
    if len(lims) > 1:
        OFFSET = 10 #if not forcePlots else 50
        graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET))

    makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath=f"./images/signal_comp-{gData['info']['shot-info']['name']}.png")

    print("-")
2024-10-23 01:51:25 +10:00
2024-10-22 23:18:40 +10:00
# ==== Driver: load every shot, then render all figures ====
print("Loading Data")

# My Shot Data
data = {}
for dp in data_to_load:
    load_data(f"{DATA_PATH}/{dp}/", data)

# Snapshot of the loaded shot ids, in load order.
loaded_data = tuple(data)

# Reference Data from Mragank
ref_data = {}
for refShot in ref_data_to_load:
    load_ref_data(refShot, f"./data/referance/{refShot}/{refShot}.tdms", ref_data)

print("Loaded Data")

print("Graphing Data")

# General Shot Graphing
for shot in loaded_data:
    #print(data[shot]['info']['long_name'].rsplit("\n", 1)[-1])
    # One clipped figure without the speed box, one with every channel forced on.
    genGraph(data[shot], showPlot=False, addShockInfo=False)
    genGraph(data[shot], showPlot=False, forcePlots=True)

# The combined figure leaves out the last two loaded shots.
combo_data = dict(data)
for position in (-2, -1):
    del combo_data[loaded_data[position]]

genComboDataGraph(combo_data, doShockLabels=True)

# Scope vs X2 DAQ comparison for two specific shots.
for shot in ("x2s5831", "x2s5832"):
    genX2CompGraphs(data[shot], showPlot=False)

# Reference Data
for shot in ref_data:
    genRefGraph(ref_data[shot], showPlot=False, addShockInfo=False)
    genRefGraph(ref_data[shot], showPlot=False, forcePlots=True)

genComboRefGraph(ref_data, doShockLabels=True)
genComboRefGraph(ref_data, ref_data[ref_data_to_load[0]]["info"]["pcb-refs"], addShockInfo=True)

# This forces matplotlib to hang until I tell it to close all windows
#pltKeyClose()

print("Done")