# 1 sqlalchemy 企业级orm框架
# 2 python界的orm框架
-1 django-orm #只能django框架用
-2 peewee # 小型orm框架:https://docs.peewee-orm.com/en/latest/peewee/quickstart.html
-----同步orm框架-----
-3 sqlalchemy # 企业级,使用非常广泛:https://docs.sqlalchemy.org/en/20/orm/quickstart.html # 可以用在flask上,还可以用在fastapi
---中间态----
-4 Tortoise ORM # fastapi用的比较多
# 3 django ,flask---》同步框架--》新版本支持异步
# 4 fastapi,sanic,tornado---》异步框架--》支持同步的写法--》如果在异步框架上写同步
-众多第三方库---》都是同步的--》导致异步框架性能发挥不出来
-redis:aioredis --》redis-py
-mysql:aiomysql --》pymysql
# 5 异步框架上用同步第三方模块,不能发挥最佳性能?
# 1 安装 pip3 install sqlalchemy
- 2.0.30版本
# 2 架构
Engine,框架的引擎
Connection Pooling ,数据库连接池
Dialect,选择连接数据库的DB API种类
SQL Exprression Language,SQL表达式语言
# 3 链接不同数据库
## 3.1 postgresql
# default
engine = create_engine("postgresql://scott:tiger@localhost/mydatabase")
# psycopg2
engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/mydatabase")
# pg8000
engine = create_engine("postgresql+pg8000://scott:tiger@localhost/mydatabase")
## 3.2 Mysql
# default
engine = create_engine("mysql://scott:tiger@localhost/foo")
# mysqlclient (a maintained fork of MySQL-Python)
engine = create_engine("mysql+mysqldb://scott:tiger@localhost/foo")
# PyMySQL
engine = create_engine("mysql+pymysql://scott:tiger@localhost/foo")
## 3.3 oracle
engine = create_engine("oracle://scott:tiger@127.0.0.1:1521/sidname")
engine = create_engine("oracle+cx_oracle://scott:tiger@tnsname")
## 3.4 Microsoft SQL Server
# pyodbc
engine = create_engine("mssql+pyodbc://scott:tiger@mydsn")
# pymssql
engine = create_engine("mssql+pymssql://scott:tiger@hostname:port/dbname")
## 3.5 sqlite
# Unix/Mac - 4 initial slashes in total
engine = create_engine("sqlite:absolute/path/to/foo.db")
# Windows
engine = create_engine("sqlite:///C:\\path\\to\\foo.db")
# Windows alternative using raw string
engine = create_engine(r"sqlite:///C:\path\to\foo.db")
import threading
# 1 创建 engin
from sqlalchemy import create_engine
engine = create_engine(
"mysql+pymysql://root:1234@127.0.0.1:3306/cnblogs?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
#2 原生操作mysql---》用了连接池---》就可以不用dbutils模块了
def task():
conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(
"select * from article"
)
result = cursor.fetchall()
print(result)
cursor.close()
conn.close()
for i in range(20):
t = threading.Thread(target=task)
t.start()
import threading
# 1 创建 engin
from sqlalchemy import create_engine
engine = create_engine(
"mysql+pymysql://root:123123@127.0.0.1:3306/school?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
#2 原生操作mysql---》用了连接池---》就可以不用dbutils模块了
def task():
conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(
"select * from info"
)
result = cursor.fetchall()
print(result)
cursor.close()
conn.close()
for i in range(20):
t = threading.Thread(target=task)
t.start()
datetime.datetime.now
不能加括号,加了括号,以后永远是当前时间from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
import datetime
###### 第一步:创建基类,以后所有类,都必须继承基类
#### 1 老版本创建基类(不建议)
# from sqlalchemy.ext.declarative import declarative_base
# Base = declarative_base()
#### 2 新版本创建基类
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
### 第二步:写个类,继承
### 第三步:写字段:传统方式,类型方式
class User(Base):
__tablename__='user'
id=Column(Integer,primary_key=True,autoincrement=True)
name = Column(String(32), index=True, nullable=False) # name列varchar32,索引,不可为空
email = Column(String(32), unique=True) # email 列,varchar32,唯一
# datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
ctime = Column(DateTime, default=datetime.datetime.now)
extra = Column(Text, nullable=True)
if __name__ == '__main__':
from sqlalchemy import create_engine
engine = create_engine(
"mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy001?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
## 第四步:创建表(不能创建库,只能新增或删除表,不能增加删除字段)
# Base.metadata.create_all(engine)
## 第五步:删除表
Base.metadata.drop_all(engine)
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
import datetime
# 新版本创建基类
# 第一步:创建基类,以后所有类,都必须继承基类
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
### 第二步:写个类,继承
### 第三步:写字段:传统方式,类型方式
class User(Base):
__tablename__ = 'user'
id = Column(Integer, primary_key=True, autoincrement=True)
# name列varchar32,索引,不可为空 nullable=False
name = Column(String(32), index=True, nullable=False)
# email 列,varchar32,唯一
email = Column(String(32), unique=True)
ctime = Column(DateTime, default=datetime.datetime.now)
extra = Column(Text, nullable=True)
if __name__ == '__main__':
from sqlalchemy import create_engine
engine = create_engine(
"mysql+pymysql://root:123123@127.0.0.1:3306/school?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 第四步 创建user表
Base.metadata.create_all(engine)
## 第五步:删除表
# Base.metadata.drop_all(engine)
from models import User
# 第一步:创建engine
from sqlalchemy import create_engine
engine = create_engine(
"mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy001?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 第二步:创建 session对象---老方式
# from sqlalchemy.orm import sessionmaker
# Session = sessionmaker(bind=engine)
# session=Session()
# 第二步:创建 session对象---新方式(推荐)
from sqlalchemy.orm import Session
session=Session(engine)
# 第三步:使用session对象
# 1 新增
user1=User(name='lqz11',email='333@qq.com')
user2=User(name='lqz12',email='335@qq.com')
# session.add(user)
session.add_all([user1,user2])
session.commit() #提交
session.close()
# 2 搜索--》简单
# res=session.query(User).filter_by(name='lqz').all()
# res=session.query(User).filter_by(id=1).all()
# print(res)
# print(res[0])
# 3 删除
# res=session.query(User).filter_by(name='lqz').delete()
# print(res)
# session.commit()
# 4 修改
# res=session.query(User).filter_by(id=2).update({'name':'ooo'})
# session.commit()
# user=session.query(User).filter_by(id=2).first()
# print(user.id)
# user.name='uuii'
# session.add(user) #[id字段在不在] 如果对象不存在,就是新增,如果对象存在,就是修改
# session.commit()
from models import User
from sqlalchemy import create_engine
engine = create_engine(
"mysql+pymysql://root:123123@127.0.0.1:3306/school?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
from sqlalchemy.orm import Session
session = Session(engine)
res1 = User(name='maojingyi', email='666@qq.com')
res2 = User(name='jingyi', email='969@qq.com')
session.add_all([res1,res2])
session.commit() #提交
session.close()
# 搜索
res = session.query(User).filter_by(name='maojingyi').all()
print(res)
def __repr__(self):
return f"<User(name={self.name}, email={self.email})>"
name='maojingyi'
res=session.query(User).filter_by(name='maojingyi').delete()
print(res)
session.commit()
result = session.query(User).filter(User.id == 1, User.email == '666@qq.com').delete()
#
# session.add(user) #[id字段在不在] 如果对象不存在,就是新增,如果对象存在,就是修改
# session.commit()
# 修改
res = session.query(User).filter_by(id=2).update({'name': 'ooo'})
session.commit()
session.close()
user=session.query(User).filter_by(id=2).first()
print(user.name)
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- huatuo0.com 版权所有 湘ICP备2023021991号-1
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务