#! /usr/bin/env python import rc import os import sys import subprocess import glob from optparse import OptionParser import time import re import ttb_compare # For METHOD #2 in comparing restart. Comment for method #1 import pdb """ Module with THREE classes and ONE function. The 3 classes are: rctm5 : derived RcFile class, with specifics for TM5. Requires the tools/ttb/bin/rc.py for its raw option. tm5_test : class associated to a list of rc files. Has method to compare output restart from two runs associated to two of the rc files. LL : set of LoadLeveler specific functions (tested only with ECMWF implementation only). **OBSOLETE** -------------------------------------------------------------------------------- The function 'testtm' (which is run if module is called as a script) is an example on how to use the classes defined here. It takes one or two input(s), which must be TM5 rc filenames: if two : run both cases and compare final restarts if one : run three cases (no mpi, 4, and 10 tasks), by modifying par.ntask, and compare their final restarts OR run two cases (w/ and w/o intermediate restart), by modifying jobstep.length, and compare their final restarts -------------------------------------------------------------------------------- In a module you would do: import tm5_test tm5_test.testtm( ['file1.rc'], mpi=True, tm6=True ) tm5_test.testtm( ['file1.rc', 'file2.rc'], new=True ) At the command line, the same calls would be: $> tm5_test.py -m6 file1.rc $> tm5_test.py -n file1.rc file2.rc See: $> tm5_test.py -h """ #//////////////////////////////////////////////////////// # LoadLeveler specific functions (check run, get timing..) 
#////////////////////////////////////////////////////////
class LL(object):
    """
    LoadLeveler helper for one TM5 jobstep (tested only with the ECMWF
    implementation). **OBSOLETE**

    A jobstep is handled as three LoadLeveler steps (init, run, done), each
    writing its own log: '<rootname>_init.err', '<rootname>_run.err' and
    '<rootname>_done.err'. This class waits on those steps (via 'llq'),
    checks their exit codes and extracts their timings from the logs.

    FIXME: should inherit from an rctm5 object => then no need for rootname
    at initialization.
    """

    # Step names, in the order used to index self.err
    _STEPS = ('init', 'run', 'done')

    # Exit code of a step, as found in its *.err log
    _EXIT_RE = re.compile(r'exit_code=(\d+)')

    # Timing entries of a finished step, in the order returned by
    # step_get_timing: elapsed [s], cpu [s], cpu [hh:mm:ss], billing units
    _TIMING_RE = (
        re.compile(r'^ *Elapsed: *(\d+\.\d+) *sec'),
        re.compile(r'^ *CPU Tot: *(\d+\.\d+) *sec'),
        re.compile(r'.*\+(\d{2}:\d{2}:\d{2})'),
        re.compile(r'^ *System Billing Units .*= *(\d+\.\d+)'),
    )

    # 'llq -f %st' status codes for which a step is considered unfinished:
    # Running, Pending, Idle, almost completed, Starting, not queued yet
    _ACTIVE_STATES = ('R ', 'P ', 'I ', 'CP', 'ST', 'NQ')

    def __init__(self, rootname):
        """
        rootname : path + basename shared by the three step logs.

        Raises Exception if the init log is still missing after ~10 s.
        """
        self.root = rootname
        self.err = tuple(''.join([self.root, '_', name, '.err'])
                         for name in self._STEPS)
        # basic check on existence of the run (wait a bit)
        if not os.path.isfile(self.err[0]):
            time.sleep(10)
        if not os.path.isfile(self.err[0]):
            print("Run not started...")
            raise Exception("Run not started: no " + self.err[0])

    def clean_jobs(self):
        """ simply remove the step log files (those that exist) """
        for fname in self.err:
            if os.path.isfile(fname):
                os.remove(fname)

    def check_jobs(self):
        """
        Check completion of each of the 3 steps of a jobstep, in order:
        wait while a step is still known to LoadLeveler, then check its
        exit code.

        Raises Exception at the first step that did not succeed.
        """
        for step, name in enumerate(self._STEPS):
            while self.isRunning(step):
                time.sleep(10)
            if not self.step_success(step):
                print("%s : FAIL at %s step" % (self.root, name))
                raise Exception("%s : FAIL at %s step" % (self.root, name))

    def step_success(self, step=0, verbose=False):
        """
        Look for 'exit_code=X' in the *.err log of a LoadLeveler job step.

        Returns True if at least one exit code is found and all of them
        are 0. Returns False if the log is missing, if no exit code is found
        (catches e.g. memory out-of-range at submit), or if any exit code is
        non-zero.
        """
        fname = self.err[step]
        if not os.path.isfile(fname):
            if verbose:
                print('\t No file :' + fname)
            return False

        codes = []
        with open(fname, 'r') as ffo:
            for line in ffo:
                match = self._EXIT_RE.search(line)
                if match:
                    codes.append(match.group(1))

        if not codes:
            # catches memory out-of-range submit
            if verbose:
                print(' Error = unknown')
            return False

        status = True
        for code in codes:
            if code != '0':
                # a non-zero exit code always means failure
                status = False
                if verbose:
                    print('\t Error = ' + code)
            elif verbose:
                print(' Succes = ' + code)
        return status

    def step_get_timing(self, step=0, verbose=False):
        """
        Return timing of a finished LoadLeveler job step as a tuple:

          ( elapsed time [s], cpu time [s], cpu time [hh:mm:ss],
            cost [system billing units] )

        Found values are returned as strings (last occurrence in the log
        wins); entries not found stay 0. If the *.err file of the step is
        not found, just return (0, 0, 0, 0).
        """
        timing = [0, 0, 0, 0]
        fname = self.err[step]
        if not os.path.isfile(fname):
            if verbose:
                print(' No file :' + fname)
            return tuple(timing)

        with open(fname, 'r') as ffo:
            for line in ffo:
                for k, regex in enumerate(self._TIMING_RE):
                    match = regex.search(line)
                    if match:
                        timing[k] = match.group(1)
        return tuple(timing)

    def isRunning(self, step=0):
        """
        Return True if the step is still queued/running according to
        'llq', False otherwise.

        The LoadLeveler job id is read from the step *.err log; the log is
        re-read every 5 s until the id shows up (the file may be opened
        before it is written). If the log does not exist, a message is
        printed and False is returned.
        """
        fname = self.err[step]
        if not os.path.isfile(fname):
            print("File not found: " + fname)
            return False

        # get job id
        idregex = re.compile(r'c2a.*\.\d+\.' + str(step))
        idr = None
        while idr is None:
            with open(fname, 'r') as ffo:
                for line in ffo:
                    match = idregex.search(line)
                    if match:
                        idr = match.group(0)
                        break
            if idr is None:
                time.sleep(5)  # if file opened too early

        # get status
        pr = subprocess.Popen(['llq', '-f', '%st', idr],
                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, _ = pr.communicate()
        if not isinstance(out, str):
            # Python 3: communicate() returns bytes
            out = out.decode('utf-8', 'replace')
        check = re.match('ST\n--\n(..)\n', out)
        return bool(check) and check.group(1) in self._ACTIVE_STATES


#////////////////////////////////////////////////////////
# Derived RcFile class for TM5
#////////////////////////////////////////////////////////
def _list_files(directory):
    """
    Return the full paths of the regular files directly inside
    'directory', or an empty list if it does not exist.
    """
    if not os.path.exists(directory):
        return []
    paths = (os.path.join(directory, name) for name in os.listdir(directory))
    return [p for p in paths if os.path.isfile(p)]


class rctm5(rc.RcFile):
    """
    Derived RcFile class that adds 'run', 'checkrun', 'cleanup' and
    'convert2nc3' methods.

    Adds the following attributes (derived, or existing in the rc dico but
    used a lot):

      self.rundone   : True/False
      self.status    : 'unknown', 'crashed', 'running' or 'done'
      self.runtype   : value of the 'submit.to' key ('queue' after
                       run(queue=True))
      self.region1   : name of first region (eg: 'glb600x400')
      self.restart   : final restart file (netCDF-4 format)
      self.restart3  : final restart file converted to netCDF-3 format
      self.timestamp : 'startdate_enddate' string (YYYYMMDDHH_YYYYMMDDHH)
                       used in mmix, budget,... filenames
      self.logcomp   : name of the log of the setup_tm5 script
      self.rundir    : run directory
      self.ok        : tm5.ok fully qualified filename

    Also expands the path of the filename at init.
    """

    # Timers printed/returned by get_runtime (foreground runs only)
    _RUNTIME_RE = tuple(re.compile(r) for r in (
        r'00] root *(\d+\.\d+)',
        r'00] init *(\d+\.\d+)',
        r'00] step init *(\d+\.\d+)',
        r'00] step run *(\d+\.\d+)',
        r'00] tmm readfield 2D *(\d+\.\d+)',
        r'00] tmm readfield 3D *(\d+\.\d+)',
        r'00] other *(\d+\.\d+)',
    ))

    def __init__(self, filename, raw=False):
        """
        filename : TM5 rc file ('~' is expanded)
        raw      : passed to rc.RcFile (see tools/ttb/bin/rc.py)
        """
        rc.RcFile.__init__(self, os.path.expanduser(filename), raw=raw)
        self.setfilenames()
        self.rundone = False
        self.status = 'unknown'
        self.runtype = self.get('submit.to')
        self.basic_checkrun(verbose=False)

    def setfilenames(self):
        """
        Set region1, restart, restart3, timestamp, rundir and ok from the rc
        keys, and logcomp from the rc filename.

        Best effort: if any key needed is missing or malformed, all
        attributes but logcomp are left empty ('').
        """
        self.region1 = ''
        self.restart = ''
        self.restart3 = ''
        self.timestamp = ''
        self.rundir = ''
        self.ok = ''
        try:
            outrestart = self.get('restart.write.dir')
            hres = self.get('my.region1')
            # 'YYYY-MM-DD HH:MM:SS' end and start of the run
            edate, etime = self.get('timerange.end').split()[:2]
            sdate, stime = self.get('timerange.start').split()[:2]
            rundir = self.get('my.run.dir')

            ymd = edate.replace('-', '')
            hhmm = etime[:5].replace(':', '')
            hh = etime[:3].replace(':', '')
            symd = sdate.replace('-', '')
            shh = stime[:3].replace(':', '')  # hour of the START time

            # restart filenames (original in netCDF 4 and converted to nc-3)
            fname = ''.join(["TM5_restart_", ymd, "_", hhmm, "_", hres, ".nc"])
            restart = os.path.join(outrestart, fname)
            restart3 = os.path.join(outrestart, fname + "3")
            ok = os.path.join(rundir, 'tm5.ok')
        except Exception:
            pass  # best effort: keep the empty defaults
        else:
            self.region1 = hres
            self.restart = restart
            self.restart3 = restart3
            # YYYYMMDDHH_YYYYMMDDHH string used in mmix, jstat filenames
            # (useful for runs with ONE job only)
            self.timestamp = ''.join([symd, shh, "_", ymd, hh])
            self.rundir = rundir
            self.ok = ok

        # Setup_tm5 script log (compilation if any)
        self.logcomp = os.path.join(
            os.curdir, os.path.splitext(self.filename)[0] + '.out')

    def display(self):
        """ print basic info about the tm5-rc object """
        info = [("RC file :", self.filename),
                ("run dir :", self.get('my.run.dir')),
                ("end restart :", self.restart),
                (" ...exists :", os.path.isfile(self.restart)),
                ("status :", self.status),
                ("fg/bg/queue :", self.runtype),
                ("log compil. :", self.logcomp)]
        for label, value in info:
            print("%s %s" % (label, value))
        print("--------------------------")

    def run(self, force=False, clean=False, queue=True):
        """
        Call setup_tm5 if the final restart does not exist.

        force : run even if the final restart exists.
        clean : re-compile everything (setup_tm5 '-n' option).
        queue : use the queue manager (setup_tm5 '-q' option), else run
                with the setting in the rc file.

        The setup_tm5 output goes to self.logcomp. Raises Exception if
        setup_tm5 returns a non-zero code.
        """
        if queue:
            self.runtype = "queue"

        if not (force or not os.path.isfile(self.restart)):
            print('skipping run for ' + self.filename +
                  ' (final restart already exists)')
            self.status = 'done'
            self.rundone = True
            return

        command = [os.path.join(os.curdir, 'setup_tm5'), self.filename, '-s']
        if queue:
            command.append("-q")
        if clean:
            command.append("-n")

        print("submitting run for " + self.filename)
        with open(self.logcomp, 'w') as fo:
            retcode = subprocess.call(command, stdout=fo,
                                      stderr=subprocess.STDOUT)

        if retcode != 0:
            print("compilation failed. See: " + self.logcomp)
            self.status = 'crashed'
            self.rundone = True
            raise Exception("setup_tm5 failed for " + self.filename)

        print("submit ok")
        self.status = 'running'
        self.rundone = False

    def convert2nc3(self):
        """
        Convert the final restart to netCDF-3 (self.restart3), so that the
        files of two runs can be compared with 'cmp'.

        With netCDF-3, taking off the TimeStamp attribute from the files is
        usually enough, but TM5 writes restart files in netCDF-4.
        Nothing is done if the restart is missing or already converted.
        Raises Exception if 'ncks' fails.
        """
        # IF netcdf 3, instead:
        # command = ['ncatted', '-a TimeStamp,global,d,c,"a"', '-h', self.restart]
        if not os.path.exists(self.restart):
            print("No restart file to touch")
            return
        if os.path.exists(self.restart3):
            print("Restart already converted to netCDF-3")
            return

        # convert to netcdf 3, without adding to the history attribute
        # [could use nccopy if available]
        command = ['ncks', '-3', '-h', self.restart, self.restart3]
        if subprocess.call(command) != 0:
            print(command)
            print("Conversion to netCDF 3 failed")
            raise Exception("Conversion to netCDF 3 failed")
        print("Restart converted to netCDF-3")

    def get_runtime(self):
        """
        Print and return some timers of a foreground run, read from the
        setup_tm5 log (self.logcomp).

        Returns the list of timer values (strings) in the order they are
        found in the log; every matching log line is echoed to stdout.
        Returns a list of zeros (one per timer) if the run was not
        submitted, the log cannot be opened, or the run is not a
        foreground one (LoadLeveler).
        """
        zeros = [0] * len(self._RUNTIME_RE)

        if not self.rundone:
            print("run not submitted: No timing available.")
            return zeros

        if self.runtype != 'foreground':
            # LoadLeveler case
            print("timing not directly available for LoadLeveler. "
                  "See checkrun for how-to")
            return zeros

        try:
            logfile = open(self.logcomp, 'r')
        except (IOError, OSError):
            print("no log file: No timing available.")
            return zeros

        stat = []
        with logfile:
            for line in logfile:
                for regex in self._RUNTIME_RE:
                    match = regex.search(line)
                    if match:
                        sys.stdout.write(line)
                        stat.append(match.group(1))
        return stat

    def checkrun(self, nowait=False):
        """
        Return once a run is completely done.

        Loops over the jobsteps of the run: waits for a new
        '<my.basename>_NNN.rc' file in the run dir, then checks that
        jobstep (foreground: the final restart must exist; otherwise the
        LoadLeveler steps must all succeed). Stops after the jobstep whose
        'jobstep.timerange.end' equals 'timerange.end'. Catches most
        crashes, as long as they leave an error in the log files.

        nowait : currently unused.

        Raises Exception if a jobstep failed.
        """
        print("checking run for " + self.filename)

        rundir = self.get('my.run.dir')
        runid = self.get('my.basename')
        mask = os.path.join(rundir, ''.join([runid, '_[0-9][0-9][0-9].rc']))

        #--------------------
        # LOOP thru jobsteps
        #--------------------
        seen = []  # jobstep rc files already handled (or stale)
        islast = False
        while not islast:
            # wait for a newer rc file
            rcs = []
            while not rcs:
                time.sleep(1)
                rcs = [f for f in glob.glob(mask) if f not in seen]
            # exclude them from next passes, in case of older files around
            # in the first pass
            seen.extend(rcs)
            newest = max(rcs, key=lambda x: os.stat(x).st_mtime)
            orc = rc.RcFile(newest)

            # step number id (001, 002, ...)
            find = re.search(r"_([0-9]{3})\.rc$", newest)
            if not find:
                print("problem with RE to find jobstep number")
                raise Exception("cannot get jobstep number from " + newest)
            root = os.path.join(rundir, ''.join([runid, '_', find.group(1)]))

            # last chunk ?
            islast = (orc.get('timerange.end') ==
                      orc.get('jobstep.timerange.end'))
            if islast:
                print(" checking last jobstep: " + newest)
            else:
                print(" checking itermediate jobstep: " + newest)

            # check run
            if self.runtype == 'foreground':
                self.rundone = True
                if not os.path.exists(self.restart):
                    raise Exception("final restart not found: " + self.restart)
                self.get_runtime()  # prints the timers found in the log
            else:
                # ------ loadleveler @ ECMWF specific (begin) ------
                try:
                    ll = LL(root)
                except Exception:
                    print("Problem with LoadLeveler object init")
                    raise
                ll.check_jobs()
                self.rundone = True
                print(ll.step_get_timing(1))
                # ------ loadleveler @ ECMWF specific (end) ------
                # IF not using loadleveler at ECMWF, you can replace the
                # loadleveler lines above with the following ones. They
                # assume a run never crashes, and wait for the last restart
                # and tm5.ok (extra security to make sure restart is closed):
                #   while not os.path.exists(self.restart): time.sleep(10)
                #   ok = os.path.join(rundir, 'tm5.ok')
                #   while not os.path.exists(ok): time.sleep(10)

    def basic_checkrun(self, verbose=True):
        """
        Check if a run is successfully terminated by checking the existence
        of the tm5.ok file and of the final restart file; if so, set
        rundone to True. Returns True/False.

        Note this is not bullet proof: e.g. between legs, a final restart
        may remain from a previous run.
        """
        if verbose:
            print("basic run check for " + self.filename)
        done = os.path.exists(self.ok) and os.path.exists(self.restart)
        if verbose:
            print("run sucessfully terminated" if done
                  else "run not done or crashed")
            print("--------------------------")
        if done:
            self.rundone = True
        return done

    def cleanup(self, full=True, verbose=True):
        """
        Remove the netCDF-3 restart copy. If 'full', also remove the final
        restart, the setup_tm5 log and the regular files of the run dir,
        and reset rundone. Output and profiling are left in their own
        directory (if different from the run dir).
        """
        if verbose:
            print("cleaning up :  " + self.filename)

        # Minimal
        if os.path.isfile(self.restart3):
            os.remove(self.restart3)

        # Full
        if full:
            for fname in (self.restart, self.logcomp):
                if os.path.isfile(fname):
                    os.remove(fname)
            # Empty rundir
            for fname in _list_files(self.get('my.run.dir')):
                os.remove(fname)
            self.rundone = False

    def get_output_list(self):
        """
        Return the existing files of the run: final restart and setup_tm5
        log, then the regular files of the run dir ('my.run.dir'), of the
        output dir ('output.dir') and of its profiling sub-dir
        ('timing.output.subdir').
        """
        out = [f for f in (self.restart, self.logcomp) if os.path.isfile(f)]
        out += _list_files(self.get('my.run.dir'))
        outdir = self.get('output.dir')
        out += _list_files(outdir)
        out += _list_files(os.path.join(outdir,
                                        self.get('timing.output.subdir')))
        return out


#////////////////////////////////////////////////////////
# TEST class for TM5
#////////////////////////////////////////////////////////
class tm5_test(object):
    """
    Test built from a list of (at least 2) TM5 rc files, one rctm5 object
    per file; runs can be compared pairwise on their final restart.
    """

    def __init__(self, Rcfiles):
        if len(Rcfiles) < 2:
            print("pass at least 2 rc files for comparison")
            raise Exception("pass at least 2 rc files for comparison")
        self.rc = [rctm5(fname) for fname in Rcfiles]

    def comp_restart(self, ind=(0, 1), clean=False, queue=False):
        """
        Run (if needed) the two cases self.rc[ind[0]] and self.rc[ind[1]],
        wait for them, and compare their final restarts.

        ind   : indices of the 2 cases to compare.
        clean : re-compile everything (see rctm5.run).
        queue : submit to the queue (see rctm5.run).

        Returns 0 on success, non-zero if a run failed or the restarts
        differ. Raises Exception if 'ind' does not hold exactly 2 indices.
        """
        method = 2  # 1 = Unix cmp ; 2 = tools/ttb/bin/ttb_compare.py

        # for now, can compare 2 runs only
        if len(ind) != 2:
            print("requires 2 file index for comparison")
            raise Exception("requires 2 file index for comparison")
        first, second = self.rc[ind[0]], self.rc[ind[1]]

        # same?
        if first.restart == second.restart:
            print("same path-to-file/restart-file in both cases !")
            print("\n SUCCESS")
            return 0

        # Submit all first (allows for concomitant runs in the queue)
        for k in ind:
            self.rc[k].run(clean=clean, queue=queue)

        # Wait for run end and do post-processing if any
        okrun = True
        for k in ind:
            try:
                self.rc[k].checkrun()
                if method == 1:
                    self.rc[k].convert2nc3()
            except Exception:
                print("\n FAILED run : " + self.rc[k].filename)
                okrun = False

        if not okrun:
            print("\n FAILED - comparison aborted")
            return 1

        # COMPARE
        if method == 1:
            # METHOD 1 : binary comparison of restart files converted to
            # netCDF-3
            print("comparing:")
            print("  " + first.restart3)
            print("  " + second.restart3)
            retcode = subprocess.call(['cmp', '-s', first.restart3,
                                       second.restart3])
        else:
            # METHOD 2 : if required python libraries are available, use
            # TTB's ttb_compare.py module
            try:
                ttb_compare.df_files(first.restart, second.restart)
                retcode = 0
            except Exception:
                retcode = 1

        print("\n FAILED" if retcode != 0 else "\n SUCCESS")

        # minimal cleaning
        for rcobj in self.rc:
            rcobj.cleanup(full=False, verbose=False)

        return retcode

    def teardown(self):
        """ Full cleanup (see rctm5.cleanup) of all cases of the test. """
        for rcobj in self.rc:
            rcobj.cleanup()


#////////////////////////////////////////////////////////
def testtm(args, new=False, queue=False, tm6=False, teardown=False,
           mpi=False, restart=False):
    """
    Example driver for the classes above.

    args : list of one or two TM5 rc filenames.
      - two  : run both cases and compare their final restarts.
      - one  : with 'restart', run it in one and in two jobsteps and
               compare; with 'mpi', run it with no mpi, 4 and 10 tasks
               (tm6: also sets par.nx/par.ny) and compare pairwise.
    new      : re-compile everything.
    queue    : submit to the queue instead of the foreground.
    teardown : do not run; remove the files of the test instead.

    Returns the sum of the comp_restart return codes (0 if everything
    matched), or 1 if the number of arguments is wrong.
    """
    narg = len(args)
    status = 0  # exit code

    if narg == 1:
        # Replacing the "my.project.dir" key ensures that the builds are
        # different (the key is mandatory), and that output and restart
        # (which are assumed to be defined below that dir) are in
        # different locations.
        rcobj = rctm5(args[0], raw=True)

        if teardown and not (restart or mpi):
            print(" Nothing to clean up, no test being specified.\n"
                  " Use also --mpi (-m) or --restart (-r) ")

        if restart:
            rcno = 'onechunk.rc'
            rcyes = 'twochunks.rc'
            rcobj.replace('timerange.start', "2006-01-01 00:00:00")
            rcobj.replace('timerange.end', "2006-01-02 03:00:00")
            rcobj.replace('jobstep.length', 'inf')
            rcobj.replace('my.project.dir', '${my.scratch}/TM5/test/onechunk')
            rcobj.WriteFile(rcno)
            rcobj.replace('jobstep.length', 1)
            rcobj.replace('my.project.dir', '${my.scratch}/TM5/test/twochunks')
            rcobj.WriteFile(rcyes)
            test = tm5_test([rcyes, rcno])
            if teardown:
                test.teardown()
                for fname in (rcyes, rcno):
                    if os.path.isfile(fname):
                        os.remove(fname)
            else:
                print("\nCompare add. intermediate restart:\n")
                status += test.comp_restart(clean=new, queue=queue)

        if mpi:
            # (rc file, par.mpi, par.ntask, par.nx, par.ny, project sub-dir)
            setups = [('one_proc.rc', 'F', 1, 1, 1, 'nompi'),
                      ('four_proc.rc', 'T', 4, 2, 2, 'mpi4'),
                      ('ten_proc.rc', 'T', 10, 2, 5, 'mpi10')]
            rcnames = [setup[0] for setup in setups]
            for fname, usempi, ntask, nx, ny, subdir in setups:
                rcobj.replace('par.mpi', usempi)
                rcobj.replace('par.ntask', ntask)
                if tm6:
                    rcobj.replace('par.nx', nx)
                    rcobj.replace('par.ny', ny)
                rcobj.replace('my.project.dir',
                              '${my.scratch}/TM5/test/' + subdir)
                rcobj.WriteFile(fname)

            test = tm5_test(rcnames)
            if teardown:
                test.teardown()
                for fname in rcnames:
                    if os.path.isfile(fname):
                        os.remove(fname)
            else:
                print("\nCompare no-mpi and 4-procs-mpi runs:\n")
                status += test.comp_restart([0, 1], clean=new, queue=queue)
                print("\nCompare 4-procs-mpi and 10-procs-mpi runs:\n")
                status += test.comp_restart([1, 2], clean=new, queue=queue)
                print("\nCompare no-mpi and 10-procs-mpi runs:\n")
                status += test.comp_restart([0, 2], queue=queue)

    elif narg == 2:
        test = tm5_test(args)
        if teardown:
            test.teardown()
        else:
            print("\nCompare \n" + args[0] + "\n and \n" + args[1] + "\n")
            status += test.comp_restart(clean=new, queue=queue)

    else:
        print("requires 1 or 2 rc file argument(s), not %d" % narg)
        status = 1

    return status


#////////////////////////////////////////////////////////
if __name__ == "__main__":

    parser = OptionParser(usage='%prog [options] rcfile_1 [rcfile_2]')

    parser.add_option("-t", "--teardown", action="store_true",
                      dest="teardown",
                      help="remove restart, files in rundir & compilation log")
    parser.add_option("-n", "--new", action="store_true", dest="new",
                      help="recompile everything fresh (a la realclean)")
    parser.add_option("-q", "--queue", action="store_true", dest="queue",
                      help="submit job to queue instead of foreground")
    parser.add_option("-m", "--mpi", action="store_true", dest="mpi",
                      help="test if different #procs gives same result")
    parser.add_option("-r", "--restart", action="store_true", dest="restart",
                      help="test if cutting a run in two smaller chunks "
                           "gives the same result")
    parser.add_option("-6", "--tm6", action="store_true", dest="tm6",
                      help="indicates that rc file is for a tm6 model, "
                           "use only for --mpi test.")

    options, args = parser.parse_args()

    sys.exit(testtm(args, new=options.new, queue=options.queue,
                    tm6=options.tm6, teardown=options.teardown,
                    mpi=options.mpi, restart=options.restart))