Commit c1ba8969 authored by Mark Hymers, committed by Joe Lyons

First version of MEG transfer code


Signed-off-by: Mark Hymers <mark.hymers@hankel.co.uk>
parent 690f8872
yias/meg_plugins/__init__.py
#!/usr/bin/python3
from .protocol_copy import ProtocolCopy
from .transform_copy import TransformCopy
from .channel_name_fix import ChannelNameFix
yias/meg_plugins/channel_name_fix.py
#!/usr/bin/python3
import re
import logging
from ..plugin_base import MEGStudyPlugin
__all__ = []
RE_IONAME = re.compile('^s[0-9]{2}[lr]bf[01]-[0-9]{2}$')
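# Matches names such as 's01lbf0-01' or 's12rbf1-07' (illustrative examples
# constructed from the pattern, not taken from real data)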
class ChannelNameFix(MEGStudyPlugin):
"""Fixup channel names (reverse io_name and chan_name) if chan_names are of
the form sXX[lb]f[01]-[0-9]{2} (i.e the io_names)"""
def should_run_individual_file(self, hdf5):
"""
Check whether all channels in a single file match the regex
"""
        # Only run when every channel name matches the io_name pattern
        return all(RE_IONAME.search(channel) is not None
                   for channel in hdf5['config']['channels'])
def should_run(self, raw_hdf5, proc_hdf5):
"""
Check whether this is necessary / possible
"""
raw_need = False
proc_need = False
if raw_hdf5 is not None:
raw_need = self.should_run_individual_file(raw_hdf5)
if proc_hdf5 is not None:
            proc_need = self.should_run_individual_file(proc_hdf5)
return raw_need or proc_need
def fix_up_file(self, hdf5):
"""
Fix up a single file
"""
chan_conf = hdf5['config']['channels']
sled_names = {}
for name in chan_conf:
if RE_IONAME.search(name) is None:
logging.error("ChannelNameFix: {} does not match pattern: skipping".format(name))
return False
if name in sled_names:
logging.error("ChannelNameFix: {} duplicate name: skipping".format(name))
return False
io_name = chan_conf[name].attrs['io_name']
if io_name in sled_names.values():
logging.error("ChannelNameFix: Duplicate io_name {} for sled {}: skipping".format(io_name, name))
return False
sled_names[name] = io_name
# Now re-map the following:
io_names = list(sled_names.values())
# 1. Channel names/groups in /config
for chan_name, io_name in sled_names.items():
# Make the io_name now be the sled_name
chan_conf[chan_name].attrs['io_name'] = chan_name
# Move the group
chan_conf.move(chan_name, io_name)
# 2. Channel names in channel_list in all acquisitions
for acqkey in hdf5['acquisitions']:
            try:
                # h5py's get_linkval() succeeds only for soft/external links
                # and raises ValueError for hard links, so success here means
                # this acquisition is a link and should be left untouched
                hdf5['acquisitions'].id.get_linkval(acqkey.encode('utf-8'))
                logging.debug("ChannelNameFix: Skipping acq {} as it is a symlink".format(acqkey))
                fixit = False
            except ValueError:
                fixit = True
if fixit:
# We're not a link - continue
logging.debug("ChannelNameFix: Fixing acq {}".format(acqkey))
for idx, chan_name in enumerate(hdf5['acquisitions'][acqkey]['channel_list']):
# Swap the channel name out
hdf5['acquisitions'][acqkey]['channel_list'][idx] = sled_names[chan_name]
# 3. Channel names in weights lists
for weights in hdf5['config']['weights']:
logging.debug("ChannelNameFix: Updating weights table {}".format(weights))
wtable = hdf5['config']['weights'][weights]
for idx, refchan in enumerate(wtable['ref_chans']):
wtable['ref_chans'][idx] = sled_names[refchan]
for idx, tgtchan in enumerate(wtable['tgt_chans']):
wtable['tgt_chans'][idx] = sled_names[tgtchan]
return True
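    # Illustrative before/after for the remap above (the channel name 'A66'
    # is a hypothetical example):
    #   before: group /config/channels/s01lbf0-01 with attrs['io_name'] == 'A66'
    #   after:  group /config/channels/A66 with attrs['io_name'] == 's01lbf0-01'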
def run(self, raw_hdf5, proc_hdf5, tmpdir):
"""
Executes the plugin.
raw_hdf5: h5py.File object or None
proc_hdf5: h5py.File object or None
tmpdir: Temporary directory to use if necessary
Will raise a YIASSeriesPluginError on an exception
Otherwise will return True if completed and False if nothing
needed to be done for one reason or another.
"""
# Run for each file individually
raw_ret = True
        if raw_hdf5 is not None and self.should_run_individual_file(raw_hdf5):
logging.info("ChannelNameFix: called for raw file")
raw_ret = self.fix_up_file(raw_hdf5)
else:
logging.info("ChannelNameFix: not called for raw file")
proc_ret = True
        if proc_hdf5 is not None and self.should_run_individual_file(proc_hdf5):
logging.info("ChannelNameFix: called for processed file")
proc_ret = self.fix_up_file(proc_hdf5)
else:
logging.info("ChannelNameFix: not called for processed file")
return raw_ret and proc_ret
__all__.append('ChannelNameFix')
yias/meg_plugins/protocol_copy.py
#!/usr/bin/python3
import logging
from ..plugin_base import MEGStudyPlugin
__all__ = []
class ProtocolCopy(MEGStudyPlugin):
"""Copy the protocol information from processed to raw file if
missing in raw file and available in processed"""
def should_run(self, raw_hdf5, proc_hdf5):
"""
Check whether this is necessary / possible
"""
# Need both files for this
if raw_hdf5 is None or proc_hdf5 is None:
return False
# Check whether protocol info is already in the raw HDF5 file
if 'protocol' in raw_hdf5.get('config', {}):
return False
# Is protocol info available in processed file
if 'protocol' not in proc_hdf5.get('config', {}):
return False
return True
def run(self, raw_hdf5, proc_hdf5, tmpdir):
"""
Executes the plugin.
raw_hdf5: h5py.File object or None
proc_hdf5: h5py.File object or None
tmpdir: Temporary directory to use if necessary
Will raise a YIASSeriesPluginError on an exception
Otherwise will return True if completed and False if nothing
needed to be done for one reason or another.
"""
if not self.should_run(raw_hdf5, proc_hdf5): # pragma: nocover
logging.info("ProtocolCopy: called for invalid series")
return False
if 'config' not in raw_hdf5:
raw_hdf5.create_group('config')
if 'protocol' not in raw_hdf5['config']:
raw_hdf5['config'].create_group('protocol')
for field in ['protocol_name', 'initiating_user', 'definition']:
if field in proc_hdf5['config']['protocol'].attrs:
value = proc_hdf5['config']['protocol'].attrs[field]
raw_hdf5['config']['protocol'].attrs[field] = value
return True
__all__.append('ProtocolCopy')
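# After a successful run the raw file ends up with (illustrative layout):
#   /config/protocol carrying whichever of the attrs 'protocol_name',
#   'initiating_user' and 'definition' were present in the processed file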
yias/meg_plugins/transform_copy.py
#!/usr/bin/python3
import logging
from ..plugin_base import MEGStudyPlugin
__all__ = []
class TransformCopy(MEGStudyPlugin):
"""Copy the ccs_to_scs and fitted_coils information from processed to raw
file if missing in raw file and available in processed"""
def should_run(self, raw_hdf5, proc_hdf5):
"""
Check whether this is necessary / possible
"""
# Need both files for this
if raw_hdf5 is None or proc_hdf5 is None:
return False
shouldrun = False
# Find a list of ccs_to_scs_transform objects in the processed file and raw file
        proc_trans = [x for x in sorted(proc_hdf5['acquisitions'].keys())
                      if x != 'default' and 'ccs_to_scs_transform' in proc_hdf5['acquisitions'][x]]
        raw_trans = [x for x in sorted(raw_hdf5['acquisitions'].keys())
                     if x != 'default' and 'ccs_to_scs_transform' in raw_hdf5['acquisitions'][x]]
# If these are not the same, we have something to copy
if proc_trans != raw_trans:
shouldrun = True
# Find a list of fitted_coils objects in the processed file and raw file
        proc_coils = [x for x in sorted(proc_hdf5['acquisitions'].keys())
                      if x != 'default' and 'fitted_coils' in proc_hdf5['acquisitions'][x]]
        raw_coils = [x for x in sorted(raw_hdf5['acquisitions'].keys())
                     if x != 'default' and 'fitted_coils' in raw_hdf5['acquisitions'][x]]
# If these are not the same, we have something to copy
if proc_coils != raw_coils:
shouldrun = True
return shouldrun
def run(self, raw_hdf5, proc_hdf5, tmpdir):
"""
Executes the plugin.
raw_hdf5: h5py.File object or None
proc_hdf5: h5py.File object or None
tmpdir: Temporary directory to use if necessary
Will raise a YIASSeriesPluginError on an exception
Otherwise will return True if completed and False if nothing
needed to be done for one reason or another.
"""
if not self.should_run(raw_hdf5, proc_hdf5): # pragma: nocover
logging.info("TransformCopy: called for invalid series")
return False
for acq in raw_hdf5['acquisitions']:
# Skip default
if acq == 'default':
continue
# Only do this if we don't already have a copy
if 'ccs_to_scs_transform' not in raw_hdf5['acquisitions'][acq]:
# Copy the transform if it exists in proc_hdf5
if 'ccs_to_scs_transform' in proc_hdf5['acquisitions'][acq]:
data = proc_hdf5['acquisitions'][acq]['ccs_to_scs_transform']
raw_hdf5['acquisitions'][acq].create_dataset('ccs_to_scs_transform', data=data)
# And the same for fitted_coils
if 'fitted_coils' not in raw_hdf5['acquisitions'][acq]:
# Copy the data if it exists in proc_hdf5
if 'fitted_coils' in proc_hdf5['acquisitions'][acq]:
proc_hdf5.copy('acquisitions/{}/fitted_coils'.format(acq),
raw_hdf5['acquisitions'][acq],
expand_refs=True, expand_external=True)
return True
__all__.append('TransformCopy')
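# Note: ccs_to_scs_transform is copied as a plain dataset, whereas
# fitted_coils is copied via h5py's copy() with object references and
# external links expanded, so the raw file ends up self-contained.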
@@ -103,6 +103,10 @@ class MEGIncomingHandler(object):
self.proc_file = proc_file
# Overrides
self.patient_id_override = None
self.study_description_override = None
# Prepare some useful values
self._read_metainfo()
@@ -111,7 +115,7 @@ class MEGIncomingHandler(object):
"""
Read in:
+ Participant ID
+ Protocol name
+ Study name
+ Scan date/time
"""
@@ -123,24 +127,93 @@ class MEGIncomingHandler(object):
try:
with h5py.File(filename, 'r') as f:
part_id = f['subject'].attrs['id']
patient_id = f['subject'].attrs['id']
# Problem: protocol name is only available in the processed file
# We can write a GUI to fix this for raw-only cases before upload
protocol_name = f['config']['protocol'].attrs['protocol_name']
study_description = f['config']['protocol'].attrs['protocol_name']
# Look for the start time of the first acquisition
protocol_time = f['acquisitions']['0'].attrs['start_time']
study_datetime = f['acquisitions']['0'].attrs['start_time']
# Convert the time to a usable datetime
protocol_time = dateutil.parser.parse(protocol_time)
study_datetime = dateutil.parser.parse(study_datetime)
except Exception as e:
raise YIASMEGHDF5MetaMissing(filename, str(e))
self.part_id = part_id
self.protocol_name = protocol_name
self.protocol_time = protocol_time
self.base_filename = basename(filename)
self.patient_id = patient_id
self.study_description = study_description
self.study_datetime = study_datetime
@property
def PatientID(self):
"""
Extracts the PatientID field and returns it; if patient_id_override is
set on the class, this will override the patient ID.
"""
        if self.patient_id_override:
            return self.patient_id_override
        return self.patient_id
@property
def SanitisedPatientID(self):
"""
Extracts the PatientID field from the MEG data and sanitises it.
If patient_id_override is set on the class, this will override the
patient ID
"""
if self.patient_id_override:
return sanitise_string(self.patient_id_override)
return sanitise_string(self.patient_id)
@property
def StudyDescription(self):
"""
Extracts the StudyDescription field from the MEG data and returns it.
If study_description_override is set on the class, this will override
the study description
"""
        if self.study_description_override:
            return self.study_description_override
        return self.study_description
@property
def SanitisedStudyDescription(self):
"""
Extracts the StudyDescription field from the MEG data and sanitises it.
If study_description_override is set on the class, this will override
the study description
"""
if self.study_description_override:
return sanitise_string(self.study_description_override)
return sanitise_string(self.study_description)
@property
def StudyDate(self):
"""
Extracts the date from the MEG data and returns it.
"""
return self.study_datetime.strftime('%Y%m%d')
@property
def StudyTime(self):
"""
Extracts the time from the MEG data and returns it.
"""
return self.study_datetime.strftime('%H%M%S')
@property
def StudyDateTime(self):
"""
Extracts the date and time fields from the MEG data and returns them.
"""
return self.StudyDate + self.StudyTime
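    # e.g. an acquisition starting at 2021-06-03 09:14:05 (hypothetical) gives
    # StudyDate '20210603', StudyTime '091405', StudyDateTime '20210603091405'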
__all__.append('MEGIncomingHandler')
@@ -39,3 +39,31 @@ class SeriesAnalysisPlugin(object):
__all__.append('SeriesAnalysisPlugin')
class MEGStudyPlugin(object):
"""ABC for MEG Study Plugins"""
def should_run(self, raw_hdf5, proc_hdf5): # pragma: nocover
"""
Must return True or False depending on whether the plugin should run
on the given raw and proc HDF5 files (passed as h5py.File objects)
"""
return False
def run(self, raw_hdf5, proc_hdf5, tmpdir): # pragma: nocover
"""
Executes the plugin.
raw_hdf5: h5py.File object or None
proc_hdf5: h5py.File object or None
tmpdir: Temporary directory to use if necessary
Will raise a YIASSeriesPluginError on an exception
Otherwise will return True if completed and False if nothing
needed to be done for one reason or another.
"""
pass
__all__.append('MEGStudyPlugin')
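# A minimal concrete plugin might look like this (hypothetical sketch, not
# part of the shipped plugin set):
#
#     class NoOpPlugin(MEGStudyPlugin):
#         def should_run(self, raw_hdf5, proc_hdf5):
#             # Run whenever at least one file is present
#             return raw_hdf5 is not None or proc_hdf5 is not None
#
#         def run(self, raw_hdf5, proc_hdf5, tmpdir):
#             # Nothing needed doing
#             return False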
@@ -2,10 +2,11 @@
from errno import EEXIST
from os import unlink, chmod, rmdir, rename, walk
from os.path import join, exists
from os.path import join, exists, abspath
from random import sample
import re
from string import ascii_lowercase
from shutil import copy2
from time import time
from tempfile import mkdtemp, TemporaryDirectory
import logging
@@ -13,10 +14,15 @@ import logging
from yias.errors import YIASSeriesPluginError
from yias.utils import makedirs_chmod, sanitise_string
from yias.study_plugins import NIFTIConvertorSeriesPlugin, SiemensPhysiologicalSeriesPlugin
from yias.meg_plugins import ProtocolCopy, TransformCopy, ChannelNameFix
SeriesPlugins = [NIFTIConvertorSeriesPlugin(),
SiemensPhysiologicalSeriesPlugin()]
MEGPlugins = [ProtocolCopy(),
TransformCopy(),
ChannelNameFix()]
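# The list order above is the order in which the plugins run for each study
# (see the loop in process_meg_handler below)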
__all__ = []
class IncomingProcessor(object):
@@ -172,7 +178,7 @@ class IncomingProcessor(object):
try:
plugin.run(series, studydest, join(anon_dir, studydir), seriesdir, tmpdir)
except YIASSeriesPluginError as e: # pragma: nocover
logging.error("Series Plugin Error: %s : %s" % (plugin.__name__, e))
logging.error("Series Plugin Error: %s : %s" % (plugin.__class__.__name__, e))
# We can now remove the series from incoming
if self.remove_incoming_after:
@@ -228,4 +234,144 @@ class IncomingProcessor(object):
rename(i.directory, join(self.dest_error, backupname))
def process_meg_handler(self, ihandler):
import h5py
from yias.meghdf5 import anonymise_hdf5
study = ihandler
logging.info("MEG Study: {}".format(study.base_filename))
# Note the potential for unfortunate confusion
# We have raw/processed copies of the MEG data (system named)
# We also have raw/anon directories on our server (named by us before the MEG system)
# So, you can have:
# * raw version in the raw directory
# * processed version in the raw directory
# * raw version in the anon directory
# * processed version in the anon directory
# Try not to get confused
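        # e.g. with patient 'P001' and study 'RESTING' (hypothetical values):
        #   <raw_dir>/P001/20210603_RESTING/raw/20210603091405.meghdf5
        #   <raw_dir>/P001/20210603_RESTING/processed/20210603091405.meghdf5
        #   <anon_dir>/P001/20210603_RESTING/...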
patientid = study.SanitisedPatientID
origstudydesc = study.SanitisedStudyDescription.upper()
# Figure out our destination
studydest = self.study_destination(patientid, origstudydesc)
# If requested, post-process our study description
# Note that we have to keep a copy of the original description
# which is what we use for testing anonymisation etc
if self.study_description_postproc:
studydesc = self.study_description_postproc(origstudydesc)
else:
studydesc = origstudydesc
studydir = join(patientid,
study.StudyDate + '_' + studydesc)
with TemporaryDirectory(dir=self.dest_tmpdir) as tmpdir:
filebname = '%s.meghdf5' % study.StudyDateTime
# Deal with our raw directory (ORIGINAL) copy of both raw/processed
raw_dir = studydest.get_raw_directory(patientid, origstudydesc)
raw_tgt_dir = join(raw_dir, studydir)
# Handle raw/processed file in raw directory
for filename, ftype in [(study.raw_file, 'raw'),
(study.proc_file, 'processed')]:
if filename is not None:
tgt_dir = join(raw_tgt_dir, ftype)
newfname = join(tgt_dir, filebname)
if exists(newfname):
logging.info("Raw %s copy already stored at %s - skipping" % (ftype, newfname))
else:
logging.info("Raw %s copy going to %s" % (ftype, newfname))
makedirs_chmod(tgt_dir, mode=studydest.mode_raw_dir, exist_ok=True)
copy2(filename, newfname)
chmod(newfname, studydest.mode_raw_file)
# Now work out if we should anonymise
if self.force_anonymisation or studydest.anonymise(patientid, origstudydesc):
# Store an anonymised copy with relevant fixups
anon_dir = studydest.get_anon_directory(patientid, origstudydesc)
anondname = join(anon_dir, studydir)
# Make copy of both raw and processed files into temp dir if available
tmp_raw_path = None
tmp_raw_hdf5 = None
tmp_proc_path = None
tmp_proc_hdf5 = None
if study.raw_file is not None:
tmp_raw_path = abspath(join(tmpdir, 'raw.hdf5'))
copy2(study.raw_file, tmp_raw_path)
tmp_raw_hdf5 = h5py.File(tmp_raw_path, 'a')
if study.proc_file is not None:
tmp_proc_path = abspath(join(tmpdir, 'processed.hdf5'))
copy2(study.proc_file, tmp_proc_path)
tmp_proc_hdf5 = h5py.File(tmp_proc_path, 'a')
# Run MEG plugins
for plugin in MEGPlugins:
if plugin.should_run(tmp_raw_hdf5, tmp_proc_hdf5):
logging.info("Running MEG Plugin: %s" % (plugin.__class__.__name__))
try:
plugin.run(tmp_raw_hdf5, tmp_proc_hdf5, tmpdir)
except YIASSeriesPluginError as e: # pragma: nocover
logging.error("MEG Plugin Error: %s : %s" % (plugin.__class__.__name__, e))
else:
logging.info("Skipping MEG Plugin: %s" % (plugin.__class__.__name__))
                # Close the HDF5 files (either may be absent)
                if tmp_raw_hdf5 is not None:
                    tmp_raw_hdf5.close()
                if tmp_proc_hdf5 is not None:
                    tmp_proc_hdf5.close()
# Run anonymisation on the file (do this last of all as it calls h5repack)
if study.raw_file is not None:
logging.info("Running anonymisation on raw file")
anonymise_hdf5(tmp_raw_path, patientid)
if study.proc_file is not None:
logging.info("Running anonymisation on processed file")
anonymise_hdf5(tmp_proc_path, patientid)
# Copy final versions of the files into the right place
try:
logging.info("Anonymous copies going to %s" % anondname)