The Python SDK (omni-connector) provides everything you need to build custom connectors that integrate with Omni’s search and AI platform.

Installation

pip install omni-connector
Requirements:
  • Python >= 3.11
Dependencies installed:
  • fastapi - Web framework
  • uvicorn - ASGI server
  • httpx - Async HTTP client
  • pydantic - Data validation
  • asyncpg - PostgreSQL async driver

Quick Start

from omni_connector import (
    Connector,
    Document,
    DocumentMetadata,
    DocumentPermissions,
    SyncContext,
)

class MyConnector(Connector):
    @property
    def name(self) -> str:
        return "my-connector"

    @property
    def version(self) -> str:
        return "1.0.0"

    @property
    def sync_modes(self) -> list[str]:
        return ["full", "incremental"]

    async def sync(
        self,
        source_config: dict,
        credentials: dict,
        state: dict | None,
        ctx: SyncContext
    ) -> None:
        # Your sync logic here
        await ctx.complete()

if __name__ == "__main__":
    MyConnector().serve(port=8000)

Core Concepts

Connector Class

The Connector abstract base class defines the interface for all connectors.
Required Properties:
| Property | Type | Description |
|---|---|---|
| name | str | Unique connector identifier (e.g., "my-connector") |
| version | str | Semantic version (e.g., "1.0.0") |
Optional Properties:
| Property | Type | Default | Description |
|---|---|---|---|
| sync_modes | list[str] | ["full"] | Supported modes: "full", "incremental" |
| actions | list[ActionDefinition] | [] | Custom actions the connector supports |
Required Methods:
async def sync(
    self,
    source_config: dict,      # Configuration from Omni UI
    credentials: dict,        # Stored credentials (API keys, tokens)
    state: dict | None,       # Persisted state from previous sync
    ctx: SyncContext          # Sync context utilities
) -> None:
    """Execute a sync operation."""
    pass
Optional Methods:
def cancel(self, sync_run_id: str) -> None:
    """Handle sync cancellation request."""
    pass

async def execute_action(
    self,
    action: str,
    params: dict,
    credentials: dict
) -> dict:
    """Execute a custom action."""
    pass

SyncContext

The SyncContext object is passed to your sync method and provides utilities for the sync operation.
Properties:
| Property | Type | Description |
|---|---|---|
| sync_run_id | str | Unique identifier for this sync run |
| source_id | str | Source identifier |
| state | dict \| None | State from previous sync |
| content_storage | ContentStorage | Storage for document content |
| documents_emitted | int | Count of emitted documents |
| documents_scanned | int | Count of scanned items |
Methods:
# Emit a new document
await ctx.emit(document)                  # document: Document

# Emit an updated document
await ctx.emit_updated(document)          # document: Document

# Mark a document as deleted
await ctx.emit_deleted(external_id)       # external_id: str

# Report a non-fatal error for a document
await ctx.emit_error(external_id, error)  # external_id: str, error: str

# Increment scanned counter (also sends heartbeat)
await ctx.increment_scanned()

# Save state checkpoint for resumability
await ctx.save_state(state)               # state: dict

# Mark sync as completed, optionally persisting new state
await ctx.complete(new_state=None)        # new_state: dict | None

# Mark sync as failed
await ctx.fail(error)                     # error: str

# Check if sync was cancelled (synchronous, not awaited)
cancelled: bool = ctx.is_cancelled()

ContentStorage

The ContentStorage class handles storing document content.
# Store text content; returns a content_id (str)
content_id = await ctx.content_storage.save(content, content_type="text/plain")  # content: str

# Store binary content (base64 encoded); returns a content_id (str)
content_id = await ctx.content_storage.save_binary(content, content_type)  # content: bytes, content_type: str
The returned content_id is a ULID that references the stored content.
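Because a ULID begins with a 48-bit millisecond Unix timestamp (per the ULID specification), the creation time of stored content can be read back from its content_id without calling the SDK. This is a minimal sketch: ulid_created_at is a hypothetical helper, not an SDK function, and it assumes content_id is a canonical 26-character Crockford base32 ULID.

```python
from datetime import datetime, timedelta, timezone

# Crockford base32 alphabet used by ULIDs (excludes I, L, O, U).
CROCKFORD32 = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"
UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ulid_created_at(ulid: str) -> datetime:
    """Return the UTC creation time encoded in the first 10 characters of a ULID."""
    if len(ulid) != 26:
        raise ValueError(f"expected 26 characters, got {len(ulid)}")
    millis = 0
    for ch in ulid[:10].upper():
        value = CROCKFORD32.find(ch)
        if value < 0:
            raise ValueError(f"invalid ULID character: {ch!r}")
        millis = millis * 32 + value  # each character carries 5 bits
    if millis >= 1 << 48:
        raise ValueError("timestamp exceeds 48 bits")
    return UNIX_EPOCH + timedelta(milliseconds=millis)
```

The remaining 16 characters are random bits and carry no ordering information beyond the timestamp.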

Document Model

The Document class represents a searchable document.
from datetime import datetime

from omni_connector import Document, DocumentMetadata, DocumentPermissions

document = Document(
    external_id="unique-id-from-source",
    title="Document Title",
    content_id=content_id,  # From content_storage.save()
    metadata=DocumentMetadata(
        title="Document Title",
        author="Author Name",
        created_at=datetime.now(),
        updated_at=datetime.now(),
        mime_type="text/plain",
        size=1234,
        url="https://source.com/doc/123",
        path="/folder/document.txt",
        extra={"custom_field": "value"},
    ),
    permissions=DocumentPermissions(
        public=True,
        users=["user@example.com"],
        groups=["engineering"],
    ),
    attributes={"category": "documentation"},
)
Document Fields:
| Field | Type | Required | Description |
|---|---|---|---|
| external_id | str | Yes | Unique ID from the source system |
| title | str | Yes | Document title for display |
| content_id | str | Yes | Reference to stored content |
| metadata | DocumentMetadata | No | Document metadata |
| permissions | DocumentPermissions | No | Access control |
| attributes | dict | No | Custom searchable attributes |
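external_id should identify the same source item on every sync. When an item has no native identifier (for example, an RSS entry with neither an id nor a link), one option is to hash fields that never change for that item. stable_external_id below is a hypothetical helper, not part of the SDK; it assumes the chosen fields are stable and that Omni matches documents on external_id across runs.

```python
import hashlib


def stable_external_id(source: str, *parts: str) -> str:
    """Derive a deterministic ID from fields that identify an item in its source."""
    digest = hashlib.sha256()
    for part in (source, *parts):
        encoded = part.encode("utf-8")
        # Length-prefix each part so ("ab", "c") and ("a", "bc") hash differently.
        digest.update(len(encoded).to_bytes(8, "big"))
        digest.update(encoded)
    # Keep the source name readable and truncate the hex digest to 128 bits.
    return f"{source}:{digest.hexdigest()[:32]}"
```

Avoid including fields that change on edit (titles, timestamps): a changed input yields a new ID, which would look like a new document rather than an update.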

Actions

Connectors can define custom actions that users can trigger from the Omni UI.
from omni_connector import ActionDefinition, ActionParameter, Connector

class MyConnector(Connector):
    @property
    def actions(self) -> list[ActionDefinition]:
        return [
            ActionDefinition(
                name="validate_connection",
                description="Test the API connection",
                parameters=[
                    ActionParameter(
                        name="timeout",
                        type="integer",
                        required=False,
                        description="Connection timeout in seconds",
                    ),
                ],
            ),
        ]

    async def execute_action(
        self,
        action: str,
        params: dict,
        credentials: dict
    ) -> dict:
        if action == "validate_connection":
            # Perform validation
            return {"status": "success", "message": "Connection valid"}
        raise ValueError(f"Unknown action: {action}")
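execute_action receives params as a plain dict, so a connector with several actions benefits from checking them against the declared parameters before dispatching. This sketch replaces the if chain with a dict registry. Param is a hypothetical stand-in with the same name/type/required fields as ActionParameter, and the mapping from type strings to Python types is an assumption; unknown actions return an error dict, as the RSS example on this page does, rather than raising.

```python
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any


@dataclass
class Param:
    """Hypothetical stand-in mirroring ActionParameter's name/type/required fields."""
    name: str
    type: str
    required: bool = False


# Assumed mapping from declared parameter types to accepted Python types.
PY_TYPES: dict[str, tuple[type, ...]] = {"string": (str,), "integer": (int,), "boolean": (bool,)}


def validate_params(spec: list[Param], params: dict[str, Any]) -> list[str]:
    """Return human-readable problems; an empty list means params are valid."""
    problems = []
    for p in spec:
        if p.name not in params:
            if p.required:
                problems.append(f"missing required parameter: {p.name}")
            continue
        value = params[p.name]
        expected = PY_TYPES.get(p.type, (object,))
        # bool is a subclass of int, so reject it explicitly for integer params.
        if not isinstance(value, expected) or (p.type == "integer" and isinstance(value, bool)):
            problems.append(f"{p.name} should be {p.type}")
    return problems


async def validate_connection(params: dict[str, Any], credentials: dict[str, Any]) -> dict[str, Any]:
    return {"status": "success", "message": "Connection valid"}


Handler = Callable[[dict[str, Any], dict[str, Any]], Awaitable[dict[str, Any]]]

# Registry: action name -> (parameter spec, handler).
ACTIONS: dict[str, tuple[list[Param], Handler]] = {
    "validate_connection": ([Param("timeout", "integer")], validate_connection),
}


async def execute_action(action: str, params: dict[str, Any], credentials: dict[str, Any]) -> dict[str, Any]:
    if action not in ACTIONS:
        return {"status": "error", "error": f"Unknown action: {action}"}
    spec, handler = ACTIONS[action]
    problems = validate_params(spec, params)
    if problems:
        return {"status": "error", "error": "; ".join(problems)}
    return await handler(params, credentials)


if __name__ == "__main__":
    print(asyncio.run(execute_action("validate_connection", {"timeout": "5"}, {})))
    # {'status': 'error', 'error': 'timeout should be integer'}
```

Adding an action then means adding one registry entry, and the same spec list could feed the actions property so the declared and enforced parameters cannot drift apart.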

RSS Connector Example

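Here’s a complete example of an RSS feed connector. It also uses the third-party feedparser package, which is not installed with the SDK (pip install feedparser):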
import asyncio
from datetime import datetime
from typing import Any

import feedparser
from omni_connector import (
    ActionDefinition,
    ActionParameter,
    Connector,
    Document,
    DocumentMetadata,
    DocumentPermissions,
    SyncContext,
)


class RSSConnector(Connector):
    """Connector that indexes RSS/Atom feeds."""

    @property
    def name(self) -> str:
        return "rss"

    @property
    def version(self) -> str:
        return "1.0.0"

    @property
    def sync_modes(self) -> list[str]:
        return ["full", "incremental"]

    @property
    def actions(self) -> list[ActionDefinition]:
        return [
            ActionDefinition(
                name="validate_feed",
                description="Validate that a feed URL is accessible",
                parameters=[
                    ActionParameter(
                        name="url",
                        type="string",
                        required=True,
                        description="The RSS/Atom feed URL to validate",
                    ),
                ],
            ),
        ]

    async def sync(
        self,
        source_config: dict[str, Any],
        credentials: dict[str, Any],
        state: dict[str, Any] | None,
        ctx: SyncContext,
    ) -> None:
        """Sync articles from an RSS feed."""
        feed_url = source_config.get("feed_url")
        if not feed_url:
            await ctx.fail("Missing feed_url in source configuration")
            return

        # Parse the feed
        feed = await asyncio.to_thread(feedparser.parse, feed_url)

        if feed.bozo and not feed.entries:
            await ctx.fail(f"Failed to parse feed: {feed.bozo_exception}")
            return

        # Get last sync time for incremental sync
        last_sync_time = None
        if state and "last_sync_time" in state:
            last_sync_time = datetime.fromisoformat(state["last_sync_time"])

        for entry in feed.entries:
            if ctx.is_cancelled():
                return

            await ctx.increment_scanned()

            # Skip old entries in incremental mode
            if last_sync_time and entry.get("published_parsed"):
                entry_date = datetime(*entry.published_parsed[:6])
                if entry_date <= last_sync_time:
                    continue

            # Build content from entry
            content = self._build_content(entry)
            content_id = await ctx.content_storage.save(content)

            # Create document
            doc = Document(
                external_id=entry.get("id") or entry.get("link"),
                title=entry.get("title", "Untitled"),
                content_id=content_id,
                metadata=DocumentMetadata(
                    author=entry.get("author"),
                    url=entry.get("link"),
                    created_at=self._parse_date(entry),
                    extra={
                        "feed_title": feed.feed.get("title"),
                        "feed_url": feed_url,
                    },
                ),
                permissions=DocumentPermissions(public=True),
            )

            await ctx.emit(doc)

        # Complete with updated state
        await ctx.complete({"last_sync_time": datetime.utcnow().isoformat()})

    def _build_content(self, entry) -> str:
        """Extract content from feed entry."""
        if hasattr(entry, "content") and entry.content:
            return entry.content[0].get("value", "")
        if hasattr(entry, "summary"):
            return entry.summary
        return entry.get("title", "")

    def _parse_date(self, entry) -> datetime | None:
        """Parse date from feed entry."""
        if hasattr(entry, "published_parsed") and entry.published_parsed:
            return datetime(*entry.published_parsed[:6])
        return None

    async def execute_action(
        self,
        action: str,
        params: dict[str, Any],
        credentials: dict[str, Any],
    ) -> dict[str, Any]:
        if action == "validate_feed":
            url = params.get("url")
            if not url:
                return {"status": "error", "error": "Missing URL parameter"}

            feed = await asyncio.to_thread(feedparser.parse, url)
            if feed.bozo and not feed.entries:
                return {"status": "error", "error": str(feed.bozo_exception)}

            return {
                "status": "success",
                "result": {
                    "title": feed.feed.get("title"),
                    "entry_count": len(feed.entries),
                },
            }

        return {"status": "error", "error": f"Unknown action: {action}"}


if __name__ == "__main__":
    RSSConnector().serve(port=8000)

Error Handling

The SDK provides custom exception classes:
from omni_connector import (
    ConnectorError,      # Base exception
    SdkClientError,      # Communication errors
    SyncCancelledError,  # Sync was cancelled
    ConfigurationError,  # Configuration issues
)
Handling errors in sync:
async def sync(self, source_config, credentials, state, ctx):
    try:
        # Your sync logic
        await ctx.complete()
    except SyncCancelledError:
        # Handle cancellation gracefully
        return
    except Exception as e:
        await ctx.fail(str(e))

Development

Project Setup

# Create virtual environment
python -m venv venv
source venv/bin/activate

# Install with development dependencies
pip install -e ".[dev]"

Running Tests

pytest tests/ -v

Type Checking

mypy omni_connector/

Linting

ruff check omni_connector/

API Reference

Exports

The SDK exports the following.
Core Classes:
  • Connector - Abstract base class
  • SyncContext - Sync operation context
  • ContentStorage - Content storage interface
  • SdkClient - SDK client for connector-manager
Data Models:
  • Document, DocumentMetadata, DocumentPermissions
  • ConnectorEvent, EventType
  • ActionDefinition, ActionParameter
  • ActionRequest, ActionResponse
  • ConnectorManifest, SyncMode
  • SyncRequest, SyncResponse
  • CancelRequest, CancelResponse
Exceptions:
  • ConnectorError, SdkClientError, SyncCancelledError, ConfigurationError

What’s Next