Coverage for /usr/local/lib/python3.7/site-packages/pytest_cov/engine.py : 6%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Coverage controllers for use by pytest-cov and nose-cov."""
2import contextlib
3import copy
4import os
5import random
6import socket
7import sys
9import coverage
10from coverage.data import CoverageData
12from .compat import StringIO
13from .compat import workerinput
14from .compat import workeroutput
15from .embed import cleanup
18class _NullFile(object):
19 @staticmethod
20 def write(v):
21 pass
24@contextlib.contextmanager
25def _backup(obj, attr):
26 backup = getattr(obj, attr)
27 try:
28 setattr(obj, attr, copy.copy(backup))
29 yield
30 finally:
31 setattr(obj, attr, backup)
34class CovController(object):
35 """Base class for different plugin implementations."""
37 def __init__(self, cov_source, cov_report, cov_config, cov_append, cov_branch, config=None, nodeid=None):
38 """Get some common config used by multiple derived classes."""
39 self.cov_source = cov_source
40 self.cov_report = cov_report
41 self.cov_config = cov_config
42 self.cov_append = cov_append
43 self.cov_branch = cov_branch
44 self.config = config
45 self.nodeid = nodeid
47 self.cov = None
48 self.combining_cov = None
49 self.data_file = None
50 self.node_descs = set()
51 self.failed_workers = []
52 self.topdir = os.getcwd()
54 def pause(self):
55 self.cov.stop()
56 self.unset_env()
58 def resume(self):
59 self.cov.start()
60 self.set_env()
62 def set_env(self):
63 """Put info about coverage into the env so that subprocesses can activate coverage."""
64 if self.cov_source is None:
65 os.environ['COV_CORE_SOURCE'] = os.pathsep
66 else:
67 os.environ['COV_CORE_SOURCE'] = os.pathsep.join(self.cov_source)
68 config_file = os.path.abspath(self.cov_config)
69 if os.path.exists(config_file):
70 os.environ['COV_CORE_CONFIG'] = config_file
71 else:
72 os.environ['COV_CORE_CONFIG'] = os.pathsep
73 os.environ['COV_CORE_DATAFILE'] = os.path.abspath(self.cov.config.data_file)
74 if self.cov_branch:
75 os.environ['COV_CORE_BRANCH'] = 'enabled'
77 @staticmethod
78 def unset_env():
79 """Remove coverage info from env."""
80 os.environ.pop('COV_CORE_SOURCE', None)
81 os.environ.pop('COV_CORE_CONFIG', None)
82 os.environ.pop('COV_CORE_DATAFILE', None)
83 os.environ.pop('COV_CORE_BRANCH', None)
85 @staticmethod
86 def get_node_desc(platform, version_info):
87 """Return a description of this node."""
89 return 'platform %s, python %s' % (platform, '%s.%s.%s-%s-%s' % version_info[:5])
91 @staticmethod
92 def sep(stream, s, txt):
93 if hasattr(stream, 'sep'):
94 stream.sep(s, txt)
95 else:
96 sep_total = max((70 - 2 - len(txt)), 2)
97 sep_len = sep_total // 2
98 sep_extra = sep_total % 2
99 out = '%s %s %s\n' % (s * sep_len, txt, s * (sep_len + sep_extra))
100 stream.write(out)
102 def summary(self, stream):
103 """Produce coverage reports."""
104 total = None
106 if not self.cov_report:
107 with _backup(self.cov, "config"):
108 return self.cov.report(show_missing=True, ignore_errors=True, file=_NullFile)
110 # Output coverage section header.
111 if len(self.node_descs) == 1:
112 self.sep(stream, '-', 'coverage: %s' % ''.join(self.node_descs))
113 else:
114 self.sep(stream, '-', 'coverage')
115 for node_desc in sorted(self.node_descs):
116 self.sep(stream, ' ', '%s' % node_desc)
118 # Report on any failed workers.
119 if self.failed_workers:
120 self.sep(stream, '-', 'coverage: failed workers')
121 stream.write('The following workers failed to return coverage data, '
122 'ensure that pytest-cov is installed on these workers.\n')
123 for node in self.failed_workers:
124 stream.write('%s\n' % node.gateway.id)
126 # Produce terminal report if wanted.
127 if any(x in self.cov_report for x in ['term', 'term-missing']):
128 options = {
129 'show_missing': ('term-missing' in self.cov_report) or None,
130 'ignore_errors': True,
131 'file': stream,
132 }
133 skip_covered = isinstance(self.cov_report, dict) and 'skip-covered' in self.cov_report.values()
134 options.update({'skip_covered': skip_covered or None})
135 with _backup(self.cov, "config"):
136 total = self.cov.report(**options)
138 # Produce annotated source code report if wanted.
139 if 'annotate' in self.cov_report:
140 annotate_dir = self.cov_report['annotate']
142 with _backup(self.cov, "config"):
143 self.cov.annotate(ignore_errors=True, directory=annotate_dir)
144 # We need to call Coverage.report here, just to get the total
145 # Coverage.annotate don't return any total and we need it for --cov-fail-under.
147 with _backup(self.cov, "config"):
148 total = self.cov.report(ignore_errors=True, file=_NullFile)
149 if annotate_dir:
150 stream.write('Coverage annotated source written to dir %s\n' % annotate_dir)
151 else:
152 stream.write('Coverage annotated source written next to source\n')
154 # Produce html report if wanted.
155 if 'html' in self.cov_report:
156 output = self.cov_report['html']
157 with _backup(self.cov, "config"):
158 total = self.cov.html_report(ignore_errors=True, directory=output)
159 stream.write('Coverage HTML written to dir %s\n' % (self.cov.config.html_dir if output is None else output))
161 # Produce xml report if wanted.
162 if 'xml' in self.cov_report:
163 output = self.cov_report['xml']
164 with _backup(self.cov, "config"):
165 total = self.cov.xml_report(ignore_errors=True, outfile=output)
166 stream.write('Coverage XML written to file %s\n' % (self.cov.config.xml_output if output is None else output))
168 return total
171class Central(CovController):
172 """Implementation for centralised operation."""
174 def start(self):
175 cleanup()
177 self.cov = coverage.Coverage(source=self.cov_source,
178 branch=self.cov_branch,
179 config_file=self.cov_config)
180 self.combining_cov = coverage.Coverage(source=self.cov_source,
181 branch=self.cov_branch,
182 data_file=os.path.abspath(self.cov.config.data_file),
183 config_file=self.cov_config)
185 # Erase or load any previous coverage data and start coverage.
186 if self.cov_append:
187 self.cov.load()
188 else:
189 self.cov.erase()
190 self.cov.start()
191 self.set_env()
193 def finish(self):
194 """Stop coverage, save data to file and set the list of coverage objects to report on."""
196 self.unset_env()
197 self.cov.stop()
198 self.cov.save()
200 self.cov = self.combining_cov
201 self.cov.load()
202 self.cov.combine()
203 self.cov.save()
205 node_desc = self.get_node_desc(sys.platform, sys.version_info)
206 self.node_descs.add(node_desc)
209class DistMaster(CovController):
210 """Implementation for distributed master."""
212 def start(self):
213 cleanup()
215 # Ensure coverage rc file rsynced if appropriate.
216 if self.cov_config and os.path.exists(self.cov_config):
217 self.config.option.rsyncdir.append(self.cov_config)
219 self.cov = coverage.Coverage(source=self.cov_source,
220 branch=self.cov_branch,
221 config_file=self.cov_config)
222 self.combining_cov = coverage.Coverage(source=self.cov_source,
223 branch=self.cov_branch,
224 data_file=os.path.abspath(self.cov.config.data_file),
225 config_file=self.cov_config)
226 if self.cov_append:
227 self.cov.load()
228 else:
229 self.cov.erase()
230 self.cov.start()
231 self.cov.config.paths['source'] = [self.topdir]
233 def configure_node(self, node):
234 """Workers need to know if they are collocated and what files have moved."""
236 workerinput(node).update({
237 'cov_master_host': socket.gethostname(),
238 'cov_master_topdir': self.topdir,
239 'cov_master_rsync_roots': [str(root) for root in node.nodemanager.roots],
240 })
242 def testnodedown(self, node, error):
243 """Collect data file name from worker."""
245 # If worker doesn't return any data then it is likely that this
246 # plugin didn't get activated on the worker side.
247 output = workeroutput(node, {})
248 if 'cov_worker_node_id' not in output:
249 self.failed_workers.append(node)
250 return
252 # If worker is not collocated then we must save the data file
253 # that it returns to us.
254 if 'cov_worker_data' in output:
255 data_suffix = '%s.%s.%06d.%s' % (
256 socket.gethostname(), os.getpid(),
257 random.randint(0, 999999),
258 output['cov_worker_node_id']
259 )
261 cov = coverage.Coverage(source=self.cov_source,
262 branch=self.cov_branch,
263 data_suffix=data_suffix,
264 config_file=self.cov_config)
265 cov.start()
266 if coverage.version_info < (5, 0):
267 data = CoverageData()
268 data.read_fileobj(StringIO(output['cov_worker_data']))
269 cov.data.update(data)
270 else:
271 data = CoverageData(no_disk=True)
272 data.loads(output['cov_worker_data'])
273 cov.get_data().update(data)
274 cov.stop()
275 cov.save()
276 path = output['cov_worker_path']
277 self.cov.config.paths['source'].append(path)
279 # Record the worker types that contribute to the data file.
280 rinfo = node.gateway._rinfo()
281 node_desc = self.get_node_desc(rinfo.platform, rinfo.version_info)
282 self.node_descs.add(node_desc)
284 def finish(self):
285 """Combines coverage data and sets the list of coverage objects to report on."""
287 # Combine all the suffix files into the data file.
288 self.cov.stop()
289 self.cov.save()
290 self.cov = self.combining_cov
291 self.cov.load()
292 self.cov.combine()
293 self.cov.save()
296class DistWorker(CovController):
297 """Implementation for distributed workers."""
299 def start(self):
300 cleanup()
302 # Determine whether we are collocated with master.
303 self.is_collocated = (socket.gethostname() == workerinput(self.config)['cov_master_host'] and
304 self.topdir == workerinput(self.config)['cov_master_topdir'])
306 # If we are not collocated then rewrite master paths to worker paths.
307 if not self.is_collocated:
308 master_topdir = workerinput(self.config)['cov_master_topdir']
309 worker_topdir = self.topdir
310 if self.cov_source is not None:
311 self.cov_source = [source.replace(master_topdir, worker_topdir)
312 for source in self.cov_source]
313 self.cov_config = self.cov_config.replace(master_topdir, worker_topdir)
315 # Erase any previous data and start coverage.
316 self.cov = coverage.Coverage(source=self.cov_source,
317 branch=self.cov_branch,
318 data_suffix=True,
319 config_file=self.cov_config)
320 self.cov.start()
321 self.set_env()
323 def finish(self):
324 """Stop coverage and send relevant info back to the master."""
325 self.unset_env()
326 self.cov.stop()
328 if self.is_collocated:
329 # We don't combine data if we're collocated - we can get
330 # race conditions in the .combine() call (it's not atomic)
331 # The data is going to be combined in the master.
332 self.cov.save()
334 # If we are collocated then just inform the master of our
335 # data file to indicate that we have finished.
336 workeroutput(self.config)['cov_worker_node_id'] = self.nodeid
337 else:
338 self.cov.combine()
339 self.cov.save()
340 # If we are not collocated then add the current path
341 # and coverage data to the output so we can combine
342 # it on the master node.
344 # Send all the data to the master over the channel.
345 if coverage.version_info < (5, 0):
346 buff = StringIO()
347 self.cov.data.write_fileobj(buff)
348 data = buff.getvalue()
349 else:
350 data = self.cov.get_data().dumps()
352 workeroutput(self.config).update({
353 'cov_worker_path': self.topdir,
354 'cov_worker_node_id': self.nodeid,
355 'cov_worker_data': data,
356 })
358 def summary(self, stream):
359 """Only the master reports so do nothing."""
361 pass