- #
- # TM5 run tools
- #
- # ***
- def Command_Line( rcf, exe, args, in_debugger ) :
- """
- Return command line that runs executable.
-
- ARGUMENTS
- rcf
- Rcfile with settings.
- exe
- Name of executable.
- args
- Arguments to be passed to executable.
- in_debugger
- Set to 'True' if the job should be run in a debugger.
- RETURN VALUES
- cmndline
- Command line to be executed.
- """
- # external
- import socket
- import logging
-
- # mpi run ?
- if rcf.get('par.mpi','bool') :
-
- # number of mpi tasks:
- ntask = rcf.get('par.ntask','int')
-
- # get command line:
- cmnd_exec = rcf.get('mpirun.command')
- cmnd_args = rcf.get('mpirun.args' )
-
- # write command file ?
- cmdfile = rcf.get('mpirun.cmdfile',default='')
- if len(cmdfile) > 0 :
- # write command line for each task:
- f = open(cmdfile,'w')
- for i in range(ntask) : f.write( '%s %s\n' % (exe,args) )
- f.close()
- else :
- # otherwise, add the executable and its arguments:
- cmnd_args = '%s %s %s' % (cmnd_args,exe,args)
- #endif
- # write host file ? PLS: This is done too early, and should be done
- # inside the *_run.jb script : that will let you specify nodes
- # scattered on different hosts (eg, linux cluster), and also different
- # from the current host (which probably is a login node)!! See
- # WriteJob below for cases of Loadleveler and Slurm.
- #
- # Leave it here for other cases not handled in WriteJob yet.
- hostfile = rcf.get('mpirun.hostfile',default='')
- if len(hostfile) > 0 :
- # get hostname:
- hname = socket.gethostname()
- # write hostname for each task:
- f = open(hostfile,'w')
- for i in range(ntask) : f.write( '%s\n' % hname )
- f.close()
-
- else :
-
- # standard run:
- cmnd_exec = exe
- cmnd_args = args
-
- # run in debugger ?
- if in_debugger :
- # debugger type:
- debugger = rcf.get( 'debugger' )
- # get debugger command:
- debugger_call = rcf.get( 'debugger.command' )
- # large differences ...
- if debugger == 'totalview' :
- # syntax: totalview <executable> [-a <arguments>]
- # pass executable:
- cmndline = '%s %s' % (debugger_call,cmnd_exec)
- # add arguments ?
- if len(cmnd_args) > 0 :
- cmndline = '%s -a %s' % (cmndline,cmnd_args)
- #endif
- elif debugger == 'idb' :
- # syntax: idb [-args <executable> <arguments>]
- # fill executable and arguments:
- cmndline = '%s -args %s %s' % (debugger_call,cmnd_exec,cmnd_args)
- else :
- logging.error('unsupported debugger : %s' % debugger )
- raise Exception
- #endif
-
- else :
-
- # standard line:
- cmndline = '%s %s' % (cmnd_exec,cmnd_args)
-
- # ok
- return cmndline
-
- # ***
- def WriteAndSubmitNewJob( rcfile, bindir ) :
- """
- Write first or next rcfile and job file(s) in the job chain;
- if chain is not finished yet, submit a new job.
-
- The argument could be the name of an rcfile or an rcfile object itself,
- since the submit script might have changed some values given
- the provided command line arguments.
-
- The following function is used:
-
- submit_tm5_setup_rcfile.WriteRcfile # writes the new rcfile
-
- This is placed in a separate file since users might need to
- change these routines for their specific projects.
- """
-
- # external:
- import sys
- import logging
- import rc
-
- # import setup module:
- import submit_tm5_setup_rcfile
-
- # name provided ?
- if type(rcfile) == str :
- # load:
- rcf = rc.RcFile( rcfile )
- else :
- # just copy ..
- rcf = rcfile
- #endif
- # write next rcfile, return name:
- try :
- rcfile_next = submit_tm5_setup_rcfile.WriteRcfile( rcf )
- except :
- logging.error( sys.exc_info()[1] )
- logging.error( 'exception from WriteRcfile' )
- raise Exception
-
- # finished ?
- if rcfile_next is None :
- logging.info( ' end of job chain !' )
- else :
- # write job file(s) for this period and return the (first) name;
- # last command in a file should submit the next job if necessary:
- logging.info( ' write jobfile for %s ...' % rcfile_next )
- try :
- jobfile_next = WriteJob( rcfile_next, bindir )
- except :
- logging.error( sys.exc_info()[1] )
- logging.error( 'exception from WriteJob' )
- raise Exception
- logging.info( ' submit next job : %s' % jobfile_next )
- try :
- jobid = SubmitJob( jobfile_next, rcfile_next )
- except :
- logging.error( sys.exc_info()[1] )
- logging.error( 'exception from SubmitJob' )
- raise Exception
-
- return
- # ***
- def WriteJob( rcfile, bindir ) :
- """
- jobfile = WriteJob( rcfile, bindir )
- Write job file given the settings in rcfile.
- The name of the jobfile is based on the name of the rcfile.
- The last command in the job should submit the next job,
- and the script is therefore written in python.
- """
-
- # external:
- import os
- import rc
-
- # load settings:
- rcf = rc.RcFile( rcfile )
-
- # basename for scripts etc is name of rcfile minus extension:
- bname,ext = os.path.splitext(rcfile)
-
- # loadleveler supports master job with multiple steps:
- with_master_job = (rcf.get('submit.to') == 'queue') and (rcf.get('queue') == 'loadleveler')
- # which shell ?
- job_shell = '/usr/bin/env python'
- # start master job ?
- if with_master_job :
- ntasks = rcf.get('par.ntask')
- # name of jobfile:
- jobfile = '%s.jb' % bname
- # set header:
- header = []
- header.append( '#! %s\n' % job_shell )
- header.append( '\n' )
- # init queue options:
- qopt = QueueOptions( bname, rcf, 'default' )
-
- # init job file:
- job = []
- job.append( '# external:\n' )
- job.append( 'import os\n' )
- job.append( 'import sys\n' )
- job.append( 'import socket\n')
- job.append( 'import subprocess\n' )
- job.append( 'import logging\n' )
- job.append( '\n' )
- job.append( '# setup messages:\n' )
- job.append( "logging.basicConfig( format='%(lineno)-4s:%(filename)-30s [%(levelname)-8s] %(message)s', level=logging.INFO, stream=sys.stdout )\n" )
- job.append( '\n' )
- job.append( '# prepend locations of python modules to search path:\n' )
- job.append( "sys.path.insert( 0, '%s' )\n" % bindir )
- job.append( '\n' )
- job.append( '# tools:\n' )
- job.append( 'import submit_tm5_tools\n' )
- job.append( '\n' )
- job.append( '# current loadleveler steps:\n' )
- job.append( 'step_name = os.getenv("LOADL_STEP_NAME")\n' )
- job.append( '\n' )
-
- # HOSTFILE - Moved here from Command_Line for loadleveler.
- hostfile = rcf.get('mpirun.hostfile',default='') # calling script will create hostfile
- if (len(hostfile) > 0) and rcf.get('par.mpi','bool') :
- job.append( 'f = open("%s",\'w\') \n' % hostfile )
- job.append( 'hname = socket.gethostname() \n')
- job.append( "for i in range( %s ) : \n" % ntasks )
- job.append( "\tf.write( '%s\\n' % hname)\n" )
- job.append( 'f.close() \n')
- job.append( '\n' )
- # job step names:
- steps = rcf.get('job.steps').split(' ')
- # number of steps:
- nstep = len(steps)
-
- # loop over job steps:
- for istep in range(nstep) :
- # current:
- step = steps[istep]
- # next:
- if istep < nstep-1 : step_next = steps[istep+1]
- # list with queue option lines for this step:
- qopt_step = QueueOptions( bname, rcf, step )
- # call to actual script:
- if step == 'run' :
- # get command line:
- exe = os.path.join( os.curdir, rcf.get('job.step.%s.exe' % step) )
- args = rcfile
- indb = rcf.get('submit.debugger','bool')
- cmndline = Command_Line( rcf, exe, args, indb )
- # <script> <commandline>
- step_command = '["%s/submit_tm5_step_%s","%s"]' % (bindir,step,cmndline)
- else :
- # <script> <rcfile>
- step_command = '["%s/submit_tm5_step_%s","%s","--bindir=%s"]' % (bindir,step,rcfile,bindir)
- #endif
-
- # add queue options to destination:
- if with_master_job :
- # add to queue options for master job:
- qopt = qopt + qopt_step
- # add lines to run the step:
- job.append( '# which step ?\n' )
- job.append( 'if step_name == "%s" :\n' % step )
- job.append( ' \n' )
- job.append( ' # run:\n' )
- job.append( ' retcode = subprocess.call( %s )\n' % step_command )
- job.append( ' if retcode != 0 :\n' )
- job.append( ' logging.error( sys.exc_info()[1] )\n' )
- job.append( ' logging.error( \'exception from subprocess call to : %s\' )\n' % step_command )
- job.append( ' sys.exit(1)\n' )
- job.append( ' #endif\n' )
- job.append( ' \n' )
- # last step ? then add lines to submit next job:
- if istep == nstep-1 :
- job.append( ' # write and submit next job if necessary:\n' )
- job.append( ' submit_tm5_tools.WriteAndSubmitNewJob( "%s", "%s" )\n' % (rcfile,bindir) )
- job.append( ' \n' )
- #endif
- # close step:
- job.append( '#endif\n' )
- job.append( '\n' )
- else : # no master job, but separate files
- # name of step job to be written:
- step_job_template = bname+'_%s.jb'
- # actual name:
- step_job = step_job_template % step
- # open:
- f = open( step_job, 'w' )
- # write header:
- f.write( '#! %s\n' % job_shell )
- f.write( '\n' )
- # add queue options:
- for line in qopt_step : f.write(line)
- # add lines to call the actual script:
- f.write( '# external:\n' )
- f.write( 'import sys\n' )
- f.write( 'import os\n' )
- f.write( 'import logging\n' )
- f.write( 'import socket\n' )
- f.write( 'import subprocess\n' )
- f.write( '\n' )
- f.write( '# go to run directory:\n' )
- f.write( 'os.chdir("%s")\n' % os.getcwd() )
- f.write( '\n' )
- f.write( '# prepend locations of python modules to search path:\n' )
- f.write( "sys.path.insert( 0, '%s' )\n" % bindir )
- f.write( '\n' )
- f.write( '# tools:\n' )
- f.write( 'import rc\n' )
- f.write( 'import submit_tm5_tools\n' )
- f.write( '\n' )
- f.write( '# setup messages:\n' )
- f.write( "logging.basicConfig( format='%(lineno)-4s:%(filename)-30s [%(levelname)-8s] %(message)s', level=logging.INFO, stream=sys.stdout )\n" )
- f.write( '\n' )
- f.write( '# info ...\n' )
- f.write( 'logging.info( "start" )\n' )
- f.write( '\n' )
- # module command if needed
- module_cmd = rcf.get('module.cmd',default='')
- if (module_cmd != '') and (step == 'run'):
- f.write( 'mod_cmd="%s".split()\n' % module_cmd )
- f.write( 'retcode = subprocess.call( mod_cmd )\n' )
- f.write( 'if retcode != 0:\n' )
- f.write( ' logging.error( sys.exc_info()[1] )\n' )
- f.write( ' logging.error( \'exception from subprocess call to : %s\' )\n' % module_cmd )
- f.write( ' sys.exit(1)\n' )
- f.write( '\n' )
-
- # openMP
- if ( (rcf.get('queue') == 'slurm') or (rcf.get('queue') == 'pbs')) and \
- rcf.get('par.openmp','bool') and (step == 'run'):
- nthread = rcf.get( 'par.nthread', 'int' )
- f.write( "os.putenv( \'OMP_NUM_THREADS\', '%s')\n" % nthread )
- # HOSTFILE - Moved here (and adapted) from Command_Line for case of
- # "SLURM with a host file" (case of NEURON@KNMI, and not CARTESIUS@SARA).
- # Needed only if MPI, step run.
- hostfile = rcf.get('mpirun.hostfile',default='')
- if (len(hostfile) > 0) and (rcf.get('queue') == 'slurm') and rcf.get('par.mpi','bool') and (step == 'run'):
- f.write( 'f = open("%s",\'w\') \n' % hostfile)
- # PREVIOUS:
- #f.write( 'hname = socket.gethostname() \n')
- #f.write( 'for i in range(ntask) : f.write( '%s' % hname ) \n')
- # NOW: for SLURM on NEURON, need nodelist and number of
- # tasks-per-node effectively allocated.
-
- # --Idea #1
- # -- Use "output environment variables" to gather effective values:
- #
- #f.write( 'for node in os.getenv('SLURM_NODELIST'): \n')
- #f.write( ' for i in range(os.getenv('SLURM_NTASKS_PER_NODE'): f.write( '%s\n' % node ) \n')
- #
- # -- BUT that would not work in all imaginable cases... so we
- # -- should use SLURM_TASKS_PER_NODE instead, but its format, for eg:
- # -- 2(x3),1,5,2(x5)
- # -- is a bit annoying to process. Then idea #2... good old unix command line:
-
- # --Idea #2 - Use srun | sort | awk to get ordered list
- hcommand = '[ "srun -l /bin/hostname | sort -n | awk \'{print $2}\' > %s"]' % hostfile
- f.write( 'retcode = subprocess.call( %s, shell=True )\n' % hcommand )
- f.write( 'if retcode != 0:\n' )
- f.write( ' logging.error( sys.exc_info()[1] )\n' )
- f.write( ' logging.error( \'exception from creating host file\' )\n' )
- f.write( ' sys.exit(1)\n' )
-
- f.write( 'f.close() \n')
- f.write( '\n' )
-
- f.write( '# call user script:\n' )
- f.write( 'retcode = subprocess.call( %s )\n' % step_command )
- f.write( 'if retcode != 0:\n' )
- f.write( ' logging.error( sys.exc_info()[1] )\n' )
- f.write( ' logging.error( \'exception from subprocess call to : %s\' )\n' % step_command )
- f.write( ' sys.exit(1)\n' )
- f.write( '#endif\n' )
- f.write( '\n' )
- # add submission of next step?
- if istep < nstep-1 :
- # job script of next step:
- step_job_next = step_job_template % step_next
- # add submission command:
- f.write( '# submit next step:\n' )
- f.write( 'try :\n' )
- f.write( ' submit_tm5_tools.SubmitJob( "%s", "%s" )\n' % (step_job_next,rcfile) )
- f.write( 'except:\n' )
- f.write( ' logging.error( sys.exc_info()[1] )\n' )
- f.write( ' logging.error( \'exception from SubmitJob( "%s", "%s" )\' )\n' % (step_job_next,rcfile) )
- f.write( ' sys.exit(1)\n' )
- f.write( '#endtry\n' )
- f.write( '\n' )
- else :
- # last step; might be necessary to submit a new job:
- f.write( '# write and submit next job if necessary:\n' )
- f.write( 'submit_tm5_tools.WriteAndSubmitNewJob( "%s", "%s" )\n' % (rcfile,bindir) )
- f.write( '\n' )
-
- f.write( '# info ...\n' )
- f.write( 'logging.info( "end" )\n' )
- f.write( '\n' )
- f.close()
- # make it executable and readable for all, writable for user only:
- # u+r u+w u+x g+r g-w g+x o+r o-w o+x
- os.chmod( step_job, 2**8 + 2**7 + 2**6 + 2**5 + 0 + 2**3 + 2**2 + 0 + 2**0 )
-
- # fill return value:
- if istep == 0 : jobfile = step_job
- # write master job:
- if with_master_job :
- # combine:
- job = header + qopt + job
- # write:
- f = open(jobfile,'w')
- f.writelines(job)
- f.close()
- # make it executable and readable for all, writable for user only:
- # u+r u+w u+x g+r g-w g+x o+r o-w o+x
- os.chmod( jobfile, 2**8 + 2**7 + 2**6 + 2**5 + 0 + 2**3 + 2**2 + 0 + 2**0 )
-
- return jobfile
- # ***
- def WriteAndSubmitBuildJob( rcf, command ):
- """
- Write a bash script that runs 'command' after being submitted to the
- queue. Used to compile TM5 through the queue.
- """
- import os
- orig_val = rcf.get('submit.to')
- dummy = rcf.replace('submit.to','queue')
-
- source_dir = os.getcwd() # assume that it is called from
- # source directory
- jobfile = os.path.join(source_dir,'build.jb')
- f = open( jobfile, 'w' )
- f.write( '#! /bin/bash\n' )
- f.write( '\n' )
-
- qopt = QueueOptions( 'buildTM', rcf, 'build' )
- for line in qopt : f.write(line)
- f.write( "cd %s \n" % source_dir )
- f.write( '%s' % ' '.join(command) )
- f.write( '\n' )
- f.close()
- jobid = SubmitJob( jobfile, rcf )
- dummy = rcf.replace('submit.to', orig_val)
- return
- # ***
- def QueueOptions( bname, rcf, step ) :
-
- """
- Return list with queue option lines.
- """
- # modules:
- import logging
- # submit to queue ?
- if rcf.get('submit.to') == 'queue' :
-
- # queue type:
- queue = rcf.get('queue')
- # different options and commands:
- if queue == 'loadleveler' :
- qopt = QueueOptions_LoadLeveler( bname, rcf, step )
- elif queue == 'bsub' :
- qopt = QueueOptions_BSub( bname, rcf, step )
- elif queue == 'qsub' :
- qopt = QueueOptions_QSub( bname, rcf, step )
- elif queue == 'pbs' :
- qopt = QueueOptions_PBS( bname, rcf, 'all' ) # init result with the "all" step
- qopt = qopt + QueueOptions_PBS( bname, rcf, step )
- elif queue == 'slurm' :
- qopt = QueueOptions_Slurm( bname, rcf, step )
- else :
- # not supported ...
- logging.error( 'unsupported queue : %s' % queue )
- raise Exception
- else:
- qopt = []
-
- return qopt
-
- # ***
- def SubmitJob( job_script, rcfile ) :
- """
- Submit jobscript. Where to submit to (foreground, background, queue) is
- read from rcfile settings. Returns the job id when submitting to a queue
- (currently a dummy value for all queues except PBS).
- """
-
- import sys
- import logging
- import rc
- jobid='not-a-real-jobid-yet' # default
-
- # settings
- if type(rcfile) == str :
- rcf = rc.RcFile( rcfile )
- else :
- rcf = rcfile
-
- # where to ?
- submit_to = rcf.get('submit.to')
- # info ...
- logging.info( 'submit %s to %s ...' % (job_script,submit_to) )
- # call specific submit routines:
- if submit_to == 'foreground' :
- # call run script, catch errors:
- try :
- Run_Job_In_Foreground( job_script )
- except :
- logging.error( sys.exc_info()[1] )
- logging.error( 'from Run_Job_In_Foreground for %s' % job_script )
- raise Exception
-
- elif submit_to == 'background' :
-
- # call run script, catch errors:
- try :
- Submit_Job_To_Background( job_script, rcf )
- except :
- logging.error( 'from Submit_Job_To_Background for %s' % job_script )
- raise Exception
- elif submit_to == 'queue' :
-
- # queue type:
- queue = rcf.get('queue')
- # different options and commands:
- if queue == 'loadleveler' :
- Submit_Job_To_LoadLeveler( job_script, rcf )
- elif queue == 'bsub' :
- Submit_Job_To_BSub( job_script, rcf )
- elif queue == 'qsub' :
- Submit_Job_To_QSub( job_script, rcf )
- elif queue == 'pbs' :
- jobid = Submit_Job_To_PBS( job_script, rcf )
- elif queue == 'slurm' :
- Submit_Job_To_Slurm( job_script, rcf )
- else :
- # not supported ...
- logging.error( 'unsupported queue : %s' % queue )
- raise Exception
- else :
-
- # not supported ...
- logging.error( 'unsupported run environment : %s' % submit_to )
- sys.exit(1)
-
- return jobid
-
- # ======================================================================
- # ===
- # === foreground
- # ===
- # ======================================================================
- def Run_Job_In_Foreground( job_script ) :
- """
- Run job script in foreground.
- """
-
- # external:
- import sys
- import os
- import logging
- import subprocess
- # setup command line, e.g. './myscript.jb' :
- command = os.path.join(os.curdir,job_script)
- # execute:
- retcode = subprocess.call( command )
- if retcode != 0 :
- logging.error( sys.exc_info()[1] )
- logging.error( 'from subprocess call to : %s' % command )
- raise Exception
-
- return
- # ======================================================================
- # ===
- # === background
- # ===
- # ======================================================================
- def Submit_Job_To_Background( job_script, rcf ) :
- """
- Submit job to background.
- """
-
- # external:
- import sys
- import os
- import logging
- import subprocess
- # basename for scripts etc is name of rcfile minus extension:
- bname,ext = os.path.splitext(job_script)
-
- # output files:
- job_stdout = bname+'.out'
- job_stderr = bname+'.err'
- job_info = bname+'.info'
- # setup command line, e.g. './myscript.jb' :
- command = os.path.join(os.curdir,job_script)
- # re-direct standard output:
- command = command+' > %s' % job_stdout
- # write error messages to separate file:
- #command = command+' 2> %s' % job_stderr
- command = command+' 2>&1'
-
- # run in background, return process id:
- logging.info( 'run shell command : "%s" ...' % command )
- p = subprocess.Popen( command, shell=True )
- # info ...
- infotext = []
- infotext.append( '\n' )
- infotext.append( 'Summary:\n' )
- infotext.append( '\n' )
- infotext.append( ' job script : %s\n' % job_script )
- infotext.append( ' standard output : %s\n' % job_stdout )
- infotext.append( ' standard error : %s\n' % job_stderr )
- infotext.append( '\n' )
- infotext.append( 'Process snapshot:\n')
- infotext.append( '\n')
- p2 = subprocess.Popen( '/bin/ps -f -p %i' % p.pid, shell=True,
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
- for line in p2.stdout.readlines() : infotext.append( line )
- infotext.append( '\n')
- infotext.append( 'To manage this process:\n' )
- infotext.append( '\n' )
- infotext.append( ' # show process snapshot:\n' )
- infotext.append( ' ps -f -p %i\n' % p.pid )
- infotext.append( ' \n' )
- infotext.append( ' # kill process:\n' )
- infotext.append( ' kill %i\n' % p.pid )
- infotext.append( ' \n' )
- infotext.append( ' # follow standard output:\n' )
- infotext.append( ' tail -f %s\n' % job_stdout )
- infotext.append( '\n' )
- # write to log:
- for line in infotext : logging.info( line.strip() )
- # write to file:
- f = open( job_info, 'w' )
- f.writelines(infotext)
- f.close()
-
- return
- # ======================================================================
- # ===
- # === LoadLeveler queue
- # ===
- # ======================================================================
- def QueueOptions_LoadLeveler( bname, rcf, step ) :
- """
- Return list with queue options.
- """
- # external:
- import math
-
- # init result:
- qopt = []
- # which step ?
- if step == 'default' :
-
- # list with options:
- opts = rcf.get( 'queue.ll.options.%s' % step ).split()
- # default options:
- for opt in opts :
- # get value:
- val = rcf.get( 'queue.ll.option.%s.%s' % (step,opt) )
- # write:
- qopt.append( '#@ %-20s = %s\n' % (opt,val) )
- #endfor
- # layout ...
- qopt.append( '\n' )
- else :
-
- # list with options:
- opts = rcf.get( 'queue.ll.options.%s' % step ).split()
- # default options:
- for opt in opts :
- # get value:
- val = rcf.get( 'queue.ll.option.%s.%s' % (step,opt) )
- # to be set ?
- if val == '<auto>' :
- # differs per option ...
- if opt == 'output' :
- val = '%s_%s.out' % (bname,step)
- elif opt == 'error' :
- val = '%s_%s.err' % (bname,step)
- elif opt == 'tasks_per_node' :
- t_mpi = rcf.get( 'par.ntask', 'int' )
- t_omp = rcf.get( 'par.nthread', 'int' )
- val = str(t_omp*t_mpi)
- #endif
- #endif
- # no, empty, or normal value ?
- if val == '<none>' :
- # skip this keyword:
- continue
- elif val == '' :
- # just the keyword:
- qopt.append( '#@ %s\n' % opt )
- else :
- # keyword and value:
- qopt.append( '#@ %-20s = %s\n' % (opt,val) )
- #endif
- #endfor
- # layout ...
- qopt.append( '\n' )
-
- return qopt
- # ***
- def Submit_Job_To_LoadLeveler( job_script, rcf ) :
- """
- Submit job to LoadLeveler queue.
- """
-
- # external:
- import sys
- import os
- import logging
- import subprocess
- # basename for scripts etc is name of rcfile minus extension:
- bname,ext = os.path.splitext(job_script)
-
- # output files:
- job_info = bname+'.info'
- # options passed directly to submission command:
- qopts = rcf.get( 'queue.ll.submit.options' )
- # add options passed to submit script:
- qopts = qopts+' '+rcf.get('submit.options')
- # info ...
- logging.info( ' launch ...' )
- # setup command line:
- command = 'llsubmit '+qopts
- # last argument is script:
- command = command+' '+job_script
- # info ...
- logging.info( ' command: %s' % command )
-
- # init submission info file:
- infotext = []
- infotext.append( '\n' )
- # call submit command, trap errors:
- try:
- # submit; redirect errors to standard output:
- p = subprocess.Popen( command.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
- except :
- logging.error( sys.exc_info()[1] )
- logging.error( 'from subprocess.Popen( %s )' % command.split() )
- raise Exception
- #endtry
- # extract:
- outlines = p.stdout.readlines()
- # add to help info message:
- infotext = infotext + outlines
- # extract job id from last line:
- # llsubmit: The job "c1a0303.4290133" with 3 job steps has been submitted.
- firstwords = 'llsubmit: The job'
- lastline = outlines[-1]
- if lastline.startswith(firstwords) :
- # remove the known prefix (note: lstrip would strip characters, not a prefix):
- job_id = lastline[len(firstwords):].split()[0].replace('"','')
- else :
- job_id = '<job-id>'
- #endif
-
- # add help text to submission info:
- infotext.append( '\n' )
- infotext.append( 'To manage LoadLeveler jobs:\n' )
- infotext.append( '\n' )
- infotext.append( ' llq [-u ${USER}] # list [your] current jobs\n' )
- infotext.append( ' llq %s # list this job\n' % job_id )
- infotext.append( ' llcancel %s # kill this job\n' % job_id )
- infotext.append( '\n' )
- # write to log:
- for line in infotext : logging.info( line.rstrip() )
- # write to file:
- f = open( job_info, 'w' )
- f.writelines(infotext)
- f.close()
-
- # ok
- return
- #enddef
-
- # ======================================================================
- # ===
- # === BSUB queue
- # ===
- # ======================================================================
- def QueueOptions_BSub( bname, rcf, step ) :
- """
- Return list with queue options.
- """
- # init result:
- qopt = []
- # List of options for specified step:
- opts = rcf.get( 'queue.bsub.options.%s' % step ).split()
- # loop over options:
- for opt in opts :
- # get value:
- val = rcf.get( 'queue.bsub.option.%s.%s' % (step,opt) )
- # default options:
- if val == '<auto>' :
- if opt in ['o','oo'] :
- val = '%s_%s.out' % (bname,step)
- elif opt in ['e','eo'] :
- val = '%s_%s.err' % (bname,step)
- #endif
- #endif # <auto> value
- # define option key:
- # queue.bsub.options.R : 20 -> -R 20
- # queue.bsub.options.Rx : -R 20 -> -R 20
- if val.startswith('-') :
- qopt.append( '#BSUB %s\n' % (val) )
- else :
- qopt.append( '#BSUB -%s %s\n' % (opt,val) )
- #endif
- #endfor # opts
- # layout ...
- qopt.append( '\n' )
-
- # ok
- return qopt
-
- #enddef QueueOptions_BSub
- # ***
- def Submit_Job_To_BSub( job_script, rcf ) :
- """
- Submit job to BSUB queue.
- """
-
- # external:
- import os
- import logging
- import subprocess
- # basename for scripts etc is name of rcfile minus extension:
- bname,ext = os.path.splitext(job_script)
-
- # output files:
- job_info = bname+'.info'
- # options passed directly to submission command:
- qopts = rcf.get( 'queue.bsub.submit.options' )
- # add options passed to submit script:
- qopts = qopts+' '+rcf.get('submit.options')
-
- # info ...
- logging.info( ' launch ...' )
- # setup command line:
- command = 'bsub '+qopts
- # pass job script to std.input:
- command = command+' < '+job_script
- # info ...
- logging.info( ' command: %s' % command )
- # prepare for OS errors (file does not exist etc.)
- try:
- # submit; redirect errors to standard output:
- p = subprocess.Popen( command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
- except OSError as err :
- logging.error( 'OSError: '+err.strerror )
- logging.error( 'from call : %s' % command )
- raise Exception
- #endtry
- # extract:
- outlines = p.stdout.readlines()
- # display:
- for line in outlines : logging.info( ' %s' % line.rstrip() )
- # standard output is:
- # <jobname> <jobnr>
- # extract job nr:
- job_nr = outlines[0].split()[1].strip('<>')
- # info ...
- infotext = []
- infotext.append( '\n' )
- infotext.append( 'Summary:\n' )
- infotext.append( '\n' )
- infotext.append( ' current dir : %s\n' % os.getcwd() )
- infotext.append( ' job script : %s\n' % job_script )
- infotext.append( '\n' )
- infotext.append( 'To manage this job:\n' )
- infotext.append( ' \n' )
- infotext.append( ' # kill job:\n' )
- infotext.append( ' bkill %s\n' % job_nr )
- infotext.append( ' \n' )
- infotext.append( 'To show all your running jobs:\n' )
- infotext.append( ' \n' )
- infotext.append( ' bjobs\n' )
- infotext.append( ' \n' )
- # write to log:
- for line in infotext : logging.info( line.rstrip() )
- # write to file:
- f = open( job_info, 'w' )
- f.writelines(infotext)
- f.close()
-
- # ok
- return
- #enddef
-
- # ======================================================================
- # ===
- # === QSUB queue
- # ===
- # ======================================================================
- def QueueOptions_QSub( bname, rcf, step ) :
- """
- Return list with queue options.
- """
- # init result:
- qopt = []
- # list with options:
- opts = rcf.get( 'queue.qsub.options' ).split()
- append_jobstep = rcf.get('queue.qsub.name.append.jobstep',default=False)
- # default options:
- for opt in opts :
- # look first for a jobstep-specific qsub option
- val = rcf.get( 'queue.qsub.option.%s.%s' % (opt,step), default='STEP_SPECIFIC_KEY_MISSING' )
- # if jobstep-specific option is missing, look for one without jobstep
- if val == 'STEP_SPECIFIC_KEY_MISSING':
- val = rcf.get( 'queue.qsub.option.%s' % opt)
- if (opt == 'N') and append_jobstep:
- jobstep = rcf.get('jobstep')
- val = val + '_%03d' % (int(jobstep))
- # fill option line:
- qopt.append( '#PBS -%s %s\n' % (opt,val) )
- #endfor
- # layout ...
- qopt.append( '\n' )
-
- # ok
- return qopt
-
- #enddef
- # ***
- def Submit_Job_To_QSub( job_script, rcf ) :
- """
- Submit job to QSUB queue.
- """
-
- # external:
- import os
- import logging
- import subprocess
- # basename for scripts etc is name of rcfile minus extension:
- bname,ext = os.path.splitext(job_script)
-
- # output files:
- job_info = bname+'.info'
- # options passed directly to submission command:
- qopts = rcf.get( 'queue.qsub.submit.options' )
- # add options passed to submit script:
- qopts = qopts+' '+rcf.get('submit.options')
-
- # info ...
- logging.info( ' launch ...' )
- # setup command line:
- command = 'qsub '+qopts
- # last argument is script:
- command = command+' '+job_script
- # info ...
- logging.info( ' command: %s' % command )
- # prepare for OS errors (file does not exist etc.)
- try:
- # submit; redirect errors to standard output:
- p = subprocess.Popen( command.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
- except OSError as err :
- logging.error( 'OSError: '+err.strerror )
- logging.error( 'from call : %s' % command )
- raise Exception
- #endtry
- # extract:
- outlines = p.stdout.readlines()
- # display:
- for line in outlines : logging.info( ' %s' % line.rstrip() )
- # standard output is:
- # jobnr
- # extract job nr:
- job_nr = outlines[0].split()[0]
- # info ...
- infotext = []
- infotext.append( '\n' )
- infotext.append( 'Summary:\n' )
- infotext.append( '\n' )
- infotext.append( ' current dir : %s\n' % os.getcwd() )
- infotext.append( ' job script : %s\n' % job_script )
- infotext.append( '\n' )
- infotext.append( 'To manage this job:\n' )
- infotext.append( ' \n' )
- infotext.append( ' # kill job:\n' )
- infotext.append( ' qdel %s\n' % job_nr )
- infotext.append( ' \n' )
- infotext.append( 'To show all your running jobs:\n' )
- infotext.append( ' \n' )
- infotext.append( ' qstat [-u ${USER}]\n' )
- infotext.append( ' \n' )
- # write to log:
- for line in infotext : logging.info( line.rstrip() )
- # write to file:
- f = open( job_info, 'w' )
- f.writelines(infotext)
- f.close()
-
- # ok
- return
-
- # ======================================================================
- # ===
- # === PBS queue (PBSpro)
- # ===
- # === Note that there are several implementations of PBS. The "pbs queue"
- # === here, although it also uses the qsub command, differs from the "qsub
- # === queue" defined above: the latter uses #$ directives, while the PBSpro
- # === directives start with #PBS.
- # ===
- # ======================================================================
- def QueueOptions_PBS( bname, rcf, step ) :
- """
- Return list with queue options. Called twice: first for the pseudo-step
- 'all', then for the actual step. See QueueOptions above.
- """
- import os
- qopt = [] # init result
-
- opts = rcf.get( 'queue.pbs.options.%s' % step ).split() # list with options
- # use fully qualified filename for log files
- if step != 'build':
- fqpath = rcf.get('rundir')
- else:
- fqpath = rcf.get('build.sourcedir')
- fqname = os.path.join( fqpath, bname )
- # build list of job directives
- for opt in opts :
- val = rcf.get( 'queue.pbs.option.%s.%s' % (step,opt) )
- # deal w/ multiple options with -l
- if opt == 'l':
- l_vals = val.split()
- for lval in l_vals: qopt.append( '#PBS -l %s\n' % lval )
- continue
- # options still to be set
- if val == '<auto>' :
- if opt == 'o' :
- val = '%s_%s.out' % (fqname,step)
- elif opt == 'e' :
- val = '%s_%s.err' % (fqname,step)
- # none, empty or plain normal value ?
- if val == '<none>' :
- continue # skip this option
- elif val == '' :
- qopt.append( '#PBS -%s\n' % opt ) # just the keyword
- else :
- qopt.append( '#PBS -%s %s\n' % (opt,val) ) # keyword and value
- qopt.append( '\n' )
- return qopt
- def Submit_Job_To_PBS( job_script, rcf ) :
- """
- Submit job to PBS queue.
- """
- import os
- import logging
- import subprocess
- # basename for scripts etc is name of rcfile minus extension:
- bname,ext = os.path.splitext(job_script)
-
- # output files:
- job_info = bname+'.info'
- # Two sets of options to pass directly, unmodified, to the submission command :
- qopts = rcf.get( 'queue.pbs.submit.options' ) # (1) from machine or queue (if separated from machine) rcfile
- qopts = qopts+' '+rcf.get('submit.options') # (2) from expert rcfile
-
- # info ...
- logging.info( ' launch ...' )
- # setup command line:
- command = 'qsub '+qopts
- # last argument is script:
- command = command+' '+job_script
- # info ...
- logging.info( ' command: %s' % command )
- # prepare for OS errors (file does not exist etc.)
- try:
- # submit; redirect errors to standard output:
- p = subprocess.Popen( command.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
- except OSError as err :
- logging.error( 'OSError: '+err.strerror )
- logging.error( 'from call : %s' % command )
- raise Exception
- # log output
- outlines = p.stdout.readlines()
- for line in outlines : logging.info( ' %s' % line.rstrip() )
- # standard output is:
- # jobnr
- # extract job nr:
- job_nr = outlines[0].split()[0]
- # info ...
- infotext = []
- infotext.append( '\n' )
- infotext.append( 'Summary:\n' )
- infotext.append( '\n' )
- infotext.append( ' current dir : %s\n' % os.getcwd() )
- infotext.append( ' job script : %s\n' % job_script )
- infotext.append( '\n' )
- infotext.append( 'To manage this job:\n' )
- infotext.append( ' \n' )
- infotext.append( ' # kill job:\n' )
- infotext.append( ' qdel %s\n' % job_nr )
- infotext.append( ' \n' )
- infotext.append( 'To show all your running jobs:\n' )
- infotext.append( ' \n' )
- infotext.append( ' qstat [-u ${USER}]\n' )
- infotext.append( ' \n' )
- infotext.append( 'To monitor this running job (ECMWF only!):\n' )
- infotext.append( ' \n' )
- infotext.append( ' qcat %s\n' % job_nr )
- infotext.append( ' \n' )
- # write to log:
- for line in infotext : logging.info( line.rstrip() )
- # write to file:
- f = open( job_info, 'w' )
- f.writelines(infotext)
- f.close()
-
- return job_nr
- # ======================================================================
- # ===
- # === SLURM queue
- # ===
- # ======================================================================
- def QueueOptions_Slurm( bname, rcf, step ) :
- """
- Return list with queue option directives that will be placed at the top
- of the script.
- The script for the run step (NEURON@KNMI) should contain:
- #SBATCH -n ${par.nthread}*${ntask}
-
- export OMP_NUM_THREADS=${par.nthread}
- mpiexec.hydra -machinefile ./mpd.hosts -np ${ntask} ./$bin
-
- """
- qopt = [] # init result
- # List of options for specified step:
- opts = rcf.get( 'queue.slurm.options.%s' % step ).split()
- for opt in opts :
- # get value:
- val = rcf.get( 'queue.slurm.option.%s.%s' % (step,opt) )
- # default options:
- if val == '<auto>' :
- if opt == 'o' :
- val = '%s_%s.out' % (bname,step)
- elif opt == 'e' :
- val = '%s_%s.err' % (bname,step)
- # skip unset (empty) directives
- if opt in ('p', 'w') :
- if len(val)==0 : continue
-
- qopt.append( '#SBATCH -%s %s\n' % (opt,val) )
- # layout ...
- qopt.append( '\n' )
-
- # ok
- return qopt
-
- def Submit_Job_To_Slurm( job_script, rcf ) :
- """
- Submit job to SLURM queue.
- """
-
- # external:
- import os
- import logging
- import subprocess
- # basename for scripts etc is name of rcfile minus extension:
- bname,ext = os.path.splitext(job_script)
-
- # output files:
- job_info = bname+'.info'
- # Two sets of options to pass directly, unmodified, to the submission command :
- qopts = rcf.get( 'queue.slurm.submit.options' ) # (1) from pycasso-queue-slurm.rc
- qopts = qopts+' '+rcf.get('submit.options') # (2) from pycasso-tm5-expert.rc
-
- # info ...
- logging.info( ' launch ...' )
- # setup command line:
- command = 'sbatch '+qopts
-
- # last argument is script:
- command = command+' '+job_script
- # info ...
- logging.info( ' command: %s' % command )
- # prepare for OS errors (file does not exist etc.)
- try:
- # submit; redirect errors to standard output:
- p = subprocess.Popen( command.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
- except OSError as err :
- logging.error( 'OSError: '+err.strerror )
- logging.error( 'from call : %s' % command )
- raise Exception
-
- # extract:
- outlines = p.stdout.readlines()
- # display:
- for line in outlines : logging.info( ' %s' % line.rstrip() )
- # standard output is:
- # jobnr
- # extract job nr:
- job_nr = outlines[0].split()[0]
- # info ...
- infotext = []
- infotext.append( '\n' )
- infotext.append( 'Summary:\n' )
- infotext.append( '\n' )
- infotext.append( ' current dir : %s\n' % os.getcwd() )
- infotext.append( ' job script : %s\n' % job_script )
- infotext.append( '\n' )
- infotext.append( 'To manage this job:\n' )
- infotext.append( ' \n' )
- infotext.append( ' # kill job:\n' )
- infotext.append( ' scancel %s\n' % job_nr )
- infotext.append( ' \n' )
- infotext.append( 'To show all your running jobs:\n' )
- infotext.append( ' \n' )
- infotext.append( ' squeue [-u ${USER}]\n' )
- infotext.append( ' \n' )
- # write to log:
- for line in infotext : logging.info( line.rstrip() )
- # write to file:
- f = open( job_info, 'w' )
- f.writelines(infotext)
- f.close()
-
- return
- # ======================================================================
- # ===
- # === end
- # ===
- # ======================================================================