侧边栏壁纸
博主头像
AI研究僧

hycj89@163.com

  • 累计撰写 1,899 篇文章
  • 累计创建 179 个标签
  • 累计收到 1 条评论
标签搜索

目 录CONTENT

文章目录

Flask-SQLAlchemy的安装使用 一对多 多对多join查询

AI研究僧
2023-02-10 / 0 评论 / 0 点赞 / 923 阅读 / 4,542 字

Flask-SQLAlchemy安装及设置

  • SQLALchemy 实际上是对数据库的抽象,让开发者不用直接和 SQL 语句打交道,而是通过 Python 对象来操作数据库,在舍弃一些性能开销的同时,换来的是开发效率的较大提升
  • SQLAlchemy是一个关系型数据库框架,它提供了高层的 ORM 和底层的原生数据库的操作。flask-sqlalchemy 是一个简化了 SQLAlchemy 操作的flask扩展
    文档地址:https://flask-sqlalchemy.palletsprojects.com/en/3.0.x/
    SQLAlchemy文档:https://docs.sqlalchemy.org/en/20/

安装

  • 安装 flask-sqlalchemy
    pip install flask-sqlalchemy
  • 如果连接的是 mysql 数据库,需要安装 pymysql
    pip install pymysql

数据库连接设置

  • 在 Flask-SQLAlchemy 中,数据库使用URL指定,而且程序使用的数据库必须保存到Flask配置对象的 SQLALCHEMY_DATABASE_URI 键中
    app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test'
  • 其他设置:
# 动态追踪修改设置,如未设置只会提示警告
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
#查询时会显示原始SQL语句
app.config['SQLALCHEMY_ECHO'] = True
  • 配置完成需要去 MySQL 中创建项目所使用的数据库
$ mysql -uroot -pmysql
$ create database test charset utf8;
  • 其他配置
SQLALCHEMY_DATABASE_URI	用于连接的数据库 URI 。例如:sqlite:////tmp/test.dbmysql://username:password@server/db
SQLALCHEMY_BINDS	一个映射 binds 到连接 URI 的字典。更多 binds 的信息见用 Binds 操作多个数据库。
SQLALCHEMY_ECHO	如果设置为Ture, SQLAlchemy 会记录所有 发给 stderr 的语句,这对调试有用。(打印sql语句)
SQLALCHEMY_RECORD_QUERIES	可以用于显式地禁用或启用查询记录。查询记录 在调试或测试模式自动启用。更多信息见get_debug_queries()。
SQLALCHEMY_NATIVE_UNICODE	可以用于显式禁用原生 unicode 支持。当使用 不合适的指定无编码的数据库默认值时,这对于 一些数据库适配器是必须的(比如 Ubuntu 上 某些版本的 PostgreSQL )。
SQLALCHEMY_POOL_SIZE	数据库连接池的大小。默认是引擎默认值(通常 是 5 )
SQLALCHEMY_POOL_TIMEOUT	设定连接池的连接超时时间。默认是 10 。
SQLALCHEMY_POOL_RECYCLE	多少秒后自动回收连接。这对 MySQL 是必要的, 它默认移除闲置多于 8 小时的连接。注意如果 使用了 MySQL , Flask-SQLALchemy 自动设定 这个值为 2 小时。

连接其他数据库

完整连接 URI 列表请跳转到 SQLAlchemy 下面的文档 (supported database) 。这里给出一些常见的连接字符串。

  • Postgres:
    postgresql://scott:tiger@localhost/mydatabase
  • MySQL:
    mysql://scott:tiger@localhost/mydatabase
    or
    mysql+pymysql://scott:tiger@localhost/mydatabase
  • Oracle:
    oracle://scott:tiger@127.0.0.1:1521/sidname
  • SQLite (注意开头的四个斜线):
    sqlite:////absolute/path/to/foo.db

常用的SQLAlchemy字段类型

image-1675995335275

常用的SQLAl1chemy列选项

image-1675995340878

常用的SQLAlchemy关系选项

image-1675995346415

数据库基本操作

  • 在Flask-SQLAlchemy中,插入、修改、删除操作,均由数据库会话管理。
    • 会话用 db.session 表示。在准备把数据写入数据库前,要先将数据添加到会话中然后调用 commit() 方法提交会话。
  • 在 Flask-SQLAlchemy 中,查询操作是通过 query 对象操作数据。
    • 最基本的查询是返回表中所有数据,可以通过过滤器进行更精确的数据库查询。

在视图函数中定义模型类

from flask import Flask
from flask_sqlalchemy import SQLAlchemy


app = Flask(__name__)

#设置连接数据库的URL
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:123456@localhost/test'

app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
#查询时会显示原始SQL语句
app.config['SQLALCHEMY_ECHO'] = True
db = SQLAlchemy(app)

class Role(db.Model):
    # 定义表名,数据库中的真实表名
    __tablename__ = 'roles'
    # 定义列对象,变量名是数据库字段名
    id = db.Column(db.Integer, primary_key=True)  # 整型主键会默认设置自增主键
    name = db.Column(db.String(64), unique=True)
    users = db.relationship('User', backref='role')

    #repr()方法显示一个可读字符串,使用print打印对象时,会输出这里定义的内容
    def __repr__(self):
        return 'Role:%s'% self.name

class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, index=True)
    email = db.Column(db.String(64),unique=True)
    password = db.Column(db.String(64))
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))

    def __repr__(self):
        return 'User:%s'%self.name

@app.route("/")
def index():
    return "index page"

if __name__ == '__main__':
    app.run(debug=True)

创建表:

db.create_all()

删除表

db.drop_all()

插入一条数据

ro1 = Role(name='admin')
db.session.add(ro1)
db.session.commit()

再次插入一条数据

ro2 = Role(name='user')
db.session.add(ro2)
db.session.commit()

一次插入多条数据

us1 = User(name='wang',email='wang@163.com',password='123456',role_id=ro1.id)
us2 = User(name='zhang',email='zhang@189.com',password='201512',role_id=ro2.id)
us3 = User(name='chen',email='chen@126.com',password='987654',role_id=ro2.id)
us4 = User(name='zhou',email='zhou@163.com',password='456789',role_id=ro1.id)
us5 = User(name='tang',email='tang@itheima.com',password='158104',role_id=ro2.id)
us6 = User(name='wu',email='wu@gmail.com',password='5623514',role_id=ro2.id)
us7 = User(name='qian',email='qian@gmail.com',password='1543567',role_id=ro1.id)
us8 = User(name='liu',email='liu@itheima.com',password='867322',role_id=ro1.id)
us9 = User(name='li',email='li@163.com',password='4526342',role_id=ro2.id)
us10 = User(name='sun',email='sun@163.com',password='235523',role_id=ro2.id)
db.session.add_all([us1,us2,us3,us4,us5,us6,us7,us8,us9,us10])
db.session.commit()

查询

常用的SQLAlchemy查询过滤器(用于过滤)

filter()	# 把过滤器添加到原查询上,返回一个新查询
filter_by()	# 把等值过滤器添加到原查询上,返回一个新查询
limit	# 使用指定的值限定原查询返回的结果
offset()	# 偏移原查询返回的结果,返回一个新查询
order_by()	# 根据指定条件对原查询结果进行排序,返回一个新查询
group_by()	# 根据指定条件对原查询结果进行分组,返回一个新查询

以上过滤条件可连用,如User.query.filter().offset().order_by().limit()

常用的SQLAlchemy查询执行器(用于取结果)

all()	# 以列表形式返回查询的所有结果
first()	# 返回查询的第一个结果,如果未查到,返回None
first_or_404()	# 返回查询的第一个结果,如果未查到,返回404
get()	# 返回指定主键对应的行,如不存在,返回None
get_or_404()	# 返回指定主键对应的行,如不存在,返回404
count()	# 返回查询结果的数量
paginate()	# 返回一个Paginate对象,它包含指定范围内的结果

查询:filter_by精确查询

返回名字等于wang的所有人

User.query.filter_by(name='wang').all()  # flask-sqlalchemy提供的写法,query无参数

user = db.session.query(User).filter_by(name='wang').all()  # sqlalchemy提供的写法,query有参数

执行sql,就是where中的过滤条件

SELECT users.id AS users_id, users.name AS users_name, users.email AS users_email, users.password AS users_password, users.role_id AS users_role_id 
FROM users 
WHERE users.name = %(name_1)s

first()返回查询到的第一个对象

User.query.first()

sql,可以看到就是limit

SELECT users.id AS users_id, users.name AS users_name, users.email AS users_email, users.password AS users_password, users.role_id AS users_role_id 
FROM users 
 LIMIT %(param_1)s

all()返回查询到的所有对象

User.query.all()
SELECT users.id AS users_id, users.name AS users_name, users.email AS users_email, users.password AS users_password, users.role_id AS users_role_id 
FROM users

filter模糊查询,返回名字结尾字符为g的所有数据

User.query.filter(User.name.endswith('g')).all()
SELECT users.id AS users_id, users.name AS users_name, users.email AS users_email, users.password AS users_password, users.role_id AS users_role_id 
FROM users 
WHERE (users.name LIKE concat('%%', %(name_1)s))

注意concat中的’%%'是python字符串的格式化写法,如:

a= "%%%s%%"%("abc")
实际表示的是
'%abc%'

%(name_1)s也是格式化的写法,如:

print(“I’m %(name)s. I’m %(age)d” % {‘name’:‘Pythontab’, ‘age’:99})

也就是最后的sql会是

SELECT users.id AS users_id, users.name AS users_name, users.email AS users_email, users.password AS users_password, users.role_id AS users_role_id 
FROM users 
WHERE (users.name LIKE concat('%', 'g'))  - 也就是以g结尾

get():参数为主键,如果主键不存在没有返回内容

User.query.get(1)

逻辑非,返回名字不等于wang的所有数据

User.query.filter(User.name!='wang').all()
SELECT users.id AS users_id, users.name AS users_name, users.email AS users_email, users.password AS users_password, users.role_id AS users_role_id 
FROM users 
WHERE users.name != %(name_1)s

not_ 相当于取反

from sqlalchemy import not_
User.query.filter(not_(User.name=='chen')).all()
SELECT users.id AS users_id, users.name AS users_name, users.email AS users_email, users.password AS users_password, users.role_id AS users_role_id 
FROM users 
WHERE users.name != %(name_1)s

本质上与逻辑非没有区别

逻辑与,需要导入and,返回and()条件满足的所有数据

from sqlalchemy import and_
User.query.filter(and_(User.name!='wang',User.email.endswith('163.com'))).all()
SELECT users.id AS users_id, users.name AS users_name, users.email AS users_email, users.password AS users_password, users.role_id AS users_role_id 
FROM users 
WHERE users.name != %(name_1)s AND (users.email LIKE concat('%%', %(email_1)s))

逻辑或,需要导入or_

from sqlalchemy import or_
User.query.filter(or_(User.name!='wang',User.email.endswith('163.com'))).all()
SELECT users.id AS users_id, users.name AS users_name, users.email AS users_email, users.password AS users_password, users.role_id AS users_role_id 
FROM users 
WHERE users.name != %(name_1)s OR (users.email LIKE concat('%%', %(email_1)s))

分组统计group_by

flask-sqlalchemy无法实现

from sqlalchemy import func
# 统计每个角色的用户数,即用户表以role_id分组统计count即可
ss = db.session.query(User.role_id, func.count(User.role_id)).group_by(User.role_id).all()

如果需要对分组后的数据进行过滤使用having

ss = db.session.query(User.role_id, func.count(User.role_id)).group_by(User.role_id).having(func.count(User.role_id)>1).all()

执行sql如下:
SELECT users.role_id AS users_role_id, count(users.role_id) AS count_1 FROM users GROUP BY users.role_id HAVING count(users.role_id) > 1

查询数据后删除

user = User.query.first()
db.session.delete(user)
db.session.commit()
User.query.all()

更新数据

user = User.query.first()
user.name = 'dong'
db.session.commit()
User.query.first()

模型之间的关联

一对多

前边定义的Role和User中,一对多的关系核心代码如下

class Role(db.Model):
    ...
    #关键代码
    users = db.relationship('User', backref='role')
    ...

class User(db.Model):
    ...
    # 外键,数据库中真实存在的字段
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))

其中realtionship描述了RoleUser的关系,在一对多的关系中,定义在的一方(本例中是一个角色对应多个用户,因此定义在Role类中)。

第一个参数为对应参照的类"User",以字符串输入
第二个参数backref是为类User创建的新属性,这样当有user对象时,可以使用user.role得到该用户对应的Role对象;此外如果有个role对象,要获取对应的所有用户时直接使用role.users即可得到。
还有第三个参数lazy决定了什么时候SQLALchemy从数据库中加载数据(案例中未设置)

  • 如果设置为子查询方式(subquery),则会在加载完Role对象后,就立即加载与其关联的对象,这样会让总查询数量减少,但如果返回的条目数量很多,就会比较慢,设置为 subquery 的话,role.users 返回所有数据列表
  • 也可以设置为动态方式(dynamic),这样关联对象会在被使用的时候再进行加载,并且在返回前进行过滤,如果返回的对象数很多,或者未来会变得很多,那最好采用这种方式。设置为 dynamic 的话,role.users 返回查询对象,并没有做真正的查询,可以利用查询对象做其他逻辑,比如:先排序再返回结果

User类中的role_id不可省略,该字段设置的是外键,关联到roles表的id,当使用user.role时,会使用该字段去得到role对象的所有属性,否则通过user.role_id只能得到id,而不能得到其他属性。

多对多

需求举例:学生网上选课,每个学生可以选择多门课程,同时每门课程也可以被多个学生选择,构成多对多关系。多对多关系描述有一个唯一的点就是:需要添加一张单独的表去记录两张表之间的对应关系,这张表是数据库真实存在的表。

# 中间表,注意中间表使用db.Table,而非继承自db.Model,该表未设置主键
tb_student_course = db.Table('tb_student_course',
                             db.Column('student_id', db.Integer, db.ForeignKey('students.id')),
                             db.Column('course_id', db.Integer, db.ForeignKey('courses.id'))
                             )

# 学生表
class Student(db.Model):
    __tablename__ = "students"
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    # 关系,secondary指定关联中间表。通过backref使Course类中有student属性
    courses = db.relationship('Course', secondary=tb_student_course,
                              backref='student',
                              lazy='dynamic')

# 课程表
class Course(db.Model):
    __tablename__ = "courses"
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)

关联查询示例(一对多):

角色和用户的关系是一对多的关系,一个角色可以有多个用户,一个用户只能属于一个角色。

查询已知角色的所有用户

#查询roles表id为1的角色
ro1 = Role.query.get(1)
#查询该角色的所有用户
ro1.users.all()

查询已知用户所属角色

#查询users表id为3的用户
us1 = User.query.get(3)
#查询用户属于什么角色
us1.role
SELECT roles.id AS roles_id, roles.name AS roles_name 
FROM roles 
WHERE roles.id = %(pk_1)s

以上两个是已知role_id或者user_id,因此未涉及join查询

关联查询示例(多对多)

查询某个学生选修了哪些课程

    stu1 = Student.query.get(1)
    cos = stu1.courses  # Student中有属性courses定义了关系
    for co in cos:
        print(co.name)

执行sql
SELECT courses.id AS courses_id, courses.name AS courses_name FROM courses, tb_student_course WHERE 1 = tb_student_course.student_id AND courses.id = tb_student_course.course_id

查询某个课程有哪些学生选修

    co1 = Course.query.get(1)
    stu = co1.student  # [<Student 1>, <Student 2>, <Student 3>] 学生列表
    for s in stu:
        print(s.name)

执行sql
SELECT students.id AS students_id, students.name AS students_name FROM students, tb_student_course WHERE 1 = tb_student_course.course_id AND students.id = tb_student_course.student_id

join查询

  • 内连接 inner join
# 以下查询结果中只包含user表的字段,不含角色表字段,实践中用途不大。得到的是User对象列表
# 其中User.role_id==Role.id在设置的有外键时,可不指定
d = User.query.join(Role, User.role_id==Role.id).all()

# sql
SELECT users.id AS users_id, users.name AS users_name, users.email AS users_email, users.password AS users_password, users.role_id AS users_role_id 
FROM users INNER JOIN roles ON users.role_id = roles.id

下边查询特定字段,使用with_entities传递要查询的字段信息

# 查询用户角色不为空时,用户的角色名(本质是inner join)
s = User.query.join(Role, User.role_id==Role.id).with_entities(User.id, User.name, User.role_id, Role.name).all()

# 原生sqlalchemy的写法
db.session.query(User.id, User.name, User.role_id, Role.name).join(User, User.role_id==Role.id, isouter=True).all()

# 结果样例,列表嵌套元组
[(6, 'wu', 2, 'user'), (7, 'qian', 1, 'admin'), (8, 'liu', 1, 'admin'), (9, 'li', 2, 'user'), (10, 'sun', 2, 'user')]

# 执行sql如下
SELECT users.id AS users_id, users.name AS users_name, users.role_id AS users_role_id, roles.name AS roles_name 
FROM users INNER JOIN roles ON users.role_id = roles.id
  • 左连接
d = User.query.join(Role, User.role_id==Role.id, isouter=True).with_entities(User.id, User.name, User.role_id, Role.name).all()

# sql
SELECT users.id AS users_id, users.name AS users_name, users.role_id AS users_role_id, roles.name AS roles_name 
FROM users LEFT OUTER JOIN roles ON users.role_id = roles.id
  • 右连接
    与左连接雷同,两表反过来即可

另外有User.query.outerjoin接口表示外连接,但其本质还是join,源码如下:
image-1676017098431

关联查询并非强制要求模型类中设置外键,只要可以通过某个字段进行关联起来,即可使用join查询,只需要在join中指定关联字段即可。

子查询

# 下边代码是在Role和User表各添加了deleted字段,表示是否删除,0未删除,1删除。
# 通过subquery设定子查询
        abc = db.session.query(User.id.label('id'), User.name.label('username'), Role.name.label('rolename'),
                           Role.deleted.label('deleted'), func.IF(Role.deleted==0,1,0).label('snapshot')).\
        join(Role, Role.user_id == User.id, isouter=True).subquery()
        xx = db.session.query(abc.c.id, abc.c.username, func.sum(abc.c.snapshot)).group_by(abc.c.id).all()

执行sql

select userid, username, sum(snp) from (
SELECT users.id userid, users.name username, roles.name rolename,roles.deleted, if(roles.deleted=0,1,0) as snp
 FROM users LEFT OUTER JOIN roles ON roles.user_id = users.id) s GROUP BY s.userid;

常用函数

通常在数据库中使用的聚合函数如sum、count,if函数等都在sqlalchemy.func

from sqlalchemy import func
# 聚合函数
db.session.query(User.role_id, func.count(User.role_id)).group_by(User.role_id).all()

# if 函数
db.session.query(User.id, func.IF(User.id > 4, 1, 0).label('xx'))  # user id大于4,返回1,否则返回0

其中label用来设置别名

博主关闭了所有页面的评论