# Python SDK

> Build custom connectors with Python and FastAPI

The Python SDK (`omni-connector`) provides the `Connector` base class, the sync context, a content storage client, and the data models you need to build custom connectors for Omni's search and AI platform.

## Installation

The SDK is **not published to PyPI**. Connectors live in the Omni monorepo and depend on the SDK as a local path dependency. To start a new Python connector, fork or clone [omni](https://github.com/getomnico/omni) and create a directory under `connectors/`:

```bash theme={null}
git clone https://github.com/getomnico/omni.git
cd omni/connectors
mkdir my-connector && cd my-connector
```

Add a `pyproject.toml` that pulls the SDK from the local checkout — copy from any existing Python connector (e.g. `connectors/notion/pyproject.toml`):

```toml theme={null}
[project]
name = "my-connector"
version = "1.0.0"
requires-python = ">=3.11"
dependencies = [
    "omni-connector",
]

[tool.uv.sources]
omni-connector = { path = "../../sdk/python" }
```

Then install with `uv`:

```bash theme={null}
uv sync
```

**Requirements:**

* Python >= 3.11
* [uv](https://docs.astral.sh/uv/) (used throughout the monorepo for Python dependency management)

**Transitive dependencies pulled in by the SDK:**

* `fastapi` - Web framework
* `uvicorn` - ASGI server
* `httpx` - Async HTTP client
* `pydantic` - Data validation

<Tip>
  Run `/build-connector <service name>` from Claude Code inside the omni repo to scaffold the entire connector structure (sync logic, manifest, Dockerfile, frontend wiring, Terraform, integration tests) following the conventions used by built-in connectors. See the [SDK overview](/developers/sdk-overview#scaffolding-with-the-build-connector-skill) for details.
</Tip>

## Quick Start

```python theme={null}
from omni_connector import (
    Connector,
    Document,
    DocumentMetadata,
    DocumentPermissions,
    SyncContext,
)

class MyConnector(Connector):
    @property
    def name(self) -> str:
        return "my-connector"

    @property
    def version(self) -> str:
        return "1.0.0"

    @property
    def sync_modes(self) -> list[str]:
        return ["full", "incremental"]

    async def sync(
        self,
        source_config: dict,
        credentials: dict,
        state: dict | None,
        ctx: SyncContext
    ) -> None:
        # Your sync logic here
        await ctx.complete()

if __name__ == "__main__":
    MyConnector().serve(port=8000)
```

## Core Concepts

### Connector Class

The `Connector` abstract base class defines the interface for all connectors.

**Required Properties:**

| Property  | Type  | Description                                          |
| --------- | ----- | ---------------------------------------------------- |
| `name`    | `str` | Unique connector identifier (e.g., `"my-connector"`) |
| `version` | `str` | Semantic version (e.g., `"1.0.0"`)                   |

**Optional Properties:**

| Property     | Type                     | Default    | Description                                |
| ------------ | ------------------------ | ---------- | ------------------------------------------ |
| `sync_modes` | `list[str]`              | `["full"]` | Supported modes: `"full"`, `"incremental"` |
| `actions`    | `list[ActionDefinition]` | `[]`       | Custom actions the connector supports      |

**Required Methods:**

```python theme={null}
async def sync(
    self,
    source_config: dict,      # Configuration from Omni UI
    credentials: dict,        # Stored credentials (API keys, tokens)
    state: dict | None,       # Persisted state from previous sync
    ctx: SyncContext          # Sync context utilities
) -> None:
    """Execute a sync operation."""
    pass
```

**Optional Methods:**

```python theme={null}
def cancel(self, sync_run_id: str) -> None:
    """Handle sync cancellation request."""
    pass

async def execute_action(
    self,
    action: str,
    params: dict,
    credentials: dict
) -> dict:
    """Execute a custom action."""
    pass
```

### SyncContext

The `SyncContext` object is passed to your `sync` method and provides utilities for the sync operation.

**Properties:**

| Property            | Type             | Description                         |
| ------------------- | ---------------- | ----------------------------------- |
| `sync_run_id`       | `str`            | Unique identifier for this sync run |
| `source_id`         | `str`            | Source identifier                   |
| `state`             | `dict \| None`   | State from previous sync            |
| `content_storage`   | `ContentStorage` | Storage for document content        |
| `documents_emitted` | `int`            | Count of emitted documents          |
| `documents_scanned` | `int`            | Count of scanned items              |

**Methods:**

```python theme={null}
# Emit a new document
await ctx.emit(document: Document)

# Emit an updated document
await ctx.emit_updated(document: Document)

# Mark a document as deleted
await ctx.emit_deleted(external_id: str)

# Report a non-fatal error for a document
await ctx.emit_error(external_id: str, error: str)

# Increment scanned counter (also sends heartbeat)
await ctx.increment_scanned()

# Save state checkpoint for resumability
await ctx.save_state(state: dict)

# Mark sync as completed
await ctx.complete(new_state: dict | None = None)

# Mark sync as failed
await ctx.fail(error: str)

# Check if sync was cancelled
ctx.is_cancelled() -> bool
```

### ContentStorage

The `ContentStorage` class stores document content and extracts text from binary files.

```python theme={null}
# Store text content
content_id = await ctx.content_storage.save(
    content: str,
    content_type: str = "text/plain"
) -> str

# Store binary content (base64 encoded)
content_id = await ctx.content_storage.save_binary(
    content: bytes,
    content_type: str
) -> str

# Extract text from a binary file (PDF, DOCX, PPTX, XLSX, images, ...)
# via connector-manager's /sdk/extract-text endpoint, without persisting.
# Returns the extracted text.
text = await ctx.content_storage.extract_text(
    data: bytes,
    mime_type: str,
    filename: str,
) -> str

# Same as extract_text, but also stores the extracted text and returns
# the resulting content_id.
content_id = await ctx.content_storage.extract_and_store_content(
    data: bytes,
    mime_type: str,
    filename: str,
) -> str
```

The returned `content_id` is a ULID that references the stored content.

<Tip>
  Always prefer `extract_text` / `extract_and_store_content` over shelling out to your own PDF/Office parser. The connector-manager routes these calls through the centralized Docling service when the admin has enabled it (with the configured quality preset), and falls back to a lightweight built-in extractor otherwise — so your connector automatically honors the instance-wide document-conversion setting.
</Tip>
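One way to apply this advice is to route each file by MIME type: store plain text directly and hand every other format to server-side extraction. The sketch below assumes that split. `FakeContentStorage` is a hypothetical stand-in that mirrors only the two method names used here and returns placeholder IDs rather than ULIDs, and `store_file` is an illustrative helper, not part of the SDK.

```python
import asyncio


class FakeContentStorage:
    """Hypothetical stand-in for ContentStorage; records which method was used."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def save(self, content: str, content_type: str = "text/plain") -> str:
        self.calls.append("save")
        return f"content-{len(self.calls)}"

    async def extract_and_store_content(
        self, data: bytes, mime_type: str, filename: str
    ) -> str:
        self.calls.append("extract_and_store_content")
        return f"content-{len(self.calls)}"


async def store_file(storage, data: bytes, mime_type: str, filename: str) -> str:
    """Store text directly; hand every other format to server-side extraction."""
    if mime_type.startswith("text/"):
        # Decoding as UTF-8 with replacement is an assumption about the source.
        return await storage.save(data.decode("utf-8", errors="replace"), mime_type)
    return await storage.extract_and_store_content(data, mime_type, filename)


storage = FakeContentStorage()
asyncio.run(store_file(storage, b"hello", "text/plain", "note.txt"))
asyncio.run(store_file(storage, b"%PDF-1.7", "application/pdf", "report.pdf"))
print(storage.calls)
```

Inside a real `sync` method, `storage` would be `ctx.content_storage`.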

### Document Model

The `Document` class represents a searchable document.

```python theme={null}
from datetime import datetime

from omni_connector import Document, DocumentMetadata, DocumentPermissions

document = Document(
    external_id="unique-id-from-source",
    title="Document Title",
    content_id=content_id,  # From content_storage.save()
    metadata=DocumentMetadata(
        title="Document Title",
        author="Author Name",
        created_at=datetime.now(),
        updated_at=datetime.now(),
        mime_type="text/plain",
        size=1234,
        url="https://source.com/doc/123",
        path="/folder/document.txt",
        extra={"custom_field": "value"},
    ),
    permissions=DocumentPermissions(
        public=True,
        users=["user@example.com"],
        groups=["engineering"],
    ),
    attributes={"category": "documentation"},
)
```

**Document Fields:**

| Field         | Type                  | Required | Description                      |
| ------------- | --------------------- | -------- | -------------------------------- |
| `external_id` | `str`                 | Yes      | Unique ID from the source system |
| `title`       | `str`                 | Yes      | Document title for display       |
| `content_id`  | `str`                 | Yes      | Reference to stored content      |
| `metadata`    | `DocumentMetadata`    | No       | Document metadata                |
| `permissions` | `DocumentPermissions` | No       | Access control                   |
| `attributes`  | `dict`                | No       | Custom searchable attributes     |

### Actions

Connectors can define custom actions that users can trigger from the Omni UI.

```python theme={null}
from omni_connector import ActionDefinition, ActionParameter

class MyConnector(Connector):
    @property
    def actions(self) -> list[ActionDefinition]:
        return [
            ActionDefinition(
                name="validate_connection",
                description="Test the API connection",
                parameters=[
                    ActionParameter(
                        name="timeout",
                        type="integer",
                        required=False,
                        description="Connection timeout in seconds",
                    ),
                ],
            ),
        ]

    async def execute_action(
        self,
        action: str,
        params: dict,
        credentials: dict
    ) -> dict:
        if action == "validate_connection":
            # Perform validation
            return {"status": "success", "message": "Connection valid"}
        raise ValueError(f"Unknown action: {action}")
```
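As a connector gains more actions, the `if action == ...` chain can be replaced with a lookup table. The sketch below shows one possible shape. `ActionRouter`, its `register` decorator, the default timeout, and the `api_key` credential name are hypothetical and not part of the SDK. Unknown actions return an error dict here, as in the RSS example later on this page, instead of raising `ValueError`.

```python
import asyncio
from typing import Any, Awaitable, Callable

ActionHandler = Callable[[dict[str, Any], dict[str, Any]], Awaitable[dict[str, Any]]]


class ActionRouter:
    """Hypothetical helper that maps action names to async handler functions."""

    def __init__(self) -> None:
        self._handlers: dict[str, ActionHandler] = {}

    def register(self, name: str) -> Callable[[ActionHandler], ActionHandler]:
        def decorator(handler: ActionHandler) -> ActionHandler:
            self._handlers[name] = handler
            return handler

        return decorator

    async def dispatch(
        self, action: str, params: dict[str, Any], credentials: dict[str, Any]
    ) -> dict[str, Any]:
        handler = self._handlers.get(action)
        if handler is None:
            return {"status": "error", "error": f"Unknown action: {action}"}
        return await handler(params, credentials)


router = ActionRouter()


@router.register("validate_connection")
async def validate_connection(
    params: dict[str, Any], credentials: dict[str, Any]
) -> dict[str, Any]:
    timeout = params.get("timeout", 10)  # default of 10 seconds is an assumption
    if "api_key" not in credentials:
        return {"status": "error", "error": "Missing api_key"}
    return {"status": "success", "message": f"Connection valid (timeout={timeout}s)"}


result = asyncio.run(
    router.dispatch("validate_connection", {"timeout": 5}, {"api_key": "test-key"})
)
print(result)
```

With this in place, `execute_action` could reduce to `return await router.dispatch(action, params, credentials)`.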

## RSS Connector Example

Here's a complete example of an RSS feed connector:

```python theme={null}
import asyncio
from datetime import datetime
from typing import Any

import feedparser
from omni_connector import (
    ActionDefinition,
    ActionParameter,
    Connector,
    Document,
    DocumentMetadata,
    DocumentPermissions,
    SyncContext,
)


class RSSConnector(Connector):
    """Connector that indexes RSS/Atom feeds."""

    @property
    def name(self) -> str:
        return "rss"

    @property
    def version(self) -> str:
        return "1.0.0"

    @property
    def sync_modes(self) -> list[str]:
        return ["full", "incremental"]

    @property
    def actions(self) -> list[ActionDefinition]:
        return [
            ActionDefinition(
                name="validate_feed",
                description="Validate that a feed URL is accessible",
                parameters=[
                    ActionParameter(
                        name="url",
                        type="string",
                        required=True,
                        description="The RSS/Atom feed URL to validate",
                    ),
                ],
            ),
        ]

    async def sync(
        self,
        source_config: dict[str, Any],
        credentials: dict[str, Any],
        state: dict[str, Any] | None,
        ctx: SyncContext,
    ) -> None:
        """Sync articles from an RSS feed."""
        feed_url = source_config.get("feed_url")
        if not feed_url:
            await ctx.fail("Missing feed_url in source configuration")
            return

        # Parse the feed
        feed = await asyncio.to_thread(feedparser.parse, feed_url)

        if feed.bozo and not feed.entries:
            await ctx.fail(f"Failed to parse feed: {feed.bozo_exception}")
            return

        # Get last sync time for incremental sync
        last_sync_time = None
        if state and "last_sync_time" in state:
            last_sync_time = datetime.fromisoformat(state["last_sync_time"])

        for entry in feed.entries:
            if ctx.is_cancelled():
                return

            await ctx.increment_scanned()

            # Skip old entries in incremental mode; entries without a
            # parseable date are always emitted
            entry_date = self._parse_date(entry)
            if last_sync_time and entry_date and entry_date <= last_sync_time:
                continue

            # Build content from entry
            content = self._build_content(entry)
            content_id = await ctx.content_storage.save(content)

            # Create document
            doc = Document(
                external_id=entry.get("id") or entry.get("link"),
                title=entry.get("title", "Untitled"),
                content_id=content_id,
                metadata=DocumentMetadata(
                    author=entry.get("author"),
                    url=entry.get("link"),
                    created_at=self._parse_date(entry),
                    extra={
                        "feed_title": feed.feed.get("title"),
                        "feed_url": feed_url,
                    },
                ),
                permissions=DocumentPermissions(public=True),
            )

            await ctx.emit(doc)

        # Complete with updated state
        await ctx.complete({"last_sync_time": datetime.utcnow().isoformat()})

    def _build_content(self, entry) -> str:
        """Extract content from feed entry."""
        if hasattr(entry, "content") and entry.content:
            return entry.content[0].get("value", "")
        if hasattr(entry, "summary"):
            return entry.summary
        return entry.get("title", "")

    def _parse_date(self, entry) -> datetime | None:
        """Parse date from feed entry."""
        if hasattr(entry, "published_parsed") and entry.published_parsed:
            return datetime(*entry.published_parsed[:6])
        return None

    async def execute_action(
        self,
        action: str,
        params: dict[str, Any],
        credentials: dict[str, Any],
    ) -> dict[str, Any]:
        if action == "validate_feed":
            url = params.get("url")
            if not url:
                return {"status": "error", "error": "Missing URL parameter"}

            feed = await asyncio.to_thread(feedparser.parse, url)
            if feed.bozo and not feed.entries:
                return {"status": "error", "error": str(feed.bozo_exception)}

            return {
                "status": "success",
                "result": {
                    "title": feed.feed.get("title"),
                    "entry_count": len(feed.entries),
                },
            }

        return {"status": "error", "error": f"Unknown action: {action}"}


if __name__ == "__main__":
    RSSConnector().serve(port=8000)
```

## Error Handling

The SDK provides custom exception classes:

```python theme={null}
from omni_connector import (
    ConnectorError,      # Base exception
    SdkClientError,      # Communication errors
    SyncCancelledError,  # Sync was cancelled
    ConfigurationError,  # Configuration issues
)
```

**Handling errors in sync:**

```python theme={null}
from omni_connector import SyncCancelledError

async def sync(self, source_config, credentials, state, ctx):
    try:
        # Your sync logic
        await ctx.complete()
    except SyncCancelledError:
        # Handle cancellation gracefully
        return
    except Exception as e:
        await ctx.fail(str(e))
```
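For failures that affect a single document, `ctx.emit_error` lets the sync continue instead of failing the whole run. The sketch below illustrates that pattern under stated assumptions: `RecordingContext` is a hypothetical stand-in exposing only `emit` and `emit_error`, `to_document` is an illustrative transform, and documents are plain dicts rather than `Document` instances.

```python
import asyncio


class RecordingContext:
    """Hypothetical stand-in for SyncContext that records emitted items and errors."""

    def __init__(self) -> None:
        self.emitted: list[dict] = []
        self.errors: list[tuple[str, str]] = []

    async def emit(self, document: dict) -> None:
        self.emitted.append(document)

    async def emit_error(self, external_id: str, error: str) -> None:
        self.errors.append((external_id, error))


def to_document(item: dict) -> dict:
    """Hypothetical transform; raises KeyError when the item has no title."""
    return {"external_id": item["id"], "title": item["title"]}


async def emit_all(items: list[dict], ctx) -> None:
    """Emit each item, reporting failures per document instead of aborting."""
    for item in items:
        try:
            await ctx.emit(to_document(item))
        except Exception as exc:  # deliberately broad: one bad item should not end the run
            await ctx.emit_error(str(item.get("id", "unknown")), repr(exc))


ctx = RecordingContext()
asyncio.run(emit_all([{"id": "1", "title": "Ok"}, {"id": "2"}], ctx))
print(len(ctx.emitted), ctx.errors)
```

In a real connector you would likely catch and re-raise `SyncCancelledError` before the broad handler, so that cancellation is not recorded as a document error.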

## Development

### Project Setup

```bash theme={null}
# From your connector directory inside the omni monorepo
uv sync
```

### Running Tests

```bash theme={null}
uv run pytest tests/ -v
```

### Type Checking

```bash theme={null}
uv run mypy my_connector/
```

### Linting

```bash theme={null}
uv run ruff check my_connector/
```

## Testing

The SDK includes a testing module (`omni_connector.testing`) with utilities for writing integration tests for your connectors.

### TestHarness

The `TestHarness` class provides a self-contained environment for testing your connector without needing a running Omni deployment.

```python theme={null}
import pytest
from omni_connector.testing import TestHarness
from my_connector import MyConnector

@pytest.fixture
def harness():
    return TestHarness(MyConnector())

@pytest.mark.asyncio
async def test_full_sync(harness):
    # Seed test data
    harness.set_source_config({"api_url": "https://api.example.com"})
    harness.set_credentials({"api_key": "test-key"})

    # Run sync
    result = await harness.run_sync()

    # Assert results
    assert result.status == "completed"
    assert result.documents_emitted > 0

@pytest.mark.asyncio
async def test_incremental_sync(harness):
    # Run initial sync
    harness.set_source_config({"api_url": "https://api.example.com"})
    harness.set_credentials({"api_key": "test-key"})
    first_result = await harness.run_sync()

    # Run incremental sync with state from first run
    second_result = await harness.run_sync(state=first_result.state)
    assert second_result.status == "completed"
```

### Assertions

The harness captures all emitted documents, errors, and state changes for easy assertion:

```python theme={null}
@pytest.mark.asyncio
async def test_document_content(harness):
    harness.set_source_config({"feed_url": "https://example.com/feed"})
    result = await harness.run_sync()

    # Check emitted documents
    assert len(result.documents) == 5
    assert result.documents[0].title == "Expected Title"

    # Check for errors
    assert len(result.errors) == 0

    # Check final state
    assert "last_sync_time" in result.state
```

## API Reference

### Exports

The SDK exports the following:

**Core Classes:**

* `Connector` - Abstract base class
* `SyncContext` - Sync operation context
* `ContentStorage` - Content storage interface
* `SdkClient` - SDK client for connector-manager

**Data Models:**

* `Document`, `DocumentMetadata`, `DocumentPermissions`
* `ConnectorEvent`, `EventType`
* `ActionDefinition`, `ActionParameter`
* `ActionRequest`, `ActionResponse`
* `ConnectorManifest`, `SyncMode`
* `SyncRequest`, `SyncResponse`
* `CancelRequest`, `CancelResponse`

**Exceptions:**

* `ConnectorError`, `SdkClientError`, `SyncCancelledError`, `ConfigurationError`

## What's Next

<CardGroup cols={2}>
  <Card title="SDK Overview" icon="book" href="/developers/sdk-overview">
    Learn about SDK architecture
  </Card>

  <Card title="TypeScript SDK" icon="js" href="/developers/typescript-sdk">
    Build connectors with TypeScript
  </Card>
</CardGroup>
