当前架构
当前权限管理大概是命令插件先调用 permission_manager 的 register 方法注册这个权限:
permission_manager = PermissionManager()
permission_manager.register('ping')
permission_manager.register('whitelist')
permission_manager.register('whitelist.list')
permission_manager.register('whitelist.reload')
permission_manager.register('perm.node1.node2')
permission_manager.register('perm.node2')
上面的代码会使得 permission_manager 的 all_permissions 属性变成这样的一个权限节点树:
{'perm': {'node1': {'node2': {}}, 'node2': {}},
'ping': {},
'whitelist': {'list': {}, 'reload': {}}}
这个节点树的作用主要是解析用户权限里面带星号的权限。比如在上述的注册权限基础上,有一个用户的权限是 perm.* 这样通过 permission_manager.expand_permission 方法就可以把这个用户的权限扩展成:[‘perm.node1.node2’, ‘perm.node2’] 。最后我们需要验证用户权限的时候只需要查找权限字符串是否在用户所拥有的权限列表里面就可以了。
存在问题
当前的权限解析代码最大的问题就是这个代码太过于复杂,维护不是很好维护,而且经过仔细思考之后使用这样的权限树解析权限根本就是大材小用。实际上通过简单的字符串比较就可以实现解析用户权限里面的星号的功能了。
重新实现
我们需要修改的其实也就是两个方面,一个是模块注册权限的时候根本不用创建一个权限树,我们只需要把这个权限放入一个集合里面就行了。
这是之前的权限注册逻辑:
def put_permission(self, node_dict, node_permission):
"""parse current registering permission string into dict"""
if node_permission:
if node_permission[0] not in node_dict:
node_dict[node_permission[0]] = dict()
self.put_permission(node_dict[node_permission[0]], node_permission[1:])
def register(self, perm):
"""register permissions, used by modules"""
self.put_permission(self.all_permissions, perm.split('.'))
这是修改后的权限注册逻辑:
def register(self, perm):
"""register permissions, used by modules"""
self.all_permissions.add(perm)
可以看到这样做简单了许多。
但是这样的更改还必须伴有解析用户权限代码的更改:
这是之前解析用户权限的代码:
def expand_permission(self, perm_str: str, server_names=None, all_permissions=None) -> set:
""" handle asterisks in perm string like 'whitelist.*' """
# used for test
if not server_names:
server_names = (i for i in config.server_properties.keys())
if not all_permissions:
all_permissions = self.all_permissions
perm2process_list = perm_str.split('.')
servers = list()
if perm2process_list[0] == '*':
for server_name in server_names:
servers.append(server_name)
else:
servers.append(perm2process_list[0])
# the function for parsing asterisks in permission string
def parse_asterisk(perm2process_left, permissions_pre_left, string_pre_parsed=''):
if string_pre_parsed.startswith('.'):
string_pre_parsed = string_pre_parsed[1:]
if not permissions_pre_left:
yield string_pre_parsed
current_node = perm2process_left[0] if perm2process_left else '*'
if current_node == '*':
for perm_node, permissions_left in permissions_pre_left.items():
for string_parsed in parse_asterisk(['*'],
permissions_left,
f'{string_pre_parsed}.{perm_node}'):
yield string_parsed
else:
for string_parsed in parse_asterisk(perm2process_left[1:],
permissions_pre_left[current_node],
f'{string_pre_parsed}.{current_node}'):
yield string_parsed
expanded_permissions = set()
for server in servers:
for perm in parse_asterisk(perm2process_list[1:], all_permissions):
expanded_permissions.add(f'{server}.{perm}')
return expanded_permissions
可以看到非常的冗长。
我们基于现在新的权限注册系统修改代码:
def expand_permission(self, perm_str: str, server_names=None, all_permissions=None) -> set:
""" handle asterisks in perm string like 'whitelist.*' """
# used for test
if not server_names:
server_names = (i for i in config.server_properties.keys())
if not all_permissions:
all_permissions = self.all_permissions
if perm_str == '*':
perm_str = '*.*'
sub_permissions = perm_str.split('.', 1)
if sub_permissions[0] == '*':
needed_server = server_names
else:
needed_server = {sub_permissions[0]}
if sub_permissions[1].endswith('*'):
sub_permissions[1] = sub_permissions[1][:-1]
needed_permission = {permission for permission in all_permissions if permission.startswith(sub_permissions[1])}
else:
needed_permission = {sub_permissions[1]}
return {f'{server_name}.{permission}' for server_name in needed_server for permission in needed_permission}
最后再更改一下测试用例保证测试通过就好啦!