Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import inspect 

2from . import _tracing 

3from .hooks import HookImpl, _HookRelay, _HookCaller, normalize_hookimpl_opts 

4import warnings 

5 

6import importlib_metadata 

7 

8 

def _warn_for_function(warning, function):
    """Issue ``warning`` attributed to the place where ``function`` is defined.

    The warning category is the type of the ``warning`` instance; the
    reported filename and line number are taken from the code object of
    ``function`` rather than from the caller's stack frame.

    :param Warning warning: the warning instance to emit.
    :param function: a Python function (anything exposing ``__code__``).
    """
    code = function.__code__
    warnings.warn_explicit(
        warning,
        type(warning),
        filename=code.co_filename,
        lineno=code.co_firstlineno,
    )

16 

17 

class PluginValidationError(Exception):
    """ plugin failed validation.

    :param object plugin: the plugin which failed validation,
        may be a module or an arbitrary object.
    :param str message: description of the validation failure; it becomes
        the exception message.
    """

    def __init__(self, plugin, message):
        self.plugin = plugin
        # Name this class (not ``Exception``) so the MRO lookup starts right
        # after PluginValidationError and no cooperative __init__ is skipped.
        super(PluginValidationError, self).__init__(message)

28 

29 

class DistFacade(object):
    """Emulate a pkg_resources Distribution

    Wraps a distribution object and forwards attribute access to it, while
    adding a ``project_name`` property read from the wrapped object's
    ``metadata`` mapping.
    """

    def __init__(self, dist):
        # the wrapped distribution object; all unknown attributes go to it
        self._dist = dist

    @property
    def project_name(self):
        """Return ``metadata["name"]`` of the wrapped distribution."""
        return self.metadata["name"]

    def __getattr__(self, attr, default=None):
        """Forward lookups that failed on the facade to the wrapped object.

        Attributes missing on the wrapped distribution yield ``default``
        (``None``) instead of raising.

        :raises AttributeError: if ``_dist`` itself has not been set yet.
        """
        # __getattr__ only runs when normal lookup failed.  If the missing
        # attribute is ``_dist`` (instance created without __init__, e.g. by
        # copy/pickle machinery), reading ``self._dist`` below would re-enter
        # this method forever, so fail fast instead.
        if attr == "_dist":
            raise AttributeError(attr)
        return getattr(self._dist, attr, default)

    def __dir__(self):
        """Return the wrapped object's names plus the facade's own, sorted."""
        return sorted(dir(self._dist) + ["_dist", "project_name"])

45 

46 

class PluginManager(object):
    """ Core Pluginmanager class which manages registration
    of plugin objects and 1:N hook calling.

    You can register new hooks by calling ``add_hookspecs(module_or_class)``.
    You can register plugin objects (which contain hooks) by calling
    ``register(plugin)``.  The Pluginmanager is initialized with a
    prefix that is searched for in the names of the dict of registered
    plugin objects.

    For debugging purposes you can call ``enable_tracing()``
    which will subsequently send debug information to the trace helper.
    """

    def __init__(self, project_name, implprefix=None):
        """If ``implprefix`` is given implementation functions
        will be recognized if their name matches the implprefix. """
        self.project_name = project_name
        # name -> plugin object; a value of None marks the name as blocked
        self._name2plugin = {}
        # plugin object -> list of hook callers holding one of its impls
        self._plugin2hookcallers = {}
        # (plugin, DistFacade) pairs for plugins loaded from entry points
        self._plugin_distinfo = []
        self.trace = _tracing.TagTracer().get("pluginmanage")
        self.hook = _HookRelay(self.trace.root.get("hook"))
        if implprefix is not None:
            warnings.warn(
                "Support for the `implprefix` arg is now deprecated and will "
                "be removed in an upcoming release. Please use HookimplMarker.",
                DeprecationWarning,
                stacklevel=2,
            )
        self._implprefix = implprefix
        # hooks without a spec are never executed in "firstresult" mode
        self._inner_hookexec = lambda hook, methods, kwargs: hook.multicall(
            methods,
            kwargs,
            firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
        )

    def _hookexec(self, hook, methods, kwargs):
        """Execute ``hook`` by delegating to ``self._inner_hookexec``."""
        # called from all hookcaller instances.
        # enable_tracing will set its own wrapping function at self._inner_hookexec
        return self._inner_hookexec(hook, methods, kwargs)

    def register(self, plugin, name=None):
        """ Register a plugin and return its canonical name or None if the name
        is blocked from registering.  Raise a ValueError if the plugin is already
        registered. """
        plugin_name = name or self.get_canonical_name(plugin)

        if plugin_name in self._name2plugin or plugin in self._plugin2hookcallers:
            if self._name2plugin.get(plugin_name, -1) is None:
                return  # blocked plugin, return None to indicate no registration
            raise ValueError(
                "Plugin already registered: %s=%s\n%s"
                % (plugin_name, plugin, self._name2plugin)
            )

        # XXX if an error happens we should make sure no state has been
        # changed at point of return
        self._name2plugin[plugin_name] = plugin

        # register matching hook implementations of the plugin
        self._plugin2hookcallers[plugin] = hookcallers = []
        # a distinct loop variable keeps the ``name`` parameter intact
        for attr_name in dir(plugin):
            hookimpl_opts = self.parse_hookimpl_opts(plugin, attr_name)
            if hookimpl_opts is not None:
                normalize_hookimpl_opts(hookimpl_opts)
                method = getattr(plugin, attr_name)
                hookimpl = HookImpl(plugin, plugin_name, method, hookimpl_opts)
                hook = getattr(self.hook, attr_name, None)
                if hook is None:
                    # no spec known yet: create a spec-less hook caller
                    hook = _HookCaller(attr_name, self._hookexec)
                    setattr(self.hook, attr_name, hook)
                elif hook.has_spec():
                    self._verify_hook(hook, hookimpl)
                    hook._maybe_apply_history(hookimpl)
                hook._add_hookimpl(hookimpl)
                hookcallers.append(hook)
        return plugin_name

    def parse_hookimpl_opts(self, plugin, name):
        """Return the hookimpl options for attribute ``name`` of ``plugin``.

        Returns ``None`` when the attribute is not a routine or carries no
        ``<project_name>_impl`` dict.  Returns an empty dict when reading the
        marker attribute raises, or when the deprecated ``implprefix`` matches
        the attribute name (a DeprecationWarning is issued in that case).
        """
        method = getattr(plugin, name)
        if not inspect.isroutine(method):
            return
        try:
            res = getattr(method, self.project_name + "_impl", None)
        except Exception:
            res = {}
        if res is not None and not isinstance(res, dict):
            # false positive
            res = None
        # TODO: remove when we drop implprefix in 1.0
        elif res is None and self._implprefix and name.startswith(self._implprefix):
            _warn_for_function(
                DeprecationWarning(
                    "The `implprefix` system is deprecated please decorate "
                    "this function using an instance of HookimplMarker."
                ),
                method,
            )
            res = {}
        return res

    def unregister(self, plugin=None, name=None):
        """ unregister a plugin object and all its contained hook implementations
        from internal data structures.

        At least one of ``plugin`` and ``name`` must be given; the other one
        is looked up.  Returns the plugin object (``None`` if ``name`` does
        not map to a plugin). """
        if name is None:
            assert plugin is not None, "one of name or plugin needs to be specified"
            name = self.get_name(plugin)

        if plugin is None:
            plugin = self.get_plugin(name)

        # if self._name2plugin[name] == None registration was blocked: ignore
        if self._name2plugin.get(name):
            del self._name2plugin[name]

        for hookcaller in self._plugin2hookcallers.pop(plugin, []):
            hookcaller._remove_plugin(plugin)

        return plugin

    def set_blocked(self, name):
        """ block registrations of the given name, unregister if already registered. """
        self.unregister(name=name)
        self._name2plugin[name] = None

    def is_blocked(self, name):
        """ return True if the given plugin name is blocked. """
        return name in self._name2plugin and self._name2plugin[name] is None

    def add_hookspecs(self, module_or_class):
        """ add new hook specifications defined in the given module_or_class.
        Functions are recognized if they have been decorated accordingly.

        Raises ValueError if no hook specification is found. """
        names = []
        for name in dir(module_or_class):
            spec_opts = self.parse_hookspec_opts(module_or_class, name)
            if spec_opts is not None:
                hc = getattr(self.hook, name, None)
                if hc is None:
                    hc = _HookCaller(name, self._hookexec, module_or_class, spec_opts)
                    setattr(self.hook, name, hc)
                else:
                    # plugins registered this hook without knowing the spec
                    hc.set_specification(module_or_class, spec_opts)
                    for hookfunction in hc.get_hookimpls():
                        self._verify_hook(hc, hookfunction)
                names.append(name)

        if not names:
            raise ValueError(
                "did not find any %r hooks in %r" % (self.project_name, module_or_class)
            )

    def parse_hookspec_opts(self, module_or_class, name):
        """Return the ``<project_name>_spec`` attribute of the named member,
        or ``None`` if the member carries no such attribute."""
        method = getattr(module_or_class, name)
        return getattr(method, self.project_name + "_spec", None)

    def get_plugins(self):
        """ return the set of registered plugins. """
        return set(self._plugin2hookcallers)

    def is_registered(self, plugin):
        """ Return True if the plugin is already registered. """
        return plugin in self._plugin2hookcallers

    def get_canonical_name(self, plugin):
        """ Return canonical name for a plugin object. Note that a plugin
        may be registered under a different name which was specified
        by the caller of register(plugin, name). To obtain the name
        of an registered plugin use ``get_name(plugin)`` instead."""
        return getattr(plugin, "__name__", None) or str(id(plugin))

    def get_plugin(self, name):
        """ Return a plugin or None for the given name. """
        return self._name2plugin.get(name)

    def has_plugin(self, name):
        """ Return True if a plugin with the given name is registered. """
        return self.get_plugin(name) is not None

    def get_name(self, plugin):
        """ Return name for registered plugin or None if not registered. """
        for name, val in self._name2plugin.items():
            if plugin == val:
                return name

    def _verify_hook(self, hook, hookimpl):
        """Validate ``hookimpl`` against the specification of ``hook``.

        Raises PluginValidationError if a hookwrapper is attached to a
        historic hook or if the implementation declares arguments that the
        spec does not have.  Issues the spec's ``warn_on_impl`` warning, if
        any.  Callers must only pass hooks that have a spec.
        """
        if hook.is_historic() and hookimpl.hookwrapper:
            raise PluginValidationError(
                hookimpl.plugin,
                "Plugin %r\nhook %r\nhistoric incompatible to hookwrapper"
                % (hookimpl.plugin_name, hook.name),
            )
        if hook.spec.warn_on_impl:
            _warn_for_function(hook.spec.warn_on_impl, hookimpl.function)
        # positional arg checking
        notinspec = set(hookimpl.argnames) - set(hook.spec.argnames)
        if notinspec:
            raise PluginValidationError(
                hookimpl.plugin,
                "Plugin %r for hook %r\nhookimpl definition: %s\n"
                "Argument(s) %s are declared in the hookimpl but "
                "can not be found in the hookspec"
                % (
                    hookimpl.plugin_name,
                    hook.name,
                    _formatdef(hookimpl.function),
                    notinspec,
                ),
            )

    def check_pending(self):
        """ Verify that all hooks which have not been verified against
        a hook specification are optional, otherwise raise PluginValidationError"""
        for name in self.hook.__dict__:
            if name[0] != "_":
                hook = getattr(self.hook, name)
                if not hook.has_spec():
                    for hookimpl in hook.get_hookimpls():
                        if not hookimpl.optionalhook:
                            raise PluginValidationError(
                                hookimpl.plugin,
                                "unknown hook %r in plugin %r"
                                % (name, hookimpl.plugin),
                            )

    def load_setuptools_entrypoints(self, group, name=None):
        """ Load modules from querying the specified setuptools ``group``.

        :param str group: entry point group to load plugins
        :param str name: if given, loads only plugins with the given ``name``.
        :rtype: int
        :return: return the number of loaded plugins by this call.
        """
        count = 0
        for dist in importlib_metadata.distributions():
            for ep in dist.entry_points:
                if (
                    ep.group != group
                    or (name is not None and ep.name != name)
                    # already registered
                    or self.get_plugin(ep.name)
                    or self.is_blocked(ep.name)
                ):
                    continue
                plugin = ep.load()
                self.register(plugin, name=ep.name)
                self._plugin_distinfo.append((plugin, DistFacade(dist)))
                count += 1
        return count

    def list_plugin_distinfo(self):
        """ return list of distinfo/plugin tuples for all setuptools registered
        plugins. """
        return list(self._plugin_distinfo)

    def list_name_plugin(self):
        """ return list of name/plugin pairs. """
        return list(self._name2plugin.items())

    def get_hookcallers(self, plugin):
        """ get all hook callers for the specified plugin. """
        return self._plugin2hookcallers.get(plugin)

    def add_hookcall_monitoring(self, before, after):
        """ add before/after tracing functions for all hooks
        and return an undo function which, when called,
        will remove the added tracers.

        ``before(hook_name, hook_impls, kwargs)`` will be called ahead
        of all hook calls and receive a hookcaller instance, a list
        of HookImpl instances and the keyword arguments for the hook call.

        ``after(outcome, hook_name, hook_impls, kwargs)`` receives the
        same arguments as ``before`` but also a :py:class:`_Result` object
        which represents the result of the overall hook call.
        """
        return _tracing._TracedHookExecution(self, before, after).undo

    def enable_tracing(self):
        """ enable tracing of hook calls and return an undo function. """
        hooktrace = self.hook._trace

        def before(hook_name, methods, kwargs):
            hooktrace.root.indent += 1
            hooktrace(hook_name, kwargs)

        def after(outcome, hook_name, methods, kwargs):
            # results are only traced for calls that did not raise
            if outcome.excinfo is None:
                hooktrace("finish", hook_name, "-->", outcome.get_result())
            hooktrace.root.indent -= 1

        return self.add_hookcall_monitoring(before, after)

    def subset_hook_caller(self, name, remove_plugins):
        """ Return a new _HookCaller instance for the named method
        which manages calls to all registered plugins except the
        ones from remove_plugins.

        If none of ``remove_plugins`` has an attribute called ``name`` the
        original hook caller is returned unchanged. """
        orig = getattr(self.hook, name)
        plugins_to_remove = [plug for plug in remove_plugins if hasattr(plug, name)]
        if plugins_to_remove:
            if orig.has_spec():
                hc = _HookCaller(
                    orig.name, orig._hookexec, orig.spec.namespace, orig.spec.opts
                )
            else:
                # hooks registered before their spec have no spec object to
                # copy from; build the subset caller without one as well
                hc = _HookCaller(orig.name, orig._hookexec)
            for hookimpl in orig.get_hookimpls():
                plugin = hookimpl.plugin
                if plugin not in plugins_to_remove:
                    hc._add_hookimpl(hookimpl)
                    # we also keep track of this hook caller so it
                    # gets properly removed on plugin unregistration
                    self._plugin2hookcallers.setdefault(plugin, []).append(hc)
            return hc
        return orig

360 

361 

if hasattr(inspect, "signature"):

    def _formatdef(func):
        """Return ``func`` rendered as ``name(signature)``."""
        signature = inspect.signature(func)
        return "%s%s" % (func.__name__, signature)


else:

    def _formatdef(func):
        """Return ``func`` rendered as ``name(argspec)``.

        Fallback for interpreters whose ``inspect`` has no ``signature``.
        """
        argspec = inspect.getargspec(func)
        rendered_args = inspect.formatargspec(*argspec)
        return "%s%s" % (func.__name__, rendered_args)

374 )