The Python SDK (omni-connector) provides everything you need to build custom connectors that integrate with Omni’s search and AI platform.

Installation

The SDK is not published to PyPI. Connectors live in the Omni monorepo and depend on the SDK as a local path dependency. To start a new Python connector, fork or clone omni and create a directory under connectors/:
git clone https://github.com/getomnico/omni.git
cd omni/connectors
mkdir my-connector && cd my-connector
Add a pyproject.toml that pulls the SDK from the local checkout — copy from any existing Python connector (e.g. connectors/notion/pyproject.toml):
[project]
name = "my-connector"
version = "1.0.0"
requires-python = ">=3.11"
dependencies = [
    "omni-connector",
]

[tool.uv.sources]
omni-connector = { path = "../../sdk/python" }
Then install with uv:
uv sync
Requirements:
  • Python >= 3.11
  • uv (used throughout the monorepo for Python dependency management)
Transitive dependencies pulled in by the SDK:
  • fastapi - Web framework
  • uvicorn - ASGI server
  • httpx - Async HTTP client
  • pydantic - Data validation
Run /build-connector <service name> from Claude Code inside the omni repo to scaffold the entire connector structure (sync logic, manifest, Dockerfile, frontend wiring, Terraform, integration tests) following the conventions used by built-in connectors. See the SDK overview for details.

Quick Start

from omni_connector import (
    Connector,
    Document,
    DocumentMetadata,
    DocumentPermissions,
    SyncContext,
)

class MyConnector(Connector):
    @property
    def name(self) -> str:
        return "my-connector"

    @property
    def version(self) -> str:
        return "1.0.0"

    @property
    def sync_modes(self) -> list[str]:
        return ["full", "incremental"]

    async def sync(
        self,
        source_config: dict,
        credentials: dict,
        state: dict | None,
        ctx: SyncContext
    ) -> None:
        # Your sync logic here
        await ctx.complete()

if __name__ == "__main__":
    MyConnector().serve(port=8000)

Core Concepts

Connector Class

The Connector abstract base class defines the interface for all connectors.

Required Properties:
| Property | Type | Description |
| --- | --- | --- |
| name | str | Unique connector identifier (e.g., "my-connector") |
| version | str | Semantic version (e.g., "1.0.0") |

Optional Properties:
| Property | Type | Default | Description |
| --- | --- | --- | --- |
| sync_modes | list[str] | ["full"] | Supported modes: "full", "incremental" |
| actions | list[ActionDefinition] | [] | Custom actions the connector supports |

Required Methods:
async def sync(
    self,
    source_config: dict,      # Configuration from Omni UI
    credentials: dict,        # Stored credentials (API keys, tokens)
    state: dict | None,       # Persisted state from previous sync
    ctx: SyncContext          # Sync context utilities
) -> None:
    """Execute a sync operation."""
    pass
Optional Methods:
def cancel(self, sync_run_id: str) -> None:
    """Handle sync cancellation request."""
    pass

async def execute_action(
    self,
    action: str,
    params: dict,
    credentials: dict
) -> dict:
    """Execute a custom action."""
    pass

SyncContext

The SyncContext object is passed to your sync method and provides utilities for the sync operation.

Properties:
| Property | Type | Description |
| --- | --- | --- |
| sync_run_id | str | Unique identifier for this sync run |
| source_id | str | Source identifier |
| state | dict \| None | State from previous sync |
| content_storage | ContentStorage | Storage for document content |
| documents_emitted | int | Count of emitted documents |
| documents_scanned | int | Count of scanned items |

Methods:
# Emit a new document
await ctx.emit(document: Document)

# Emit an updated document
await ctx.emit_updated(document: Document)

# Mark a document as deleted
await ctx.emit_deleted(external_id: str)

# Report a non-fatal error for a document
await ctx.emit_error(external_id: str, error: str)

# Increment scanned counter (also sends heartbeat)
await ctx.increment_scanned()

# Save state checkpoint for resumability
await ctx.save_state(state: dict)

# Mark sync as completed
await ctx.complete(new_state: dict | None = None)

# Mark sync as failed
await ctx.fail(error: str)

# Check if sync was cancelled
ctx.is_cancelled() -> bool

ContentStorage

The ContentStorage class stores document content and extracts text from binary files. Inside sync, access it through ctx.content_storage.
# Store text content
content_id = await ctx.content_storage.save(
    content: str,
    content_type: str = "text/plain"
) -> str

# Store binary content (base64 encoded)
content_id = await ctx.content_storage.save_binary(
    content: bytes,
    content_type: str
) -> str

# Extract text from a binary file (PDF, DOCX, PPTX, XLSX, images, ...)
# via connector-manager's /sdk/extract-text endpoint, without persisting.
# Returns the extracted text.
text = await ctx.content_storage.extract_text(
    data: bytes,
    mime_type: str,
    filename: str,
) -> str

# Same as extract_text, but also stores the extracted text and returns
# the resulting content_id.
content_id = await ctx.content_storage.extract_and_store_content(
    data: bytes,
    mime_type: str,
    filename: str,
) -> str
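The content_id returned by save, save_binary, and extract_and_store_content is a ULID that references the stored content. extract_text returns the extracted text itself and stores nothing.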
Always prefer extract_text / extract_and_store_content over shelling out to your own PDF/Office parser. The connector-manager routes these calls through the centralized Docling service when the admin has enabled it (with the configured quality preset), and falls back to a lightweight built-in extractor otherwise — so your connector automatically honors the instance-wide document-conversion setting.
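
One way to apply this advice is to send text-like payloads straight to save and route everything else through extract_and_store_content. The sketch below is a minimal illustration under stated assumptions: store_file, is_text_like, and the list of text-like MIME types are hypothetical, text payloads are assumed to be UTF-8, and StubStorage only mimics the two method names documented above so the example runs on its own. In a connector you would pass ctx.content_storage instead.

```python
import asyncio

# Assumption: which MIME types count as "already text" is a connector choice.
TEXT_MIME_PREFIXES = ("text/",)
TEXT_MIME_TYPES = {"application/json", "application/xml"}


def is_text_like(mime_type: str) -> bool:
    """Return True when the payload can be stored as text without extraction."""
    base = mime_type.split(";", 1)[0].strip().lower()
    return base.startswith(TEXT_MIME_PREFIXES) or base in TEXT_MIME_TYPES


async def store_file(storage, data: bytes, mime_type: str, filename: str) -> str:
    """Store a downloaded file and return its content_id."""
    if is_text_like(mime_type):
        base = mime_type.split(";", 1)[0].strip().lower()
        text = data.decode("utf-8", errors="replace")  # assumes UTF-8 input
        return await storage.save(text, base)
    # PDFs, Office files, images, ...: let connector-manager extract the text.
    return await storage.extract_and_store_content(data, mime_type, filename)


class StubStorage:
    """Hypothetical stand-in for ctx.content_storage; it records calls."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, str]] = []

    async def save(self, content: str, content_type: str = "text/plain") -> str:
        self.calls.append(("save", content_type))
        return f"content-{len(self.calls)}"

    async def extract_and_store_content(
        self, data: bytes, mime_type: str, filename: str
    ) -> str:
        self.calls.append(("extract_and_store_content", mime_type))
        return f"content-{len(self.calls)}"


storage = StubStorage()
asyncio.run(store_file(storage, b"# Notes", "text/markdown; charset=utf-8", "notes.md"))
asyncio.run(store_file(storage, b"%PDF-1.7", "application/pdf", "report.pdf"))
print(storage.calls)
```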

Document Model

The Document class represents a searchable document.
from datetime import datetime

from omni_connector import Document, DocumentMetadata, DocumentPermissions

document = Document(
    external_id="unique-id-from-source",
    title="Document Title",
    content_id=content_id,  # From content_storage.save()
    metadata=DocumentMetadata(
        title="Document Title",
        author="Author Name",
        created_at=datetime.now(),
        updated_at=datetime.now(),
        mime_type="text/plain",
        size=1234,
        url="https://source.com/doc/123",
        path="/folder/document.txt",
        extra={"custom_field": "value"},
    ),
    permissions=DocumentPermissions(
        public=True,
        users=["user@example.com"],
        groups=["engineering"],
    ),
    attributes={"category": "documentation"},
)
Document Fields:
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| external_id | str | Yes | Unique ID from the source system |
| title | str | Yes | Document title for display |
| content_id | str | Yes | Reference to stored content |
| metadata | DocumentMetadata | No | Document metadata |
| permissions | DocumentPermissions | No | Access control |
| attributes | dict | No | Custom searchable attributes |

Actions

Connectors can define custom actions that users can trigger from the Omni UI.
from omni_connector import ActionDefinition, ActionParameter

class MyConnector(Connector):
    @property
    def actions(self) -> list[ActionDefinition]:
        return [
            ActionDefinition(
                name="validate_connection",
                description="Test the API connection",
                parameters=[
                    ActionParameter(
                        name="timeout",
                        type="integer",
                        required=False,
                        description="Connection timeout in seconds",
                    ),
                ],
            ),
        ]

    async def execute_action(
        self,
        action: str,
        params: dict,
        credentials: dict
    ) -> dict:
        if action == "validate_connection":
            # Perform validation
            return {"status": "success", "message": "Connection valid"}
        raise ValueError(f"Unknown action: {action}")
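
As the number of actions grows, the if chain in execute_action can be replaced by a lookup table of handlers. The sketch below shows that pattern in isolation. ActionRouter and the validate_connection handler are hypothetical names, the class is a plain Python class rather than a Connector subclass so it runs without the SDK, and the default timeout of 10 seconds is an illustrative value, not an SDK default.

```python
import asyncio
from typing import Any, Awaitable, Callable

# A handler receives (params, credentials) and returns a result dict.
Handler = Callable[[dict[str, Any], dict[str, Any]], Awaitable[dict[str, Any]]]


class ActionRouter:
    """Hypothetical helper that maps action names to async handlers."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    def register(self, name: str, handler: Handler) -> None:
        self._handlers[name] = handler

    async def execute_action(
        self, action: str, params: dict[str, Any], credentials: dict[str, Any]
    ) -> dict[str, Any]:
        handler = self._handlers.get(action)
        if handler is None:
            raise ValueError(f"Unknown action: {action}")
        return await handler(params, credentials)


async def validate_connection(
    params: dict[str, Any], credentials: dict[str, Any]
) -> dict[str, Any]:
    timeout = int(params.get("timeout", 10))  # illustrative default
    if not credentials.get("api_key"):
        return {"status": "error", "message": "Missing api_key"}
    # A real handler would call the source API here, honoring the timeout.
    return {"status": "success", "message": f"Connection valid (timeout={timeout}s)"}


router = ActionRouter()
router.register("validate_connection", validate_connection)
result = asyncio.run(
    router.execute_action("validate_connection", {"timeout": 5}, {"api_key": "test-key"})
)
print(result)
```

A connector could hold one such router and delegate to it from its own execute_action, which keeps each action's logic in a separately testable function.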

RSS Connector Example

Here’s a complete example of an RSS feed connector:
import asyncio
from datetime import datetime
from typing import Any

import feedparser
from omni_connector import (
    ActionDefinition,
    ActionParameter,
    Connector,
    Document,
    DocumentMetadata,
    DocumentPermissions,
    SyncContext,
)


class RSSConnector(Connector):
    """Connector that indexes RSS/Atom feeds."""

    @property
    def name(self) -> str:
        return "rss"

    @property
    def version(self) -> str:
        return "1.0.0"

    @property
    def sync_modes(self) -> list[str]:
        return ["full", "incremental"]

    @property
    def actions(self) -> list[ActionDefinition]:
        return [
            ActionDefinition(
                name="validate_feed",
                description="Validate that a feed URL is accessible",
                parameters=[
                    ActionParameter(
                        name="url",
                        type="string",
                        required=True,
                        description="The RSS/Atom feed URL to validate",
                    ),
                ],
            ),
        ]

    async def sync(
        self,
        source_config: dict[str, Any],
        credentials: dict[str, Any],
        state: dict[str, Any] | None,
        ctx: SyncContext,
    ) -> None:
        """Sync articles from an RSS feed."""
        feed_url = source_config.get("feed_url")
        if not feed_url:
            await ctx.fail("Missing feed_url in source configuration")
            return

        # Parse the feed
        feed = await asyncio.to_thread(feedparser.parse, feed_url)

        if feed.bozo and not feed.entries:
            await ctx.fail(f"Failed to parse feed: {feed.bozo_exception}")
            return

        # Get last sync time for incremental sync
        last_sync_time = None
        if state and "last_sync_time" in state:
            last_sync_time = datetime.fromisoformat(state["last_sync_time"])

        for entry in feed.entries:
            if ctx.is_cancelled():
                return

            await ctx.increment_scanned()

            # Skip old entries in incremental mode; entries without a
            # parseable date are always emitted
            entry_date = self._parse_date(entry)
            if last_sync_time and entry_date and entry_date <= last_sync_time:
                continue

            # Build content from entry
            content = self._build_content(entry)
            content_id = await ctx.content_storage.save(content)

            # Create document
            doc = Document(
                external_id=entry.get("id") or entry.get("link"),
                title=entry.get("title", "Untitled"),
                content_id=content_id,
                metadata=DocumentMetadata(
                    author=entry.get("author"),
                    url=entry.get("link"),
                    created_at=self._parse_date(entry),
                    extra={
                        "feed_title": feed.feed.get("title"),
                        "feed_url": feed_url,
                    },
                ),
                permissions=DocumentPermissions(public=True),
            )

            await ctx.emit(doc)

        # Complete with updated state
        await ctx.complete({"last_sync_time": datetime.utcnow().isoformat()})

    def _build_content(self, entry) -> str:
        """Extract content from feed entry."""
        if hasattr(entry, "content") and entry.content:
            return entry.content[0].get("value", "")
        if hasattr(entry, "summary"):
            return entry.summary
        return entry.get("title", "")

    def _parse_date(self, entry) -> datetime | None:
        """Parse date from feed entry."""
        if hasattr(entry, "published_parsed") and entry.published_parsed:
            return datetime(*entry.published_parsed[:6])
        return None

    async def execute_action(
        self,
        action: str,
        params: dict[str, Any],
        credentials: dict[str, Any],
    ) -> dict[str, Any]:
        if action == "validate_feed":
            url = params.get("url")
            if not url:
                return {"status": "error", "error": "Missing URL parameter"}

            feed = await asyncio.to_thread(feedparser.parse, url)
            if feed.bozo and not feed.entries:
                return {"status": "error", "error": str(feed.bozo_exception)}

            return {
                "status": "success",
                "result": {
                    "title": feed.feed.get("title"),
                    "entry_count": len(feed.entries),
                },
            }

        return {"status": "error", "error": f"Unknown action: {action}"}


if __name__ == "__main__":
    RSSConnector().serve(port=8000)

Error Handling

The SDK provides custom exception classes:
from omni_connector import (
    ConnectorError,      # Base exception
    SdkClientError,      # Communication errors
    SyncCancelledError,  # Sync was cancelled
    ConfigurationError,  # Configuration issues
)
Handling errors in sync:
async def sync(self, source_config, credentials, state, ctx):
    try:
        # Your sync logic
        await ctx.complete()
    except SyncCancelledError:
        # Handle cancellation gracefully
        return
    except Exception as e:
        await ctx.fail(str(e))

Development

Project Setup

# From your connector directory inside the omni monorepo
uv sync

Running Tests

uv run pytest tests/ -v

Type Checking

uv run mypy my_connector/

Linting

uv run ruff check my_connector/

Testing

The SDK includes a testing module (omni_connector.testing) with utilities for writing integration tests for your connectors.

TestHarness

The TestHarness class provides a self-contained environment for testing your connector without needing a running Omni deployment.
import pytest
from omni_connector.testing import TestHarness
from my_connector import MyConnector

@pytest.fixture
def harness():
    return TestHarness(MyConnector())

@pytest.mark.asyncio
async def test_full_sync(harness):
    # Seed test data
    harness.set_source_config({"api_url": "https://api.example.com"})
    harness.set_credentials({"api_key": "test-key"})

    # Run sync
    result = await harness.run_sync()

    # Assert results
    assert result.status == "completed"
    assert result.documents_emitted > 0

@pytest.mark.asyncio
async def test_incremental_sync(harness):
    # Run initial sync
    harness.set_source_config({"api_url": "https://api.example.com"})
    harness.set_credentials({"api_key": "test-key"})
    first_result = await harness.run_sync()

    # Run incremental sync with state from first run
    second_result = await harness.run_sync(state=first_result.state)
    assert second_result.status == "completed"

Assertions

The harness captures all emitted documents, errors, and state changes for easy assertion:
@pytest.mark.asyncio
async def test_document_content(harness):
    harness.set_source_config({"feed_url": "https://example.com/feed"})
    result = await harness.run_sync()

    # Check emitted documents
    assert len(result.documents) == 5
    assert result.documents[0].title == "Expected Title"

    # Check for errors
    assert len(result.errors) == 0

    # Check final state
    assert "last_sync_time" in result.state

API Reference

Exports

The SDK exports the following:

Core Classes:
  • Connector - Abstract base class
  • SyncContext - Sync operation context
  • ContentStorage - Content storage interface
  • SdkClient - SDK client for connector-manager
Data Models:
  • Document, DocumentMetadata, DocumentPermissions
  • ConnectorEvent, EventType
  • ActionDefinition, ActionParameter
  • ActionRequest, ActionResponse
  • ConnectorManifest, SyncMode
  • SyncRequest, SyncResponse
  • CancelRequest, CancelResponse
Exceptions:
  • ConnectorError, SdkClientError, SyncCancelledError, ConfigurationError

What’s Next

  • SDK Overview - Learn about SDK architecture
  • TypeScript SDK - Build connectors with TypeScript