pbf.driver.Fastapi
import json
from typing import Union
import traceback
import uvicorn
import hmac
from fastapi import FastAPI, Header, Request

from ..utils import scheduler
from ..setup import logger, pluginsManager
from ..controller.Handler import Handler
from ..config import ob_access_token
from ..controller.Data import Event
from ..controller.Client import Client

# Host, port and auto-reload flag handed to uvicorn by start().
fastapi_host = "localhost"
fastapi_port = 8000
fastapi_debug = True

# Markdown text shown as the API description in the generated OpenAPI docs.
description = '''
> PigBotFramework is built on FastApi, all APIs are listed below and provide query parameters
'''
# OpenAPI tag groups; the tag names are referenced by the route decorators below.
tags_metadata = [
    {
        "name": "上报接口",
        "description": "OneBot(v11/v12)标准上报接口",
        "externalDocs": {
            "description": "OneBot Docs",
            "url": "https://onebot.dev/",
        },
    },
    {
        "name": "操作接口",
        "description": "操作接口",
    },
    {
        "name": "其他接口",
        "description": "其他接口",
    },
]
app = FastAPI(
    title="PigBotFramework API",
    description=description,
    openapi_tags=tags_metadata,
    version="5.0.0",
    contact={
        "name": "xzyStudio",
        "url": "https://xzynb.top",
        "email": "[email protected]",
    },
)


@app.on_event('shutdown')
def app_on_shutdown():
    """
    Clean-up logic executed when the application shuts down.

    The scheduler shutdown and the plugin clean-up are attempted independently,
    so a failure in one does not skip the other. Failures are logged instead of
    being silently discarded; shutdown itself is never interrupted.
    :return: None
    """
    try:
        scheduler.shutdown(wait=False)
        logger.info('Scheduler shutdown.')
    except Exception as e:
        logger.error(f'Scheduler shutdown failed: {e}\n{traceback.format_exc()}')

    try:
        pluginsManager.clearPlugins()
    except Exception as e:
        logger.error(f'Clearing plugins failed: {e}\n{traceback.format_exc()}')


def _constant_time_equal(candidate, expected):
    """
    Compare two credentials without leaking the mismatch position through timing.

    When either value is not a `str` (for example `None`), fall back to `==`
    so the verdict matches a plain equality test.
    """
    if isinstance(candidate, str) and isinstance(expected, str):
        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(candidate.encode('utf-8'), expected.encode('utf-8'))
    return candidate == expected


async def check_signature(request: Request, access_token: str, X_Signature: Union[str, None],
                          Authorization: Union[str, None]):
    """
    ## Verify the request credentials
    The request is accepted as soon as any one of the three checks succeeds.
    - :param `request`: `Request` whose raw body is signed
    - :param `access_token`: `access_token` query parameter
    - :param `X_Signature`: `X-Signature in header`
    - :param `Authorization`: `Authorization in header`
    - :return: `bool`
    """
    # HMAC-SHA1 of the raw body keyed with ob_access_token, to reject forged reports
    if X_Signature is not None:
        sig = hmac.new(ob_access_token.encode('utf-8'), await request.body(), 'sha1').hexdigest()
        # Drop the leading 'sha1=' (first five characters) of the header value
        received_sig = X_Signature[len('sha1='):]
        if _constant_time_equal(received_sig, sig):
            return True

    # Authorization header check
    if _constant_time_equal(Authorization, f'Bearer {ob_access_token}'):
        return True

    # access_token query parameter check
    if _constant_time_equal(access_token, ob_access_token):
        return True

    return False


@app.post("/", tags=['上报接口'])
async def report(request: Request, access_token: str = None, X_Signature: Union[str, None] = Header(default=None),
                 Authorization: Union[str, None] = Header(default=None)):
    """
    ## Report endpoint
    - :param `request`: `Request` reported data
    - :param `access_token`: `str` (optional) access_token
    - :param `X_Signature`: `str` in header (optional) X-Signature
    - :param `Authorization`: `str` in header (optional) Authorization
    - :return: `dict` `{"status": "ok"/"error", "message": str}`
    """
    try:
        if not await check_signature(request, access_token, X_Signature, Authorization):
            return {'status': 'error', 'message': 'Unauthorized'}

        data = await request.json()
        logger.info(f"Received data: {data}")
        # Handler takes the payload as a JSON string
        Handler(json.dumps(data)).handle()
        return {'status': 'ok'}
    except Exception as e:
        logger.error(f'Crashed: {e}\n{traceback.format_exc()}')
        return {'status': 'error', 'message': f'Internal Server Error: {e}'}


@app.post("/call_api", tags=['操作接口'])
async def call_api(request: Request, access_token: str = None, X_Signature: Union[str, None] = Header(default=None),
                   Authorization: Union[str, None] = Header(default=None)):
    """
    ## Call an API of the OneBot implementation
    - :param `request`: `Request` request data; its JSON body supplies `action`, `data` and `echo`
    - :param `access_token`: `str` (optional) access_token
    - :param `X_Signature`: `str` in header (optional) X-Signature
    - :param `Authorization`: `str` in header (optional) Authorization
    - :return: `dict` request result
    """
    if not await check_signature(request, access_token, X_Signature, Authorization):
        return {'status': 'error', 'message': 'Unauthorized'}

    data = await request.json()
    event: Event = Handler(json.dumps(data)).classify()
    client = Client(event)
    return client.request(data.get("action"), data.get("data"), data.get("echo"))
@app.get("/status", tags=['其他接口'])
async def ping():
    """
    ## Ping!
    - :return: `dict` `{"status": "ok"}`
    """
    reply = {'status': 'ok'}
    return reply


@app.get("/plugins/get_all", tags=['其他接口'])
async def get_all_plugins():
    """
    ## List all plugins
    - :return: `list` whatever `pluginsManager.getAllPlugins()` returns
    """
    plugins = pluginsManager.getAllPlugins()
    return plugins


@app.get("/plugins/enable", tags=['其他接口'])
async def enable_plugin(plugin: str):
    """
    ## Enable a plugin
    - :param `plugin`: `str` plugin name
    - :return: `dict` `{"status": "ok"}`
    """
    pluginsManager.enable(plugin)
    reply = {'status': 'ok'}
    return reply


@app.get("/plugins/disable", tags=['其他接口'])
async def disable_plugin(plugin: str):
    """
    ## Disable a plugin
    - :param `plugin`: `str` plugin name
    - :return: `dict` `{"status": "ok"}`
    """
    pluginsManager.disable(plugin)
    reply = {'status': 'ok'}
    return reply


@app.get("/plugins/load_all", tags=['其他接口'])
async def load_all_plugins():
    """
    ## Load all plugins
    :return: `dict` `{"status": "ok"}`
    """
    pluginsManager.loadPlugins()
    reply = {'status': 'ok'}
    return reply


def start():
    """
    Start the FastAPI service under uvicorn.
    :return: None
    """
    # Host, port and reload flag come from the module-level fastapi_* settings.
    server_options = {
        "host": fastapi_host,
        "port": fastapi_port,
        "reload": fastapi_debug,
    }
    uvicorn.run("pbf.driver.Fastapi:app", **server_options)
fastapi_host =
'localhost'
fastapi_port =
8000
fastapi_debug =
True
description =
'\n> PigBotFramework is built on FastApi, all APIs are listed below and provide query parameters\n'
app =
<fastapi.applications.FastAPI object>
@app.on_event('shutdown')
def
app_on_shutdown():
@app.on_event('shutdown')
def app_on_shutdown():
    """
    Clean-up logic executed when the application shuts down.

    The scheduler shutdown and the plugin clean-up are attempted independently,
    so a failure in one does not skip the other. Failures are logged instead of
    being silently discarded; shutdown itself is never interrupted.
    :return: None
    """
    try:
        scheduler.shutdown(wait=False)
        logger.info('Scheduler shutdown.')
    except Exception as e:
        logger.error(f'Scheduler shutdown failed: {e}\n{traceback.format_exc()}')

    try:
        pluginsManager.clearPlugins()
    except Exception as e:
        logger.error(f'Clearing plugins failed: {e}\n{traceback.format_exc()}')
关闭时执行的逻辑
Returns
None
async def
check_signature( request: starlette.requests.Request, access_token: str, X_Signature: Union[str, NoneType], Authorization: Union[str, NoneType]):
def _constant_time_equal(candidate, expected):
    """
    Compare two credentials without leaking the mismatch position through timing.

    When either value is not a `str` (for example `None`), fall back to `==`
    so the verdict matches a plain equality test.
    """
    if isinstance(candidate, str) and isinstance(expected, str):
        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(candidate.encode('utf-8'), expected.encode('utf-8'))
    return candidate == expected


async def check_signature(request: Request, access_token: str, X_Signature: Union[str, None],
                          Authorization: Union[str, None]):
    """
    ## Verify the request credentials
    The request is accepted as soon as any one of the three checks succeeds.
    - :param `request`: `Request` whose raw body is signed
    - :param `access_token`: `access_token` query parameter
    - :param `X_Signature`: `X-Signature in header`
    - :param `Authorization`: `Authorization in header`
    - :return: `bool`
    """
    # HMAC-SHA1 of the raw body keyed with ob_access_token, to reject forged reports
    if X_Signature is not None:
        sig = hmac.new(ob_access_token.encode('utf-8'), await request.body(), 'sha1').hexdigest()
        # Drop the leading 'sha1=' (first five characters) of the header value
        received_sig = X_Signature[len('sha1='):]
        if _constant_time_equal(received_sig, sig):
            return True

    # Authorization header check
    if _constant_time_equal(Authorization, f'Bearer {ob_access_token}'):
        return True

    # access_token query parameter check
    if _constant_time_equal(access_token, ob_access_token):
        return True

    return False
校验签名
- :param `request`: `Request`
- :param `access_token`: `access_token`
- :param `X_Signature`: `X-Signature in header`
- :param `Authorization`: `Authorization in header`
- :return: `bool`
@app.post('/', tags=['上报接口'])
async def
report( request: starlette.requests.Request, access_token: str = None, X_Signature: Union[str, NoneType] = Header(None), Authorization: Union[str, NoneType] = Header(None)):
@app.post("/", tags=['上报接口'])
async def report(request: Request, access_token: str = None, X_Signature: Union[str, None] = Header(default=None),
                 Authorization: Union[str, None] = Header(default=None)):
    """
    ## Report endpoint
    - :param `request`: `Request` reported data
    - :param `access_token`: `str` (optional) access_token
    - :param `X_Signature`: `str` in header (optional) X-Signature
    - :param `Authorization`: `str` in header (optional) Authorization
    - :return: `dict` `{"status": "ok"/"error", "message": str}`
    """
    try:
        authorized = await check_signature(request, access_token, X_Signature, Authorization)
        if authorized:
            payload = await request.json()
            logger.info(f"Received data: {payload}")
            # Handler takes the payload as a JSON string
            handler = Handler(json.dumps(payload))
            handler.handle()
            return {'status': 'ok'}
        return {'status': 'error', 'message': 'Unauthorized'}
    except Exception as exc:
        # Any failure is logged with its traceback and reported back to the caller
        logger.error(f'Crashed: {exc}\n{traceback.format_exc()}')
        return {'status': 'error', 'message': f'Internal Server Error: {exc}'}
上报接口
- :param `request`: `Request` 上报数据
- :param `access_token`: `str` (可选)access_token
- :param `X_Signature`: `str` in header (可选)X-Signature
- :param `Authorization`: `str` in header (可选)Authorization
- :return: `dict` `{"status": "ok"/"error", "message": str}`
@app.post('/call_api', tags=['操作接口'])
async def
call_api( request: starlette.requests.Request, access_token: str = None, X_Signature: Union[str, NoneType] = Header(None), Authorization: Union[str, NoneType] = Header(None)):
@app.post("/call_api", tags=['操作接口'])
async def call_api(request: Request, access_token: str = None, X_Signature: Union[str, None] = Header(default=None),
                   Authorization: Union[str, None] = Header(default=None)):
    """
    ## Call an API of the OneBot implementation
    - :param `request`: `Request` request data; its JSON body supplies `action`, `data` and `echo`
    - :param `access_token`: `str` (optional) access_token
    - :param `X_Signature`: `str` in header (optional) X-Signature
    - :param `Authorization`: `str` in header (optional) Authorization
    - :return: `dict` request result
    """
    authorized = await check_signature(request, access_token, X_Signature, Authorization)
    if not authorized:
        return {'status': 'error', 'message': 'Unauthorized'}

    payload = await request.json()
    # The payload is classified into an Event, which the Client is built from
    classified: Event = Handler(json.dumps(payload)).classify()
    api_client = Client(classified)
    action = payload.get("action")
    params = payload.get("data")
    echo = payload.get("echo")
    return api_client.request(action, params, echo)
调用OneBot实现的API
- :param `request`: `Request` 请求数据
- :param `access_token`: `str` (可选)access_token
- :param `X_Signature`: `str` in header (可选)X-Signature
- :param `Authorization`: `str` in header (可选)Authorization
- :return: `dict` 请求结果
@app.get('/status', tags=['其他接口'])
async def
ping():
@app.get("/status", tags=['其他接口'])
async def ping():
    """
    ## Ping!
    - :return: `dict` `{"status": "ok"}`
    """
    reply = {'status': 'ok'}
    return reply
Ping!
- :return: `dict` `{"status": "ok"}`
@app.get('/plugins/get_all', tags=['其他接口'])
async def
get_all_plugins():
@app.get("/plugins/get_all", tags=['其他接口'])
async def get_all_plugins():
    """
    ## List all plugins
    - :return: `list` whatever `pluginsManager.getAllPlugins()` returns
    """
    plugins = pluginsManager.getAllPlugins()
    return plugins
获取所有插件
- :return: `list` 插件列表
@app.get('/plugins/enable', tags=['其他接口'])
async def
enable_plugin(plugin: str):
@app.get("/plugins/enable", tags=['其他接口'])
async def enable_plugin(plugin: str):
    """
    ## Enable a plugin
    - :param `plugin`: `str` plugin name
    - :return: `dict` `{"status": "ok"}`
    """
    pluginsManager.enable(plugin)
    reply = {'status': 'ok'}
    return reply
启用插件
- :param `plugin`: `str` 插件名
- :return: `dict` `{"status": "ok"}`
@app.get('/plugins/disable', tags=['其他接口'])
async def
disable_plugin(plugin: str):
@app.get("/plugins/disable", tags=['其他接口'])
async def disable_plugin(plugin: str):
    """
    ## Disable a plugin
    - :param `plugin`: `str` plugin name
    - :return: `dict` `{"status": "ok"}`
    """
    pluginsManager.disable(plugin)
    reply = {'status': 'ok'}
    return reply
禁用插件
- :param `plugin`: `str` 插件名
- :return: `dict` `{"status": "ok"}`
@app.get('/plugins/load_all', tags=['其他接口'])
async def
load_all_plugins():
@app.get("/plugins/load_all", tags=['其他接口'])
async def load_all_plugins():
    """
    ## Load all plugins
    :return: `dict` `{"status": "ok"}`
    """
    pluginsManager.loadPlugins()
    reply = {'status': 'ok'}
    return reply
装载所有插件
Returns
dict{"status": "ok"}
def
start():
def start():
    """
    Start the FastAPI service under uvicorn.
    :return: None
    """
    # Host, port and reload flag come from the module-level fastapi_* settings.
    server_options = {
        "host": fastapi_host,
        "port": fastapi_port,
        "reload": fastapi_debug,
    }
    uvicorn.run("pbf.driver.Fastapi:app", **server_options)
启动FastAPI服务
Returns
None