【PY模块】Python ORM 模型 SQLAlchemy 使用

ORM 说明

SQLAlchemy 是P ython 中,通过ORM操作数据库的框架。简单点来说,就是帮助我们从烦冗的 SQL语句 中解脱出来,从而不需要再去写原生的SQL语句,只需要用Python的语法来操作对象,就能被自动映射SQL语句

安装 SQLAlchemy:pip3 install SQLAlchemy

使用

本文以 sqlite 数据为例,当然 sqlalchemy 也支持其他数据,MySQL 、PostgreSQL、Oracle、MSSQL ...

连接引擎

任何SQLAlchemy应用程序的开始都是一个Engine对象,此对象充当连接特定数据库中心源,提供被称为connection pool的对于这些数据库连接。

"""
配置连接数据库 database.py
"""
from sqlalchemy import create_engine

DATABASE_URL = 'sqlite:///orm.db'  # sqlite://数据库路径
engine = create_engine(  # 创建引擎
    DATABASE_URL, encoding='utf-8',
    echo=True,  # 引擎将用 repr() 函数记录所有语句及其参数列表到日志
    connect_args={'check_same_thread': False},
    # 【使用 sqlite数据库 才配置】sqlalchemy 是多线程的, 'check_same_thread': False 来让建立的任意线程都可以使用
)

声明映射

Python 中的一个类,对应一个表,类的每个属性 对应 表的字段名

每个映射类都需要继承declarative_base()

"""
配置连接数据库 database.py
"""
from sqlalchemy.orm import declarative_base

Base = declarative_base()

创建两张表,我们不需要事先在数据库创建表

"""
ORM操作的模型类 models.py
"""
from sqlalchemy import Column, Integer, String, Enum, DateTime, func, ForeignKey
from database import Base, engine

class ClassRoom(Base):
    __tablename__ = 'orm_classroom' # 表名
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    classroom_num = Column(Integer, unique=True, nullable=False, comment='几班')
    teacher = Column(String(100), unique=False, nullable=False, comment='班主任')
    student_num = Column(Integer, unique=False, nullable=False, comment='班级人数')

    create_at = Column(DateTime, server_default=func.now(), comment='创建时间')
    update_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新时间')

    def __repr__(self):  # 方便显示
        return f'id={self.id} classroom_num={self.classroom_num} teacher={self.teacher} student_num={self.student_num}'

class Student(Base):
    __tablename__ = 'orm_student' # 表名
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)  # 主键 自增
    name = Column(String(100), unique=False, nullable=False, comment='姓名')  # 字符串 不唯一 不为空 备注
    sex = Column(Enum('男', '女'), comment='性别')
    classroom_num = Column(Integer, ForeignKey('orm_classroom.classroom_num'), comment='所属班级')

    create_at = Column(DateTime, server_default=func.now(), comment='创建时间')
    update_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新时间')

    def __repr__(self):  # 方便显示
        return f'id={self.id} name={self.name} sex={self.sex} classroom_num={self.classroom_num}'

Base.metadata.create_all(engine)  # 创建表

运行 models.py 文件

mark

创建会话

参数 autoflushautocommit 的说明,推荐博客:https://zhuanlan.zhihu.com/p/48994990

"""
配置连接数据库 database.py
"""

# 在 SQLAlchemy 中,CRUD都是通过会话(session)进行的,所以我们必须要先创建会话
SessionLocal = sessionmaker(bind=engine,
                            autoflush=False,  # 关闭 flush() 函数的调用,flush() 作用是发送语句到数据库,但数据库不一定执行写入磁盘操作
                            autocommit=False,  # 关闭 commit() 函数调用,commit() 作用是提交事务,将变更保存到数据库文件
                            expire_on_commit=True)

进行 crud 操作封装

"""
对数据库增删改查 crud.py
"""
"""
对数据库增删改查 crud.py
"""
from sqlalchemy.orm import Session
from storages import models, schemas

# 增
def create_student(db: Session, student: schemas.CreateStudent):
    """传入 BaseMode 实例"""
    db_student = models.Student(**student.dict())
    db.add(db_student)
    db.commit()  # 提交事务
    db.refresh(db_student)  # 刷新
    return db_student

# 增
def create_class_room(db: Session, classroom: schemas.CreateClassRoom):
    """传入 BaseMode 实例"""
    db_classroom = models.Student(**classroom.dict())
    db.add(db_classroom)
    db.commit()  # 提交事务
    db.refresh(db_classroom)  # 刷新
    return db_classroom

# 查
def get_student_by_id(db: Session, student_id: int):
    """通过 id 查询学生表"""
    return db.query(models.Student).filter(models.Student.id == student_id).first()

# 查
def get_student_by_name(db: Session, name: str):
    """通过 name 查询学生表"""
    return db.query(models.Student).filter(models.Student.name == name)

# 删
def del_student_by_id(db: Session, student_id: int):
    student = get_student_by_id(db, student_id)
    db.delete(student)  # 先查询 再删除
    db.commit()
    return student

# 改
def update_student_name_by_id(db: Session, student_id: int, name: str):
    student = get_student_by_id(db, student_id)
    student.name = name  # 查询结果 修改 提交事务
    db.commit()
    return student

使用

"""
主文件 main.py
"""
from storages.database import SessionLocal, Base, engine
from storages.crud import create_student, get_student_by_id, del_student_by_id, update_student_name_by_id
from storages.schemas import CreateStudent

Base.metadata.create_all(engine)  # 创建表

db = SessionLocal()

student = CreateStudent(
    name='Bob',
    sex='男',
    classroom_num=1
)
create_student(db=db, student=student)
print('查询结果', get_student_by_id(db=db, student_id=1))
# print('删除结果', del_student_by_id(db=db, student_id=2))
# print('修改结果', update_student_name_by_id(db=db, student_id=3, name='Aoa'))

查询数据 query() 的使用

query() 可以理解为 SQL的 select

db.query(models.Student).filter() 过滤
db.query(models.Student).filter_by() 根据关键字过滤
db.query(models.Student).all() 返回全部【列表】
db.query(models.Student).first() 返回第一个元素
db.query(models.Student).one() 有且只有一个元素时才正确返回
db.query(models.Student).one_or_none(),类似one,但如果没有找到结果,则不会引发错误
db.query(models.Student).scalar(),调用one方法,并在成功时返回行的第一列
db.query(models.Student).count() 计数
db.query(models.Student).order_by() 排序

filter() 与 filter_by() 过滤

filter() 使用

# 等于
db.query(models.Student).filter(Student.name == 'Bob')

# 不等于
db.query(models.Student).filter(Student.name != 'Bob')

# like和ilike
db.query(models.Student).filter(Student.name.like('%Bob%'))
db.query(models.Student).filter(Student.name.ilike('%Bob%')) # 不区分大小写

# is
db.query(models.Student).filter(Student.name == None)

# is not
db.query(models.Student).filter(Student.name != None)

# and
from sqlalchemy import and_
db.query(models.Student).filter(and_(Student.name == 'Bob', Student.id == 2))
db.query(models.Student).filter(Student.name == 'Bob', Student.fullname == 2)
db.query(models.Student).filter(Student.name == 'Bob').filter(Student.fullname == 2)

# or
from sqlalchemy import or_
db.query(models.Student).filter(or_(Student.name == 'Bob', Student.name == 'Aoa'))

# match
db.query(models.Student).filter(Student.name.match('Bob'))

filter_by() 使用

# id 是模型的字段
db.query(models.Student).filter_by(id=student_id,name = 'Bob').first()

jion() 连接查询

db.query(models.Student).join(models.ClassRoom).\
    filter(\
    models.Student.classroom_num==models.ClassRoom.classroom_num \
    ).all()
发表评论 / Comment

用心评论~