Source code for slackminion.plugins.core.acl

from functools import wraps

from slackminion.plugin import cmd
from slackminion.plugin.base import BasePlugin

try:
    from . import commit
except ImportError:
    commit = "HEAD"


def user_mgt_command(f):
    """Adapt an ``(self, name, user)`` handler to the ``(self, msg, args)`` command form.

    The returned wrapper ignores ``msg``. With fewer than two entries in
    ``args`` it returns a usage string built from the wrapped function's name
    (underscores replaced by spaces). Otherwise the first two entries are
    passed on as the ACL name and the username; any further entries are
    ignored, in the same way ``acl_mgt_command`` ignores extras.
    """

    @wraps(f)
    def wrapper(self, msg, args):
        if len(args) < 2:
            return "Usage: !%s *acl_name* *username*" % f.__name__.replace("_", " ")
        # Slice rather than unpack ``args`` directly: unpacking raises
        # ValueError as soon as a third argument is supplied.
        name, user = args[:2]
        return f(self, name, user)

    return wrapper
def acl_mgt_command(f):
    """Adapt an ``(self, name)`` handler to the ``(self, msg, args)`` command form.

    The resulting command ignores ``msg`` and forwards only the first entry
    of ``args`` as the ACL name. When ``args`` is empty it returns a usage
    string derived from the wrapped function's name (underscores replaced by
    spaces) instead of calling the handler.
    """

    @wraps(f)
    def command(self, msg, args):
        if args:
            return f(self, args[0])
        command_name = f.__name__.replace("_", " ")
        return "Usage: !%s *acl_name*" % command_name

    return command
class AuthManager(BasePlugin):
    """Basic Authorization Plugin"""

    def on_load(self):
        """Register with the dispatcher and create the default ``*`` ACL."""
        # Hook into the dispatcher to receive acl checks
        self._bot.dispatcher.auth_manager = self

        # Setup default ACL
        # ACL Rule Ordering (first match)
        # Allow Rule --> Deny Rule --> Allow All
        self._acl = {
            "*": {"allow": [], "deny": []},
        }
        return super(AuthManager, self).on_load()

    @cmd(admin_only=True)
    def acl(self, msg, args):
        """ACL Management.

        Usage:
        !acl _action_ [args]

        Actions:
        new _acl_ - Create a new ACL
        delete _acl_ - Delete an ACL
        allow _acl_ _user_ - Add user to the acl allow block
        deny _acl_ _user_ - Add user to the acl deny block
        remove _acl_ _user_ - Remove user from acl allow and deny blocks
        show - Show all defined ACLs
        show _acl_ - Show allow and deny blocks of specified ACL
        """
        if not args:
            return "Usage: !acl show OR !acl _action_ _args_"

        valid_actions = ["allow", "deny", "remove", "show", "new", "delete"]
        return "Valid actions: %s" % ", ".join(valid_actions)

    # Command: create an ACL and report whether it already existed.
    @cmd(admin_only=True)
    @acl_mgt_command
    def acl_new(self, name):
        if self.create_acl(name):
            return "Created new acl '%s'" % name
        return "ACL '%s' already exists" % name

    # Command: delete an ACL and report whether it was found.
    @cmd(admin_only=True)
    @acl_mgt_command
    def acl_delete(self, name):
        if self.delete_acl(name):
            return "Deleted acl '%s'" % name
        return "ACL '%s' does not exist" % name

    # Command: put a user in an ACL's allow block.
    @cmd(admin_only=True)
    @user_mgt_command
    def acl_allow(self, name, user):
        if self.add_user_to_allow(name, user):
            return "Added %s to %s (allow)" % (user, name)
        return "Failed to add %s to %s (allow)" % (user, name)

    # Command: put a user in an ACL's deny block.
    @cmd(admin_only=True)
    @user_mgt_command
    def acl_deny(self, name, user):
        if self.add_user_to_deny(name, user):
            return "Added %s to %s (deny)" % (user, name)
        return "Failed to add %s to %s (deny)" % (user, name)

    # Command: drop a user from both blocks of an ACL.
    @cmd(admin_only=True)
    @user_mgt_command
    def acl_remove(self, name, user):
        if self.remove_user_from_acl(name, user):
            return "Removed %s from %s (allow and deny)" % (user, name)
        return "Failed to remove %s from %s (allow and deny)" % (user, name)

    @cmd(admin_only=True)
    def acl_show(self, msg, args):
        """Show current allow and deny blocks for the given acl."""
        name = args[0] if args else None
        if name is None:
            # No ACL requested: list the names of every defined ACL.
            return "%s: The following ACLs are defined: %s" % (
                msg.user,
                ", ".join(self._acl),
            )

        if name not in self._acl:
            return "Sorry, couldn't find an acl named '%s'" % name

        rules = self._acl[name]
        return "\n".join(
            [
                "%s: ACL '%s' is defined as follows:" % (msg.user, name),
                "allow: %s" % ", ".join(rules["allow"]),
                "deny: %s" % ", ".join(rules["deny"]),
            ]
        )

    def add_user_to_allow(self, name, user):
        """Add a user to the given acl allow block.

        Returns False when the ACL does not exist, True otherwise.
        """
        # Clear user from both allow and deny before adding. The removal
        # returns False for an unknown ACL, so no separate existence check
        # is needed afterwards.
        if not self.remove_user_from_acl(name, user):
            return False
        self._acl[name]["allow"].append(user)
        return True

    def add_user_to_deny(self, name, user):
        """Add a user to the given acl deny block.

        Returns False when the ACL does not exist, True otherwise.
        """
        # Same clear-then-add sequence as add_user_to_allow.
        if not self.remove_user_from_acl(name, user):
            return False
        self._acl[name]["deny"].append(user)
        return True

    def remove_user_from_acl(self, name, user):
        """Remove a user from the given acl (both allow and deny).

        Returns False when the ACL does not exist; True otherwise, whether
        or not the user was present in either block.
        """
        if name not in self._acl:
            return False
        for block in ("allow", "deny"):
            members = self._acl[name][block]
            if user in members:
                members.remove(user)
        return True

    def create_acl(self, name):
        """Create a new acl.

        Returns False when an ACL with that name already exists.
        """
        if name in self._acl:
            return False
        self._acl[name] = {"allow": [], "deny": []}
        return True

    def delete_acl(self, name):
        """Delete an acl.

        Returns False when no ACL with that name exists.
        """
        if name not in self._acl:
            return False
        del self._acl[name]
        return True

    @staticmethod
    def admin_check(cmd, user):
        """Return True when ``cmd`` is not admin-only, else ``user.is_bot_admin``."""
        # Commands not needing admin-level access pass
        if not cmd.admin_only:
            return True
        return user.is_bot_admin

    def acl_check(self, cmd, user):
        """Decide whether ``user`` may run ``cmd`` under the command's ACL.

        Rules are evaluated first-match: allow block, then deny block, then
        allow all. A command whose ACL name is unknown is checked against
        the ``*`` ACL, with a warning logged.
        """
        effective_acl = cmd.acl
        if effective_acl not in self._acl:
            self.log.warning(
                "Unable to locate ACL %s for %s, defaulting to *",
                effective_acl,
                cmd.method.__name__,
            )
            effective_acl = "*"

        rules = self._acl.get(effective_acl)
        if rules is None:
            # The "*" ACL itself can be removed through delete_acl. Without
            # any allow or deny block to consult, only the final
            # "Allow All" rule applies, so allow instead of raising KeyError.
            return True

        if self._check_allow(rules, user):
            return True
        if self._check_deny(rules, user):
            return False
        return True

    @staticmethod
    def _check_allow(acl, user):
        """Return True if the allow block holds ``*`` or ``user.username``."""
        return "*" in acl["allow"] or user.username in acl["allow"]

    @staticmethod
    def _check_deny(acl, user):
        """Return True if the deny block holds ``*`` or ``user.username``."""
        return "*" in acl["deny"] or user.username in acl["deny"]