pbf.controller.Handler

  1import json
  2import re
  3import inspect
  4from typing import Union, Tuple
  5
  6from .Data import Event
  7from ..utils.Logging import Logger
  8from ..setup import ListenerManager
  9from ..utils.Register import Base as RegisterBase
 10from ..utils.Register import RegexMode
 11
 12logger = Logger(__name__)
 13
 14
 15class Handler:
 16    def __init__(self, event_data: str) -> None:
 17        """
 18        Initialize the Handler.
 19        :param event_data: str 事件数据
 20        """
 21        self.data = json.loads(event_data)
 22        self.event: Event = Event()
 23        self.classify()
 24        logger.info(f"{self.event.sender.get('nickname', '未知昵称')}({self.event.user_id}): {self.event.raw_message}")
 25
 26    def classify(self) -> Event:
 27        """
 28        统一OBv11和OBv12数据格式。
 29        :return: Event
 30        """
 31        if "self" in self.data:
 32            self.data["self_id"] = self.data["self"].get("user_id")
 33            del self.data["self"]
 34
 35        if "post_type" in self.data:
 36            self.data["type"] = self.data["post_type"]
 37            del self.data["post_type"]
 38
 39        if "message_type" in self.data:
 40            self.data["detail_type"] = self.data["message_type"]
 41            del self.data["message_type"]
 42
 43        if "notice_type" in self.data:
 44            self.data["detail_type"] = self.data["notice_type"]
 45            del self.data["notice_type"]
 46
 47        if "request_type" in self.data:
 48            self.data["detail_type"] = self.data["request_type"]
 49            del self.data["request_type"]
 50
 51        if "meta_event_type" in self.data:
 52            self.data["detail_type"] = self.data["meta_event_type"]
 53            del self.data["meta_event_type"]
 54
 55        if "alt_message" in self.data:
 56            self.data["raw_message"] = self.data["alt_message"]
 57            del self.data["alt_message"]
 58
 59        if "qq.nickname" in self.data:
 60            self.data["sender"] = {
 61                "user_id": self.data["user_id"],
 62                "nickname": self.data["qq.nickname"],
 63                "role": self.data.get("sender", {}).get("role", "member")
 64            }
 65            del self.data["qq.nickname"]
 66
 67        if "sender" not in self.data:
 68            self.data["sender"] = {
 69                "user_id": self.data.get("user_id")
 70            }
 71
 72        if "raw_message" in self.data:
 73            self.data["message_list"] = self.data["raw_message"].split()
 74
 75        event = Event(**self.data)
 76        self.event = event
 77        return event
 78
 79    def handle(self) -> bool:
 80        """
 81        Handle the event.
 82        :return: bool
 83        """
 84        listeners = ListenerManager.get_listeners_by_type(self.event.type)
 85        for _, value in listeners.items():
 86            for listener in value:
 87                if listener.permission(self.event):
 88                    listener.func(self.event)
 89
 90        def matchCommand(listener: RegisterBase) -> Tuple[bool, Union[None, re.Match]]:
 91            raw_message = self.event.raw_message
 92            # 如果 regex_mode 是 none,直接匹配前缀
 93            if listener.regex_mode == RegexMode.none:
 94                if raw_message.startswith(listener.name):
 95                    return True, None
 96                if listener.alias:
 97                    if any(raw_message.startswith(alias) for alias in listener.alias):
 98                        return True, None
 99                return False, None
100
101            # 定义一个通用的正则匹配器,根据模式选择 re.match 或 re.search
102            def regex_matcher(pattern: str):
103                return (re.match if listener.regex_mode == RegexMode.match else re.search)(pattern, raw_message,
104                                                                                           listener.regex_flags)
105
106            # 使用正则表达式进行匹配
107            matchRes = regex_matcher(listener.name)
108            if matchRes:
109                return True, matchRes
110
111            if listener.alias:
112                for alias in listener.alias:
113                    matchRes = regex_matcher(alias)
114                    if matchRes:
115                        return True, matchRes
116
117            return False, None
118
119        def callFunction(func, origin_func, listener: RegisterBase, matchRes: Union[None, re.Match] = None):
120            sig = inspect.signature(origin_func)
121            num_params = len(sig.parameters)
122            if num_params == 0:
123                return func()
124            elif num_params == 1:
125                return func(self.event)
126            elif num_params == 2:
127                return func(self.event, listener)
128            elif num_params == 3:
129                return func(self.event, listener, matchRes)
130
131        if self.event.type == "message":
132            break_signal: bool = False
133            listeners = ListenerManager.get_listeners_by_type("command")
134            for key, value in listeners.items():
135                for listener in value:
136                    res, matchRes = matchCommand(listener)
137                    if res:
138                        break_signal = True
139                        if callFunction(listener.permission, listener.permission, listener, matchRes):
140                            callFunction(listener.func, listener.origin_func, listener, matchRes)
141                        break
142                if break_signal:
143                    break
144            if not break_signal:
145                logger.info("No command matched")
146        return True
logger = <pbf.utils.Logging.Logger object>
class Handler:
 16class Handler:
 17    def __init__(self, event_data: str) -> None:
 18        """
 19        Initialize the Handler.
 20        :param event_data: str 事件数据
 21        """
 22        self.data = json.loads(event_data)
 23        self.event: Event = Event()
 24        self.classify()
 25        logger.info(f"{self.event.sender.get('nickname', '未知昵称')}({self.event.user_id}): {self.event.raw_message}")
 26
 27    def classify(self) -> Event:
 28        """
 29        统一OBv11和OBv12数据格式。
 30        :return: Event
 31        """
 32        if "self" in self.data:
 33            self.data["self_id"] = self.data["self"].get("user_id")
 34            del self.data["self"]
 35
 36        if "post_type" in self.data:
 37            self.data["type"] = self.data["post_type"]
 38            del self.data["post_type"]
 39
 40        if "message_type" in self.data:
 41            self.data["detail_type"] = self.data["message_type"]
 42            del self.data["message_type"]
 43
 44        if "notice_type" in self.data:
 45            self.data["detail_type"] = self.data["notice_type"]
 46            del self.data["notice_type"]
 47
 48        if "request_type" in self.data:
 49            self.data["detail_type"] = self.data["request_type"]
 50            del self.data["request_type"]
 51
 52        if "meta_event_type" in self.data:
 53            self.data["detail_type"] = self.data["meta_event_type"]
 54            del self.data["meta_event_type"]
 55
 56        if "alt_message" in self.data:
 57            self.data["raw_message"] = self.data["alt_message"]
 58            del self.data["alt_message"]
 59
 60        if "qq.nickname" in self.data:
 61            self.data["sender"] = {
 62                "user_id": self.data["user_id"],
 63                "nickname": self.data["qq.nickname"],
 64                "role": self.data.get("sender", {}).get("role", "member")
 65            }
 66            del self.data["qq.nickname"]
 67
 68        if "sender" not in self.data:
 69            self.data["sender"] = {
 70                "user_id": self.data.get("user_id")
 71            }
 72
 73        if "raw_message" in self.data:
 74            self.data["message_list"] = self.data["raw_message"].split()
 75
 76        event = Event(**self.data)
 77        self.event = event
 78        return event
 79
 80    def handle(self) -> bool:
 81        """
 82        Handle the event.
 83        :return: bool
 84        """
 85        listeners = ListenerManager.get_listeners_by_type(self.event.type)
 86        for _, value in listeners.items():
 87            for listener in value:
 88                if listener.permission(self.event):
 89                    listener.func(self.event)
 90
 91        def matchCommand(listener: RegisterBase) -> Tuple[bool, Union[None, re.Match]]:
 92            raw_message = self.event.raw_message
 93            # 如果 regex_mode 是 none,直接匹配前缀
 94            if listener.regex_mode == RegexMode.none:
 95                if raw_message.startswith(listener.name):
 96                    return True, None
 97                if listener.alias:
 98                    if any(raw_message.startswith(alias) for alias in listener.alias):
 99                        return True, None
100                return False, None
101
102            # 定义一个通用的正则匹配器,根据模式选择 re.match 或 re.search
103            def regex_matcher(pattern: str):
104                return (re.match if listener.regex_mode == RegexMode.match else re.search)(pattern, raw_message,
105                                                                                           listener.regex_flags)
106
107            # 使用正则表达式进行匹配
108            matchRes = regex_matcher(listener.name)
109            if matchRes:
110                return True, matchRes
111
112            if listener.alias:
113                for alias in listener.alias:
114                    matchRes = regex_matcher(alias)
115                    if matchRes:
116                        return True, matchRes
117
118            return False, None
119
120        def callFunction(func, origin_func, listener: RegisterBase, matchRes: Union[None, re.Match] = None):
121            sig = inspect.signature(origin_func)
122            num_params = len(sig.parameters)
123            if num_params == 0:
124                return func()
125            elif num_params == 1:
126                return func(self.event)
127            elif num_params == 2:
128                return func(self.event, listener)
129            elif num_params == 3:
130                return func(self.event, listener, matchRes)
131
132        if self.event.type == "message":
133            break_signal: bool = False
134            listeners = ListenerManager.get_listeners_by_type("command")
135            for key, value in listeners.items():
136                for listener in value:
137                    res, matchRes = matchCommand(listener)
138                    if res:
139                        break_signal = True
140                        if callFunction(listener.permission, listener.permission, listener, matchRes):
141                            callFunction(listener.func, listener.origin_func, listener, matchRes)
142                        break
143                if break_signal:
144                    break
145            if not break_signal:
146                logger.info("No command matched")
147        return True
Handler(event_data: str)
17    def __init__(self, event_data: str) -> None:
18        """
19        Initialize the Handler.
20        :param event_data: str 事件数据
21        """
22        self.data = json.loads(event_data)
23        self.event: Event = Event()
24        self.classify()
25        logger.info(f"{self.event.sender.get('nickname', '未知昵称')}({self.event.user_id}): {self.event.raw_message}")

Initialize the Handler.

Parameters
  • event_data: str 事件数据
data
def classify(self) -> pbf.controller.Data.Event:
27    def classify(self) -> Event:
28        """
29        统一OBv11和OBv12数据格式。
30        :return: Event
31        """
32        if "self" in self.data:
33            self.data["self_id"] = self.data["self"].get("user_id")
34            del self.data["self"]
35
36        if "post_type" in self.data:
37            self.data["type"] = self.data["post_type"]
38            del self.data["post_type"]
39
40        if "message_type" in self.data:
41            self.data["detail_type"] = self.data["message_type"]
42            del self.data["message_type"]
43
44        if "notice_type" in self.data:
45            self.data["detail_type"] = self.data["notice_type"]
46            del self.data["notice_type"]
47
48        if "request_type" in self.data:
49            self.data["detail_type"] = self.data["request_type"]
50            del self.data["request_type"]
51
52        if "meta_event_type" in self.data:
53            self.data["detail_type"] = self.data["meta_event_type"]
54            del self.data["meta_event_type"]
55
56        if "alt_message" in self.data:
57            self.data["raw_message"] = self.data["alt_message"]
58            del self.data["alt_message"]
59
60        if "qq.nickname" in self.data:
61            self.data["sender"] = {
62                "user_id": self.data["user_id"],
63                "nickname": self.data["qq.nickname"],
64                "role": self.data.get("sender", {}).get("role", "member")
65            }
66            del self.data["qq.nickname"]
67
68        if "sender" not in self.data:
69            self.data["sender"] = {
70                "user_id": self.data.get("user_id")
71            }
72
73        if "raw_message" in self.data:
74            self.data["message_list"] = self.data["raw_message"].split()
75
76        event = Event(**self.data)
77        self.event = event
78        return event

统一OBv11和OBv12数据格式。

Returns

Event

def handle(self) -> bool:
 80    def handle(self) -> bool:
 81        """
 82        Handle the event.
 83        :return: bool
 84        """
 85        listeners = ListenerManager.get_listeners_by_type(self.event.type)
 86        for _, value in listeners.items():
 87            for listener in value:
 88                if listener.permission(self.event):
 89                    listener.func(self.event)
 90
 91        def matchCommand(listener: RegisterBase) -> Tuple[bool, Union[None, re.Match]]:
 92            raw_message = self.event.raw_message
 93            # 如果 regex_mode 是 none,直接匹配前缀
 94            if listener.regex_mode == RegexMode.none:
 95                if raw_message.startswith(listener.name):
 96                    return True, None
 97                if listener.alias:
 98                    if any(raw_message.startswith(alias) for alias in listener.alias):
 99                        return True, None
100                return False, None
101
102            # 定义一个通用的正则匹配器,根据模式选择 re.match 或 re.search
103            def regex_matcher(pattern: str):
104                return (re.match if listener.regex_mode == RegexMode.match else re.search)(pattern, raw_message,
105                                                                                           listener.regex_flags)
106
107            # 使用正则表达式进行匹配
108            matchRes = regex_matcher(listener.name)
109            if matchRes:
110                return True, matchRes
111
112            if listener.alias:
113                for alias in listener.alias:
114                    matchRes = regex_matcher(alias)
115                    if matchRes:
116                        return True, matchRes
117
118            return False, None
119
120        def callFunction(func, origin_func, listener: RegisterBase, matchRes: Union[None, re.Match] = None):
121            sig = inspect.signature(origin_func)
122            num_params = len(sig.parameters)
123            if num_params == 0:
124                return func()
125            elif num_params == 1:
126                return func(self.event)
127            elif num_params == 2:
128                return func(self.event, listener)
129            elif num_params == 3:
130                return func(self.event, listener, matchRes)
131
132        if self.event.type == "message":
133            break_signal: bool = False
134            listeners = ListenerManager.get_listeners_by_type("command")
135            for key, value in listeners.items():
136                for listener in value:
137                    res, matchRes = matchCommand(listener)
138                    if res:
139                        break_signal = True
140                        if callFunction(listener.permission, listener.permission, listener, matchRes):
141                            callFunction(listener.func, listener.origin_func, listener, matchRes)
142                        break
143                if break_signal:
144                    break
145            if not break_signal:
146                logger.info("No command matched")
147        return True

Handle the event.

Returns

bool