Миграции базы данных на уровне функций с SQLALCHEMY, ALEMBIC и Pytest

Миграции базы данных на уровне функций с SQLALCHEMY, ALEMBIC и Pytest

7 августа 2025 г.

Вступление

Одним из первых барьеров, с которыми сталкивается разработчик во время первоначальной инициализации проекта, является установка конфигурации модульных тестов и базовый набор модульных тестов.

Первый шаг - установить общую стратегию тестирования:

  • Структура модульного тестирования - основная библиотека как база для реализации модульных тестов для конкретных приложений;
  • Структура тестов - иерархия каталога, тестирование иерархии;
  • Последовательность - простой план покрытия кодовой базы с помощью модульных тестов вместе с эволюцией кодовой базы.

Управление миграциями

Подходы

При работе со схемой БД есть 2 наиболее распространенных подхода к управлению миграциями, которые можно использовать в рамках проекта:

  • (0) миграции на функцию- мигрировать всю схему БД доголоваи вплоть до нуля на функцию*.* Дорого с точки зрения производительности, особенно принятие большого количества дисковых операций. Пара профессионалов: инициализация простых тестов; Почти полная изоляция теста для функции.
  • (1) миграции за сеанс- мигрировать всю схемувверхвыйти на основе файлов пересмотра Alembic; держать схемуза всю сессию, иразрушатьэтоВ конце сессии.Создать и усечь таблицы, индексы, последовательности и типы на каждую тестовую функцию. Эффективно, еще более сложное для настройки.

А(0) миграции на функциюПодход почти никогда не оправдывается в зрелых проектах, поскольку миграционные операции по своей природе дороги из -за изменений диска и метаданных. Следовательно, запуск набора модульных тестов, состоящих из сотен тестовых функций, в конечном итоге превратится в раздражающий процесс.

Тем не менее, для работы с проектом в начальном состоянии я нахожу его великолепным, поскольку он почти идентичен созданию БД от нуля. Кроме того, этот подход хорош для тестирования понижения с помощью DB, заполненного некоторыми данными.

Кодовая база

Если у нас есть классический класс сессий:

import contextlib
from typing import AsyncGenerator

from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine, AsyncSession


class DBSessionManager:
    def __init__(self, postgres_dsn: str):
        self._engine: AsyncEngine = create_async_engine(url=postgres_dsn)

    @contextlib.asynccontextmanager
    async def asessionmaker(self) -> AsyncGenerator[AsyncSession, None]:
        async with AsyncSession(self._engine) as s:
            yield s

    async def close(self):
        await self._engine.dispose()

, тогда мы готовы к операциям БД с нашим столом.

Затем давайте определим нашу тестовую таблицу DB и сделаем ее простым:

class Product(Base):
    __tablename__ = 'product'

    id: Mapped[UUID] = mapped_column(
        type_=types.UUID,
        primary_key=True,
        server_default=text('gen_random_uuid()'),
    )
    name: Mapped[str] = mapped_column(
        type_=types.VARCHAR(100), server_default=text("''")
    )
    created_at: Mapped[timestamp] = mapped_column(
        type_=types.TIMESTAMP,
        server_default=text('NOW()'),
    )


class Review(Base):
    __tablename__ = 'review'

    id: Mapped[UUID] = mapped_column(
        type_=types.UUID,
        primary_key=True,
        server_default=text('gen_random_uuid()'),
    )
    content: Mapped[str] = mapped_column(
        type_=types.VARCHAR(1000), server_default=text("''")
    )
    rating: Mapped[int] = mapped_column(type_=types.DECIMAL(2, 1))
    created_at: Mapped[timestamp] = mapped_column(
        type_=types.TIMESTAMP,
        server_default=text('NOW()'),
    )

Тогда нашconftest.pyФайл близок к тому, чтобы быть простым:

from typing import AsyncGenerator
from unittest import mock
import os


import pytest
import pytest_asyncio
from asyncpg.exceptions import DuplicateDatabaseError
from alembic import command
from alembic.config import Config
from sqlalchemy import text
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.ext.asyncio import create_async_engine


from project.db.session_manager import DBSessionManager
from project.db.models import Product


@pytest.fixture(autouse=True, scope='session')
def os_environ_patch():
    original_connection_string = os.environ['POSTGRES_DSN']
    new_environ = {
        'POSTGRES_DSN': f'{original_connection_string}_test',
        'POSTGRES_DSN_ORIGINAL': original_connection_string,
    }
    with mock.patch.dict(os.environ, new_environ, clear=False):
        yield


@pytest_asyncio.fixture(scope='session')
async def create_test_db(os_environ_patch):
    test_db_name = 'example_db_test'
    engine = create_async_engine(
        os.environ['POSTGRES_DSN_ORIGINAL'],
        isolation_level='AUTOCOMMIT',
    )

    create_db_op = text(f'''CREATE DATABASE "{test_db_name}"''')
    drop_db_op = text(f'DROP DATABASE IF EXISTS "{test_db_name}"')
    try:
        async with engine.begin() as conn:
            await conn.execute(create_db_op)
    except ProgrammingError as err:
        if err.orig and err.orig.pgcode == DuplicateDatabaseError.sqlstate:
            async with engine.begin() as conn:
                await conn.execute(drop_db_op)
                await conn.execute(create_db_op)

    yield
    async with engine.begin() as conn:
        await conn.execute(drop_db_op)


@pytest.fixture
def migrate_db(create_test_db):
    config = Config('alembic.ini')
    test_db_url = os.environ['POSTGRES_DSN']
    config.set_main_option('sqlalchemy.url', test_db_url)
    command.upgrade(config, 'head')
    yield
    command.downgrade(config, 'base')


@pytest_asyncio.fixture
async def db(migrate_db) -> AsyncGenerator[DBSessionManager, None]:
    postgres_dsn = os.environ['POSTGRES_DSN']
    db_manager = DBSessionManager(postgres_dsn)
    yield db_manager
    await db_manager.close()


@pytest_asyncio.fixture
async def product_fixture(db: DBSessionManager):
    async with db.asessionmaker() as session:
        product = Product(name='Test product')
        session.add(product)
        await session.commit()
        await session.refresh(product)
    return product

Самые сложные части здесь:

  • сохраняя оригинал DB DSN при работе с тестовым DB DSN;
  • Создание временного экземпляра DB Test через подключение к исходному БД, чтобы избежать ускорения нашего оригинального БД.

Наконец, наши тесты могут быть идентичны:

@pytest.mark.asyncio
async def test_get_record(db: DBSessionManager, product_fixture: Product):
    """Test reading a single existing Record instance"""
    # Prepare

    # Do
    stmt = select(Product)
    async with db.asessionmaker() as s:
        result = await s.execute(stmt)
    product = result.scalar_one_or_none()

    # Check
    assert product is not None
    assert product.id == product_fixture.id
    assert product.name == product_fixture.name
    stmt = select(func.count(Product.id)).select_from(Product)
    async with db.asessionmaker() as s:
        result = await s.execute(stmt)
    assert result.scalar_one() == 1


@pytest.mark.asyncio
async def test_create_record(db: DBSessionManager, product_fixture: Product):
    """Test creating a new Record instance"""
    # Prepare
    stmt = select(func.count(Product.id)).select_from(Product)
    async with db.asessionmaker() as s:
        result = await s.execute(stmt)
    assert result.scalar_one() == 1
    new_product_name = 'New product'

    # Do
    insert_op = insert(Product).values(name=new_product_name)
    async with db.asessionmaker() as s:
        await s.execute(insert_op)
        await s.commit()

    # Check
    stmt = select(func.count(Product.id)).select_from(Product)
    async with db.asessionmaker() as s:
        result = await s.execute(stmt)
    assert result.scalar_one() == 2
    async with db.asessionmaker() as s:
        result = await s.execute(
            select(Product).order_by(Product.created_at.desc()).limit(1)
        )
    new_product = result.scalar_one()
    assert new_product.name == new_product_name

  • GitHubРепозиторий:https://github.com/gencurrent/python-migration-per-cunction-test-setup


Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE