From ba1136b0eabb0f7984087adea1a560ad47765274 Mon Sep 17 00:00:00 2001
From: Cal W <20716204+calw20@users.noreply.github.com>
Date: Thu, 17 Oct 2024 22:58:30 +1000
Subject: [PATCH] quick data load refactor

---
 main.py | 409 ++++++++++++++++++++++++++++----------------------------
 1 file changed, 206 insertions(+), 203 deletions(-)

diff --git a/main.py b/main.py
index a73dd73..ffe06a9 100644
--- a/main.py
+++ b/main.py
@@ -24,6 +24,7 @@ DATA_PATH = "./data"
 DATA_INFO = "_info.yaml"
 TUNNEL_INFO_FILE = "./tunnel-info.yaml"
 SAMPLES_TO_AVG = 500
+CANNY_TIME_OFFSET = 50 # us
 
 with open(TUNNEL_INFO_FILE, 'r') as file:
     TUNNEL_INFO = yaml.safe_load(file)
@@ -36,229 +37,231 @@ data_to_load = [
 ]
 
 # ==== Data Loading & Processing ====
-def load_data(data_to_load: list[str]) -> dict:
-    data = {}
-    for dp in data_to_load:
-        data_path = f"{DATA_PATH}/{dp}/"
-        data_info_path = data_path + DATA_INFO
-        if not os.path.exists(data_info_path):
-            print(f"[ERR] Could not find data info file: '{data_info_path}'")
-            print(f"[WARN] Not Loading Data '{dp}'")
-            continue
+def load_data(data_path: str, data: dict | None = None) -> dict:
+    data = {} if data is None else data # Don't share one dict across calls via a mutable default argument
+    data_info_path = data_path + DATA_INFO
+    if not os.path.exists(data_info_path):
+        print(f"[ERR] Could not find data info file: '{data_info_path}'")
+        print(f"[WARN] Not Loading Data '{data_path}'")
+        return None
+
+    # Load Shot Data Info YAML File (Cal)
+    with open(data_info_path, 'r') as file:
+        dataInfo = yaml.safe_load(file)
+
+    # Grab the shot name
+    x2_shot = dataInfo["shot-info"]["name"]
 
-        # Load Shot Data Info YAML File (Cal)
-        with open(data_info_path, 'r') as file:
-            dataInfo = yaml.safe_load(file)
+    # Load Raw Data
+    # TDMS File (X2 DAQ Data)
+    x2_tdms_data = TdmsFile.read(data_path + dataInfo["shot-info"]['tdms'], raw_timestamps=True)
+    x2_channels = x2_tdms_data.groups()[0].channels()
+    x2_channel_names = tuple(c.name for c in x2_channels)
 
-        # Grab the shot name
-        x2_shot = dataInfo["shot-info"]["name"]
-
-        # Load Raw Data
-        # TDMS File (X2 DAQ Data)
-        x2_tdms_data = TdmsFile.read(data_path + dataInfo["shot-info"]['tdms'], raw_timestamps=True)
-        x2_channels = x2_tdms_data.groups()[0].channels()
-        x2_channel_names = tuple(c.name for c in x2_channels)
-
-        # Scope info _if it exists_
-        if dataInfo["probe-info"]["data-record"]["type"] == "scope":
-            scope_data_path = data_path + dataInfo["probe-info"]["data-record"]["data"]
-            scope_config_path = data_path + dataInfo["probe-info"]["data-record"]["config"] # [TODO] Read this file
+    # Scope info _if it exists_
+    if dataInfo["probe-info"]["data-record"]["type"] == "scope":
+        scope_data_path = data_path + dataInfo["probe-info"]["data-record"]["data"]
+        scope_config_path = data_path + dataInfo["probe-info"]["data-record"]["config"] # [TODO] Read this file
 
-            # Generate Data Headers - This could be better
-            with open(scope_data_path, 'r') as dfile:
-                scope_header = []
+        # Generate Data Headers - This could be better
+        with open(scope_data_path, 'r') as dfile:
+            scope_header = []
 
-                header_lines = []
-                for i, line in enumerate(dfile):
-                    if i > 1: break
-                    header_lines.append(line.strip().split(","))
+            header_lines = []
+            for i, line in enumerate(dfile):
+                if i > 1: break
+                header_lines.append(line.strip().split(","))
+
+            for i, name in enumerate(header_lines[0]):
+                if name == "x-axis":
+                    name = "Time"
 
-                for i, name in enumerate(header_lines[0]):
-                    if name == "x-axis":
-                        name = "Time"
-
-                    if header_lines[1][i] in ["second", "Volt"]:
-                        outStr = f"{name} [{header_lines[1][i][0]}]"
-                    else:
-                        outStr = f"{name} [{header_lines[1][i]}]"
-
-                    scope_header.append(outStr)
+                if header_lines[1][i] in ["second", "Volt"]:
+                    outStr = f"{name} [{header_lines[1][i][0]}]"
+                else:
+                    outStr = f"{name} [{header_lines[1][i]}]"
+
+                scope_header.append(outStr)
 
-            # Load the Scope CSV Data
-            scope_data = np.loadtxt(scope_data_path, delimiter=',', skiprows=2)
+        # Load the Scope CSV Data
+        scope_data = np.loadtxt(scope_data_path, delimiter=',', skiprows=2)
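A quick illustration of what the header build above produces. Assuming the scope CSV opens with a two-row preamble of names and units (the sample rows below are made up), the loop maps "x-axis" to "Time" and shortens "second"/"Volt" to their first letter:

    header_lines = [
        ["x-axis", "Channel A", "Channel B"],  # row 0: channel names (assumed layout)
        ["second", "Volt", "Volt"],            # row 1: units (assumed layout)
    ]

    scope_header = []
    for i, name in enumerate(header_lines[0]):
        if name == "x-axis":
            name = "Time"
        unit = header_lines[1][i]
        if unit in ["second", "Volt"]:
            unit = unit[0]  # abbreviate to "s" / "V"
        scope_header.append(f"{name} [{unit}]")

    print(scope_header)  # ['Time [s]', 'Channel A [V]', 'Channel B [V]']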
 
-        # Build a data object (this could be cached - or partially cached if I was clever enough)
-        # Raw Data is always added - processing comes after
-        data[x2_shot] = {
-            "info": dataInfo,
-            "shot_time": np.datetime64(f"{dataInfo["date"]}T{dataInfo["time"]}"),
-            "raw-data":{
-                "probe_headers": scope_header,
-                "probes": scope_data,
-                "x2": x2_channels,
-                "x2-channels": x2_channel_names,
-                "x2-tdms": x2_tdms_data
-            },
-            "time": {
-                "x2": None,
-                "trigger_index": None
-            },
-            "data": {
-                "x2": {} # Only pop channels with a voltage scale in ./tunnel-info.yaml
-            }
-        }
+    # Build a data object (this could be cached - or partially cached if I was clever enough)
+    # Raw Data is always added - processing comes after
+    data[x2_shot] = {
+        "info": dataInfo,
+        "shot_time": np.datetime64(f"{dataInfo["date"]}T{dataInfo["time"]}"),
+        "raw-data":{
+            "probe_headers": scope_header,
+            "probes": scope_data,
+            "x2": x2_channels,
+            "x2-channels": x2_channel_names,
+            "x2-tdms": x2_tdms_data
+        },
+        "time": {
+            "x2": None,
+            "trigger_index": None
+        },
+        "data": {
+            "x2": {} # Only populate channels with a voltage scale in ./tunnel-info.yaml
+        }
+    }
 
-        # === Process the data ===
-        # Generate X2 time arrays
-        time_data = x2_channels[0]
+    # === Process the data ===
+    # Generate X2 time arrays
+    time_data = x2_channels[0]
+
+    ns_time = time_data[:].as_datetime64('ns')
+    x2_time_seconds = (ns_time - ns_time[0]) # timedelta64[ns]
+    x2_time_us = x2_time_seconds.astype("float64") / 1000 # Scale to us
+
+    #second_fractions = np.array(time_data[:].second_fractions, dtype=int) # 2^-64 ths of a second
+    #x2_time_seconds = (second_fractions - second_fractions[0]) / (2**(-64)) # 0 time data and convert to seconds
+    #x2_time_us = x2_time_seconds * 1000 # Scale to us
+
+    # --- Un Scale Data ---
+    for channel, vScale in TUNNEL_INFO["volt-scale"].items():
+        # Get the channel index from its name
+        chIndex = x2_channel_names.index(channel)
+
+        # Calculate the average noise offset
+        avg_noise = x2_channels[chIndex][0:SAMPLES_TO_AVG].mean()
+
+        # Save the channel data
+        data[x2_shot]["data"]["x2"][channel] = (x2_channels[chIndex][:] - avg_noise) * vScale
+
+    # Process Trigger Info
+    trigger_volts = data[x2_shot]["data"]["x2"]["trigbox"] # Use a mean to offset
+    x2_trigger_index = np.where(trigger_volts > 1)[0][0]
+    x2_trigger_time = x2_time_us[x2_trigger_index]
+
+    # Add the time data
+    data[x2_shot]["time"] = {
+        "x2": x2_time_us,
+        "trigger_index": x2_trigger_index
+    }
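The un-scale loop above estimates each channel's noise/offset from the first SAMPLES_TO_AVG pre-trigger samples, then applies the per-channel volt scale from ./tunnel-info.yaml. A minimal sketch with invented numbers (the real vScale values come from the YAML file):

    import numpy as np

    SAMPLES_TO_AVG = 500
    v_scale = 2.0  # hypothetical volt-scale factor

    # Stand-in channel: a flat 0.25 offset, then a step to 1.75
    raw = np.concatenate([np.full(SAMPLES_TO_AVG, 0.25), np.full(100, 1.75)])

    avg_noise = raw[0:SAMPLES_TO_AVG].mean()  # offset estimated from pre-trigger samples
    scaled = (raw - avg_noise) * v_scale      # zeroed and physically scaled
    print(scaled[0], scaled[-1])              # 0.0 3.0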
+
+
+    # Scope timing _if it exists_
+    if dataInfo["probe-info"]["data-record"]["type"] == "scope":
+        trigger_info = dataInfo["probe-info"]["data-record"]["trigger"] # Get the scope trigger info
+
+        # Calc the scope time & apply any manual offsets
+        scope_time = (scope_data[:, 0] - scope_data[0, 0]) * 1e6 # to us
+        scope_time -= trigger_info["alignment-offset"] # manual offset delay
+
+        # Trigger Alignment
+        scope_trigger_volts = (scope_data[:, 3] - scope_data[0:SAMPLES_TO_AVG, 3].mean()) # Use a mean here too
+        scope_trigger_index = np.where(scope_trigger_volts > 1)[0][0]
+        scope_trigger_time = scope_time[scope_trigger_index]
+
+        scope_alignment = x2_trigger_time - scope_trigger_time
+
+        scope_time += scope_alignment
+
+        # Offset any trigger delays
+        scope_time += trigger_info["delay"] # us delay from the actual trigger signal to the scope received trigger
+
+        data[x2_shot]["time"]["scope"] = scope_time
+        data[x2_shot]["time"]["scope-offset"] = scope_alignment
+
+        data[x2_shot]["data"]["scope"] = {}
+        for i, header in enumerate(scope_header):
+            if i == 0: continue # Don't record time
+
+            # Python reference so it's the same object
+            ref = scope_data[:, i]
+            data[x2_shot]["data"]["scope"][i] = ref
+            data[x2_shot]["data"]["scope"][header] = ref
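Why the scope_alignment shift works: the DAQ and the scope both record the same physical trigger edge, so the difference between the two detected trigger times is the offset between their clocks, and adding it to scope_time puts the scope trace on the X2 time base. A toy illustration with invented numbers:

    import numpy as np

    x2_trigger_time = 1250.0    # us on the X2 DAQ time base (made up)
    scope_trigger_time = -40.0  # us on the scope's own time base (made up)
    scope_alignment = x2_trigger_time - scope_trigger_time  # 1290.0 us

    scope_time = np.array([-50.0, -40.0, 0.0, 100.0])  # fake scope timestamps, us
    scope_time += scope_alignment  # the trigger sample now reads 1250.0 us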
+
+
+    # Find Shock Times
+    # X2 - Canny Edge
+    data[x2_shot]["shock-point"] = {}
+    for ref in dataInfo["pcb-refs"]:
+        refData = data[x2_shot]["data"]["x2"][ref]
+        first_value, first_value_uncertainty, _, _ = canny_shock_finder(x2_time_us, refData, plot=False, print_func=None)
+        shock_point = np.where(x2_time_us >= first_value)[0][0] # [BUG] Seems to give n+1
 
-        ns_time = time_data[:].as_datetime64('ns')
-        x2_time_seconds = (ns_time - ns_time[0]) # timedelta64[ns]
-        x2_time_us = x2_time_seconds.astype("float64") / 1000 # Scale to us
-
-        #second_fractions = np.array(time_data[:].second_fractions, dtype=int) # 2^-64 ths of a second
-        #x2_time_seconds = (second_fractions - second_fractions[0]) / (2**(-64)) # 0 time data and convert to seconds
-        #x2_time_us = x2_time_seconds * 1000 # Scale to us
-
-        # --- Un Scale Data ---
-        for channel, vScale in TUNNEL_INFO["volt-scale"].items():
-            # Get the channel index from its name
-            chIndex = x2_channel_names.index(channel)
-
-            # Calculate the average noise offset
-            avg_noise = x2_channels[chIndex][0:SAMPLES_TO_AVG].mean()
-
-            # Save the channel data
-            data[x2_shot]["data"]["x2"][channel] = (x2_channels[chIndex][:] - avg_noise) * vScale
-
-        # Process Trigger Info
-        trigger_volts = data[x2_shot]["data"]["x2"]["trigbox"] # Use a mean to offset
-        x2_trigger_index = np.where(trigger_volts > 1)[0][0]
-        x2_trigger_time = x2_time_us[x2_trigger_index]
-
-        # Add the time data
-        data[x2_shot]["time"] = {
-            "x2": x2_time_us,
-            "trigger_index": x2_trigger_index
-        }
-
-
-        # Scope timing _if it exists_
-        if dataInfo["probe-info"]["data-record"]["type"] == "scope":
-            trigger_info = dataInfo["probe-info"]["data-record"]["trigger"] # Get the scope trigger info
-
-            # Calc the scope time & apply any manual offsets
-            scope_time = (scope_data[:, 0] - scope_data[0, 0]) * 1e6 # to us
-            scope_time -= trigger_info["alignment-offset"] # manual offset delay
-
-            # Trigger Alignment
-            scope_trigger_volts = (scope_data[:, 3] - scope_data[0:SAMPLES_TO_AVG, 3].mean()) # Use a mean here too
-            scope_trigger_index = np.where(scope_trigger_volts > 1)[0][0]
-            scope_trigger_time = scope_time[scope_trigger_index]
-
-            scope_alignment = x2_trigger_time - scope_trigger_time
-
-            scope_time += scope_alignment
-
-            # Offset any trigger delays
-            scope_time += trigger_info["delay"] # us delay from the actual trigger signal to the scope received trigger
-
-            data[x2_shot]["time"]["scope"] = scope_time
-            data[x2_shot]["time"]["scope-offset"] = scope_alignment
-
-            data[x2_shot]["data"]["scope"] = {}
-            for i, header in enumerate(scope_header):
-                if i == 0: continue # Don't record time
-
-                # Python reference so its the same object
-                ref = scope_data[:, i]
-                data[x2_shot]["data"]["scope"][i] = ref
-                data[x2_shot]["data"]["scope"][header] = ref
-
-
-        # Find Shock Times
-        # X2 - Canning Edge
-        data[x2_shot]["shock-point"] = {}
-        for ref in dataInfo["pcb-refs"]:
-            refData = data[x2_shot]["data"]["x2"][ref]
-            first_value, first_value_uncertainty, _, _ = canny_shock_finder(x2_time_us, refData, plot=False, print_func=None)
-            shock_point = np.where(x2_time_us >= first_value)[0][0] # [BUG] Seems to give n+1
-
-            data[x2_shot]["shock-point"][ref] = shock_point, first_value
+        data[x2_shot]["shock-point"][ref] = shock_point, first_value
+
+    for i, probe in enumerate(dataInfo["probe-info"]["locations"]):
+        probeCh1 = data[x2_shot]["data"]["scope"][1]
+        probeCh2 = data[x2_shot]["data"]["scope"][2]
 
-        for i, probe in enumerate(dataInfo["probe-info"]["locations"]):
-            probeCh1 = data[x2_shot]["data"]["scope"][1]
-            probeCh2 = data[x2_shot]["data"]["scope"][2]
-
-            #[HACK] For detection
-            TIME_OFFSET = 50 #us
-            if i > 0:
-                privPoint = dataInfo["probe-info"]["locations"][i-1]
-                time_offset = data[x2_shot]["shock-point"][f"{privPoint}-g1"][1] + TIME_OFFSET
-                post_pres = 0.03
-                sigma = 7
-                doPlot = True
-            else: # These work for the first probe int he chain
-                time_offset = None
-                post_pres = 0.03
-                sigma = 2
-                doPlot = False
-
-            shock_point = np.where(probeCh1 >= 0.3)[0]
-            first_value = scope_time[shock_point]
+        #[HACK] For detection
+        if i > 0:
+            privPoint = dataInfo["probe-info"]["locations"][i-1]
+            time_offset = data[x2_shot]["shock-point"][f"{privPoint}-g1"][1] + CANNY_TIME_OFFSET
+            post_pres = 0.2
+            sigma = 1
+            doPlot = True
+        else: # These work for the first probe in the chain
+            time_offset = None
+            post_pres = 0.03
+            sigma = 2
+            doPlot = False
+
+        shock_point = np.where(probeCh1 >= 0.3)[0]
+        first_value = scope_time[shock_point]
 
-            first_value, first_value_uncertainty, _, _ = canny_shock_finder(scope_time, probeCh1, sigma=sigma, post_suppression_threshold=post_pres, plot=doPlot, start_time=time_offset, print_func=None)
-            shock_point = np.where(scope_time >= first_value)[0][0] # [BUG] Seems to give n+1
-
-            data[x2_shot]["shock-point"][f"{probe}-g1"] = shock_point, first_value
-
-
-            #[HACK] For detection
-            if i > 0:
-                time_offset = data[x2_shot]["shock-point"][f"{privPoint}-g2"][1] + TIME_OFFSET
-
-            shock_point = np.where(probeCh2 >= 0.3)[0] # + offset
-            first_value = scope_time[shock_point]
-            first_value, first_value_uncertainty, _, _ = canny_shock_finder(scope_time, probeCh2, sigma=sigma, post_suppression_threshold=post_pres, plot=doPlot, start_time=time_offset, print_func=None)
-            shock_point = np.where(scope_time >= first_value)[0][0] # [BUG] Seems to give n+1
-            data[x2_shot]["shock-point"][f"{probe}-g2"] = shock_point, first_value
+        first_value, first_value_uncertainty, _, _ = canny_shock_finder(scope_time, probeCh1, sigma=sigma, post_suppression_threshold=post_pres, plot=doPlot, start_time=time_offset, print_func=None)
+        shock_point = np.where(scope_time >= first_value)[0][0] # [BUG] Seems to give n+1
+
+        data[x2_shot]["shock-point"][f"{probe}-g1"] = shock_point, first_value
+
+
+        #[HACK] For detection
+        if i > 0:
+            time_offset = data[x2_shot]["shock-point"][f"{privPoint}-g2"][1] + CANNY_TIME_OFFSET
 
+        shock_point = np.where(probeCh2 >= 0.3)[0] # + offset
+        first_value = scope_time[shock_point]
+
+        first_value, first_value_uncertainty, _, _ = canny_shock_finder(scope_time, probeCh2, sigma=sigma, post_suppression_threshold=post_pres, plot=doPlot, start_time=time_offset, print_func=None)
+        shock_point = np.where(scope_time >= first_value)[0][0] # [BUG] Seems to give n+1
+        data[x2_shot]["shock-point"][f"{probe}-g2"] = shock_point, first_value
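On the recurring "[BUG] Seems to give n+1" lookups: canny_shock_finder returns a sub-sample arrival time, so the first index with t >= first_value is necessarily the sample after the true arrival, which is likely why the result looks like n+1. np.searchsorted returns the same index without building a boolean mask, and makes that off-by-one easier to reason about; a small sketch (not part of the patch):

    import numpy as np

    t = np.array([0.0, 0.5, 1.0, 1.5, 2.0])  # sorted time axis
    first_value = 1.2                        # detection lands between samples

    idx_where = np.where(t >= first_value)[0][0]               # scans a full mask
    idx_search = np.searchsorted(t, first_value, side="left")  # binary search
    assert idx_where == idx_search == 3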
data[x2_shot]["shock-point"][f"{probe}-g2"][1] / 1e6 # Convert to seconds + c2c_dist = dataInfo["probe-info"]["c2c"] / 1000 # convert to m + + probe_velocity = c2c_dist / abs(g2_time - g1_time) # m/s + + print(f"{probe} Measured a shock speed of {probe_velocity:.2f} m/s ({probe_velocity/1000:.2f} km/s)") + + if len(dataInfo["probe-info"]["locations"]) > 1: + for i in range(len(dataInfo["probe-info"]["locations"]) - 1): + probe_locs = dataInfo["probe-info"]["locations"] + p1_g1_time = data[x2_shot]["shock-point"][f"{probe_locs[i]}-g1"][1] / 1e6 # Convert to seconds + p1_g2_time = data[x2_shot]["shock-point"][f"{probe_locs[i]}-g2"][1] / 1e6 # Convert to seconds - # Calculate Shock Speeds - print("="*25, x2_shot, "="*25) - for probe in dataInfo["probe-info"]["locations"]: - g1_time = data[x2_shot]["shock-point"][f"{probe}-g1"][1] / 1e6 # Convert to seconds - g2_time = data[x2_shot]["shock-point"][f"{probe}-g2"][1] / 1e6 # Convert to seconds - c2c_dist = dataInfo["probe-info"]["c2c"] / 1000 # convert to m + p2_g1_time = data[x2_shot]["shock-point"][f"{probe_locs[i+1]}-g1"][1] / 1e6 # Convert to seconds + p2_g2_time = data[x2_shot]["shock-point"][f"{probe_locs[i+1]}-g2"][1] / 1e6 # Convert to seconds - probe_velocity = c2c_dist / abs(g2_time - g1_time) # m/s + p2p = (TUNNEL_INFO["distance"][probe_locs[1]] - TUNNEL_INFO["distance"][probe_locs[0]]) / 1000 # convert to m + + p2p_1 = p2p / abs(p2_g1_time - p1_g1_time) # m/s + p2p_2 = p2p / abs(p2_g2_time - p1_g2_time) # m/s - print(f"{probe} Measured a shock speed of {probe_velocity:.2f} m/s ({probe_velocity/1000:.2f} km/s)") - - if len(dataInfo["probe-info"]["locations"]) > 1: - for i in range(len(dataInfo["probe-info"]["locations"]) - 1): - probe_locs = dataInfo["probe-info"]["locations"] - p1_g1_time = data[x2_shot]["shock-point"][f"{probe_locs[i]}-g1"][1] / 1e6 # Convert to seconds - p1_g2_time = data[x2_shot]["shock-point"][f"{probe_locs[i]}-g2"][1] / 1e6 # Convert to seconds - - p2_g1_time = data[x2_shot]["shock-point"][f"{probe_locs[i+1]}-g1"][1] / 1e6 # Convert to seconds - p2_g2_time = data[x2_shot]["shock-point"][f"{probe_locs[i+1]}-g2"][1] / 1e6 # Convert to seconds - - p2p = (TUNNEL_INFO["distance"][probe_locs[1]] - TUNNEL_INFO["distance"][probe_locs[0]]) / 1000 # convert to m - - p2p_1 = p2p / abs(p2_g1_time - p1_g1_time) # m/s - p2p_2 = p2p / abs(p2_g2_time - p1_g2_time) # m/s - - print(f"{probe_locs[i]}-{probe_locs[i + 1]} - G1 - Measured a shock speed of {p2p_1:.2f} m/s ({p2p_1/1000:.2f} km/s)") - print(f"{probe_locs[i]}-{probe_locs[i + 1]} - G2 - Measured a shock speed of {p2p_2:.2f} m/s ({p2p_2/1000:.2f} km/s)") - print() + print(f"{probe_locs[i]}-{probe_locs[i + 1]} - G1 - Measured a shock speed of {p2p_1:.2f} m/s ({p2p_1/1000:.2f} km/s)") + print(f"{probe_locs[i]}-{probe_locs[i + 1]} - G2 - Measured a shock speed of {p2p_2:.2f} m/s ({p2p_2/1000:.2f} km/s)") + print() # Return the data & the successfully loaded data keys - return data, tuple(data.keys()) - -data, loaded_data = load_data(data_to_load) + return data #, tuple(data.keys()) + +data = {} +for dp in data_to_load: + pdp = f"{DATA_PATH}/{dp}/" + load_data(pdp, data) + +loaded_data = tuple(data.keys()) + print("Loaded Data")