Source code for slackminion.plugin.manager

from __future__ import annotations

import inspect
import json
import logging
import typing
from datetime import datetime

if typing.TYPE_CHECKING:
    from slackminion.bot import Bot


class PluginManager(object):
    """Load plugins for a bot, fan events out to them and persist their state.

    The manager keeps the list of instantiated plugins, registers each one
    with the bot's dispatcher, and remembers the plugin flagged as the state
    handler (``_state_handler``) so plugin attributes can be saved and
    restored as JSON.
    """

    # Attributes that are never persisted by save_state(); a plugin can add
    # more names through its own ``attr_denylist`` attribute.
    _BASE_ATTR_DENYLIST = frozenset(
        {
            "_bot",
            "_commit",
            "_dont_save",
            "_state_handler",
            "_timer_callbacks",
            "_version",
            "attr_denylist",
            "config",
            "log",
        }
    )

    def __init__(self, bot: Bot, test_mode=False):
        """Bind the manager to *bot*.

        :param bot: object providing ``config`` and ``dispatcher`` attributes.
        :param test_mode: when true, ``self.metrics`` is created and load()
            records per-plugin counters and load times in it.
        """
        self.bot = bot
        self.config = bot.config
        self.dispatcher = bot.dispatcher
        self.log = logging.getLogger(type(self).__name__)
        self.plugins = []
        self.state_handler = None
        self.test_mode = test_mode
        # NOTE: self.metrics only exists in test mode.
        if self.test_mode:
            self.metrics = {
                "plugins_total": 0,
                "plugins_loaded": 0,
                "load_times": {},
                "plugins_failed": [],
            }

    def load(self):
        """Import, instantiate and register every configured plugin.

        Plugin names are dotted paths of the form
        ``module_path.plugin_class_name``.  A plugin that cannot be resolved
        (module not importable, class missing from the module, or a name
        without a module part) or that fails during construction or
        registration is logged and skipped so the remaining plugins still
        load.
        """
        import os
        import sys

        # Add plugin dir for extra plugins
        sys.path.append(os.path.join(os.getcwd(), self.config["plugin_dir"]))
        if "plugins" not in self.config:
            self.config["plugins"] = []

        # Add core plugins
        self.config["plugins"].insert(0, "slackminion.plugins.core.core.Core")

        # Per-plugin settings are optional, just like the plugin list itself.
        plugin_settings = {}
        if "plugin_settings" in self.config:
            plugin_settings = self.config["plugin_settings"] or {}

        for plugin_name in self.config["plugins"]:
            if self.test_mode:
                plugin_start_time = datetime.now()
                self.metrics["plugins_total"] += 1

            # module_path.plugin_class_name
            # rpartition always binds ``name``, even when there is no dot.
            module, _, name = plugin_name.rpartition(".")
            try:
                if not module:
                    raise ValueError(
                        "Plugin name %r has no module path" % (plugin_name,)
                    )
                m = __import__(module, fromlist=[""])
                plugin = getattr(m, name)
                version = getattr(m, "version", "latest")
                commit = getattr(m, "commit", "HEAD")
            except (ImportError, AttributeError, ValueError):
                self.log.exception("Failed to load plugin %s", name)
                if self.test_mode:
                    self.metrics["plugins_failed"].append(name)
                continue

            # load plugin config if available
            config = {}
            if name in plugin_settings:
                config = plugin_settings[name]

            try:
                p = plugin(self.bot, config=config)
                p._version = version
                p._commit = commit
                self.dispatcher.register_plugin(p)
                self.plugins.append(p)
                if p._state_handler:
                    self.state_handler = p
                if self.test_mode:
                    self.metrics["plugins_loaded"] += 1
                    self.metrics["load_times"][name] = (
                        datetime.now() - plugin_start_time
                    ).total_seconds() * 1000.0
            except Exception:  # noqa
                self.log.exception("Failed to register plugin %s", name)
                if self.test_mode:
                    self.metrics["plugins_failed"].append(name)

    async def broadcast_event(self, event_type, data):
        """Send a slack event to every plugin subscribed to *event_type*.

        A plugin subscribes by listing the event type in its
        ``notify_event_types`` class attribute and MUST implement a
        ``handle_event(event_type, data)`` method, which may be either a
        coroutine function or a plain function.
        """
        for plugin in self.plugins:
            if event_type not in plugin.notify_event_types:
                continue
            # Lazy %-formatting: the message is only rendered when debug
            # logging is actually enabled.
            self.log.debug(
                "Sending event of type %s to plugin %s. Data: %s",
                event_type,
                plugin.__class__.__name__,
                data,
            )
            try:
                if inspect.iscoroutinefunction(plugin.handle_event):
                    await plugin.handle_event(event_type, data)
                else:
                    plugin.handle_event(event_type, data)
            # The plugin is expected to handle its own exceptions.
            except Exception:  # noqa
                self.log.exception("Unhandled exception!")

    def connect(self):
        """Call ``on_connect()`` on every plugin, logging any failure."""
        for plugin in self.plugins:
            try:
                plugin.on_connect()
            except Exception:  # noqa
                self.log.exception("Unhandled exception")

    def save_state(self, *args, **kwargs):
        """Serialize plugin attributes to JSON and pass them to the state handler.

        Plugins whose ``_dont_save`` is not exactly ``False`` are skipped.
        Extra positional and keyword arguments are forwarded to the
        handler's ``save_state``.  Serialization and handler errors are
        logged rather than raised.
        """
        if self.state_handler is None:
            self.log.warning("Unable to save state, no handler registered")
            return

        state = {}
        for p in self.plugins:
            if p._dont_save is not False:
                continue
            denied = self._BASE_ATTR_DENYLIST.union(getattr(p, "attr_denylist", []))
            attrs = {k: v for k, v in list(p.__dict__.items()) if k not in denied}
            state[type(p).__name__] = attrs
            self.log.debug("Plugin %s: %s", type(p).__name__, attrs)

        try:
            state = json.dumps(state)
        except (TypeError, ValueError):
            # A plugin attribute json cannot encode must not escape as an
            # exception; handler failures below are logged the same way.
            self.log.exception("Failed to serialize plugin state")
            return

        self.log.debug("Sending the following to the handler: %s", state)
        try:
            self.state_handler.save_state(state, *args, **kwargs)
        except Exception:  # noqa
            self.log.exception("Handler failed to save state")

    def load_state(self, *args, **kwargs):
        """Fetch saved state from the state handler and restore plugin attributes.

        Extra positional and keyword arguments are forwarded to the
        handler's ``load_state``.  For every loaded plugin whose class name
        is a key of the decoded JSON object, each stored key/value pair is
        set as an attribute on the plugin.
        """
        if self.state_handler is None:
            self.log.warning("Unable to load state, no handler registered")
            return

        try:
            state_str = self.state_handler.load_state(*args, **kwargs)
        except IOError:
            self.log.warning("No state information found")
            return

        try:
            state = json.loads(state_str)
        except Exception:  # noqa
            self.log.exception("Handler failed to load state")
            return

        # Valid JSON is not necessarily an object (e.g. ``null`` or a list).
        if not isinstance(state, dict):
            self.log.warning(
                "Ignoring state data: expected a JSON object, got %s",
                type(state).__name__,
            )
            return

        for p in self.plugins:
            plugin_name = type(p).__name__
            if plugin_name in state:
                self.log.info("Loading state data for %s", plugin_name)
                for k, v in list(state[plugin_name].items()):
                    self.log.debug("%s.%s = %s", plugin_name, k, v)
                    setattr(p, k, v)

    def unload_all(self):
        """Call ``on_unload()`` on every plugin.

        Each plugin is isolated like in connect(): a failure is logged and
        the remaining plugins are still unloaded.
        """
        for plugin in self.plugins:
            try:
                plugin.on_unload()
            except Exception:  # noqa
                self.log.exception("Unhandled exception")