123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608 |
- #! /usr/bin/env python
- # -----------------------------------------------
- # help
- # -----------------------------------------------
- """
- NAME
- submit_tm5_step_done
-
- DESCRIPTION
- Script to post process run output:
- o store output files in an archive
- o touch output files
- o run special user output scripts
-
- STORE OUTPUT FILES
- Specify space separated list with store tasks:
- store.tasks : task1 task2
- For each task, a number of task specific settings are defined in the
- rest of this file; some values are initialized below using a dummy
- task 'default' :
- ! from where ? leave empty for run directory, use '<output.dir>' for output directory:
- <task>.store.from : <output.dir>
- ! copy some extra files to output subdirectory for storage:
- <task>.store.copy : station-list.txt
- <task>.store.copy.to : STATION/
- ! name(s) of target file(s) for storage;
- ! if only one file is specified with extension '.tar'
- ! then an archive file is created. Default is a *.tar defined by
- ! sources.store.files in pycasso-tm5-expert.rc file.
- <task>.store.files : all.tar
- ! if a tarfile has to be created, specify which files to archive
- ! through a filter:
- <task>.store.tarfilter : STATION/*.hdf STATION/*.txt
- ! zip files before archiving ? empty, or a zipper command (gzip, bzip2, compress, ...) :
- default.store.zipper : gzip
- ! archive in:
- default.store.arch : ec:/xxx/MAIN_ARCHIVE/
- <task>.store.arch : ec:/xxx/SPECIAL_OUTPUT/
- ! If the 'arch' does not contain any ':' characters it is assumed to be
- ! a local directory. In this case, a 'cp' command is used to archive
- ! the files rather than the 'gss' script, and the files might therefore
- ! include filters:
- <task>.store.files : save*.hdf
- <task>.store.arch : ${SCRATCH}/savefiles/
- ! store in archive even if already present ?
- ! by default probably True, but might not be necessary for some precomputed data:
- default.store.renew : True
- correlations.store.renew : False
-
- ! extra shell command to be called at the end of the task;
- ! for example to cleanup or do other important stuff:
- <task>.store.extra.command : rm -f STATION/*.hdf STATION/*.txt
- Storage might be subject to a special condition, e.g. end of iteration in a 4D-var run.
- Specify a condition line that could be evaluated by python.
- If not specified, the default condition is True .
- Keys '%{..}' in the condition line are expanded with the values in the restart file
- if present; this file is written by the main program and has the same name as
- the rcfile but with extension '.rs' instead of '.rc' .
- Note the '%' instead of '$', otherwise the line is evaluated too early!
- store.condition : %{m1qn3.finished} == 1
- TOUCH OUTPUT FILES
- To avoid that output files are removed from scratch
- during long, long runs.
- Enable this flag to touch recursively the access time
- of all files in 'output.dir' .
- ! touch files to prevent removal (True|False) ?
- output.touch : False
- USER SCRIPTS
- For more elaborate post-processing, you can write your own
- script. Then add the full command (including options and arguments)
- to the output.user.scripts key, so it will automatically be executed:
- output.user.scripts : <bindir>/myscript <rcfile>
- More than one command can be specified: use ';' to separate them.
- The path should be either absolute or relative to the run directory.
- For scripts in any of the 'bin' subdirectories of the source, use
- <bindir>. Arguments/options can use <rcfile>, which is replaced by
- the name of the runtime rcfile.
- Conditional call is possible. Just specify a condition line
- that can be evaluated with python eval() function.
- Set with the `output.user.scripts.condition` key,
- and defaults to True if not set or empty.
- The same condition applies to all listed scripts.
- ! condition line:
- output.user.scripts.condition : %{m1qn3.finished} == 1
- """
- # -----------------------------------------------
- # external
- # -----------------------------------------------
- # standard modules:
- import sys
- import os
- import shutil
- import go
- import optparse
- import logging
# -----------------------------------------------
# logging
# -----------------------------------------------

# Send all messages of level INFO and higher to standard output;
# each record starts with the line number and the name of the file
# that issued it, followed by the level and the message text.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format='%(lineno)-4s:%(filename)-30s [%(levelname)-8s] %(message)s'
)

# -----------------------------------------------
# default values
# -----------------------------------------------

# Auxiliary scripts are searched in the current directory
# unless the '--bindir' option specifies another location.
bindir_default = os.curdir
# -----------------------------------------------
# arguments
# -----------------------------------------------

# text for the 'usage' help line:
usage = "%prog <rcfile>"

# initialise the option parser:
parser = optparse.OptionParser( usage=usage )

# the only option: directory holding the auxiliary scripts and python modules
parser.add_option( "--bindir",
                   help="location of auxilary scripts (%s)" % bindir_default,
                   dest="bindir", action="store", default=bindir_default )

# parse the command line;
# 'opts' holds the option values (only 'bindir' is defined),
# 'args' is the list with the remaining positional arguments:
opts,args = parser.parse_args()

# Exactly one positional argument (the rcfile) is required.
# The message is logged unconditionally: no '--verbose' option is defined,
# so testing 'opts.verbose' here would raise an AttributeError instead of
# showing the error and the usage line.
if len(args) != 1 :
    logging.error( 'single argument command should be specified, found : %i' % len(args) )
    parser.print_usage()
    sys.exit(1)
#endif

# name of the file with the run settings:
rcfile = args[0]
# -----------------------------------------------
# toolboxes
# -----------------------------------------------

# The directory passed with '--bindir' holds the helper scripts;
# it is put in front of the module search path before the local
# 'rc' module is imported.
scriptdir = opts.bindir
sys.path[0:0] = [ scriptdir ]

# local modules:
import rc

# -----------------------------------------------
# begin
# -----------------------------------------------

logging.info( 'start' )

# load the settings from the rcfile named on the command line:
rcf = rc.RcFile( rcfile )
# ===============
# store output
# ===============

# Decide whether output should be stored at all: sets the flag 'do_store'.
# Without a 'store.condition' key the answer is always True; otherwise the
# condition line is expanded and evaluated as a python expression.

# info ...
logging.info( 'store output files if necessary ...' )

# condition line; the string 'None' marks "not specified":
condition_line = rcf.get( 'store.condition', default='None' )

# no condition specified ? then apply:
if condition_line == 'None' :

    # info
    logging.info( ' no storage condition found, thus apply ...' )
    # apply by default:
    do_store = True

else :

    # info
    logging.info( ' conditional storage : %s' % condition_line )

    # name of restart file that might be present:
    # rcfile with '.rs' instead of '.rc' :
    bname,ext = os.path.splitext(rcfile)
    rsfile = bname+'.rs'

    # expand the '%{..}' keys in the line, preferably with the values
    # from the restart file:
    if os.path.exists(rsfile) :
        # read restart settings:
        rsf = rc.RcFile( rsfile )
        condition_line = rsf.substitute( condition_line, marks=('%{','}') )
    else :
        # no restart file, fall back on the current rcfile:
        logging.info( ' RS file not found, try current RC file to evaluate condition' )
        condition_line = rcf.substitute( condition_line, marks=('%{','}') )
    #endif

    # info
    logging.info( ' condition expanded to : %s' % condition_line )

    # Evaluate the condition line.
    # NOTE(review): eval() executes arbitrary python code taken from the
    # rcfile; this is only acceptable as long as the rcfile is trusted.
    # Only ordinary errors are trapped here, so that an interrupt or an
    # explicit exit is not reported as an evaluation problem.
    try :
        do_store = eval( condition_line )
    except Exception as err :
        logging.error( 'could not evaluate storage condition ...' )
        logging.error( err )
        sys.exit(1)
    #endtry

    # info
    logging.info( ' condition evaluated to : %s' % do_store )

#endif
# Perform the store tasks listed in 'store.tasks' (if 'do_store' is set).
# For each task:
#   o change to the '<task>.store.from' directory if specified;
#   o copy the extra files named in '<task>.store.copy';
#   o create a tar file if a single target '*.tar' is specified;
#   o zip the target file(s) if a zipper is specified;
#   o copy the target file(s) to every destination in '<task>.store.arch';
#   o remove the tar file, or unzip the stored files again;
#   o run the optional '<task>.store.extra.command' in a shell.
# Any failure is logged and ends the script with exit status 1.

# list with store tasks (empty if nothing is to be stored):
store_tasks = rcf.get( 'store.tasks' ).split()

# extension added to a file name by each of the known zippers,
# and the command that restores the original file:
zip_extensions = { 'gzip' : '.gz', 'bzip2' : '.bz2', 'compress' : '.Z' }
unzippers = { 'gzip' : 'gunzip', 'bzip2' : 'bunzip2', 'compress' : 'uncompress' }

# apply ?
if do_store and (len(store_tasks) > 0) :

    # name of output directory:
    output_dir = rcf.get( 'output.dir' )
    # zipper command ?
    default_zipper = rcf.get( 'default.store.zipper' )
    # destination ...
    default_dest = rcf.get( 'default.store.arch' )
    # renew files in archive ?
    default_renew = rcf.get( 'default.store.renew', 'bool' )

    # loop over all tasks:
    for store_task in store_tasks :

        # info ...
        logging.info( ' task %s ...' % store_task )

        # from where ? empty for the current (run) directory:
        from_dir = rcf.get( '%s.store.from' % store_task, default='' )
        # replace some keys:
        from_dir = from_dir.replace('<output.dir>',output_dir)
        # if specified ...
        if len(from_dir) > 0 :
            # go to it, remember where we came from:
            owd = os.getcwd()
            os.chdir(from_dir)
        #endif

        # extra files to be copied first; 'None' marks "not specified":
        extras = rcf.get( '%s.store.copy' % store_task, default='None' )
        # specified ?
        if extras != 'None' :
            # destination:
            copy_to = rcf.get( '%s.store.copy.to' % store_task )
            # loop over extra files:
            for extra in extras.split() :
                # info ...
                logging.info( ' copy %s ...' % extra )
                # check ...
                if not os.path.exists(extra) :
                    logging.error( ' file not found ...' )
                    sys.exit(1)
                #endif
                # copy:
                shutil.copy( extra, copy_to )
            #endfor # extra files
        #endif # extra files specified

        # destinations:
        dests = rcf.get( '%s.store.arch' % store_task, default=default_dest ).split()
        # info ...
        logging.info( ' store in:' )
        for dest in dests : logging.info( ' %s' % dest )

        # files to be stored:
        store_files = rcf.get( '%s.store.files' % store_task ).split()

        # a tar file is created if a single file with extension '.tar' is requested:
        do_tar = False
        if len(store_files) == 1 :
            tarfile = store_files[0]
            do_tar = tarfile.endswith('.tar')
        #endif

        # create a tar file ?
        if do_tar :
            # file filter:
            tarfilter = rcf.get( '%s.store.tarfilter' % store_task )
            # info ...
            logging.info( ' create %s ...' % tarfile )
            # collection command;
            # do not use the gnu form 'tar c -f', this is not supported on all machines;
            # call in a shell since the tarfilter might contain '*' etc:
            command = 'tar cf %s %s' % (tarfile,tarfilter)
            # execute:
            try :
                go.subprocess.log_call( command, shell=True )
            except Exception as err :
                logging.error( err )
                sys.exit(1)
            #endtry
        #endif

        # zipper command ? empty for no zipping:
        zipper = rcf.get( '%s.store.zipper' % store_task, default=default_zipper )
        # renew file if already present ?
        renew = rcf.get( '%s.store.renew' % store_task, 'bool', default=default_renew )

        # loop over files to be stored (could be the single archive file):
        for store_file in store_files :

            # destination name in archive; extended with the zipping
            # extension if a known zipper is used:
            afile = store_file
            if len(zipper) > 0 :
                afile = store_file + zip_extensions.get( zipper, '' )
            #endif

            # The local file is zipped at most once, just before it is stored
            # for the first time; zipping it again for a second destination
            # would fail since the unzipped file is gone by then.
            zipped = False

            # loop over destinations:
            for dest in dests :

                # check on presence ?
                if not renew :
                    # NOTE(review): 'found' is never changed by the result of
                    # the 'gss exist' call below, so after a successful call
                    # the file is always regarded as present. How a missing
                    # file is reported depends on 'gss' and on
                    # go.subprocess.log_call, which are not visible here
                    # - TODO confirm and set 'found' from the result.
                    found = True
                    # command to check presence:
                    command = [ os.path.join(scriptdir,'gss'), 'exist', os.path.join(dest,afile) ]
                    # execute:
                    try :
                        go.subprocess.log_call( command )
                    except Exception as err :
                        logging.error( err )
                        sys.exit(1)
                    #endtry
                    # skip ?
                    if found :
                        # info ...
                        logging.info( ' %s already present in archive; skip ...' % afile )
                        # next:
                        continue
                    #endif
                #endif

                # now apply zipping if necessary and not done yet:
                if (len(zipper) > 0) and (not zipped) :
                    # info ...
                    logging.info( ' %s ...' % zipper )
                    # zipping command:
                    command = [ zipper, store_file ]
                    # execute:
                    try :
                        go.subprocess.log_call( command )
                    except Exception as err :
                        logging.error( err )
                        sys.exit(1)
                    #endtry
                    # do not zip again for the next destination:
                    zipped = True
                #endif

                # info ...
                logging.info( ' store %s in %s ...' % (afile,dest) )

                # destination could be location to be interpreted by gss scripts;
                # just check on ':' to decide on this:
                if ':' in dest :
                    # gss command to copy to archive, create directories if necessary:
                    command = [ os.path.join(scriptdir,'gss'), 'copy', '--mkdir', afile, os.path.join(dest,afile) ]
                    # execute:
                    try :
                        go.subprocess.log_call( command )
                    except Exception as err :
                        logging.error( err )
                        sys.exit(1)
                    #endtry
                else :
                    # local directory; create it if necessary:
                    if not os.path.isdir(dest) : os.makedirs(dest)
                    # use a simple copy command; execute in a shell,
                    # the source file might be a filename filter:
                    command = 'cp %s %s' % (afile,dest)
                    # execute:
                    try :
                        go.subprocess.log_call( command, shell=True )
                    except Exception as err :
                        logging.error( err )
                        sys.exit(1)
                    #endtry
                #endif

            #endfor # destinations

            # clean up the local file:
            if do_tar :
                # the tar file is not necessary anymore; remove the form that
                # is actually present (zipped only if the zipper has been run):
                if zipped :
                    os.remove( afile )
                else :
                    os.remove( store_file )
                #endif
            elif zipped :
                # unzip the stored file since it might be needed for a re-start;
                # only needed if the file was really zipped above:
                unzipper = unzippers.get( zipper )
                if unzipper is None :
                    logging.error( 'no unzip command known for zipper "%s"' % zipper )
                    sys.exit(1)
                #endif
                # unzipping command:
                command = [ unzipper, afile ]
                # execute:
                try :
                    go.subprocess.log_call( command )
                except Exception as err :
                    logging.error( err )
                    sys.exit(1)
                #endtry
            #endif # tarred or zipped

        #endfor # files to be stored

        # back ?
        if len(from_dir) > 0 : os.chdir(owd)

        # extra shell command ? 'None' marks "not specified":
        command = rcf.get( '%s.store.extra.command' % store_task, default='None' )
        if command != 'None' :
            # info ...
            logging.info( ' call shell command : %s' % command )
            # call subprocess, log output:
            try :
                go.subprocess.log_call( command, shell=True )
            except Exception as err :
                logging.error( err )
                sys.exit(1)
            #endtry
        #endif

    #endfor # store tasks

#endif # store tasks specified
# ===============
# touch (to avoid removal from scratch)
# ===============

# If 'output.touch' is enabled, update the access time of every regular
# file below 'output.dir' using 'find ... -exec touch -a'.

# info ...
logging.info( 'touch output files if necessary ...' )

# touch files to prevent removal ?
output_touch = rcf.get( 'output.touch', 'bool', default=False )

# touch output ?
if output_touch :

    # info ...
    logging.info( ' touch all files (access time) ...' )

    # main output directory:
    output_dir = rcf.get( 'output.dir' )

    # goto output directory, remember where we came from;
    # 'find' below operates on '.', so the change of directory
    # determines which files are touched:
    owd = os.getcwd()
    os.chdir( output_dir )

    # command to find all files, and touch access time;
    # the command is passed as an argument list (no shell requested),
    # so the '-exec' terminator is a plain ';' without backslash:
    command = [ 'find', '.', '-type', 'f', '-exec', 'touch', '-a', '{}', ';' ]
    # execute:
    try :
        go.subprocess.log_call( command )
    except Exception as err :
        logging.error( err )
        sys.exit(1)
    #endtry

    # back ...
    os.chdir( owd )

#endif # touch ?

# ===============
# user scripts
# ===============

# Call the commands listed in 'output.user.scripts' (separated by ';'),
# subject to the optional condition in 'output.user.scripts.condition'.
# Any failure is logged and ends the script with exit status 1.

# info ...
logging.info( 'call user scripts if necessary ...' )

# condition line; the string 'None' marks "not specified":
condition_line = rcf.get( 'output.user.scripts.condition', default='None' )

# no condition specified, or an empty one ? then apply:
if (condition_line == 'None') or (len(condition_line.strip()) == 0) :

    # info
    logging.info( ' no condition on user script, thus apply if any...' )
    # apply by default:
    do_scripts = True

else :

    # info
    logging.info( ' condition for calling user script : %s' % condition_line )

    # name of "rs" file : rcfile with '.rs' instead of '.rc' :
    bname,ext = os.path.splitext(rcfile)
    rsfile = bname+'.rs'

    # expand the '%{..}' keys in the line, preferably with the values
    # from the restart file:
    if os.path.exists(rsfile) :
        # read restart settings:
        rsf = rc.RcFile( rsfile )
        condition_line = rsf.substitute( condition_line, marks=('%{','}') )
    else :
        # no restart file, fall back on the current rcfile:
        logging.info( ' RS file not found, will use current RC file to evaluate condition' )
        condition_line = rcf.substitute( condition_line, marks=('%{','}') )
    #endif

    # info
    logging.info( ' condition expanded to : %s' % condition_line )

    # Evaluate the condition line.
    # NOTE(review): eval() executes arbitrary python code taken from the
    # rcfile; this is only acceptable as long as the rcfile is trusted.
    try :
        do_scripts = eval( condition_line )
    except Exception as err :
        logging.error( 'could not evaluate user script condition ...' )
        logging.error( err )
        sys.exit(1)
    #endtry

    # info
    logging.info( ' condition evaluated to : %s' % do_scripts )

#endif

# apply ?
if do_scripts :

    # ";"-separated list with script calls; might be empty:
    user_scripts = rcf.get( 'output.user.scripts' )

    # loop over scripts:
    for user_script in user_scripts.split(';') :

        # remove surrounding white space; skip empty entries,
        # for example the one after a trailing ';' :
        user_script = user_script.strip()
        if len(user_script) == 0 : continue

        # info ...
        logging.info( ' call script "%s" ...' % user_script )

        # command to call the script; replace some keywords:
        command = user_script
        command = command.replace('<bindir>',opts.bindir)
        command = command.replace('<rcfile>',rcfile)

        # execute as a shell command:
        try :
            go.subprocess.watch_call( command, shell=True )
        except Exception as err :
            logging.error( err )
            sys.exit(1)
        #endtry

    #endfor # user scripts

#endif # call user scripts

# ===============
# Done
# ===============

logging.info( 'end' )

# -----------------------------------------------
# end
# -----------------------------------------------
|