import inspect
import logging
import unicodedata
from flask import current_app, request
from six import string_types
from slackminion.exceptions import DuplicateCommandError
from slackminion.utils.util import format_docstring, strip_formatting
[docs]class BaseCommand(object):
def __init__(self, method):
self.method = method
self.help = method.__doc__
@property
def short_help(self):
if self.help:
if "." in self.help:
return self.help[0 : self.help.find(".") + 1]
return "No description provided."
@property
def formatted_help(self):
if self.help:
return format_docstring(self.help)
return "No description provided."
[docs] def execute(self, *args, **kwargs):
return self.method(*args, **kwargs)
[docs]class PluginCommand(BaseCommand):
def __init__(self, method):
super(PluginCommand, self).__init__(method)
self.acl = method.acl
self.admin_only = method.admin_only
self.is_subcmd = method.is_subcmd
self.while_ignored = method.while_ignored
self.cmd_options = method.cmd_options
self.is_async = inspect.iscoroutinefunction(method)
[docs]class WebhookCommand(BaseCommand):
def __init__(self, method, form_params):
super(WebhookCommand, self).__init__(method)
self.form_params = form_params
[docs] def execute(self):
args = {}
form_params = self.form_params
if isinstance(self.form_params, str):
form_params = [self.form_params]
if form_params is not None:
for p in form_params:
if p in request.form:
args[p] = request.form[p]
return self.method(**args)
[docs]class MessageDispatcher(object):
def __init__(self):
self.log = logging.getLogger(type(self).__name__)
self.commands = {}
self.ignored_channels = []
self.ignored_events = ["message_replied", "message_changed"]
[docs] async def push(self, event, dev_mode=False):
"""
Takes a SlackEvent, parses it for a command, and runs against registered plugin
"""
self.log.debug(event)
if self._ignore_event(event):
return None, None, None
args = self._parse_message(event)
# commands will always start with !
if len(args) == 0 or not args[0].startswith("!"):
return None, None, None
self.log.debug("Searching for command using chunks: %s", args)
cmd, msg_args = self._find_longest_prefix_command(args)
if cmd is not None:
if event.user is None:
self.log.debug("Discarded message with no originating user: %s", event)
return None, None, None
if event.channel is not None:
sender = "#%s/%s" % (event.channel.name, event.user.formatted_name)
else:
sender = event.user.slack_user.formatted_name
self.log.info(f"Received from {sender}: {cmd}, args {msg_args}")
f = self._get_command(cmd, event.user)
if f:
if self._is_channel_ignored(f, event.channel):
self.log.info(
"Channel %s is ignored, discarding command %s",
event.channel,
cmd,
)
return "_ignored_", "", None
# Strip formatting if requested by plugin
if f.cmd_options.get("strip_formatting"):
input_string = " ".join(msg_args)
self.log.debug("Calling strip_format with %s", input_string)
input_string = strip_formatting(input_string)
self.log.debug("Format Stripped message is %s", input_string)
msg_args = input_string.split(" ")
try:
if f.is_async:
if not dev_mode:
output = await f.execute(event, msg_args)
else:
output = f"DEV_MODE: Would have run async function {f} with args {msg_args}"
return cmd, output, f.cmd_options
else:
if not dev_mode:
output = f.execute(event, msg_args)
else:
output = f"DEV_MODE: Would have run function {cmd} with args {msg_args}"
return cmd, output, f.cmd_options
except Exception as e: # noqa we don't want plugins to crash the bot so
self.log.exception("Plugin raised exception")
output = f"Command failed due to an exception: {str(e)}"
return cmd, output, f.cmd_options
return (
"_unauthorized_",
"Sorry, you are not authorized to run %s" % cmd,
None,
)
return None, None, None
def _ignore_event(self, message):
"""
message_replied event is not truly a message event and does not have a message.text
don't process such events
commands may not be idempotent, so ignore message_changed events.
"""
if hasattr(message, "subtype") and message.subtype in self.ignored_events:
return True
return False
def _parse_message(self, message):
if message:
try:
args = unicodedata.normalize("NFKD", message.text).split()
return args
except AttributeError:
pass
return []
[docs] def register_plugin(self, plugin):
"""Registers a plugin and commands with the dispatcher for push()"""
self.log.info("Registering plugin %s", type(plugin).__name__)
self._register_commands(plugin)
plugin.on_load()
def _register_commands(self, plugin):
possible_commands = [x for x in dir(plugin) if not x.startswith("_")]
for name in possible_commands:
method = getattr(plugin, name)
if callable(method) and hasattr(method, "is_cmd"):
commands = [method.cmd_name]
if method.aliases is not None:
aliases = method.aliases
if isinstance(method.aliases, str):
aliases = [method.aliases]
for alias in aliases:
commands.append(alias)
for cmd_name in commands:
cmd = "!" + cmd_name
if cmd in self.commands:
raise DuplicateCommandError(cmd_name)
self.log.info(
"Registered command %s", type(plugin).__name__ + "." + cmd_name
)
self.commands[cmd] = PluginCommand(method)
elif callable(method) and hasattr(method, "is_webhook"):
self.log.info(
"Registered webhook %s", type(plugin).__name__ + "." + name
)
webhook = WebhookCommand(method, method.form_params)
with plugin._bot.webserver.app.app_context():
current_app.add_url_rule(
method.route,
method.__name__,
webhook.execute,
methods=[method.method],
)
[docs] def ignore(self, channel):
if channel.is_channel:
if channel.name not in self.ignored_channels:
self.ignored_channels.append(channel.name)
return True
return False
[docs] def unignore(self, channel):
if channel.is_channel:
if channel.name in self.ignored_channels:
self.ignored_channels.remove(channel.name)
return True
return False
def _find_longest_prefix_command(self, args):
num_parts = len(args)
while num_parts > 0:
cmd = " ".join(args[0:num_parts])
if cmd in self.commands:
return cmd, args[num_parts:]
num_parts -= 1
return None, None
def _get_command(self, cmd, user):
can_run_cmd = True
if hasattr(self, "auth_manager"):
can_run_cmd = self.auth_manager.admin_check(self.commands[cmd], user)
if can_run_cmd:
can_run_cmd = self.auth_manager.acl_check(self.commands[cmd], user)
if not can_run_cmd:
self.log.info("User %s is not authorized to run %s", user.username, cmd)
return None
return self.commands[cmd]
def _is_channel_ignored(self, cmd, channel):
channel_ignored = False
if channel.name in self.ignored_channels:
channel_ignored = not cmd.while_ignored
return channel_ignored