搭建查询 SQL 服务
简介
在测试过程中,由于不同的公司的权限管理,导致很多的测试没有直接查询数据库的权限。
被测系统需要具备充分的可测性,如果可测性不足,那么应该区对应的解决方案,以下引用自《Google 测试之道》:
系统或文档中描述的整套系统的可测试性怎样?是否需要新增测试钩子(译注:testing hook,这里指为了测试而增加一些接口,用以显示系统内部状态信息)?如果需要,确保他们也被添加到文档之中。系统的设计是否考虑到易测试性,而为之也做了一些调整?是否可以使用已有的测试框架?预估一下在测试方面我们都需要做哪些工作,并把这部分内容也增加到设计文档中去。
应用价值
- 解决由于权限限制带来的可测性不足的问题。
- 通过接口的封装,既能很好的解决权限问题,也能满足测试的需求。
解决思路
实现思路
解析 sql 语句
- 解析 SQL 字符串,获取 WHERE 部分的内容
- 解析 SQL 字符串完成后,便可以根据内部对于数据库的安全规范,对该字符串的各个部分进行判断是否予以执行。
def parse_query(sql_query):
# 初始化解析结果
parsed = sqlparse.parse(sql_query)[0]
print(f"parsed = {parsed}")
token_list = parsed.tokens
print(f"token_list ={token_list},list {[token.value for token in token_list]}")
# 提取表名和查询条件(这里只做简单解析,生产中应加强SQL检查)
# if token_list[0].value.upper() == "SELECT":
if parsed.get_type() == "SELECT":
# 假设语句格式为 "SELECT * FROM table_name WHERE ... "
table_name = token_list[6].value # 第四个 token 是表名
print(f"table_name = {table_name}")
# 如果 sql 中包含 WHERE 关键字,则提取 WHERE 后的条件
conditions = token_list[-1].value.upper().split("WHERE")[-1].strip() if "WHERE" in sql_query.upper() else None
print(f"conditions = {conditions}")
return table_name, conditions
else:
raise ValueError("只支持查询操作!!!")
执行 sql 操作
def execute_sqlalchemy_query(session, table_name, conditions):
# 如果是支持查询的表
if table_name in supported_tables:
# 查询对应的表
query = session.query(supported_tables.get(table_name))
if conditions:
# 使用 text 直接写入条件
query = query.filter(text(conditions))
return query.all()
else:
raise ValueError("当前表不提供查询服务!")
路由层
import sqlparse
from flask import Flask, request, jsonify
from sqlalchemy import create_engine, Column, Integer, String, text
from sqlalchemy.orm import declarative_base, sessionmaker, Session
# 声明基类
Base = declarative_base()
# 创建引擎,连接到数据库
engine = create_engine(f'mysql+pymysql://{username}:{pwd}@{ip}:{port}/{database}')
# 创建session对象,绑定到数据库引擎上
DBSession = sessionmaker(bind=engine)
db_session: Session = DBSession()
class User(Base):
# 设置数据库表名
__tablename__ = "user"
id = Column(Integer, primary_key=True, comment="用户ID")
username = Column(String(80), unique=True, nullable=False, comment="用户名")
email = Column(String(120), unique=True, nullable=False, comment="邮箱")
gender = Column(String(3), comment="性别")
def __repr__(self):
return f'<User {self.username}>'
# 支持查询的表
supported_tables = {
"user": User
}
app = Flask(__name__)
@app.route('/query', methods=['POST'])
def sql_query():
data = request.json
sql_query = data.get('sql')
try:
table_name, conditions = parse_query(sql_query)
# 使用数据库会话执行查询
with app.app_context():
session = db_session
results = execute_sqlalchemy_query(session, table_name, conditions)
print(f"查询结果为 {results}")
# 关闭 session
session.close()
# 格式化输出结果
result_list = [result.__dict__ for result in results]
print(f"查询结果列表为 {result_list}")
for result in result_list:
# 去掉 SQLAlchemy 默认字段
result.pop('_sa_instance_state', None)
print(f"去掉SQLAlchemy默认字段后的 result_list {result_list}")
return jsonify(result_list)
except ValueError as e:
return {"error": str(e)}, 400
except Exception as e:
return {"error": "查询操作失败"}, 500
if __name__ == '__main__':
app.run(debug=True, port=5009)
完整代码
import sqlparse
from flask import Flask, request, jsonify
from sqlalchemy import create_engine, Column, Integer, String, text
from sqlalchemy.orm import declarative_base, sessionmaker, Session
# 声明基类
Base = declarative_base()
# 创建引擎,连接到数据库
engine = create_engine(f'mysql+pymysql://{username}:{pwd}@{ip}:{port}/{database}')
# 创建session对象,绑定到数据库引擎上
DBSession = sessionmaker(bind=engine)
db_session: Session = DBSession()
class User(Base):
# 设置数据库表名
__tablename__ = "user"
id = Column(Integer, primary_key=True, comment="用户ID")
username = Column(String(80), unique=True, nullable=False, comment="用户名")
email = Column(String(120), unique=True, nullable=False, comment="邮箱")
gender = Column(String(3), comment="性别")
def __repr__(self):
return f'<User {self.username}>'
supported_tables = {
"user": User
}
def parse_query(sql_query):
# 初始化解析结果
parsed = sqlparse.parse(sql_query)[0]
print(f"parsed = {parsed}")
token_list = parsed.tokens
print(f"token_list ={token_list},list {[token.value for token in token_list]}")
# 提取表名和查询条件(这里只做简单解析,生产中应加强SQL检查)
# if token_list[0].value.upper() == "SELECT":
if parsed.get_type() == "SELECT":
# 假设语句格式为 "SELECT * FROM table_name WHERE ... "
table_name = token_list[6].value # 第四个 token 是表名
print(f"table_name = {table_name}")
# 如果 sql 中包含 WHERE 关键字,则提取 WHERE 后的条件
conditions = token_list[-1].value.upper().split("WHERE")[-1].strip() if "WHERE" in sql_query.upper() else None
print(f"conditions = {conditions}")
return table_name, conditions
else:
raise ValueError("只支持查询操作!!!")
def execute_sqlalchemy_query(session, table_name, conditions):
# 如果是支持查询的表
if table_name in supported_tables:
# 查询对应的表
query = session.query(supported_tables.get(table_name))
if conditions:
# 使用 text 直接写入条件
query = query.filter(text(conditions))
return query.all()
else:
raise ValueError("当前表不提供查询服务!")
app = Flask(__name__)
@app.route('/query', methods=['POST'])
def sql_query():
data = request.json
sql_query = data.get('sql')
try:
table_name, conditions = parse_query(sql_query)
# 使用数据库会话执行查询
with app.app_context():
session = db_session
results = execute_sqlalchemy_query(session, table_name, conditions)
print(f"查询结果为 {results}")
# 关闭 session
session.close()
# 格式化输出结果
result_list = [result.__dict__ for result in results]
print(f"查询结果列表为 {result_list}")
for result in result_list:
# 去掉 SQLAlchemy 默认字段
result.pop('_sa_instance_state', None)
print(f"去掉SQLAlchemy默认字段后的 result_list {result_list}")
return jsonify(result_list)
except ValueError as e:
return {"error": str(e)}, 400
except Exception as e:
return {"error": "查询操作失败"}, 500
if __name__ == '__main__':
app.run(debug=True, port=5009)
总结
搭建查询 SQL 服务,本质需求便是如何在安全限制下,让没有权限的用户访问到数据库。
至于如何对用户的请求进行限制,则需要对用户的 SQL 请求进行一定的解析与判断,避免某些私密数据被外部查询获取,避免数据库被恶意注入,避免数据库数据被删除等。