Compare commits

...

2 Commits

Author SHA1 Message Date
Cal W
bdf25d5736 Add canny args to _info.yaml 2024-10-18 00:03:22 +10:00
Cal W
ba1136b0ea quick data load refactor 2024-10-17 22:58:30 +10:00
6 changed files with 256 additions and 210 deletions

View File

@ -12,6 +12,11 @@ shot-info:
config: "x2s5823.config" config: "x2s5823.config"
info: "x2s5823.txt" info: "x2s5823.txt"
# Canny Args
canny-args:
- sigma: 2
post_pres: 0.03
pcb-refs: pcb-refs:
- "st1" - "st1"
- "st3" - "st3"

View File

@ -12,6 +12,11 @@ shot-info:
config: "x2s5824.config" config: "x2s5824.config"
info: "x2s5824.txt" info: "x2s5824.txt"
# Canny Args
canny-args:
- sigma: 2
post_pres: 0.03
pcb-refs: pcb-refs:
- "st1" - "st1"
- "st3" - "st3"

View File

@ -12,6 +12,14 @@ shot-info:
config: "x2s5827.config" config: "x2s5827.config"
info: "x2s5827.txt" info: "x2s5827.txt"
# Canny Args
canny-args:
- sigma: 2
post_pres: 0.03
- sigma: 0.75
post_pres: 0.1
pcb-refs: pcb-refs:
- "st1" - "st1"

View File

@ -12,6 +12,14 @@ shot-info:
config: "x2s5829.config" config: "x2s5829.config"
info: "x2s5829.txt" info: "x2s5829.txt"
# Canny Args
canny-args:
- sigma: 2
post_pres: 0.03
- sigma: 1
post_pres: 0.2
pcb-refs: pcb-refs:
- "st1" - "st1"

436
main.py
View File

@ -24,241 +24,248 @@ DATA_PATH = "./data"
DATA_INFO = "_info.yaml" DATA_INFO = "_info.yaml"
TUNNEL_INFO_FILE = "./tunnel-info.yaml" TUNNEL_INFO_FILE = "./tunnel-info.yaml"
SAMPLES_TO_AVG = 500 SAMPLES_TO_AVG = 500
CANNY_TIME_OFFSET = 50 #us
with open(TUNNEL_INFO_FILE, 'r') as file: with open(TUNNEL_INFO_FILE, 'r') as file:
TUNNEL_INFO = yaml.safe_load(file) TUNNEL_INFO = yaml.safe_load(file)
data_to_load = [ data_to_load = [
#"x2s5823", "x2s5823",
#"x2s5824", "x2s5824",
#"x2s5827", "x2s5827",
"x2s5829", "x2s5829",
] ]
# ==== Data Loading & Processing ==== # ==== Data Loading & Processing ====
def load_data(data_to_load: list[str]) -> dict: def load_data(data_path: str, data={}) -> dict:
data = {} data_info_path = data_path + DATA_INFO
for dp in data_to_load: if not os.path.exists(data_info_path):
data_path = f"{DATA_PATH}/{dp}/" print(f"[ERR] Could not find data info file: '{data_info_path}'")
data_info_path = data_path + DATA_INFO print(f"[WARN] Not Loading Data '{data_path}'")
if not os.path.exists(data_info_path): return None
print(f"[ERR] Could not find data info file: '{data_info_path}'")
print(f"[WARN] Not Loading Data '{dp}'") # Load Shot Data Info YAML File (Cal)
continue with open(data_info_path, 'r') as file:
dataInfo = yaml.safe_load(file)
# Grab the shot name
x2_shot = dataInfo["shot-info"]["name"]
# Load Shot Data Info YAML File (Cal) # Load Raw Data
with open(data_info_path, 'r') as file: # TDMS File (X2 DAQ Data)
dataInfo = yaml.safe_load(file) x2_tdms_data = TdmsFile.read(data_path + dataInfo["shot-info"]['tdms'], raw_timestamps=True)
x2_channels = x2_tdms_data.groups()[0].channels()
x2_channel_names = tuple(c.name for c in x2_channels)
# Grab the shot name # Scope info _if it exists_
x2_shot = dataInfo["shot-info"]["name"] if dataInfo["probe-info"]["data-record"]["type"] == "scope":
scope_data_path = data_path + dataInfo["probe-info"]["data-record"]["data"]
# Load Raw Data scope_config_path = data_path + dataInfo["probe-info"]["data-record"]["config"] # [TODO] Read this file
# TDMS File (X2 DAQ Data)
x2_tdms_data = TdmsFile.read(data_path + dataInfo["shot-info"]['tdms'], raw_timestamps=True)
x2_channels = x2_tdms_data.groups()[0].channels()
x2_channel_names = tuple(c.name for c in x2_channels)
# Scope info _if it exists_
if dataInfo["probe-info"]["data-record"]["type"] == "scope":
scope_data_path = data_path + dataInfo["probe-info"]["data-record"]["data"]
scope_config_path = data_path + dataInfo["probe-info"]["data-record"]["config"] # [TODO] Read this file
# Generate Data Headers - This could be better # Generate Data Headers - This could be better
with open(scope_data_path, 'r') as dfile: with open(scope_data_path, 'r') as dfile:
scope_header = [] scope_header = []
header_lines = [] header_lines = []
for i, line in enumerate(dfile): for i, line in enumerate(dfile):
if i > 1: break if i > 1: break
header_lines.append(line.strip().split(",")) header_lines.append(line.strip().split(","))
for i, name in enumerate(header_lines[0]):
if name == "x-axis":
name = "Time"
for i, name in enumerate(header_lines[0]): if header_lines[1][i] in ["second", "Volt"]:
if name == "x-axis": outStr = f"{name} [{header_lines[1][i][0]}]"
name = "Time" else:
outStr = f"{name} [{header_lines[1][i]}]"
if header_lines[1][i] in ["second", "Volt"]:
outStr = f"{name} [{header_lines[1][i][0]}]" scope_header.append(outStr)
else:
outStr = f"{name} [{header_lines[1][i]}]"
scope_header.append(outStr)
# Load the Scope CSV Data # Load the Scope CSV Data
scope_data = np.loadtxt(scope_data_path, delimiter=',', skiprows=2) scope_data = np.loadtxt(scope_data_path, delimiter=',', skiprows=2)
# Build a data object (this could be cached - or partially cached if I was clever enough) # Build a data object (this could be cached - or partially cached if I was clever enough)
# Raw Data is always added - processing comes after # Raw Data is always added - processing comes after
data[x2_shot] = { data[x2_shot] = {
"info": dataInfo, "info": dataInfo,
"shot_time": np.datetime64(f"{dataInfo["date"]}T{dataInfo["time"]}"), "shot_time": np.datetime64(f"{dataInfo["date"]}T{dataInfo["time"]}"),
"raw-data":{ "raw-data":{
"probe_headers": scope_header, "probe_headers": scope_header,
"probes": scope_data, "probes": scope_data,
"x2": x2_channels, "x2": x2_channels,
"x2-channels": x2_channel_names, "x2-channels": x2_channel_names,
"x2-tdms": x2_tdms_data "x2-tdms": x2_tdms_data
}, },
"time": { "time": {
"x2": None, "x2": None,
"trigger_index": None "trigger_index": None
}, },
"data": { "data": {
"x2": {} # Only pop channels with a voltage scale in ./tunnel-info.yaml "x2": {} # Only pop channels with a voltage scale in ./tunnel-info.yaml
} },
} "shock-speed": {}
}
# === Process the data === # === Process the data ===
# Generate X2 time arrays # Generate X2 time arrays
time_data = x2_channels[0] time_data = x2_channels[0]
ns_time = time_data[:].as_datetime64('ns')
x2_time_seconds = (ns_time - ns_time[0]) # timedelta64[ns]
x2_time_us = x2_time_seconds.astype("float64") / 1000 # Scale to us
#second_fractions = np.array(time_data[:].second_fractions, dtype=int) # 2^-64 ths of a second
#x2_time_seconds = (second_fractions - second_fractions[0]) / (2**(-64)) # 0 time data and convert to seconds
#x2_time_us = x2_time_seconds * 1000 # Scale to us
# --- Un Scale Data ---
for channel, vScale in TUNNEL_INFO["volt-scale"].items():
# Get the channel index from its name
chIndex = x2_channel_names.index(channel)
# Calculate the average noise offset
avg_noise = x2_channels[chIndex][0:SAMPLES_TO_AVG].mean()
# Save the channel data
data[x2_shot]["data"]["x2"][channel] = (x2_channels[chIndex][:] - avg_noise) * vScale
# Process Trigger Info
trigger_volts = data[x2_shot]["data"]["x2"]["trigbox"] # Use a mean to offset
x2_trigger_index = np.where(trigger_volts > 1)[0][0]
x2_trigger_time = x2_time_us[x2_trigger_index]
# Add the time data
data[x2_shot]["time"] = {
"x2": x2_time_us,
"trigger_index": x2_trigger_index
}
# Scope timing _if it exists_
if dataInfo["probe-info"]["data-record"]["type"] == "scope":
trigger_info = dataInfo["probe-info"]["data-record"]["trigger"] # Get the scope trigger info
# Calc the scope time & apply any manual offsets
scope_time = (scope_data[:, 0] - scope_data[0, 0]) * 1e6 # to us
scope_time -= trigger_info["alignment-offset"] # manual offset delay
# Trigger Alignment
scope_trigger_volts = (scope_data[:, 3] - scope_data[0:SAMPLES_TO_AVG, 3].mean()) # Use a mean here too
scope_trigger_index = np.where(scope_trigger_volts > 1)[0][0]
scope_trigger_time = scope_time[scope_trigger_index]
scope_alignment = x2_trigger_time - scope_trigger_time
scope_time += scope_alignment
# Offset any trigger delays
scope_time += trigger_info["delay"] # us delay from the actual trigger signal to the scope received trigger
data[x2_shot]["time"]["scope"] = scope_time
data[x2_shot]["time"]["scope-offset"] = scope_alignment
data[x2_shot]["data"]["scope"] = {}
for i, header in enumerate(scope_header):
if i == 0: continue # Don't record time
# Python reference so its the same object
ref = scope_data[:, i]
data[x2_shot]["data"]["scope"][i] = ref
data[x2_shot]["data"]["scope"][header] = ref
# Find Shock Times
# X2 - Canning Edge
data[x2_shot]["shock-point"] = {}
for ref in dataInfo["pcb-refs"]:
refData = data[x2_shot]["data"]["x2"][ref]
first_value, first_value_uncertainty, _, _ = canny_shock_finder(x2_time_us, refData, plot=False, print_func=None)
shock_point = np.where(x2_time_us >= first_value)[0][0] # [BUG] Seems to give n+1
ns_time = time_data[:].as_datetime64('ns') data[x2_shot]["shock-point"][ref] = shock_point, first_value
x2_time_seconds = (ns_time - ns_time[0]) # timedelta64[ns]
x2_time_us = x2_time_seconds.astype("float64") / 1000 # Scale to us for i, probe in enumerate(dataInfo["probe-info"]["locations"]):
probeCh1 = data[x2_shot]["data"]["scope"][1]
#second_fractions = np.array(time_data[:].second_fractions, dtype=int) # 2^-64 ths of a second probeCh2 = data[x2_shot]["data"]["scope"][2]
#x2_time_seconds = (second_fractions - second_fractions[0]) / (2**(-64)) # 0 time data and convert to seconds
#x2_time_us = x2_time_seconds * 1000 # Scale to us
# --- Un Scale Data ---
for channel, vScale in TUNNEL_INFO["volt-scale"].items():
# Get the channel index from its name
chIndex = x2_channel_names.index(channel)
# Calculate the average noise offset
avg_noise = x2_channels[chIndex][0:SAMPLES_TO_AVG].mean()
# Save the channel data
data[x2_shot]["data"]["x2"][channel] = (x2_channels[chIndex][:] - avg_noise) * vScale
# Process Trigger Info
trigger_volts = data[x2_shot]["data"]["x2"]["trigbox"] # Use a mean to offset
x2_trigger_index = np.where(trigger_volts > 1)[0][0]
x2_trigger_time = x2_time_us[x2_trigger_index]
# Add the time data
data[x2_shot]["time"] = {
"x2": x2_time_us,
"trigger_index": x2_trigger_index
}
# Scope timing _if it exists_
if dataInfo["probe-info"]["data-record"]["type"] == "scope":
trigger_info = dataInfo["probe-info"]["data-record"]["trigger"] # Get the scope trigger info
# Calc the scope time & apply any manual offsets
scope_time = (scope_data[:, 0] - scope_data[0, 0]) * 1e6 # to us
scope_time -= trigger_info["alignment-offset"] # manual offset delay
# Trigger Alignment
scope_trigger_volts = (scope_data[:, 3] - scope_data[0:SAMPLES_TO_AVG, 3].mean()) # Use a mean here too
scope_trigger_index = np.where(scope_trigger_volts > 1)[0][0]
scope_trigger_time = scope_time[scope_trigger_index]
scope_alignment = x2_trigger_time - scope_trigger_time
scope_time += scope_alignment
# Offset any trigger delays
scope_time += trigger_info["delay"] # us delay from the actual trigger signal to the scope received trigger
data[x2_shot]["time"]["scope"] = scope_time
data[x2_shot]["time"]["scope-offset"] = scope_alignment
data[x2_shot]["data"]["scope"] = {}
for i, header in enumerate(scope_header):
if i == 0: continue # Don't record time
# Python reference so its the same object
ref = scope_data[:, i]
data[x2_shot]["data"]["scope"][i] = ref
data[x2_shot]["data"]["scope"][header] = ref
# Find Shock Times
# X2 - Canning Edge
data[x2_shot]["shock-point"] = {}
for ref in dataInfo["pcb-refs"]:
refData = data[x2_shot]["data"]["x2"][ref]
first_value, first_value_uncertainty, _, _ = canny_shock_finder(x2_time_us, refData, plot=False, print_func=None)
shock_point = np.where(x2_time_us >= first_value)[0][0] # [BUG] Seems to give n+1
data[x2_shot]["shock-point"][ref] = shock_point, first_value
for i, probe in enumerate(dataInfo["probe-info"]["locations"]): # Get the canny-args
probeCh1 = data[x2_shot]["data"]["scope"][1] cArgs = dataInfo["canny-args"]
probeCh2 = data[x2_shot]["data"]["scope"][2] doCannyPlot = False
doCannyPlot = i > 0 and x2_shot == "x2s5827"
#[HACK] For detection if i in range(len(cArgs)):
TIME_OFFSET = 50 #us sigma = cArgs[i]["sigma"]
if i > 0: post_pres = cArgs[i]["post_pres"]
privPoint = dataInfo["probe-info"]["locations"][i-1] else:
time_offset = data[x2_shot]["shock-point"][f"{privPoint}-g1"][1] + TIME_OFFSET sigma = cArgs[-1]["sigma"]
post_pres = 0.03 post_pres = cArgs[-1]["post_pres"]
sigma = 7
doPlot = True
else: # These work for the first probe int he chain
time_offset = None
post_pres = 0.03
sigma = 2
doPlot = False
shock_point = np.where(probeCh1 >= 0.3)[0]
first_value = scope_time[shock_point]
first_value, first_value_uncertainty, _, _ = canny_shock_finder(scope_time, probeCh1, sigma=sigma, post_suppression_threshold=post_pres, plot=doPlot, start_time=time_offset, print_func=None) # If this _isn't_ the first probe then apply a time offset
shock_point = np.where(scope_time >= first_value)[0][0] # [BUG] Seems to give n+1 if i > 0:
privPoint = dataInfo["probe-info"]["locations"][i-1]
data[x2_shot]["shock-point"][f"{probe}-g1"] = shock_point, first_value time_offset = data[x2_shot]["shock-point"][f"{privPoint}-g1"][1] + CANNY_TIME_OFFSET
else:
time_offset = None
#[HACK] For detection
if i > 0:
time_offset = data[x2_shot]["shock-point"][f"{privPoint}-g2"][1] + TIME_OFFSET
shock_point = np.where(probeCh2 >= 0.3)[0] # + offset # Find G1 Shock Time
first_value = scope_time[shock_point] first_value, first_value_uncertainty, _, _ = canny_shock_finder(scope_time, probeCh1, sigma=sigma, post_suppression_threshold=post_pres, plot=doCannyPlot, start_time=time_offset, print_func=None)
shock_point = np.where(scope_time >= first_value)[0][0] # [BUG] Seems to give n+1
data[x2_shot]["shock-point"][f"{probe}-g1"] = shock_point, first_value
first_value, first_value_uncertainty, _, _ = canny_shock_finder(scope_time, probeCh2, sigma=sigma, post_suppression_threshold=post_pres, plot=doPlot, start_time=time_offset, print_func=None) # Do the same for G2
shock_point = np.where(scope_time >= first_value)[0][0] # [BUG] Seems to give n+1 if i > 0:
data[x2_shot]["shock-point"][f"{probe}-g2"] = shock_point, first_value time_offset = data[x2_shot]["shock-point"][f"{privPoint}-g2"][1] + CANNY_TIME_OFFSET
# Find G2 Shock Time
first_value, first_value_uncertainty, _, _ = canny_shock_finder(scope_time, probeCh2, sigma=sigma, post_suppression_threshold=post_pres, plot=doCannyPlot, start_time=time_offset, print_func=None)
shock_point = np.where(scope_time >= first_value)[0][0] # [BUG] Seems to give n+1
data[x2_shot]["shock-point"][f"{probe}-g2"] = shock_point, first_value
# Calculate Shock Speeds
print("="*25, x2_shot, "="*25)
for probe in dataInfo["probe-info"]["locations"]:
g1_time = data[x2_shot]["shock-point"][f"{probe}-g1"][1] / 1e6 # Convert to seconds
g2_time = data[x2_shot]["shock-point"][f"{probe}-g2"][1] / 1e6 # Convert to seconds
c2c_dist = dataInfo["probe-info"]["c2c"] / 1000 # convert to m
probe_velocity = c2c_dist / abs(g2_time - g1_time) # m/s
print(f"{probe} Measured a shock speed of {probe_velocity:.2f} m/s ({probe_velocity/1000:.2f} km/s)")
data[x2_shot]["shock-speed"][probe] = probe_velocity # m/s
if len(dataInfo["probe-info"]["locations"]) > 1:
for i in range(len(dataInfo["probe-info"]["locations"]) - 1):
probe_locs = dataInfo["probe-info"]["locations"]
p1_g1_time = data[x2_shot]["shock-point"][f"{probe_locs[i]}-g1"][1] / 1e6 # Convert to seconds
p1_g2_time = data[x2_shot]["shock-point"][f"{probe_locs[i]}-g2"][1] / 1e6 # Convert to seconds
# Calculate Shock Speeds p2_g1_time = data[x2_shot]["shock-point"][f"{probe_locs[i+1]}-g1"][1] / 1e6 # Convert to seconds
print("="*25, x2_shot, "="*25) p2_g2_time = data[x2_shot]["shock-point"][f"{probe_locs[i+1]}-g2"][1] / 1e6 # Convert to seconds
for probe in dataInfo["probe-info"]["locations"]:
g1_time = data[x2_shot]["shock-point"][f"{probe}-g1"][1] / 1e6 # Convert to seconds
g2_time = data[x2_shot]["shock-point"][f"{probe}-g2"][1] / 1e6 # Convert to seconds
c2c_dist = dataInfo["probe-info"]["c2c"] / 1000 # convert to m
probe_velocity = c2c_dist / abs(g2_time - g1_time) # m/s p2p = (TUNNEL_INFO["distance"][probe_locs[1]] - TUNNEL_INFO["distance"][probe_locs[0]]) / 1000 # convert to m
p2p_1 = p2p / abs(p2_g1_time - p1_g1_time) # m/s
p2p_2 = p2p / abs(p2_g2_time - p1_g2_time) # m/s
print(f"{probe} Measured a shock speed of {probe_velocity:.2f} m/s ({probe_velocity/1000:.2f} km/s)") print(f"{probe_locs[i]}-{probe_locs[i + 1]} - G1 - Measured a shock speed of {p2p_1:.2f} m/s ({p2p_1/1000:.2f} km/s)")
print(f"{probe_locs[i]}-{probe_locs[i + 1]} - G2 - Measured a shock speed of {p2p_2:.2f} m/s ({p2p_2/1000:.2f} km/s)")
data[x2_shot]["shock-speed"][f"{probe_locs[i]}-{probe_locs[i + 1]}-g1"] = p2p_1
data[x2_shot]["shock-speed"][f"{probe_locs[i]}-{probe_locs[i + 1]}-g2"] = p2p_2
if len(dataInfo["probe-info"]["locations"]) > 1: print()
for i in range(len(dataInfo["probe-info"]["locations"]) - 1):
probe_locs = dataInfo["probe-info"]["locations"]
p1_g1_time = data[x2_shot]["shock-point"][f"{probe_locs[i]}-g1"][1] / 1e6 # Convert to seconds
p1_g2_time = data[x2_shot]["shock-point"][f"{probe_locs[i]}-g2"][1] / 1e6 # Convert to seconds
p2_g1_time = data[x2_shot]["shock-point"][f"{probe_locs[i+1]}-g1"][1] / 1e6 # Convert to seconds
p2_g2_time = data[x2_shot]["shock-point"][f"{probe_locs[i+1]}-g2"][1] / 1e6 # Convert to seconds
p2p = (TUNNEL_INFO["distance"][probe_locs[1]] - TUNNEL_INFO["distance"][probe_locs[0]]) / 1000 # convert to m
p2p_1 = p2p / abs(p2_g1_time - p1_g1_time) # m/s
p2p_2 = p2p / abs(p2_g2_time - p1_g2_time) # m/s
print(f"{probe_locs[i]}-{probe_locs[i + 1]} - G1 - Measured a shock speed of {p2p_1:.2f} m/s ({p2p_1/1000:.2f} km/s)")
print(f"{probe_locs[i]}-{probe_locs[i + 1]} - G2 - Measured a shock speed of {p2p_2:.2f} m/s ({p2p_2/1000:.2f} km/s)")
print()
# Return the data & the successfully loaded data keys # Return the data & the successfully loaded data keys
return data, tuple(data.keys()) return data #, tuple(data.keys())
data, loaded_data = load_data(data_to_load) data = {}
for dp in data_to_load:
pdp = f"{DATA_PATH}/{dp}/"
load_data(pdp, data)
loaded_data = tuple(data.keys())
print("Loaded Data") print("Loaded Data")
@ -269,6 +276,8 @@ def genGraph(gData: dict, showPlot: bool = True):
"xLabel": "Time ($\\mu$s)", "xLabel": "Time ($\\mu$s)",
"yLabel": "Voltage Reading (V)", "yLabel": "Voltage Reading (V)",
"grid": True, "grid": True,
"figSize": (8,6.5),
"ledgLoc": 'upper left',
"plots": [] "plots": []
} }
@ -318,9 +327,20 @@ def genGraph(gData: dict, showPlot: bool = True):
lims.append(gData["shock-point"][f"{probe}-g2"][1]) lims.append(gData["shock-point"][f"{probe}-g2"][1])
lims.append(gData["shock-point"][f"{probe}-g1"][1]) lims.append(gData["shock-point"][f"{probe}-g1"][1])
probeText = ""
for shock_speed_loc in gData["shock-speed"]:
probeText += f"\n{shock_speed_loc} - {gData["shock-speed"][shock_speed_loc]/1000:.2f} km/s"
graphData["plots"].append({
"type": "text",
"text": f"Measured Shock Speeds{probeText}",
"align": ("top", "right"),
"x": 0.94, "y": 0.94
})
if len(lims) > 1: if len(lims) > 1:
OFFSET = 10 OFFSET = 10
#graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET)) graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET))
makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath="./images/{0}.png") makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath="./images/{0}.png")
@ -329,7 +349,7 @@ def genGraph(gData: dict, showPlot: bool = True):
print("Graphing Data") print("Graphing Data")
for shot in loaded_data: for shot in loaded_data:
#if shot != loaded_data[-1]: continue if shot != loaded_data[-2]: continue
genGraph(data[shot], showPlot=False) genGraph(data[shot], showPlot=False)
# This forces matplotlib to hang until I tell it to close all windows # This forces matplotlib to hang until I tell it to close all windows

View File

@ -378,8 +378,8 @@ def makeGraph(graphData, showPlot=True, doProgramBlock=True, figSavePath=None, h
} }
align = ( align = (
getSafeValue("valign", None), getSafeValue("valign", "center"),
getSafeValue("halign", None), getSafeValue("halign", "center"),
) )
align = getSafeValue("align", align) align = getSafeValue("align", align)