# Cal Wing (c.wing@uq.net.au) - Oct 2024 # Thesis Graphing import os import numpy as np import pandas as pd import yaml from nptdms import TdmsFile from makeGraph import makeGraph, pltKeyClose, UQ_COLOURS as UQC from canny_shock_finder import canny_shock_finder # Folder correction # Make sure the relevant folders folder exists folders = ["./images"] for folder in folders: if not os.path.isdir(folder): os.mkdir(folder) # Data Paths DATA_PATH = "./data" DATA_INFO = "_info.yaml" TUNNEL_INFO_FILE = "./tunnel-info.yaml" SAMPLES_TO_AVG = 500 CANNY_TIME_OFFSET = 50 #us with open(TUNNEL_INFO_FILE, 'r') as file: TUNNEL_INFO = yaml.safe_load(file) # ==== Uncerts ==== # Taken from DOI: 10.1007/s00193-017-0763-3 (Implementation of a state-to-state analytical framework for the calculation of expansion tube flow properties) UNCERTS = TUNNEL_INFO["uncertainties"] def deltaX(delta_x_1: float, delta_x_2: float): return np.sqrt(np.pow(delta_x_1, 2) + np.pow(delta_x_2, 2)) def deltaT(delta_t_1: float, delta_t_2: float, delta_t_sr: float): return np.sqrt(np.pow(delta_t_1, 2) + np.pow(delta_t_2, 2) + np.pow(delta_t_sr, 2)) def deltaVs(V: float, dx: float, dt: float, delta_x: tuple[float, float], delta_t: tuple[float, float, float]): return V * np.sqrt(np.pow(deltaX(*delta_x) / dx, 2) + np.pow(deltaT(*delta_t) / dt, 2)) # ==== Data Loading & Processing ==== def load_data(data_path: str, data={}) -> dict: data_info_path = data_path + DATA_INFO if not os.path.exists(data_info_path): print(f"[ERR] Could not find data info file: '{data_info_path}'") print(f"[WARN] Not Loading Data '{data_path}'") return None # Load Shot Data Info YAML File (Cal) with open(data_info_path, 'r') as file: dataInfo = yaml.safe_load(file) # Grab the shot name x2_shot = dataInfo["shot-info"]["name"] # Update shot-info values to use the name dataInfo["shot-info"]["tdms"] = dataInfo["shot-info"]["tdms"].format(x2_shot) dataInfo["shot-info"]["config"] = dataInfo["shot-info"]["config"].format(x2_shot) dataInfo["shot-info"]["info"] = dataInfo["shot-info"]["info"].format(x2_shot) # Load Raw Data # TDMS File (X2 DAQ Data) x2_tdms_data = TdmsFile.read(data_path + dataInfo["shot-info"]['tdms'], raw_timestamps=True) x2_channels = x2_tdms_data.groups()[0].channels() x2_channel_names = tuple(c.name for c in x2_channels) data_locs = [dr["type"] for dr in dataInfo["probe-info"]["data-records"]] # Scope info _if it exists_ if "scope" in data_locs: scope_data_info = dataInfo["probe-info"]["data-records"][data_locs.index("scope")] scope_data_path = data_path + scope_data_info["data"] scope_config_path = data_path + scope_data_info["config"] # [TODO] Read this file # Generate Data Headers - This could be better with open(scope_data_path, 'r') as dfile: scope_header = [] header_lines = [] for i, line in enumerate(dfile): if i > 1: break header_lines.append(line.strip().split(",")) for i, name in enumerate(header_lines[0]): if name == "x-axis": name = "Time" if header_lines[1][i] in ["second", "Volt"]: outStr = f"{name} [{header_lines[1][i][0]}]" else: outStr = f"{name} [{header_lines[1][i]}]" scope_header.append(outStr) # Load the Scope CSV Data scope_data = np.loadtxt(scope_data_path, delimiter=',', skiprows=2) # Build a data object (this could be cached - or partially cached if I was clever enough) # Raw Data is always added - processing comes after data[x2_shot] = { "info": dataInfo, "shot_time": np.datetime64(f"{dataInfo['date']}T{dataInfo['time']}"), "raw-data":{ "probe_headers": scope_header, "probes": scope_data, "x2": x2_channels, "x2-channels": x2_channel_names, "x2-tdms": x2_tdms_data }, "time": { "x2": None, "probes": None, # This may be x2 but may not - ie a scope was used "trigger_index": None, "probe_uncert": None, #s "x2-dt": None, # }, "data": { "x2": {}, # Only pop channels with a voltage scale in ./tunnel-info.yaml "probes": [[None], [None]] # Save probe data in volts - [G1, G2] }, "shock-speed": {} # Note all in us } # === Process the data === # Generate X2 time arrays time_data = x2_channels[0] ns_time = time_data[:].as_datetime64('ns') x2_time_seconds = (ns_time - ns_time[0]) # timedelta64[ns] x2_time_us = x2_time_seconds.astype("float64") / 1000 # Scale to us x2_time_dt = np.diff(x2_time_us).mean() / 1000 # Scale to s #second_fractions = np.array(time_data[:].second_fractions, dtype=int) # 2^-64 ths of a second #x2_time_seconds = (second_fractions - second_fractions[0]) / (2**(-64)) # 0 time data and convert to seconds #x2_time_us = x2_time_seconds * 1000 # Scale to us # --- Un Scale Data --- for channel, vScale in TUNNEL_INFO["volt-scale"].items(): # Get the channel index from its name chIndex = x2_channel_names.index(channel) # Calculate the average noise offset avg_noise = x2_channels[chIndex][0:SAMPLES_TO_AVG].mean() # Save the channel data data[x2_shot]["data"]["x2"][channel] = (x2_channels[chIndex][:] - avg_noise) * vScale # Process Trigger Info trigger_volts = data[x2_shot]["data"]["x2"]["trigbox"] # Use a mean to offset x2_trigger_index = np.where(trigger_volts > 1)[0][0] x2_trigger_time = x2_time_us[x2_trigger_index] # Add the time data data[x2_shot]["time"] = { "x2": x2_time_us, "trigger_index": x2_trigger_index, "probes": x2_time_us, # Until otherwise overridden - probe time is x2 time "x2-dt": x2_time_dt } data[x2_shot]["time"]["x2_uncert"] = UNCERTS["time"]["x2-daq"] #max(UNCERTS["time"]["x2-daq"], data[x2_shot]["time"]["x2-dt"]) # Setup custom scaling on the gauge values if "x2" in data_locs: for ch in dataInfo["probe-info"]["data-records"][data_locs.index("x2")]["channels"]: if ch in dataInfo["probe-info"]["data-records"][data_locs.index("x2")]["scaler"]: # Get the channel index from its name chIndex = x2_channel_names.index(ch) # Calculate the average noise offset avg_noise = x2_channels[chIndex][0:SAMPLES_TO_AVG].mean() # Save the channel data data[x2_shot]["data"]["x2"][ch] = (x2_channels[chIndex][:] - avg_noise) * dataInfo["probe-info"]["data-records"][data_locs.index("x2")]["scaler"][ch] #[TODO] This could be better #if "x2" in data_locs: # data[x2_shot]["data"]["probes"] = [data[x2_shot]["data"]["x2"]["st1"][:], data[x2_shot]["data"]["x2"]["st3"][:]] # Scope timing _if it exists_ if "scope" in data_locs: scope_data_info = dataInfo["probe-info"]["data-records"][data_locs.index("scope")] trigger_info = scope_data_info["trigger"] # Get the scope trigger info # Calc the scope time & apply any manual offsets scope_time = (scope_data[:, 0] - scope_data[0, 0]) * 1e6 # to us scope_time -= trigger_info["alignment-offset"] # manual offset delay # Trigger Alignment scope_trigger_volts = (scope_data[:, 3] - scope_data[0:SAMPLES_TO_AVG, 3].mean()) # Use a mean here too scope_trigger_index = np.where(scope_trigger_volts > 1)[0][0] scope_trigger_time = scope_time[scope_trigger_index] scope_alignment = x2_trigger_time - scope_trigger_time scope_time += scope_alignment # Offset any trigger delays scope_time += trigger_info["delay"] # us delay from the actual trigger signal to the scope received trigger data[x2_shot]["time"]["scope"] = scope_time data[x2_shot]["time"]["scope-offset"] = scope_alignment data[x2_shot]["time"]["scope-dt"] = np.diff(scope_time).mean() / 1e6 data[x2_shot]["data"]["scope"] = {} for i, header in enumerate(scope_header): if i == 0: continue # Don't record time # Python reference so its the same object ref = scope_data[:, i] data[x2_shot]["data"]["scope"][i] = ref data[x2_shot]["data"]["scope"][header] = ref # Save Probe Data if "scope" in data_locs: data[x2_shot]["data"]["probes"] = [data[x2_shot]["data"]["scope"][1], data[x2_shot]["data"]["scope"][2]] data[x2_shot]["time"]["probes"] = data[x2_shot]["time"]["scope"] data[x2_shot]["time"]["probe_uncert"] = max(scope_data_info["time-uncert"], data[x2_shot]["time"]["scope-dt"]) # Find Shock Times # X2 - Canning Edge data[x2_shot]["shock-point"] = {} cArgs = dataInfo["pcb-canny"] for i, ref in enumerate(dataInfo["pcb-refs"]): chData = data[x2_shot]["data"]["x2"][ref] if i in range(len(cArgs)): sigma = cArgs[i]["sigma"] post_sup_thresh = cArgs[i]["post_pres"] else: sigma = cArgs[-1]["sigma"] post_sup_thresh = cArgs[-1]["post_pres"] first_value, first_value_uncertainty, _, _ = canny_shock_finder(x2_time_us, chData, sigma=sigma, post_suppression_threshold=post_sup_thresh, plot=False, print_func=None) shock_point = np.where(x2_time_us >= first_value)[0][0] # [BUG] Seems to give n+1 data[x2_shot]["shock-point"][ref] = shock_point, first_value, first_value_uncertainty if "x2" in data_locs: cArgs = dataInfo["x2-canny"] data[x2_shot]["shock-point"]["x2"] = {} probeCh1 = data[x2_shot]["data"]["x2"]["st1"] probeCh2 = data[x2_shot]["data"]["x2"]["st3"] for i, probe in enumerate(dataInfo["probe-info"]["locations"]): # Get the canny-args cArgs = dataInfo["x2-canny"] doCannyPlot = False if i in range(len(cArgs)): sigma = cArgs[i]["sigma"] post_sup_thresh = cArgs[i]["post_pres"] else: sigma = cArgs[-1]["sigma"] post_sup_thresh = cArgs[-1]["post_pres"] # If this _isn't_ the first probe then apply a time offset if i > 0: privPoint = dataInfo["probe-info"]["locations"][i-1] time_offset = data[x2_shot]["shock-point"]["x2"][f"{privPoint}-g1"][1] + CANNY_TIME_OFFSET else: time_offset = None # Find G1 Shock Time if 1 in dataInfo["probe-info"]["gauges"]: first_value, first_value_uncertainty, _, _ = canny_shock_finder(x2_time_us, probeCh1, sigma=sigma, post_suppression_threshold=post_sup_thresh, plot=doCannyPlot, start_time=time_offset, print_func=None) if first_value is None: print(f"[ERROR] {x2_shot} - {probe}-g1 could not be detected using: Sigma = {sigma}, post_suppression_threshold = {post_sup_thresh}") #raise ValueError(f"{probe}-g1 not detected" else: shock_point = np.where(x2_time_us >= first_value)[0][0] # [BUG] Seems to give n+1 data[x2_shot]["shock-point"]["x2"][f"{probe}-g1"] = shock_point, first_value, first_value_uncertainty if 2 in dataInfo["probe-info"]["gauges"]: # Do the same for G2 if i > 0: time_offset = data[x2_shot]["shock-point"]["x2"][f"{privPoint}-g2"][1] + CANNY_TIME_OFFSET # Find G2 Shock Time first_value, first_value_uncertainty, _, _ = canny_shock_finder(x2_time_us, probeCh2, sigma=sigma, post_suppression_threshold=post_sup_thresh, plot=doCannyPlot, start_time=time_offset, print_func=None) if first_value is None: print(f"[ERROR] {x2_shot} - {probe}-g2 could not be detected using: Sigma = {sigma}, post_suppression_threshold = {post_sup_thresh}") #raise ValueError(f"{probe}-g2 not detected") else: shock_point = np.where(x2_time_us >= first_value)[0][0] # [BUG] Seems to give n+1 data[x2_shot]["shock-point"]["x2"][f"{probe}-g2"] = shock_point, first_value, first_value_uncertainty # ---- Gauge Canning Edge ---- probeCh1 = data[x2_shot]["data"]["probes"][0] probeCh2 = data[x2_shot]["data"]["probes"][1] for i, probe in enumerate(dataInfo["probe-info"]["locations"]): # Get the canny-args cArgs = dataInfo["canny-args"] doCannyPlot = False # x2_shot == "x2s5829" and probe == "st2" # This condition was used to generate some graphs plotValues = { "plot_title": f"Canny Shock Finding Result for {x2_shot} - ST2 Gauge 1", "plot_time_unit": "$\\mu$s", "y_label": "Voltage Reading (V)", } if i in range(len(cArgs)): sigma = cArgs[i]["sigma"] post_sup_thresh = cArgs[i]["post_pres"] else: sigma = cArgs[-1]["sigma"] post_sup_thresh = cArgs[-1]["post_pres"] # If this _isn't_ the first probe then apply a time offset if i > 0: privPoint = dataInfo["probe-info"]["locations"][i-1] time_offset = data[x2_shot]["shock-point"][f"{privPoint}-g1"][1] + CANNY_TIME_OFFSET else: time_offset = None # Find G1 Shock Time if 1 in dataInfo["probe-info"]["gauges"]: first_value, first_value_uncertainty, _, _ = canny_shock_finder(scope_time, probeCh1, sigma=sigma, post_suppression_threshold=post_sup_thresh, plot=doCannyPlot, start_time=time_offset, print_func=None, **plotValues) if first_value is None: print(f"[ERROR] {x2_shot} - {probe}-g1 could not be detected using: Sigma = {sigma}, post_suppression_threshold = {post_sup_thresh}") raise ValueError(f"{probe}-g1 not detected") shock_point = np.where(scope_time >= first_value)[0][0] # [BUG] Seems to give n+1 data[x2_shot]["shock-point"][f"{probe}-g1"] = shock_point, first_value, first_value_uncertainty if 2 in dataInfo["probe-info"]["gauges"]: # Do the same for G2 if i > 0: time_offset = data[x2_shot]["shock-point"][f"{privPoint}-g2"][1] + CANNY_TIME_OFFSET # Find G2 Shock Time first_value, first_value_uncertainty, _, _ = canny_shock_finder(scope_time, probeCh2, sigma=sigma, post_suppression_threshold=post_sup_thresh, plot=doCannyPlot, start_time=time_offset, print_func=None) if first_value is None: print(f"[ERROR] {x2_shot} - {probe}-g2 could not be detected using: Sigma = {sigma}, post_suppression_threshold = {post_sup_thresh}") raise ValueError(f"{probe}-g2 not detected") shock_point = np.where(scope_time >= first_value)[0][0] # [BUG] Seems to give n+1 data[x2_shot]["shock-point"][f"{probe}-g2"] = shock_point, first_value, first_value_uncertainty # Calculate Shock Speeds print("="*30, x2_shot, "="*30) print("--", dataInfo["long_name"], "--") for i, refProbe in enumerate(dataInfo["pcb-refs"]): if i == 0: continue p1_time = data[x2_shot]["shock-point"][refProbe][1] / 1e6 # Convert to seconds p2_time = data[x2_shot]["shock-point"][dataInfo["pcb-refs"][i-1]][1] / 1e6 # Convert to seconds p2p_dist = abs(TUNNEL_INFO["distance"][refProbe] - TUNNEL_INFO["distance"][dataInfo["pcb-refs"][i-1]]) / 1000 # convert to m p2p_time = abs(p2_time - p1_time) probe_velocity = p2p_dist / p2p_time # m/s p1_time_uncert = data[x2_shot]["shock-point"][dataInfo["pcb-refs"][i-1]][2] / 1e6 # Convert to seconds p2_time_uncert = data[x2_shot]["shock-point"][refProbe][2] / 1e6 # Convert to seconds uncert = deltaVs(probe_velocity, p2p_dist, p2p_time, (UNCERTS["probe-dist"][refProbe], UNCERTS["probe-dist"][dataInfo["pcb-refs"][i-1]]), (p1_time_uncert, p2_time_uncert, data[x2_shot]["time"]["x2_uncert"])) print(f"{dataInfo['pcb-refs'][i-1]}-{refProbe} Measured a shock speed of {probe_velocity:.2f} +/- {uncert:.2f} m/s ({probe_velocity/1000:.2f} +/- {uncert/1000:.2f} km/s [{uncert/probe_velocity * 100 :.2f}%])") data[x2_shot]["shock-speed"][f"{dataInfo['pcb-refs'][i-1]}-{refProbe}"] = probe_velocity, uncert, True # Speed, Ref print() for probe in dataInfo["probe-info"]["locations"]: if f"{probe}-g1" in data[x2_shot]["shock-point"] and f"{probe}-g2" in data[x2_shot]["shock-point"]: g1_time = data[x2_shot]["shock-point"][f"{probe}-g1"][1] / 1e6 # Convert to seconds g2_time = data[x2_shot]["shock-point"][f"{probe}-g2"][1] / 1e6 # Convert to seconds c2c_dist = dataInfo["probe-info"]["c2c"] / 1000 # convert to m c2c_time = abs(g2_time - g1_time) probe_velocity = c2c_dist / c2c_time # m/s g1_time_uncert = data[x2_shot]["shock-point"][f"{probe}-g1"][2] / 1e6 # Convert to seconds g2_time_uncert = data[x2_shot]["shock-point"][f"{probe}-g2"][2] / 1e6 # Convert to seconds uncert = deltaVs(probe_velocity, p2p_dist, p2p_time, (0.05/1000, 0.05/1000), (g1_time_uncert, g2_time_uncert, data[x2_shot]["time"]["probe_uncert"])) print(f"{probe} Measured a shock speed of {probe_velocity:.2f} +/- {uncert:.2f} m/s ({probe_velocity/1000:.2f} +/- {uncert/1000:.2f} km/s)") data[x2_shot]["shock-speed"][probe] = probe_velocity, uncert, False # Speed, Ref # m/s else: print(f"Unable to calculate probe velocity, only have one gauge: {f'{probe}-g2' if f'{probe}-g2' in data[x2_shot]['shock-point'] else f'{probe}-g1'}") if len(dataInfo["probe-info"]["locations"]) > 1: for i in range(len(dataInfo["probe-info"]["locations"]) - 1): probe_locs = dataInfo["probe-info"]["locations"] p2p_dist = (TUNNEL_INFO["distance"][probe_locs[1]] - TUNNEL_INFO["distance"][probe_locs[0]]) / 1000 # convert to m if f"{probe_locs[i]}-g1" in data[x2_shot]["shock-point"] and f"{probe_locs[i+1]}-g1" in data[x2_shot]["shock-point"]: p1_g1_time = data[x2_shot]["shock-point"][f"{probe_locs[i]}-g1"][1] / 1e6 # Convert to seconds p2_g1_time = data[x2_shot]["shock-point"][f"{probe_locs[i+1]}-g1"][1] / 1e6 # Convert to seconds p2p_time = abs(p2_g1_time - p1_g1_time) p2p_1 = p2p_dist / p2p_time # m/s p1_time_uncert = data[x2_shot]["shock-point"][f"{probe_locs[i]}-g1"][2] / 1e6 # Convert to seconds p2_time_uncert = data[x2_shot]["shock-point"][f"{probe_locs[i+1]}-g1"][2] / 1e6 # Convert to seconds uncert = deltaVs(p2p_1, p2p_dist, p2p_time, (UNCERTS["probe-dist"][probe_locs[i]], UNCERTS["probe-dist"][probe_locs[i+1]]), (p1_time_uncert, p2_time_uncert, data[x2_shot]["time"]["probe_uncert"])) print(f"{probe_locs[i]}-{probe_locs[i + 1]} - G1 - Measured a shock speed of {p2p_1:.2f} +/- {uncert:.2f} m/s ({p2p_1/1000:.2f} +/- {uncert/1000:.2f} [{uncert/p2p_1 * 100:.2f}%] km/s)") data[x2_shot]["shock-speed"][f"{probe_locs[i]}-{probe_locs[i + 1]}-g1"] = p2p_1, uncert, False # Speed, Ref if f"{probe_locs[i]}-g2" in data[x2_shot]["shock-point"] and f"{probe_locs[i+1]}-g2" in data[x2_shot]["shock-point"]: p1_g2_time = data[x2_shot]["shock-point"][f"{probe_locs[i]}-g2"][1] / 1e6 # Convert to seconds p2_g2_time = data[x2_shot]["shock-point"][f"{probe_locs[i+1]}-g2"][1] / 1e6 # Convert to seconds p2p_time = abs(p2_g2_time - p1_g2_time) p2p_2 = p2p_dist / p2p_time # m/s p1_time_uncert = data[x2_shot]["shock-point"][f"{probe_locs[i]}-g2"][2] / 1e6 # Convert to seconds p2_time_uncert = data[x2_shot]["shock-point"][f"{probe_locs[i+1]}-g2"][2] / 1e6 # Convert to seconds uncert = deltaVs(p2p_2, p2p_dist, p2p_time, (UNCERTS["probe-dist"][probe_locs[i]], UNCERTS["probe-dist"][probe_locs[i+1]]), (p1_time_uncert, p2_time_uncert, data[x2_shot]["time"]["probe_uncert"])) print(f"{probe_locs[i]}-{probe_locs[i + 1]} - G2 - Measured a shock speed of {p2p_2:.2f} +/- {uncert:.2f} m/s ({p2p_2/1000:.2f} +/- {uncert/1000:.2f} [{uncert/p2p_2 * 100:.2f}%] km/s)") data[x2_shot]["shock-speed"][f"{probe_locs[i]}-{probe_locs[i + 1]}-g2"] = p2p_2, uncert, False # Speed, Ref print() # Return the data & the successfully loaded data keys return data #, tuple(data.keys()) def load_ref_data(x2_shot: str, data_path: str, data={}) -> dict: # Load Raw Data # TDMS File (X2 DAQ Data) x2_tdms_data = TdmsFile.read(data_path, raw_timestamps=True) x2_channels = x2_tdms_data.groups()[0].channels() x2_channel_names = tuple(c.name for c in x2_channels) data[x2_shot] = { "name": x2_shot, "info": { "name": x2_shot, "pcb-canny": [ { "sigma": 4, "post_pres": 0.05 } ], "pcb-refs": [ "st1", "st2", "st3", "at1", "at2", "at3", "at4", "at5", "at6", ], "no-graph": [ "at1", "at2", "at3", "at4", "at5", "at6", ] }, "x2": x2_channels, "x2-channels": x2_channel_names, "x2-tdms": x2_tdms_data, "data": { "x2": {}, # Only pop channels with a voltage scale in ./tunnel-info.yaml }, "time": { "x2": None, "trigger_index": None, }, "shock-speed": {} # Note all in us } # === Process the data === # Generate X2 time arrays time_data = x2_channels[0] ns_time = time_data[:].as_datetime64('ns') x2_time_seconds = (ns_time - ns_time[0]) # timedelta64[ns] x2_time_us = x2_time_seconds.astype("float64") / 1000 # Scale to us x2_time_dt = np.diff(x2_time_us).mean() / 1000 # Scale to s # --- Un Scale Data --- for channel, vScale in TUNNEL_INFO["volt-scale"].items(): # Get the channel index from its name chIndex = x2_channel_names.index(channel) # Calculate the average noise offset avg_noise = x2_channels[chIndex][0:SAMPLES_TO_AVG].mean() # Save the channel data data[x2_shot]["data"]["x2"][channel] = (x2_channels[chIndex][:] - avg_noise) * vScale # Process Trigger Info trigger_volts = data[x2_shot]["data"]["x2"]["trigbox"] # Use a mean to offset x2_trigger_index = np.where(trigger_volts > 1)[0][0] x2_trigger_time = x2_time_us[x2_trigger_index] # Add the time data data[x2_shot]["time"] = { "x2": x2_time_us, "trigger_index": x2_trigger_index, "x2-dt": x2_time_dt } data[x2_shot]["time"]["x2_uncert"] = UNCERTS["time"]["x2-daq"] #max(UNCERTS["time"]["x2-daq"], data[x2_shot]["time"]["x2-dt"]) # Find Shock Times # X2 - Canning Edge # Default Values dataInfo = data[x2_shot]["info"] data[x2_shot]["shock-point"] = {} cArgs = dataInfo["pcb-canny"] for i, ref in enumerate(dataInfo["pcb-refs"]): refData = data[x2_shot]["data"]["x2"][ref] if i in range(len(cArgs)): sigma = cArgs[i]["sigma"] post_sup_thresh = cArgs[i]["post_pres"] else: sigma = cArgs[-1]["sigma"] post_sup_thresh = cArgs[-1]["post_pres"] first_value, first_value_uncertainty, _, _ = canny_shock_finder(x2_time_us, refData, sigma=sigma, post_suppression_threshold=post_sup_thresh, plot=False, print_func=None) shock_point = np.where(x2_time_us >= first_value)[0][0] # [BUG] Seems to give n+1 data[x2_shot]["shock-point"][ref] = shock_point, first_value, first_value_uncertainty # Calculate Shock Speeds print("="*30, x2_shot, "="*30) print(f"-- Reference Shot {int(x2_shot[-1]) + 1} --") for i, refProbe in enumerate(dataInfo["pcb-refs"]): if i == 0: continue p1_time = data[x2_shot]["shock-point"][refProbe][1] / 1e6 # Convert to seconds p2_time = data[x2_shot]["shock-point"][dataInfo["pcb-refs"][i-1]][1] / 1e6 # Convert to seconds p2p_dist = abs(TUNNEL_INFO["distance"][refProbe] - TUNNEL_INFO["distance"][dataInfo["pcb-refs"][i-1]]) / 1000 # convert to m p2p_time = abs(p2_time - p1_time) probe_velocity = p2p_dist / p2p_time # m/s p1_time_uncert = data[x2_shot]["shock-point"][dataInfo["pcb-refs"][i-1]][2] / 1e6 # Convert to seconds p2_time_uncert = data[x2_shot]["shock-point"][refProbe][2] / 1e6 # Convert to seconds uncert = deltaVs(probe_velocity, p2p_dist, p2p_time, (UNCERTS["probe-dist"][refProbe], UNCERTS["probe-dist"][dataInfo["pcb-refs"][i-1]]), (p1_time_uncert, p2_time_uncert, data[x2_shot]["time"]["x2_uncert"])) print(f"{dataInfo['pcb-refs'][i-1]}-{refProbe} Measured a shock speed of {probe_velocity:.2f} +/- {uncert:.2f} m/s ({probe_velocity/1000:.2f} +/- {uncert/1000:.2f} km/s [{uncert/probe_velocity * 100 :.2f}%])") data[x2_shot]["shock-speed"][f"{dataInfo['pcb-refs'][i-1]}-{refProbe}"] = probe_velocity, uncert, True # Speed, Ref print() return data # ======= Graphing ======== def genGraph(gData: dict, showPlot: bool = True, doLimits: bool = True, forcePlots: bool = False, addShockInfo: bool = True): graphData = { "title": f"Shock Response Time\nFor {gData['info']['long_name']}", "xLabel": "Time ($\\mu$s)", "yLabel": "Voltage Reading (V)", "grid": True, "figSize": (9, 6.8), #(8,6.5), "ledgLoc": 'upper left', "yLim": (-1.5, 11 if addShockInfo else 4), "plots": [] } #if forcePlots or not doLimits: graphData["title"] += "\n" #if forcePlots: graphData["title"] += "(All Data Shown)" #if not doLimits: graphData["title"] += () + "Full Re" lims = [] for label, d in [("1 [V]", "Gauge 1"),("2 [V]", "Gauge 2")]: #, ("4 [V]", "Gauge Trigger")]: graphData["plots"].append({ "x": gData["time"]["scope"], "y": gData["data"]["scope"][label], "label": d, "args":{"zorder":1} }) for _, probe in enumerate(gData["info"]["probe-info"]["locations"]): if f"{probe}-g1" in gData["shock-point"]: graphData["plots"].append({ "type": "axvLine", "x": gData["shock-point"][f"{probe}-g1"][1],#[i], "label": f"{probe}-Gauge 1 - Shock Point {gData['shock-point'][f'{probe}-g1'][1]:.2f}$\\mu$s", "colour": UQC["purple"].lighten(0.5), "args":{"zorder":2, "linestyle":"--", "alpha":0.5} }) lims.append(gData["shock-point"][f"{probe}-g1"][1]) if f"{probe}-g2" in gData["shock-point"]: graphData["plots"].append({ "type": "axvLine", "x": gData["shock-point"][f"{probe}-g2"][1],#[i], "label": f"{probe}-Gauge 2 - Shock Point {gData['shock-point'][f'{probe}-g2'][1]:.2f}$\\mu$s", "colour": UQC["purple"].lighten(0.5), "args":{"zorder":2, "linestyle":":", "alpha":0.5} }) lims.append(gData["shock-point"][f"{probe}-g2"][1]) for label in gData["info"]["pcb-refs"]: # + ["trigbox"]: if not forcePlots and label in gData["info"]["no-graph"]: continue graphData["plots"].append({ "x": gData["time"]["x2"], "y": gData["data"]["x2"][label], "label": label }) if label in gData["info"]["pcb-refs"]: graphData["plots"].append({ "type": "axvLine", "x": gData["shock-point"][label][1], "label": f"{label} - Shock Point {gData['shock-point'][label][1]:.2f}$\\mu$s", "colour": "gray", "args":{"zorder":2, "linestyle":"-.", "alpha":0.5} }) lims.append(gData["shock-point"][label][1]) # [TODO this but better] if addShockInfo: probeText = "" flag = False avg_speed_lst = [] avg_uncert_lst = [] for shock_speed_loc in gData["shock-speed"]: avg_speed_lst.append(gData['shock-speed'][shock_speed_loc][0]) avg_uncert_lst.append(gData['shock-speed'][shock_speed_loc][1]) if not flag and not gData["shock-speed"][shock_speed_loc][2]: flag = True avg_speed = np.array(avg_speed_lst).mean() avg_uncert = np.array(avg_uncert_lst).mean() # np.sqrt(np.sum(np.pow(np.array(avg_uncert_lst), 2))) probeText += f"\nAverage Speed - {avg_speed/1000:.2f} $\\pm${avg_uncert/1000:.2f} [{avg_uncert/avg_speed * 100:.2f}%] km/s" probeText += "\n" + "-"*50 probeText += "\n" #probeText += "\\definecolor{my_gray}{rbg}{0.6, 0.5803921568627451, 0.5647058823529412}\\textcolor{my_gray}{" if gData["shock-speed"][shock_speed_loc][1] else "" probeText += f"{shock_speed_loc} - {gData['shock-speed'][shock_speed_loc][0]/1000:.2f} $\\pm${gData['shock-speed'][shock_speed_loc][1]/1000:.2f} [{gData['shock-speed'][shock_speed_loc][1]/gData['shock-speed'][shock_speed_loc][0]*100:.2f}%] km/s" #probeText += "}" if gData["shock-speed"][shock_speed_loc][1] else "" graphData["plots"].append({ "type": "text", "text": f"Measured Shock Speeds {probeText}", "align": ("top", "right"), "alpha": 0.8, "x": 0.94, #if len(gData["info"]["probe-info"]["locations"]) < 3 else 0.885, "y": 0.94 }) if doLimits and len(lims) > 1: OFFSET = 10 #if not forcePlots else 50 graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET)) makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath=f"./images/{gData['info']['shot-info']['name']}{'-all' if forcePlots else ''}{'-clipped' if doLimits else ''}.png") #figSavePath=f"./images/{{0}}{"-noLims" if not doLimits else ""}.png") def genRefGraph(gData: dict, showPlot: bool = True, addShockInfo: bool = True, forcePlots: bool = False): graphData = { "title": f"Shock Response Time\nFor Reference Shot {int(gData['name'][-1]) + 1} ({gData['name']})", "xLabel": "Time ($\\mu$s)", "yLabel": "Voltage Reading (V)", "grid": True, "figSize": (9, 6.8), #(8,6.5), "ledgLoc": 'upper left', "yLim": (-1.5, 11), "plots": [] } lims = [] for label in gData["info"]["pcb-refs"]: if not forcePlots and label in gData["info"]["no-graph"]: continue graphData["plots"].append({ "x": gData["time"]["x2"], "y": gData["data"]["x2"][label], "label": label }) if label in gData["info"]["pcb-refs"]: graphData["plots"].append({ "type": "axvLine", "x": gData["shock-point"][label][1], "label": f"{label} - Shock Point {gData['shock-point'][label][1]:.2f}$\\mu$s", "colour": "gray", "args":{"zorder":2, "linestyle":"-.", "alpha":0.5} }) lims.append(gData["shock-point"][label][1]) # [TODO this but better] if addShockInfo: probeText = "" flag = False avg_speed_lst = [] avg_uncert_lst = [] for shock_speed_loc in gData["shock-speed"]: avg_speed_lst.append(gData['shock-speed'][shock_speed_loc][0]) avg_uncert_lst.append(gData['shock-speed'][shock_speed_loc][1]) if not flag and not gData["shock-speed"][shock_speed_loc][2]: flag = True avg_speed = np.array(avg_speed_lst).mean() avg_uncert = np.array(avg_uncert_lst).mean() # np.sqrt(np.sum(np.pow(np.array(avg_uncert_lst), 2))) probeText += f"\nAverage Speed - {avg_speed/1000:.2f} $\\pm${avg_uncert/1000:.2f} [{avg_uncert/avg_speed * 100:.2f}%] km/s" probeText += "\n" + "-"*50 probeText += "\n" probeText += f"{shock_speed_loc} - {gData['shock-speed'][shock_speed_loc][0]/1000:.2f} $\\pm${gData['shock-speed'][shock_speed_loc][1]/1000:.2f} [{gData['shock-speed'][shock_speed_loc][1]/gData['shock-speed'][shock_speed_loc][0]*100:.2f}%] km/s" graphData["plots"].append({ "type": "text", "text": f"Measured Shock Speeds {probeText}", "align": ("top", "right"), "alpha": 0.8, "x": 0.94, "y": 0.94 }) if len(lims) > 1: OFFSET = 10 #if not forcePlots else 50 graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET)) makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath=f"./images/ref-{gData['name']}{'-all' if forcePlots else ''}.png") def genComboRefGraph(data: dict, plotCh: list[str] = ["st1", "st2", "st3"], showPlot: bool = False, doShockLabels:bool = False, addShockInfo:bool = False): graphData = { "title": f"Shock Response Time\nFor Reference Shots 1, 2, & 3 (x2s5820, x2s5821 & x2s5822) - Mars Entry Conditions", "xLabel": "Time ($\\mu$s)", "yLabel": "Voltage Reading (V)", "grid": True, "figSize": (16, 6.8), #(8,6.5), "ledgLoc": 'upper left', "yLim": (-1.5, 11), "plots": [] } LINESTYLES = ( 'solid', 'dotted', 'dashed', 'dashdot' ) COLOURS = ( UQC["purple"], UQC["blue"], UQC["green"], # Don't need these UQC["red"], UQC["light_purple"], UQC["dark_grey"], UQC["orange"], UQC["yellow"], UQC["aqua"], UQC["gold"], UQC["neutral"] ) lims = [] for line_sty, shot in enumerate(data): gData = data[shot] for col, label in enumerate(plotCh): graphData["plots"].append({ "x": gData["time"]["x2"], "y": gData["data"]["x2"][label], "colour": COLOURS[col % len(COLOURS)], "args":{"zorder":2, "linestyle":LINESTYLES[line_sty % len(LINESTYLES)], "alpha":0.5} }) if line_sty == 0: graphData["plots"][-1]["label"] = f"{label}", for line_sty, shot in enumerate(data): gData = data[shot] for col, label in enumerate(plotCh): if label in plotCh: graphData["plots"].append({ "type": "axvLine", "x": gData["shock-point"][label][1], "colour": "gray", "args":{"zorder":2, "linestyle":"-.", "alpha":0.5} }) if doShockLabels: graphData["plots"][-1]["label"] = f"{label} - Ref Shot {line_sty + 1} - Shock Point {gData['shock-point'][label][1]:.2f}$\\mu$s" lims.append(gData["shock-point"][label][1]) # [TODO this but better] if addShockInfo: print("============================== Reference Shots ==============================") shock_speeds = {} for shot_id, shot in enumerate(data): shot_id += 1 gData = data[shot] for shock_speed_loc in gData['shock-speed']: shk_sps = shock_speeds.get(shock_speed_loc, []) shk_sps.append((gData['shock-speed'][shock_speed_loc][0], gData['shock-speed'][shock_speed_loc][1])) shock_speeds[shock_speed_loc] = shk_sps probeText = "" avg_speeds = [] avg_uncerts = [] for shock_speed_loc in shock_speeds: shock_info = np.array(shock_speeds[shock_speed_loc]) speeds = shock_info[:, 0] uncerts = shock_info[:, 1] speed = speeds.mean() uncert = np.sqrt(np.pow(uncerts, 2).sum()) avg_speeds.append(speed) avg_uncerts.append(uncert) print(f"{shock_speed_loc} Measured a mean shock speed of {speed:.2f} +/- {uncert:.2f} m/s ({speed/1000:.2f} +/- {uncert/1000:.2f} km/s [{uncert/speed * 100 :.2f}%])") probeText += f"\n{shock_speed_loc} - {speed/1000:.2f} $\\pm${uncert/1000:.2f} [{uncert/speed*100:.2f}%] km/s" avg_sp = np.array(avg_speeds).mean() avg_unc = np.array(avg_uncerts).mean() #np.sqrt(np.pow(np.array(avg_uncerts), 2).sum()) probeText += f"\nAverage Speed - {avg_sp/1000:.2f} $\\pm${avg_unc/1000:.2f} [{avg_unc/avg_sp * 100:.2f}%] km/s" graphData["plots"].append({ "type": "text", "text": f"Average Measured Shock Speeds{probeText}", "align": ("top", "right"), "alpha": 0.8, "x": 0.9, "y": 0.9 }) if len(lims) > 1: OFFSET = 10 #if not forcePlots else 50 graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET)) makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath=f"./images/ref-combo-{'_'.join(plotCh)}.png") def genComboDataGraph(data: dict, showPlot: bool = False, doShockLabels:bool = False, addShockInfo:bool = False): shots = "" names = "" for i, shot in enumerate(data): shots += ('' if i == 0 else ', ') + f"{data[shot]['info']['name'][-1]}" names += ('' if i == 0 else ', ') + shot graphData = { "title": f"Shock Response Time\nFor Shots {shots} ({names})\nVarious Probe Locations - Mars Entry Conditions", "xLabel": "Time ($\\mu$s)", "yLabel": "Voltage Reading (V)", "grid": True, "figSize": (16, 6.8), #(8,6.5), "ledgLoc": (0.6, 0.075) if doShockLabels else 'upper left', "yLim": (-1.5, 11), "plots": [] } LINESTYLES = ( 'solid', 'dotted', 'dashed', 'dashdot' ) COLOURS = ( #UQC["purple"], #UQC["blue"], UQC["green"], UQC["red"], UQC["light_purple"], UQC["dark_grey"], UQC["orange"], UQC["yellow"], UQC["aqua"], UQC["gold"], UQC["neutral"] ) lims = [] for line_sty, shot in enumerate(data): gData = data[shot] for label, d in [("1 [V]", "Gauge 1"),("2 [V]", "Gauge 2")]: graphData["plots"].append({ "x": gData["time"]["scope"], "y": gData["data"]["scope"][label], #"label": d, "colour": UQC["purple"] if label[0] == "1" else UQC["blue"], "args":{"zorder":1, "linestyle": LINESTYLES[line_sty % len(LINESTYLES)]} }) if line_sty == 0: graphData["plots"][-2]["label"] = "Gauge 1" graphData["plots"][-1]["label"] = "Gauge 2" for _, probe in enumerate(gData["info"]["probe-info"]["locations"]): if f"{probe}-g1" in gData["shock-point"]: graphData["plots"].append({ "type": "axvLine", "x": gData["shock-point"][f"{probe}-g1"][1], "colour": UQC["purple"].lighten(0.5), "args":{"zorder":2, "linestyle":"--", "alpha":0.5} }) lims.append(gData["shock-point"][f"{probe}-g1"][1]) if doShockLabels: graphData["plots"][-1]["label"] = f"{probe}-Gauge 1 - {shot} - Shock Point {gData['shock-point'][f'{probe}-g1'][1]:.2f}$\\mu$s" if f"{probe}-g2" in gData["shock-point"]: graphData["plots"].append({ "type": "axvLine", "x": gData["shock-point"][f"{probe}-g2"][1], "colour": UQC["purple"].lighten(0.5), "args":{"zorder":2, "linestyle":":", "alpha":0.5} }) lims.append(gData["shock-point"][f"{probe}-g2"][1]) if doShockLabels: graphData["plots"][-1]["label"] = f"{probe}-Gauge 2 - {shot} - Shock Point {gData['shock-point'][f'{probe}-g2'][1]:.2f}$\\mu$s" if False: for line_sty, shot in enumerate(data): gData = data[shot] plotCh = gData["info"]["pcb-refs"] for col, label in enumerate(plotCh): if label in plotCh: graphData["plots"].append({ "type": "axvLine", "x": gData["shock-point"][label][1], "colour": "gray", "args":{"zorder":2, "linestyle":"--", "alpha":0.5} }) if doShockLabels: graphData["plots"][-1]["label"] = f"{label} - Ref Shot {line_sty + 1} - Shock Point {gData['shock-point'][label][1]:.2f}$\\mu$s" lims.append(gData["shock-point"][label][1]) # [TODO this but better] if addShockInfo: print("============================== Reference Shots ==============================") shock_speeds = {} for shot_id, shot in enumerate(data): shot_id += 1 gData = data[shot] for shock_speed_loc in gData['shock-speed']: shk_sps = shock_speeds.get(shock_speed_loc, []) shk_sps.append((gData['shock-speed'][shock_speed_loc][0], gData['shock-speed'][shock_speed_loc][1])) shock_speeds[shock_speed_loc] = shk_sps probeText = "" for shock_speed_loc in shock_speeds: shock_info = np.array(shock_speeds[shock_speed_loc]) speeds = shock_info[:, 0] uncerts = shock_info[:, 1] speed = speeds.mean() uncert = np.sqrt(np.pow(uncerts, 2).sum()) print(f"{shock_speed_loc} Measured a mean shock speed of {speed:.2f} +/- {uncert:.2f} m/s ({speed/1000:.2f} +/- {uncert/1000:.2f} km/s [{uncert/speed * 100 :.2f}%])") probeText += f"\n{shock_speed_loc} - {speed/1000:.2f} $\\pm${uncert/1000:.2f} [{uncert/speed*100:.2f}%] km/s" graphData["plots"].append({ "type": "text", "text": f"Average Measured Shock Speeds{probeText}", "align": ("top", "right"), "alpha": 0.8, "x": 0.9, "y": 0.9 }) if len(lims) > 1: OFFSET = 10 #if not forcePlots else 50 graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET)) makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath=f"./images/combo-my-data.png") def genX2CompGraphs(gData: dict, showPlot: bool = True): graphData = { "title": f"Signal Comparison for {gData['info']['name']} ({gData['info']['shot-info']['name']})", "xLabel": "Time ($\\mu$s)", "yLabel": "Voltage Reading (V)", "grid": True, "figSize": (16, 6.8), #(9, 6.8), #(8,6.5), "ledgLoc": 'upper left', "yLim": (-1.5, 11), "plots": [] } lims = [] for label, d in [("1 [V]", "Gauge 1 - Scope"),("2 [V]", "Gauge 2 - Scope")]: #, ("4 [V]", "Gauge Trigger")]: graphData["plots"].append({ "x": gData["time"]["scope"], "y": gData["data"]["scope"][label], "label": d, "args":{"zorder":1} }) for label, d in [("st1", "Gauge 1 - X2 DAQ"),("st3", "Gauge 2 - X2 DAQ")]: #, ("4 [V]", "Gauge Trigger")]: graphData["plots"].append({ "x": gData["time"]["x2"], "y": gData["data"]["x2"][label], "label": d, "args":{"zorder":1} }) for _, probe in enumerate(gData["info"]["probe-info"]["locations"]): if f"{probe}-g1" in gData["shock-point"]: graphData["plots"].append({ "type": "axvLine", "x": gData["shock-point"][f"{probe}-g1"][1],#[i], "label": f"{probe}-Gauge 1 - Scope - Shock Point {gData['shock-point'][f'{probe}-g1'][1]:.2f}$\\mu$s", "colour": UQC["purple"].lighten(0.5), "args":{"zorder":2, "linestyle":"--", "alpha":0.5} }) lims.append(gData["shock-point"][f"{probe}-g1"][1]) if f"{probe}-g2" in gData["shock-point"]: graphData["plots"].append({ "type": "axvLine", "x": gData["shock-point"][f"{probe}-g2"][1],#[i], "label": f"{probe}-Gauge 2 - Scope - Shock Point {gData['shock-point'][f'{probe}-g2'][1]:.2f}$\\mu$s", "colour": UQC["purple"].lighten(0.5), "args":{"zorder":2, "linestyle":":", "alpha":0.5} }) lims.append(gData["shock-point"][f"{probe}-g2"][1]) for _, probe in enumerate(gData["info"]["probe-info"]["locations"]): if f"{probe}-g1" in gData["shock-point"]["x2"]: graphData["plots"].append({ "type": "axvLine", "x": gData["shock-point"]["x2"][f"{probe}-g1"][1],#[i], "label": f"{probe}-Gauge 1 - X2 DAQ - Shock Point {gData['shock-point']["x2"][f'{probe}-g1'][1]:.2f}$\\mu$s", "colour": UQC["dark_grey"], "args":{"zorder":2, "linestyle":"--", "alpha":0.5} }) lims.append(gData["shock-point"]["x2"][f"{probe}-g1"][1]) if f"{probe}-g2" in gData["shock-point"]["x2"]: graphData["plots"].append({ "type": "axvLine", "x": gData["shock-point"]["x2"][f"{probe}-g2"][1],#[i], "label": f"{probe}-Gauge 2 - X2 DAQ - Shock Point {gData['shock-point']["x2"][f'{probe}-g2'][1]:.2f}$\\mu$s", "colour": UQC["dark_grey"], "args":{"zorder":2, "linestyle":":", "alpha":0.5} }) lims.append(gData["shock-point"]["x2"][f"{probe}-g2"][1]) if True and len(lims) > 1: OFFSET = 10 #if not forcePlots else 50 graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET)) makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath=f"./images/signal_comp-{gData['info']['shot-info']['name']}.png") print("-") scopeTime = gData["time"]["scope"] scopeData = gData["data"]["scope"] x2Time = gData["time"]["x2"] #x2Data = gData["data"]["x2"] pass print("Loading Data") # My Shot Data data = {} for dp in data_to_load: pdp = f"{DATA_PATH}/{dp}/" load_data(pdp, data) loaded_data = tuple(data.keys()) # Reference Data from Mragank ref_data = {} for refShot in ref_data_to_load: load_ref_data(refShot, f"./data/referance/{refShot}/{refShot}.tdms", ref_data) print("Loaded Data") if __name__ == "__main__": data_to_load = [ "x2s5823", "x2s5824", "x2s5827", "x2s5829", "x2s5830", "x2s5831", "x2s5832" ] ref_data_to_load = [ "x2s5820", "x2s5821", "x2s5822" ] print("Graphing Data") # General Shot Graphing for shot in loaded_data: #print(data[shot]['info']['long_name'].rsplit("\n", 1)[-1]) genGraph(data[shot], showPlot=False, addShockInfo=False) genGraph(data[shot], showPlot=False, forcePlots=True) combo_data = data.copy() combo_data.pop(loaded_data[-2]) combo_data.pop(loaded_data[-1]) genComboDataGraph(combo_data, doShockLabels=True) genX2CompGraphs(data["x2s5831"], showPlot=False) genX2CompGraphs(data["x2s5832"], showPlot=False) # Reference Data for shot in ref_data: genRefGraph(ref_data[shot], showPlot=False, addShockInfo=False) genRefGraph(ref_data[shot], showPlot=False, forcePlots=True) genComboRefGraph(ref_data, doShockLabels=True) genComboRefGraph(ref_data, ref_data[ref_data_to_load[0]]["info"]["pcb-refs"], addShockInfo=True) # This forces matplotlib to hang until I tell it to close all windows pltKeyClose() print("Done")