pbf.controller.Handler
1import json 2import re 3import inspect 4from typing import Union, Tuple 5 6from .Data import Event 7from ..utils.Logging import Logger 8from ..setup import ListenerManager 9from ..utils.Register import Base as RegisterBase 10from ..utils.Register import RegexMode 11 12logger = Logger(__name__) 13 14 15class Handler: 16 def __init__(self, event_data: str) -> None: 17 """ 18 Initialize the Handler. 19 :param event_data: str 事件数据 20 """ 21 self.data = json.loads(event_data) 22 self.event: Event = Event() 23 self.classify() 24 logger.info(f"{self.event.sender.get('nickname', '未知昵称')}({self.event.user_id}): {self.event.raw_message}") 25 26 def classify(self) -> Event: 27 """ 28 统一OBv11和OBv12数据格式。 29 :return: Event 30 """ 31 if "self" in self.data: 32 self.data["self_id"] = self.data["self"].get("user_id") 33 del self.data["self"] 34 35 if "post_type" in self.data: 36 self.data["type"] = self.data["post_type"] 37 del self.data["post_type"] 38 39 if "message_type" in self.data: 40 self.data["detail_type"] = self.data["message_type"] 41 del self.data["message_type"] 42 43 if "notice_type" in self.data: 44 self.data["detail_type"] = self.data["notice_type"] 45 del self.data["notice_type"] 46 47 if "request_type" in self.data: 48 self.data["detail_type"] = self.data["request_type"] 49 del self.data["request_type"] 50 51 if "meta_event_type" in self.data: 52 self.data["detail_type"] = self.data["meta_event_type"] 53 del self.data["meta_event_type"] 54 55 if "alt_message" in self.data: 56 self.data["raw_message"] = self.data["alt_message"] 57 del self.data["alt_message"] 58 59 if "qq.nickname" in self.data: 60 self.data["sender"] = { 61 "user_id": self.data["user_id"], 62 "nickname": self.data["qq.nickname"], 63 "role": self.data.get("sender", {}).get("role", "member") 64 } 65 del self.data["qq.nickname"] 66 67 if "sender" not in self.data: 68 self.data["sender"] = { 69 "user_id": self.data.get("user_id") 70 } 71 72 if "raw_message" in self.data: 73 self.data["message_list"] = self.data["raw_message"].split() 74 75 event = Event(**self.data) 76 self.event = event 77 return event 78 79 def handle(self) -> bool: 80 """ 81 Handle the event. 82 :return: bool 83 """ 84 listeners = ListenerManager.get_listeners_by_type(self.event.type) 85 for _, value in listeners.items(): 86 for listener in value: 87 if listener.permission(self.event): 88 listener.func(self.event) 89 90 def matchCommand(listener: RegisterBase) -> Tuple[bool, Union[None, re.Match]]: 91 raw_message = self.event.raw_message 92 # 如果 regex_mode 是 none,直接匹配前缀 93 if listener.regex_mode == RegexMode.none: 94 if raw_message.startswith(listener.name): 95 return True, None 96 if listener.alias: 97 if any(raw_message.startswith(alias) for alias in listener.alias): 98 return True, None 99 return False, None 100 101 # 定义一个通用的正则匹配器,根据模式选择 re.match 或 re.search 102 def regex_matcher(pattern: str): 103 return (re.match if listener.regex_mode == RegexMode.match else re.search)(pattern, raw_message, 104 listener.regex_flags) 105 106 # 使用正则表达式进行匹配 107 matchRes = regex_matcher(listener.name) 108 if matchRes: 109 return True, matchRes 110 111 if listener.alias: 112 for alias in listener.alias: 113 matchRes = regex_matcher(alias) 114 if matchRes: 115 return True, matchRes 116 117 return False, None 118 119 def callFunction(func, origin_func, listener: RegisterBase, matchRes: Union[None, re.Match] = None): 120 sig = inspect.signature(origin_func) 121 num_params = len(sig.parameters) 122 if num_params == 0: 123 return func() 124 elif num_params == 1: 125 return func(self.event) 126 elif num_params == 2: 127 return func(self.event, listener) 128 elif num_params == 3: 129 return func(self.event, listener, matchRes) 130 131 if self.event.type == "message": 132 break_signal: bool = False 133 listeners = ListenerManager.get_listeners_by_type("command") 134 for key, value in listeners.items(): 135 for listener in value: 136 res, matchRes = matchCommand(listener) 137 if res: 138 break_signal = True 139 if callFunction(listener.permission, listener.permission, listener, matchRes): 140 callFunction(listener.func, listener.origin_func, listener, matchRes) 141 break 142 if break_signal: 143 break 144 if not break_signal: 145 logger.info("No command matched") 146 return True
logger =
<pbf.utils.Logging.Logger object>
class
Handler:
16class Handler: 17 def __init__(self, event_data: str) -> None: 18 """ 19 Initialize the Handler. 20 :param event_data: str 事件数据 21 """ 22 self.data = json.loads(event_data) 23 self.event: Event = Event() 24 self.classify() 25 logger.info(f"{self.event.sender.get('nickname', '未知昵称')}({self.event.user_id}): {self.event.raw_message}") 26 27 def classify(self) -> Event: 28 """ 29 统一OBv11和OBv12数据格式。 30 :return: Event 31 """ 32 if "self" in self.data: 33 self.data["self_id"] = self.data["self"].get("user_id") 34 del self.data["self"] 35 36 if "post_type" in self.data: 37 self.data["type"] = self.data["post_type"] 38 del self.data["post_type"] 39 40 if "message_type" in self.data: 41 self.data["detail_type"] = self.data["message_type"] 42 del self.data["message_type"] 43 44 if "notice_type" in self.data: 45 self.data["detail_type"] = self.data["notice_type"] 46 del self.data["notice_type"] 47 48 if "request_type" in self.data: 49 self.data["detail_type"] = self.data["request_type"] 50 del self.data["request_type"] 51 52 if "meta_event_type" in self.data: 53 self.data["detail_type"] = self.data["meta_event_type"] 54 del self.data["meta_event_type"] 55 56 if "alt_message" in self.data: 57 self.data["raw_message"] = self.data["alt_message"] 58 del self.data["alt_message"] 59 60 if "qq.nickname" in self.data: 61 self.data["sender"] = { 62 "user_id": self.data["user_id"], 63 "nickname": self.data["qq.nickname"], 64 "role": self.data.get("sender", {}).get("role", "member") 65 } 66 del self.data["qq.nickname"] 67 68 if "sender" not in self.data: 69 self.data["sender"] = { 70 "user_id": self.data.get("user_id") 71 } 72 73 if "raw_message" in self.data: 74 self.data["message_list"] = self.data["raw_message"].split() 75 76 event = Event(**self.data) 77 self.event = event 78 return event 79 80 def handle(self) -> bool: 81 """ 82 Handle the event. 83 :return: bool 84 """ 85 listeners = ListenerManager.get_listeners_by_type(self.event.type) 86 for _, value in listeners.items(): 87 for listener in value: 88 if listener.permission(self.event): 89 listener.func(self.event) 90 91 def matchCommand(listener: RegisterBase) -> Tuple[bool, Union[None, re.Match]]: 92 raw_message = self.event.raw_message 93 # 如果 regex_mode 是 none,直接匹配前缀 94 if listener.regex_mode == RegexMode.none: 95 if raw_message.startswith(listener.name): 96 return True, None 97 if listener.alias: 98 if any(raw_message.startswith(alias) for alias in listener.alias): 99 return True, None 100 return False, None 101 102 # 定义一个通用的正则匹配器,根据模式选择 re.match 或 re.search 103 def regex_matcher(pattern: str): 104 return (re.match if listener.regex_mode == RegexMode.match else re.search)(pattern, raw_message, 105 listener.regex_flags) 106 107 # 使用正则表达式进行匹配 108 matchRes = regex_matcher(listener.name) 109 if matchRes: 110 return True, matchRes 111 112 if listener.alias: 113 for alias in listener.alias: 114 matchRes = regex_matcher(alias) 115 if matchRes: 116 return True, matchRes 117 118 return False, None 119 120 def callFunction(func, origin_func, listener: RegisterBase, matchRes: Union[None, re.Match] = None): 121 sig = inspect.signature(origin_func) 122 num_params = len(sig.parameters) 123 if num_params == 0: 124 return func() 125 elif num_params == 1: 126 return func(self.event) 127 elif num_params == 2: 128 return func(self.event, listener) 129 elif num_params == 3: 130 return func(self.event, listener, matchRes) 131 132 if self.event.type == "message": 133 break_signal: bool = False 134 listeners = ListenerManager.get_listeners_by_type("command") 135 for key, value in listeners.items(): 136 for listener in value: 137 res, matchRes = matchCommand(listener) 138 if res: 139 break_signal = True 140 if callFunction(listener.permission, listener.permission, listener, matchRes): 141 callFunction(listener.func, listener.origin_func, listener, matchRes) 142 break 143 if break_signal: 144 break 145 if not break_signal: 146 logger.info("No command matched") 147 return True
Handler(event_data: str)
17 def __init__(self, event_data: str) -> None: 18 """ 19 Initialize the Handler. 20 :param event_data: str 事件数据 21 """ 22 self.data = json.loads(event_data) 23 self.event: Event = Event() 24 self.classify() 25 logger.info(f"{self.event.sender.get('nickname', '未知昵称')}({self.event.user_id}): {self.event.raw_message}")
Initialize the Handler.
Parameters
- event_data: str 事件数据
event: pbf.controller.Data.Event
27 def classify(self) -> Event: 28 """ 29 统一OBv11和OBv12数据格式。 30 :return: Event 31 """ 32 if "self" in self.data: 33 self.data["self_id"] = self.data["self"].get("user_id") 34 del self.data["self"] 35 36 if "post_type" in self.data: 37 self.data["type"] = self.data["post_type"] 38 del self.data["post_type"] 39 40 if "message_type" in self.data: 41 self.data["detail_type"] = self.data["message_type"] 42 del self.data["message_type"] 43 44 if "notice_type" in self.data: 45 self.data["detail_type"] = self.data["notice_type"] 46 del self.data["notice_type"] 47 48 if "request_type" in self.data: 49 self.data["detail_type"] = self.data["request_type"] 50 del self.data["request_type"] 51 52 if "meta_event_type" in self.data: 53 self.data["detail_type"] = self.data["meta_event_type"] 54 del self.data["meta_event_type"] 55 56 if "alt_message" in self.data: 57 self.data["raw_message"] = self.data["alt_message"] 58 del self.data["alt_message"] 59 60 if "qq.nickname" in self.data: 61 self.data["sender"] = { 62 "user_id": self.data["user_id"], 63 "nickname": self.data["qq.nickname"], 64 "role": self.data.get("sender", {}).get("role", "member") 65 } 66 del self.data["qq.nickname"] 67 68 if "sender" not in self.data: 69 self.data["sender"] = { 70 "user_id": self.data.get("user_id") 71 } 72 73 if "raw_message" in self.data: 74 self.data["message_list"] = self.data["raw_message"].split() 75 76 event = Event(**self.data) 77 self.event = event 78 return event
统一OBv11和OBv12数据格式。
Returns
Event
def
handle(self) -> bool:
80 def handle(self) -> bool: 81 """ 82 Handle the event. 83 :return: bool 84 """ 85 listeners = ListenerManager.get_listeners_by_type(self.event.type) 86 for _, value in listeners.items(): 87 for listener in value: 88 if listener.permission(self.event): 89 listener.func(self.event) 90 91 def matchCommand(listener: RegisterBase) -> Tuple[bool, Union[None, re.Match]]: 92 raw_message = self.event.raw_message 93 # 如果 regex_mode 是 none,直接匹配前缀 94 if listener.regex_mode == RegexMode.none: 95 if raw_message.startswith(listener.name): 96 return True, None 97 if listener.alias: 98 if any(raw_message.startswith(alias) for alias in listener.alias): 99 return True, None 100 return False, None 101 102 # 定义一个通用的正则匹配器,根据模式选择 re.match 或 re.search 103 def regex_matcher(pattern: str): 104 return (re.match if listener.regex_mode == RegexMode.match else re.search)(pattern, raw_message, 105 listener.regex_flags) 106 107 # 使用正则表达式进行匹配 108 matchRes = regex_matcher(listener.name) 109 if matchRes: 110 return True, matchRes 111 112 if listener.alias: 113 for alias in listener.alias: 114 matchRes = regex_matcher(alias) 115 if matchRes: 116 return True, matchRes 117 118 return False, None 119 120 def callFunction(func, origin_func, listener: RegisterBase, matchRes: Union[None, re.Match] = None): 121 sig = inspect.signature(origin_func) 122 num_params = len(sig.parameters) 123 if num_params == 0: 124 return func() 125 elif num_params == 1: 126 return func(self.event) 127 elif num_params == 2: 128 return func(self.event, listener) 129 elif num_params == 3: 130 return func(self.event, listener, matchRes) 131 132 if self.event.type == "message": 133 break_signal: bool = False 134 listeners = ListenerManager.get_listeners_by_type("command") 135 for key, value in listeners.items(): 136 for listener in value: 137 res, matchRes = matchCommand(listener) 138 if res: 139 break_signal = True 140 if callFunction(listener.permission, listener.permission, listener, matchRes): 141 callFunction(listener.func, listener.origin_func, listener, matchRes) 142 break 143 if break_signal: 144 break 145 if not break_signal: 146 logger.info("No command matched") 147 return True
Handle the event.
Returns
bool