轻量级 Python RASP,面向 Flask、FastAPI、Django 三类常见 Web 框架。核心思路是:
- 在请求进入时标记当前请求线程
- 对危险 sink 做全局 Hook
- 运行时判断当前调用是否来自请求线程
- 如果命中请求线程,就记录并 mock;否则走原始实现
当前仓库已经带了三个完整示例项目,分别演示 Flask、FastAPI、Django 下的接入方式和多路由验证。
examples/
├─ flask_demo/
├─ fastapi_demo/
└─ django_demo/
在仓库根目录执行:
pip install -e .如果要运行某个示例,再进入对应目录安装依赖:
pip install -r examples/flask_demo/requirements.txt
pip install -r examples/fastapi_demo/requirements.txt
pip install -r examples/django_demo/requirements.txt位置:
启动:
cd examples/flask_demo
python app.py默认地址:
http://127.0.0.1:5001
接入方式:
from pathlib import Path
from py_rasp import RASP, RASPPolicy
policy = RASPPolicy(
log_file_path="./logs/flask_rasp.log",
allowed_file_roots=(str(Path(__file__).resolve().parent),),
protected_sql_apis=(
"sqlite3.Connection.execute",
"sqlite3.Cursor.execute",
"pymysql.Cursor.execute",
"MySQLdb.Cursor.execute",
"psycopg2.Cursor.execute",
"psycopg.Cursor.execute",
),
)
RASP(policy=policy).patch_flask(app)位置:
启动:
cd examples/fastapi_demo
python app.py或:
cd examples/fastapi_demo
uvicorn app:app --host 0.0.0.0 --port 5002默认地址:
http://127.0.0.1:5002http://127.0.0.1:5002/docs
接入方式:
from pathlib import Path
from py_rasp import RASP, RASPPolicy
policy = RASPPolicy(
log_file_path="./logs/fastapi_rasp.log",
allowed_file_roots=(str(Path(__file__).resolve().parent),),
protected_sql_apis=(
"sqlite3.Connection.execute",
"sqlite3.Cursor.execute",
"pymysql.Cursor.execute",
"MySQLdb.Cursor.execute",
"psycopg2.Cursor.execute",
"psycopg.Cursor.execute",
),
)
RASP(policy=policy).patch_fastapi(app)位置:
启动:
cd examples/django_demo
python manage.py runserver 0.0.0.0:5003默认地址:
http://127.0.0.1:5003
接入方式分两步。
第一步,在 AppConfig.ready() 中安装 RASP:
from py_rasp import RASP, RASPPolicy
policy = RASPPolicy(
log_file_path=os.path.join(
os.path.dirname(__file__), "logs", "django_rasp.log"
),
allowed_file_roots=(str(PROJECT_ROOT / "examples" / "django_demo"),),
protected_sql_apis=(
"sqlite3.Connection.execute",
"sqlite3.Cursor.execute",
"pymysql.Cursor.execute",
"MySQLdb.Cursor.execute",
"psycopg2.Cursor.execute",
"psycopg.Cursor.execute",
),
)
RASP(policy=policy).install()第二步,在 MIDDLEWARE 最前面放入 Django 中间件:
MIDDLEWARE = [
"py_rasp.frameworks.DjangoRASPTracerMiddleware",
"django.middleware.security.SecurityMiddleware",
"django.middleware.common.CommonMiddleware",
]三个示例项目都提供了一组对称路由,用来证明 RASP 已经接入并且确实在请求线程里生效。
| 路由 | 说明 |
|---|---|
GET / |
首页,返回框架名与全部路由 |
GET /health |
健康检查,不触发危险 sink |
GET /demo/cmd |
os.system、os.popen |
GET /demo/subprocess |
subprocess.run、check_output、getoutput |
GET /demo/file-read |
open()、Path.read_text() |
GET /demo/file-write |
open(..., "w")、Path.write_text() |
GET /demo/network |
socket.connect()、socket.create_connection() |
GET /demo/sql |
SQL 注入风格查询与参数化查询对比 |
GET/POST /demo/pickle |
pickle.loads() |
GET/POST /demo/marshal |
marshal.loads() |
GET /demo/eval |
eval()、exec() |
GET /demo/archive |
zipfile.extractall()、shutil.unpack_archive() |
GET /demo/pathlib |
Path.read_bytes()、Path.write_bytes() |
说明:
- Flask 里的
/demo/pickle和/demo/marshal使用POST - FastAPI 和 Django 示例里这两个路由是
GET
curl "http://127.0.0.1:5001/demo/cmd?cmd=whoami"
curl "http://127.0.0.1:5001/demo/file-read?path=C:/Windows/win.ini"
curl -X POST "http://127.0.0.1:5001/demo/pickle"curl "http://127.0.0.1:5002/demo/network?host=8.8.8.8&port=53"
curl "http://127.0.0.1:5002/demo/eval?expr=__import__('os').system('calc')"
curl "http://127.0.0.1:5002/demo/sql?q=admin'%20OR%201=1%20--"curl "http://127.0.0.1:5003/demo/subprocess?cmd=whoami"
curl "http://127.0.0.1:5003/demo/archive"
curl "http://127.0.0.1:5003/demo/pathlib?path=C:/Windows/win.ini"示例项目里通过 allowed_file_roots 控制允许访问的文件根目录。
- Flask / FastAPI
- 允许当前示例目录及其子目录
- Django
- 允许
examples/django_demo目录及其子目录
- 允许
超出这些目录的读取和写入,会在请求线程内被拦截并 mock。
示例项目里通过 protected_sql_apis 指定受保护的 SQL 执行 API。
当前示例配置包含:
sqlite3.Connection.executesqlite3.Cursor.executepymysql.Cursor.executeMySQLdb.Cursor.executepsycopg2.Cursor.executepsycopg.Cursor.execute
三个示例分别把 mock 事件写到:
examples/flask_demo/logs/flask_rasp.logexamples/fastapi_demo/logs/fastapi_rasp.logexamples/django_demo/rasp_demo/logs/django_rasp.log
日志格式为 JSON Lines,只记录真正被 mock 的事件。
如果要分别看每个示例项目的说明,可以继续参考: