日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

数据库和ORMS:使用SQLAlchemy与数据库通信

發布時間:2024/7/5 数据库 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 数据库和ORMS:使用SQLAlchemy与数据库通信 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

    • 1. 環境安裝
    • 2. 使用SQLAlchemy與SQL數據庫通信
      • 2.1 創建表
      • 2.2 連接數據庫
      • 2.3 insert、select
      • 2.4 update、delete
      • 2.5 relationships
      • 2.6 用Alembic進行數據庫遷移

learn from 《Building Data Science Applications with FastAPI》

1. 環境安裝

docker 安裝 MongoDB 服務

docker run -d --name fastapi-mongo -p 27017:27017 mongo:4.4

2. 使用SQLAlchemy與SQL數據庫通信

安裝 pip install databases[sqlite]

2.1 創建表

# models.py
"""SQLAlchemy table definition and Pydantic schemas for blog posts."""
from datetime import datetime
from typing import Optional

import sqlalchemy
from pydantic import BaseModel, Field

# Metadata object that the table below is registered on.
metadata = sqlalchemy.MetaData()

# Table object: table name, metadata, then one Column per field
# (column name, type, extra options).
posts = sqlalchemy.Table(
    'posts',
    metadata,
    sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True, autoincrement=True),
    sqlalchemy.Column('publication_date', sqlalchemy.DateTime(), nullable=False),
    sqlalchemy.Column('title', sqlalchemy.String(255), nullable=False),
    sqlalchemy.Column('text', sqlalchemy.Text(), nullable=False),
)


class PostBase(BaseModel):
    """Fields shared by every post schema."""

    title: str
    text: str
    # Fixed keyword: the original spelled it ``dafault_factory``, so no
    # default was ever registered for this field.
    publication_date: datetime = Field(default_factory=datetime.now)


class PostPartialUpdate(BaseModel):
    """Optional fields accepted when partially updating a post.

    The field names mirror the editable columns of the ``posts`` table
    (``title`` and ``text``). The original declared ``content``, which is
    not a column of ``posts``.
    """

    title: Optional[str] = None
    text: Optional[str] = None


class PostCreate(PostBase):
    """Payload for creating a post; adds nothing to ``PostBase``."""


class PostDB(PostBase):
    """A post as stored in the database, including its primary key."""

    id: int

2.2 連接數據庫

# _*_ coding: utf-8 _*_
# @Time : 2022/3/8 9:28
# @Author : Michael
# @File : database.py
# @desc :
"""Module-level database objects built from a single connection URL."""
import sqlalchemy
from databases import Database

DB_URL = "sqlite:///cp6_sqlalchemy.db"

# Both objects are created from the same URL: a SQLAlchemy engine and a
# ``databases.Database`` instance.
sqlalchemy_engine = sqlalchemy.create_engine(DB_URL)
database = Database(DB_URL)


def get_database() -> Database:
    """Return the module-level ``Database`` instance."""
    return database

2.3 insert、select

# _*_ coding: utf-8 _*_
# @Time : 2022/3/8 9:40
# @Author : Michael
# @File : app.py
# @desc :
from typing import List, Tuple

import uvicorn
from databases import Database
from fastapi import Depends, FastAPI, HTTPException, Query, status

from database import get_database, sqlalchemy_engine
from models import metadata, posts, PostDB, PostCreate, PostPartialUpdate

app = FastAPI()


@app.on_event("startup")
async def startup():
    """Connect to the database at startup, then create the tables."""
    db = get_database()
    await db.connect()
    metadata.create_all(sqlalchemy_engine)


@app.on_event("shutdown")
async def shutdown():
    """Disconnect from the database at shutdown."""
    db = get_database()
    await db.disconnect()


async def pagination(
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=0),
) -> Tuple[int, int]:
    """Return ``(skip, limit)`` with ``limit`` capped at 100."""
    return skip, min(100, limit)


async def get_post_or_404(
    id: int, database: Database = Depends(get_database)
) -> PostDB:
    """Fetch the post with the given id.

    Raises:
        HTTPException: 404 when no row matches ``id``.
    """
    row = await database.fetch_one(posts.select().where(posts.c.id == id))
    if row is not None:
        return PostDB(**row)
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)


# Insert a post.
@app.post("/posts/", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(
    post: PostCreate, db: Database = Depends(get_database)
) -> PostDB:
    """Insert a post and return the stored row."""
    # The insert statement is built by SQLAlchemy; no hand-written SQL.
    new_id = await db.execute(posts.insert().values(post.dict()))
    return await get_post_or_404(new_id, db)


@app.get("/posts/{id}", response_model=PostDB)
async def get_post(post: PostDB = Depends(get_post_or_404)) -> PostDB:
    """Return the post resolved by the ``get_post_or_404`` dependency."""
    return post


@app.get("/posts")
async def list_posts(
    pagination: Tuple[int, int] = Depends(pagination),
    database: Database = Depends(get_database),
) -> List[PostDB]:
    """Return one page of posts selected with offset/limit."""
    offset, page_size = pagination
    page_query = posts.select().offset(offset).limit(page_size)
    records = await database.fetch_all(page_query)
    return [PostDB(**record) for record in records]


if __name__ == "__main__":
    uvicorn.run(app="app:app", host="127.0.0.1", port=8001, reload=True, debug=True)



2.4 update、delete

# update
@app.patch("/posts/{id}", response_model=PostDB)
async def update_post(
    post_update: PostPartialUpdate,
    post: PostDB = Depends(get_post_or_404),
    database: Database = Depends(get_database),
) -> PostDB:
    """Update the post at ``{id}`` and return the re-fetched row.

    Only fields explicitly set on ``post_update`` are written, because the
    payload is dumped with ``exclude_unset=True``.
    """
    changes = post_update.dict(exclude_unset=True)
    statement = posts.update().where(posts.c.id == post.id).values(changes)
    await database.execute(statement)
    return await get_post_or_404(post.id, database)

# delete
@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
    post: PostDB = Depends(get_post_or_404),
    database: Database = Depends(get_database),
) -> None:
    """Delete the post resolved by the ``get_post_or_404`` dependency."""
    statement = posts.delete().where(posts.c.id == post.id)
    await database.execute(statement)

2.5 relationships

models.py 編寫新的表

# "comments" table, registered on the same metadata object.
comments = sqlalchemy.Table(
    "comments",
    metadata,
    sqlalchemy.Column(
        "id", sqlalchemy.Integer, primary_key=True, autoincrement=True
    ),
    # Foreign key to posts.id, declared with ON DELETE CASCADE.
    sqlalchemy.Column(
        "post_id",
        sqlalchemy.ForeignKey("posts.id", ondelete="CASCADE"),
        nullable=False,
    ),
    sqlalchemy.Column(
        "publication_date", sqlalchemy.DateTime(), nullable=False
    ),
    sqlalchemy.Column("content", sqlalchemy.Text(), nullable=False),
)


class CommentBase(BaseModel):
    """Fields shared by every comment schema."""

    post_id: int
    # The timestamp defaults to datetime.now() via default_factory.
    publication_date: datetime = Field(default_factory=datetime.now)
    content: str


class CommentCreate(CommentBase):
    """Payload for creating a comment; adds nothing to ``CommentBase``."""


class CommentDB(CommentBase):
    """A comment as stored in the database, including its primary key."""

    id: int

app.py 添加內容

from typing import List, Mapping, Tuple, cast

from models import (
    metadata,
    posts,
    PostDB,
    PostCreate,
    PostPartialUpdate,
    comments,
    CommentCreate,
    CommentDB,
)


@app.post("/comments", response_model=CommentDB, status_code=status.HTTP_201_CREATED)
async def create_comment(
    comment: CommentCreate, database: Database = Depends(get_database)
) -> CommentDB:
    """Create a comment on an existing post and return the stored row.

    Raises:
        HTTPException: 400 when ``comment.post_id`` matches no post.
    """
    # Look up the post the comment refers to.
    select_post_query = posts.select().where(posts.c.id == comment.post_id)
    post = await database.fetch_one(select_post_query)
    if post is None:
        # Report the requested post id. The original interpolated ``{id}``,
        # which is the builtin function in this scope, not the post id.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Post {comment.post_id} does not exist",
        )

    # Insert the comment.
    insert_query = comments.insert().values(comment.dict())
    comment_id = await database.execute(insert_query)

    # Read the inserted comment back.
    select_query = comments.select().where(comments.c.id == comment_id)
    raw_comment = cast(Mapping, await database.fetch_one(select_query))
    return CommentDB(**raw_comment)


獲取一個post的全部comments

models.py

class PostPublic(PostDB):
    """A stored post plus an optional list of its comments."""

    # Defaults to None when no comments are supplied.
    comments: Optional[List[CommentDB]] = None

app.py

# _*_ coding: utf-8 _*_
# @Time : 2022/3/8 9:40
# @Author : Michael
# @File : app.py
# @desc :
from typing import List, Mapping, Tuple, cast

import uvicorn
from databases import Database
from fastapi import Depends, FastAPI, HTTPException, Query, status

from database import get_database, sqlalchemy_engine
from models import (
    metadata,
    posts,
    PostDB,
    PostCreate,
    PostPartialUpdate,
    comments,
    CommentCreate,
    CommentDB,
    PostPublic,
)

app = FastAPI()


@app.on_event('startup')
async def startup():
    """Connect to the database at startup, then create the tables."""
    await get_database().connect()
    metadata.create_all(sqlalchemy_engine)


@app.on_event("shutdown")
async def shutdown():
    """Disconnect from the database at shutdown."""
    await get_database().disconnect()


async def pagination(
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=0),
) -> Tuple[int, int]:
    """Return ``(skip, limit)`` with ``limit`` capped at 100."""
    capped_limit = min(100, limit)
    return (skip, capped_limit)


async def get_post_or_404(
    id: int, database: Database = Depends(get_database)
) -> PostPublic:
    """Fetch the post with the given id together with its comments.

    Raises:
        HTTPException: 404 when no post row matches ``id``.
    """
    select_query = posts.select().where(posts.c.id == id)
    raw_post = await database.fetch_one(select_query)
    if raw_post is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    # All comments whose post_id equals this post's id.
    select_post_comment_query = comments.select().where(comments.c.post_id == id)
    raw_comments = await database.fetch_all(select_post_comment_query)
    comments_list = [CommentDB(**row) for row in raw_comments]
    return PostPublic(**raw_post, comments=comments_list)


# Insert a post.
@app.post("/posts", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(
    post: PostCreate, db: Database = Depends(get_database)
) -> PostPublic:
    """Insert a post and return the stored row."""
    # The insert statement is built by SQLAlchemy; no hand-written SQL.
    insert_query = posts.insert().values(post.dict())
    post_id = await db.execute(insert_query)
    post_db = await get_post_or_404(post_id, db)
    return post_db


# NOTE: the original file defined this route and function twice with
# identical bodies; only one definition is kept.
@app.get("/posts")
async def list_posts(
    pagination: Tuple[int, int] = Depends(pagination),
    database: Database = Depends(get_database),
) -> List[PostDB]:
    """Return one page of posts selected with offset/limit."""
    skip, limit = pagination
    select_query = posts.select().offset(skip).limit(limit)
    rows = await database.fetch_all(select_query)
    return [PostDB(**row) for row in rows]


@app.get("/posts/{id}", response_model=PostPublic)
async def get_post(post: PostPublic = Depends(get_post_or_404)) -> PostPublic:
    """Return the post (with comments) resolved by ``get_post_or_404``."""
    return post


# update
@app.patch("/posts/{id}", response_model=PostPublic)
async def update_post(
    post_update: PostPartialUpdate,
    post: PostPublic = Depends(get_post_or_404),
    database: Database = Depends(get_database),
) -> PostPublic:
    """Update the post at ``{id}`` and return the re-fetched row.

    Only fields explicitly set on ``post_update`` are written, because the
    payload is dumped with ``exclude_unset=True``.
    """
    update_query = (
        posts.update()
        .where(posts.c.id == post.id)
        .values(post_update.dict(exclude_unset=True))
    )
    await database.execute(update_query)
    post_db = await get_post_or_404(post.id, database)
    return post_db


# delete
@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
    post: PostPublic = Depends(get_post_or_404),
    database: Database = Depends(get_database),
) -> None:
    """Delete the post resolved by the ``get_post_or_404`` dependency."""
    delete_query = posts.delete().where(posts.c.id == post.id)
    await database.execute(delete_query)


@app.post("/comments", response_model=CommentDB, status_code=status.HTTP_201_CREATED)
async def create_comment(
    comment: CommentCreate, database: Database = Depends(get_database)
) -> CommentDB:
    """Create a comment on an existing post and return the stored row.

    Raises:
        HTTPException: 400 when ``comment.post_id`` matches no post.
    """
    select_post_query = posts.select().where(posts.c.id == comment.post_id)
    post = await database.fetch_one(select_post_query)
    if post is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Post {comment.post_id} does not exist",
        )

    insert_query = comments.insert().values(comment.dict())
    comment_id = await database.execute(insert_query)

    # Read the inserted comment back.
    select_query = comments.select().where(comments.c.id == comment_id)
    raw_comment = cast(Mapping, await database.fetch_one(select_query))
    return CommentDB(**raw_comment)


if __name__ == '__main__':
    uvicorn.run(app='app:app', host="127.0.0.1", port=8001, reload=True, debug=True)


2.6 用Alembic進行數據庫遷移

pip install alembic

終端輸入:

alembic init alembic

初始化遷移環境,其中包括一組文件和目錄,Alembic將在其中存儲其配置和遷移文件,需要一起提交 git

在 env.py 中導入元數據

from web_python_dev.sqlalchemy1.models import metadata

# Expose the application's table metadata under the name env.py uses.
target_metadata = metadata

編輯ini配置

開始遷移

alembic revision --autogenerate -m "Initial migration"

之后會生成一個py文件

該代碼內有兩個函數:upgrade,downgrade用于數據遷移和回滾

# 升級
alembic upgrade head

數據的遷移和升級之前請做好備份和測試,防止丟失損壞
https://alembic.sqlalchemy.org/en/latest/index.html

總結

以上是生活随笔為你收集整理的数据库和ORMS:使用SQLAlchemy与数据库通信的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。