Commit c4576410 authored by Mark Hymers's avatar Mark Hymers Committed by Joe Lyons
Browse files

Add pre-process script for want of a better place to keep it


Signed-off-by: Mark Hymers <mark.hymers@hankel.co.uk>
parent b5f6360c
#!/usr/bin/python3
# Script to run processing on MEGSCAN HDF5 files and apply various
# fixups if necessary
import re
from sys import exit, argv
import h5py
import numpy as np
# TODO: ? Add unapply_weights command
def usage():
    """Print the command-line usage summary and exit with status 0."""
    # BUG FIX: the first line of the help text used to end with a stray
    # '")' that leaked out of the print() call into the printed message.
    print("""Usage: hdf5_pre_process [--dry-run] OPERATION

Channel naming
==============
dup_ioname_fix FILENAME
  Fixes any duplicate io_name entries by appending a number to them.
chan_ioname_fix FILENAME
  Reverses the channel/io names in the given file (unless they have
  already been fixed)

Calibration checks
==================
coils_ori_check FILENAME
  Checks whether any MEG channel orientations are flipped (outwards)
coils_ori_fix FILENAME
  Fixes any MEG channel orientations which are flipped (outwards)

UPB changes
===========
apply_upb FILENAME [all|default|NUM]
  Applies the UPB to the data in the specified acquisition(s)

Copy ccs_to_scs_transform
=========================
transform_copy FILENAME
  Copies first ccs_to_scs_transform found in the file to any acquisition(s) without one

Apply reference weights
=======================
apply_weights FILENAME
  Subtracts weighted reference channel time series' from the head channel time series'
""")
    exit(0)
# Registry of CLI operations: maps the operation name given on the command
# line to its handler function.  Each handler registers itself here right
# after its definition, and main() dispatches through this dict.
operations = {}
def open_h5py(filename, mode='r+'):
    """Open *filename* as an HDF5 file and sanity-check its layout.

    mode: h5py file mode ('r' for read-only, 'r+' for read-write).
    Exits the process with status 1 if the file does not contain the
    'config' and 'acquisitions' groups expected of a MEGSCAN HDF5 file.
    """
    # BUG FIX: the mode argument was previously ignored (h5py.File was
    # called without it), so callers asking for read-only access in
    # dry-run mode silently got the h5py default mode instead.
    f = h5py.File(filename, mode)
    # Quick hacky check for MEGSCAN HDF5 file
    if 'config' not in f or 'acquisitions' not in f:
        print("Bad HDF5 file {}".format(filename))
        exit(1)
    return f
def ori_check(f):
    """Returns list of bad coil orientations"""
    chan_conf = f['config']['channels']
    meg_chans = [name for name in chan_conf
                 if chan_conf[name].attrs['chan_type'] == 'MEG']
    n_meg = len(meg_chans)

    # The centre of mass of all MEG coil positions defines the reference
    # point that a healthy coil orientation should point away from.
    cx = sum(chan_conf[name]['position'][0][0] for name in meg_chans)
    cy = sum(chan_conf[name]['position'][0][1] for name in meg_chans)
    cz = sum(chan_conf[name]['position'][0][2] for name in meg_chans)
    centre = (cx / n_meg, cy / n_meg, cz / n_meg)

    flipped = []
    radial = np.ndarray(shape=3)
    for name in meg_chans:
        # A coil is suspect when the angle between the centre-of-mass ->
        # position vector and the coil orientation is obtuse (negative
        # cosine), i.e. the coil points inwards.
        radial[0] = chan_conf[name]['position'][0][0] - centre[0]
        radial[1] = chan_conf[name]['position'][0][1] - centre[1]
        radial[2] = chan_conf[name]['position'][0][2] - centre[2]
        ori = chan_conf[name]['orientation'][0, :]
        cosine = np.dot(radial, ori) / (np.linalg.norm(radial) * np.linalg.norm(ori))
        if cosine < 0:
            flipped.append(name)
    return flipped
def coils_ori_check(args, dryrun):
    """Report (without fixing) MEG channels whose coil orientation is
    flipped outwards.

    args: [FILENAME].  dryrun is accepted for interface consistency but
    irrelevant: the file is always opened read-only.
    """
    # Consistency fix: every other operation validates its argument count;
    # this one previously raised IndexError on a missing filename.
    if len(args) != 1:
        print("E: Need input filename")
        exit(1)
    f = open_h5py(args[0], 'r')
    bads = ori_check(f)
    if len(bads) > 0:
        bad_s = ','.join(bads)
        print("Channels with bad orientations: {}".format(bad_s))
    # Previously the file handle was leaked; close it explicitly.
    f.close()
operations['coils_ori_check'] = coils_ori_check
def coils_ori_fix(args, dryrun):
    """Fix MEG channels whose coil orientation is flipped outwards.

    args: [FILENAME].  For each bad channel: flips the orientation vector
    and the sign of units_per_bit in /config, and negates the channel's
    data column in every non-symlinked acquisition that already has the
    UPB applied (raw data is left untouched).  In dry-run mode only
    reports what would change.
    """
    if len(args) != 1:
        print("E: Need input filename")
        exit(1)
    # Open file for potential writing if not in dryrun
    if dryrun:
        f = open_h5py(args[0], 'r')
    else:
        f = open_h5py(args[0], 'r+')
    bads = ori_check(f)
    if len(bads) == 0:
        print("I: All channels OK - no changes needed")
        exit(0)
    for chan_name in bads:
        print("Fixing channel {}".format(chan_name))
        # 1. Channel orientation and UPB in /config
        chan_info = f['config']['channels'][chan_name]
        if dryrun:
            print("I: Would flip orientation and UPB for {}".format(chan_name))
        else:
            print("I: Flipping orientation and UPB for {}".format(chan_name))
            # Flip the orientation
            chan_info['orientation'][...] *= -1
            # Flip the sign of the UPB
            chan_info.attrs['units_per_bit'] *= -1
        # 2. For any acquisitions where UPB is already applied, flip the channel
        # data, but don't do this to any raw (non-UPB applied data)
        for acqkey in f['acquisitions']:
            try:
                # This is a painful way of checking whether we are a link
                f['acquisitions'].id.get_linkval(acqkey.encode('utf-8'))
                print("I: Skipping acquisition {} as it is a symlink".format(acqkey))
                fixit = False
            except ValueError:
                # We're not a link, so fix it
                fixit = True
            if fixit:
                if f['acquisitions'][acqkey].attrs.get('upb_applied', 0):
                    try:
                        chan_idx = list(f['acquisitions'][acqkey]['channel_list']).index(chan_name)
                        if dryrun:
                            print("I: Would update data for {} in UPB applied acquisition {}".format(chan_name, acqkey))
                        else:
                            print("I: Updating data for {} in UPB applied acquisition {}".format(chan_name, acqkey))
                            f['acquisitions'][acqkey]['data'][:, chan_idx] *= -1
                    except ValueError:
                        # BUG FIX: list.index() raises ValueError, not
                        # IndexError, when the item is absent - the old
                        # handler never fired and a missing channel crashed.
                        print("I: Channel {} is not in acquisition {} - skipping".format(chan_name, acqkey))
                        continue
                else:
                    print("I: Skipping updating data for acquisition {} as UPB not applied".format(acqkey))
    f.close()
operations['coils_ori_fix'] = coils_ori_fix
def dup_ioname_fix(args, dryrun):
    """Fix duplicate io_name attributes by appending a numeric suffix.

    args: [FILENAME].  Walks /config/channels in order; the first channel
    with a given io_name keeps it, later duplicates get '<io_name>1',
    '<io_name>2', ... (first free suffix).  Dry-run only reports.
    """
    if len(args) != 1:
        print("E: Need input filename")
        exit(1)
    # Open file for potential writing if not in dryrun
    if dryrun:
        f = open_h5py(args[0], 'r')
    else:
        f = open_h5py(args[0], 'r+')
    chan_conf = f['config']['channels']
    # A set: membership tests only, and O(1) instead of O(n) per channel.
    seen_ionames = set()
    for name in chan_conf:
        io_name = chan_conf[name].attrs['io_name']
        if io_name in seen_ionames:
            # Figure out a new name by trying successive numeric suffixes
            idx = 1
            while io_name in seen_ionames:
                io_name = chan_conf[name].attrs['io_name'] + str(idx)
                # BUG FIX: idx was never incremented, so whenever
                # '<base>1' was already taken this loop span forever.
                idx += 1
            if dryrun:
                print("I: Would rename io_name {} to {}".format(name, io_name))
            else:
                print("I: Renaming io_name {} to {}".format(name, io_name))
                chan_conf[name].attrs['io_name'] = io_name
        seen_ionames.add(io_name)
    # Previously the file handle was leaked; close it explicitly.
    f.close()
operations['dup_ioname_fix'] = dup_ioname_fix
def chan_ioname_fix(args, dryrun):
    """Swap sled-style channel names and io_names throughout the file.

    args: [FILENAME].  Each /config/channels group named after the sled
    (pattern sNN[lr]bf[01]-NN) is renamed to its io_name, the io_name
    attribute becomes the old sled name, and every channel_list and
    weights-table reference is remapped to match.  Refuses to run if any
    channel name does not match the sled pattern (the file has probably
    already been fixed).
    """
    if len(args) != 1:
        print("E: Need input filename")
        exit(1)
    # Open file for potential writing if not in dryrun
    if dryrun:
        f = open_h5py(args[0], 'r')
    else:
        f = open_h5py(args[0], 'r+')
    chan_conf = f['config']['channels']
    # If any of the channel names aren't of the form sNN[lr]bf[01]-NN
    # then don't go ahead.  FIX: raw string - '\d' in a plain string is an
    # invalid escape sequence in Python 3.
    regex = re.compile(r'^s\d\d[lr]bf[01]-\d\d$')
    sled_names = {}
    for name in chan_conf:
        if regex.search(name) is None:
            print("E: Channel {} does not match expected pattern - file may be already fixed".format(name))
            exit(0)
        if name in sled_names:
            # FIX: 'Duplcate' typo in the error message
            print("E: Duplicate sled name {}".format(name))
            exit(0)
        io_name = chan_conf[name].attrs['io_name']
        if io_name in sled_names.values():
            print("E: Duplicate io_name {} for sled {}".format(io_name, name))
            exit(0)
        sled_names[name] = io_name
    # Now re-map the following:
    # 1. Channel names/groups in /config
    for chan_name, io_name in sled_names.items():
        if dryrun:
            print("I: Dry-run mode: would remap /config/channels/{} to {}".format(chan_name, io_name))
        else:
            # Make the io_name now be the sled_name
            chan_conf[chan_name].attrs['io_name'] = chan_name
            # Move the group
            chan_conf.move(chan_name, io_name)
    # 2. Channel names in channel_list in all acquisitions
    for acqkey in f['acquisitions']:
        try:
            # This is a painful way of checking whether we are a link
            f['acquisitions'].id.get_linkval(acqkey.encode('utf-8'))
            print("I: Skipping acquisition {} as it is a symlink".format(acqkey))
            fixit = False
        except ValueError:
            # We're not a link - continue
            fixit = True
        if fixit:
            print("I: Updating acquisition {}".format(acqkey))
            for idx, chan_name in enumerate(f['acquisitions'][acqkey]['channel_list']):
                # Swap the channel name out
                if not dryrun:
                    f['acquisitions'][acqkey]['channel_list'][idx] = sled_names[chan_name]
    # 3. Channel names in weights lists
    for weights in f['config']['weights']:
        print("I: Updating weights table {}".format(weights))
        wtable = f['config']['weights'][weights]
        for idx, refchan in enumerate(wtable['ref_chans']):
            if not dryrun:
                wtable['ref_chans'][idx] = sled_names[refchan]
        for idx, tgtchan in enumerate(wtable['tgt_chans']):
            if not dryrun:
                wtable['tgt_chans'][idx] = sled_names[tgtchan]
    f.close()
operations['chan_ioname_fix'] = chan_ioname_fix
def apply_upb(args, dryrun):
    """Multiply raw acquisition data by each channel's units_per_bit.

    args: [FILENAME, all|default|NUM].  'all' processes every
    non-symlinked acquisition, 'default' dereferences the default link,
    NUM selects a single acquisition.  The acquisition is tagged with the
    upb_applied attribute so the UPB is never applied twice.
    """
    if len(args) != 2:
        print("E: Need input filename and acquisitions (see usage)")
        exit(1)
    # Open file for potential writing if not in dryrun
    if dryrun:
        f = open_h5py(args[0], 'r')
    else:
        f = open_h5py(args[0], 'r+')
    if args[1].lower() == 'all':
        # Note that we skip acquisitions which are links below
        acquisitions = f['acquisitions'].keys()
    elif args[1].lower() == 'default':
        # Read default symlink and replace with correct acquisition
        # I hate the low-level parts of h5py...
        tgt = f['acquisitions'].id.get_linkval('default'.encode('utf-8')).decode('utf-8')
        acquisitions = [tgt]
    else:
        try:
            acquisitions = [str(int(args[1]))]
        except ValueError:
            print("E: cannot parse acquisition {}".format(args[1]))
            # BUG FIX: an unparseable argument is an error; exit non-zero
            # (was exit(0), which reported success to the shell).
            exit(1)
    for acqkey in acquisitions:
        try:
            # This is a painful way of checking whether we are a link
            f['acquisitions'].id.get_linkval(acqkey.encode('utf-8'))
            print("I: Skipping acquisition {} as it is a symlink".format(acqkey))
            fixit = False
        except ValueError:
            fixit = True
        if fixit:
            if f['acquisitions'][acqkey].attrs.get('upb_applied', 0):
                print("I: Skipping acquisition {} as UPB already applied".format(acqkey))
                continue
            print("I: Applying UPB to acquisition {}".format(acqkey))
            data = f['acquisitions'][acqkey]['data'][...].astype(np.float64)
            upb = np.zeros((1, data.shape[1]), np.float64)
            if upb.shape[1] != len(list(f['acquisitions'][acqkey]['channel_list'])):
                print("E: Acquisition {} data and channel list length do not match".format(acqkey))
                # BUG FIX: exit non-zero on error (was exit(0))
                exit(1)
            for idx, chan_name in enumerate(f['acquisitions'][acqkey]['channel_list']):
                upb[0, idx] = f['config']['channels'][chan_name].attrs['units_per_bit']
            # Broadcast the (1, n_channels) scale row across all samples
            data = data * upb
            if dryrun:
                print("I: Would replace dataset for acquisition {} with UPB applied dataset".format(acqkey))
                print("I: Would mark upb_applied as True in acquisition {}".format(acqkey))
            else:
                print("I: Replacing dataset for acquisition {} with UPB applied dataset".format(acqkey))
                # Remove and replace the dataset
                del f['acquisitions'][acqkey]['data']
                f['acquisitions'][acqkey].create_dataset(name='data', data=data)
                print("I: Marking upb_applied as True for acquisition {}".format(acqkey))
                f['acquisitions'][acqkey].attrs['upb_applied'] = np.uint8(True)
    f.close()
operations['apply_upb'] = apply_upb
def transform_copy(args, dryrun):
    """Copy ccs_to_scs_transform from the first COH acquisition to any
    non-COH acquisition that lacks one.

    args: [FILENAME].  Prints "No transform found" and returns if there
    is no COH acquisition with a ccs_to_scs_transform dataset.
    """
    if len(args) != 1:
        print("E: Need input filename")
        exit(1)
    # Open file for potential writing if not in dryrun
    if dryrun:
        f = open_h5py(args[0], 'r')
    else:
        f = open_h5py(args[0], 'r+')
    # Find first COH run to take transform from
    COHkey = None
    for acqkey in f['acquisitions']:
        try:
            # This is a painful way of checking whether we are a link
            f['acquisitions'].id.get_linkval(acqkey.encode('utf-8'))
            print("I: Skipping acquisition {} as it is a symlink".format(acqkey))
            fixit = False
        except ValueError:
            fixit = True
        if fixit:
            if f['acquisitions'][acqkey].attrs['acq_type'] == 'COH':
                COHkey = acqkey
                break
    # BUG FIX: this used a bare 'except:' to catch both the NameError
    # from a never-assigned COHkey and a missing dataset, and leaked the
    # file handle on the early return.  Test both conditions explicitly.
    if COHkey is None or 'ccs_to_scs_transform' not in f['acquisitions'][COHkey]:
        print("No transform found")
        f.close()
        return
    transform = f['acquisitions'][COHkey]['ccs_to_scs_transform'][:]
    for acqkey in f['acquisitions']:
        try:
            # This is a painful way of checking whether we are a link
            f['acquisitions'].id.get_linkval(acqkey.encode('utf-8'))
            print("I: Skipping acquisition {} as it is a symlink".format(acqkey))
            fixit = False
        except ValueError:
            fixit = True
        if fixit:
            # Don't do anything for COHs
            if f['acquisitions'][acqkey].attrs['acq_type'] == 'COH':
                continue
            if dryrun:
                print('I: Would copy transform from acquisition ' + COHkey + ' to acquisition ' + acqkey)
            else:
                # BUG FIX: membership test instead of a bare 'except:'
                if 'ccs_to_scs_transform' in f['acquisitions'][acqkey]:
                    print('I: Transform already exists for acquisition ' + acqkey)
                else:
                    f['acquisitions'][acqkey]['ccs_to_scs_transform'] = transform
                    print('I: Copied transform from acquisition ' + COHkey + ' to acquisition ' + acqkey)
    f.close()
operations['transform_copy'] = transform_copy
def apply_weights(args, dryrun):
    """Subtract weighted reference-channel time series from the target
    (head) channels of every non-symlinked acquisition.

    args: [FILENAME].  The weights table is named by each acquisition's
    weights_configured attribute.  Must run before apply_upb; already
    weighted or UPB-applied acquisitions are skipped.
    """
    if len(args) != 1:
        print("E: Need input filename (See usage)")
        exit(1)
    # Open file for potential writing if not in dryrun
    if dryrun:
        f = open_h5py(args[0], 'r')
    else:
        f = open_h5py(args[0], 'r+')
    for acqkey in f['acquisitions']:
        # FIX: was a bare 'except:' around an attribute-presence probe;
        # use the same truthiness test as coils_ori_fix/apply_upb.
        if f['acquisitions'][acqkey].attrs.get('upb_applied', 0):
            print('I: UPB has already been applied to acquisition ' + acqkey + ', weights must be applied before UPB')
            continue
        # FIX: 'UBP' typo in the log message
        print('I: No UPB applied')
        try:
            # This is a painful way of checking whether we are a link
            f['acquisitions'].id.get_linkval(acqkey.encode('utf-8'))
            print("I: Skipping acquisition {} as it is a symlink".format(acqkey))
            fixit = False
        except ValueError:
            fixit = True
        if fixit:
            # FIX: membership test instead of a bare 'except:' probe
            if 'weights_applied' in f['acquisitions'][acqkey].attrs:
                weight_name = f['acquisitions'][acqkey].attrs['weights_applied']
                print('I: Acquisition ' + acqkey + ' already has weights applied: ' + weight_name)
                continue
            print("I: Applying weights to acquisition {}".format(acqkey))
            data = f['acquisitions'][acqkey]['data'][...].astype(np.float64)
            weight_name = f['/acquisitions'][acqkey].attrs['weights_configured']
            ref_chans = f['config']['weights'][weight_name]['ref_chans'][...]
            tgt_chans = f['config']['weights'][weight_name]['tgt_chans'][...]
            weights = f['config']['weights'][weight_name]['weights'][...]
            channel_list = f['/acquisitions'][acqkey]['channel_list'][...]
            for itgt, tgt in enumerate(tgt_chans):
                data_chan_index = np.where(channel_list == tgt)
                for iref, ref in enumerate(ref_chans):
                    ref_index = np.where(channel_list == ref)
                    data[:, data_chan_index] -= data[:, ref_index] * weights[itgt, iref]
            if dryrun:
                print("I: Would replace dataset for acquisition {} with weighted dataset".format(acqkey))
                print('I: Would add the weights ' + f['/acquisitions'][acqkey].attrs['weights_configured'] + ' as the weights_configured attribute for acquisition ' + acqkey)
            else:
                print("I: Replacing dataset for acquisition {} with weighted dataset".format(acqkey))
                # Remove and replace the dataset
                del f['acquisitions'][acqkey]['data']
                f['acquisitions'][acqkey].create_dataset(name='data', data=data)
                print("I: Listing weights applied for acquisition {}".format(acqkey))
                f['/acquisitions'][acqkey].attrs['weights_applied'] = f['/acquisitions'][acqkey].attrs['weights_configured']
    f.close()
operations['apply_weights'] = apply_weights
def main():
    """Command-line entry point: parse [--dry-run] OPERATION FILENAME
    [extra args] and dispatch to the registered operation handler."""
    if len(argv) < 3:
        usage()
    # An optional leading --dry-run flag is consumed before dispatch.
    dryrun = argv[1].lower() == '--dry-run'
    if dryrun:
        argv.pop(1)
    if len(argv) < 3:
        usage()
    operation = argv[1]
    if operation not in operations:
        usage()
    # Hand the handler everything from the filename onwards.
    operations[operation](argv[2:], dryrun)


if __name__ == '__main__':
    main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment