# Source code for slackminion.plugin.base

from __future__ import annotations

import logging
import typing

from slackminion.slack import SlackConversation, SlackUser

if typing.TYPE_CHECKING:
    from slackminion.bot import Bot


class BasePlugin(object):
    """
    Common base class for plugins.

    Holds a reference to the bot, a per-class logger and the plugin's config,
    and exposes thin helpers that delegate to the bot (messaging, timers,
    user and channel lookups).
    """

    # Event types the plugin asks to be notified about.  When a subclass makes
    # this non-empty it must also define ``handle_event`` (checked in __init__).
    notify_event_types = []

    def __init__(self, bot: Bot, **kwargs):
        """
        * bot - the bot instance every helper on this class delegates to
        * config - optional keyword argument; stored as ``self.config``

        Raises AttributeError when ``notify_event_types`` is non-empty but the
        plugin has no ``handle_event`` attribute.
        """
        self.log = logging.getLogger(type(self).__name__)
        self._bot = bot
        # By default, we want to save a plugin's state during save_state()
        self._dont_save = False
        # State storage backends should set this to true
        self._state_handler = False
        self._timer_callbacks = {}
        self.config = {}
        if self.notify_event_types and not hasattr(self, "handle_event"):
            raise AttributeError(
                "Plugin requested events but has no handle_event method!"
            )
        if "config" in kwargs:
            self.config = kwargs["config"]

    def on_load(self):
        """
        Executes when a plugin is loaded.

        Override this if your plugin needs to do initialization when loading.
        Do not use this to restore runtime changes to variables -- they will be overwritten later on by
        PluginManager.load_state()
        """
        return True

    def on_unload(self):
        """
        Executes when a plugin is unloaded.

        Override this if your plugin needs to do cleanup when unloading.
        """
        return True

    def on_connect(self):
        """
        Executes immediately after connecting to slack.

        Will not fire on reconnects.
        """
        return True

    async def send_message(
        self, channel, text, thread=None, reply_broadcast=False, parse=None
    ):
        """
        Used to send a message to the specified channel.

        * channel - can be a channel or user
        * text - message to send
        * thread - thread to reply in
        * reply_broadcast - whether or not to also send the message to the channel
        * parse - Set to "full" for the slack api to linkify names and channels

        A string starting with "@" is sent as an IM to the name after the "@"
        (thread, reply_broadcast and parse are not forwarded on that path).
        A string starting with "#" has the "#" stripped.  Anything else,
        including non-string channel objects and the empty string, is handed
        to the bot unchanged.
        """
        self.log.debug(
            "Sending message to channel %s of type %s", channel, type(channel)
        )
        if isinstance(channel, str):
            # startswith() instead of channel[0] so an empty string falls
            # through to the bot instead of raising IndexError here.
            if channel.startswith("@"):
                await self._bot.send_im(channel[1:], text)
                return
            if channel.startswith("#"):
                channel = channel[1:]
        await self._bot.send_message(channel, text, thread, reply_broadcast, parse)

    def start_periodic_task(self, duration, func, *args, **kwargs):
        """
        Schedules a function to be called repeatedly.

        * duration - interval in seconds, passed to the task manager
        * func - function to be called
        * args / kwargs - arguments to pass to the function

        Nothing is scheduled (a warning is logged) when the bot is no longer
        runnable.
        """
        self.log.debug(
            f"Scheduling periodic task {func.__name__} every {duration}s (args: {args}, kwargs: {kwargs})"
        )
        if self._bot.runnable:
            self._bot.task_manager.start_periodic_task(duration, func, *args, **kwargs)
            self.log.info(
                f"Successfully scheduled call to {func.__name__} every {duration}"
            )
        else:
            self.log.warning(
                f"Not scheduling call to {func.__name__} because we're shutting down."
            )

    def start_timer(self, duration, func, *args, **kwargs):
        """
        Schedules a function to be called after some period of time.

        * duration - time in seconds to wait before firing
        * func - function to be called
        * args / kwargs - arguments to pass to the function

        Nothing is scheduled (a warning is logged) when the bot is no longer
        runnable.
        """
        self.log.debug(
            f"Scheduling call to {func.__name__} in {duration}s (args: {args}, kwargs: {kwargs})"
        )
        if self._bot.runnable:
            self._bot.task_manager.start_timer(duration, func, *args, **kwargs)
            self.log.info(
                f"Successfully scheduled call to {func.__name__} in {duration}"
            )
        else:
            self.log.warning(
                f"Not scheduling call to {func.__name__} because we're shutting down."
            )

    def stop_timer(self, func):
        """
        Stops a timer if it hasn't fired yet

        * func - the function passed in start_timer

        The timer is identified to the task manager by ``func.__name__``.
        """
        self.log.debug("Stopping timer %s", func.__name__)
        self._bot.task_manager.stop_timer(func.__name__)

    def run_async(self, func, *args, **kwargs):
        """
        Passes func and its arguments to the task manager's
        create_and_schedule_task() and returns whatever that call returns.
        """
        return self._bot.task_manager.create_and_schedule_task(func, *args, **kwargs)

    def _timer_callback(self, func, args):
        """
        Calls ``func(*args)``, logging instead of propagating any exception.

        * func - the function to execute
        * args - positional arguments, unpacked into the call
        """
        self.log.debug("Executing timer function %s", func.__name__)
        try:
            func(*args)
        except Exception:
            # Deliberately broad: the failure is logged with its traceback
            # rather than raised to whatever invoked the callback.
            self.log.exception(
                "Caught exception executing timer function: %s", func.__name__
            )

    async def get_user(self, user_id):
        """
        Utility function to query slack for a particular user

        :param user_id: The username of the user to lookup
        :return: SlackUser object

        When the bot has a ``user_manager`` it is consulted first, and a newly
        loaded user is stored in it; otherwise a fresh SlackUser is loaded on
        every call.
        """
        if not hasattr(self._bot, "user_manager"):
            user = SlackUser(user_id=user_id, api_client=self._bot.api_client)
            await user.load()
            return user
        user = self._bot.user_manager.get_by_username(user_id)
        if user:
            return user
        # Pass the api client here as well, matching the branch above.
        # NOTE(review): assumes load() needs the client -- confirm against SlackUser.
        user = SlackUser(user_id=user_id, api_client=self._bot.api_client)
        await user.load()
        self._bot.user_manager.set(user)
        return user

    async def get_channel(self, channel):
        """
        Utility function to query slack for a particular channel

        :param channel: The channel name or id of the channel to lookup
        :return: whatever the bot's get_channel() returns
        """
        return await self._bot.get_channel(channel)

    def get_channel_by_name(self, channel_name):
        """
        Utility function to query slack for a particular channel

        :param channel_name: The channel name of the channel to lookup
        :return: whatever the bot's get_channel_by_name() returns
        """
        return self._bot.get_channel_by_name(channel_name)

    async def at_user(self, *args, **kwargs):
        """
        Forwards all arguments to the bot's at_user() coroutine and returns
        its result.
        """
        return await self._bot.at_user(*args, **kwargs)