pbf.driver.Fastapi

  1import json
  2from typing import Union
  3import traceback
  4import uvicorn
  5import hmac
  6from fastapi import FastAPI, Header, Request
  7
  8from ..utils import scheduler
  9from ..setup import logger, pluginsManager
 10from ..controller.Handler import Handler
 11from ..config import ob_access_token
 12from ..controller.Data import Event
 13from ..controller.Client import Client
 14
 15fastapi_host = "localhost"
 16fastapi_port = 8000
 17fastapi_debug = True
 18
 19description = '''
 20> PigBotFramework is built on FastApi, all APIs are listed below and provide query parameters
 21'''
 22tags_metadata = [
 23    {
 24        "name": "上报接口",
 25        "description": "OneBot(v11/v12)标准上报接口",
 26        "externalDocs": {
 27            "description": "OneBot Docs",
 28            "url": "https://onebot.dev/",
 29        },
 30    },
 31    {
 32        "name": "操作接口",
 33        "description": "操作接口",
 34    },
 35    {
 36        "name": "其他接口",
 37        "description": "其他接口",
 38    },
 39]
 40app = FastAPI(
 41    title="PigBotFramework API",
 42    description=description,
 43    openapi_tags=tags_metadata,
 44    version="5.0.0",
 45    contact={
 46        "name": "xzyStudio",
 47        "url": "https://xzynb.top",
 48        "email": "[email protected]",
 49    },
 50)
 51
 52
 53@app.on_event('shutdown')
 54def app_on_shutdown():
 55    """
 56    关闭时执行的逻辑
 57    :return: None
 58    """
 59    try:
 60        scheduler.shutdown(wait=False)
 61        pluginsManager.clearPlugins()
 62        logger.info('Scheduler shutdown.')
 63    except Exception:
 64        pass
 65
 66
 67async def check_signature(request: Request, access_token: str, X_Signature: Union[str, None],
 68                    Authorization: Union[str, None]):
 69    """
 70    ## 校验签名
 71    - :param `request`: `Request`
 72    - :param `access_token`: `access_token`
 73    - :param `X_Signature`: `X-Signature in header`
 74    - :param `Authorization`: Authorization in header`
 75    - :return: `bool`
 76    """
 77    # sha1校验防伪上报
 78    if X_Signature is not None:
 79        sig = hmac.new(ob_access_token.encode('utf-8'), await request.body(), 'sha1').hexdigest()
 80        received_sig = X_Signature[len('sha1='):]
 81        if sig == received_sig:
 82            return True
 83
 84    # Authorization校验
 85    if Authorization == f'Bearer {ob_access_token}':
 86        return True
 87
 88    # access_token校验
 89    if access_token == ob_access_token:
 90        return True
 91
 92    return False
 93
 94
 95@app.post("/", tags=['上报接口'])
 96async def report(request: Request, access_token: str = None, X_Signature: Union[str, None] = Header(default=None),
 97           Authorization: Union[str, None] = Header(default=None)):
 98    """
 99    ## 上报接口
100    - :param `request`: `Request` 上报数据
101    - :param `access_token`: `str` (可选)access_token
102    - :param `X_Signature`: `str` in header (可选)X-Signature
103    - :param `Authorization`: `str` in header (可选)Authorization
104    - :return: `dict` `{"status": "ok"/"error", "message": str}`
105    """
106    try:
107        if not await check_signature(request, access_token, X_Signature, Authorization):
108            return {'status': 'error', 'message': 'Unauthorized'}
109
110        data = await request.json()
111        logger.info(f"Received data: {data}")
112        Handler(json.dumps(data)).handle()
113        return {'status': 'ok'}
114    except Exception as e:
115        logger.error(f'Crashed: {e}\n{traceback.format_exc()}')
116        return {'status': 'error', 'message': f'Internal Server Error: {e}'}
117
118
119@app.post("/call_api", tags=['操作接口'])
120async def call_api(request: Request, access_token: str = None, X_Signature: Union[str, None] = Header(default=None),
121                   Authorization: Union[str, None] = Header(default=None)):
122    """
123    ## 调用OneBot实现的API
124    - :param `request`: `Request` 请求数据
125    - :param `access_token`: `str` (可选)access_token
126    - :param `X_Signature`: `str` in header (可选)X-Signature
127    - :param `Authorization`: `str` in header (可选)Authorization
128    - :return: `dict` 请求结果
129    """
130    if not await check_signature(request, access_token, X_Signature, Authorization):
131        return {'status': 'error', 'message': 'Unauthorized'}
132
133    data = await request.json()
134    event: Event = Handler(json.dumps(data)).classify()
135    client = Client(event)
136    return client.request(data.get("action"), data.get("data"), data.get("echo"))
137
138
139@app.get("/status", tags=['其他接口'])
140async def ping():
141    """
142    ## Ping!
143    - :return: `dict` `{"status": "ok"}`
144    """
145    return {'status': 'ok'}
146
147
148@app.get("/plugins/get_all", tags=['其他接口'])
149async def get_all_plugins():
150    """
151    ## 获取所有插件
152    - :return: `list` 插件列表
153    """
154    return pluginsManager.getAllPlugins()
155
156
157@app.get("/plugins/enable", tags=['其他接口'])
158async def enable_plugin(plugin: str):
159    """
160    ## 启用插件
161    - :param `plugin`: `str` 插件名
162    - :return: `dict` `{"status": "ok"}`
163    """
164    pluginsManager.enable(plugin)
165    return {'status': 'ok'}
166
167
168@app.get("/plugins/disable", tags=['其他接口'])
169async def disable_plugin(plugin: str):
170    """
171    ## 禁用插件
172    - :param `plugin`: `str` 插件名
173    - :return: `dict` `{"status": "ok"}`
174    """
175    pluginsManager.disable(plugin)
176    return {'status': 'ok'}
177
178
179@app.get("/plugins/load_all", tags=['其他接口'])
180async def load_all_plugins():
181    """
182    ## 装载所有插件
183    :return: `dict` `{"status": "ok"}`
184    """
185    pluginsManager.loadPlugins()
186    return {'status': 'ok'}
187
188
189def start():
190    """
191    启动FastAPI服务
192    :return: None
193    """
194    uvicorn.run(app="pbf.driver.Fastapi:app", host=fastapi_host, port=fastapi_port, reload=fastapi_debug)
fastapi_host = 'localhost'
fastapi_port = 8000
fastapi_debug = True
description = '\n> PigBotFramework is built on FastApi, all APIs are listed below and provide query parameters\n'
tags_metadata = [{'name': '上报接口', 'description': 'OneBot(v11/v12)标准上报接口', 'externalDocs': {'description': 'OneBot Docs', 'url': 'https://onebot.dev/'}}, {'name': '操作接口', 'description': '操作接口'}, {'name': '其他接口', 'description': '其他接口'}]
app = <fastapi.applications.FastAPI object>
@app.on_event('shutdown')
def app_on_shutdown():
54@app.on_event('shutdown')
55def app_on_shutdown():
56    """
57    关闭时执行的逻辑
58    :return: None
59    """
60    try:
61        scheduler.shutdown(wait=False)
62        pluginsManager.clearPlugins()
63        logger.info('Scheduler shutdown.')
64    except Exception:
65        pass

关闭时执行的逻辑

Returns

None

async def check_signature( request: starlette.requests.Request, access_token: str, X_Signature: Union[str, NoneType], Authorization: Union[str, NoneType]):
68async def check_signature(request: Request, access_token: str, X_Signature: Union[str, None],
69                    Authorization: Union[str, None]):
70    """
71    ## 校验签名
72    - :param `request`: `Request`
73    - :param `access_token`: `access_token`
74    - :param `X_Signature`: `X-Signature in header`
75    - :param `Authorization`: Authorization in header`
76    - :return: `bool`
77    """
78    # sha1校验防伪上报
79    if X_Signature is not None:
80        sig = hmac.new(ob_access_token.encode('utf-8'), await request.body(), 'sha1').hexdigest()
81        received_sig = X_Signature[len('sha1='):]
82        if sig == received_sig:
83            return True
84
85    # Authorization校验
86    if Authorization == f'Bearer {ob_access_token}':
87        return True
88
89    # access_token校验
90    if access_token == ob_access_token:
91        return True
92
93    return False

校验签名

  • :param request: Request
  • :param access_token: access_token
  • :param X_Signature: X-Signature in header
  • :param Authorization: Authorization in header`
  • :return: bool
@app.post('/', tags=['上报接口'])
async def report( request: starlette.requests.Request, access_token: str = None, X_Signature: Union[str, NoneType] = Header(None), Authorization: Union[str, NoneType] = Header(None)):
 96@app.post("/", tags=['上报接口'])
 97async def report(request: Request, access_token: str = None, X_Signature: Union[str, None] = Header(default=None),
 98           Authorization: Union[str, None] = Header(default=None)):
 99    """
100    ## 上报接口
101    - :param `request`: `Request` 上报数据
102    - :param `access_token`: `str` (可选)access_token
103    - :param `X_Signature`: `str` in header (可选)X-Signature
104    - :param `Authorization`: `str` in header (可选)Authorization
105    - :return: `dict` `{"status": "ok"/"error", "message": str}`
106    """
107    try:
108        if not await check_signature(request, access_token, X_Signature, Authorization):
109            return {'status': 'error', 'message': 'Unauthorized'}
110
111        data = await request.json()
112        logger.info(f"Received data: {data}")
113        Handler(json.dumps(data)).handle()
114        return {'status': 'ok'}
115    except Exception as e:
116        logger.error(f'Crashed: {e}\n{traceback.format_exc()}')
117        return {'status': 'error', 'message': f'Internal Server Error: {e}'}

上报接口

  • :param request: Request 上报数据
  • :param access_token: str (可选)access_token
  • :param X_Signature: str in header (可选)X-Signature
  • :param Authorization: str in header (可选)Authorization
  • :return: dict {"status": "ok"/"error", "message": str}
@app.post('/call_api', tags=['操作接口'])
async def call_api( request: starlette.requests.Request, access_token: str = None, X_Signature: Union[str, NoneType] = Header(None), Authorization: Union[str, NoneType] = Header(None)):
120@app.post("/call_api", tags=['操作接口'])
121async def call_api(request: Request, access_token: str = None, X_Signature: Union[str, None] = Header(default=None),
122                   Authorization: Union[str, None] = Header(default=None)):
123    """
124    ## 调用OneBot实现的API
125    - :param `request`: `Request` 请求数据
126    - :param `access_token`: `str` (可选)access_token
127    - :param `X_Signature`: `str` in header (可选)X-Signature
128    - :param `Authorization`: `str` in header (可选)Authorization
129    - :return: `dict` 请求结果
130    """
131    if not await check_signature(request, access_token, X_Signature, Authorization):
132        return {'status': 'error', 'message': 'Unauthorized'}
133
134    data = await request.json()
135    event: Event = Handler(json.dumps(data)).classify()
136    client = Client(event)
137    return client.request(data.get("action"), data.get("data"), data.get("echo"))

调用OneBot实现的API

  • :param request: Request 请求数据
  • :param access_token: str (可选)access_token
  • :param X_Signature: str in header (可选)X-Signature
  • :param Authorization: str in header (可选)Authorization
  • :return: dict 请求结果
@app.get('/status', tags=['其他接口'])
async def ping():
140@app.get("/status", tags=['其他接口'])
141async def ping():
142    """
143    ## Ping!
144    - :return: `dict` `{"status": "ok"}`
145    """
146    return {'status': 'ok'}

Ping!

  • :return: dict {"status": "ok"}
@app.get('/plugins/get_all', tags=['其他接口'])
async def get_all_plugins():
149@app.get("/plugins/get_all", tags=['其他接口'])
150async def get_all_plugins():
151    """
152    ## 获取所有插件
153    - :return: `list` 插件列表
154    """
155    return pluginsManager.getAllPlugins()

获取所有插件

  • :return: list 插件列表
@app.get('/plugins/enable', tags=['其他接口'])
async def enable_plugin(plugin: str):
158@app.get("/plugins/enable", tags=['其他接口'])
159async def enable_plugin(plugin: str):
160    """
161    ## 启用插件
162    - :param `plugin`: `str` 插件名
163    - :return: `dict` `{"status": "ok"}`
164    """
165    pluginsManager.enable(plugin)
166    return {'status': 'ok'}

启用插件

  • :param plugin: str 插件名
  • :return: dict {"status": "ok"}
@app.get('/plugins/disable', tags=['其他接口'])
async def disable_plugin(plugin: str):
169@app.get("/plugins/disable", tags=['其他接口'])
170async def disable_plugin(plugin: str):
171    """
172    ## 禁用插件
173    - :param `plugin`: `str` 插件名
174    - :return: `dict` `{"status": "ok"}`
175    """
176    pluginsManager.disable(plugin)
177    return {'status': 'ok'}

禁用插件

  • :param plugin: str 插件名
  • :return: dict {"status": "ok"}
@app.get('/plugins/load_all', tags=['其他接口'])
async def load_all_plugins():
180@app.get("/plugins/load_all", tags=['其他接口'])
181async def load_all_plugins():
182    """
183    ## 装载所有插件
184    :return: `dict` `{"status": "ok"}`
185    """
186    pluginsManager.loadPlugins()
187    return {'status': 'ok'}

装载所有插件

Returns

dict {"status": "ok"}

def start():
190def start():
191    """
192    启动FastAPI服务
193    :return: None
194    """
195    uvicorn.run(app="pbf.driver.Fastapi:app", host=fastapi_host, port=fastapi_port, reload=fastapi_debug)

启动FastAPI服务

Returns

None