Подключите, играйте и корабль: модульные трубопроводы Получите значительное обновление

Подключите, играйте и корабль: модульные трубопроводы Получите значительное обновление

1 августа 2025 г.

Принесите свой собственный пункт назначения. Экспорт где угодно.

Мы рады объявить, чтоCocoindex теперь официально поддерживает пользовательские цели- Предоставляя вам возможность экспортировать данные в любой пункт назначения, будь то локальный файл, облачное хранилище, API REST или ваша собственная система на заказ.

Эта новая возможность открывает совершенно новый уровень гибкости для интеграции кокосоиндекса в ваши трубопроводы и позволяет вам внести свои собственные «кирпичи LEGO» в нашу модель потока.

Огромное спасибо сообществу за предложения и раннюю обратную связь - ваши идеи сформировали это!

Трубопроводы как Lego: подключить, играть и сочинять

Кокоиндекс был построен с четким видением:

Трубопроводы данных должны быть похожи на LEGO - композитный, взаимозаменяемый и веселый в построении.

Вот почему кокосоиндекс приходит сместные встроенныеДля общих источников, целей и преобразований. Независимо от того, тянете ли вы из локального каталога, S3 или таблицы SQL - или экспорта в облачное хранилище или вектор DB - интерфейс остается последовательным и декларативным.

В большинстве случаев этоОднострочный код переключательобмениваться между компонентами. Например:

Источник:

flow_builder.add_source(cocoindex.sources.S3(...))
# 1-line switch to LocalFile
flow_builder.add_source(cocoindex.sources.LocalFile(...))

Цели:

doc_embeddings.export(
    "doc_embeddings",
    cocoindex.targets.Postgres(),
    primary_key_fields=["id"],
)
# 1-line switch to Qdrant
doc_embeddings.export(
    "doc_embeddings",
    cocoindex.targets.Qdrant(collection_name=QDRANT_COLLECTION),
    primary_key_fields=["id"],
)

Этот стандартизированный интерфейс не только ускоряет итерацию, но и поощряет чистые, модульные и многоразовые определения потока. Но мы также знаем, что не каждая система одинакова.

Вот почему кокосоиндекс поддерживает«Принеси свой собственный лего»- Позволяет вам добавить свой собственный исходный источник, цель или преобразование и легко интегрировать его в экосистему.

С помощью пользовательских целей вы теперь можете написать свою собственную логику экспорта и по -прежнему извлечь выгоду из управления, отслеживания и инкрементных обновлений Cocoindex.

🚀 Что такое пользовательская цель?

АПользовательская цельПозволяет Cocoindex потоки для экспорта данных, где бы вы ни захотели-помимо встроенных разъемов, таких как S3 или базы данных. Вы определяете две вещи:

  1. Целевая спецификация- Как настроить цель (например, настройка пути файла или клавиша API).
  2. Целевой разъем- Как записать данные в эту цель (логика).

Вы можете думать об этом как о подключении своей собственной цели с помощью нескольких линий Python.

1. Целевая спецификация

Это определяет конфигурацию, необходимую для использования вашей пользовательской цели:

class CustomTarget(cocoindex.op.TargetSpec):
    param1: str
    param2: int | None = None

Это похоже на обработку данных. Просто объявите необходимые вам параметры, и кокосочеждает обо всем остальном.

2. Целевой разъем

Здесь вы реализуете логику для экспорта данных.

Целевой разъем управляет фактическими операциями экспорта данных для вашей пользовательской цели. Он определяет, как данные должны быть записаны в вашу цель.

Целевые разъемы реализуют две категории методов:

  • Методы настройкиДля управления целевой инфраструктурой (аналогично операциям DDL в базах данных) и
  • Методы данныхДля обработки конкретных операций данных (аналогично операциям DML).
@cocoindex.op.target_connector(spec_cls=CustomTarget)
class CustomTargetConnector:
    # Setup methods
    @staticmethod
    def get_persistent_key(spec: CustomTarget, target_name: str) -> PersistentKey:
        """Required. Return a persistent key that uniquely identifies this target instance."""
        ...

    @staticmethod
    def apply_setup_change(
        key: PersistentKey, previous: CustomTarget | None, current: CustomTarget | None
    ) -> None:
        """Required. Apply setup changes to the target."""
        ...

    # Data methods
    @staticmethod
    def mutate(
        *all_mutations: tuple[PreparedCustomTarget, dict[DataKeyType, DataValueType | None]],
    ) -> None:
        """Required. Apply fdata mutations to the target."""
        ...

Вы также можете реализовать дополнительные методы, такие какprepare()иdescribe()Для обработки подготовки среды, проверки или ведения журнала.

Для получения более подробной документации и лучших практик, пожалуйста, обратитесь к этой документации:https://cocoindex.io/docs/custom_ops/custom_targets

✨ Пример: экспортные файлы разметки в локальный HTML

Давайте пройдемся по простому примеру - экспорта.mdфайлы как.htmlИспользование пользовательской цели на основе файлов. Этот проект контролирует изменения папки и постоянно преобразует отметки в HTML постепенно. Проверьте полныйисходный кодПолем

Общий поток прост:

Markdown to HTML flow

Этот пример фокусируется на

  • Как настроить пользовательскую цель
  • Поток легко поднимает изменения в источнике, перекачивает только то, что изменилось и экспортирует в цель

Шаг 1: Переходные файлы

Проглатывает список файлов разметки:

@cocoindex.flow_def(name="CustomOutputFiles")
def custom_output_files(
flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:
	"""
	Define an example flow that exports markdown files to HTML files.
	"""
	data_scope["documents"] = flow_builder.add_source(
		cocoindex.sources.LocalFile(path="data", included_patterns=["*.md"]),
		refresh_interval=timedelta(seconds=5),
	)

Это приема создает стол сfilenameиcontentполя.

Ingest files

Шаг 2: Обработайте каждый файл и собирайте

Определите пользовательскую функцию, которая преобразует Markdown в HTML

@cocoindex.op.function()
def markdown_to_html(text: str) -> str:
    return _markdown_it.render(text)

Определите сборщик данных и преобразуйте каждый документ в HTML.

output_html = data_scope.add_collector()
with data_scope["documents"].row() as doc:
    doc["html"] = doc["content"].transform(markdown_to_html)
    output_html.collect(filename=doc["filename"], html=doc["html"])

Transform to HTML

Шаг 3: Определите пользовательскую цель

3.1 Определите целевую спецификацию

Целевая характеристика содержит каталог для выходных файлов:

class LocalFileTarget(cocoindex.op.TargetSpec):
    directory: str

3.2: Реализация разъема

get_persistent_key()Определяет постоянный ключ, который однозначно идентифицирует цель для отслеживания изменений и постепенных обновлений. Здесь мы просто используем целевой каталог в качестве ключа (например,,./data/output)

@cocoindex.op.target_connector(spec_cls=LocalFileTarget)
class LocalFileTargetConnector:
    @staticmethod
    def get_persistent_key(spec: LocalFileTarget, target_name: str) -> str:
        """Use the directory path as the persistent key for this target."""
        return spec.directory

Аdescribe()Метод возвращает, читаемую человеку строку, которая описывает цель, которая отображается в журналах CLI. Например, он печатает:

Target: Local directory ./data/output

@staticmethod
def describe(key: str) -> str:
    """(Optional) Return a human-readable description of the target."""
    return f"Local directory {key}"

apply_setup_change()применяет изменения настройки в бэкэнд. Предыдущие и текущие характеристики передаются в качестве аргументов, и ожидается, что метод обновит настройку бэкэнд в соответствии с текущим состоянием.

АNoneспецификация указывает на небытие, поэтому, когдаpreviousявляетсяNone, нам нужно создать это, и когдаcurrentявляетсяNone, нам нужно удалить его.

@staticmethod
def apply_setup_change(
    key: str, previous: LocalFileTarget | None, current: LocalFileTarget | None
) -> None:
    """
    Apply setup changes to the target.

    Best practice: keep all actions idempotent.
    """

    # Create the directory if it didn't exist.
    if previous is None and current is not None:
        os.makedirs(current.directory, exist_ok=True)

    # Delete the directory with its contents if it no longer exists.
    if previous is not None and current is None:
        if os.path.isdir(previous.directory):
            for filename in os.listdir(previous.directory):
                if filename.endswith(".html"):
                    os.remove(os.path.join(previous.directory, filename))
            os.rmdir(previous.directory)

Аmutate()Метод вызывается кокосоиндексом для применения изменений данных к цели, пакетирующие мутации для потенциально множественных мишеней одного и того же типа. Это позволяет гибкости целевого разъема в реализации (например, атомных коммитов или элементов обработки с зависимостями в определенном порядке).

Каждый элемент в партии соответствует определенной цели и представлен кортежом, содержащим:

  • целевая спецификация
  • Все мутации для цели, представленныеdictКартирование первичных ключей с полями значения. Поля значения могут быть представлены датом данных -LocalFileTargetValuesв этом случае:
@dataclasses.dataclass
class LocalFileTargetValues:
    """Represents value fields of exported data. Used in `mutate` method below."""

    html: str

Тип значенияdictявляетсяLocalFileTargetValues | None, где неNoneзначение означает повышение иNoneЗначение означает удаление. Похоже наapply_setup_changes(), Идентичность ожидается здесь.

@staticmethod
def mutate(
    *all_mutations: tuple[LocalFileTarget, dict[str, LocalFileTargetValues | None]],
) -> None:
    """
    Mutate the target.
    """
    for spec, mutations in all_mutations:
        for filename, mutation in mutations.items():
            full_path = os.path.join(spec.directory, filename) + ".html"
            if mutation is None:
                # Delete the file
                try:
                    os.remove(full_path)
                except FileNotFoundError:
                    pass
            else:
                # Create/update the file
                with open(full_path, "w") as f:
                    f.write(mutation.html)

3.3: Используйте его по потоку

    output_html.export(
        "OutputHtml",
        LocalFileTarget(directory="output_html"),
        primary_key_fields=["filename"],
    )

Запустите пример

После того, как ваш трубопровод будет настроен, обновление вашего графа знаний прост:

pip install -e .
cocoindex update --setup main.py

Вы можете добавить, изменять или удалять файлы вdata/Справочник - Cocoindex будет только переиздать измененные файлы и соответствующим образом обновлять цель.

Дляобновления в реальном времени, запустить в режиме живого:

cocoindex update --setup -L main.py

Это постоянно синхронизируется с вами графиком знаний с источником вашего документа-идеально подходит для быстро меняющихся сред, таких как внутренняя вики или техническая документация.

Лучшие практики

  • Идеемпочность имеет значение: apply_setup_change()иmutate()должен быть безопасным для работы несколько раз без непреднамеренных эффектов.
  • Приготовьтесь один раз, мутируют многих: Если вам нужна настройка (например, установление соединения), используйтеprepare()Чтобы не повторять работу.
  • Используйте структурированные типы: Для первичных ключей или значений CocoIndex поддерживает простые типы, а также обработки данных и названные.

Поддержите нас:

Мы постоянно добавляем больше примеров и улучшаем наше время выполнения. Если вы нашли это полезным, пожалуйста, ⭐ StarКокоиндекс на GitHubи поделиться этим с другими.

Предложения для более местных произведений «Lego»? Просто дайте нам знать! Мы движемся на полную скорость, чтобы поддержать вас!


Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE