# Cal Wing (c.wing@uq.net.au) - Oct 2024
# Thesis Graphing

import os

import numpy as np
import pandas as pd
import yaml
from nptdms import TdmsFile

from makeGraph import makeGraph, pltKeyClose, UQ_COLOURS as UQC
from canny_shock_finder import canny_shock_finder

# Folder correction
# Make sure the relevant folders exist
folders = ["./images"]
for folder in folders:
    os.makedirs(folder, exist_ok=True)

# Data Paths
DATA_PATH = "./data"
DATA_INFO = "_info.yaml"
TUNNEL_INFO_FILE = "./tunnel-info.yaml"

# Number of leading samples averaged to estimate a channel's zero offset
SAMPLES_TO_AVG = 500

# Offset-corrected level a trigger signal must exceed to count as "fired"
TRIGGER_THRESHOLD_V = 1

# Column of the scope CSV used as the trigger signal for time alignment
# (presumably the channel plotted as "4 [V]" / "Gauge Trigger" - confirm
# against the scope export layout)
SCOPE_TRIGGER_COLUMN = 3

# [HACK] Samples skipped past the previous probe's shock index before
# searching for the next probe's shock on the same scope channel
PROBE_SEARCH_SKIP = 150

# Index into the successfully loaded shots of the one that gets graphed
GRAPH_SHOT_INDEX = 2

with open(TUNNEL_INFO_FILE, 'r') as file:
    TUNNEL_INFO = yaml.safe_load(file)

data_to_load = [
    "x2s5823",
    "x2s5824",
    "x2s5827"
]


# ==== Data Loading & Processing ====
def _read_scope_header(scope_data_path: str) -> list[str]:
    """Build the column labels of a scope CSV from its first two lines.

    The first line holds the column names and the second the matching units.
    A name of "x-axis" is relabelled "Time"; the units "second" and "Volt" are
    shortened to their first letter. Each label is formatted "<name> [<unit>]".
    """
    header_lines = []
    with open(scope_data_path, 'r') as dfile:
        for i, line in enumerate(dfile):
            if i > 1:
                break
            header_lines.append(line.strip().split(","))

    names = header_lines[0]
    units = header_lines[1]

    labels = []
    for i, name in enumerate(names):
        if name == "x-axis":
            name = "Time"

        unit = units[i]
        if unit in ("second", "Volt"):
            unit = unit[0]

        labels.append(f"{name} [{unit}]")

    return labels


def _find_shock(time_us, signal, offset: int = 0, **finder_kwargs):
    """Run canny_shock_finder on signal[offset:] and index the reported time.

    Returns (shock_index, shock_time): shock_time is the first value returned
    by canny_shock_finder, and shock_index is the first index (relative to the
    full, un-sliced array) whose time is >= shock_time.
    """
    shock_time, _uncertainty, _, _ = canny_shock_finder(
        time_us[offset:], signal[offset:], plot=False, **finder_kwargs
    )

    # [BUG] Seems to give n+1
    shock_index = np.where(time_us[offset:] >= shock_time)[0][0] + offset
    return shock_index, shock_time


def load_data(data_to_load: list[str]) -> tuple[dict, tuple]:
    """Load and process every shot named in data_to_load.

    For each shot folder under DATA_PATH this reads the shot info YAML and the
    X2 TDMS file, scales the channels listed in TUNNEL_INFO["volt-scale"],
    builds a time base in microseconds, finds the shock arrival on each PCB
    reference channel and, when a scope record exists, aligns the scope data
    to the X2 trigger, finds the probe shock times and prints the shock speed
    measured by each probe.

    Shots whose info file is missing are reported and skipped. Shots without
    a scope record are still loaded, but get no "scope" entries, no probe
    shock points and no speed print-out.

    Args:
        data_to_load: Names of the shot folders inside DATA_PATH.

    Returns:
        A tuple (data, loaded) where data maps each shot name to its data
        dictionary and loaded is a tuple of the shot names that were loaded.
    """
    data = {}

    for dp in data_to_load:
        data_path = f"{DATA_PATH}/{dp}/"
        data_info_path = data_path + DATA_INFO

        if not os.path.exists(data_info_path):
            print(f"[ERR] Could not find data info file: '{data_info_path}'")
            print(f"[WARN] Not Loading Data '{dp}'")
            continue

        # Load Shot Data Info YAML File (Cal)
        with open(data_info_path, 'r') as file:
            dataInfo = yaml.safe_load(file)

        # Grab the shot name
        x2_shot = dataInfo["shot-info"]["name"]

        # Load Raw Data
        # TDMS File (X2 DAQ Data)
        x2_tdms_data = TdmsFile.read(data_path + dataInfo["shot-info"]['tdms'], raw_timestamps=True)
        x2_channels = x2_tdms_data.groups()[0].channels()
        x2_channel_names = tuple(c.name for c in x2_channels)

        # Scope info _if it exists_
        # Reset per shot so a shot without a scope record can never pick up
        # the arrays left over from the previous loop iteration.
        data_record = dataInfo["probe-info"]["data-record"]
        has_scope = data_record["type"] == "scope"
        scope_header = None
        scope_data = None
        scope_time = None

        if has_scope:
            scope_data_path = data_path + data_record["data"]
            scope_config_path = data_path + data_record["config"]  # [TODO] Read this file

            # Generate Data Headers - This could be better
            scope_header = _read_scope_header(scope_data_path)

            # Load the Scope CSV Data (the two header lines are skipped)
            scope_data = np.loadtxt(scope_data_path, delimiter=',', skiprows=2)

        # Build a data object (this could be cached - or partially cached if I was clever enough)
        # Raw Data is always added - processing comes after
        data[x2_shot] = {
            "info": dataInfo,
            "shot_time": np.datetime64(f"{dataInfo['date']}T{dataInfo['time']}"),
            "raw-data": {
                "probe_headers": scope_header,
                "probes": scope_data,
                "x2": x2_channels,
                "x2-channels": x2_channel_names,
                "x2-tdms": x2_tdms_data
            },
            "time": {
                "x2": None,
                "trigger_index": None
            },
            "data": {
                "x2": {}  # Only pop channels with a voltage scale in ./tunnel-info.yaml
            }
        }
        shot = data[x2_shot]

        # === Process the data ===
        # Generate X2 time arrays
        time_data = x2_channels[0]
        ns_time = time_data[:].as_datetime64('ns')
        x2_time_delta_ns = (ns_time - ns_time[0])  # timedelta64[ns]
        x2_time_us = x2_time_delta_ns.astype("float64") / 1000  # Scale to us

        #second_fractions = np.array(time_data[:].second_fractions, dtype=int) # 2^-64 ths of a second
        #x2_time_seconds = (second_fractions - second_fractions[0]) / (2**(-64)) # 0 time data and convert to seconds
        #x2_time_us = x2_time_seconds * 1000 # Scale to us

        # --- Un Scale Data ---
        for channel, vScale in TUNNEL_INFO["volt-scale"].items():
            # Get the channel index from its name
            chIndex = x2_channel_names.index(channel)

            # Calculate the average noise offset
            avg_noise = x2_channels[chIndex][0:SAMPLES_TO_AVG].mean()

            # Save the channel data
            shot["data"]["x2"][channel] = (x2_channels[chIndex][:] - avg_noise) * vScale

        # Process Trigger Info
        trigger_volts = shot["data"]["x2"]["trigbox"]  # Use a mean to offset
        x2_trigger_index = np.where(trigger_volts > TRIGGER_THRESHOLD_V)[0][0]
        x2_trigger_time = x2_time_us[x2_trigger_index]

        # Add the time data
        shot["time"] = {
            "x2": x2_time_us,
            "trigger_index": x2_trigger_index
        }

        # Scope timing _if it exists_
        if has_scope:
            trigger_info = data_record["trigger"]  # Get the scope trigger info

            # Calc the scope time & apply any manual offsets
            scope_time = (scope_data[:, 0] - scope_data[0, 0]) * 1e6  # to us
            scope_time -= trigger_info["alignment-offset"]  # manual offset delay

            # Trigger Alignment
            scope_trigger_raw = scope_data[:, SCOPE_TRIGGER_COLUMN]
            scope_trigger_volts = scope_trigger_raw - scope_trigger_raw[0:SAMPLES_TO_AVG].mean()  # Use a mean here too
            scope_trigger_index = np.where(scope_trigger_volts > TRIGGER_THRESHOLD_V)[0][0]
            scope_trigger_time = scope_time[scope_trigger_index]

            scope_alignment = x2_trigger_time - scope_trigger_time
            scope_time += scope_alignment

            # Offset any trigger delays
            scope_time += trigger_info["delay"]  # us delay from the actual trigger signal to the scope received trigger

            shot["time"]["scope"] = scope_time
            shot["time"]["scope-offset"] = scope_alignment

            shot["data"]["scope"] = {}
            for i, header in enumerate(scope_header):
                if i == 0:
                    continue  # Don't record time

                # Python reference so its the same object
                ref = scope_data[:, i]
                shot["data"]["scope"][i] = ref
                shot["data"]["scope"][header] = ref

        # Find Shock Times
        # X2 - Canning Edge
        shot["shock-point"] = {}
        for ref in dataInfo["pcb-refs"]:
            shot["shock-point"][ref] = _find_shock(x2_time_us, shot["data"]["x2"][ref])

        if not has_scope:
            print(f"[WARN] No scope data for '{x2_shot}' - skipping probe shock detection")
            continue

        locations = dataInfo["probe-info"]["locations"]

        # Scope columns 1 and 2 carry the two gauges (G1, G2) of each probe
        for i, probe in enumerate(locations):
            for gauge, column in (("g1", 1), ("g2", 2)):
                probe_signal = shot["data"]["scope"][column]

                #[HACK] For detection
                # Start searching just after the previous probe's shock on this
                # gauge so the same edge is not detected twice.
                if i > 0:
                    privPoint = locations[i - 1]
                    offset = shot["shock-point"][f"{privPoint}-{gauge}"][0] + PROBE_SEARCH_SKIP
                else:
                    offset = 0

                shot["shock-point"][f"{probe}-{gauge}"] = _find_shock(
                    scope_time, probe_signal, offset,
                    sigma=2, post_shock_pressure=0.02
                )

        # Calculate Shock Speeds
        c2c_dist = dataInfo["probe-info"]["c2c"] / 1000  # convert to m
        for probe in locations:
            g1_time = shot["shock-point"][f"{probe}-g1"][1] / 1e6  # Convert to seconds
            g2_time = shot["shock-point"][f"{probe}-g2"][1] / 1e6  # Convert to seconds

            transit_time = abs(g2_time - g1_time)
            if transit_time == 0:
                # Both gauges reported the same time - a speed is undefined
                print(f"[WARN] {probe} gauges report identical shock times - no speed calculated")
                continue

            probe_velocity = c2c_dist / transit_time  # m/s
            print(f"{probe} Measured a shock speed of {probe_velocity:.2f} m/s")
            print(f" {probe_velocity/1000:.2f} km/s")

    # Return the data & the successfully loaded data keys
    return data, tuple(data.keys())


data, loaded_data = load_data(data_to_load)

print("Loaded Data")


#[TODO] Refactor
def genGraph(gData: dict, showPlot: bool = True):
    """Plot one shot's traces with their detected shock points.

    Draws every PCB reference channel and the "trigbox" channel against the X2
    time base, with a dashed vertical line at each PCB shock time. When the
    shot has scope data it also draws the "1 [V]", "2 [V]" and "4 [V]" scope
    channels and a dashed vertical line for both gauges of every probe. The
    figure is handed to makeGraph with the save path "./images/{0}.png".

    Args:
        gData: One shot's data dictionary as built by load_data.
        showPlot: Passed straight through to makeGraph.
    """
    graphData = {
        "title": f"Shock response Time\nFor {gData['info']['long_name']}",
        "xLabel": "Time ($\\mu$s)",
        "yLabel": "Voltage Reading (V)",
        "grid": True,
        "plots": []
    }
    plots = graphData["plots"]
    pcb_refs = gData["info"]["pcb-refs"]

    for label in pcb_refs + ["trigbox"]:
        plots.append({
            "x": gData["time"]["x2"],
            "y": gData["data"]["x2"][label],
            "label": label
        })

        # Only the PCB channels have a shock point, the trigger does not
        if label in pcb_refs:
            shock_time = gData["shock-point"][label][1]
            plots.append({
                "type": "axvLine",
                "x": shock_time,
                "label": f"{label} - Shock Point {shock_time:.2f}$\\mu$s",
                "colour": "gray",
                "args": {"zorder": 2, "linestyle": "--"}
            })

    # Scope traces and probe shock points only exist for shots with a scope record
    if "scope" in gData["time"]:
        for label, d in [("1 [V]", "G1"), ("2 [V]", "G2"), ("4 [V]", "Gauge Trigger")]:
            plots.append({
                "x": gData["time"]["scope"],
                "y": gData["data"]["scope"][label],
                "label": d
            })

        for probe in gData["info"]["probe-info"]["locations"]:
            for gauge_key, gauge_name in (("g1", "G1"), ("g2", "G2")):
                shock_time = gData["shock-point"][f"{probe}-{gauge_key}"][1]
                plots.append({
                    "type": "axvLine",
                    "x": shock_time,
                    "label": f"{probe}-{gauge_name} - Shock Point {shock_time:.2f}$\\mu$s",
                    #"colour": "gray",
                    "args": {"zorder": 2, "linestyle": "--"}
                })

    makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath="./images/{0}.png")


print("Graphing Data")
#genGraph(data[loaded_data[0]], showPlot=False)
#genGraph(data[loaded_data[1]], showPlot=False)
# load_data skips shots it cannot find, so the wanted shot may be missing
if len(loaded_data) > GRAPH_SHOT_INDEX:
    genGraph(data[loaded_data[GRAPH_SHOT_INDEX]], showPlot=False)
else:
    print(f"[WARN] Only {len(loaded_data)} shot(s) loaded - nothing to graph at index {GRAPH_SHOT_INDEX}")

#x2_out = canny_shock_finder(x2_time, (gData["raw-data"]["x2"][16][:] - gData["raw-data"]["x2"][16][0]))
#print(x2_out)

# This forces matplotlib to hang until I tell it to close all windows
pltKeyClose()

print("Done")