pbf.utils.Register

  1import re
  2import time
  3from enum import unique, Enum
  4from functools import wraps
  5from typing import List
  6
  7from ..setup import logger, ListenerManager, pluginsManager
  8from ..error import LimitExceedError
  9
 10
 11def allPermission(*_):  # 默认权限,需要接收两个实参
 12    return True
 13
 14def adminPermission(event):  # 在装饰器中指定的权限,只需要接受`event`参数
 15    role: str = event.sender.get("role", "member")
 16    if role == "admin" or role == "owner":
 17        return True
 18    return False
 19
 20def ownerPermission(event):
 21    if event.sender.get("role", "member") == "owner":
 22        return True
 23    return False
 24
 25def default_callback(event, func):
 26    raise LimitExceedError(f"User {event.user_id} exceeded the limit of {func.__name__}.")
 27
 28
 29class Limit:
 30    def __init__(self, duration, times, user=None, callback=None):
 31        """
 32        限制指令执行频率
 33        :param duration: int 限制时间
 34        :param times: int 限制次数
 35        :param user: str/int/None (可选)用户ID
 36        :param callback: Callable (可选)回调函数
 37        """
 38        self.duration = duration
 39        self.times = times
 40        self.user = user
 41        self.call_count = {}
 42        self.callback = callback or default_callback
 43
 44    def __call__(self, origin_func):
 45        @wraps(origin_func)
 46        def limit_wrapper(*args, **kwargs):
 47            # 假设第一个参数是event,且event有user_id属性
 48            event = args[0] if args else None
 49            user_id = event.user_id if event else None
 50            current_time = time.time()
 51            if self.user is not None:
 52                user_id = self.user
 53
 54            if user_id not in self.call_count:
 55                self.call_count[user_id] = []
 56
 57            # 移除过期的调用记录
 58            self.call_count[user_id] = [
 59                timestamp for timestamp in self.call_count[user_id]
 60                if current_time - timestamp < self.duration
 61            ]
 62
 63            # 检查调用次数
 64            if len(self.call_count[user_id]) < self.times:
 65                self.call_count[user_id].append(current_time)
 66                return origin_func(*args, **kwargs)
 67            else:
 68                # logger.warning(f"User {user_id} exceeded the limit of {origin_func.__name__}.")
 69                return self.callback(event, origin_func)  # 调用回调函数
 70
 71        return limit_wrapper
 72
 73
 74@unique
 75class RegexMode(Enum):
 76    match = "match"
 77    search = "search"
 78    none = "none"
 79
 80
 81class Base:
 82    name: str = 'Command name'
 83    description: str = 'Command description'
 84    permission: callable = allPermission
 85    usage: str = 'Command usage'
 86    alias: List[str] = []
 87    hidden: bool = False
 88    enabled: bool = True
 89    type: str = 'command'
 90    func: callable = None
 91    regex_mode: RegexMode = RegexMode.none
 92    regex_flags = re.I
 93    regex_data = None
 94    origin_func: callable = None
 95
 96    def __init__(self, **kwargs):
 97        """
 98        Initialize the command information.
 99        :param kwargs: **dict
100        """
101        for key, value in kwargs.items():
102            setattr(self, key, value)
103
104    def __call__(self, origin_func):
105        if self.enabled:
106            logger.debug(f"Registering {self.type} `{self.name}`")
107            ListenerManager.add_listener(self.type, pluginsManager.plugin_name, self)
108
109        def try_except_wrapper(*args, **kwargs):
110            try:
111                return origin_func(*args, **kwargs)
112            except Exception as e:
113                logger.error(f"Error in {self.type} `{self.name}`: {e}")
114
115        func = try_except_wrapper
116        self.origin_func = origin_func
117        self.func = func
118
119        @wraps(func)
120        def wrapper(*args, **kwargs):
121            logger.debug(f"<{self.type}> {self.name} called")
122            return func(*args, **kwargs)
123
124        return wrapper
125
126    def __str__(self):
127        return f"<{self.type}Listener \"{self.name}\" call:{self.origin_func.__name__}>"
128
129
class Command(Base):
    """Listener registration whose ``type`` is always ``"command"``."""

    def __init__(self, **kwargs):
        """
        Initialize the listener information and force its type.
        :param kwargs: **dict forwarded to ``Base.__init__``
        """
        super().__init__(**kwargs)
        # Assigned after the kwargs were applied, so a ``type`` keyword
        # cannot override it.
        self.type = "command"
134
135
class Message(Base):
    """Listener registration whose ``type`` is always ``"message"``."""

    def __init__(self, **kwargs):
        """
        Initialize the listener information and force its type.
        :param kwargs: **dict forwarded to ``Base.__init__``
        """
        super().__init__(**kwargs)
        # Assigned after the kwargs were applied, so a ``type`` keyword
        # cannot override it.
        self.type = "message"
140
141
class Notice(Base):
    """Listener registration whose ``type`` is always ``"notice"``."""

    def __init__(self, **kwargs):
        """
        Initialize the listener information and force its type.
        :param kwargs: **dict forwarded to ``Base.__init__``
        """
        super().__init__(**kwargs)
        # Assigned after the kwargs were applied, so a ``type`` keyword
        # cannot override it.
        self.type = "notice"
146
147
class Request(Base):
    """Listener registration whose ``type`` is always ``"request"``."""

    def __init__(self, **kwargs):
        """
        Initialize the listener information and force its type.
        :param kwargs: **dict forwarded to ``Base.__init__``
        """
        super().__init__(**kwargs)
        # Assigned after the kwargs were applied, so a ``type`` keyword
        # cannot override it.
        self.type = "request"
152
153
class Meta(Base):
    """Listener registration whose ``type`` is always ``"meta"``."""

    def __init__(self, **kwargs):
        """
        Initialize the listener information and force its type.
        :param kwargs: **dict forwarded to ``Base.__init__``
        """
        super().__init__(**kwargs)
        # Assigned after the kwargs were applied, so a ``type`` keyword
        # cannot override it.
        self.type = "meta"
def allPermission(*_):
def allPermission(*_ignored):
    """Default permission check: always grant access.

    Per the original note, callers pass two positional arguments; every
    argument is accepted and ignored.
    """
    return True
def adminPermission(event):
def adminPermission(event):
    """Return True when the sender's role is ``admin`` or ``owner``.

    Meant to be passed as the permission in a decorator, so it only takes
    ``event``. A sender without a ``role`` entry counts as ``member``.
    """
    return event.sender.get("role", "member") in ("admin", "owner")
def ownerPermission(event):
def ownerPermission(event):
    """Return True only when the sender's role is ``owner``.

    A sender without a ``role`` entry counts as ``member``.
    """
    sender_role = event.sender.get("role", "member")
    return sender_role == "owner"
def default_callback(event, func):
def default_callback(event, func):
    """Default handler for an exceeded limit: always raise ``LimitExceedError``.

    :param event: object the limited function was called with; ``Limit``
        passes ``None`` when the call had no positional arguments
    :param func: the function whose limit was exceeded
    :raises LimitExceedError: always
    """
    # getattr keeps the intended error type when ``event`` is None or has no
    # ``user_id``; a plain attribute access would raise AttributeError instead.
    user_id = getattr(event, "user_id", None)
    raise LimitExceedError(f"User {user_id} exceeded the limit of {func.__name__}.")
class Limit:
class Limit:
    """Decorator that limits how often a function may run per user.

    Timestamps of allowed calls are kept per user id. A call is allowed only
    while fewer than ``times`` calls were recorded within the last
    ``duration`` seconds; otherwise the callback is invoked instead.
    """

    def __init__(self, duration, times, user=None, callback=None):
        """
        Limit how often a command may be executed.
        :param duration: int length of the time window, in seconds (compared against ``time.time()`` differences)
        :param times: int maximum number of calls allowed inside the window
        :param user: str/int/None (optional) fixed user ID; when set, every call is counted against it
        :param callback: Callable (optional) called as ``callback(event, func)`` when the limit is exceeded; defaults to ``default_callback``
        """
        self.duration = duration
        self.times = times
        self.user = user
        self.call_count = {}  # user id -> timestamps of recent allowed calls
        self.callback = callback or default_callback

    def __call__(self, origin_func):
        """
        Wrap ``origin_func`` with the frequency check.
        :param origin_func: the function to limit; its first positional argument is assumed to be the event
        :return: wrapper returning either the function's result or the callback's result
        """
        @wraps(origin_func)
        def limit_wrapper(*args, **kwargs):
            # The first positional argument is assumed to be the event.
            event = args[0] if args else None
            if self.user is not None:
                # A fixed user is configured, so the event is never inspected
                # and may be any object (even one without ``user_id``).
                user_id = self.user
            else:
                # A missing event or a missing ``user_id`` shares the None bucket.
                user_id = getattr(event, "user_id", None)

            current_time = time.time()
            # Keep only the calls that still fall inside the window.
            recent_calls = [
                timestamp for timestamp in self.call_count.get(user_id, ())
                if current_time - timestamp < self.duration
            ]
            self.call_count[user_id] = recent_calls

            if len(recent_calls) < self.times:
                recent_calls.append(current_time)
                return origin_func(*args, **kwargs)
            # Limit reached: hand over to the callback.
            return self.callback(event, origin_func)

        return limit_wrapper
Limit(duration, times, user=None, callback=None)
31    def __init__(self, duration, times, user=None, callback=None):
32        """
33        限制指令执行频率
34        :param duration: int 限制时间
35        :param times: int 限制次数
36        :param user: str/int/None (可选)用户ID
37        :param callback: Callable (可选)回调函数
38        """
39        self.duration = duration
40        self.times = times
41        self.user = user
42        self.call_count = {}
43        self.callback = callback or default_callback

限制指令执行频率

Parameters
  • duration: int 限制时间
  • times: int 限制次数
  • user: str/int/None (可选)用户ID
  • callback: Callable (可选)回调函数
duration
times
user
call_count
callback
class RegexMode(enum.Enum):
class RegexMode(Enum):
    """Regex matching mode stored in a listener's ``regex_mode`` attribute.

    NOTE(review): the names suggest ``re.match`` / ``re.search`` / no regex,
    but the code consuming these values is not in this module — confirm there.
    """

    match = "match"
    search = "search"
    none = "none"  # default value of ``Base.regex_mode``

An enumeration.

match = <RegexMode.match: 'match'>
search = <RegexMode.search: 'search'>
none = <RegexMode.none: 'none'>
Inherited Members
enum.Enum
name
value
class Base:
 82class Base:
 83    name: str = 'Command name'
 84    description: str = 'Command description'
 85    permission: callable = allPermission
 86    usage: str = 'Command usage'
 87    alias: List[str] = []
 88    hidden: bool = False
 89    enabled: bool = True
 90    type: str = 'command'
 91    func: callable = None
 92    regex_mode: RegexMode = RegexMode.none
 93    regex_flags = re.I
 94    regex_data = None
 95    origin_func: callable = None
 96
 97    def __init__(self, **kwargs):
 98        """
 99        Initialize the command information.
100        :param kwargs: **dict
101        """
102        for key, value in kwargs.items():
103            setattr(self, key, value)
104
105    def __call__(self, origin_func):
106        if self.enabled:
107            logger.debug(f"Registering {self.type} `{self.name}`")
108            ListenerManager.add_listener(self.type, pluginsManager.plugin_name, self)
109
110        def try_except_wrapper(*args, **kwargs):
111            try:
112                return origin_func(*args, **kwargs)
113            except Exception as e:
114                logger.error(f"Error in {self.type} `{self.name}`: {e}")
115
116        func = try_except_wrapper
117        self.origin_func = origin_func
118        self.func = func
119
120        @wraps(func)
121        def wrapper(*args, **kwargs):
122            logger.debug(f"<{self.type}> {self.name} called")
123            return func(*args, **kwargs)
124
125        return wrapper
126
127    def __str__(self):
128        return f"<{self.type}Listener \"{self.name}\" call:{self.origin_func.__name__}>"
Base(**kwargs)
 97    def __init__(self, **kwargs):
 98        """
 99        Initialize the command information.
100        :param kwargs: **dict
101        """
102        for key, value in kwargs.items():
103            setattr(self, key, value)

Initialize the command information.

Parameters
  • kwargs: **dict
name: str = 'Command name'
description: str = 'Command description'
def permission(*_):
def allPermission(*_ignored):
    """Default permission check: always grant access.

    Per the original note, callers pass two positional arguments; every
    argument is accepted and ignored.
    """
    return True
usage: str = 'Command usage'
alias: List[str] = []
hidden: bool = False
enabled: bool = True
type: str = 'command'
func: <built-in function callable> = None
regex_mode: RegexMode = <RegexMode.none: 'none'>
regex_flags = re.IGNORECASE
regex_data = None
origin_func: <built-in function callable> = None
class Command(Base):
class Command(Base):
    """Listener registration whose ``type`` is always ``"command"``."""

    def __init__(self, **kwargs):
        """
        Initialize the listener information and force its type.
        :param kwargs: **dict forwarded to ``Base.__init__``
        """
        super().__init__(**kwargs)
        # Assigned after the kwargs were applied, so a ``type`` keyword
        # cannot override it.
        self.type = "command"
Command(**kwargs)
132    def __init__(self, **kwargs):
133        super().__init__(**kwargs)
134        self.type = "command"

Initialize the command information.

Parameters
  • kwargs: **dict
type: str = 'command'
class Message(Base):
class Message(Base):
    """Listener registration whose ``type`` is always ``"message"``."""

    def __init__(self, **kwargs):
        """
        Initialize the listener information and force its type.
        :param kwargs: **dict forwarded to ``Base.__init__``
        """
        super().__init__(**kwargs)
        # Assigned after the kwargs were applied, so a ``type`` keyword
        # cannot override it.
        self.type = "message"
Message(**kwargs)
138    def __init__(self, **kwargs):
139        super().__init__(**kwargs)
140        self.type = "message"

Initialize the command information.

Parameters
  • kwargs: **dict
type: str = 'command'
class Notice(Base):
class Notice(Base):
    """Listener registration whose ``type`` is always ``"notice"``."""

    def __init__(self, **kwargs):
        """
        Initialize the listener information and force its type.
        :param kwargs: **dict forwarded to ``Base.__init__``
        """
        super().__init__(**kwargs)
        # Assigned after the kwargs were applied, so a ``type`` keyword
        # cannot override it.
        self.type = "notice"
Notice(**kwargs)
144    def __init__(self, **kwargs):
145        super().__init__(**kwargs)
146        self.type = "notice"

Initialize the command information.

Parameters
  • kwargs: **dict
type: str = 'command'
class Request(Base):
class Request(Base):
    """Listener registration whose ``type`` is always ``"request"``."""

    def __init__(self, **kwargs):
        """
        Initialize the listener information and force its type.
        :param kwargs: **dict forwarded to ``Base.__init__``
        """
        super().__init__(**kwargs)
        # Assigned after the kwargs were applied, so a ``type`` keyword
        # cannot override it.
        self.type = "request"
Request(**kwargs)
150    def __init__(self, **kwargs):
151        super().__init__(**kwargs)
152        self.type = "request"

Initialize the command information.

Parameters
  • kwargs: **dict
type: str = 'command'
class Meta(Base):
class Meta(Base):
    """Listener registration whose ``type`` is always ``"meta"``."""

    def __init__(self, **kwargs):
        """
        Initialize the listener information and force its type.
        :param kwargs: **dict forwarded to ``Base.__init__``
        """
        super().__init__(**kwargs)
        # Assigned after the kwargs were applied, so a ``type`` keyword
        # cannot override it.
        self.type = "meta"
Meta(**kwargs)
156    def __init__(self, **kwargs):
157        super().__init__(**kwargs)
158        self.type = "meta"

Initialize the command information.

Parameters
  • kwargs: **dict
type: str = 'command'