#! /usr/bin/env python

# -----------------------------------------------
# help
# -----------------------------------------------

"""
NAME
    submit_tm5_step_done

DESCRIPTION
    Script to post process run output:
      o store output files in an archive
      o touch output files
      o run special user output scripts

STORE OUTPUT FILES

    Specify space separated list with store tasks:

        store.tasks                 :  task1 task2

    For each task, a number of task specific settings is defined
    in the rest of this file; some values are initialized below
    using a dummy task 'default' :

        ! from where ? leave empty for run directory,
        ! use '<output.dir>' for output directory:
        <task>.store.from           :  <output.dir>

        ! copy some extra files to output subdirectory for storage:
        <task>.store.copy           :  station-list.txt
        <task>.store.copy.to        :  STATION/

        ! name(s) of target file(s) for storage;
        ! if only one file is specified with extension '.tar'
        ! then an archive file is created. Default is a *.tar defined by
        ! sources.store.files in pycasso-tm5-expert.rc file.
        <task>.store.files          :  all.tar

        ! if a tarfile has to be created, specify which files to archive
        ! through a filter:
        <task>.store.tarfilter      :  STATION/*.hdf STATION/*.txt

        ! zip files before archiving ?
        ! empty, or a zipper command (gzip, bzip2, compress) :
        default.store.zipper        :  gzip

        ! archive in:
        default.store.arch          :  ec:/xxx/MAIN_ARCHIVE/
        <task>.store.arch           :  ec:/xxx/SPECIAL_OUTPUT/

        ! If the 'arch' does not contain any ':' characters it is assumed to be
        ! a local directory. In this case, a 'cp' command is used to archive
        ! the files rather than the 'gss' script, and the files might therefore
        ! include filters:
        <task>.store.files          :  save*.hdf
        <task>.store.arch           :  ${SCRATCH}/savefiles/

        ! store in archive even if already present ?
        ! by default probably True, but might not be necessary
        ! for some precomputed data:
        default.store.renew         :  True
        correlations.store.renew    :  False

        ! extra shell command to be called at the end of the task;
        ! for example to cleanup or do other important stuff:
        <task>.store.extra.command  :  rm -f STATION/*.hdf STATION/*.txt

    Storage might be subject to a special condition, e.g. end of
    iteration in a 4D-var run. Specify a condition line that can be
    evaluated by python. If not specified, the default condition is True .
    Keys '%{..}' in the condition line are expanded with the values
    in the restart file if present; this file is written by the main
    program and has the same name as the rcfile but with extension '.rs'
    instead of '.rc' . If no restart file is present, the keys are
    expanded with the values in the rcfile itself.
    Note the '%' instead of '$', otherwise the line is evaluated too early!

        store.condition             :  %{m1qn3.finished} == 1

TOUCH OUTPUT FILES

    To avoid that output files are removed from scratch during long,
    long runs. Enable this flag to touch recursively the access time
    of all files in 'output.dir' .

        ! touch files to prevent removal (True|False) ?
        output.touch                :  False

USER SCRIPTS

    For more elaborate post-processing, you can write your own script.
    Then add the full command (including options and arguments) to the
    'output.user.scripts' key, so it will automatically be executed:

        output.user.scripts         :  <bindir>/myscript

    More than one command can be specified: use ';' to separate them.
    The path should be either absolute or relative to the run directory.
    For scripts in any of the 'bin' subdirectories of the source,
    use <bindir>. Arguments/options can use <rcfile>, which is replaced
    by the name of the runtime rcfile.

    Conditional call is possible. Just specify a condition line that
    can be evaluated with the python eval() function. Set it with the
    'output.user.scripts.condition' key; it defaults to True if not set.
    The same condition applies to all listed scripts.

        ! condition line:
        output.user.scripts.condition  :  %{m1qn3.finished} == 1

"""

# -----------------------------------------------
# external
# -----------------------------------------------

# standard modules:
import sys
import os
import shutil
import optparse
import logging

# NOTE: the project toolboxes 'go' and 'rc' are imported in main(),
# after the '--bindir' location has been prepended to the search path.


# -----------------------------------------------
# default values
# -----------------------------------------------

# location of auxilary scripts:
bindir_default = os.curdir

# supported zipper commands :
#   zipper -> ( extension of the zipped file, command to unzip )
_ZIPPERS = {
    'gzip'     : ( '.gz' , 'gunzip'     ),
    'bzip2'    : ( '.bz2', 'bunzip2'    ),
    'compress' : ( '.Z'  , 'uncompress' ),
}


# -----------------------------------------------
# helpers
# -----------------------------------------------

def _run( call, command, **kwargs ) :
    """
    Execute 'command' through the callable 'call' (one of the
    'go.subprocess' calling routines), passing 'kwargs' along.
    On any exception the error is logged and the script exits with status 1.
    """
    try :
        return call( command, **kwargs )
    except Exception as err :
        logging.error( err )
        sys.exit(1)


def _condition_applies( rc, rcf, rcfile, key, msg_none, msg_found, msg_no_rs ) :
    """
    Read the condition line stored under 'key' in the settings 'rcf'
    and return the value it evaluates to; return True if the key is not set.

    Keys '%{..}' in the line are expanded with the settings in the restart
    file (name of 'rcfile' with extension '.rs') if that file exists,
    and with the settings in 'rcf' otherwise.

    Arguments:
        rc         : the imported 'rc' module (provides 'RcFile')
        rcf        : settings read from the rcfile
        rcfile     : name of the rcfile
        key        : name of the condition key
        msg_none   : message logged if no condition is set
        msg_found  : message format (single '%s') logged with the raw condition
        msg_no_rs  : message logged if no restart file is present

    Exits with status 1 if the expanded line could not be evaluated.
    """
    condition_line = rcf.get( key, default='None' )
    # no condition specified ? then apply by default:
    if condition_line == 'None' :
        logging.info( msg_none )
        return True
    logging.info( msg_found % condition_line )
    # name of restart file that might be present:
    # rcfile with '.rs' instead of '.rc' :
    bname,ext = os.path.splitext( rcfile )
    rsfile = bname+'.rs'
    if os.path.exists( rsfile ) :
        # read restart settings:
        settings = rc.RcFile( rsfile )
    else :
        # if no RS file, try to use the current RC file:
        logging.info( msg_no_rs )
        settings = rcf
    # evaluate '%{..}' keys in line:
    condition_line = settings.substitute( condition_line, marks=('%{','}') )
    logging.info( ' condition expanded to : %s' % condition_line )
    # NOTE(security): eval() executes arbitrary python code; the rc and rs
    # files must therefore come from a trusted source.
    try :
        result = eval( condition_line )
    except Exception :
        logging.error( 'could not evaluate storage condition ...' )
        sys.exit(1)
    logging.info( ' condition evaluated to : %s' % result )
    return result


def _store_task( go, rcf, scriptdir, store_task, output_dir,
                    default_zipper, default_dest, default_renew ) :
    """
    Perform a single store task: optionally change to the source directory,
    copy extra files, create a tar file, zip, and copy the file(s) to each
    of the destinations; finally call the optional extra shell command.

    Arguments:
        go              : the imported 'go' module (provides 'subprocess.log_call')
        rcf             : settings read from the rcfile
        scriptdir       : directory holding the 'gss' script
        store_task      : name of the task, prefix of the '<task>.store.*' keys
        output_dir      : value that replaces '<output.dir>' in '<task>.store.from'
        default_zipper, default_dest, default_renew :
                          values used if the task specific keys are not set

    Exits with status 1 if a file is missing or a command fails.
    """
    log_call = go.subprocess.log_call

    logging.info( ' task %s ...' % store_task )

    # from where ? empty for the current (run) directory:
    from_dir = rcf.get( '%s.store.from' % store_task, default='' )
    # replace the placeholder; an empty search string would insert
    # the output directory between every character of the value:
    from_dir = from_dir.replace( '<output.dir>', output_dir )
    owd = None
    if from_dir :
        owd = os.getcwd()
        os.chdir( from_dir )

    # extra files to be copied first:
    extras = rcf.get( '%s.store.copy' % store_task, default='None' )
    if extras != 'None' :
        copy_to = rcf.get( '%s.store.copy.to' % store_task )
        for extra in extras.split() :
            logging.info( ' copy %s ...' % extra )
            if not os.path.exists( extra ) :
                logging.error( ' file not found ...' )
                sys.exit(1)
            shutil.copy( extra, copy_to )

    # destinations:
    dests = rcf.get( '%s.store.arch' % store_task, default=default_dest ).split()
    logging.info( ' store in:' )
    for dest in dests :
        logging.info( ' %s' % dest )

    # files to be stored:
    store_files = rcf.get( '%s.store.files' % store_task ).split()

    # a single file with extension '.tar' is an archive to be created here:
    do_tar = (len(store_files) == 1) and store_files[0].endswith('.tar')
    if do_tar :
        tarfile = store_files[0]
        tarfilter = rcf.get( '%s.store.tarfilter' % store_task )
        logging.info( ' create %s ...' % tarfile )
        # do not use the gnu form 'tar c -f', this is not supported on all machines;
        # call in a shell since the tarfilter might contain '*' etc:
        _run( log_call, 'tar cf %s %s' % (tarfile,tarfilter), shell=True )

    # zipper command ?
    zipper = rcf.get( '%s.store.zipper' % store_task, default=default_zipper )
    # renew file if already present ?
    renew = rcf.get( '%s.store.renew' % store_task, 'bool', default=default_renew )

    # only zippers with a known extension and unzipper can be handled:
    if zipper and (zipper not in _ZIPPERS) :
        logging.error( 'unsupported zipper command : %s' % zipper )
        sys.exit(1)

    # loop over files to be stored (could be the single archive file):
    for store_file in store_files :

        # name of the file as currently present on disk:
        local_file = store_file
        # name in the archive, extended with the zipping extension if necessary:
        afile = store_file
        if zipper :
            afile = store_file + _ZIPPERS[zipper][0]
        # zip only once, also if there is more than one destination:
        zipped = False

        for dest in dests :

            # check on presence ?
            if not renew :
                # NOTE(review): 'found' is never reset, thus with renew=False
                # the file is always skipped unless the 'gss exist' call
                # fails (which stops the script). Probably a failing call
                # should mean 'not found'; confirm the contract of
                # 'gss exist' and 'log_call' before changing this.
                found = True
                command = [ os.path.join(scriptdir,'gss'), 'exist', os.path.join(dest,afile) ]
                _run( log_call, command )
                if found :
                    logging.info( ' %s already present in archive; skip ...' % afile )
                    continue

            # now apply zipping if necessary:
            if zipper and (not zipped) :
                logging.info( ' %s ...' % zipper )
                _run( log_call, [ zipper, store_file ] )
                local_file = afile
                zipped = True

            logging.info( ' store %s in %s ...' % (afile,dest) )
            # destination could be a location to be interpreted by the gss script;
            # just check on ':' to decide on this:
            if ':' in dest :
                # copy to archive, create directories if necessary:
                command = [ os.path.join(scriptdir,'gss'), 'copy', '--mkdir', afile, os.path.join(dest,afile) ]
                _run( log_call, command )
            else :
                if not os.path.isdir( dest ) :
                    os.makedirs( dest )
                # execute in a shell, the source file might be a filename filter:
                _run( log_call, 'cp %s %s' % (afile,dest), shell=True )

        if do_tar :
            # the tar file is not necessary anymore;
            # remove it under the name it currently has:
            os.remove( local_file )
        elif zipped :
            # unzip the stored file since it might be needed for re-start:
            _run( log_call, [ _ZIPPERS[zipper][1], afile ] )

    # back ?
    if owd is not None :
        os.chdir( owd )

    # extra shell command ?
    command = rcf.get( '%s.store.extra.command' % store_task, default='None' )
    if command != 'None' :
        logging.info( ' call shell command : %s' % command )
        _run( log_call, command, shell=True )


def _touch_output( go, rcf ) :
    """
    If 'output.touch' is enabled in the settings 'rcf', update the access
    time of all files below 'output.dir' using 'find' and 'touch -a'.
    """
    if not rcf.get( 'output.touch', 'bool', default=False ) :
        return
    logging.info( ' touch all files (access time) ...' )
    output_dir = rcf.get( 'output.dir' )
    # goto output directory:
    owd = os.getcwd()
    os.chdir( output_dir )
    # the command is passed as a list without 'shell=True', thus the
    # terminator of '-exec' should be a plain ';' (a shell-escaped '\;'
    # would reach 'find' including the backslash):
    command = [ 'find', '.', '-type', 'f', '-exec', 'touch', '-a', '{}', ';' ]
    _run( go.subprocess.log_call, command )
    # back ...
    os.chdir( owd )


def _call_user_scripts( go, rcf, rcfile, bindir ) :
    """
    Call the ';'-separated commands listed in 'output.user.scripts',
    with '<bindir>' and '<rcfile>' replaced by 'bindir' and 'rcfile'.
    """
    # might be empty:
    user_scripts = rcf.get( 'output.user.scripts' )
    if len(user_scripts) == 0 :
        return
    for user_script in user_scripts.split(';') :
        logging.info( ' call script "%s" ...' % user_script )
        # replace some keywords:
        command = user_script.replace( '<bindir>', bindir )
        command = command.replace( '<rcfile>', rcfile )
        # execute as a shell command:
        _run( go.subprocess.watch_call, command, shell=True )


# -----------------------------------------------
# main
# -----------------------------------------------

def main( argv=None ) :
    """
    Post process the output of a run as described in the module help.

    Arguments:
        argv  : list with command line arguments (options and the name
                of the rcfile); by default taken from 'sys.argv' .

    Exits with status 1 on a usage error or if any step fails.
    """

    # setup messages:
    logging.basicConfig( format='%(lineno)-4s:%(filename)-30s [%(levelname)-8s] %(message)s',
                         level=logging.INFO, stream=sys.stdout )

    # initialise the option parser:
    parser = optparse.OptionParser( usage="%prog <rcfile>" )
    parser.add_option( "--bindir",
                         help="location of auxilary scripts (%s)" % bindir_default,
                         dest="bindir", action="store", default=bindir_default )
    # options in 'opts', unnamed arguments in the list 'args' :
    opts,args = parser.parse_args( argv )

    # only one argument; no 'verbose' option is defined,
    # thus always report what is wrong:
    if len(args) != 1 :
        logging.error( 'single argument command should be specified, found : %i' % len(args) )
        parser.print_usage()
        sys.exit(1)
    rcfile = args[0]

    # location of scripts:
    scriptdir = opts.bindir
    # prepend locations of python modules to search path:
    sys.path.insert( 0, scriptdir )
    # project toolboxes:
    import go
    import rc

    logging.info( 'start' )

    # read settings:
    rcf = rc.RcFile( rcfile )

    # ===============
    # store output
    # ===============

    logging.info( 'store output files if necessary ...' )
    do_store = _condition_applies( rc, rcf, rcfile, 'store.condition',
                    ' no storage condition found, thus apply ...',
                    ' conditional storage : %s',
                    ' RS file not found, try current RC file to evaluate condition' )
    # list with store tasks:
    store_tasks = rcf.get( 'store.tasks' ).split()
    if do_store and (len(store_tasks) > 0) :
        # name of output directory:
        output_dir = rcf.get( 'output.dir' )
        # defaults for zipper command, destination, and renewal of archived files:
        default_zipper = rcf.get( 'default.store.zipper' )
        default_dest = rcf.get( 'default.store.arch' )
        default_renew = rcf.get( 'default.store.renew', 'bool' )
        for store_task in store_tasks :
            _store_task( go, rcf, scriptdir, store_task, output_dir,
                           default_zipper, default_dest, default_renew )

    # ===============
    # touch (to avoid removal from scratch)
    # ===============

    logging.info( 'touch output files if necessary ...' )
    _touch_output( go, rcf )

    # ===============
    # user scripts
    # ===============

    logging.info( 'call user scripts if necessary ...' )
    do_call = _condition_applies( rc, rcf, rcfile, 'output.user.scripts.condition',
                    ' no condition on user script, thus apply if any...',
                    ' condition for calling user script : %s',
                    ' RS file not found, will use current RC file to evaluate condition' )
    if do_call :
        _call_user_scripts( go, rcf, rcfile, opts.bindir )

    # ===============
    # Done
    # ===============

    logging.info( 'end' )


if __name__ == '__main__' :
    main()

# -----------------------------------------------
# end
# -----------------------------------------------