submit_tm5_step_done 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608
  1. #! /usr/bin/env python
  2. # -----------------------------------------------
  3. # help
  4. # -----------------------------------------------
  5. """
  6. NAME
  7. submit_tm5_step_done
  8. DESCRIPTION
  9. Script to post process run output:
  10. o store output files in an archive
  11. o touch output files
  12. o run special user output scripts
  13. STORE OUTPUT FILES
  14. Specify space separated list with store tasks:
  15. store.tasks : task1 task2
  16. For each task, a number of task specific settings is defined in the
  17. rest of this file; some values are initialized below using a dummy
  18. task 'default' :
  19. ! from where ? leave empty for run directory, use '<output.dir>' for output directory:
  20. <task>.store.from : <output.dir>
  21. ! copy some extra files to output subdirectory for storage:
  22. <task>.store.copy : station-list.txt
  23. <task>.store.copy.to : STATION/
  24. ! name(s) of target file(s) for storage;
  25. ! if only one file is specified with extension '.tar'
  26. ! then an archive file is created. Default is a *.tar defined by
  27. ! sources.store.files in pycasso-tm5-expert.rc file.
  28. <task>.store.files : all.tar
  29. ! if a tarfile has to be created, specify which files to archive
  30. ! through a filter:
  31. <task>.store.tarfilter : STATION/*.hdf STATION/*.txt
  32. ! zip files before archiving ? empty, or a zipper command (gzip, bzip2, compress, ...) :
  33. default.store.zipper : gzip
  34. ! archive in:
  35. default.store.arch : ec:/xxx/MAIN_ARCHIVE/
  36. <task>.store.arch : ec:/xxx/SPECIAL_OUTPUT/
  37. ! If the 'arch' does not contain any ':' characters it is assumed to be
  38. ! a local directory. In this case, a 'cp' command is used to archive
  39. ! the files rather than the 'gss' script, and the files might therefore
  40. ! include filters:
  41. <task>.store.files : save*.hdf
  42. <task>.store.arch : ${SCRATCH}/savefiles/
  43. ! store in archive even if already present ?
  44. ! by default probably True, but might not be necessary for some precomputed data:
  45. default.store.renew : True
  46. correlations.store.renew : False
  47. ! extra shell command to be called at the end of the task;
  48. ! for example to cleanup or do other important stuff:
  49. <task>.store.extra.command : rm -f STATION/*.hdf STATION/*.txt
  50. Storage might be subject to a special condition, e.g. end of iteration in a 4D-var run.
  51. Specify a condition line that could be evaluated by python.
  52. If not specified, the default condition is True .
  53. Keys '%{..}' in the condition line are expanded with the values in the restart file
  54. if present; this file is written by the main program and has the same name as
  55. the rcfile but with extension '.rs' instead of '.rc' .
  56. Note the '%' instead of '$', otherwise the line is evaluated too early!
  57. store.condition : %{m1qn3.finished} == 1
  58. TOUCH OUTPUT FILES
  59. To avoid that output files are removed from scratch
  60. during long, long runs.
  61. Enable this flag to touch recursively the access time
  62. of all files in 'output.dir' .
  63. ! touch files to prevent removal (True|False) ?
  64. output.touch : False
  65. USER SCRIPTS
  66. For more elaborate post-processing, you can write your own
  67. script. Then add the full command (including options and arguments)
  68. to the output.user.scripts key, so it will automatically be executed:
  69. output.user.scripts : <bindir>/myscript <rcfile>
  70. More than one command can be specified: use ';' to separate them.
  71. The path should be either absolute or relative to the run directory.
  72. For scripts in any of the 'bin' subdirectories of the source, use
  73. <bindir>. Arguments/options can use <rcfile>, which is replaced by
  74. the name of the runtime rcfile.
  75. Conditional call is possible. Just specify a condition line
  76. that can be evaluated with python eval() function.
  77. Set with the `output.user.scripts.condition` key,
  78. and defaults to True if not set or empty.
  79. The same condition applies to all listed scripts.
  80. ! condition line:
  81. output.user.scripts.condition : %{m1qn3.finished} == 1
  82. """
  83. # -----------------------------------------------
  84. # external
  85. # -----------------------------------------------
  86. # standard modules:
  87. import sys
  88. import os
  89. import shutil
  90. import go
  91. import optparse
  92. import logging
  93. # -----------------------------------------------
  94. # logging
  95. # -----------------------------------------------
  96. # setup messages:
  97. logging.basicConfig( format='%(lineno)-4s:%(filename)-30s [%(levelname)-8s] %(message)s', level=logging.INFO, stream=sys.stdout )
  98. # -----------------------------------------------
  99. # default values
  100. # -----------------------------------------------
  101. # location of auxilary scripts:
  102. bindir_default = os.curdir
  103. # -----------------------------------------------
  104. # arguments
  105. # -----------------------------------------------
  106. # set text for 'usage' help line:
  107. usage = "%prog <rcfile>"
  108. # initialise the option parser:
  109. parser = optparse.OptionParser(usage=usage)
  110. # define options:
  111. parser.add_option( "--bindir",
  112. help="location of auxilary scripts (%s)" % bindir_default,
  113. dest="bindir", action="store", default=bindir_default )
  114. # now parse the actual arguments;
  115. # return an object 'opts' with fields 'verbose' etc,
  116. # and the unnamed arguments in the list 'args' :
  117. opts,args = parser.parse_args()
  118. # only one argument ...
  119. if len(args) != 1 :
  120. if opts.verbose : logging.error( 'single argument command should be specified, found : %i' % len(args) )
  121. parser.print_usage()
  122. sys.exit(1)
  123. #endif
  124. # extract ...
  125. rcfile = args[0]
  126. # -----------------------------------------------
  127. # toolboxes
  128. # -----------------------------------------------
  129. # location of scripts:
  130. scriptdir = opts.bindir
  131. # prepend locations of python modules to search path:
  132. sys.path.insert( 0, scriptdir )
  133. # local modules:
  134. import rc
  135. # -----------------------------------------------
  136. # begin
  137. # -----------------------------------------------
  138. # info ...
  139. logging.info( 'start' )
  140. # read settings:
  141. rcf = rc.RcFile( rcfile )
  142. # ===============
  143. # store output
  144. # ===============
  145. # info ...
  146. logging.info( 'store output files if necessary ...' )
  147. # condition line:
  148. condition_line = rcf.get( 'store.condition', default='None' )
  149. # no condition specified ? then apply:
  150. if condition_line == 'None' :
  151. # info
  152. logging.info( ' no storage condition found, thus apply ...' )
  153. # apply by default:
  154. do_store = True
  155. else :
  156. # info
  157. logging.info( ' conditional storage : %s' % condition_line )
  158. # name of restart file that might be present: rcfile with '.rs' instead of '.rc' :
  159. bname,ext = os.path.splitext(rcfile)
  160. rsfile = bname+'.rs'
  161. # present ?
  162. if os.path.exists(rsfile) :
  163. # read restart settings:
  164. rsf = rc.RcFile( rsfile )
  165. # evaluate '%{..}' keys in line:
  166. condition_line = rsf.substitute( condition_line, marks=('%{','}') )
  167. else:
  168. # If no RS file, try to use the current RC file
  169. logging.info( ' RS file not found, try current RC file to evaluate condition' )
  170. condition_line = rcf.substitute( condition_line, marks=('%{','}') )
  171. # info
  172. logging.info( ' condition expanded to : %s' % condition_line )
  173. # evaluate condition line:
  174. try :
  175. do_store = eval( condition_line )
  176. except :
  177. logging.error( 'could not evaluate storage condition ...' )
  178. sys.exit(1)
  179. #endtry
  180. # info
  181. logging.info( ' condition evaluated to : %s' % do_store )
  182. #endif
  183. # list with store tasks:
  184. store_tasks = rcf.get( 'store.tasks' ).split()
  185. # apply ?
  186. if do_store and (len(store_tasks) > 0) :
  187. # name of output directory:
  188. output_dir = rcf.get( 'output.dir' )
  189. # zipper command ?
  190. default_zipper = rcf.get( 'default.store.zipper' )
  191. # destination ...
  192. default_dest = rcf.get( 'default.store.arch' )
  193. # renew files in archvie ?
  194. default_renew = rcf.get( 'default.store.renew', 'bool' )
  195. # loop over all types:
  196. for store_task in store_tasks :
  197. # info ...
  198. logging.info( ' task %s ...' % store_task )
  199. # from where ?
  200. from_dir = rcf.get( '%s.store.from' % store_task, default='' )
  201. # replace some keys:
  202. from_dir = from_dir.replace('<output.dir>',output_dir)
  203. # if specified ...
  204. if len(from_dir) > 0 :
  205. # go to it:
  206. owd = os.getcwd()
  207. os.chdir(from_dir)
  208. #endif
  209. # extra files:
  210. extras = rcf.get( '%s.store.copy' % store_task, default='None' )
  211. # specified ?
  212. if extras != 'None' :
  213. # destination:
  214. copy_to = rcf.get( '%s.store.copy.to' % store_task )
  215. # loop over extra files:
  216. for extra in extras.split() :
  217. # info ...
  218. logging.info( ' copy %s ...' % extra )
  219. # check ...
  220. if not os.path.exists(extra) :
  221. logging.error( ' file not found ...' )
  222. sys.exit(1)
  223. #endif
  224. # copy:
  225. shutil.copy( extra, copy_to )
  226. #endif # loop over extra files
  227. #endif # extra files specified
  228. # destinations:
  229. dests = rcf.get( '%s.store.arch' % store_task, default=default_dest ).split()
  230. # info ...
  231. logging.info( ' store in:' )
  232. for dest in dests : logging.info( ' %s' % dest )
  233. # files to be stored:
  234. store_files = rcf.get( '%s.store.files' % store_task ).split()
  235. # flag to check if a tarfile is to be created:
  236. do_tar = False
  237. # check if a single file should be stored; could be an archive file ...
  238. if len(store_files) == 1 :
  239. # short name ...
  240. tarfile = store_files[0]
  241. # extension of archive file ?
  242. do_tar = tarfile.endswith('.tar')
  243. #endif
  244. # create a tar file ?
  245. if do_tar :
  246. # file filter:
  247. tarfilter = rcf.get( '%s.store.tarfilter' % store_task )
  248. # info ...
  249. logging.info( ' create %s ...' % tarfile )
  250. # collection command;
  251. # do not use the gnu form 'tar c -f', this is not supported on all machines;
  252. # call in a shell since the tarfilter might contain '*' etc:
  253. command = 'tar cf %s %s' % (tarfile,tarfilter)
  254. # execute:
  255. try :
  256. p = go.subprocess.log_call( command, shell=True )
  257. except Exception, err :
  258. logging.error( err )
  259. sys.exit(1)
  260. #endtry
  261. #endif
  262. # zipper command ?
  263. zipper = rcf.get( '%s.store.zipper' % store_task, default=default_zipper )
  264. # renew file if already present ?
  265. renew = rcf.get( '%s.store.renew' % store_task, 'bool', default=default_renew )
  266. # loop over files to be stored (could be the single archive file):
  267. for store_file in store_files :
  268. # destination name in archive:
  269. afile = store_file
  270. # extend the name with the zipping extension if necessary:
  271. if len(zipper) > 0 :
  272. # store original:
  273. afile_unzipped = afile
  274. # new name:
  275. if zipper == 'gzip' : afile = afile+'.gz'
  276. if zipper == 'bzip2' : afile = afile+'.bz2'
  277. if zipper == 'compress' : afile = afile+'.Z'
  278. # could be the tarfile ...
  279. if do_tar : tarfile = afile
  280. #endif
  281. # loop over destinations:
  282. for dest in dests :
  283. # check on presence ?
  284. if not renew :
  285. # flag ...
  286. found = True
  287. # command to check precence:
  288. command = [ os.path.join(scriptdir,'gss'), 'exist', os.path.join(dest,afile) ]
  289. # execute:
  290. try :
  291. p = go.subprocess.log_call( command )
  292. except Exception, err :
  293. logging.error( err )
  294. sys.exit(1)
  295. #endtry
  296. # skip ?
  297. if found :
  298. # info ...
  299. logging.info( ' %s already present in archive; skip ...' % afile )
  300. # next:
  301. continue
  302. #endif
  303. #endif
  304. # now apply zipping if necessary:
  305. if len(zipper) > 0 :
  306. # info ...
  307. logging.info( ' %s ...' % zipper )
  308. # zipping command:
  309. command = [ zipper, afile_unzipped ]
  310. # execute:
  311. try :
  312. p = go.subprocess.log_call( command )
  313. except Exception, err :
  314. logging.error( err )
  315. sys.exit(1)
  316. #endtry
  317. #endif
  318. # info ...
  319. logging.info( ' store %s in %s ...' % (afile,dest) )
  320. # destination could be location to be interpreted by gss scripts;
  321. # just check on ':' to decide on this:
  322. if ':' in dest :
  323. # gss command to copy to archive, create directories if necessary:
  324. command = [ os.path.join(scriptdir,'gss'), 'copy', '--mkdir', afile, os.path.join(dest,afile) ]
  325. # execute:
  326. try :
  327. p = go.subprocess.log_call( command )
  328. except Exception, err :
  329. logging.error( err )
  330. sys.exit(1)
  331. #endtry
  332. else :
  333. # check if destination exists:
  334. if not os.path.isdir(dest) : os.makedirs(dest)
  335. # use a simple copy command; execute in a shell, the source file might be a filename filter:
  336. command = 'cp %s %s' % (afile,dest)
  337. # execute:
  338. try :
  339. p = go.subprocess.log_call( command, shell=True )
  340. except Exception, err :
  341. logging.error( err )
  342. sys.exit(1)
  343. #endtry
  344. #endif
  345. #endfor # destinations
  346. # if this is (the) tarfile, remove it:
  347. if do_tar :
  348. # not necessary anymore ...
  349. os.remove( tarfile )
  350. else :
  351. # unzip the stored file since might be needed for re-start:
  352. if len(zipper) > 0 :
  353. # unzipper:
  354. if zipper == 'gzip' : unzipper = 'gunzip'
  355. if zipper == 'bzip2' : unzipper = 'bunzip2'
  356. if zipper == 'compress' : unzipper = 'uncompress'
  357. # unzipping command:
  358. command = [ unzipper, afile ]
  359. # execute:
  360. try :
  361. p = go.subprocess.log_call( command )
  362. except Exception, err :
  363. logging.error( err )
  364. sys.exit(1)
  365. #endtry
  366. #endif # files were zipped
  367. #endif # tarred
  368. #endfor # files to be stored
  369. # back ?
  370. if len(from_dir) > 0 : os.chdir(owd)
  371. # extra shell command ?
  372. command = rcf.get( '%s.store.extra.command' % store_task, default='None' )
  373. if command != 'None' :
  374. # info ...
  375. logging.info( ' call shell command : %s' % command )
  376. # call subprocess, log output:
  377. try :
  378. p = go.subprocess.log_call( command, shell=True )
  379. except Exception, err :
  380. logging.error( err )
  381. sys.exit(1)
  382. #endtry
  383. #endif
  384. #endfor # output tasks
  385. #endif # store tasks specified
  386. # ===============
  387. # touch (to avoid removal from scratch)
  388. # ===============
  389. # info ...
  390. logging.info( 'touch output files if necessary ...' )
  391. # touch files to prevent removal ?
  392. output_touch = rcf.get( 'output.touch', 'bool', default=False )
  393. # touch output ?
  394. if output_touch :
  395. # info ...
  396. logging.info( ' touch all files (access time) ...' )
  397. # main output directory:
  398. output_dir = rcf.get( 'output.dir' )
  399. # goto output directory:
  400. owd = os.getcwd()
  401. os.chdir( owd )
  402. # command to find all files, and touch access time:
  403. command = [ 'find', '.', '-type', 'f', '-exec', 'touch', '-a', '{}', '\;' ]
  404. # execute:
  405. try :
  406. p = go.subprocess.log_call( command )
  407. except Exception, err :
  408. logging.error( err )
  409. sys.exit(1)
  410. #endtry
  411. # back ...
  412. os.chdir( owd )
  413. #endif # touch ?
  414. # ===============
  415. # user scripts
  416. # ===============
  417. # info ...
  418. logging.info( 'call user scripts if necessary ...' )
  419. # condition line:
  420. condition_line = rcf.get( 'output.user.scripts.condition', default='None' )
  421. # no condition specified ? then apply:
  422. if condition_line == 'None' :
  423. # info
  424. logging.info( ' no condition on user script, thus apply if any...' )
  425. # apply by default:
  426. do_store = True
  427. else :
  428. # info
  429. logging.info( ' condition for calling user script : %s' % condition_line )
  430. # name of "rs" file : rcfile with '.rs' instead of '.rc' :
  431. bname,ext = os.path.splitext(rcfile)
  432. rsfile = bname+'.rs'
  433. if os.path.exists(rsfile) :
  434. # read restart settings:
  435. rsf = rc.RcFile( rsfile )
  436. # evaluate '%{..}' keys in line:
  437. condition_line = rsf.substitute( condition_line, marks=('%{','}') )
  438. else:
  439. # If no RS file, try to use the current RC file
  440. logging.info( ' RS file not found, will use current RC file to evaluate condition' )
  441. condition_line = rcf.substitute( condition_line, marks=('%{','}') )
  442. # info
  443. logging.info( ' condition expanded to : %s' % condition_line )
  444. # evaluate condition line:
  445. try :
  446. do_store = eval( condition_line )
  447. except :
  448. logging.error( 'could not evaluate storage condition ...' )
  449. sys.exit(1)
  450. #endtry
  451. # info
  452. logging.info( ' condition evaluated to : %s' % do_store )
  453. #endif
  454. # apply ?
  455. if do_store :
  456. # ";"-seperated list with script calls; might be empty:
  457. user_scripts = rcf.get( 'output.user.scripts' )
  458. # any specified ?
  459. if len(user_scripts) > 0 :
  460. # loop over sripts:
  461. for user_script in user_scripts.split(';') :
  462. # info ...
  463. logging.info( ' call script "%s" ...' % user_script )
  464. # command to call the script; replace some keywords:
  465. command = user_script
  466. command = command.replace('<bindir>',opts.bindir)
  467. command = command.replace('<rcfile>',rcfile)
  468. # execute as a shell command:
  469. try :
  470. p = go.subprocess.watch_call( command, shell=True )
  471. except Exception, err :
  472. logging.error( err )
  473. sys.exit(1)
  474. #endtry
  475. #endfor # user uscripts
  476. #endif # list of scripts specified
  477. #endif # call user scripts
  478. # ===============
  479. # Done
  480. # ===============
  481. logging.info( 'end' )
  482. # -----------------------------------------------
  483. # end
  484. # -----------------------------------------------