您好,欢迎来到华佗健康网。
搜索
您的当前位置:首页flask的基本使用

flask的基本使用

来源:华佗健康网

1 sqlalchemy 快速使用

# 1 sqlalchemy 企业级orm框架


# 2 python界的orm框架
	-1 django-orm     #只能django框架用
    -2 peewee          # 小型orm框架:https://docs.peewee-orm.com/en/latest/peewee/quickstart.html
    -----同步orm框架-----
    -3 sqlalchemy     # 企业级,使用非常广泛:https://docs.sqlalchemy.org/en/20/orm/quickstart.html  # 可以用在flask上,还可以用在fastapi
    ---中间态----
    -4 Tortoise ORM  # fastapi用的比较多
    
    
    
# 3 django ,flask---》同步框架--》新版本支持异步
# 4 fastapi,sanic,tornado---》异步框架--》支持同步的写法--》如果在异步框架上写同步
	-众多第三方库---》都是同步的--》导致异步框架性能发挥不出来
    -redis:aioredis  --》redis-py
    -mysql:aiomysql  --》pymysql
    
# 5 异步框架上用同步第三方模块,不能发挥最佳性能?

1.1 快速使用

#  1 安装 pip3 install sqlalchemy   
	- 2.0.30版本
    
    
# 2 架构
    Engine,框架的引擎
    Connection Pooling ,数据库连接池
    Dialect,选择连接数据库的DB API种类 
    SQL Exprression Language,SQL表达式语言
    
# 3 链接不同数据库
## 3.1 postgresql
# default
engine = create_engine("postgresql://scott:tiger@localhost/mydatabase")
# psycopg2
engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/mydatabase")
# pg8000
engine = create_engine("postgresql+pg8000://scott:tiger@localhost/mydatabase")
## 3.2 Mysql
# default
engine = create_engine("mysql://scott:tiger@localhost/foo")
# mysqlclient (a maintained fork of MySQL-Python)
engine = create_engine("mysql+mysqldb://scott:tiger@localhost/foo")
# PyMySQL
engine = create_engine("mysql+pymysql://scott:tiger@localhost/foo")

## 3.3 oracle
engine = create_engine("oracle://scott:tiger@127.0.0.1:1521/sidname")
engine = create_engine("oracle+cx_oracle://scott:tiger@tnsname")

## 3.4 Microsoft SQL Server
# pyodbc
engine = create_engine("mssql+pyodbc://scott:tiger@mydsn")
# pymssql
engine = create_engine("mssql+pymssql://scott:tiger@hostname:port/dbname")

## 3.5 sqlite
# Unix/Mac - 4 initial slashes in total
engine = create_engine("sqlite:absolute/path/to/foo.db")
# Windows
engine = create_engine("sqlite:///C:\\path\\to\\foo.db")
# Windows alternative using raw string
engine = create_engine(r"sqlite:///C:\path\to\foo.db")

2 sqlalchemy 原生操作

import threading
#  1 创建 engin
from sqlalchemy import create_engine
engine = create_engine(
    "mysql+pymysql://root:1234@127.0.0.1:3306/cnblogs?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

#2 原生操作mysql---》用了连接池---》就可以不用dbutils模块了
def task():
    conn = engine.raw_connection()
    cursor = conn.cursor()
    cursor.execute(
        "select * from article"
    )
    result = cursor.fetchall()
    print(result)
    cursor.close()
    conn.close()

for i in range(20):
    t = threading.Thread(target=task)
    t.start()

基本使用

import threading
#  1 创建 engin
from sqlalchemy import create_engine


engine = create_engine(
    "mysql+pymysql://root:123123@127.0.0.1:3306/school?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

#2 原生操作mysql---》用了连接池---》就可以不用dbutils模块了
def task():
    conn = engine.raw_connection()
    cursor = conn.cursor()
    cursor.execute(
        "select * from info"
    )
    result = cursor.fetchall()
    print(result)
    cursor.close()
    conn.close()

for i in range(20):
    t = threading.Thread(target=task)
    t.start()

3 sqlalchemy操作表

3.1 创建删除表

  • datetime.datetime.now不能加括号,加了括号,以后永远是当前时间

from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
import datetime
###### 第一步:创建基类,以后所有类,都必须继承基类

#### 1 老版本创建基类(不建议)
# from sqlalchemy.ext.declarative import declarative_base
# Base = declarative_base()

#### 2 新版本创建基类
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
    pass


### 第二步:写个类,继承
### 第三步:写字段:传统方式,类型方式
class User(Base):
    __tablename__='user'
    id=Column(Integer,primary_key=True,autoincrement=True)
    name = Column(String(32), index=True, nullable=False)  # name列varchar32,索引,不可为空
    email = Column(String(32), unique=True)  # email 列,varchar32,唯一
    # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
    ctime = Column(DateTime, default=datetime.datetime.now)
    extra = Column(Text, nullable=True)


if __name__ == '__main__':
    from sqlalchemy import create_engine

    engine = create_engine(
        "mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy001?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    ## 第四步:创建表(不能创建库,只能新增或删除表,不能增加删除字段)
    # Base.metadata.create_all(engine)

    ## 第五步:删除表
    Base.metadata.drop_all(engine)
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
import datetime

# 新版本创建基类
# 第一步:创建基类,以后所有类,都必须继承基类
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


### 第二步:写个类,继承
### 第三步:写字段:传统方式,类型方式

class User(Base):
    __tablename__ = 'user'

    id = Column(Integer, primary_key=True, autoincrement=True)
    # name列varchar32,索引,不可为空 nullable=False
    name = Column(String(32), index=True, nullable=False)
    # email 列,varchar32,唯一
    email = Column(String(32), unique=True)
    ctime = Column(DateTime, default=datetime.datetime.now)
    extra = Column(Text, nullable=True)


if __name__ == '__main__':
    from sqlalchemy import create_engine

    engine = create_engine(
        "mysql+pymysql://root:123123@127.0.0.1:3306/school?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    # 第四步 创建user表
    Base.metadata.create_all(engine)

    ## 第五步:删除表
    # Base.metadata.drop_all(engine)

1.2 增删查改

from models import User
# 第一步:创建engine
from sqlalchemy import create_engine
engine = create_engine(
    "mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy001?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 第二步:创建 session对象---老方式
# from sqlalchemy.orm import sessionmaker
# Session = sessionmaker(bind=engine)
# session=Session()
# 第二步:创建 session对象---新方式(推荐)
from sqlalchemy.orm import Session
session=Session(engine)

# 第三步:使用session对象
# 1 新增
user1=User(name='lqz11',email='333@qq.com')
user2=User(name='lqz12',email='335@qq.com')
# session.add(user)
session.add_all([user1,user2])
session.commit()  #提交
session.close()


# 2 搜索--》简单
# res=session.query(User).filter_by(name='lqz').all()
# res=session.query(User).filter_by(id=1).all()
# print(res)
# print(res[0])

# 3 删除
# res=session.query(User).filter_by(name='lqz').delete()
# print(res)
# session.commit()

# 4 修改
# res=session.query(User).filter_by(id=2).update({'name':'ooo'})
# session.commit()
# user=session.query(User).filter_by(id=2).first()
# print(user.id)
# user.name='uuii'
# session.add(user) #[id字段在不在] 如果对象不存在,就是新增,如果对象存在,就是修改
# session.commit()





1.2.1增加

from models import User

from sqlalchemy import create_engine

engine = create_engine(
    "mysql+pymysql://root:123123@127.0.0.1:3306/school?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

from sqlalchemy.orm import Session

session = Session(engine)

res1 = User(name='maojingyi', email='666@qq.com')
res2 = User(name='jingyi', email='969@qq.com')

session.add_all([res1,res2])
session.commit()  #提交
session.close()

1.2.2搜索

# 搜索
res = session.query(User).filter_by(name='maojingyi').all()
print(res)
  • 我们可以定制返回的形式
  • 这个是在models.py的User表中写入
    def __repr__(self):
        return f"<User(name={self.name}, email={self.email})>"

1.2.3删除

  • 删除所有name='maojingyi'
res=session.query(User).filter_by(name='maojingyi').delete()
print(res)
session.commit()
result = session.query(User).filter(User.id == 1, User.email == '666@qq.com').delete()  
# 

1.2.4修改

# session.add(user) #[id字段在不在] 如果对象不存在,就是新增,如果对象存在,就是修改
# session.commit()
# 修改
res = session.query(User).filter_by(id=2).update({'name': 'ooo'})
session.commit()
session.close()
user=session.query(User).filter_by(id=2).first()
print(user.name)

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- huatuo0.com 版权所有 湘ICP备2023021991号-1

违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务