# Cal Wing (c.wing@uq.net.au) - Oct 2024 # Thesis Graphing import os import numpy as np import pandas as pd import yaml from nptdms import TdmsFile from makeGraph import makeGraph, pltKeyClose, UQ_COLOURS as UQC from canny_shock_finder import canny_shock_finder # Folder correction # Make sure the relevant folders folder exists folders = ["./images"] for folder in folders: if not os.path.isdir(folder): os.mkdir(folder) # Load Data DATA_PATH = "./data" DATA_INFO = "_info.yaml" data_to_load = [ "x2s5823", "x2s5824", "x2s5827" ] data = {} for dp in data_to_load: data_path = f"{DATA_PATH}/{dp}/" data_info_path = data_path + DATA_INFO if not os.path.exists(data_info_path): print(f"[ERR] Could not find data info file: '{data_info_path}'") print(f"[WARN] Not Loading Data '{dp}'") continue with open(data_info_path, 'r') as file: # Load data info (Cal) dataInfo = yaml.safe_load(file) x2_shot = dataInfo["shot-info"]["name"] x2_tdms_data = TdmsFile.read(data_path + dataInfo["shot-info"]['tdms'], raw_timestamps=True) x2_channels = x2_tdms_data.groups()[0].channels() if dataInfo["probe-info"]["data-record"]["type"] == "scope": scope_data_path = data_path + dataInfo["probe-info"]["data-record"]["data"] scope_config_path = data_path + dataInfo["probe-info"]["data-record"]["config"] # Generate Headers with open(scope_data_path, 'r') as dfile: scope_header = [] header_lines = [] for i, line in enumerate(dfile): if i > 1: break header_lines.append(line.strip().split(",")) for i, name in enumerate(header_lines[0]): if name == "x-axis": name = "Time" if header_lines[1][i] in ["second", "Volt"]: outStr = f"{name} [{header_lines[1][i][0]}]" else: outStr = f"{name} [{header_lines[1][i]}]" scope_header.append(outStr) #scope_data = pd.read_csv(scope_data_path, names=scope_header, skiprows=2) scope_data = np.loadtxt(scope_data_path, delimiter=',', skiprows=2) data[x2_shot] = { "info": dataInfo, "probe_headers": scope_header, "probes": scope_data, "x2": x2_channels, "x2-tdms": x2_tdms_data } loaded_data = list(data.keys()) print("Loaded Data") foo = data[loaded_data[0]]["x2"][0] bar = foo[:].seconds baz = bar[0] print(foo) print(bar) print(baz) pass def process_data(gData: dict): x2_time = (gData["x2"][0][:] - gData["x2"][0][0]).astype('timedelta64[ns]') # Convert x2 to timedelta64[ns] trigger_info = gData["info"]["probe-info"]["data-record"]["trigger"] # Get the scope trigger info # Convert the scope times into timedelta64 & apply config offsets & delays scope_time = np.array([ pd.Timedelta(t, 's').to_numpy() for t in (gData["probes"][:, 0] - gData["probes"][0, 0])]) scope_time -= np.timedelta64(trigger_info["alignment-offset"], 'ns') scope_time += np.timedelta64(trigger_info["delay"], 'us') start_timestamp = np.datetime64(f"{gData["info"]["date"]}T{gData["info"]["time"]}") start_time = 0 x2_timesteps = np.array([0 for _ in x2_time]) for i, dt in enumerate(x2_time): dt = dt.astype("int") if i == 0: x2_timesteps[i] = start_time + dt # should be 0 else: x2_timesteps[i] = x2_timesteps[i-1] + dt test = x2_time.cumsum() return x2_time, scope_time, x2_timesteps def genGraph(gData: dict, showPlot: bool = True): x2_time, scope_time, _ = process_data(gData) graphData = { "title": f"Shock response Time\nFor {gData['info']['long_name']}", "xLabel": "Time (ns)", "yLabel": "Voltage Reading (V)", "grid": True, "plots": [ { "x": x2_time, "y": (gData["x2"][4][:] - gData["x2"][4][0]) * 0.0148, "label": "ST1" }, { "x": x2_time, "y": (gData["x2"][6][:] - gData["x2"][6][0]) * 0.0148, "label": "ST3" }, { "x": x2_time, "y": (gData["x2"][16][:] - gData["x2"][16][0])/1000, "label": "Trigger" }, { "x": scope_time, "y": (gData["probes"][:, 1] - gData["probes"][0, 1]), "label": "ST2-G1" }, { "x": scope_time, "y": (gData["probes"][:, 2] - gData["probes"][0, 2]), "label": "ST2-G2" }, { "x": scope_time, "y": (gData["probes"][:, 3] - gData["probes"][0, 3]), "label": "ST2-Trigger" }, ] } makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath="./images/{0}.png") #print("Graphing showPlot=showPlot, Data") genGraph(data[loaded_data[0]], showPlot=False) genGraph(data[loaded_data[1]], showPlot=False) # Try to process things gData = data[loaded_data[0]] x2_time, scope_time, x2_timestamps = process_data(gData) time = (gData["x2"][0][:] - gData["x2"][0][0]) #x2_out = canny_shock_finder(time, (gData["x2"][4][:] - gData["x2"][4][0]) * 0.0148) #print(x2_out) # This forces matplotlib to hang untill I tell it to close all windows pltKeyClose() print("Done")