Package util :: Module ruffus_utils
[hide private]
[frames | no frames]

Source Code for Module util.ruffus_utils

  1   
  2  from optparse import OptionParser 
  3  import logging 
  4  import logging.handlers 
  5  import os 
  6  import sys 
  7  from collections import defaultdict 
  8  import subprocess 
  9  from time import ctime 
 10   
 11  from ruffus import pipeline_printout, pipeline_printout_graph, pipeline_run 
 12  from ruffus.proxy_logger import make_shared_logger_and_proxy 
 13   
 14   
def ruffus_opt_parser():
    """Create the standard optparse parser for a ruffus pipeline script.

    Returns:
        An ``OptionParser`` carrying the options below. The attribute names
        on the parsed options object are the ``dest`` values:

        * ``verbose``              (-v, counted, starts at 3)
        * ``target_tasks``         (-t, repeatable, default empty list)
        * ``jobs``                 (-j, int, default 4)
        * ``just_print``           (-n, flag, default False)
        * ``flowchart``            (--flowchart FILE, default None)
        * ``key_legend_in_graph``  (flag, default False)
        * ``forced_tasks``         (-f, repeatable, default empty list)
        * ``config_file``          (-c, default 'pipeline.cfg')
    """
    # optparse expands the placeholder "%prog" to the program name.  The
    # usage string previously read "%progs", which rendered as the program
    # name followed by a stray "s".
    parser = OptionParser(version="%prog 1.0",
                          usage="\n\n %prog [options]")

    # general options: verbosity / logging
    parser.add_option(
        "-v", "--verbose", dest="verbose",
        action="count", default=3,
        help="Print more verbose messages for each additional verbose level.")

    # pipeline
    parser.add_option(
        "-t", "--target_tasks", dest="target_tasks",
        action="append",
        default=[],
        metavar="JOBNAME",
        type="string",
        help="Target task(s) of pipeline.")
    parser.add_option(
        "-j", "--jobs", dest="jobs",
        default=4,
        metavar="N",
        type="int",
        help="Allow N jobs (commands) to run simultaneously. (%default by default)")
    parser.add_option(
        "-n", "--just_print", dest="just_print",
        action="store_true", default=False,
        help="Don't actually run any commands; just print the pipeline.")
    parser.add_option(
        "--flowchart", dest="flowchart",
        metavar="FILE",
        type="string",
        help="Don't actually run any commands; just print the pipeline "
             "as a flowchart.")

    # Less common pipeline options
    parser.add_option(
        "--key_legend_in_graph", dest="key_legend_in_graph",
        action="store_true", default=False,
        help="Print out legend and key for dependency graph.")
    parser.add_option(
        "--forced_tasks", '-f', dest="forced_tasks",
        action="append",
        default=[],
        metavar="JOBNAME",
        type="string",
        help="Pipeline task(s) which will be included even if they are up to date.")
    parser.add_option(
        "--config_file", '-c', dest="config_file",
        default='pipeline.cfg',
        type="string",
        help="Configuration file for the pipeline")
    return parser
57
class DefaultLog:
    """Fallback logging settings, used by ruffus_logger() when it is called
    without an options object."""
    # Passed to _setup_std_logging() as ``verbose``; any truthy value there
    # enables the stderr handler.
    verbose = 4
    # Passed to _setup_std_logging() as the path of the log file.
    log_file = 'ruffus.log'
61
def ruffus_logger(options=None, module_name='pipeline'):
    """Create a shared logger proxy and the mutex that guards it.

    Args:
        options: object exposing ``log_file`` and ``verbose`` attributes.
            When None, a ``DefaultLog`` instance supplies both.
        module_name: name of the underlying ``logging`` logger; also passed
            to ``make_shared_logger_and_proxy`` as the logger name.

    Returns:
        ``(logger_proxy, logging_mutex)`` as produced by
        ``make_shared_logger_and_proxy``.  The proxy additionally gets a
        ``log_file`` attribute copied from the options.
    """
    settings = DefaultLog() if options is None else options

    base_logger = logging.getLogger(module_name)
    _setup_std_logging(base_logger, settings.log_file, settings.verbose)

    def get_logger(logger_name, args):
        # Factory handed to ruffus: ignores its arguments and always returns
        # the logger configured above.
        return base_logger

    proxy, mutex = make_shared_logger_and_proxy(get_logger, module_name, {})
    # sys_call() reads ``logger.log_file`` to name its per-process log file.
    proxy.log_file = settings.log_file
    return proxy, mutex
def _setup_std_logging(logger, log_file, verbose):
    """Set up logging using programme options.

    Args:
        logger: the ``logging.Logger`` to configure.  Its level is set to
            DEBUG so that every message reaches the handlers.
        log_file: path of a file to log to; a falsy value disables file
            logging.
        verbose: any truthy value enables logging to stderr.

    ``logging.getLogger(name)`` hands back the same logger object for the
    same name, so this function may run more than once on one logger.  A
    handler is therefore only attached when the logger does not already have
    an equivalent one; otherwise every record would be emitted once per call.
    """
    class NullHandler(logging.Handler):
        """Handler that discards records, for when there is no logging."""
        def emit(self, record):
            pass

    # We are interested in all messages
    logger.setLevel(logging.DEBUG)

    # log to file if that is specified
    if log_file:
        # FileHandler stores the absolute path in ``baseFilename``.
        target = os.path.abspath(log_file)
        already_logging_to_file = any(
            isinstance(existing, logging.FileHandler)
            and existing.baseFilename == target
            for existing in logger.handlers)
        if not already_logging_to_file:
            handler = logging.FileHandler(log_file, delay=False)
            handler.setFormatter(logging.Formatter(
                "%(asctime)s - %(name)s - %(levelname)6s - %(message)s"))
            handler.setLevel(logging.DEBUG)
            logger.addHandler(handler)

    # log to stderr if verbose
    if verbose:
        already_logging_to_stderr = any(
            isinstance(existing, logging.StreamHandler)
            and getattr(existing, 'stream', None) is sys.stderr
            for existing in logger.handlers)
        if not already_logging_to_stderr:
            stderrhandler = logging.StreamHandler(sys.stderr)
            stderrhandler.setFormatter(logging.Formatter(" %(message)s"))
            stderrhandler.setLevel(logging.DEBUG)
            logger.addHandler(stderrhandler)

    # no logging
    if not logger.handlers:
        logger.addHandler(NullHandler())


# Module-wide logger proxy and mutex, built from the DefaultLog settings at
# import time.  They are the default arguments of sys_call() and the logger
# passed to pipeline_run() by ruffus_main().
main_logger, main_mutex = ruffus_logger()
def ruffus_main(options, args):
    """Main entry point for ruffus pipelines.

    Chooses one of three actions from the parsed options:

    * ``options.just_print``: print the pipeline to stdout without running it.
    * ``options.flowchart``: write the pipeline as a flowchart to that file.
    * otherwise: run the pipeline with ``options.jobs`` parallel jobs, logging
      through the module-level ``main_logger``.

    Args:
        options: parsed options; reads ``just_print``, ``flowchart``,
            ``target_tasks``, ``forced_tasks``, ``verbose``, ``jobs`` and
            ``key_legend_in_graph``.
        args: positional command-line arguments; not used here.
    """
    if options.just_print:
        pipeline_printout(sys.stdout, options.target_tasks,
                          options.forced_tasks,
                          verbose=options.verbose)
    elif options.flowchart:
        # use flowchart file name extension to decide flowchart format
        # e.g. svg, jpg etc.
        flowchart_format = os.path.splitext(options.flowchart)[1][1:]
        # Open the output in a ``with`` block so the handle is flushed and
        # closed once the graph is written, instead of being left open.
        with open(options.flowchart, "w") as flowchart_file:
            pipeline_printout_graph(
                flowchart_file,
                flowchart_format,
                options.target_tasks,
                options.forced_tasks,
                no_key_legend=not options.key_legend_in_graph)
    else:
        pipeline_run(options.target_tasks, options.forced_tasks,
                     multiprocess=options.jobs,
                     logger=main_logger,
                     verbose=options.verbose)
def sys_call(cmd, logger=main_logger, log_mutex=main_mutex, file_log=True):
    """Run a shell command and write its output to the log.

    With ``file_log`` enabled, stdout and stderr of the command are appended
    to ``LOGFILE.PID.log`` (``logger.log_file`` plus the id of the calling
    process), preceded by a timestamped header showing the command.  With
    ``file_log`` disabled only stdout is captured and nothing is written to
    that file.

    Args:
        cmd: command line, passed to the shell as-is (``shell=True``); the
            caller is responsible for any quoting.
        logger: logger used for the command line and its output; must have a
            ``log_file`` attribute when ``file_log`` is true.
        log_mutex: context manager held around every call on ``logger``.
        file_log: whether to keep the per-process log file described above.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero.  The
            captured output is logged at ERROR level first.
    """
    with log_mutex:
        logger.debug(cmd)

    if file_log:
        logfile = '%s.%s.log' % (logger.log_file, os.getpid())
        with open(logfile, 'a') as logout:
            logout.write('\n****** %s ******\n%s\n\n' % (ctime(), cmd))
        # Copy the output into the log file from Python rather than piping
        # the command into ``tee``: a shell pipeline reports the exit status
        # of its last command, so failures of ``cmd`` itself went unnoticed.
        captured = []
        with open(logfile, 'ab') as logout:
            process = subprocess.Popen(cmd, shell=True,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.STDOUT)
            for line in iter(process.stdout.readline, b''):
                logout.write(line)
                # flush per line so the file can be followed while cmd runs
                logout.flush()
                captured.append(line)
            process.stdout.close()
            retcode = process.wait()
        output = b''.join(captured)
    else:
        process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
        output, _ = process.communicate()
        retcode = process.returncode

    if retcode:
        with log_mutex:
            logger.error(output)
        raise subprocess.CalledProcessError(retcode, cmd)
    with log_mutex:
        logger.debug(output)
153
def touch(fname, times=None):
    """Touch the given file (update its timestamps, or create the file).

    Args:
        fname: path of the file to touch.
        times: ``(atime, mtime)`` tuple forwarded to ``os.utime``; None sets
            both to the current time.
    """
    # ``open`` instead of the ``file`` builtin: they are the same in
    # Python 2, but ``file`` no longer exists in Python 3.
    # Append mode creates a missing file without truncating an existing one.
    with open(fname, 'a'):
        os.utime(fname, times)
158