12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466 |
- #
- # TM5 run tools
- #
- # ***
def Command_Line( rcf, exe, args, in_debugger ) :
    """
    Return command line that runs executable.

    ARGUMENTS
        rcf
            Rcfile with settings.
        exe
            Name of executable.
        args
            Arguments to be passed to executable.
        in_debugger
            Set to 'True' if the job should be run in a debugger.

    RETURN VALUES
        cmndline
            Command line to be executed.

    RAISES
        Exception if an unsupported debugger is configured.
    """
    # external
    import socket
    import logging
    # mpi run ?
    if rcf.get('par.mpi','bool') :
        # number of mpi tasks:
        ntask = rcf.get('par.ntask','int')
        # get command line:
        cmnd_exec = rcf.get('mpirun.command')
        cmnd_args = rcf.get('mpirun.args' )
        # write command file ?
        cmdfile = rcf.get('mpirun.cmdfile',default='')
        if len(cmdfile) > 0 :
            # write the same command line for each task;
            # 'with' guarantees the file is closed even on error:
            with open(cmdfile,'w') as f :
                for i in range(ntask) : f.write( '%s %s\n' % (exe,args) )
        else :
            # otherwise, add the executable and its arguments:
            cmnd_args = '%s %s %s' % (cmnd_args,exe,args)
        #endif
        # write host file ? PLS: This is done too early, and should be done
        # inside the *_run.jb script : that will let you specify nodes
        # scattered on different hosts (eg, linux cluster), and also different
        # from the current host (which probably is a login node)!! See
        # WriteJob below for cases of Loadleveler and Slurm.
        #
        # Leave it here for other cases not handled in WriteJob yet.
        hostfile = rcf.get('mpirun.hostfile',default='')
        if len(hostfile) > 0 :
            # get hostname:
            hname = socket.gethostname()
            # write hostname for each task:
            with open(hostfile,'w') as f :
                for i in range(ntask) : f.write( '%s\n' % hname )
        #endif
    else :
        # standard run:
        cmnd_exec = exe
        cmnd_args = args
    #endif
    # run in debugger ?
    if in_debugger :
        # debugger type:
        debugger = rcf.get( 'debugger' )
        # get debugger command:
        debugger_call = rcf.get( 'debugger.command' )
        # large differences ...
        if debugger == 'totalview' :
            # syntaxis: totalview <executable> [-a <arguments>]
            # pass executable:
            cmndline = '%s %s' % (debugger_call,cmnd_exec)
            # add arguments ?
            if len(cmnd_args) > 0 :
                cmndline = '%s -a %s' % (cmndline,cmnd_args)
            #endif
        elif debugger == 'idb' :
            # syntaxis: idb [-args <executable> <arguments>]
            # fill executable and arguments:
            cmndline = '%s -args %s %s' % (debugger_call,cmnd_exec,cmnd_args)
        else :
            logging.error('unsupported debugger : %s' % debugger )
            # raise with a message so callers see what went wrong:
            raise Exception( 'unsupported debugger : %s' % debugger )
        #endif
    else :
        # standard line:
        cmndline = '%s %s' % (cmnd_exec,cmnd_args)
    #endif
    # ok
    return cmndline
- # ***
def WriteAndSubmitNewJob( rcfile, bindir ) :
    """
    Write first or next rcfile and job files(s) in the job chain;
    if chain is not finished yet, submit a new job.

    The argument could be the name of an rcfile or an rcfile object itself,
    since the submit script might have changed some values given
    the provided command line arguments.

    The following function is used:
        submit_tm5_setup_rcfile.WriteRcfile   # writes the new rcfile
    This is placed in a seperate file since users might need to
    change these routines for their specific projects.
    """
    # external:
    import sys
    import logging
    import rc
    # import setup module:
    import submit_tm5_setup_rcfile
    # name provided ?
    if isinstance(rcfile,str) :
        # load:
        rcf = rc.RcFile( rcfile )
    else :
        # just copy ..
        rcf = rcfile
    #endif
    # write next rfile, return name:
    try :
        rcfile_next = submit_tm5_setup_rcfile.WriteRcfile( rcf )
    except Exception :
        logging.error( sys.exc_info()[1] )
        logging.error( 'exception from WriteRcfile' )
        # bare 're-raise' keeps the original exception and traceback:
        raise
    #endtry
    # finished ?
    if rcfile_next is None :
        logging.info( ' end of job chain !' )
    else :
        # write job file(s) for this period and return the (first) name;
        # last command in a file should submit the next job if necessary:
        logging.info( ' write jobfile for %s ...' % rcfile_next )
        try :
            jobfile_next = WriteJob( rcfile_next, bindir )
        except Exception :
            logging.error( sys.exc_info()[1] )
            logging.error( 'exception from WriteJob' )
            raise
        #endtry
        logging.info( ' submit next job : %s' % jobfile_next )
        try :
            jobid = SubmitJob( jobfile_next, rcfile_next )
        except Exception :
            logging.error( sys.exc_info()[1] )
            logging.error( 'exception from SubmitJob' )
            raise
        #endtry
    #endif
    # ok
    return
- # ***
def WriteJob( rcfile, bindir ) :
    """
    jobfile = WriteJob(rcfile)

    Write job file given the settings in rcfile.
    The name of the jobfile is based on the name of the rcfile.
    The last command in the job should submit the next job,
    and the script is therefore written in python.

    Two layouts are produced depending on the queue:
      - LoadLeveler: one master job script with a section per step;
      - otherwise: one separate '<bname>_<step>.jb' script per step,
        each submitting the next step itself.
    Returns the name of the (first) job file to submit.
    """
    # external:
    import os
    import rc
    # load settings:
    rcf = rc.RcFile( rcfile )
    # basename for scripts etc is name of rcfile minus extension:
    bname,ext = os.path.splitext(rcfile)
    # loadleveler supports master job with multiple steps:
    with_master_job = (rcf.get('submit.to') == 'queue') and (rcf.get('queue') == 'loadleveler')
    # which shell ?
    job_shell = '/usr/bin/env python'
    # start master job ?
    if with_master_job :
        # number of MPI tasks; used below to fill the generated hostfile code:
        ntasks = rcf.get('par.ntask')
        # name of jobfile:
        jobfile = '%s.jb' % bname
        # set header:
        header = []
        header.append( '#! %s\n' % job_shell )
        header.append( '\n' )
        # init queue options:
        qopt = QueueOptions( bname, rcf, 'default' )
        # init job file:
        job = []
        job.append( '# external:\n' )
        job.append( 'import os\n' )
        job.append( 'import sys\n' )
        job.append( 'import socket\n')
        job.append( 'import subprocess\n' )
        job.append( 'import logging\n' )
        job.append( '\n' )
        job.append( '# setup messages:\n' )
        job.append( "logging.basicConfig( format='%(lineno)-4s:%(filename)-30s [%(levelname)-8s] %(message)s', level=logging.INFO, stream=sys.stdout )\n" )
        job.append( '\n' )
        job.append( '# prepend locations of python modules to search path:\n' )
        job.append( "sys.path.insert( 0, '%s' )\n" % bindir )
        job.append( '\n' )
        job.append( '# tools:\n' )
        job.append( 'import submit_tm5_tools\n' )
        job.append( '\n' )
        job.append( '# current loadleveler steps:\n' )
        job.append( 'step_name = os.getenv("LOADL_STEP_NAME")\n' )
        job.append( '\n' )
        # HOSTFILE - Moved here from Command_Line for loadleveler.
        hostfile = rcf.get('mpirun.hostfile',default='')   # calling script will create hostfile
        if (len(hostfile) > 0) and rcf.get('par.mpi','bool') :
            # generated job writes the local hostname once per MPI task:
            job.append( 'f = open("%s",\'w\') \n' % hostfile )
            job.append( 'hname = socket.gethostname() \n')
            job.append( "for i in range( %s ) : \n" % ntasks )
            job.append( "\tf.write( '%s\\n' % hname)\n" )
            job.append( 'f.close() \n')
            job.append( '\n' )
        #endif
    #endif
    # job step names:
    steps = rcf.get('job.steps').split(' ')
    # number of steps:
    nstep = len(steps)
    # loop over job steps:
    for istep in range(nstep) :
        # current:
        step = steps[istep]
        # next:
        if istep < nstep-1 : step_next = steps[istep+1]
        # list with queue option lines for this step:
        qopt_step = QueueOptions( bname, rcf, step )
        # call to actual script:
        if step == 'run' :
            # get command line:
            exe = os.path.join( os.curdir, rcf.get('job.step.%s.exe' % step) )
            args = rcfile
            indb = rcf.get('submit.debugger','bool')
            cmndline = Command_Line( rcf, exe, args, indb )
            # <script> <commandline>
            step_command = '["%s/submit_tm5_step_%s","%s"]' % (bindir,step,cmndline)
        else :
            # <script> <rcfile>
            step_command = '["%s/submit_tm5_step_%s","%s","--bindir=%s"]' % (bindir,step,rcfile,bindir)
        #endif
        # add queue options to destintation:
        if with_master_job :
            # add to queue options for master job:
            qopt = qopt + qopt_step
            # add lines to run the step:
            job.append( '# which step ?\n' )
            job.append( 'if step_name == "%s" :\n' % step )
            job.append( ' \n' )
            job.append( ' # run:\n' )
            job.append( ' retcode = subprocess.call( %s )\n' % step_command )
            job.append( ' if retcode != 0 :\n' )
            job.append( ' logging.error( sys.exc_info()[1] )\n' )
            job.append( ' logging.error( \'exception from subprocess call to : %s\' )\n' % step_command )
            job.append( ' sys.exit(1)\n' )
            job.append( ' #endif\n' )
            job.append( ' \n' )
            # last step ? then add lines to submit next job:
            if istep == nstep-1 :
                job.append( ' # write and submit next job if necessary:\n' )
                job.append( ' submit_tm5_tools.WriteAndSubmitNewJob( "%s", "%s" )\n' % (rcfile,bindir) )
                job.append( ' \n' )
            #endif
            # close step:
            job.append( '#endif\n' )
            job.append( '\n' )
        else :   # no master job, but seperate files
            # name of step job to be written:
            step_job_template = bname+'_%s.jb'
            # actual name:
            step_job = step_job_template % step
            # open:
            f = open( step_job, 'w' )
            # write header:
            f.write( '#! %s\n' % job_shell )
            f.write( '\n' )
            # add queue options:
            for line in qopt_step : f.write(line)
            # add lines to call the actual script:
            f.write( '# external:\n' )
            f.write( 'import sys\n' )
            f.write( 'import os\n' )
            f.write( 'import logging\n' )
            f.write( 'import socket\n' )
            f.write( 'import subprocess\n' )
            f.write( '\n' )
            f.write( '# go to run directory:\n' )
            f.write( 'os.chdir("%s")\n' % os.getcwd() )
            f.write( '\n' )
            f.write( '# prepend locations of python modules to search path:\n' )
            f.write( "sys.path.insert( 0, '%s' )\n" % bindir )
            f.write( '\n' )
            f.write( '# tools:\n' )
            f.write( 'import rc\n' )
            f.write( 'import submit_tm5_tools\n' )
            f.write( '\n' )
            f.write( '# setup messages:\n' )
            f.write( "logging.basicConfig( format='%(lineno)-4s:%(filename)-30s [%(levelname)-8s] %(message)s', level=logging.INFO, stream=sys.stdout )\n" )
            f.write( '\n' )
            f.write( '# info ...\n' )
            f.write( 'logging.info( "start" )\n' )
            f.write( '\n' )
            # module command if needed (e.g. environment-modules setup before the run step):
            module_cmd = rcf.get('module.cmd',default='')
            if (module_cmd != '') and (step == 'run'):
                f.write( 'mod_cmd="%s".split()\n' % module_cmd )
                f.write( 'retcode = subprocess.call( mod_cmd )\n' )
                f.write( 'if retcode != 0:\n' )
                f.write( ' logging.error( sys.exc_info()[1] )\n' )
                f.write( ' logging.error( \'exception from subprocess call to : %s\' )\n' % module_cmd )
                f.write( ' sys.exit(1)\n' )
                f.write( '\n' )
            #endif
            # openMP : export the thread count inside the generated run step:
            if ( (rcf.get('queue') == 'slurm') or (rcf.get('queue') == 'pbs')) and \
               rcf.get('par.openmp','bool') and (step == 'run'):
                nthread = rcf.get( 'par.nthread', 'int' )
                f.write( "os.putenv( \'OMP_NUM_THREADS\', '%s')\n" % nthread )
            #endif
            # HOSTFILE - Moved here (and adapted) from Command_Line for case of
            # "SLURM with a host file" (case of NEURON@KNMI, and not CARTESIUS@SARA).
            # Needed only if MPI, step run.
            hostfile = rcf.get('mpirun.hostfile',default='')
            if (len(hostfile) > 0) and (rcf.get('queue') == 'slurm') and rcf.get('par.mpi','bool') and (step == 'run'):
                f.write( 'f = open("%s",\'w\') \n' % hostfile)
                # PREVIOUS:
                #f.write( 'hname = socket.gethostname() \n')
                #f.write( 'for i in range(ntask) : f.write( '%s' % hname ) \n')
                # NOW: for SLURM on NEURON, need nodelist and number of
                # tasks-per-node effectively allocated.
                # --Idee #1
                # -- Use "output environment variables" to gather effective values:
                #
                #f.write( 'for node in os.getenv('SLURM_NODELIST'): \n')
                #f.write( ' for i in range(os.getenv('SLURM_NTASKS_PER_NODE'): f.write( '%s\n' % node ) \n')
                #
                # -- BUT that would not work in all imaginable cases... so we
                # -- should use SLURM_TASKS_PER_NODE instead, but its format, for eg:
                # --     2(x3),1,5,2(x5)
                # -- is a bit annoying to process. Then idee #2... good old unix command line:
                # --Idee #2 - Use srun | sort | awk to get ordered list
                hcommand = '[ "srun -l /bin/hostname | sort -n | awk \'{print $2}\' > %s"]' % hostfile
                f.write( 'retcode = subprocess.call( %s, shell=True )\n' % hcommand )
                f.write( 'if retcode != 0:\n' )
                f.write( ' logging.error( sys.exc_info()[1] )\n' )
                f.write( ' logging.error( \'exception from creating host file\' )\n' )
                f.write( ' sys.exit(1)\n' )
                f.write( 'f.close() \n')
                f.write( '\n' )
            #endif
            f.write( '# call user script:\n' )
            f.write( 'retcode = subprocess.call( %s )\n' % step_command )
            f.write( 'if retcode != 0:\n' )
            f.write( ' logging.error( sys.exc_info()[1] )\n' )
            f.write( ' logging.error( \'exception from subprocess call to : %s\' )\n' % step_command )
            f.write( ' sys.exit(1)\n' )
            f.write( '#endif\n' )
            f.write( '\n' )
            # add submission of next step?
            if istep < nstep-1 :
                # job script of next step:
                step_job_next = step_job_template % step_next
                # add submission command:
                f.write( '# submit next step:\n' )
                f.write( 'try :\n' )
                f.write( ' submit_tm5_tools.SubmitJob( "%s", "%s" )\n' % (step_job_next,rcfile) )
                f.write( 'except:\n' )
                f.write( ' logging.error( sys.exc_info()[1] )\n' )
                f.write( ' logging.error( \'exception from SubmitJob( "%s", "%s" )\' )\n' % (step_job_next,rcfile) )
                f.write( ' sys.exit(1)\n' )
                f.write( '#endtry\n' )
                f.write( '\n' )
            else :
                # last step; might be necessary to submit a new job:
                f.write( '# write and submit next job if necessary:\n' )
                f.write( 'submit_tm5_tools.WriteAndSubmitNewJob( "%s", "%s" )\n' % (rcfile,bindir) )
                f.write( '\n' )
            #endif
            f.write( '# info ...\n' )
            f.write( 'logging.info( "end" )\n' )
            f.write( '\n' )
            f.close()
            # make it executable and readible for all, writable for user only:
            #                  u+r    u+w    u+x    g+r   g-w  g+x    o+r   o-w  o+x
            os.chmod( step_job, 2**8 + 2**7 + 2**6 + 2**5 + 0 + 2**3 + 2**2 + 0 + 2**0 )
            # fill return value:
            if istep == 0 : jobfile = step_job
        #endif
    #endfor
    # write master job:
    if with_master_job :
        # combine:
        job = header + qopt + job
        # write:
        f = open(jobfile,'w')
        f.writelines(job)
        f.close()
        # make it executable and readible for all, writable for user only:
        #                 u+r    u+w    u+x    g+r   g-w  g+x    o+r   o-w  o+x
        os.chmod( jobfile, 2**8 + 2**7 + 2**6 + 2**5 + 0 + 2**3 + 2**2 + 0 + 2**0 )
    #endif
    # ok
    return jobfile
- # ***
def WriteAndSubmitBuildJob( rcf, command ):
    """
    Write bash script that runs 'command' after being submitted to the
    queue. Written to compile TM5 in the queue.

    The 'submit.to' setting is temporarily forced to 'queue' and
    restored afterwards.
    """
    import os
    # temporarily force submission to the queue:
    orig_val = rcf.get('submit.to')
    dummy = rcf.replace('submit.to','queue')
    # assume that it is called from source directory:
    source_dir = os.getcwd()
    jobfile = os.path.join(source_dir,'build.jb')
    # write the job script; 'with' closes the file even on error:
    with open( jobfile, 'w' ) as f :
        f.write( '#! /bin/bash\n' )
        f.write( '\n' )
        # queue options for the 'build' step:
        qopt = QueueOptions( 'buildTM', rcf, 'build' )
        for line in qopt : f.write(line)
        f.write( "cd %s \n" % source_dir )
        f.write( '%s' % ' '.join(command) )
        f.write( '\n' )
    # submit ('jobid' instead of 'id', which shadows the builtin):
    jobid = SubmitJob( jobfile, rcf )
    # restore original submission target:
    dummy = rcf.replace('submit.to', orig_val)
    # ok
    return
- # ***
def QueueOptions( bname, rcf, step ) :
    """
    Return list with queue option lines.
    """
    # modules:
    import logging
    # not submitting to a queue ? then no options at all:
    if rcf.get('submit.to') != 'queue' :
        return []
    # queue type decides which generator to use:
    queue = rcf.get('queue')
    if queue == 'loadleveler' :
        return QueueOptions_LoadLeveler( bname, rcf, step )
    if queue == 'bsub' :
        return QueueOptions_BSub( bname, rcf, step )
    if queue == 'qsub' :
        return QueueOptions_QSub( bname, rcf, step )
    if queue == 'pbs' :
        # init result with the "all" step, then add the requested step:
        return QueueOptions_PBS( bname, rcf, 'all' ) + QueueOptions_PBS( bname, rcf, step )
    if queue == 'slurm' :
        return QueueOptions_Slurm( bname, rcf, step )
    # not supported ...
    logging.error( 'unsupported queue : %s' % queue )
    raise Exception
- # ***
def SubmitJob( job_script, rcfile ) :
    """
    Submit jobscript. Where to submit to (foreground,background, queue) is
    read from rcfile settings. Returns jobid if submitting to queue (for now
    dummy for all cases except PBS).
    """
    import sys
    import logging
    import rc
    jobid='not-a-real-jobid-yet'   # default; only PBS returns a real id
    # settings; accept either a file name or an rcfile object:
    if isinstance(rcfile,str) :
        rcf = rc.RcFile( rcfile )
    else :
        rcf = rcfile
    #endif
    # where to ?
    submit_to = rcf.get('submit.to')
    # info ...
    logging.info( 'submit %s to %s ...' % (job_script,submit_to) )
    # call specific submit routines:
    if submit_to == 'foreground' :
        # call run script, catch errors:
        try :
            Run_Job_In_Foreground( job_script )
        except Exception :
            logging.error( sys.exc_info()[1] )
            logging.error( 'from Run_Job_In_Foreground for %s' % job_script )
            # re-raise to preserve the original traceback:
            raise
        #endtry
    elif submit_to == 'background' :
        # call run script, catch errors:
        try :
            Submit_Job_To_Background( job_script, rcf )
        except Exception :
            logging.error( 'from Submit_Job_To_Background for %s' % job_script )
            raise
        #endtry
    elif submit_to == 'queue' :
        # queue type:
        queue = rcf.get('queue')
        # different options and commands:
        if queue == 'loadleveler' :
            Submit_Job_To_LoadLeveler( job_script, rcf )
        elif queue == 'bsub' :
            Submit_Job_To_BSub( job_script, rcf )
        elif queue == 'qsub' :
            Submit_Job_To_QSub( job_script, rcf )
        elif queue == 'pbs' :
            jobid = Submit_Job_To_PBS( job_script, rcf )
        elif queue == 'slurm' :
            Submit_Job_To_Slurm( job_script, rcf )
        else :
            # not supported ...
            logging.error( 'unsupported queue : %s' % queue )
            raise Exception( 'unsupported queue : %s' % queue )
        #endif
    else :
        # not supported ...
        logging.error( 'unsupported run environment : %s' % submit_to )
        sys.exit(1)
    #endif
    # ok
    return jobid
- # ======================================================================
- # ===
- # === foreground
- # ===
- # ======================================================================
def Run_Job_In_Foreground( job_script ) :
    """
    Run job script in foreground and wait for completion.
    Raises an Exception if the script exits with a non-zero status.
    """
    # external:
    import os
    import logging
    import subprocess
    # setup command line, e.g. './myscript.jb' :
    command = os.path.join(os.curdir,job_script)
    # execute:
    retcode = subprocess.call( command )
    if retcode != 0 :
        # NOTE: the original logged sys.exc_info()[1] here, but no exception
        # is pending at this point so that was always None; report the
        # actual return code instead:
        logging.error( 'non-zero return code %i from subprocess call to : %s' % (retcode,command) )
        raise Exception( 'job script failed : %s' % command )
    #endif
    # ok
    return
- # ======================================================================
- # ===
- # === background
- # ===
- # ======================================================================
def Submit_Job_To_Background( job_script, rcf ) :
    """
    Submit job to background; returns as soon as the process is started.
    Standard output and error are redirected to '<bname>.out';
    a '<bname>.info' file with process-management hints is written.
    """
    # external:
    import sys
    import os
    import logging
    import subprocess
    # basename for scripts etc is name of rcfile minus extension:
    bname,ext = os.path.splitext(job_script)
    # output files:
    job_stdout = bname+'.out'
    job_stderr = bname+'.err'
    job_info = bname+'.info'
    # setup command line, e.g. './myscript.jb' :
    command = os.path.join(os.curdir,job_script)
    # re-direct standard output:
    command = command+' > %s' % job_stdout
    # write error messages to seperate file:
    #command = command+' 2> %s' % job_stderr
    command = command+' 2>&1'
    # run in background, return process id:
    logging.info( 'run shell command : "%s" ...' % command )
    p = subprocess.Popen( command, shell=True )
    # info ...
    infotext = []
    infotext.append( '\n' )
    infotext.append( 'Summary:\n' )
    infotext.append( '\n' )
    infotext.append( ' job script : %s\n' % job_script )
    infotext.append( ' standard output : %s\n' % job_stdout )
    infotext.append( ' standard error : %s\n' % job_stderr )
    infotext.append( '\n' )
    infotext.append( 'Process snapshot:\n')
    infotext.append( '\n')
    # universal_newlines=True : stdout yields str lines, not bytes;
    # without it the mixed str/bytes list breaks f.writelines() on Python 3:
    p2 = subprocess.Popen( '/bin/ps -f -p %i' % p.pid, shell=True,
                           stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                           universal_newlines=True )
    for line in p2.stdout.readlines() : infotext.append( line )
    infotext.append( '\n')
    infotext.append( 'To manage this process:\n' )
    infotext.append( '\n' )
    infotext.append( ' # show process snapshot:\n' )
    infotext.append( ' ps -f -p %i\n' % p.pid )
    infotext.append( ' \n' )
    infotext.append( ' # kill process:\n' )
    infotext.append( ' kill %i\n' % p.pid )
    infotext.append( ' \n' )
    infotext.append( ' # follow standard output:\n' )
    infotext.append( ' tail -f %s\n' % job_stdout )
    infotext.append( '\n' )
    # write to log:
    for line in infotext : logging.info( line.strip() )
    # write to file; 'with' closes it even on error:
    with open( job_info, 'w' ) as f :
        f.writelines(infotext)
    # ok
    return
- # ======================================================================
- # ===
- # === LoadLeveler queue
- # ===
- # ======================================================================
def QueueOptions_LoadLeveler( bname, rcf, step ) :
    """
    Return list with LoadLeveler queue option lines ('#@ key = value').

    For the 'default' step the configured options are written as-is;
    for other steps the special values are interpreted:
      '<auto>'  : fill output/error file names or total task count;
      '<none>'  : skip the option;
      ''        : write the bare keyword without a value.
    """
    # NOTE: the unused 'import math' of the original was removed.
    # init result:
    qopt = []
    # which step ?
    if step == 'default' :
        # list with options:
        opts = rcf.get( 'queue.ll.options.%s' % step ).split()
        # default options:
        for opt in opts :
            # get value:
            val = rcf.get( 'queue.ll.option.%s.%s' % (step,opt) )
            # write:
            qopt.append( '#@ %-20s = %s\n' % (opt,val) )
        #endfor
        # layout ...
        qopt.append( '\n' )
    else :
        # list with options:
        opts = rcf.get( 'queue.ll.options.%s' % step ).split()
        # default options:
        for opt in opts :
            # get value:
            val = rcf.get( 'queue.ll.option.%s.%s' % (step,opt) )
            # to be set ?
            if val == '<auto>' :
                # differs per option ...
                if opt == 'output' :
                    val = '%s_%s.out' % (bname,step)
                elif opt == 'error' :
                    val = '%s_%s.err' % (bname,step)
                elif opt == 'tasks_per_node' :
                    # total tasks = mpi tasks * openmp threads:
                    t_mpi = rcf.get( 'par.ntask', 'int' )
                    t_omp = rcf.get( 'par.nthread', 'int' )
                    val = str(t_omp*t_mpi)
                #endif
            #endif
            # no, empty, or normal value ?
            if val == '<none>' :
                # skip this keyword:
                continue
            elif val == '' :
                # just the keyword:
                qopt.append( '#@ %s\n' % opt )
            else :
                # keyword and value:
                qopt.append( '#@ %-20s = %s\n' % (opt,val) )
            #endif
        #endfor
        # layout ...
        qopt.append( '\n' )
    #endif
    return qopt
- # ***
def Submit_Job_To_LoadLeveler( job_script, rcf ) :
    """
    Submit job to LoadLeveler queue via 'llsubmit'.
    Writes a '<bname>.info' file with hints to manage the job.
    """
    # external:
    import sys
    import os
    import logging
    import subprocess
    # basename for scripts etc is name of rcfile minus extension:
    bname,ext = os.path.splitext(job_script)
    # output files:
    job_info = bname+'.info'
    # options passed directly to submission command:
    qopts = rcf.get( 'queue.ll.submit.options' )
    # add options passed to submit script:
    qopts = qopts+' '+rcf.get('submit.options')
    # info ...
    logging.info( ' launch ...' )
    # setup command line:
    command = 'llsubmit '+qopts
    # last argument is script:
    command = command+' '+job_script
    # info ...
    logging.info( ' command: %s' % command )
    # init submission info file:
    infotext = []
    infotext.append( '\n' )
    # call submit command, trap errors:
    try:
        # submit; redirect errors to standard output;
        # universal_newlines=True so the output lines are str, not bytes
        # (needed for the startswith/replace/writelines calls below on Python 3):
        p = subprocess.Popen( command.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                              universal_newlines=True )
    except Exception :
        logging.error( sys.exc_info()[1] )
        logging.error( 'from subprocess.Popen( %s )' % command.split() )
        raise
    #endtry
    # extract:
    outlines = p.stdout.readlines()
    # add to help info message:
    infotext = infotext + outlines
    # extract job id from last line:
    #   llsubmit: The job "c1a0303.4290133" with 3 job steps has been submitted.
    firstwords = 'llsubmit: The job'
    # guard against empty output (e.g. llsubmit printed nothing):
    lastline = outlines[-1] if len(outlines) > 0 else ''
    if lastline.startswith(firstwords) :
        # NOTE: use slicing to drop the prefix; the original 'lstrip(firstwords)'
        # strips a character *set*, not a prefix, and only worked by accident:
        job_id = lastline[len(firstwords):].split()[0].replace('"','')
    else :
        job_id = '<job-id>'
    #endif
    # add help text to submission info:
    infotext.append( '\n' )
    infotext.append( 'To manage LoadLeveler jobs:\n' )
    infotext.append( '\n' )
    infotext.append( ' llq [-u ${USER}] # list [your] current jobs\n' )
    infotext.append( ' llq %s # list this job\n' % job_id )
    infotext.append( ' llcancel %s # kill this job\n' % job_id )
    infotext.append( '\n' )
    # write to log:
    for line in infotext : logging.info( line.rstrip() )
    # write to file; 'with' closes it even on error:
    with open( job_info, 'w' ) as f :
        f.writelines(infotext)
    # ok
    return
- # ======================================================================
- # ===
- # === BSUB queue
- # ===
- # ======================================================================
def QueueOptions_BSub( bname, rcf, step ) :
    """
    Build the list of '#BSUB ...' option lines for one job step.
    """
    lines = []
    # loop over the options configured for this step:
    for key in rcf.get( 'queue.bsub.options.%s' % step ).split() :
        # configured value:
        value = rcf.get( 'queue.bsub.option.%s.%s' % (step,key) )
        # automatic values: output/error file names derived from basename+step:
        if value == '<auto>' :
            if key in ['o','oo'] :
                value = '%s_%s.out' % (bname,step)
            elif key in ['e','eo'] :
                value = '%s_%s.err' % (bname,step)
        # define option key:
        #   queue.bsub.options.R    :    20   ->  -R 20
        #   queue.bsub.options.Rx   : -R 20   ->  -R 20
        if value.startswith('-') :
            lines.append( '#BSUB %s\n' % (value) )
        else :
            lines.append( '#BSUB -%s %s\n' % (key,value) )
    # trailing blank line for layout:
    lines.append( '\n' )
    return lines
- # ***
def Submit_Job_To_BSub( job_script, rcf ) :
    """
    Submit job to BSUB queue via 'bsub'.
    Writes a '<bname>.info' file with hints to manage the job.
    """
    # external:
    import os
    import logging
    import subprocess
    # basename for scripts etc is name of rcfile minus extension:
    bname,ext = os.path.splitext(job_script)
    # output files:
    job_info = bname+'.info'
    # options passed directly to submission command:
    qopts = rcf.get( 'queue.bsub.submit.options' )
    # add options passed to submit script:
    qopts = qopts+' '+rcf.get('submit.options')
    # info ...
    logging.info( ' launch ...' )
    # setup command line:
    command = 'bsub '+qopts
    # pass job script to std.input:
    command = command+' < '+job_script
    # info ...
    logging.info( ' command: %s' % command )
    # prepare for OS errors (file does not exist etc.)
    try:
        # submit; redirect errors to standard output;
        # universal_newlines=True so output lines are str, not bytes
        # (the split/strip/writelines below require str on Python 3):
        p = subprocess.Popen( command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                              universal_newlines=True )
    except OSError as err :
        logging.error( 'OSError: '+err.strerror )
        logging.error( 'from call : %s' % command )
        logging.error( 'directory : %s' % os.getcwd() )
        raise Exception
    #endtry
    # extract:
    outlines = p.stdout.readlines()
    # display:
    for line in outlines : logging.info( ' %s' % line.rstrip() )
    # standard output is:
    #   <jobname> <jobnr>
    # extract job nr; guard against unexpected/empty output:
    try :
        job_nr = outlines[0].split()[1].strip('<>')
    except IndexError :
        job_nr = '<job-nr>'
    #endtry
    # info ...
    infotext = []
    infotext.append( '\n' )
    infotext.append( 'Summary:\n' )
    infotext.append( '\n' )
    infotext.append( ' current dir : %s\n' % os.getcwd() )
    infotext.append( ' job script : %s\n' % job_script )
    infotext.append( '\n' )
    infotext.append( 'To manage this job:\n' )
    infotext.append( ' \n' )
    infotext.append( ' # kill job:\n' )
    infotext.append( ' bkill %s\n' % job_nr )
    infotext.append( ' \n' )
    infotext.append( 'To show all your running jobs:\n' )
    infotext.append( ' \n' )
    infotext.append( ' bjobs\n' )
    infotext.append( ' \n' )
    # write to log:
    for line in infotext : logging.info( line.rstrip() )
    # write to file; 'with' closes it even on error:
    with open( job_info, 'w' ) as f :
        f.writelines(infotext)
    # ok
    return
- # ======================================================================
- # ===
- # === QSUB queue
- # ===
- # ======================================================================
def QueueOptions_QSub( bname, rcf, step ) :
    """
    Build the list of '#PBS ...' option lines for one job step.
    """
    missing = 'STEP_SPECIFIC_KEY_MISSING'
    # optionally tag the job name with the chain step number:
    with_step_suffix = rcf.get('queue.qsub.name.append.jobstep',default=False)
    lines = []
    for key in rcf.get( 'queue.qsub.options' ).split() :
        # prefer a jobstep-specific qsub option ...
        value = rcf.get( 'queue.qsub.option.%s.%s' % (key,step), default=missing )
        # ... and fall back on the generic one when absent:
        if value == missing :
            value = rcf.get( 'queue.qsub.option.%s' % key )
            if with_step_suffix and (key == 'N') :
                value = '%s_%03d' % ( value, int(rcf.get('jobstep')) )
        # fill option line:
        lines.append( '#PBS -%s %s\n' % (key,value) )
    # trailing blank line for layout:
    lines.append( '\n' )
    return lines
- # ***
def Submit_Job_To_QSub( job_script, rcf ) :
    """
    Submit job to QSUB queue.

    Arguments:
      job_script : path of the job script passed to the 'qsub' command
      rcf        : rcfile object providing 'queue.qsub.submit.options'
                   and 'submit.options'

    Side effects: logs the submit command and its output, and writes a
    '<bname>.info' file (next to the script) with hints on how to manage
    the job.  Raises Exception if the 'qsub' command could not be started.
    """
    # external:
    import os
    import logging
    import subprocess
    # basename for scripts etc is name of rcfile minus extension:
    bname,ext = os.path.splitext(job_script)
    # output files:
    job_info = bname+'.info'
    # options passed directly to submission command:
    qopts = rcf.get( 'queue.qsub.submit.options' )
    # add options passed to submit script:
    qopts = qopts+' '+rcf.get('submit.options')
    # info ...
    logging.info( ' launch ...' )
    # setup command line:
    command = 'qsub '+qopts
    # last argument is script:
    command = command+' '+job_script
    # info ...
    logging.info( ' command: %s' % command )
    # prepare for OS errors (file does not exist etc.)
    try:
        # submit; redirect errors to standard output;
        # BUGFIX: 'universal_newlines=True' requests text (str) output;
        # without it Python 3 returns bytes and the job number would be
        # written into the info file as "b'12345'":
        p = subprocess.Popen( command.split(), stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT, universal_newlines=True )
    except OSError as err :
        logging.error( 'OSError: '+err.strerror )
        logging.error( 'from call : %s' % command )
        logging.error( 'directory : %s' % os.getcwd() )
        raise Exception
    #endtry
    # extract output lines; this waits for the command to finish:
    outlines = p.stdout.readlines()
    # display:
    for line in outlines : logging.info( ' %s' % line.rstrip() )
    # standard output is:
    #   jobnr
    # extract job nr:
    job_nr = outlines[0].split()[0]
    # info ...
    infotext = []
    infotext.append( '\n' )
    infotext.append( 'Summary:\n' )
    infotext.append( '\n' )
    infotext.append( ' current dir : %s\n' % os.getcwd() )
    infotext.append( ' job script : %s\n' % job_script )
    infotext.append( '\n' )
    infotext.append( 'To manage this job:\n' )
    infotext.append( ' \n' )
    infotext.append( ' # kill job:\n' )
    infotext.append( ' qdel %s\n' % job_nr )
    infotext.append( ' \n' )
    infotext.append( 'To show all your running jobs:\n' )
    infotext.append( ' \n' )
    infotext.append( ' qstat [-u ${USER}]\n' )
    infotext.append( ' \n' )
    # write to log:
    for line in infotext : logging.info( line.rstrip() )
    # write to file; context manager guarantees the file is closed
    # even if the write fails:
    with open( job_info, 'w' ) as f :
        f.writelines(infotext)
    # ok
    return
- # ======================================================================
- # ===
- # === PBS queue (PBSpro)
- # ===
- # === Note that there are several implementations of PBS and the "pbs queue"
- # === here, although it also uses the qsub command, differs from the "qsub
- # === queue" defined above. The latter uses #$ directives, while the pbsPro
- # === directives start with #PBS.
- # ===
- # ======================================================================
def QueueOptions_PBS( bname, rcf, step ) :
    """
    Return list with '#PBS' queue directive lines for the given step.
    To call twice, first for pseudo-step 'all'.  See QueueOptions above.
    """
    import os
    # log files go into the run directory, except for the build step:
    if step == 'build' :
        logdir = rcf.get('build.sourcedir')
    else :
        logdir = rcf.get('rundir')
    # fully qualified base name for the log files:
    fqname = os.path.join( logdir, bname )
    # collected job directives:
    directives = []
    for key in rcf.get( 'queue.pbs.options.%s' % step ).split() :
        value = rcf.get( 'queue.pbs.option.%s.%s' % (step,key) )
        # '-l' may carry multiple space-separated resource requests,
        # each on its own directive line:
        if key == 'l' :
            directives.extend( [ '#PBS -l %s\n' % resource for resource in value.split() ] )
            continue
        # fill '<auto>' log file names:
        if value == '<auto>' :
            if key == 'o' :
                value = '%s_%s.out' % (fqname,step)
            elif key == 'e' :
                value = '%s_%s.err' % (fqname,step)
        # '<none>' disables the option, an empty value means a bare keyword:
        if value == '<none>' :
            continue
        if value == '' :
            directives.append( '#PBS -%s\n' % key )
        else :
            directives.append( '#PBS -%s %s\n' % (key,value) )
    # blank line for layout:
    directives.append( '\n' )
    return directives
def Submit_Job_To_PBS( job_script, rcf ) :
    """
    Submit job to PBS queue.

    Arguments:
      job_script : path of the job script passed to the 'qsub' command
      rcf        : rcfile object providing 'queue.pbs.submit.options'
                   and 'submit.options'

    Returns the job number parsed from the qsub output.  Logs the submit
    command and its output, and writes a '<bname>.info' file with hints
    on how to manage the job.  Raises Exception if the 'qsub' command
    could not be started.
    """
    import os
    import logging
    import subprocess
    # basename for scripts etc is name of rcfile minus extension:
    bname,ext = os.path.splitext(job_script)
    # output files:
    job_info = bname+'.info'
    # Two set of options to pass directly unmodified at the submission command :
    qopts = rcf.get( 'queue.pbs.submit.options' ) # (1) from machine or queue (if separated from machine) rcfile
    qopts = qopts+' '+rcf.get('submit.options') # (2) from expert rcfile
    # info ...
    logging.info( ' launch ...' )
    # setup command line:
    command = 'qsub '+qopts
    # last argument is script:
    command = command+' '+job_script
    # info ...
    logging.info( ' command: %s' % command )
    # prepare for OS errors (file does not exist etc.)
    try:
        # submit; redirect errors to standard output;
        # BUGFIX: 'universal_newlines=True' requests text (str) output;
        # without it Python 3 returns bytes and the job number would be
        # written into the info file as "b'12345'":
        p = subprocess.Popen( command.split(), stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT, universal_newlines=True )
    except OSError as err :
        logging.error( 'OSError: '+err.strerror )
        logging.error( 'from call : %s' % command )
        logging.error( 'directory : %s' % os.getcwd() )
        raise Exception
    # log output; readlines waits for the command to finish:
    outlines = p.stdout.readlines()
    for line in outlines : logging.info( ' %s' % line.rstrip() )
    # standard output is:
    #   jobnr
    # extract job nr:
    job_nr = outlines[0].split()[0]
    # info ...
    infotext = []
    infotext.append( '\n' )
    infotext.append( 'Summary:\n' )
    infotext.append( '\n' )
    infotext.append( ' current dir : %s\n' % os.getcwd() )
    infotext.append( ' job script : %s\n' % job_script )
    infotext.append( '\n' )
    infotext.append( 'To manage this job:\n' )
    infotext.append( ' \n' )
    infotext.append( ' # kill job:\n' )
    infotext.append( ' qdel %s\n' % job_nr )
    infotext.append( ' \n' )
    infotext.append( 'To show all your running jobs:\n' )
    infotext.append( ' \n' )
    infotext.append( ' qstat [-u ${USER}]\n' )
    infotext.append( ' \n' )
    infotext.append( 'To monitor this running job (ECMWF only!):\n' )
    infotext.append( ' \n' )
    infotext.append( ' qcat %s\n' % job_nr )
    infotext.append( ' \n' )
    # write to log:
    for line in infotext : logging.info( line.rstrip() )
    # write to file; context manager guarantees the file is closed
    # even if the write fails:
    with open( job_info, 'w' ) as f :
        f.writelines(infotext)
    return job_nr
- # ======================================================================
- # ===
- # === SLURM queue
- # ===
- # ======================================================================
def QueueOptions_Slurm( bname, rcf, step ) :
    """
    Return list with '#SBATCH' queue option directives, that will be at
    the top of the job script.

    Options are read from 'queue.slurm.options.<step>' if present,
    otherwise from 'queue.slurm.options'; the value of option 'x' comes
    from 'queue.slurm.option.<step>.x', with 'queue.slurm.option.x' as
    fallback.

    Script for run step (NEURON@KNMI) should contain:
      #SBATCH -n ${par.nthread}*${ntask}
      export OMP_NUM_THREADS=${par.nthread}
      mpiexec.hydra -machinefile ./mpd.hosts -np ${ntask} ./$bin
    """
    # init result:
    qopt = []
    # list of options for specified step, or the generic list:
    if rcf.has_key( 'queue.slurm.options.%s' % step ) :
        opts = rcf.get( 'queue.slurm.options.%s' % step ).split()
    else :
        opts = rcf.get( 'queue.slurm.options' ).split()
    # NOAA-specific option: append jobstep number to the job name?
    append_jobstep = rcf.get( 'queue.slurm.name.append.jobstep', default=False )
    for opt in opts :
        # look first for a step-specific slurm option:
        val = rcf.get( 'queue.slurm.option.%s.%s' % (step,opt), default='STEP_SPECIFIC_KEY_MISSING' )
        # if step-specific option is missing, look for one without step:
        if val == 'STEP_SPECIFIC_KEY_MISSING' :
            # get value:
            val = rcf.get( 'queue.slurm.option.%s' % opt, default='NOT_PRESENT' )
        # default log file names:
        if val == '<auto>' :
            if opt in ['o','output'] :
                val = '%s_%s.out' % (bname,step)
            elif opt in ['e','error'] :
                val = '%s_%s.err' % (bname,step)
        # optionally tag job name with 3-digit jobstep number:
        if (opt in ['J','job-name']) and append_jobstep :
            jobstep = rcf.get('jobstep')
            val = val + '_' + ( '%03d' % int(jobstep) )
        # skip unset (empty) partition ('-p') / nodelist ('-w') directives;
        # BUGFIX: this used to read "if opt == 'p' or 'w':", which is
        # always true and therefore silently dropped ANY option whose
        # value was empty, not just '-p' and '-w':
        if opt in ('p','w') and len(val) == 0 : continue
        # Some options are represented by only a single letter
        # (e.g. J), and these require a single dash
        # (e.g. -J <jobname>) but others have only a multi-letter
        # invocation, and need two dashes (e.g. --qos=batch).
        if len(opt) > 1 :
            dashes = '--'
            sep = '='
        else :
            dashes = '-'
            sep = ' '
        #endif
        qopt.append( '#SBATCH %s%s%s%s\n' % (dashes,opt,sep,val) )
    # layout ...
    qopt.append( '\n' )
    # ok
    return qopt
def Submit_Job_To_Slurm( job_script, rcf ) :
    """
    Submit job to SLURM queue.

    Arguments:
      job_script : path of the job script passed to the 'sbatch' command
      rcf        : rcfile object providing 'queue.slurm.submit.options'
                   and 'submit.options'

    Side effects: logs the submit command and its output, and writes a
    '<bname>.info' file with hints on how to manage the job.  Raises
    Exception if the 'sbatch' command could not be started.
    """
    # external:
    import os
    import logging
    import subprocess
    # basename for scripts etc is name of rcfile minus extension:
    bname,ext = os.path.splitext(job_script)
    # output files:
    job_info = bname+'.info'
    # Two set of options to pass directly unmodified at the submission command :
    qopts = rcf.get( 'queue.slurm.submit.options' ) # (1) from pycassso-queue-slurm.rc
    qopts = qopts+' '+rcf.get('submit.options') # (2) from pycasso-tm5-expert.rc
    # info ...
    logging.info( ' launch ...' )
    # setup command line:
    command = 'sbatch '+qopts
    # last argument is script:
    command = command+' '+job_script
    # info ...
    logging.info( ' command: %s' % command )
    # prepare for OS errors (file does not exist etc.)
    try:
        # submit; redirect errors to standard output;
        # BUGFIX: 'universal_newlines=True' requests text (str) output;
        # without it Python 3 returns bytes and the job number would be
        # written into the info file as "b'12345'":
        p = subprocess.Popen( command.split(), stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT, universal_newlines=True )
    except OSError as err :
        logging.error( 'OSError: '+err.strerror )
        logging.error( 'from call : %s' % command )
        logging.error( 'directory : %s' % os.getcwd() )
        raise Exception
    # extract output lines; this waits for the command to finish:
    outlines = p.stdout.readlines()
    # display:
    for line in outlines : logging.info( ' %s' % line.rstrip() )
    # standard output is:
    #   Submitted batch job <jobnr>
    # extract job nr (4th word):
    job_nr = outlines[0].split()[3]
    # info ...
    infotext = []
    infotext.append( '\n' )
    infotext.append( 'Summary:\n' )
    infotext.append( '\n' )
    infotext.append( ' current dir : %s\n' % os.getcwd() )
    infotext.append( ' job script : %s\n' % job_script )
    infotext.append( '\n' )
    infotext.append( 'To manage this job:\n' )
    infotext.append( ' \n' )
    infotext.append( ' # kill job:\n' )
    infotext.append( ' scancel %s\n' % job_nr )
    infotext.append( ' \n' )
    infotext.append( 'To show all your running jobs:\n' )
    infotext.append( ' \n' )
    infotext.append( ' squeue [-u ${USER}]\n' )
    infotext.append( ' \n' )
    # write to log:
    for line in infotext : logging.info( line.rstrip() )
    # write to file; context manager guarantees the file is closed
    # even if the write fails:
    with open( job_info, 'w' ) as f :
        f.writelines(infotext)
    return
- # ======================================================================
- # ===
- # === end
- # ===
- # ======================================================================
|