
Миграции базы данных на уровне функций с SQLALCHEMY, ALEMBIC и Pytest
7 августа 2025 г.Вступление
Одним из первых барьеров, с которыми сталкивается разработчик во время первоначальной инициализации проекта, является установка конфигурации модульных тестов и базовый набор модульных тестов.
Первый шаг - установить общую стратегию тестирования:
- Структура модульного тестирования - основная библиотека как база для реализации модульных тестов для конкретных приложений;
- Структура тестов - иерархия каталога, тестирование иерархии;
- Последовательность - простой план покрытия кодовой базы с помощью модульных тестов вместе с эволюцией кодовой базы.
Управление миграциями
Подходы
При работе со схемой БД есть 2 наиболее распространенных подхода к управлению миграциями, которые можно использовать в рамках проекта:
- (0) миграции на функцию- мигрировать всю схему БД доголоваи вплоть до нуля на функцию*.* Дорого с точки зрения производительности, особенно принятие большого количества дисковых операций. Пара профессионалов: инициализация простых тестов; Почти полная изоляция теста для функции.
- (1) миграции за сеанс- мигрировать всю схемувверхвыйти на основе файлов пересмотра Alembic; держать схемуза всю сессию, иразрушатьэтоВ конце сессии.Создать и усечь таблицы, индексы, последовательности и типы на каждую тестовую функцию. Эффективно, еще более сложное для настройки.
А(0) миграции на функциюПодход почти никогда не оправдывается в зрелых проектах, поскольку миграционные операции по своей природе дороги из -за изменений диска и метаданных. Следовательно, запуск набора модульных тестов, состоящих из сотен тестовых функций, в конечном итоге превратится в раздражающий процесс.
Тем не менее, для работы с проектом в начальном состоянии я нахожу его великолепным, поскольку он почти идентичен созданию БД от нуля. Кроме того, этот подход хорош для тестирования понижения с помощью DB, заполненного некоторыми данными.
Кодовая база
Если у нас есть классический класс сессий:
import contextlib
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine, AsyncSession
class DBSessionManager:
def __init__(self, postgres_dsn: str):
self._engine: AsyncEngine = create_async_engine(url=postgres_dsn)
@contextlib.asynccontextmanager
async def asessionmaker(self) -> AsyncGenerator[AsyncSession, None]:
async with AsyncSession(self._engine) as s:
yield s
async def close(self):
await self._engine.dispose()
, тогда мы готовы к операциям БД с нашим столом.
Затем давайте определим нашу тестовую таблицу DB и сделаем ее простым:
class Product(Base):
__tablename__ = 'product'
id: Mapped[UUID] = mapped_column(
type_=types.UUID,
primary_key=True,
server_default=text('gen_random_uuid()'),
)
name: Mapped[str] = mapped_column(
type_=types.VARCHAR(100), server_default=text("''")
)
created_at: Mapped[timestamp] = mapped_column(
type_=types.TIMESTAMP,
server_default=text('NOW()'),
)
class Review(Base):
__tablename__ = 'review'
id: Mapped[UUID] = mapped_column(
type_=types.UUID,
primary_key=True,
server_default=text('gen_random_uuid()'),
)
content: Mapped[str] = mapped_column(
type_=types.VARCHAR(1000), server_default=text("''")
)
rating: Mapped[int] = mapped_column(type_=types.DECIMAL(2, 1))
created_at: Mapped[timestamp] = mapped_column(
type_=types.TIMESTAMP,
server_default=text('NOW()'),
)
Тогда нашconftest.py
Файл близок к тому, чтобы быть простым:
from typing import AsyncGenerator
from unittest import mock
import os
import pytest
import pytest_asyncio
from asyncpg.exceptions import DuplicateDatabaseError
from alembic import command
from alembic.config import Config
from sqlalchemy import text
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.ext.asyncio import create_async_engine
from project.db.session_manager import DBSessionManager
from project.db.models import Product
@pytest.fixture(autouse=True, scope='session')
def os_environ_patch():
original_connection_string = os.environ['POSTGRES_DSN']
new_environ = {
'POSTGRES_DSN': f'{original_connection_string}_test',
'POSTGRES_DSN_ORIGINAL': original_connection_string,
}
with mock.patch.dict(os.environ, new_environ, clear=False):
yield
@pytest_asyncio.fixture(scope='session')
async def create_test_db(os_environ_patch):
test_db_name = 'example_db_test'
engine = create_async_engine(
os.environ['POSTGRES_DSN_ORIGINAL'],
isolation_level='AUTOCOMMIT',
)
create_db_op = text(f'''CREATE DATABASE "{test_db_name}"''')
drop_db_op = text(f'DROP DATABASE IF EXISTS "{test_db_name}"')
try:
async with engine.begin() as conn:
await conn.execute(create_db_op)
except ProgrammingError as err:
if err.orig and err.orig.pgcode == DuplicateDatabaseError.sqlstate:
async with engine.begin() as conn:
await conn.execute(drop_db_op)
await conn.execute(create_db_op)
yield
async with engine.begin() as conn:
await conn.execute(drop_db_op)
@pytest.fixture
def migrate_db(create_test_db):
config = Config('alembic.ini')
test_db_url = os.environ['POSTGRES_DSN']
config.set_main_option('sqlalchemy.url', test_db_url)
command.upgrade(config, 'head')
yield
command.downgrade(config, 'base')
@pytest_asyncio.fixture
async def db(migrate_db) -> AsyncGenerator[DBSessionManager, None]:
postgres_dsn = os.environ['POSTGRES_DSN']
db_manager = DBSessionManager(postgres_dsn)
yield db_manager
await db_manager.close()
@pytest_asyncio.fixture
async def product_fixture(db: DBSessionManager):
async with db.asessionmaker() as session:
product = Product(name='Test product')
session.add(product)
await session.commit()
await session.refresh(product)
return product
Самые сложные части здесь:
- сохраняя оригинал DB DSN при работе с тестовым DB DSN;
- Создание временного экземпляра DB Test через подключение к исходному БД, чтобы избежать ускорения нашего оригинального БД.
Наконец, наши тесты могут быть идентичны:
@pytest.mark.asyncio
async def test_get_record(db: DBSessionManager, product_fixture: Product):
"""Test reading a single existing Record instance"""
# Prepare
# Do
stmt = select(Product)
async with db.asessionmaker() as s:
result = await s.execute(stmt)
product = result.scalar_one_or_none()
# Check
assert product is not None
assert product.id == product_fixture.id
assert product.name == product_fixture.name
stmt = select(func.count(Product.id)).select_from(Product)
async with db.asessionmaker() as s:
result = await s.execute(stmt)
assert result.scalar_one() == 1
@pytest.mark.asyncio
async def test_create_record(db: DBSessionManager, product_fixture: Product):
"""Test creating a new Record instance"""
# Prepare
stmt = select(func.count(Product.id)).select_from(Product)
async with db.asessionmaker() as s:
result = await s.execute(stmt)
assert result.scalar_one() == 1
new_product_name = 'New product'
# Do
insert_op = insert(Product).values(name=new_product_name)
async with db.asessionmaker() as s:
await s.execute(insert_op)
await s.commit()
# Check
stmt = select(func.count(Product.id)).select_from(Product)
async with db.asessionmaker() as s:
result = await s.execute(stmt)
assert result.scalar_one() == 2
async with db.asessionmaker() as s:
result = await s.execute(
select(Product).order_by(Product.created_at.desc()).limit(1)
)
new_product = result.scalar_one()
assert new_product.name == new_product_name
Ссылки
- GitHubРепозиторий:https://github.com/gencurrent/python-migration-per-cunction-test-setup
Оригинал