debug shit

Cal Wing 2024-10-18 21:08:55 +10:00
parent 61e9caaf4e
commit c5f24cb0bf
4 changed files with 53 additions and 44 deletions

View File

@@ -25,6 +25,9 @@ canny-args:
 pcb-refs:
 - "st1"
+no-graph:
+- "None"
 probe-info:
   type: "Thin"
   locations: # In order of pulse

View File

@@ -18,7 +18,7 @@ canny-args:
     post_pres: 0.03
   - sigma: 1
-    post_pres: 0.2
+    post_pres: 0.05
 no-graph:
 - "at1"

View File

@@ -1,7 +1,7 @@
 # Data Info File
 # Cal Wing - Oct 24
-long_name: "Shot 7 (x2s5829) - Thin Probe Set (ST1, ST2 & ST3) - 2024-10-18 - Flush ST3 - - Low Pressure, 'Pure' Air"
+long_name: "Shot 7 (x2s5829) - Thin Probe Set (ST1, ST2 & ST3) - 2024-10-18\nFlush ST3 - Low Pressure, 'Pure' Air"
 name: "Shot 6"
 date: "2024-10-18"
 time: "15:58"
@@ -19,15 +19,19 @@ canny-args:
   - sigma: 1
     post_pres: 0.2
+  - sigma: 1
+    post_pres: 0.05
 no-graph:
-- "at1"
-- "at2"
-- "at3"
-- "at4"
-- "at5"
-- "at6"
+- "None"
+# - "at1"
+# - "at2"
+# - "at3"
+# - "at4"
+# - "at5"
+# - "at6"
 pcb-refs:
 - "at1"

main.py
View File

@@ -33,8 +33,10 @@ data_to_load = [
     #"x2s5823",
     #"x2s5824",
     #"x2s5827",
-    #"x2s5829",
-    "x2s5830",
+    "x2s5829",
+    #"x2s5830",
+    "x2s5831",
+    "x2s5832"
 ]

 # ==== Data Loading & Processing ====
@@ -44,11 +46,11 @@ def load_data(data_path: str, data={}) -> dict:
         print(f"[ERR] Could not find data info file: '{data_info_path}'")
         print(f"[WARN] Not Loading Data '{data_path}'")
         return None

     # Load Shot Data Info YAML File (Cal)
     with open(data_info_path, 'r') as file:
         dataInfo = yaml.safe_load(file)

     # Grab the shot name
     x2_shot = dataInfo["shot-info"]["name"]
@@ -56,13 +58,13 @@ def load_data(data_path: str, data={}) -> dict:
     dataInfo["shot-info"]["tdms"] = dataInfo["shot-info"]["tdms"].format(x2_shot)
     dataInfo["shot-info"]["config"] = dataInfo["shot-info"]["config"].format(x2_shot)
     dataInfo["shot-info"]["info"] = dataInfo["shot-info"]["info"].format(x2_shot)

     # Load Raw Data
     # TDMS File (X2 DAQ Data)
     x2_tdms_data = TdmsFile.read(data_path + dataInfo["shot-info"]['tdms'], raw_timestamps=True)
     x2_channels = x2_tdms_data.groups()[0].channels()
     x2_channel_names = tuple(c.name for c in x2_channels)

     data_locs = [dr["type"] for dr in dataInfo["probe-info"]["data-records"]]

     # Scope info _if it exists_
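
For context on the TDMS block above: it relies on npTDMS returning every channel of the first group in the file. A minimal sketch of that pattern with a placeholder file name:

# Minimal npTDMS sketch of the load pattern above; "shot.tdms" is a placeholder path.
from nptdms import TdmsFile

tdms = TdmsFile.read("shot.tdms", raw_timestamps=True)  # raw timestamps keep full ns precision
channels = tdms.groups()[0].channels()                  # the loader assumes a single group
channel_names = tuple(c.name for c in channels)
print(channel_names)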
@@ -80,16 +82,16 @@ def load_data(data_path: str, data={}) -> dict:
         for i, line in enumerate(dfile):
             if i > 1: break
             header_lines.append(line.strip().split(","))

         for i, name in enumerate(header_lines[0]):
             if name == "x-axis":
                 name = "Time"

             if header_lines[1][i] in ["second", "Volt"]:
                 outStr = f"{name} [{header_lines[1][i][0]}]"
             else:
                 outStr = f"{name} [{header_lines[1][i]}]"

             scope_header.append(outStr)

     # Load the Scope CSV Data
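
The header-building loop above turns the scope CSV's two header rows into labels like "Time [s]". A self-contained sketch with made-up header rows (the real rows come from the scope file):

# Sketch of the scope header construction; the two header rows below are illustrative only.
header_lines = [
    ["x-axis", "Channel A", "Channel B"],  # row 0: column names
    ["second", "Volt", "Volt"],            # row 1: units as written by the scope
]

scope_header = []
for i, name in enumerate(header_lines[0]):
    if name == "x-axis":
        name = "Time"
    unit = header_lines[1][i]
    # "second"/"Volt" are shortened to their first letter; anything else is kept verbatim
    label = f"{name} [{unit[0]}]" if unit in ["second", "Volt"] else f"{name} [{unit}]"
    scope_header.append(label)

print(scope_header)  # ['Time [s]', 'Channel A [V]', 'Channel B [V]']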
@@ -117,14 +119,14 @@ def load_data(data_path: str, data={}) -> dict:
             "x2": {}, # Only pop channels with a voltage scale in ./tunnel-info.yaml
             "probes": [[None], [None]] # Save probe data in volts - [G1, G2]
         },
         "shock-speed": {} # Note all in us
     }

     # === Process the data ===
     # Generate X2 time arrays
     time_data = x2_channels[0]

     ns_time = time_data[:].as_datetime64('ns')
     x2_time_seconds = (ns_time - ns_time[0]) # timedelta64[ns]
     x2_time_us = x2_time_seconds.astype("float64") / 1000 # Scale to us
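
The time-array block above converts the TDMS timestamp channel to microseconds relative to the first sample (the intermediate x2_time_seconds actually holds timedelta64[ns] values, as its comment notes). A self-contained numpy sketch of the same conversion with synthetic timestamps:

# Sketch of the ns-timestamp -> relative-microsecond conversion used above; timestamps are synthetic.
import numpy as np

ns_time = np.array(["2024-10-18T15:58:00.000000000",
                    "2024-10-18T15:58:00.000001000",
                    "2024-10-18T15:58:00.000002000"], dtype="datetime64[ns]")

delta_ns = ns_time - ns_time[0]              # timedelta64[ns], like x2_time_seconds
time_us = delta_ns.astype("float64") / 1000  # ns -> us, like x2_time_us
print(time_us)                               # [0. 1. 2.]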
@@ -203,20 +205,20 @@ def load_data(data_path: str, data={}) -> dict:
         refData = data[x2_shot]["data"]["x2"][ref]
         first_value, first_value_uncertainty, _, _ = canny_shock_finder(x2_time_us, refData, plot=False, print_func=None)
         shock_point = np.where(x2_time_us >= first_value)[0][0] # [BUG] Seems to give n+1

         data[x2_shot]["shock-point"][ref] = shock_point, first_value

     for i, probe in enumerate(dataInfo["probe-info"]["locations"]):
         probeCh1 = data[x2_shot]["data"]["probes"][0]
         probeCh2 = data[x2_shot]["data"]["probes"][1]

         # Get the canny-args
         cArgs = dataInfo["canny-args"]
         doCannyPlot = False
         if i in range(len(cArgs)):
             sigma = cArgs[i]["sigma"]
             post_sup_thresh = cArgs[i]["post_pres"]
         else:
             sigma = cArgs[-1]["sigma"]
             post_sup_thresh = cArgs[-1]["post_pres"]
@@ -234,22 +236,22 @@ def load_data(data_path: str, data={}) -> dict:
             raise ValueError(f"{probe}-g1 not detected")

         shock_point = np.where(scope_time >= first_value)[0][0] # [BUG] Seems to give n+1
         data[x2_shot]["shock-point"][f"{probe}-g1"] = shock_point, first_value

         # Do the same for G2
         if i > 0:
             time_offset = data[x2_shot]["shock-point"][f"{privPoint}-g2"][1] + CANNY_TIME_OFFSET

         # Find G2 Shock Time
         first_value, first_value_uncertainty, _, _ = canny_shock_finder(scope_time, probeCh2, sigma=sigma, post_suppression_threshold=post_sup_thresh, plot=doCannyPlot, start_time=time_offset, print_func=None)
         if first_value is None:
             print(f"[ERROR] {x2_shot} - {probe}-g2 could not be detected using: Sigma = {sigma}, post_suppression_threshold = {post_sup_thresh}")
             raise ValueError(f"{probe}-g2 not detected")

         shock_point = np.where(scope_time >= first_value)[0][0] # [BUG] Seems to give n+1
         data[x2_shot]["shock-point"][f"{probe}-g2"] = shock_point, first_value

     # Calculate Shock Speeds
     print("="*30, x2_shot, "="*30)
     print("--", dataInfo["long_name"], "--")
@@ -258,8 +260,8 @@ def load_data(data_path: str, data={}) -> dict:
         if i == 0: continue
         p1_time = data[x2_shot]["shock-point"][refProbe][1] / 1e6 # Convert to seconds
         p2_time = data[x2_shot]["shock-point"][dataInfo["pcb-refs"][i-1]][1] / 1e6 # Convert to seconds

         p2p_dist = (TUNNEL_INFO["distance"][refProbe] - TUNNEL_INFO["distance"][dataInfo["pcb-refs"][i-1]]) / 1000 # convert to m
         probe_velocity = p2p_dist / abs(p2_time - p1_time) # m/s

         print(f"{refProbe}-{dataInfo["pcb-refs"][i-1]} Measured a shock speed of {probe_velocity:.2f} m/s ({probe_velocity/1000:.2f} km/s)")
@@ -268,8 +270,8 @@ def load_data(data_path: str, data={}) -> dict:
     for probe in dataInfo["probe-info"]["locations"]:
         g1_time = data[x2_shot]["shock-point"][f"{probe}-g1"][1] / 1e6 # Convert to seconds
         g2_time = data[x2_shot]["shock-point"][f"{probe}-g2"][1] / 1e6 # Convert to seconds

         c2c_dist = dataInfo["probe-info"]["c2c"] / 1000 # convert to m
         probe_velocity = c2c_dist / abs(g2_time - g1_time) # m/s

         print(f"{probe} Measured a shock speed of {probe_velocity:.2f} m/s ({probe_velocity/1000:.2f} km/s)")
@@ -280,12 +282,12 @@ def load_data(data_path: str, data={}) -> dict:
         probe_locs = dataInfo["probe-info"]["locations"]
         p1_g1_time = data[x2_shot]["shock-point"][f"{probe_locs[i]}-g1"][1] / 1e6 # Convert to seconds
         p1_g2_time = data[x2_shot]["shock-point"][f"{probe_locs[i]}-g2"][1] / 1e6 # Convert to seconds

         p2_g1_time = data[x2_shot]["shock-point"][f"{probe_locs[i+1]}-g1"][1] / 1e6 # Convert to seconds
         p2_g2_time = data[x2_shot]["shock-point"][f"{probe_locs[i+1]}-g2"][1] / 1e6 # Convert to seconds

         p2p = (TUNNEL_INFO["distance"][probe_locs[1]] - TUNNEL_INFO["distance"][probe_locs[0]]) / 1000 # convert to m

         p2p_1 = p2p / abs(p2_g1_time - p1_g1_time) # m/s
         p2p_2 = p2p / abs(p2_g2_time - p1_g2_time) # m/s
@@ -295,8 +297,8 @@ def load_data(data_path: str, data={}) -> dict:
         data[x2_shot]["shock-speed"][f"{probe_locs[i]}-{probe_locs[i + 1]}-g2"] = p2p_2

     print()

     # Return the data & the successfully loaded data keys
     return data #, tuple(data.keys())


data = {}
@@ -320,7 +322,7 @@ def genGraph(gData: dict, showPlot: bool = True):
         "ledgLoc": 'upper left',
         "plots": []
     }

     lims = []
     for label in gData["info"]["pcb-refs"]: # + ["trigbox"]:
@@ -379,11 +381,11 @@ def genGraph(gData: dict, showPlot: bool = True):
         "x": 0.94, "y": 0.94
     })

-    if len(lims) > 1:
-        OFFSET = 10
-        graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET))
+    #if len(lims) > 1:
+    #    OFFSET = 10
+    #    graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET))

     makeGraph(graphData, doProgramBlock=False, showPlot=showPlot, figSavePath="./images/{0}.png")
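
The final hunk disables the automatic x-limit clamp (previously the min/max of the collected shock times padded by 10 units). If it is ever wanted back behind a switch, a hedged sketch; the clampX flag and the example lims values are hypothetical, not existing options:

# Hypothetical guard for re-enabling the x-limit clamp; "clampX" is not an existing flag.
clampX = False              # flip to True to restore the old behaviour
lims = [100.0, 250.0]       # example shock-time limits (same units as the x axis)
graphData = {}

if clampX and len(lims) > 1:
    OFFSET = 10             # padding, as in the commented-out code
    graphData["xLim"] = (float(min(lims) - OFFSET), float(max(lims) + OFFSET))
print(graphData)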