Skip to content

搭建查询 SQL 服务


简介

在测试过程中,由于不同的公司的权限管理,导致很多的测试没有直接查询数据库的权限。

uml diagram


被测系统需要具备充分的可测性,如果可测性不足,那么应该区对应的解决方案,以下引用自《Google 测试之道》:

系统或文档中描述的整套系统的可测试性怎样?是否需要新增测试钩子(译注:testing hook,这里指为了测试而增加一些接口,用以显示系统内部状态信息)?如果需要,确保他们也被添加到文档之中。系统的设计是否考虑到易测试性,而为之也做了一些调整?是否可以使用已有的测试框架?预估一下在测试方面我们都需要做哪些工作,并把这部分内容也增加到设计文档中去。


应用价值

  1. 解决由于权限限制带来的可测性不足的问题。
  2. 通过接口的封装,既能很好的解决权限问题,也能满足测试的需求。

解决思路

uml diagram


实现思路

uml diagram


解析 sql 语句

  • 解析 SQL 字符串,获取 WHERE 部分的内容
  • 解析 SQL 字符串完成后,便可以根据内部对于数据库的安全规范,对该字符串的各个部分进行判断是否予以执行。
def parse_query(sql_query):
    # 初始化解析结果
    parsed = sqlparse.parse(sql_query)[0]
    print(f"parsed = {parsed}")
    token_list = parsed.tokens
    print(f"token_list ={token_list},list {[token.value for token in token_list]}")
    # 提取表名和查询条件(这里只做简单解析,生产中应加强SQL检查)
    # if token_list[0].value.upper() == "SELECT":
    if parsed.get_type() == "SELECT":
        # 假设语句格式为 "SELECT * FROM table_name WHERE ... "
        table_name = token_list[6].value  # 第四个 token 是表名
        print(f"table_name = {table_name}")
        # 如果 sql 中包含 WHERE 关键字,则提取 WHERE 后的条件
        conditions = token_list[-1].value.upper().split("WHERE")[-1].strip() if "WHERE" in sql_query.upper() else None
        print(f"conditions = {conditions}")
        return table_name, conditions
    else:
        raise ValueError("只支持查询操作!!!")

执行 sql 操作

def execute_sqlalchemy_query(session, table_name, conditions):
    # 如果是支持查询的表
    if table_name in supported_tables:
        # 查询对应的表
        query = session.query(supported_tables.get(table_name))
        if conditions:
            # 使用 text 直接写入条件
            query = query.filter(text(conditions))
        return query.all()
    else:
        raise ValueError("当前表不提供查询服务!")

路由层

import sqlparse
from flask import Flask, request, jsonify
from sqlalchemy import create_engine, Column, Integer, String, text
from sqlalchemy.orm import declarative_base, sessionmaker, Session

# 声明基类
Base = declarative_base()
# 创建引擎,连接到数据库
engine = create_engine(f'mysql+pymysql://{username}:{pwd}@{ip}:{port}/{database}')
# 创建session对象,绑定到数据库引擎上
DBSession = sessionmaker(bind=engine)
db_session: Session = DBSession()


class User(Base):

    # 设置数据库表名
    __tablename__ = "user"

    id = Column(Integer, primary_key=True, comment="用户ID")
    username = Column(String(80), unique=True, nullable=False, comment="用户名")
    email = Column(String(120), unique=True, nullable=False, comment="邮箱")
    gender = Column(String(3), comment="性别")

    def __repr__(self):
        return f'<User {self.username}>'


# 支持查询的表
supported_tables = {
    "user": User
}


app = Flask(__name__)


@app.route('/query', methods=['POST'])
def sql_query():
    data = request.json
    sql_query = data.get('sql')

    try:
        table_name, conditions = parse_query(sql_query)

        # 使用数据库会话执行查询
        with app.app_context():
            session = db_session
            results = execute_sqlalchemy_query(session, table_name, conditions)
            print(f"查询结果为 {results}")
            # 关闭 session
            session.close()

        # 格式化输出结果
        result_list = [result.__dict__ for result in results]
        print(f"查询结果列表为 {result_list}")
        for result in result_list:
            # 去掉 SQLAlchemy 默认字段
            result.pop('_sa_instance_state', None)
        print(f"去掉SQLAlchemy默认字段后的 result_list {result_list}")
        return jsonify(result_list)

    except ValueError as e:
        return {"error": str(e)}, 400
    except Exception as e:
        return {"error": "查询操作失败"}, 500


if __name__ == '__main__':
    app.run(debug=True, port=5009)

完整代码

import sqlparse
from flask import Flask, request, jsonify
from sqlalchemy import create_engine, Column, Integer, String, text
from sqlalchemy.orm import declarative_base, sessionmaker, Session

# 声明基类
Base = declarative_base()
# 创建引擎,连接到数据库
engine = create_engine(f'mysql+pymysql://{username}:{pwd}@{ip}:{port}/{database}')
# 创建session对象,绑定到数据库引擎上
DBSession = sessionmaker(bind=engine)
db_session: Session = DBSession()


class User(Base):

    # 设置数据库表名
    __tablename__ = "user"

    id = Column(Integer, primary_key=True, comment="用户ID")
    username = Column(String(80), unique=True, nullable=False, comment="用户名")
    email = Column(String(120), unique=True, nullable=False, comment="邮箱")
    gender = Column(String(3), comment="性别")

    def __repr__(self):
        return f'<User {self.username}>'


supported_tables = {
    "user": User
}


def parse_query(sql_query):
    # 初始化解析结果
    parsed = sqlparse.parse(sql_query)[0]
    print(f"parsed = {parsed}")
    token_list = parsed.tokens
    print(f"token_list ={token_list},list {[token.value for token in token_list]}")
    # 提取表名和查询条件(这里只做简单解析,生产中应加强SQL检查)
    # if token_list[0].value.upper() == "SELECT":
    if parsed.get_type() == "SELECT":
        # 假设语句格式为 "SELECT * FROM table_name WHERE ... "
        table_name = token_list[6].value  # 第四个 token 是表名
        print(f"table_name = {table_name}")
        # 如果 sql 中包含 WHERE 关键字,则提取 WHERE 后的条件
        conditions = token_list[-1].value.upper().split("WHERE")[-1].strip() if "WHERE" in sql_query.upper() else None
        print(f"conditions = {conditions}")
        return table_name, conditions
    else:
        raise ValueError("只支持查询操作!!!")


def execute_sqlalchemy_query(session, table_name, conditions):
    # 如果是支持查询的表
    if table_name in supported_tables:
        # 查询对应的表
        query = session.query(supported_tables.get(table_name))
        if conditions:
            # 使用 text 直接写入条件
            query = query.filter(text(conditions))
        return query.all()
    else:
        raise ValueError("当前表不提供查询服务!")


app = Flask(__name__)


@app.route('/query', methods=['POST'])
def sql_query():
    data = request.json
    sql_query = data.get('sql')

    try:
        table_name, conditions = parse_query(sql_query)

        # 使用数据库会话执行查询
        with app.app_context():
            session = db_session
            results = execute_sqlalchemy_query(session, table_name, conditions)
            print(f"查询结果为 {results}")
            # 关闭 session
            session.close()

        # 格式化输出结果
        result_list = [result.__dict__ for result in results]
        print(f"查询结果列表为 {result_list}")
        for result in result_list:
            # 去掉 SQLAlchemy 默认字段
            result.pop('_sa_instance_state', None)
        print(f"去掉SQLAlchemy默认字段后的 result_list {result_list}")
        return jsonify(result_list)

    except ValueError as e:
        return {"error": str(e)}, 400
    except Exception as e:
        return {"error": "查询操作失败"}, 500


if __name__ == '__main__':
    app.run(debug=True, port=5009)

总结

搭建查询 SQL 服务,本质需求便是如何在安全限制下,让没有权限的用户访问到数据库。

至于如何对用户的请求进行限制,则需要对用户的 SQL 请求进行一定的解析与判断,避免某些私密数据被外部查询获取,避免数据库被恶意注入,避免数据库数据被删除等。