Эффективная обработка сеансов с использованием шаблона репозитория в FastAPI
10 июня 2023 г.Я написал статью о пять месяцев назад об адаптации шаблона репозитория с помощью FastAPI, и я получил много прочтений (спасибо). Я пришел написать об эффективном способе обработки сеансов, все еще используя шаблон репозитория.
Прежде чем приступить к делу, я заметил, что в рабочей среде я получаю одну из следующих ошибок всякий раз, когда мои API пытаются выполнить транзакцию, включающую чтение или запись в базу данных:
* sqlalchemy.exc.PendingRollbackError * sqlalchemy.exc.InvalidRequestError
sqlalchemy.exc.PendingRollbackError
Эта ошибка указывает на то, что выполняется незафиксированная транзакция, которую необходимо отменить перед выполнением любых других операций с базой данных.
Наиболее распространенной причиной этой ошибки является необработанное исключение, возникающее во время транзакции базы данных, что препятствует правильному фиксации или откату транзакции.
sqlalchemy.exc.InvalidRequestError
Эта ошибка указывает на то, что операция или запрос, сделанный вами к базе данных, недействительны или не поддерживаются. Эта ошибка может быть вызвана различными причинами, в том числе:
* Неправильное использование SQLAlchemy API, например вызов метода или доступ к атрибуту, который не существует или неприменим в данном контексте.
* Неверный синтаксис SQL или структура запроса.
* Отсутствие или неправильное сопоставление между объектами Python и таблицами/столбцами базы данных.
Я уверен, что у вас есть мысли о том, как можно решить эти ошибки, однако я хотел бы заявить, что, хотя я определил, в чем была проблема, и исправил ее, проблема остается.
Если вам интересно, как я устраняю неполадки и устраняю их, вы можете выполнить следующие шаги:
sqlalchemy.exc.PendingRollbackError:
- Проверьте свой код на наличие необработанных исключений, которые могут помешать правильному подтверждению или откату транзакций. Убедитесь, что вы правильно обрабатываете исключения и при необходимости фиксируете или откатываете транзакцию.
* Проверьте наличие длительных или вложенных транзакций, которые могут привести к ожидающим откатам. Убедитесь, что все транзакции своевременно фиксируются или откатываются.
* Проверьте свой код на наличие случаев, когда вы можете начать новую транзакцию, прежде чем должным образом обработать предыдущую транзакцию.
sqlalchemy.exc.InvalidRequestError:
- Просмотрите конкретное сообщение об ошибке, предоставленное с исключением, чтобы определить причину недопустимого запроса. Это может дать вам подсказки о том, какая часть вашего кода или оператора SQL вызывает проблему.
* Проверьте свой код SQLAlchemy на наличие неправильных вызовов методов, обращений к атрибутам или неправильного использования API.
* Проверьте свои операторы SQL, чтобы убедиться, что они имеют правильный синтаксис и структуру.
* Убедитесь, что ваша схема базы данных и сопоставления SQLAlchemy синхронизированы. Убедитесь, что все необходимые таблицы и столбцы существуют, а ваши объекты Python правильно сопоставлены с соответствующими объектами базы данных.
Давайте перейдем непосредственно к тому, как я работал над постоянным решением, которое доказало свою эффективность. Я продолжу использовать проект, над которым работал, когда демонстрировал, как использовать шаблоны репозитория.
У нас был модуль, в котором мы хранили миксин нашего базового сеанса формы со следующими кодами:
# SQLAlchemy Imports
from sqlalchemy.orm import Session
# Own Imports
from config.database import SessionLocal
from core.settings import ledger_settings
class ORMSessionMixin:
"""Base orm session mixin for interacting with the database."""
def __init__(self):
"""
Get the next database session from the database pool.
"""
self.orm: Session = self.get_db().__next__()
def get_db(self):
"""
This method creates a database session,
yields it, rollback the transaction if there's an exception
and then finally closes the session.
Yields:
db: scoped database session
"""
db = SessionLocal()
try:
yield db
except Exception:
db.rollback()
finally:
db.close()
Проблема с этим решением заключалась в том, что если в процессе транзакции возникает исключение (это может быть что угодно: создание пользователя, пополнение вашего кошелька и т. д.) — исключения не обрабатываются должным образом, а сессия базы данных в пути не прерывается. получить откат.
После трех месяцев отладки и исправления, а также множества исследований я наконец смог создать эффективный способ обработки сеансов.
# SQLAlchemy Imports
import sqlalchemy
from sqlalchemy.orm import Session
# Own Imports
from config.database.connection import SessionLocal
class DatabaseSessionMixin:
"""Database session mixin."""
def __enter__(self) -> Session:
self.db = SessionLocal()
return self.db
def __exit__(self, exc_type, exc_val, exc_tb):
try:
if exc_type is not None:
self.db.rollback()
except sqlalchemy.exc.SQLAlchemyError:
pass
finally:
self.db.close()
SessionLocal.remove()
def use_database_session():
return DatabaseSessionMixin()
В этом коде:
* DatabaseSession
— это класс менеджера контекста, который обрабатывает сеанс и обеспечивает его правильное закрытие и откат в случае ошибки.
* Метод __enter__
инициализирует сеанс и возвращает его.
* Метод __exit__
проверяет наличие исключений и откатывает сеанс, если возникло исключение. Затем он закрывает сеанс и удаляет его из сеанса с заданной областью.
* use_database_session
— это служебная функция, которую можно использовать в качестве декоратора или менеджера контекста для упрощения использования сеанса.
Вот пример использования служебной функции use_database_session:
with use_database_session() as db:
# perform logic that uses the session
# ...
# After exiting the context, the session will be automatically closed and removed from the scoped session.
Описанный выше подход обеспечивает более чистый и эффективный способ обработки сеансов и гарантирует их правильный откат или закрытие в случае ошибки. Давайте перейдем к тому, как вы реализуете шаблон репозитория при использовании сеанса базы данных в ORM.
# SQLAlchemy Imports
import sqlalchemy
from sqlalchemy.orm import Session
class BaseRepository:
def __init__(self, session: Session):
self.db = session
class UserRepository(BaseRepository):
"""Operations to interact with the `users` table in the database."""
def get(self, user_id: int) -> User:
"""This method gets a user from the database."""
user = (
self.db.query(User)
.filter(User.id == user_id)
.first()
)
return user
def create(self, name: str, email: str, password: str) -> User:
"""This method creates a user."""
user = User(name=name, email=email, password=password)
self.db.add(user)
self.db.commit()
self.db.refresh(user)
return user
def update_user(self, user_id: int, updated_data: dict):
"""This method updates a user."""
user = self.get(user_id)
if user:
for key, value in updated_data.items():
setattr(user, key, value)
self.db.commit()
return user
return None
def delete_user(self, user_id):
"""This method deletes a user."""
user = self.get_user(user_id)
if user:
self.db.delete(user)
self.db.commit()
return True
return False
Далее нужно будет интегрировать вышеуказанный репозиторий в сервисный уровень вашего приложения. Предположим, у вас есть сервисная функция, которая создает учетную запись пользователя; вот как это сделать с помощью нашего нового метода:
# Apps Imports
from apps.users.models import User
from apps.users.repo import UserRepository
from apps.users.schemas.auth import UserCreate
# Config Imports
from config.security.hashers import password
from config.database.session_mixin import use_database_session
async def create_user(user: UserCreate) -> User:
"""
This function creates a new user in the database.
:param user: schemas.UserCreate
:type user: schemas.UserCreate
:return: The user object
"""
with use_database_session() as db:
users_repo = UserRepository(db)
user = users_repo.create(
user.name,
user.email,
password.hash(user.password)
)
return user
Приведенный выше шаблон позволит вам инкапсулировать операции базы данных в классах репозитория, используя при этом унаследованный сеанс базы данных. Он также обеспечивает четкое разделение между вашими моделями ORM и логикой репозитория.
Заключение
В заключение, эффективная обработка сеансов важна при построении серверных систем.
Такие ошибки, как sqlalchemy.exc.PendingRollbackError
и sqlalchemy.exc.InvalidRequestError
, возникающие во время транзакций базы данных, могут привести к несогласованности данных и сбоям приложений, если их не обработать должным образом.
Выявление и устранение этих ошибок важно для поддержания целостности и надежности системы.
Для решения проблем, связанных с обработкой сеансов, важно реализовать надежные стратегии. Один из подходов заключается в использовании менеджеров контекста, таких как DatabaseSessionMixin
, которые мы продемонстрировали в статье.
Этот диспетчер контекста обеспечивает правильное открытие, закрытие и откат сеансов в случае возникновения исключений. Инкапсулируя логику сеанса в менеджере контекста, вы можете упростить управление сеансом и улучшить обработку ошибок.
Кроме того, интеграция шаблона репозитория в сервисный уровень приложения может еще больше повысить эффективность обработки сеансов.
Разделив операции с базой данных на классы репозитория и используя сеанс, унаследованный от менеджера контекста, вы можете добиться более четкой организации кода и сохранить четкое разделение между моделями ORM и логикой репозитория.
В целом, эффективная обработка сеансов имеет решающее значение для поддержания согласованности данных, предотвращения ошибок и обеспечения стабильности серверных систем.
Следуя рекомендациям, таким как использование менеджеров контекста и внедрение шаблона репозитория, разработчики могут создавать надежные и надежные системы, которые эффективно управляют сеансами и обрабатывают ошибки во время транзакций базы данных.
Я открыт для написания проектов и активно ищу работу по контракту, связанную со сборкой на Python (Django, FastAPI и т. д.).
Оригинал