Thesis/main.py

237 lines
7.9 KiB
Python
Raw Normal View History

2024-10-15 15:11:10 +10:00
# Cal Wing (c.wing@uq.net.au) - Oct 2024
# Thesis Graphing
2024-09-30 19:30:28 +10:00
import os
2024-09-30 19:13:09 +10:00
import numpy as np
2024-10-15 15:11:10 +10:00
import pandas as pd
import yaml
2024-09-30 19:13:09 +10:00
from nptdms import TdmsFile
from makeGraph import makeGraph, pltKeyClose, UQ_COLOURS as UQC
2024-10-15 20:33:26 +10:00
from canny_shock_finder import canny_shock_finder
2024-09-30 19:30:28 +10:00
# Folder correction
# Make sure the folders this script writes into exist before anything is saved.
folders = ["./images"]
for folder in folders:
    # makedirs(exist_ok=True) creates missing parents and does not fail if the
    # folder already exists (no separate isdir check needed).
    os.makedirs(folder, exist_ok=True)
2024-10-16 22:09:24 +10:00
# Data Paths
DATA_PATH = "./data"                     # Root folder; each shot lives in "<DATA_PATH>/<shot>/"
DATA_INFO = "_info.yaml"                 # Per-shot info file name inside the shot folder
TUNNEL_INFO_FILE = "./tunnel-info.yaml"  # Tunnel wide settings (read below)

# Number of leading samples averaged to estimate a channel's zero/noise offset
SAMPLES_TO_AVG = 500

# Load the tunnel info once at import time.
# The encoding is given explicitly so the read does not depend on the platform default.
with open(TUNNEL_INFO_FILE, 'r', encoding="utf-8") as file:
    TUNNEL_INFO = yaml.safe_load(file)
2024-09-30 16:45:52 +10:00
2024-10-15 15:11:10 +10:00
# Names of the shot folders (under the data folder) that should be loaded
data_to_load = ["x2s5823", "x2s5824", "x2s5827"]
2024-09-30 16:45:52 +10:00
2024-10-16 22:09:24 +10:00
# ==== Data Loading & Processing ====
def _read_scope_header(scope_data_path: str) -> list[str]:
    """Build one "<name> [<unit>]" label per column from the first two CSV lines.

    The first line holds the column names and the second line the units.
    A column called "x-axis" is renamed to "Time", and the units "second" and
    "Volt" are shortened to their first letter ("s" / "V").
    """
    with open(scope_data_path, 'r') as dfile:
        names = dfile.readline().strip().split(",")
        units = dfile.readline().strip().split(",")

    scope_header = []
    for i, name in enumerate(names):
        if name == "x-axis":
            name = "Time"

        unit = units[i]
        if unit in ["second", "Volt"]:
            unit = unit[0]

        scope_header.append(f"{name} [{unit}]")

    return scope_header


def _tdms_relative_seconds(timestamps) -> np.ndarray:
    """Return the time in seconds of every raw TDMS timestamp relative to the first.

    Raw TDMS timestamps are split into whole seconds and a positive fraction
    stored in units of 2^-64 s. Both parts are used here: a plain signed-integer
    difference of the fractions alone wraps once the record spans 0.5 s or more.
    """
    seconds = np.asarray(timestamps.seconds, dtype=np.int64)
    fractions = np.asarray(timestamps.second_fractions, dtype=np.uint64).astype(np.float64) * 2.0**-64

    return (seconds - seconds[0]).astype(np.float64) + (fractions - fractions[0])


def load_data(data_to_load: list[str]) -> tuple[dict, tuple[str, ...]]:
    """Load and pre-process the shots named in *data_to_load*.

    For every name the folder "<DATA_PATH>/<name>/" must contain the shot info
    YAML file (DATA_INFO). Shots without that file are reported and skipped.

    Per shot the result holds:
      - "info":      the parsed shot info YAML
      - "shot_time": numpy datetime64 built from the info "date" and "time"
      - "raw-data":  the TDMS file, its first-group channels and their names,
                     plus the scope headers/array (None when there is no scope record)
      - "time":      the X2 time array, the trigger sample index and, for scope
                     records, the aligned scope time array and the applied offset
      - "data":      offset-removed & scaled X2 channels (only the channels listed
                     under "volt-scale" in the tunnel info) and the scope columns

    Args:
        data_to_load: Shot folder names to load.

    Returns:
        A tuple of (data, keys): the data dictionary keyed by shot name, and a
        tuple of the shot names that were actually loaded.

    Raises:
        ValueError: a "volt-scale" channel is missing from the TDMS file.
        KeyError:   "trigbox" is not one of the scaled channels.
        IndexError: a trigger signal never rises above 1.
    """
    data = {}

    for dp in data_to_load:
        data_path = f"{DATA_PATH}/{dp}/"
        data_info_path = data_path + DATA_INFO

        if not os.path.exists(data_info_path):
            print(f"[ERR] Could not find data info file: '{data_info_path}'")
            print(f"[WARN] Not Loading Data '{dp}'")
            continue

        # Load Shot Data Info YAML File (Cal)
        with open(data_info_path, 'r', encoding="utf-8") as file:
            dataInfo = yaml.safe_load(file)

        # Grab the shot name
        x2_shot = dataInfo["shot-info"]["name"]

        record_info = dataInfo["probe-info"]["data-record"]
        has_scope = record_info["type"] == "scope"

        # Load Raw Data
        # TDMS File (X2 DAQ Data)
        x2_tdms_data = TdmsFile.read(data_path + dataInfo["shot-info"]['tdms'], raw_timestamps=True)
        x2_channels = x2_tdms_data.groups()[0].channels()
        x2_channel_names = tuple(c.name for c in x2_channels)

        # Scope info _if it exists_
        # Reset per shot so a shot without a scope record never sees the previous shot's data
        scope_header = None
        scope_data = None
        if has_scope:
            scope_data_path = data_path + record_info["data"]
            scope_config_path = data_path + record_info["config"]  # [TODO] Read this file (not used yet)

            # Generate Data Headers
            scope_header = _read_scope_header(scope_data_path)

            # Load the Scope CSV Data (the first two lines are the header);
            # ndmin=2 keeps the [:, column] indexing valid for a single data row
            scope_data = np.loadtxt(scope_data_path, delimiter=',', skiprows=2, ndmin=2)

        # Build a data object (this could be cached - or partially cached if I was clever enough)
        # Raw Data is always added - processing comes after
        data[x2_shot] = {
            "info": dataInfo,
            "shot_time": np.datetime64(f"{dataInfo['date']}T{dataInfo['time']}"),
            "raw-data": {
                "probe_headers": scope_header,
                "probes": scope_data,
                "x2": x2_channels,
                "x2-channels": x2_channel_names,
                "x2-tdms": x2_tdms_data
            },
            "time": {
                "x2": None,
                "trigger_index": None
            },
            "data": {
                "x2": {}  # Only populated for channels with a voltage scale in ./tunnel-info.yaml
            }
        }

        # === Process the data ===
        # Generate X2 time arrays - the first channel holds the raw timestamps
        x2_time_seconds = _tdms_relative_seconds(x2_channels[0][:])
        # NOTE(review): x1000 turns seconds into milliseconds although the names say "us" - confirm the intended unit
        x2_time_us = x2_time_seconds * 1000

        # --- Un Scale Data ---
        for channel, vScale in TUNNEL_INFO["volt-scale"].items():
            # Get the channel index from its name
            chIndex = x2_channel_names.index(channel)

            # Read the samples once, then remove the average noise offset of the leading samples
            samples = x2_channels[chIndex][:]
            avg_noise = samples[0:SAMPLES_TO_AVG].mean()

            # Save the channel data
            data[x2_shot]["data"]["x2"][channel] = (samples - avg_noise) * vScale

        # Process Trigger Info - first sample where the (offset removed) trigger exceeds 1
        trigger_volts = data[x2_shot]["data"]["x2"]["trigbox"]
        x2_trigger_index = np.where(trigger_volts > 1)[0][0]
        x2_trigger_time = x2_time_us[x2_trigger_index]

        # Add the time data
        data[x2_shot]["time"] = {
            "x2": x2_time_us,
            "trigger_index": x2_trigger_index
        }

        # Scope timing _if it exists_
        if has_scope:
            trigger_info = record_info["trigger"]  # Get the scope trigger info

            # NOTE(review): same x1000 factor as the X2 time array - confirm the intended unit
            scope_time = (scope_data[:, 0] - scope_data[0, 0]) * 1000
            scope_time -= trigger_info["alignment-offset"]  # manual offset delay
            scope_time += trigger_info["delay"]  # delay from the actual trigger signal to the scope received trigger

            # Trigger Alignment (column 3 is used as the scope trigger trace)
            scope_trigger_volts = scope_data[:, 3] - scope_data[0:SAMPLES_TO_AVG, 3].mean()
            scope_trigger_index = np.where(scope_trigger_volts > 1)[0][0]
            scope_trigger_time = scope_time[scope_trigger_index]

            # Shift the scope time so both trigger edges line up
            scope_alignment = x2_trigger_time - scope_trigger_time
            scope_time += scope_alignment

            data[x2_shot]["time"]["scope"] = scope_time
            data[x2_shot]["time"]["scope-offset"] = scope_alignment

            # Store every scope *column* under its header (column 0 is time, so it is skipped)
            data[x2_shot]["data"]["scope"] = {
                header: scope_data[:, i]
                for i, header in enumerate(scope_header)
                if i != 0
            }

    # Return the data & the successfully loaded data keys
    return data, tuple(data.keys())
# Load every requested shot; `loaded_data` holds the keys of the shots that loaded
data, loaded_data = load_data(data_to_load=data_to_load)
print("Loaded Data")
2024-10-15 20:33:26 +10:00
2024-10-16 22:09:24 +10:00
#[TODO] Refactor
def genGraph(gData: dict, showPlot: bool = True):
    """Plot the X2 and (when present) scope traces of one loaded shot.

    Args:
        gData: One shot entry as built by load_data. The keys read here are
            gData["info"]["long_name"], gData["time"]["x2"], gData["raw-data"]["x2"]
            and, optionally, gData["time"]["scope"] with gData["raw-data"]["probes"].
        showPlot: Passed straight through to makeGraph.
    """
    raw_data = gData["raw-data"]
    times = gData["time"]

    x2_time = times["x2"]
    x2_channels = raw_data["x2"]

    def zeroed(samples):
        # Remove the first sample so every trace starts at zero
        return samples - samples[0]

    # X2 DAQ traces (raw channels picked by index, scaled by fixed factors)
    plots = [
        {
            "x": x2_time,
            "y": zeroed(x2_channels[4][:]) * 0.0148,
            "label": "ST1"
        },
        {
            "x": x2_time,
            "y": zeroed(x2_channels[6][:]) * 0.0148,
            "label": "ST3"
        },
        {
            "x": x2_time,
            "y": zeroed(x2_channels[16][:]) / 1000,
            "label": "Trigger"
        },
    ]

    # Scope traces - only available when the shot has a scope record
    scope_time = times.get("scope")
    probes = raw_data.get("probes")
    if scope_time is not None and probes is not None:
        for column, label in ((1, "ST2-G1"), (2, "ST2-G2"), (3, "ST2-Trigger")):
            plots.append({
                "x": scope_time,
                "y": probes[:, column] - probes[0, column],
                "label": label
            })

    graphData = {
        "title": f"Shock response Time\nFor {gData['info']['long_name']}",
        # NOTE(review): the label says ns - confirm it matches the unit of the stored time arrays
        "xLabel": "Time (ns)",
        "yLabel": "Voltage Reading (V)",
        "grid": True,
        "plots": plots
    }

    makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath="./images/{0}.png")
2024-10-15 20:33:26 +10:00
2024-10-15 21:30:01 +10:00
2024-10-15 18:28:10 +10:00
2024-10-16 22:09:24 +10:00
# --- Graphing & shock finding (currently disabled) ---
#print("Graphing showPlot=showPlot, Data")
#genGraph(data[loaded_data[0]], showPlot=False)
#genGraph(data[loaded_data[1]], showPlot=False)

#x2_out = canny_shock_finder(x2_time, (gData["raw-data"]["x2"][16][:] - gData["raw-data"]["x2"][16][0]))

#print(x2_out)

# Keep matplotlib waiting here until I tell it to close all the windows
pltKeyClose()

print("Done")