1
2 from optparse import OptionParser
3 import logging
4 import logging.handlers
5 import os
6 import sys
7 from collections import defaultdict
8 import subprocess
9 from time import ctime
10
11 from ruffus import pipeline_printout, pipeline_printout_graph, pipeline_run
12 from ruffus.proxy_logger import make_shared_logger_and_proxy
13
14
16 'creates a ruffus optparse opts object'
17 parser = OptionParser(version="%prog 1.0", usage = "\n\n %progs [options]")
18
19 parser.add_option("-v", "--verbose", dest = "verbose",
20 action="count", default=3,
21 help="Print more verbose messages for each additional verbose level.")
22
23 parser.add_option("-t", "--target_tasks", dest="target_tasks",
24 action="append",
25 default = list(),
26 metavar="JOBNAME",
27 type="string",
28 help="Target task(s) of pipeline.")
29 parser.add_option("-j", "--jobs", dest="jobs",
30 default=4,
31 metavar="N",
32 type="int",
33 help="Allow N jobs (commands) to run simultaneously. (%default by default)")
34 parser.add_option("-n", "--just_print", dest="just_print",
35 action="store_true", default=False,
36 help="Don't actually run any commands; just print the pipeline.")
37 parser.add_option("--flowchart", dest="flowchart",
38 metavar="FILE",
39 type="string",
40 help="Don't actually run any commands; just print the pipeline "
41 "as a flowchart.")
42
43 parser.add_option("--key_legend_in_graph", dest="key_legend_in_graph",
44 action="store_true", default=False,
45 help="Print out legend and key for dependency graph.")
46 parser.add_option("--forced_tasks", '-f', dest="forced_tasks",
47 action="append",
48 default = list(),
49 metavar="JOBNAME",
50 type="string",
51 help="Pipeline task(s) which will be included even if they are up to date.")
52 parser.add_option("--config_file", '-c', dest="config_file",
53 default='pipeline.cfg',
54 type="string",
55 help="Configuration file for the pipeline")
56 return parser
57
61
63 'creates a shared logger and mutex'
64 if options is None:
65 options = DefaultLog()
66 logger = logging.getLogger(module_name)
67 _setup_std_logging(logger, options.log_file, options.verbose)
68 def get_logger (logger_name, args):
69 return logger
70 (logger_proxy,
71 logging_mutex) = make_shared_logger_and_proxy (get_logger, module_name, {})
72 logger_proxy.log_file = options.log_file
73 return logger_proxy, logging_mutex
74
def _setup_std_logging(logger, log_file, verbose):
    """
    Set up logging using programme options.

    logger   -- logging.Logger to configure; its level is set to DEBUG
    log_file -- path of a log file; when truthy, a DEBUG-level FileHandler
                using a timestamp/name/level/message format is attached
    verbose  -- when truthy, a DEBUG-level handler that writes the bare
                message to stderr is attached

    When neither handler is attached, a do-nothing handler is added instead
    so that the logger always ends up with at least one handler.
    """
    class NullHandler(logging.Handler):
        """
        for when there is no logging
        """
        def emit(self, record):
            pass

    logger.setLevel(logging.DEBUG)
    has_handler = False

    if log_file:
        # delay=False: the log file is opened (and created) immediately.
        handler = logging.FileHandler(log_file, delay=False)
        handler.setFormatter(logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)6s - %(message)s"))
        handler.setLevel(logging.DEBUG)
        logger.addHandler(handler)
        has_handler = True

    if verbose:
        stderrhandler = logging.StreamHandler(sys.stderr)
        stderrhandler.setFormatter(logging.Formatter(" %(message)s"))
        stderrhandler.setLevel(logging.DEBUG)
        logger.addHandler(stderrhandler)
        has_handler = True

    if not has_handler:
        logger.addHandler(NullHandler())
108
# Shared logger proxy and its mutex, created once at import time by calling
# ruffus_logger() with no arguments. Used as the default logger/mutex of
# sys_call() and as the logger handed to pipeline_run() in ruffus_main().
main_logger, main_mutex = ruffus_logger()
110
def ruffus_main(options, args):
    """Main entry point for ruffus pipelines.

    Picks one of three modes from the parsed command-line options:

    * ``options.just_print``: print what the pipeline would do to stdout.
    * ``options.flowchart``: write the dependency graph to that file; the
      output format is the file's extension without the leading dot.
    * otherwise: run the pipeline with ``options.jobs`` parallel jobs,
      logging through the module-level ``main_logger``.

    options -- parsed options object (reads just_print, flowchart,
               target_tasks, forced_tasks, verbose, key_legend_in_graph
               and jobs)
    args    -- positional command-line arguments; not used here
    """
    if options.just_print:
        pipeline_printout(sys.stdout, options.target_tasks,
                          options.forced_tasks,
                          verbose=options.verbose)
    elif options.flowchart:
        output_format = os.path.splitext(options.flowchart)[1][1:]
        # Close the file deterministically so the graph is flushed to disk
        # and the handle is not leaked if drawing the graph raises.
        with open(options.flowchart, "w") as flowchart_file:
            pipeline_printout_graph(flowchart_file,
                                    output_format,
                                    options.target_tasks,
                                    options.forced_tasks,
                                    no_key_legend=not options.key_legend_in_graph)
    else:
        pipeline_run(options.target_tasks, options.forced_tasks,
                     multiprocess=options.jobs,
                     logger=main_logger,
                     verbose=options.verbose)
129
130
def _tee_to_file(process, logfile):
    """Append process.stdout to logfile as it arrives; return all of it."""
    captured = []
    with open(logfile, 'ab') as logout:
        for line in iter(process.stdout.readline, b''):
            logout.write(line)
            # Flush per line so the log can be followed while the job runs.
            logout.flush()
            captured.append(line)
    process.stdout.close()
    process.wait()
    return b''.join(captured)


def sys_call(cmd, logger=main_logger, log_mutex=main_mutex, file_log=True):
    """Fork a job and write the results to the log file for this process

    The command is run through the shell.  When ``file_log`` is true, a
    timestamped header naming the command is appended to LOGFILE.PID
    (``logger.log_file`` plus the pid of the calling process), followed by
    the job's merged stdout and stderr.  The captured output is also sent
    to ``logger``: at ERROR level if the job failed, at DEBUG level
    otherwise.

    cmd       -- shell command line to execute
    logger    -- logger to report to; must have a ``log_file`` attribute
    log_mutex -- context manager held around every call on ``logger``
    file_log  -- when false, nothing is written to LOGFILE.PID, only stdout
                 is captured, and stderr is inherited from this process

    Raises subprocess.CalledProcessError if the job exits with a non-zero
    status.
    """
    logfile = '%s.%s.log' % (logger.log_file, os.getpid())
    if file_log:
        with open(logfile, 'a') as logout:
            logout.write('\n****** %s ******\n%s\n\n' % (ctime(), cmd))

    # Report through the logger the caller supplied, under its mutex, like
    # every other logging call in this function.
    with log_mutex:
        logger.debug(cmd)

    if file_log:
        # Copy the output to the log file from Python rather than piping
        # through `| tee`: the exit status of a shell pipeline is that of
        # its last command, so with tee a failing job would be reported as
        # a success and the error branch below would never run.
        process = subprocess.Popen(cmd, shell=True,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT)
        output = _tee_to_file(process, logfile)
    else:
        process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
        output, _ = process.communicate()

    retcode = process.returncode
    if retcode:
        with log_mutex:
            logger.error(output)
        raise subprocess.CalledProcessError(retcode, cmd)
    with log_mutex:
        logger.debug(output)
153
def touch(fname, times=None):
    """Touch the given file (update timestamp or create the file).

    fname -- path of the file to touch
    times -- ``(atime, mtime)`` tuple passed to os.utime, or None to use
             the current time
    """
    # Opening in append mode creates the file if it is missing and leaves
    # existing contents untouched.  `open` replaces the Python 2-only
    # `file` builtin, which does not exist on Python 3.
    with open(fname, 'a'):
        os.utime(fname, times)
158