Commit b5f6360c authored by Mark Hymers, committed by Joe Lyons

Add MEG processor daemon


Signed-off-by: Mark Hymers <mark.hymers@hankel.co.uk>
parent 1f0519b5
#!/usr/bin/python3
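"""MEG processor daemon.

Watches an incoming directory for '<dirname>.complete' marker files, claims
each corresponding directory by writing a '<dirname>.queued' marker, and
processes it into the output directory in a worker subprocess (at most
--max-jobs at once).

Example invocation (the script name and paths here are illustrative only):

    meg_processord.py /data/meg/incoming /data/meg/output --max-jobs 2 --log term
"""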
from argparse import ArgumentParser
from glob import glob
import logging
import multiprocessing
from os import stat, unlink, rmdir
from os.path import join, splitext, abspath, exists
from signal import signal, SIGTERM, SIGINT
from sys import exit
from time import sleep
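
# Module-level state.  OUTPUTDIR is filled in from the command line in the
# __main__ block below and read by worker processes after fork(); note (an
# assumption about deployment) that this relies on multiprocessing's default
# 'fork' start method, as used on Linux.  KEEPGOING and CHECK_JOBS_TIME are
# adjusted by the SIGTERM/SIGINT handler to trigger a graceful shutdown.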
OUTPUTDIR = ''
KEEPGOING = True
CHECK_JOBS_TIME = 15
def process_incoming_directory(incomingdir):
    """Incoming directory processing function"""
    # Import the yias processing machinery in the worker process
    from yias import MEGIncomingHandler
    from yias.destinations import RawAndAnonDestination, RawOnlyDestination
    from yias.processor import IncomingProcessor
    from yias.utils import ynic_fixup_studydesc

    # Log that we've started
    logging.info('Accepting processing job for %s' % incomingdir)

    # Set up destinations
    dest_error = join(OUTPUTDIR, 'error')
    dest_tmpdir = join(OUTPUTDIR, 'tmp')

    ForUsDestination = RawAndAnonDestination(OUTPUTDIR)
    NotForUsDestination = RawOnlyDestination(raw_dir=join(OUTPUTDIR, 'notforus'))

    destinations = [ForUsDestination, NotForUsDestination]

    # Set up our processor
    iproc = IncomingProcessor(destinations, dest_error, dest_tmpdir)
    iproc.study_description_postproc = ynic_fixup_studydesc

    # Handle the incoming directory - TODO: this logic really should be in
    # MEGIncomingHandler
    raw_file = None
    proc_file = None

    if exists(join(incomingdir, 'raw.hdf5')):
        raw_file = join(incomingdir, 'raw.hdf5')

    if exists(join(incomingdir, 'processed.hdf5')):
        proc_file = join(incomingdir, 'processed.hdf5')

    i = MEGIncomingHandler(raw_file, proc_file)

    # and process it
    iproc.process_meg_handler(i)
    # Finally, remove the .complete and .queued marker files
    try:
        unlink(incomingdir + '.complete')
    except OSError:
        pass

    try:
        unlink(incomingdir + '.queued')
    except OSError:
        pass

    # And the directory itself, if possible
    try:
        rmdir(incomingdir)
    except OSError as e:
        logging.warning('Could not remove %s: %s' % (incomingdir, e))

    return
def parse_args():
    parser = ArgumentParser(
        description='Processing daemon for incoming directory')

    parser.add_argument('incomingdir')
    parser.add_argument('outputdir')
    parser.add_argument('--max-jobs', type=int, default=3,
                        help='Maximum number of jobs to run concurrently')
    parser.add_argument('--check-new-jobs', type=int, default=15,
                        help='Duration in s to wait before checking for new jobs')
    parser.add_argument('--log', type=str, default='systemd',
                        choices=['systemd', 'term', 'file'],
                        help='Method by which to log')
    parser.add_argument('--log-level', type=str, default='INFO',
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
    parser.add_argument('--log-file', type=str, default=None,
                        help='Log file to use - requires --log=file')

    args = parser.parse_args()

    if args.check_new_jobs < 1:
        print("Error: --check-new-jobs must be at least 1")
        exit(1)

    if args.max_jobs < 1:
        print("Error: --max-jobs must be at least 1")
        exit(1)

    if args.log_file is not None and args.log != 'file':
        print("Error: --log-file=FILE requires --log=file")
        exit(1)

    if args.log_file is None and args.log == 'file':
        print("Error: --log=file requires --log-file=FILE")
        exit(1)

    return args
def configure_logging(args):
    # Work out our logging level
    loglevel = getattr(logging, args.log_level.upper())

    if args.log == 'systemd':
        # Set up logging infrastructure
        from systemd.journal import JournalHandler
        log = logging.getLogger()
        log.addHandler(JournalHandler())
        log.setLevel(loglevel)
    elif args.log == 'file':
        logging.basicConfig(filename=args.log_file, level=loglevel,
                            format='%(asctime)s %(message)s')
    else:
        # Leave as logging to terminal but set logging level
        log = logging.getLogger()
        log.setLevel(loglevel)
def asked_to_quit(signumber, stackframe):
    # Both names must be declared global here, otherwise the assignments
    # below would only create locals inside the handler
    global KEEPGOING, CHECK_JOBS_TIME
    KEEPGOING = False
    # Check more regularly once we know that we need to quit
    CHECK_JOBS_TIME = 1
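
# Main loop sketch: scan INPUTDIR for '*.complete' markers, claim each one by
# writing a '.queued' marker, and run up to --max-jobs worker processes at a
# time.  When asked to quit, stop claiming new work, let running jobs finish,
# then clean up any leftover '.queued' markers before exiting.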
if __name__ == '__main__':
    args = parse_args()

    configure_logging(args)

    CHECK_JOBS_TIME = args.check_new_jobs

    INPUTDIR = abspath(args.incomingdir)
    OUTPUTDIR = abspath(args.outputdir)

    logging.info('Starting up')

    queued_jobs = {}
    jobs = []

    # Deal with interrupts
    signal(SIGTERM, asked_to_quit)
    signal(SIGINT, asked_to_quit)
    while True:
        # If we're not shutting down
        if KEEPGOING:
            # Find new .complete files
            for cfile in glob(join(INPUTDIR, '*.complete')):
                basec = splitext(cfile)[0]
                try:
                    stat(basec + '.queued')
                except OSError:
                    # If we haven't queued the job, queue it
                    logging.debug('Queueing %s' % basec)
                    open(basec + '.queued', 'w').close()
                    queued_jobs[cfile] = basec

            # Start a queued job if we have space
            num_cur_jobs = len(jobs)

            if num_cur_jobs < args.max_jobs:
                if len(queued_jobs) > 0:
                    idir = queued_jobs.pop(list(queued_jobs.keys())[0])
                    logging.info('Starting processing job for %s' % idir)
                    p = multiprocessing.Process(target=process_incoming_directory,
                                                args=(idir,))
                    jobs.append(p)
                    p.start()
        # Check if any jobs are done; iterate over a copy of the list so
        # that we can safely remove finished jobs while looping
        for job in jobs[:]:
            if not job.is_alive():
                job.join()
                jobs.remove(job)

        # If we're waiting to end and we have no jobs left, quit
        if not KEEPGOING:
            if len(jobs) == 0:
                break

        sleep(CHECK_JOBS_TIME)
    # If we still have jobs queued, clean up our .queued files if possible
    # before exiting
    for cfile in queued_jobs:
        basec = splitext(cfile)[0]
        try:
            unlink(basec + '.queued')
        except OSError as e:
            logging.warning('Could not remove %s.queued: %s' % (basec, e))