From e653885de19353be6b19e1bde22f0a8dc0b5afea Mon Sep 17 00:00:00 2001 From: Cal Wing <20716204+calw20@users.noreply.github.com> Date: Tue, 22 Oct 2024 23:18:40 +1000 Subject: [PATCH] Generate Referance Shot info --- main.py | 352 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 341 insertions(+), 11 deletions(-) diff --git a/main.py b/main.py index 5c15d3c..60b641a 100644 --- a/main.py +++ b/main.py @@ -39,6 +39,12 @@ data_to_load = [ "x2s5832" ] +ref_data_to_load = [ + "x2s5820", + "x2s5821", + "x2s5822" +] + # ==== Uncerts ==== # Taken from DOI: 10.1007/s00193-017-0763-3 (Implementation of a state-to-state analytical framework for the calculation of expansion tube flow properties) @@ -367,17 +373,146 @@ def load_data(data_path: str, data={}) -> dict: # Return the data & the successfully loaded data keys return data #, tuple(data.keys()) -data = {} -for dp in data_to_load: - pdp = f"{DATA_PATH}/{dp}/" - load_data(pdp, data) +def load_ref_data(x2_shot: str, data_path: str, data={}) -> dict: + # Load Raw Data + # TDMS File (X2 DAQ Data) + x2_tdms_data = TdmsFile.read(data_path, raw_timestamps=True) + x2_channels = x2_tdms_data.groups()[0].channels() + x2_channel_names = tuple(c.name for c in x2_channels) -loaded_data = tuple(data.keys()) + data[x2_shot] = { + "name": x2_shot, + "info": { + "name": x2_shot, + "pcb-canny": [ + { + "sigma": 4, + "post_pres": 0.05 + } + ], -print("Loaded Data") + "pcb-refs": [ + "st1", + "st2", + "st3", + + "at1", + "at2", + "at3", + + "at4", + "at5", + "at6", + ], + + "no-graph": [ + "at1", + "at2", + "at3", + + "at4", + "at5", + "at6", + + ] + }, + "x2": x2_channels, + "x2-channels": x2_channel_names, + "x2-tdms": x2_tdms_data, + "data": { + "x2": {}, # Only pop channels with a voltage scale in ./tunnel-info.yaml + }, + "time": { + "x2": None, + "trigger_index": None, + }, + "shock-speed": {} # Note all in us + } + + # === Process the data === + # Generate X2 time arrays + time_data = x2_channels[0] + + ns_time = time_data[:].as_datetime64('ns') + x2_time_seconds = (ns_time - ns_time[0]) # timedelta64[ns] + x2_time_us = x2_time_seconds.astype("float64") / 1000 # Scale to us + + # --- Un Scale Data --- + for channel, vScale in TUNNEL_INFO["volt-scale"].items(): + # Get the channel index from its name + chIndex = x2_channel_names.index(channel) + + # Calculate the average noise offset + avg_noise = x2_channels[chIndex][0:SAMPLES_TO_AVG].mean() + + # Save the channel data + data[x2_shot]["data"]["x2"][channel] = (x2_channels[chIndex][:] - avg_noise) * vScale + + # Process Trigger Info + trigger_volts = data[x2_shot]["data"]["x2"]["trigbox"] # Use a mean to offset + x2_trigger_index = np.where(trigger_volts > 1)[0][0] + x2_trigger_time = x2_time_us[x2_trigger_index] + + # Add the time data + data[x2_shot]["time"] = { + "x2": x2_time_us, + "trigger_index": x2_trigger_index, + } -#[TODO] Refactor + # Find Shock Times + # X2 - Canning Edge + + # Default Values + dataInfo = data[x2_shot]["info"] + data[x2_shot]["shock-point"] = {} + cArgs = dataInfo["pcb-canny"] + for i, ref in enumerate(dataInfo["pcb-refs"]): + refData = data[x2_shot]["data"]["x2"][ref] + + if i in range(len(cArgs)): + sigma = cArgs[i]["sigma"] + post_sup_thresh = cArgs[i]["post_pres"] + else: + sigma = cArgs[-1]["sigma"] + post_sup_thresh = cArgs[-1]["post_pres"] + + first_value, first_value_uncertainty, _, _ = canny_shock_finder(x2_time_us, refData, sigma=sigma, post_suppression_threshold=post_sup_thresh, plot=False, print_func=None) + shock_point = np.where(x2_time_us >= first_value)[0][0] # [BUG] Seems to give n+1 + + data[x2_shot]["shock-point"][ref] = shock_point, first_value, first_value_uncertainty + + + # Calculate Shock Speeds + print("="*30, x2_shot, "="*30) + print(f"-- Reference Shot {int(x2_shot[-1]) + 1} --") + + for i, refProbe in enumerate(dataInfo["pcb-refs"]): + if i == 0: continue + p1_time = data[x2_shot]["shock-point"][refProbe][1] / 1e6 # Convert to seconds + p2_time = data[x2_shot]["shock-point"][dataInfo["pcb-refs"][i-1]][1] / 1e6 # Convert to seconds + + p2p_dist = abs(TUNNEL_INFO["distance"][refProbe] - TUNNEL_INFO["distance"][dataInfo["pcb-refs"][i-1]]) / 1000 # convert to m + p2p_time = abs(p2_time - p1_time) + + probe_velocity = p2p_dist / p2p_time # m/s + + p1_time_uncert = data[x2_shot]["shock-point"][dataInfo["pcb-refs"][i-1]][2] / 1e6 # Convert to seconds + p2_time_uncert = data[x2_shot]["shock-point"][refProbe][2] / 1e6 # Convert to seconds + + uncert = deltaVs(probe_velocity, p2p_dist, p2p_time, (UNCERTS["probe-dist"][refProbe], UNCERTS["probe-dist"][dataInfo["pcb-refs"][i-1]]), (p1_time_uncert, p2_time_uncert, UNCERTS["time"]["x2-daq"])) + + print(f"{dataInfo['pcb-refs'][i-1]}-{refProbe} Measured a shock speed of {probe_velocity:.2f} +/- {uncert:.2f} m/s ({probe_velocity/1000:.2f} +/- {uncert/1000:.2f} km/s [{uncert/probe_velocity * 100 :.2f}%])") + data[x2_shot]["shock-speed"][f"{dataInfo['pcb-refs'][i-1]}-{refProbe}"] = probe_velocity, uncert, True # Speed, Ref + + print() + + return data + + + +# ======= Graphing ======== + def genGraph(gData: dict, showPlot: bool = True, doLimits: bool = True, forcePlots: bool = False, addShockInfo: bool = True): graphData = { "title": f"Shock Response Time\nFor {gData['info']['long_name']}", @@ -460,9 +595,9 @@ def genGraph(gData: dict, showPlot: bool = True, doLimits: bool = True, forcePlo graphData["plots"].append({ "type": "text", - "text": f"Measured Shock Speeds {probeText}", + "text": f"Measured Shock Speeds {probeText}", "align": ("top", "right"), - "alpha": 0.75, + "alpha": 0.8, "x": 0.94, #if len(gData["info"]["probe-info"]["locations"]) < 3 else 0.885, "y": 0.94 }) @@ -475,16 +610,211 @@ def genGraph(gData: dict, showPlot: bool = True, doLimits: bool = True, forcePlo makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath=f"./images/{gData['info']['shot-info']['name']}{'-all' if forcePlots else ''}{'-clipped' if doLimits else ''}.png") #figSavePath=f"./images/{{0}}{"-noLims" if not doLimits else ""}.png") +def genRefGraph(gData: dict, showPlot: bool = True, addShockInfo: bool = True, forcePlots: bool = False): + graphData = { + "title": f"Shock Response Time\nFor Reference Shot {int(gData['name'][-1]) + 1} ({gData['name']})", + "xLabel": "Time ($\\mu$s)", + "yLabel": "Voltage Reading (V)", + "grid": True, + "figSize": (9, 6.8), #(8,6.5), + "ledgLoc": 'upper left', + "yLim": (-1.5, 11), + "plots": [] + } -#print("Graphing Data") + lims = [] + for label in gData["info"]["pcb-refs"]: + if not forcePlots and label in gData["info"]["no-graph"]: continue + graphData["plots"].append({ + "x": gData["time"]["x2"], + "y": gData["data"]["x2"][label], + "label": label + }) + + if label in gData["info"]["pcb-refs"]: + graphData["plots"].append({ + "type": "axvLine", + "x": gData["shock-point"][label][1], + "label": f"{label} - Shock Point {gData['shock-point'][label][1]:.2f}$\\mu$s", + "colour": "gray", + "args":{"zorder":2, "linestyle":"--", "alpha":0.5} + }) + lims.append(gData["shock-point"][label][1]) # [TODO this but better] + + if addShockInfo: + probeText = "" + flag = False + for shock_speed_loc in gData["shock-speed"]: + if not flag and not gData["shock-speed"][shock_speed_loc][2]: + flag = True + probeText += "\n" + "-"*50 + + probeText += "\n" + probeText += f"{shock_speed_loc} - {gData['shock-speed'][shock_speed_loc][0]/1000:.2f} $\\pm${gData['shock-speed'][shock_speed_loc][1]/1000:.2f} [{gData['shock-speed'][shock_speed_loc][1]/gData['shock-speed'][shock_speed_loc][0]*100:.2f}%] km/s" + + graphData["plots"].append({ + "type": "text", + "text": f"Measured Shock Speeds {probeText}", + "align": ("top", "right"), + "alpha": 0.8, + "x": 0.94, + "y": 0.94 + }) + + if len(lims) > 1: + OFFSET = 10 #if not forcePlots else 50 + graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET)) + + + makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath=f"./images/ref-{gData['name']}{'-all' if forcePlots else ''}.png") + + +def genComboRefGraph(data: dict, plotCh: list[str] = ["st1", "st2", "st3"], showPlot: bool = False, doShockLabels:bool = False, addShockInfo:bool = False): + graphData = { + "title": f"Shock Response Time\nFor Reference Shots 1, 2, & 3 (x2s5820, x2s5821 & x2s5822) - Mars Entry Conditions", + "xLabel": "Time ($\\mu$s)", + "yLabel": "Voltage Reading (V)", + "grid": True, + "figSize": (16, 6.8), #(8,6.5), + "ledgLoc": 'upper left', + "yLim": (-1.5, 11), + "plots": [] + } + + LINESTYLES = ( + 'solid', + 'dotted', + 'dashed', + 'dashdot' + ) + + COLOURS = ( + UQC["purple"], + UQC["blue"], + UQC["green"], + + # Don't need these + UQC["red"], + UQC["light_purple"], + UQC["dark_grey"], + UQC["orange"], + UQC["yellow"], + UQC["aqua"], + UQC["gold"], + UQC["neutral"] + ) + + + lims = [] + for line_sty, shot in enumerate(data): + gData = data[shot] + for col, label in enumerate(plotCh): + graphData["plots"].append({ + "x": gData["time"]["x2"], + "y": gData["data"]["x2"][label], + "colour": COLOURS[col % len(COLOURS)], + "args":{"zorder":2, "linestyle":LINESTYLES[line_sty % len(LINESTYLES)], "alpha":0.5} + }) + + if line_sty == 0: + graphData["plots"][-1]["label"] = f"{label}", + + for line_sty, shot in enumerate(data): + gData = data[shot] + for col, label in enumerate(plotCh): + if label in plotCh: + graphData["plots"].append({ + "type": "axvLine", + "x": gData["shock-point"][label][1], + "colour": "gray", + "args":{"zorder":2, "linestyle":"--", "alpha":0.5} + }) + + if doShockLabels: + graphData["plots"][-1]["label"] = f"{label} - Ref Shot {line_sty + 1} - Shock Point {gData['shock-point'][label][1]:.2f}$\\mu$s" + + lims.append(gData["shock-point"][label][1]) # [TODO this but better] + + if addShockInfo: + print("============================== Reference Shots ==============================") + shock_speeds = {} + + for shot_id, shot in enumerate(data): + shot_id += 1 + + gData = data[shot] + for shock_speed_loc in gData['shock-speed']: + shk_sps = shock_speeds.get(shock_speed_loc, []) + shk_sps.append((gData['shock-speed'][shock_speed_loc][0], gData['shock-speed'][shock_speed_loc][1])) + shock_speeds[shock_speed_loc] = shk_sps + + probeText = "" + for shock_speed_loc in shock_speeds: + shock_info = np.array(shock_speeds[shock_speed_loc]) + + speeds = shock_info[:, 0] + uncerts = shock_info[:, 1] + + speed = speeds.mean() + uncert = np.sqrt(np.pow(uncerts, 2).sum()) + + print(f"{shock_speed_loc} Measured a mean shock speed of {speed:.2f} +/- {uncert:.2f} m/s ({speed/1000:.2f} +/- {uncert/1000:.2f} km/s [{uncert/speed * 100 :.2f}%])") + + probeText += f"\n{shock_speed_loc} - {speed/1000:.2f} $\\pm${uncert/1000:.2f} [{uncert/speed*100:.2f}%] km/s" + + graphData["plots"].append({ + "type": "text", + "text": f"Average Measured Shock Speeds {probeText}", + "align": ("top", "right"), + "alpha": 0.8, + "x": 0.9, + "y": 0.9 + }) + + if len(lims) > 1: + OFFSET = 10 #if not forcePlots else 50 + graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET)) + + + makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath=f"./images/ref-combo-{'_'.join(plotCh)}.png") + + +print("Loading Data") + +# My Shot Data +data = {} +for dp in data_to_load: + pdp = f"{DATA_PATH}/{dp}/" + load_data(pdp, data) + +loaded_data = tuple(data.keys()) + +# Reference Data from Mragank +ref_data = {} +for refShot in ref_data_to_load: + load_ref_data(refShot, f"./data/referance/{refShot}/{refShot}.tdms", ref_data) + +print("Loaded Data") + + + +print("Graphing Data") + +# General Shot Graphing for shot in loaded_data: #print(data[shot]['info']['long_name'].rsplit("\n", 1)[-1]) genGraph(data[shot], showPlot=False, addShockInfo=False) genGraph(data[shot], showPlot=False, forcePlots=True) +# Reference Data +#for shot in ref_data: +# genRefGraph(ref_data[shot], showPlot=False, addShockInfo=False) +# genRefGraph(ref_data[shot], showPlot=False, forcePlots=True) + +genComboRefGraph(ref_data, doShockLabels=True) +genComboRefGraph(ref_data, ref_data[ref_data_to_load[0]]["info"]["pcb-refs"], addShockInfo=True) # This forces matplotlib to hang until I tell it to close all windows pltKeyClose() - print("Done") \ No newline at end of file