Source code for slackminion.bot

import asyncio
import datetime
import logging

from slack_sdk.web.async_client import AsyncWebClient

from slackminion.dispatcher import MessageDispatcher
from slackminion.exceptions import NotSetupError
from slackminion.plugin import PluginManager
from slackminion.plugins.core import version as my_version
from slackminion.slack import SlackConversation, SlackEvent, SlackUser
from slackminion.slack.rtm_client import MyRTMClient
from slackminion.utils.async_task import AsyncTaskManager
from slackminion.webserver import Webserver

ignore_subtypes = [
    "bot_message",
    "message_changed",
    "message_replied",
    "message_deleted",
]


[docs]class Bot(object): rtm_client = None api_client = None webserver = None _info = {} _channels = {} runnable = True shutting_down = False always_send_dm = [] is_setup = False bot_start_time = None timers = [] def __init__(self, config, test_mode=False, dev_mode=False): self.config = config self.dispatcher = MessageDispatcher() self.log = logging.getLogger(type(self).__name__) self.plugin_manager = PluginManager(self, test_mode) self.plugins = self.plugin_manager # backward compatibility self.test_mode = test_mode self.dev_mode = dev_mode self.event_loop = asyncio.get_event_loop() if self.test_mode: self.metrics = {"startup_time": 0} self.version = my_version try: from slackminion.plugins.core import commit self.commit = commit except ImportError: self.commit = "HEAD" @property def sc(self): return self.api_client @property def my_username(self): return self._info.get("name") @property def my_userid(self): return self._info.get("user_id") @property def channels(self): if self.is_setup: if self._channels: return self._channels else: self.log.warning( "Bot.channels was called but self._bot_channels was empty!" ) return {} self.log.warning("Bot.channels was called before bot was setup.") return {}
[docs] async def get_my_conversations(self, *args, **kwargs): return await self.api_client.users_conversations( exclude_archived="true", limit=200, types="public_channel,private_channel", *args, **kwargs, )
[docs] async def update_channels(self): self.log.debug("Starting update_channels") try: resp = await self.get_my_conversations() results = resp.get("channels") while resp.get("response_metadata").get("next_cursor"): cursor = resp.get("response_metadata").get("next_cursor") resp = await self.get_my_conversations(cursor=cursor) results.extend(resp.get("channels")) await asyncio.sleep(1) for channel in results: self._channels.update( { channel.get("id"): SlackConversation( conversation=channel, api_client=self.api_client ) } ) except Exception: # noqa self.log.exception("update_channels failed due to exception") self.log.debug(f"Loaded {len(self.channels)} channels.")
[docs] def start(self): """Initializes the bot, plugins, and everything.""" self.log.info(f"Starting SlackMinion version {self.version}") self.task_manager = AsyncTaskManager(self) self.bot_start_time = datetime.datetime.now() self.log.debug("Slack clients initialized.") self.webserver = Webserver( self.config["webserver"]["host"], self.config["webserver"]["port"] ) self.plugin_manager.load() self.plugin_manager.load_state() self.rtm_client = MyRTMClient( token=self.config.get("slack_token"), run_async=True ) self.api_client = AsyncWebClient(token=self.config.get("slack_token")) self.always_send_dm = ["_unauthorized_"] if "always_send_dm" in self.config: self.always_send_dm.extend(["!" + x for x in self.config["always_send_dm"]]) self._add_event_handlers() self.is_setup = True if self.test_mode: self.metrics["startup_time"] = ( datetime.datetime.now() - self.bot_start_time ).total_seconds() * 1000.0
[docs] def graceful_shutdown(self): if not self.shutting_down: self.shutting_down = True self.log.debug("Starting graceful shutdown.") self.task_manager.runnable = False self.runnable = False
[docs] async def run(self): """ Connects to slack and enters the main loop. """ # Fail out if setup wasn't run if not self.is_setup: raise NotSetupError # Start the web server self.log.debug("Starting Web Server") self.webserver.start() first_connect = True self._info = await self.api_client.auth_test() while self.runnable: if first_connect: self.log.debug("Starting RTM Client") self.task_manager.start_rtm_client(self.rtm_client) self.plugin_manager.connect() self.task_manager.start_periodic_task(600, self.update_channels) first_connect = False await self.task_manager.start() await asyncio.sleep(1)
[docs] async def stop(self): """Does cleanup of bot and plugins.""" if not self.test_mode: self.plugin_manager.save_state() self.log.debug("Stopping Task Manager") await self.task_manager.shutdown() self.log.debug("Stopping RTM client.") # cleanup any running timer threads so bot doesn't hang on shutdown for t in self.timers: t.cancel() if self.webserver is not None: self.webserver.stop() self.plugin_manager.unload_all()
[docs] async def send_message( self, channel, text, thread=None, reply_broadcast=None, attachments=None, parse=None, link_names=1, ): """ Sends a message to the specified channel * channel - The channel to send to. This can be a SlackConversation object, a channel id, or a channel name (without the #) * text - String to send * thread - reply to the thread. See https://api.slack.com/docs/message-threading#threads_party * reply_broadcast - Set to true to indicate your reply is germane to all members of a channel * parse - Set to "full" for the slack api to linkify names and channels """ if not text: self.log.debug("send_message was called without text to send") return # This doesn't want the # in the channel name if isinstance(channel, SlackConversation): channel = channel.channel_id self.log.debug(f"Trying to send to {channel}: {text[:40]} (truncated)") await self.api_client.chat_postMessage( as_user=True, channel=channel, text=text, thread_ts=thread, reply_broadcast=reply_broadcast, attachments=attachments, parse=parse, link_names=link_names, )
[docs] async def send_im(self, user, text, parse=None): """ Sends a message to a user as an IM * user - The user to send to. This can be a SlackUser object, a user id, or the username (without the @) * text - String to send """ if isinstance(user, SlackUser): channelid = user.user_id else: channelid = user await self.send_message(channelid, text, parse)
[docs] async def at_user(self, user, channel_id, text, **kwargs): """ Appends @user Slack formatting to the beginning of a message. * user - The SlackUser to send to. * channel_id - The channel ID of the channel to send to * text - String to send * kwargs - add'l keyword arguments to pass to send_message() """ message = f"{user.at_user}: {text}" await self.send_message(channel_id, message, **kwargs)
def _load_user_rights(self, user): self.log.debug(f"Loading user rights for {user}") if user is not None: if "bot_admins" in self.config: if user.username in self.config["bot_admins"]: user.set_admin(True) # Parse incoming event and return a corresponding SlackEvent object async def _parse_event(self, payload): self.log.debug(payload) event_type, data = self._unpack_payload(**payload) subtype = data.get("subtype") # ignore message subtypes we aren't interested in if subtype and subtype in ignore_subtypes: self.log.info(f"Ignoring message subtype {subtype} from {data.get('user')}") self.log.debug(data.get("text")) return event = SlackEvent(event_type=event_type, **payload) self.log.debug("Received event type: %s", event.event_type) if event.user_id and event.user_id != self.my_userid: if hasattr(self, "user_manager"): event.user = self.user_manager.get(event.user_id) if event.user is None: slack_user = SlackUser( user_id=event.user_id, api_client=self.api_client ) await slack_user.load() event.user = self.user_manager.set(slack_user) if event.channel_id: event.channel = await self.get_channel(event.channel_id) return event def _unpack_payload(self, **payload): data = payload["data"] event_type = data["type"] return event_type, data def _add_event_handlers(self): MyRTMClient.on(event="channel_joined", callback=self._event_channel_joined) MyRTMClient.on(event="message", callback=self._event_message) MyRTMClient.on(event="error", callback=self._event_error) for plugin in self.plugin_manager.plugins: if plugin.notify_event_types: t = type(plugin.notify_event_types) try: assert t == list except AssertionError: self.log.exception( f"notify_event_types for {plugin.__class__.__name__} type should be list, got {t}" ) for event_type in plugin.notify_event_types: self.log.info( f"Registering handler for {event_type} for plugin {plugin.__class__.__name__}" ) try: MyRTMClient.on(event=event_type, callback=self._event_plugin) except Exception as e: self.log.exception( f"Unexpected exception when attempting to register event handler for " f'type {event_type} for plugin {plugin.__class__.__name__}" [{e}] ' ) # generic handler for handling event types registered by plugins via notify_event_types class attribute async def _event_plugin(self, **payload): event_type, data = self._unpack_payload(**payload) await self.plugin_manager.broadcast_event(event_type, data) # when the bot is invited to a channel, add the channel to self.channels async def _event_channel_joined(self, **payload): try: event_type, data = self._unpack_payload(**payload) self.log.debug(f"Received channel_joined event: {data}") channel_info = data.get("channel") channel = SlackConversation( conversation=channel_info, api_client=self.api_client ) self._channels.update({channel.id: channel}) except Exception: # noqa self.log.exception("Uncaught exception") async def _event_message(self, **payload): msg = await self._parse_event(payload) if not msg: return # The user manager should load rights when a user is added if not hasattr(self, "user_manager"): self._load_user_rights(msg.user) try: self.log.debug(f"Sending to dispatcher: {msg}") cmd, output, cmd_options = await self.dispatcher.push(msg, self.dev_mode) self.log.debug(f"Output from dispatcher: {output}") if output: await self._prepare_and_send_output(cmd, msg, cmd_options, output) except Exception: self.log.exception("Unhandled exception") async def _prepare_and_send_output(self, cmd, msg, cmd_options, output): self.log.debug( f"Preparing to send output for {cmd} with options {cmd_options}" ) if msg.thread_ts: thread_ts = msg.thread_ts elif cmd_options.get("reply_in_thread"): thread_ts = msg.ts else: thread_ts = None parse = cmd_options.get("parse", None) if cmd in self.always_send_dm or cmd_options.get("always_send_dm"): await self.send_im(msg.user, output, parse=parse) else: await self.send_message( msg.channel, output, thread=thread_ts, reply_broadcast=cmd_options.get("reply_broadcast"), parse=parse, ) async def _event_error(self, **payload): event_type, data = self._unpack_payload(**payload) await self.plugin_manager.broadcast_event(event_type, data) self.log.error(f"Received an error response from Slack: {payload}")
[docs] def get_channel_by_name(self, channel_name): channels = [x for x in self.channels.values() if channel_name in x.all_names] if len(channels) == 0: raise RuntimeError(f"Unable to find channel {channel_name}") if len(channels) > 1: self.log.warning(f"Found more than one channel named {channel_name}") return channels[0]
[docs] async def get_channel(self, channel_id): if channel_id in self.channels.keys(): channel = self.channels.get(channel_id) else: channel = SlackConversation(None, self.api_client) await channel.load(channel_id) self._channels.update({channel_id: channel}) if channel: return channel