12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458 |
- #
- # TM5 run tools
- #
- # ***
def Command_Line( rcf, exe, args, in_debugger ) :

    """
    Return command line that runs executable.

    ARGUMENTS
        rcf          : rcfile object with settings
        exe          : name of the executable
        args         : arguments to be passed to the executable
        in_debugger  : set to True to wrap the command in a debugger

    RETURN VALUES
        cmndline     : command line to be executed

    Raises an Exception for an unsupported debugger type.
    """

    # external:
    import socket
    import logging

    # mpi run ?
    if rcf.get('par.mpi','bool') :

        # number of mpi tasks:
        ntask = rcf.get('par.ntask','int')

        # mpirun command and its arguments:
        cmnd_exec = rcf.get('mpirun.command')
        cmnd_args = rcf.get('mpirun.args'   )

        # write command file ?
        cmdfile = rcf.get('mpirun.cmdfile',default='')
        if len(cmdfile) > 0 :
            # write the same command line once per task;
            # 'with' guarantees the file is closed even on errors:
            with open(cmdfile,'w') as f :
                for i in range(ntask) :
                    f.write( '%s %s\n' % (exe,args) )
        else :
            # otherwise, append the executable and its arguments:
            cmnd_args = '%s %s %s' % (cmnd_args,exe,args)
        #endif

        # write host file ? PLS: This is done too early, and should be done
        # inside the *_run.jb script : that will let you specify nodes
        # scattered on different hosts (eg, linux cluster), and also different
        # from the current host (which probably is a login node)!! See
        # WriteJob below for cases of Loadleveler and Slurm.
        #
        # Leave it here for other cases not handled in WriteJob yet.
        hostfile = rcf.get('mpirun.hostfile',default='')
        if len(hostfile) > 0 :
            # current host name, written once per task:
            hname = socket.gethostname()
            with open(hostfile,'w') as f :
                for i in range(ntask) :
                    f.write( '%s\n' % hname )
        #endif

    else :

        # standard run:
        cmnd_exec = exe
        cmnd_args = args

    #endif

    # run in debugger ?
    if in_debugger :
        # debugger type:
        debugger = rcf.get( 'debugger' )
        # get debugger command:
        debugger_call = rcf.get( 'debugger.command' )
        # large differences ...
        if debugger == 'totalview' :
            # syntaxis: totalview <executable> [-a <arguments>]
            # pass executable:
            cmndline = '%s %s' % (debugger_call,cmnd_exec)
            # add arguments ?
            if len(cmnd_args) > 0 :
                cmndline = '%s -a %s' % (cmndline,cmnd_args)
            #endif
        elif debugger == 'idb' :
            # syntaxis: idb [-args <executable> <arguments>]
            # fill executable and arguments:
            cmndline = '%s -args %s %s' % (debugger_call,cmnd_exec,cmnd_args)
        else :
            logging.error('unsupported debugger : %s' % debugger )
            # include the reason in the exception instead of raising bare:
            raise Exception( 'unsupported debugger : %s' % debugger )
        #endif

    else :

        # standard line:
        cmndline = '%s %s' % (cmnd_exec,cmnd_args)

    #endif

    # ok
    return cmndline
-
- # ***
def WriteAndSubmitNewJob( rcfile, bindir ) :

    """
    Write first or next rcfile and job file(s) in the job chain;
    if the chain is not finished yet, submit a new job.

    The 'rcfile' argument may be the name of an rcfile or an rcfile
    object itself, since the submit script might have changed some
    values given the provided command line arguments.

    The following function is used:

        submit_tm5_setup_rcfile.WriteRcfile   # writes the new rcfile

    This is placed in a separate file since users might need to
    change these routines for their specific projects.
    """

    # external:
    import sys
    import logging
    import rc

    # import setup module:
    import submit_tm5_setup_rcfile

    # name provided ?  (use isinstance rather than a type comparison)
    if isinstance(rcfile,str) :
        # load:
        rcf = rc.RcFile( rcfile )
    else :
        # just copy ..
        rcf = rcfile
    #endif

    # write next rcfile, return its name:
    try :
        rcfile_next = submit_tm5_setup_rcfile.WriteRcfile( rcf )
    except :
        logging.error( sys.exc_info()[1] )
        logging.error( 'exception from WriteRcfile' )
        raise Exception
    #endtry

    # finished ?  (identity test against None, not '==')
    if rcfile_next is None :
        logging.info( ' end of job chain !' )
    else :
        # write job file(s) for this period and return the (first) name;
        # last command in a file should submit the next job if necessary:
        logging.info( ' write jobfile for %s ...' % rcfile_next )
        try :
            jobfile_next = WriteJob( rcfile_next, bindir )
        except :
            logging.error( sys.exc_info()[1] )
            logging.error( 'exception from WriteJob' )
            raise Exception
        #endtry
        # submit the job just written:
        logging.info( ' submit next job : %s' % jobfile_next )
        try :
            jobid = SubmitJob( jobfile_next, rcfile_next )
        except :
            logging.error( sys.exc_info()[1] )
            logging.error( 'exception from SubmitJob' )
            raise Exception
        #endtry
    #endif

    # ok
    return
- # ***
def WriteJob( rcfile, bindir ) :

    """
    jobfile = WriteJob(rcfile)

    Write job file given the settings in rcfile.
    The name of the jobfile is based on the name of the rcfile.
    The last command in the job should submit the next job,
    and the script is therefore written in python.

    Two layouts are produced depending on the queue system:
      * LoadLeveler (when submitting to a queue): one master job script
        with all steps, dispatched at runtime on LOADL_STEP_NAME;
      * otherwise: one '<bname>_<step>.jb' script per step, each
        submitting its successor as its last action.

    NOTE(review): the indentation inside the string literals written to the
    generated scripts appears to have been collapsed by the extraction this
    file was recovered from; confirm the generated-code indentation against
    the original source.
    """

    # external:
    import os
    import rc

    # load settings:
    rcf = rc.RcFile( rcfile )

    # basename for scripts etc is name of rcfile minus extension:
    bname,ext = os.path.splitext(rcfile)

    # loadleveler supports master job with multiple steps:
    with_master_job = (rcf.get('submit.to') == 'queue') and (rcf.get('queue') == 'loadleveler')

    # which shell ?
    job_shell = '/usr/bin/env python'

    # start master job ?
    if with_master_job :

        # number of mpi tasks (used below for the generated host file loop):
        ntasks = rcf.get('par.ntask')

        # name of jobfile:
        jobfile = '%s.jb' % bname

        # set header:
        header = []
        header.append( '#! %s\n' % job_shell )
        header.append( '\n' )

        # init queue options:
        qopt = QueueOptions( bname, rcf, 'default' )

        # init job file (python source of the master script):
        job = []
        job.append( '# external:\n' )
        job.append( 'import os\n' )
        job.append( 'import sys\n' )
        job.append( 'import socket\n')
        job.append( 'import subprocess\n' )
        job.append( 'import logging\n' )
        job.append( '\n' )
        job.append( '# setup messages:\n' )
        job.append( "logging.basicConfig( format='%(lineno)-4s:%(filename)-30s [%(levelname)-8s] %(message)s', level=logging.INFO, stream=sys.stdout )\n" )
        job.append( '\n' )
        job.append( '# prepend locations of python modules to search path:\n' )
        job.append( "sys.path.insert( 0, '%s' )\n" % bindir )
        job.append( '\n' )
        job.append( '# tools:\n' )
        job.append( 'import submit_tm5_tools\n' )
        job.append( '\n' )
        job.append( '# current loadleveler steps:\n' )
        job.append( 'step_name = os.getenv("LOADL_STEP_NAME")\n' )
        job.append( '\n' )

        # HOSTFILE - Moved here from Command_Line for loadleveler.
        hostfile = rcf.get('mpirun.hostfile',default='') # calling script will create hostfile
        if (len(hostfile) > 0) and rcf.get('par.mpi','bool') :
            # generated code: write the current host name once per task:
            job.append( 'f = open("%s",\'w\') \n' % hostfile )
            job.append( 'hname = socket.gethostname() \n')
            job.append( "for i in range( %s ) : \n" % ntasks )
            job.append( "\tf.write( '%s\\n' % hname)\n" )
            job.append( 'f.close() \n')
            job.append( '\n' )
        #endif

    #endif

    # job step names:
    steps = rcf.get('job.steps').split(' ')
    # number of steps:
    nstep = len(steps)

    # loop over job steps:
    for istep in range(nstep) :

        # current:
        step = steps[istep]
        # next (only defined when a successor exists):
        if istep < nstep-1 : step_next = steps[istep+1]

        # list with queue option lines for this step:
        qopt_step = QueueOptions( bname, rcf, step )

        # call to actual script:
        if step == 'run' :
            # get command line:
            exe = os.path.join( os.curdir, rcf.get('job.step.%s.exe' % step) )
            args = rcfile
            indb = rcf.get('submit.debugger','bool')
            cmndline = Command_Line( rcf, exe, args, indb )
            # <script> <commandline>
            step_command = '["%s/submit_tm5_step_%s","%s"]' % (bindir,step,cmndline)
        else :
            # <script> <rcfile>
            step_command = '["%s/submit_tm5_step_%s","%s","--bindir=%s"]' % (bindir,step,rcfile,bindir)
        #endif

        # add queue options to destintation:
        if with_master_job :

            # add to queue options for master job:
            qopt = qopt + qopt_step

            # add lines to run the step:
            job.append( '# which step ?\n' )
            job.append( 'if step_name == "%s" :\n' % step )
            job.append( ' \n' )
            job.append( ' # run:\n' )
            job.append( ' retcode = subprocess.call( %s )\n' % step_command )
            job.append( ' if retcode != 0 :\n' )
            job.append( ' logging.error( sys.exc_info()[1] )\n' )
            job.append( ' logging.error( \'exception from subprocess call to : %s\' )\n' % step_command )
            job.append( ' sys.exit(1)\n' )
            job.append( ' #endif\n' )
            job.append( ' \n' )
            # last step ? then add lines to submit next job:
            if istep == nstep-1 :
                job.append( ' # write and submit next job if necessary:\n' )
                job.append( ' submit_tm5_tools.WriteAndSubmitNewJob( "%s", "%s" )\n' % (rcfile,bindir) )
                job.append( ' \n' )
            #endif
            # close step:
            job.append( '#endif\n' )
            job.append( '\n' )

        else : # no master job, but seperate files

            # name of step job to be written:
            step_job_template = bname+'_%s.jb'
            # actual name:
            step_job = step_job_template % step
            # open:
            f = open( step_job, 'w' )
            # write header:
            f.write( '#! %s\n' % job_shell )
            f.write( '\n' )
            # add queue options:
            for line in qopt_step : f.write(line)
            # add lines to call the actual script:
            f.write( '# external:\n' )
            f.write( 'import sys\n' )
            f.write( 'import os\n' )
            f.write( 'import logging\n' )
            f.write( 'import socket\n' )
            f.write( 'import subprocess\n' )
            f.write( '\n' )
            f.write( '# go to run directory:\n' )
            f.write( 'os.chdir("%s")\n' % os.getcwd() )
            f.write( '\n' )
            f.write( '# prepend locations of python modules to search path:\n' )
            f.write( "sys.path.insert( 0, '%s' )\n" % bindir )
            f.write( '\n' )
            f.write( '# tools:\n' )
            f.write( 'import rc\n' )
            f.write( 'import submit_tm5_tools\n' )
            f.write( '\n' )
            f.write( '# setup messages:\n' )
            f.write( "logging.basicConfig( format='%(lineno)-4s:%(filename)-30s [%(levelname)-8s] %(message)s', level=logging.INFO, stream=sys.stdout )\n" )
            f.write( '\n' )
            f.write( '# info ...\n' )
            f.write( 'logging.info( "start" )\n' )
            f.write( '\n' )

            # module command if needed (e.g. 'module load ...' before the run step):
            module_cmd = rcf.get('module.cmd',default='')
            if (module_cmd != '') and (step == 'run'):
                f.write( 'mod_cmd="%s".split()\n' % module_cmd )
                f.write( 'retcode = subprocess.call( mod_cmd )\n' )
                f.write( 'if retcode != 0:\n' )
                f.write( ' logging.error( sys.exc_info()[1] )\n' )
                f.write( ' logging.error( \'exception from subprocess call to : %s\' )\n' % module_cmd )
                f.write( ' sys.exit(1)\n' )
                f.write( '\n' )
            #endif

            # openMP : export thread count for SLURM/PBS run steps:
            if ( (rcf.get('queue') == 'slurm') or (rcf.get('queue') == 'pbs')) and \
               rcf.get('par.openmp','bool') and (step == 'run'):
                nthread = rcf.get( 'par.nthread', 'int' )
                f.write( "os.putenv( \'OMP_NUM_THREADS\', '%s')\n" % nthread )
            #endif

            # HOSTFILE - Moved here (and adapted) from Command_Line for case of
            # "SLURM with a host file" (case of NEURON@KNMI, and not CARTESIUS@SARA).
            # Needed only if MPI, step run.
            hostfile = rcf.get('mpirun.hostfile',default='')
            if (len(hostfile) > 0) and (rcf.get('queue') == 'slurm') and rcf.get('par.mpi','bool') and (step == 'run'):
                f.write( 'f = open("%s",\'w\') \n' % hostfile)
                # PREVIOUS:
                #f.write( 'hname = socket.gethostname() \n')
                #f.write( 'for i in range(ntask) : f.write( '%s' % hname ) \n')
                # NOW: for SLURM on NEURON, need nodelist and number of
                # tasks-per-node effectively allocated.
                #
                # --Idee #1
                # -- Use "output environment variables" to gather effective values:
                #
                #f.write( 'for node in os.getenv('SLURM_NODELIST'): \n')
                #f.write( ' for i in range(os.getenv('SLURM_NTASKS_PER_NODE'): f.write( '%s\n' % node ) \n')
                #
                # -- BUT that would not work in all imaginable cases... so we
                # -- should use SLURM_TASKS_PER_NODE instead, but its format, for eg:
                # -- 2(x3),1,5,2(x5)
                # -- is a bit annoying to process. Then idee #2... good old unix command line:
                #
                # --Idee #2 - Use srun | sort | awk to get ordered list
                hcommand = '[ "srun -l /bin/hostname | sort -n | awk \'{print $2}\' > %s"]' % hostfile
                f.write( 'retcode = subprocess.call( %s, shell=True )\n' % hcommand )
                f.write( 'if retcode != 0:\n' )
                f.write( ' logging.error( sys.exc_info()[1] )\n' )
                f.write( ' logging.error( \'exception from creating host file\' )\n' )
                f.write( ' sys.exit(1)\n' )
                f.write( 'f.close() \n')
                f.write( '\n' )
            #endif

            f.write( '# call user script:\n' )
            f.write( 'retcode = subprocess.call( %s )\n' % step_command )
            f.write( 'if retcode != 0:\n' )
            f.write( ' logging.error( sys.exc_info()[1] )\n' )
            f.write( ' logging.error( \'exception from subprocess call to : %s\' )\n' % step_command )
            f.write( ' sys.exit(1)\n' )
            f.write( '#endif\n' )
            f.write( '\n' )

            # add submission of next step?
            if istep < nstep-1 :
                # job script of next step:
                step_job_next = step_job_template % step_next
                # add submission command:
                f.write( '# submit next step:\n' )
                f.write( 'try :\n' )
                f.write( ' submit_tm5_tools.SubmitJob( "%s", "%s" )\n' % (step_job_next,rcfile) )
                f.write( 'except:\n' )
                f.write( ' logging.error( sys.exc_info()[1] )\n' )
                f.write( ' logging.error( \'exception from SubmitJob( "%s", "%s" )\' )\n' % (step_job_next,rcfile) )
                f.write( ' sys.exit(1)\n' )
                f.write( '#endtry\n' )
                f.write( '\n' )
            else :
                # last step; might be necessary to submit a new job:
                f.write( '# write and submit next job if necessary:\n' )
                f.write( 'submit_tm5_tools.WriteAndSubmitNewJob( "%s", "%s" )\n' % (rcfile,bindir) )
                f.write( '\n' )
            #endif

            f.write( '# info ...\n' )
            f.write( 'logging.info( "end" )\n' )
            f.write( '\n' )
            f.close()

            # make it executable and readible for all, writable for user only:
            #          u+r   u+w   u+x   g+r  g-w  g+x   o+r  o-w  o+x
            os.chmod( step_job, 2**8 + 2**7 + 2**6 + 2**5 + 0 + 2**3 + 2**2 + 0 + 2**0 )

            # fill return value: the first step's script is what the caller
            # submits (in the master-job case 'jobfile' was set above):
            if istep == 0 : jobfile = step_job

        #endif

    #endfor  # job steps

    # write master job:
    if with_master_job :
        # combine:
        job = header + qopt + job
        # write:
        f = open(jobfile,'w')
        f.writelines(job)
        f.close()
        # make it executable and readible for all, writable for user only:
        #          u+r   u+w   u+x   g+r  g-w  g+x   o+r  o-w  o+x
        os.chmod( jobfile, 2**8 + 2**7 + 2**6 + 2**5 + 0 + 2**3 + 2**2 + 0 + 2**0 )
    #endif

    # ok
    return jobfile
- # ***
def WriteAndSubmitBuildJob( rcf, command ):

    """
    Write a bash script that runs 'command' after being submitted to the
    queue.  Written to compile TM5 in the queue.

    ARGUMENTS
        rcf      : rcfile object with settings; its 'submit.to' value is
                   temporarily forced to 'queue' and restored afterwards
        command  : compile command as a list of words
    """

    # external:
    import os

    # temporarily force submission to the queue:
    orig_val = rcf.get('submit.to')
    rcf.replace('submit.to','queue')

    # assume that we are called from the source directory:
    source_dir = os.getcwd()

    # write the job script; 'with' guarantees the file is closed:
    jobfile = os.path.join(source_dir,'build.jb')
    with open( jobfile, 'w' ) as f :
        f.write( '#! /bin/bash\n' )
        f.write( '\n' )
        # add queue options for the 'build' step:
        qopt = QueueOptions( 'buildTM', rcf, 'build' )
        for line in qopt : f.write(line)
        # run the compile command from the source directory:
        f.write( "cd %s \n" % source_dir )
        f.write( '%s' % ' '.join(command) )
        f.write( '\n' )
    #endwith

    # submit (do not shadow the builtin 'id'), then restore the
    # original submission target:
    jobid = SubmitJob( jobfile, rcf )
    rcf.replace('submit.to', orig_val)

    # ok
    return
- # ***
def QueueOptions( bname, rcf, step ) :

    """
    Return list with queue option lines for the given job step.
    An empty list is returned when not submitting to a queue.
    """

    # modules:
    import logging

    # not submitting to a queue ?  then no options at all:
    if rcf.get('submit.to') != 'queue' :
        return []
    #endif

    # the queue type decides which helper builds the option lines:
    queue = rcf.get('queue')
    if queue == 'loadleveler' :
        return QueueOptions_LoadLeveler( bname, rcf, step )
    elif queue == 'bsub' :
        return QueueOptions_BSub( bname, rcf, step )
    elif queue == 'qsub' :
        return QueueOptions_QSub( bname, rcf, step )
    elif queue == 'pbs' :
        # init result with the "all" step, then add the step specific part:
        return QueueOptions_PBS( bname, rcf, 'all' ) + QueueOptions_PBS( bname, rcf, step )
    elif queue == 'slurm' :
        return QueueOptions_Slurm( bname, rcf, step )
    #endif

    # not supported ...
    logging.error( 'unsupported queue : %s' % queue )
    raise Exception
-
- # ***
def SubmitJob( job_script, rcfile ) :

    """
    Submit jobscript.  Where to submit to (foreground, background, queue)
    is read from the rcfile settings ('submit.to').  Returns a jobid when
    submitting to a queue (for now a dummy value for all cases except PBS).

    ARGUMENTS
        job_script : name of the job script to submit
        rcfile     : name of an rcfile, or an rcfile object
    """

    import sys
    import logging
    import rc

    # default return value; only PBS fills in a real job id:
    jobid = 'not-a-real-jobid-yet'

    # settings; accept either a file name or an already loaded rcfile:
    if isinstance(rcfile,str) :
        rcf = rc.RcFile( rcfile )
    else :
        rcf = rcfile
    #endif

    # where to ?
    submit_to = rcf.get('submit.to')
    # info ...
    logging.info( 'submit %s to %s ...' % (job_script,submit_to) )

    # call specific submit routines:
    if submit_to == 'foreground' :

        # call run script, catch errors:
        try :
            Run_Job_In_Foreground( job_script )
        except :
            logging.error( sys.exc_info()[1] )
            logging.error( 'from Run_Job_In_Foreground for %s' % job_script )
            raise Exception
        #endtry

    elif submit_to == 'background' :

        # call run script, catch errors:
        try :
            Submit_Job_To_Background( job_script, rcf )
        except :
            # also log the actual exception, consistent with the other branches:
            logging.error( sys.exc_info()[1] )
            logging.error( 'from Submit_Job_To_Background for %s' % job_script )
            raise Exception
        #endtry

    elif submit_to == 'queue' :

        # queue type:
        queue = rcf.get('queue')
        # different options and commands:
        if queue == 'loadleveler' :
            Submit_Job_To_LoadLeveler( job_script, rcf )
        elif queue == 'bsub' :
            Submit_Job_To_BSub( job_script, rcf )
        elif queue == 'qsub' :
            Submit_Job_To_QSub( job_script, rcf )
        elif queue == 'pbs' :
            jobid = Submit_Job_To_PBS( job_script, rcf )
        elif queue == 'slurm' :
            Submit_Job_To_Slurm( job_script, rcf )
        else :
            # not supported ...
            logging.error( 'unsupported queue : %s' % queue )
            raise Exception
        #endif

    else :

        # not supported ...
        logging.error( 'unsupported run environment : %s' % submit_to )
        sys.exit(1)

    #endif

    # ok
    return jobid
-
- # ======================================================================
- # ===
- # === foreground
- # ===
- # ======================================================================
def Run_Job_In_Foreground( job_script ) :

    """
    Run job script in the foreground and wait for it to finish.
    Raises an Exception when the script returns a non-zero exit status.
    """

    # external:
    import os
    import logging
    import subprocess

    # setup command line, e.g. './myscript.jb' :
    command = os.path.join(os.curdir,job_script)

    # execute and wait:
    retcode = subprocess.call( command )
    # a non-zero status is an error; note that no exception is active here,
    # so the original's logging of sys.exc_info() printed stale/empty info —
    # log the return code and command instead:
    if retcode != 0 :
        logging.error( 'non-zero return status %i from subprocess call to : %s' % (retcode,command) )
        raise Exception
    #endif

    # ok
    return
- # ======================================================================
- # ===
- # === background
- # ===
- # ======================================================================
def Submit_Job_To_Background( job_script, rcf ) :

    """
    Submit job to background.
    """

    # external:
    import sys
    import os
    import logging
    import subprocess

    # basename for scripts etc is name of rcfile minus extension:
    bname,ext = os.path.splitext(job_script)

    # output files:
    job_stdout = bname+'.out'
    job_stderr = bname+'.err'
    job_info = bname+'.info'

    # command line, e.g. './myscript.jb', with standard output redirected
    # to file and standard error merged into it:
    #command = '%s > %s 2> %s' would write errors to a seperate file
    command = '%s > %s 2>&1' % ( os.path.join(os.curdir,job_script), job_stdout )

    # run in background, return process id:
    logging.info( 'run shell command : "%s" ...' % command )
    p = subprocess.Popen( command, shell=True )

    # compose the summary part of the info text:
    infotext = [ '\n',
                 'Summary:\n',
                 '\n',
                 ' job script : %s\n' % job_script,
                 ' standard output : %s\n' % job_stdout,
                 ' standard error : %s\n' % job_stderr,
                 '\n',
                 'Process snapshot:\n',
                 '\n' ]
    # add a 'ps' snapshot of the process just started:
    p2 = subprocess.Popen( '/bin/ps -f -p %i' % p.pid, shell=True,
                           stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
    for line in p2.stdout.readlines() : infotext.append( line )
    # hints on how to manage the process:
    infotext += [ '\n',
                  'To manage this process:\n',
                  '\n',
                  ' # show process snapshot:\n',
                  ' ps -f -p %i\n' % p.pid,
                  ' \n',
                  ' # kill process:\n',
                  ' kill %i\n' % p.pid,
                  ' \n',
                  ' # follow standard output:\n',
                  ' tail -f %s\n' % job_stdout,
                  '\n' ]

    # write to log:
    for line in infotext : logging.info( line.strip() )

    # write to file:
    f = open( job_info, 'w' )
    f.writelines(infotext)
    f.close()

    # ok
    return
- # ======================================================================
- # ===
- # === LoadLeveler queue
- # ===
- # ======================================================================
def QueueOptions_LoadLeveler( bname, rcf, step ) :

    """
    Return list with LoadLeveler queue option lines ('#@ ...') for the
    given job step.
    """

    # external:
    import math

    # collect option lines here:
    qopt = []

    # option keys configured for this step:
    keys = rcf.get( 'queue.ll.options.%s' % step ).split()

    # the 'default' step writes values as given; other steps support the
    # '<auto>' and '<none>' substitutions:
    if step == 'default' :
        for key in keys :
            value = rcf.get( 'queue.ll.option.%s.%s' % (step,key) )
            qopt.append( '#@ %-20s = %s\n' % (key,value) )
        #endfor
    else :
        for key in keys :
            value = rcf.get( 'queue.ll.option.%s.%s' % (step,key) )
            # fill automatic values:
            if value == '<auto>' :
                if key == 'output' :
                    value = '%s_%s.out' % (bname,step)
                elif key == 'error' :
                    value = '%s_%s.err' % (bname,step)
                elif key == 'tasks_per_node' :
                    # total tasks per node = mpi tasks times threads:
                    t_mpi = rcf.get( 'par.ntask', 'int' )
                    t_omp = rcf.get( 'par.nthread', 'int' )
                    value = str(t_omp*t_mpi)
                #endif
            #endif
            # skip, keyword only, or keyword with value ?
            if value == '<none>' :
                continue
            elif value == '' :
                qopt.append( '#@ %s\n' % key )
            else :
                qopt.append( '#@ %-20s = %s\n' % (key,value) )
            #endif
        #endfor
    #endif

    # layout ...
    qopt.append( '\n' )

    # ok
    return qopt
- # ***
def Submit_Job_To_LoadLeveler( job_script, rcf ) :

    """
    Submit job to the LoadLeveler queue via 'llsubmit'.
    Writes a '<bname>.info' file with hints on how to manage the job.
    """

    # external:
    import sys
    import os
    import logging
    import subprocess

    # basename for scripts etc is name of rcfile minus extension:
    bname,ext = os.path.splitext(job_script)

    # output files:
    job_info = bname+'.info'

    # options passed directly to submission command:
    qopts = rcf.get( 'queue.ll.submit.options' )
    # add options passed to submit script:
    qopts = qopts+' '+rcf.get('submit.options')

    # info ...
    logging.info( ' launch ...' )
    # setup command line; last argument is the script:
    command = 'llsubmit '+qopts
    command = command+' '+job_script
    # info ...
    logging.info( ' command: %s' % command )

    # init submission info file:
    infotext = []
    infotext.append( '\n' )

    # call submit command, trap errors:
    try:
        # submit; redirect errors to standard output:
        p = subprocess.Popen( command.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
    except :
        logging.error( sys.exc_info()[1] )
        logging.error( 'from subprocess.Popen( %s )' % command.split() )
        raise Exception
    #endtry

    # extract:
    outlines = p.stdout.readlines()
    # add to help info message:
    infotext = infotext + outlines

    # extract job id from last line:
    #   llsubmit: The job "c1a0303.4290133" with 3 job steps has been submitted.
    firstwords = 'llsubmit: The job'
    lastline = outlines[-1]
    if lastline.startswith(firstwords) :
        # FIX: the original used lstrip(firstwords), but str.lstrip strips a
        # *character set*, not a prefix; slice the prefix off instead:
        job_id = lastline[len(firstwords):].split()[0].replace('"','')
    else :
        # unrecognized output; use a placeholder in the help text:
        job_id = '<job-id>'
    #endif

    # add help text to submission info:
    infotext.append( '\n' )
    infotext.append( 'To manage LoadLeveler jobs:\n' )
    infotext.append( '\n' )
    infotext.append( ' llq [-u ${USER}] # list [your] current jobs\n' )
    infotext.append( ' llq %s # list this job\n' % job_id )
    infotext.append( ' llcancel %s # kill this job\n' % job_id )
    infotext.append( '\n' )

    # write to log:
    for line in infotext : logging.info( line.rstrip() )

    # write to file:
    f = open( job_info, 'w' )
    f.writelines(infotext)
    f.close()

    # ok
    return
-
- # ======================================================================
- # ===
- # === BSUB queue
- # ===
- # ======================================================================
def QueueOptions_BSub( bname, rcf, step ) :

    """
    Return list with BSUB queue option lines ('#BSUB ...') for the
    given job step.
    """

    # collect option lines here:
    qopt = []

    # loop over the option keys configured for this step:
    for key in rcf.get( 'queue.bsub.options.%s' % step ).split() :
        # corresponding value:
        val = rcf.get( 'queue.bsub.option.%s.%s' % (step,key) )
        # automatic values for output/error files:
        if val == '<auto>' :
            if key in ['o','oo'] :
                val = '%s_%s.out' % (bname,step)
            elif key in ['e','eo'] :
                val = '%s_%s.err' % (bname,step)
            #endif
        #endif # <auto> value
        # a value that already starts with '-' contains its own flag:
        #   queue.bsub.options.R  :    20  ->  -R 20
        #   queue.bsub.options.Rx : -R 20  ->  -R 20
        if val.startswith('-') :
            qopt.append( '#BSUB %s\n' % (val) )
        else :
            qopt.append( '#BSUB -%s %s\n' % (key,val) )
        #endif
    #endfor

    # layout ...
    qopt.append( '\n' )

    # ok
    return qopt
- # ***
def Submit_Job_To_BSub( job_script, rcf ) :

    """
    Submit job to the BSUB queue; the job script is passed on standard
    input to 'bsub'.  Writes a '<bname>.info' file with hints on how to
    manage the job.
    """

    # external:
    import os
    import logging
    import subprocess

    # basename for scripts etc is name of rcfile minus extension:
    bname,ext = os.path.splitext(job_script)

    # output files:
    job_info = bname+'.info'

    # options passed directly to submission command:
    qopts = rcf.get( 'queue.bsub.submit.options' )
    # add options passed to submit script:
    qopts = qopts+' '+rcf.get('submit.options')

    # info ...
    logging.info( ' launch ...' )
    # setup command line:
    command = 'bsub '+qopts
    # pass job script to std.input:
    command = command+' < '+job_script
    # info ...
    logging.info( ' command: %s' % command )

    # prepare for OS errors (file does not exist etc.)
    try:
        # submit; redirect errors to standard output:
        p = subprocess.Popen( command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
    # FIX: use the Python 2.6+/3 compatible 'except ... as ...' syntax
    # instead of the Python-2-only 'except OSError, err':
    except OSError as err :
        logging.error( 'OSError: '+err.strerror )
        logging.error( 'from call : %s' % command )
        raise Exception
    #endtry

    # extract:
    outlines = p.stdout.readlines()
    # display:
    for line in outlines : logging.info( ' %s' % line.rstrip() )

    # standard output is:
    #   <jobname> <jobnr>
    # extract job nr:
    job_nr = outlines[0].split()[1].strip('<>')

    # info ...
    infotext = []
    infotext.append( '\n' )
    infotext.append( 'Summary:\n' )
    infotext.append( '\n' )
    infotext.append( ' current dir : %s\n' % os.getcwd() )
    infotext.append( ' job script : %s\n' % job_script )
    infotext.append( '\n' )
    infotext.append( 'To manage this job:\n' )
    infotext.append( ' \n' )
    infotext.append( ' # kill job:\n' )
    infotext.append( ' bkill %s\n' % job_nr )
    infotext.append( ' \n' )
    infotext.append( 'To show all your running jobs:\n' )
    infotext.append( ' \n' )
    infotext.append( ' bjobs\n' )
    infotext.append( ' \n' )

    # write to log:
    for line in infotext : logging.info( line.rstrip() )

    # write to file; 'with' guarantees the file is closed:
    with open( job_info, 'w' ) as f :
        f.writelines(infotext)
    #endwith

    # ok
    return
-
- # ======================================================================
- # ===
- # === QSUB queue
- # ===
- # ======================================================================
def QueueOptions_QSub( bname, rcf, step ) :
    """
    Return list with queue options.

    Builds '#PBS -<opt> <value>' lines from the 'queue.qsub.option.*'
    rcfile settings; a jobstep-specific key 'queue.qsub.option.<opt>.<step>'
    takes precedence over the generic 'queue.qsub.option.<opt>'.
    """
    # init result:
    qopt = []
    # list with options:
    opts = rcf.get( 'queue.qsub.options' ).split()
    # append the chain-step number to the job name ?
    append_jobstep = rcf.get('queue.qsub.name.append.jobstep',default=False)
    # default options:
    for opt in opts :
        # look first for a jobstep-specific qsub option
        val = rcf.get( 'queue.qsub.option.%s.%s' % (opt,step), default='STEP_SPECIFIC_KEY_MISSING' )
        # if jobstep-specific option is missing, look for one without jobstep
        if val == 'STEP_SPECIFIC_KEY_MISSING':
            val = rcf.get( 'queue.qsub.option.%s' % opt)
        # NOTE(review): the indentation of this clause was lost in the copy
        # this file was recovered from; it is placed at loop level here
        # (applied to the 'N' option regardless of which key supplied it) —
        # confirm placement against the original source.
        if (opt == 'N') and append_jobstep:
            # distinguish chained jobs by a zero-padded step number:
            jobstep = rcf.get('jobstep')
            val = val + '_%03d' % (int(jobstep))
        # fill option line:
        qopt.append( '#PBS -%s %s\n' % (opt,val) )
    #endfor
    # layout ...
    qopt.append( '\n' )

    # ok
    return qopt
- # ***
def Submit_Job_To_QSub( job_script, rcf ) :
    """
    Submit job to QSUB queue.

    Arguments:
      job_script : filename of the job script passed to 'qsub'.
      rcf        : rc-file object; read keys:
                     queue.qsub.submit.options   (queue/machine settings)
                     submit.options              (options from submit script)
    Writes the submitted job number plus job-management hints to
    '<basename>.info'.  Raises Exception when the submission command
    cannot be started or returns a non-zero exit status.
    """

    # external:
    import os
    import logging
    import subprocess
    # basename for scripts etc is name of rcfile minus extension:
    bname,ext = os.path.splitext(job_script)

    # output files:
    job_info = bname+'.info'
    # options passed directly to submission command:
    qopts = rcf.get( 'queue.qsub.submit.options' )
    # add options passed to submit script:
    qopts = qopts+' '+rcf.get('submit.options')

    # info ...
    logging.info( ' launch ...' )
    # setup command line:
    command = 'qsub '+qopts
    # last argument is script:
    command = command+' '+job_script
    # info ...
    logging.info( ' command: %s' % command )
    # prepare for OS errors (file does not exist etc.)
    try:
        # submit; redirect errors to standard output:
        p = subprocess.Popen( command.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
    # 'except OSError as err' works on Python 2.6+ and 3.x,
    # unlike the Python-2-only 'except OSError, err' form:
    except OSError as err :
        logging.error( 'OSError: '+err.strerror )
        logging.error( 'from call : %s' % command )
        raise Exception
    #endtry
    # extract output and reap the child process (avoids a zombie and
    # gives us the exit status):
    outlines = p.stdout.readlines()
    retcode = p.wait()
    # display:
    for line in outlines : logging.info( ' %s' % line.rstrip() )
    # fail loudly on a failed submission instead of mis-parsing its output:
    if (retcode != 0) or (len(outlines) == 0) :
        logging.error( 'submission failed (exit status %s) : %s' % (retcode,command) )
        raise Exception
    #endif
    # standard output is:
    #   jobnr
    # extract job nr:
    job_nr = outlines[0].split()[0]
    # info ...
    infotext = []
    infotext.append( '\n' )
    infotext.append( 'Summary:\n' )
    infotext.append( '\n' )
    infotext.append( ' current dir : %s\n' % os.getcwd() )
    infotext.append( ' job script : %s\n' % job_script )
    infotext.append( '\n' )
    infotext.append( 'To manage this job:\n' )
    infotext.append( ' \n' )
    infotext.append( ' # kill job:\n' )
    infotext.append( ' qdel %s\n' % job_nr )
    infotext.append( ' \n' )
    infotext.append( 'To show all your running jobs:\n' )
    infotext.append( ' \n' )
    infotext.append( ' qstat [-u ${USER}]\n' )
    infotext.append( ' \n' )
    # write to log:
    for line in infotext : logging.info( line.rstrip() )
    # write to file; context manager guarantees the file is closed:
    with open( job_info, 'w' ) as f :
        f.writelines(infotext)
    #endwith

    # ok
    return
#enddef
-
- # ======================================================================
- # ===
- # === PBS queue (PBSpro)
- # ===
# === Note that there are several implementations of PBS, and the "pbs queue"
# === here, although it also uses the qsub command, differs from the "qsub
# === queue" defined above in the rc-file keys it reads.  (NOTE: both
# === QueueOptions_QSub and QueueOptions_PBS currently emit '#PBS'
# === directives, despite this comment's historical claim that the
# === former used '#$' directives.)
- # ===
- # ======================================================================
def QueueOptions_PBS( bname, rcf, step ) :
    """
    Build the list of '#PBS' directive lines for the PBS(pro) queue.
    To call twice, first for pseudo-step 'all'.

    Arguments:
      bname : basename used to form fully qualified log file names.
      rcf   : rc-file object; read keys:
                queue.pbs.options.<step>, queue.pbs.option.<step>.<opt>,
                rundir / build.sourcedir  (log file location)
      step  : name of the job step the options are generated for.
    Returns a list of text lines followed by a blank separator line.
    """
    import os

    # collected directive lines:
    directives = []

    # option keys for this step:
    option_keys = rcf.get( 'queue.pbs.options.%s' % step ).split()

    # log files get fully qualified names; the build step runs from the
    # source directory, every other step from the run directory:
    if step == 'build' :
        logdir = rcf.get('build.sourcedir')
    else :
        logdir = rcf.get('rundir')
    #endif
    fqname = os.path.join( logdir, bname )

    # translate every option key into one (or more) job directives:
    for key in option_keys :
        value = rcf.get( 'queue.pbs.option.%s.%s' % (step,key) )
        # '-l' may hold multiple space-separated resource requests;
        # each one becomes its own directive:
        if key == 'l' :
            directives.extend( [ '#PBS -l %s\n' % resource for resource in value.split() ] )
            continue
        #endif
        # fill automatic log file names:
        if value == '<auto>' :
            if key == 'o' :
                value = '%s_%s.out' % (fqname,step)
            elif key == 'e' :
                value = '%s_%s.err' % (fqname,step)
            #endif
        #endif
        # none, empty, or plain normal value ?
        if value == '<none>' :
            continue                                          # explicitly disabled
        #endif
        if value == '' :
            directives.append( '#PBS -%s\n' % key )           # just the keyword
        else :
            directives.append( '#PBS -%s %s\n' % (key,value) )  # keyword and value
        #endif
    #endfor
    # blank separator line:
    directives.append( '\n' )
    return directives
#enddef
def Submit_Job_To_PBS( job_script, rcf ) :
    """
    Submit job to PBS queue.

    Arguments:
      job_script : filename of the job script passed to 'qsub'.
      rcf        : rc-file object; read keys:
                     queue.pbs.submit.options   (machine or queue rcfile)
                     submit.options             (expert rcfile)
    Returns the job number parsed from the qsub output, and writes it
    together with job-management hints to '<basename>.info'.
    Raises Exception when the submission command cannot be started or
    returns a non-zero exit status.
    """
    import os
    import logging
    import subprocess
    # basename for scripts etc is name of rcfile minus extension:
    bname,ext = os.path.splitext(job_script)

    # output files:
    job_info = bname+'.info'
    # Two set of options to pass directly unmodified at the submission command :
    qopts = rcf.get( 'queue.pbs.submit.options' )   # (1) from machine or queue (if separated from machine) rcfile
    qopts = qopts+' '+rcf.get('submit.options')     # (2) from expert rcfile

    # info ...
    logging.info( ' launch ...' )
    # setup command line:
    command = 'qsub '+qopts
    # last argument is script:
    command = command+' '+job_script
    # info ...
    logging.info( ' command: %s' % command )
    # prepare for OS errors (file does not exist etc.)
    try:
        # submit; redirect errors to standard output:
        p = subprocess.Popen( command.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
    # 'as err' form is valid on Python 2.6+ and 3.x, unlike 'except OSError, err':
    except OSError as err :
        logging.error( 'OSError: '+err.strerror )
        logging.error( 'from call : %s' % command )
        raise Exception
    #endtry
    # log output; wait() reaps the child and returns its exit status:
    outlines = p.stdout.readlines()
    retcode = p.wait()
    for line in outlines : logging.info( ' %s' % line.rstrip() )
    # fail loudly on a failed submission instead of mis-parsing its output:
    if (retcode != 0) or (len(outlines) == 0) :
        logging.error( 'submission failed (exit status %s) : %s' % (retcode,command) )
        raise Exception
    #endif
    # standard output is:
    #   jobnr
    # extract job nr:
    job_nr = outlines[0].split()[0]
    # info ...
    infotext = []
    infotext.append( '\n' )
    infotext.append( 'Summary:\n' )
    infotext.append( '\n' )
    infotext.append( ' current dir : %s\n' % os.getcwd() )
    infotext.append( ' job script : %s\n' % job_script )
    infotext.append( '\n' )
    infotext.append( 'To manage this job:\n' )
    infotext.append( ' \n' )
    infotext.append( ' # kill job:\n' )
    infotext.append( ' qdel %s\n' % job_nr )
    infotext.append( ' \n' )
    infotext.append( 'To show all your running jobs:\n' )
    infotext.append( ' \n' )
    infotext.append( ' qstat [-u ${USER}]\n' )
    infotext.append( ' \n' )
    infotext.append( 'To monitor this running job (ECMWF only!):\n' )
    infotext.append( ' \n' )
    infotext.append( ' qcat %s\n' % job_nr )
    infotext.append( ' \n' )
    # write to log:
    for line in infotext : logging.info( line.rstrip() )
    # write to file; context manager guarantees the file is closed:
    with open( job_info, 'w' ) as f :
        f.writelines(infotext)
    #endwith

    return job_nr
#enddef
- # ======================================================================
- # ===
- # === SLURM queue
- # ===
- # ======================================================================
def QueueOptions_Slurm( bname, rcf, step ) :
    """
    Return list with queue option directives that will be at the top of
    the script.
    Script for run step (NEURON@KNMI) should contain:
      #SBATCH -n ${par.nthread}*${ntask}

      export OMP_NUM_THREADS=${par.nthread}
      mpiexec.hydra -machinefile ./mpd.hosts -np ${ntask} ./$bin

    (The write-only locals 'this_jobname' and 'ntasks' of the original
    implementation were never read and have been removed.)
    """
    # init result:
    qopt = []
    # List of options for specified step, falling back on the generic list:
    if rcf.has_key( 'queue.slurm.options.%s' % step ) :
        opts = rcf.get( 'queue.slurm.options.%s' % step ).split()
    else :
        opts = rcf.get( 'queue.slurm.options' ).split()
    #endif
    # NOAA-specific option
    append_jobstep = rcf.get( 'queue.slurm.name.append.jobstep', default=False )
    for opt in opts :
        # look first for a step-specific slurm option
        val = rcf.get( 'queue.slurm.option.%s.%s' % (step,opt), default='STEP_SPECIFIC_KEY_MISSING' )
        # if step-specific option is missing, look for one without step;
        # NOTE(review): an option missing from the rcfile altogether ends up
        # as the literal value 'NOT_PRESENT' in the directive -- presumably
        # never happens with a proper rcfile; verify against the rc files.
        if (val == 'STEP_SPECIFIC_KEY_MISSING') :
            val = rcf.get( 'queue.slurm.option.%s' % opt, default='NOT_PRESENT' )
        #endif
        # default options:
        if val == '<auto>' :
            if opt == 'o' :
                val = '%s_%s.out' % (bname,step)
            elif opt == 'e' :
                val = '%s_%s.err' % (bname,step)
            #endif
        #endif
        # optionally append zero-padded jobstep number to the job name:
        if (opt == 'J') or (opt == 'job-name') :
            if append_jobstep :
                jobstep = rcf.get('jobstep')
                val = val + '_' + ( '%03d' % int(jobstep) )
            #endif
        #endif
        # skip unset (empty) directives.
        # BUGFIX: this used to read "if opt == 'p' or 'w':", which is always
        # true since the string 'w' is truthy; empty values were therefore
        # skipped for *every* option.  Keep that (sensible) behavior, but
        # state it explicitly:
        if len(val) == 0 : continue

        # Some options are represented by only a single letter (e.g. J),
        # and these require a single dash (e.g. -J <jobname>), but others
        # have only a multi-letter invocation and need two dashes
        # (e.g. --qos batch):
        dashes = '-'
        if len(opt) > 1 :
            dashes = '--'
        #endif
        qopt.append( '#SBATCH %s%s %s\n' % (dashes,opt,val) )
    #endfor
    # layout ...
    qopt.append( '\n' )

    # ok
    return qopt
#enddef
-
def Submit_Job_To_Slurm( job_script, rcf ) :
    """
    Submit job to SLURM queue.

    Arguments:
      job_script : filename of the job script passed to 'sbatch'.
      rcf        : rc-file object; read keys:
                     queue.slurm.submit.options   (queue rcfile)
                     submit.options               (expert rcfile)
    Writes the submitted job number plus job-management hints to
    '<basename>.info'.  Raises Exception when the submission command
    cannot be started or returns a non-zero exit status.
    """

    # external:
    import os
    import logging
    import subprocess
    # basename for scripts etc is name of rcfile minus extension:
    bname,ext = os.path.splitext(job_script)

    # output files:
    job_info = bname+'.info'
    # Two set of options to pass directly unmodified at the submission command :
    qopts = rcf.get( 'queue.slurm.submit.options' )   # (1) from pycassso-queue-slurm.rc
    qopts = qopts+' '+rcf.get('submit.options')       # (2) from pycasso-tm5-expert.rc

    # info ...
    logging.info( ' launch ...' )
    # setup command line:
    command = 'sbatch '+qopts

    # last argument is script:
    command = command+' '+job_script
    # info ...
    logging.info( ' command: %s' % command )
    # prepare for OS errors (file does not exist etc.)
    try:
        # submit; redirect errors to standard output:
        p = subprocess.Popen( command.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
    # 'as err' form is valid on Python 2.6+ and 3.x, unlike 'except OSError, err':
    except OSError as err :
        logging.error( 'OSError: '+err.strerror )
        logging.error( 'from call : %s' % command )
        raise Exception
    #endtry

    # extract output and reap the child process (avoids a zombie and
    # gives us the exit status):
    outlines = p.stdout.readlines()
    retcode = p.wait()
    # display:
    for line in outlines : logging.info( ' %s' % line.rstrip() )
    # fail loudly on a failed submission instead of mis-parsing its output:
    if (retcode != 0) or (len(outlines) == 0) :
        logging.error( 'submission failed (exit status %s) : %s' % (retcode,command) )
        raise Exception
    #endif
    # standard output is expected to look like:
    #   Submitted batch job <jobnr>
    # extract job nr as the 4th token:
    job_nr = outlines[0].split()[3]
    # info ...
    infotext = []
    infotext.append( '\n' )
    infotext.append( 'Summary:\n' )
    infotext.append( '\n' )
    infotext.append( ' current dir : %s\n' % os.getcwd() )
    infotext.append( ' job script : %s\n' % job_script )
    infotext.append( '\n' )
    infotext.append( 'To manage this job:\n' )
    infotext.append( ' \n' )
    infotext.append( ' # kill job:\n' )
    infotext.append( ' scancel %s\n' % job_nr )
    infotext.append( ' \n' )
    infotext.append( 'To show all your running jobs:\n' )
    infotext.append( ' \n' )
    infotext.append( ' squeue [-u ${USER}]\n' )
    infotext.append( ' \n' )
    # write to log:
    for line in infotext : logging.info( line.rstrip() )
    # write to file; context manager guarantees the file is closed:
    with open( job_info, 'w' ) as f :
        f.writelines(infotext)
    #endwith

    return
#enddef
- # ======================================================================
- # ===
- # === end
- # ===
- # ======================================================================
|