Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Coverage controllers for use by pytest-cov and nose-cov.""" 

2import contextlib 

3import copy 

4import os 

5import random 

6import socket 

7import sys 

8 

9import coverage 

10from coverage.data import CoverageData 

11 

12from .compat import StringIO 

13from .compat import workerinput 

14from .compat import workeroutput 

15from .embed import cleanup 

16 

17 

class _NullFile(object):
    """Write-only sink that silently discards everything handed to it.

    Used as the ``file`` argument of ``Coverage.report`` when only the
    returned total is of interest and nothing should be printed.
    """

    @staticmethod
    def write(v):
        """Accept *v* and throw it away; always returns ``None``."""
        return None

22 

23 

@contextlib.contextmanager
def _backup(obj, attr):
    """Temporarily replace ``obj.<attr>`` with a shallow copy of itself.

    Inside the ``with`` body the attribute holds a ``copy.copy`` of its
    original value, so changes made to the attribute there land on the copy.
    On exit, whether normal or via an exception, the original object is put
    back.
    """
    original = getattr(obj, attr)
    try:
        scratch = copy.copy(original)
        setattr(obj, attr, scratch)
        yield
    finally:
        # Always reinstate the untouched original, even if the body raised.
        setattr(obj, attr, original)

32 

33 

class CovController(object):
    """Base class for different plugin implementations.

    Stores the coverage options shared by the concrete controllers and
    provides the pieces they have in common: pausing/resuming measurement,
    publishing the coverage settings through environment variables so that
    subprocesses can pick them up, and producing the requested reports.
    """

    def __init__(self, cov_source, cov_report, cov_config, cov_append, cov_branch, config=None, nodeid=None):
        """Get some common config used by multiple derived classes.

        :param cov_source: iterable of source strings to measure, or ``None``.
        :param cov_report: requested reports; looked up by report name
            (``'term'``, ``'term-missing'``, ``'annotate'``, ``'html'``,
            ``'xml'``) with the value used as the output location.
        :param cov_config: path of the coverage config file (may be falsy).
        :param cov_append: when true, subclasses load existing data instead
            of erasing it before measuring.
        :param cov_branch: passed on as the ``branch`` option of coverage.
        :param config: plugin host configuration object -- presumably the
            pytest config; verify against callers.
        :param nodeid: identifier reported back by distributed workers.
        """
        self.cov_source = cov_source
        self.cov_report = cov_report
        self.cov_config = cov_config
        self.cov_append = cov_append
        self.cov_branch = cov_branch
        self.config = config
        self.nodeid = nodeid

        self.cov = None
        self.combining_cov = None
        self.data_file = None
        self.node_descs = set()
        self.failed_workers = []
        # Remember the directory we were started in.
        self.topdir = os.getcwd()

    def pause(self):
        """Stop measuring and withdraw the coverage settings from the environment."""
        self.cov.stop()
        self.unset_env()

    def resume(self):
        """Start measuring again and re-publish the coverage settings."""
        self.cov.start()
        self.set_env()

    def set_env(self):
        """Put info about coverage into the env so that subprocesses can activate coverage.

        ``os.pathsep`` is written as a placeholder when there is no source
        list or no usable config file.
        """
        if self.cov_source is None:
            os.environ['COV_CORE_SOURCE'] = os.pathsep
        else:
            os.environ['COV_CORE_SOURCE'] = os.pathsep.join(self.cov_source)

        # A missing/empty config setting means "no config file": abspath(None)
        # would raise TypeError and abspath('') would resolve to the current
        # directory, which is not a config file.
        config_file = os.path.abspath(self.cov_config) if self.cov_config else None
        if config_file and os.path.exists(config_file):
            os.environ['COV_CORE_CONFIG'] = config_file
        else:
            os.environ['COV_CORE_CONFIG'] = os.pathsep

        os.environ['COV_CORE_DATAFILE'] = os.path.abspath(self.cov.config.data_file)
        if self.cov_branch:
            os.environ['COV_CORE_BRANCH'] = 'enabled'

    @staticmethod
    def unset_env():
        """Remove coverage info from env."""
        os.environ.pop('COV_CORE_SOURCE', None)
        os.environ.pop('COV_CORE_CONFIG', None)
        os.environ.pop('COV_CORE_DATAFILE', None)
        os.environ.pop('COV_CORE_BRANCH', None)

    @staticmethod
    def get_node_desc(platform, version_info):
        """Return a description of this node.

        :param platform: platform name, e.g. ``sys.platform``.
        :param version_info: sequence whose first five items are the
            interpreter version fields, e.g. ``sys.version_info``.
        """
        # ``%`` only unpacks real tuples, so normalise lists and other
        # sequences before formatting.
        version = '%s.%s.%s-%s-%s' % tuple(version_info[:5])
        return 'platform %s, python %s' % (platform, version)

    @staticmethod
    def sep(stream, s, txt):
        """Write a separator line containing *txt* to *stream*.

        Delegates to ``stream.sep`` when the stream offers one; otherwise a
        line of roughly 70 columns is built from the fill string *s*.
        """
        if hasattr(stream, 'sep'):
            stream.sep(s, txt)
        else:
            # Width left for filler after the text and its two spaces; never
            # fewer than two fill characters in total.
            sep_total = max((70 - 2 - len(txt)), 2)
            sep_len = sep_total // 2
            sep_extra = sep_total % 2
            out = '%s %s %s\n' % (s * sep_len, txt, s * (sep_len + sep_extra))
            stream.write(out)

    def summary(self, stream):
        """Produce coverage reports.

        Writes the requested reports and status lines to *stream* and returns
        the value of the last coverage report call that ran (``None`` if no
        known report type was requested). Every coverage call runs under
        ``_backup(self.cov, "config")`` so ``self.cov.config`` is the same
        object afterwards.
        """
        total = None

        if not self.cov_report:
            # Nothing to display: compute the total into a discarding sink.
            with _backup(self.cov, "config"):
                return self.cov.report(show_missing=True, ignore_errors=True, file=_NullFile)

        # Output coverage section header.
        if len(self.node_descs) == 1:
            self.sep(stream, '-', 'coverage: %s' % ''.join(self.node_descs))
        else:
            self.sep(stream, '-', 'coverage')
            for node_desc in sorted(self.node_descs):
                self.sep(stream, ' ', '%s' % node_desc)

        # Report on any failed workers.
        if self.failed_workers:
            self.sep(stream, '-', 'coverage: failed workers')
            stream.write('The following workers failed to return coverage data, '
                         'ensure that pytest-cov is installed on these workers.\n')
            for node in self.failed_workers:
                stream.write('%s\n' % node.gateway.id)

        # Produce terminal report if wanted.
        if any(x in self.cov_report for x in ['term', 'term-missing']):
            options = {
                'show_missing': ('term-missing' in self.cov_report) or None,
                'ignore_errors': True,
                'file': stream,
            }
            skip_covered = isinstance(self.cov_report, dict) and 'skip-covered' in self.cov_report.values()
            # ``None`` (rather than False) is passed when the option is off.
            options.update({'skip_covered': skip_covered or None})
            with _backup(self.cov, "config"):
                total = self.cov.report(**options)

        # Produce annotated source code report if wanted.
        if 'annotate' in self.cov_report:
            annotate_dir = self.cov_report['annotate']

            with _backup(self.cov, "config"):
                self.cov.annotate(ignore_errors=True, directory=annotate_dir)
            # We need to call Coverage.report here just to get the total:
            # Coverage.annotate doesn't return one and it is needed for
            # --cov-fail-under.
            with _backup(self.cov, "config"):
                total = self.cov.report(ignore_errors=True, file=_NullFile)
            if annotate_dir:
                stream.write('Coverage annotated source written to dir %s\n' % annotate_dir)
            else:
                stream.write('Coverage annotated source written next to source\n')

        # Produce html report if wanted.
        if 'html' in self.cov_report:
            output = self.cov_report['html']
            with _backup(self.cov, "config"):
                total = self.cov.html_report(ignore_errors=True, directory=output)
            stream.write('Coverage HTML written to dir %s\n' % (self.cov.config.html_dir if output is None else output))

        # Produce xml report if wanted.
        if 'xml' in self.cov_report:
            output = self.cov_report['xml']
            with _backup(self.cov, "config"):
                total = self.cov.xml_report(ignore_errors=True, outfile=output)
            stream.write('Coverage XML written to file %s\n' % (self.cov.config.xml_output if output is None else output))

        return total

169 

170 

class Central(CovController):
    """Implementation for centralised operation."""

    def start(self):
        """Set up both coverage objects, reset or reload old data, and begin measuring."""
        cleanup()

        shared_options = {
            'source': self.cov_source,
            'branch': self.cov_branch,
            'config_file': self.cov_config,
        }
        self.cov = coverage.Coverage(**shared_options)
        # The combining object is pinned to the absolute path of the data
        # file that the measuring object resolved from its configuration.
        data_path = os.path.abspath(self.cov.config.data_file)
        self.combining_cov = coverage.Coverage(data_file=data_path, **shared_options)

        # Erase or load any previous coverage data and start coverage.
        prepare = self.cov.load if self.cov_append else self.cov.erase
        prepare()
        self.cov.start()
        self.set_env()

    def finish(self):
        """Stop coverage, save data to file and set the list of coverage objects to report on."""
        self.unset_env()
        measuring = self.cov
        measuring.stop()
        measuring.save()

        # From here on the combining object is the one reported on.
        combiner = self.combining_cov
        self.cov = combiner
        combiner.load()
        combiner.combine()
        combiner.save()

        self.node_descs.add(self.get_node_desc(sys.platform, sys.version_info))

207 

208 

class DistMaster(CovController):
    """Implementation for distributed master."""

    def start(self):
        """Register the config file for rsync, create the coverage objects and begin measuring."""
        cleanup()

        # Ensure coverage rc file rsynced if appropriate.
        rc_file = self.cov_config
        if rc_file and os.path.exists(rc_file):
            self.config.option.rsyncdir.append(rc_file)

        shared_options = {
            'source': self.cov_source,
            'branch': self.cov_branch,
            'config_file': self.cov_config,
        }
        self.cov = coverage.Coverage(**shared_options)
        data_path = os.path.abspath(self.cov.config.data_file)
        self.combining_cov = coverage.Coverage(data_file=data_path, **shared_options)

        prepare = self.cov.load if self.cov_append else self.cov.erase
        prepare()
        self.cov.start()
        # Seed the 'source' path aliases with our own top directory; worker
        # paths are appended as workers report back.
        self.cov.config.paths['source'] = [self.topdir]

    def configure_node(self, node):
        """Workers need to know if they are collocated and what files have moved."""
        target = workerinput(node)
        target.update({
            'cov_master_host': socket.gethostname(),
            'cov_master_topdir': self.topdir,
            'cov_master_rsync_roots': [str(root) for root in node.nodemanager.roots],
        })

    def testnodedown(self, node, error):
        """Collect data file name from worker."""
        # If worker doesn't return any data then it is likely that this
        # plugin didn't get activated on the worker side.
        reply = workeroutput(node, {})
        if 'cov_worker_node_id' not in reply:
            self.failed_workers.append(node)
            return

        # If worker is not collocated then we must save the data file
        # that it returns to us.
        if 'cov_worker_data' in reply:
            suffix_fields = (
                socket.gethostname(), os.getpid(),
                random.randint(0, 999999),
                reply['cov_worker_node_id'],
            )
            data_suffix = '%s.%s.%06d.%s' % suffix_fields

            worker_cov = coverage.Coverage(source=self.cov_source,
                                           branch=self.cov_branch,
                                           data_suffix=data_suffix,
                                           config_file=self.cov_config)
            worker_cov.start()
            # The data API differs between coverage releases before and
            # from 5.0.
            if coverage.version_info >= (5, 0):
                received = CoverageData(no_disk=True)
                received.loads(reply['cov_worker_data'])
                worker_cov.get_data().update(received)
            else:
                received = CoverageData()
                received.read_fileobj(StringIO(reply['cov_worker_data']))
                worker_cov.data.update(received)
            worker_cov.stop()
            worker_cov.save()
            worker_path = reply['cov_worker_path']
            self.cov.config.paths['source'].append(worker_path)

        # Record the worker types that contribute to the data file.
        remote = node.gateway._rinfo()
        self.node_descs.add(self.get_node_desc(remote.platform, remote.version_info))

    def finish(self):
        """Combines coverage data and sets the list of coverage objects to report on."""
        # Combine all the suffix files into the data file.
        measuring = self.cov
        measuring.stop()
        measuring.save()

        combiner = self.combining_cov
        self.cov = combiner
        combiner.load()
        combiner.combine()
        combiner.save()

294 

295 

class DistWorker(CovController):
    """Implementation for distributed workers."""

    def start(self):
        """Work out whether we share host and directory with the master, then begin measuring."""
        cleanup()

        # Determine whether we are collocated with master.
        on_master_host = socket.gethostname() == workerinput(self.config)['cov_master_host']
        self.is_collocated = (on_master_host and
                              self.topdir == workerinput(self.config)['cov_master_topdir'])

        # If we are not collocated then rewrite master paths to worker paths.
        if not self.is_collocated:
            master_topdir = workerinput(self.config)['cov_master_topdir']
            worker_topdir = self.topdir

            def relocate(path):
                """Swap the master's top directory for ours inside *path*."""
                return path.replace(master_topdir, worker_topdir)

            if self.cov_source is not None:
                self.cov_source = [relocate(source) for source in self.cov_source]
            self.cov_config = relocate(self.cov_config)

        # Erase any previous data and start coverage.
        self.cov = coverage.Coverage(source=self.cov_source,
                                     branch=self.cov_branch,
                                     data_suffix=True,
                                     config_file=self.cov_config)
        self.cov.start()
        self.set_env()

    def finish(self):
        """Stop coverage and send relevant info back to the master."""
        self.unset_env()
        self.cov.stop()

        if self.is_collocated:
            # We don't combine data if we're collocated - we can get
            # race conditions in the .combine() call (it's not atomic)
            # The data is going to be combined in the master.
            self.cov.save()

            # If we are collocated then just inform the master of our
            # data file to indicate that we have finished.
            workeroutput(self.config)['cov_worker_node_id'] = self.nodeid
            return

        self.cov.combine()
        self.cov.save()

        # If we are not collocated then add the current path
        # and coverage data to the output so we can combine
        # it on the master node.

        # Send all the data to the master over the channel.
        if coverage.version_info >= (5, 0):
            payload = self.cov.get_data().dumps()
        else:
            buff = StringIO()
            self.cov.data.write_fileobj(buff)
            payload = buff.getvalue()

        outbox = workeroutput(self.config)
        outbox.update({
            'cov_worker_path': self.topdir,
            'cov_worker_node_id': self.nodeid,
            'cov_worker_data': payload,
        })

    def summary(self, stream):
        """Only the master reports so do nothing."""
        return None