> ## Documentation Index
> Fetch the complete documentation index at: https://docs.getomni.co/llms.txt
> Use this file to discover all available pages before exploring further.

# Rust SDK

> Build custom connectors in Rust with Axum and the Omni connector SDK

The Rust SDK (`omni-connector-sdk`) is used by many built-in Omni connectors, including Google Workspace, Slack, Atlassian, IMAP, Nextcloud, Filesystem, Web, and Darwinbox. Use it when you want a native Rust connector with typed configuration, high-throughput sync logic, or direct reuse of Omni's shared Rust models.

## Installation

The Rust SDK is **not published to crates.io**. Connectors live in the Omni monorepo and depend on the SDK as a local path dependency.

Create a connector under `connectors/` and add a `Cargo.toml` like:

```toml theme={null}
[package]
name = "omni-my-connector"
version = "0.1.0"
edition = "2024"

[dependencies]
omni-connector-sdk = { path = "../../sdk/rust" }
anyhow = "1"
async-trait = "0.1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }
```

<Note>
  The connector should be part of the Omni workspace so it is built, tested, and released with the matching SDK and shared model versions.
</Note>

## Server Startup

The SDK provides an Axum server with the standard connector endpoints and a registration loop that advertises the connector manifest to connector-manager every 30 seconds.

```rust theme={null}
use anyhow::Result;
use omni_connector_sdk::serve;

use my_connector::MyConnector;

#[tokio::main]
async fn main() -> Result<()> {
    serve(MyConnector).await
}
```

When running under Docker, set:

| Variable                | Purpose                                                                              |
| ----------------------- | ------------------------------------------------------------------------------------ |
| `CONNECTOR_MANAGER_URL` | URL of omni-connector-manager                                                        |
| `CONNECTOR_HOST_NAME`   | Hostname connector-manager should call back, usually the Docker Compose service name |
| `PORT`                  | Port the connector server binds and registers                                        |

## Connector Trait

Implement `omni_connector_sdk::Connector` for your connector type.

```rust theme={null}
use anyhow::{Context, Result};
use async_trait::async_trait;
use omni_connector_sdk::{
    Connector, ServiceCredential, Source, SourceType, SyncContext, SyncType,
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Deserialize, Serialize)]
struct MyConfig {
    api_url: String,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
struct MyCredentials {
    api_key: String,
}

#[derive(Debug, Clone, Default, Deserialize, Serialize)]
struct MyCheckpoint {
    cursor: Option<String>,
}

struct MyConnector;

#[async_trait]
impl Connector for MyConnector {
    type Config = MyConfig;
    type Credentials = MyCredentials;
    type State = MyCheckpoint;

    fn name(&self) -> &'static str {
        "my_connector"
    }

    fn version(&self) -> &'static str {
        env!("CARGO_PKG_VERSION")
    }

    fn display_name(&self) -> String {
        "My Connector".to_string()
    }

    fn source_types(&self) -> Vec<SourceType> {
        vec![SourceType::Web]
    }

    fn sync_modes(&self) -> Vec<SyncType> {
        vec![SyncType::Full, SyncType::Incremental]
    }

    async fn sync(
        &self,
        source: Source,
        credentials: Option<ServiceCredential>,
        checkpoint: Option<Self::State>,
        ctx: SyncContext,
    ) -> Result<()> {
        let config: MyConfig = serde_json::from_value(source.config)
            .context("failed to decode source config")?;
        let creds = credentials.context("credentials are required")?;
        let creds: MyCredentials = serde_json::from_value(creds.credentials)
            .context("failed to decode credentials")?;

        run_sync(config, creds, checkpoint.unwrap_or_default(), ctx).await
    }
}
```

### Associated Types

| Associated type | Description                                                               |
| --------------- | ------------------------------------------------------------------------- |
| `Config`        | Shape of `source.config`; the SDK validates it before dispatching `/sync` |
| `Credentials`   | Shape of `service_credentials.credentials`; validated before dispatch     |
| `State`         | Checkpoint type passed to `sync` and serialized by `save_checkpoint`      |

Use `serde_json::Value` for any associated type when you want to opt out of typed validation.

## SyncContext

`SyncContext` is the connector's handle back to connector-manager.

| Method                                                  | Purpose                                                                         |
| ------------------------------------------------------- | ------------------------------------------------------------------------------- |
| `sync_run_id()` / `source_id()` / `source_type()`       | Identify the current sync and source                                            |
| `sync_mode()`                                           | Current mode: full, incremental, or realtime                                    |
| `is_resume()`                                           | Whether this run resumed after interruption                                     |
| `is_cancelled()`                                        | Whether cancellation was requested                                              |
| `store_content(content)`                                | Store extracted text and return a content ID                                    |
| `extract_and_store_content(data, mime_type, filename)`  | Extract text from binary content through connector-manager/Docling and store it |
| `emit_event(event)`                                     | Emit document or permission events                                              |
| `increment_scanned(count)` / `increment_updated(count)` | Update sync progress counters                                                   |
| `save_checkpoint(value)`                                | Persist a run checkpoint for safe resume                                        |
| `complete()` / `fail(error)`                            | Mark the sync complete or failed                                                |
| `heartbeat()`                                           | Keep long-running syncs alive                                                   |

`emit_event()` buffers events. Operations that checkpoint or terminate a sync flush buffered events first so the checkpoint does not race ahead of persisted events.

## Emitting Documents

Rust connectors emit `ConnectorEvent` values directly.

```rust theme={null}
use anyhow::Result;
use omni_connector_sdk::{
    ConnectorEvent, DocumentMetadata, DocumentPermissions, SourceType, SyncContext,
};
use serde_json::json;

async fn emit_one(ctx: &SyncContext) -> Result<()> {
    let content_id = ctx.store_content("Hello from my data source").await?;
    let external_id = "item-123".to_string();

    ctx.emit_event(ConnectorEvent::DocumentCreated {
        sync_run_id: ctx.sync_run_id().to_string(),
        source_id: ctx.source_id().to_string(),
        document_id: external_id.clone(),
        content_id,
        metadata: DocumentMetadata {
            title: Some("Example item".to_string()),
            author: None,
            created_at: None,
            updated_at: None,
            content_type: Some("text/plain".to_string()),
            mime_type: Some("text/plain".to_string()),
            size: None,
            url: Some("https://example.com/items/123".to_string()),
            path: None,
            extra: None,
        },
        permissions: DocumentPermissions {
            public: true,
            users: vec![],
            groups: vec![],
        },
        attributes: Some([
            ("source_type".to_string(), json!(SourceType::Web)),
        ].into_iter().collect()),
    }).await?;

    ctx.increment_scanned(1).await?;
    ctx.increment_updated(1).await?;
    Ok(())
}
```

For updates, emit `DocumentUpdated`; for removals, emit `DocumentDeleted`. For group-based permissions, emit `GroupMembershipSync` events.

## Checkpoints and Resume

Use `save_checkpoint` after batches or remote pages that are safe to resume past:

```rust theme={null}
let checkpoint = serde_json::json!({ "cursor": next_cursor });
ctx.save_checkpoint(checkpoint).await?;
```

On the next run, connector-manager passes the latest checkpoint as the `checkpoint` argument to `sync`. `ctx.is_resume()` tells you whether the current run is a resume attempt after interruption.

<Tip>
  Checkpoint only after the documents for that cursor/page have been emitted. The SDK flushes before checkpointing, but your connector still controls where the logical cursor advances.
</Tip>

## Actions

Expose connector-specific actions by returning `ActionDefinition` values and implementing `execute_action`.

```rust theme={null}
use anyhow::{anyhow, Result};
use axum::http::StatusCode;
use omni_connector_sdk::{ActionDefinition, ActionMode, ActionResponse, Connector, SourceType};
use serde_json::{json, Value as JsonValue};

fn actions(&self) -> Vec<ActionDefinition> {
    vec![ActionDefinition {
        name: "validate_connection".to_string(),
        description: "Validate connector credentials".to_string(),
        input_schema: json!({ "type": "object", "properties": {} }),
        mode: ActionMode::Read,
        source_types: vec![SourceType::Web],
        admin_only: false,
        hidden: true,
    }]
}

async fn execute_action(
    &self,
    action: &str,
    _params: JsonValue,
    _credentials: Option<omni_connector_sdk::ServiceCredential>,
) -> Result<axum::response::Response> {
    match action {
        "validate_connection" => Ok(ActionResponse::success(json!({ "ok": true })).into_response()),
        other => Ok(ActionResponse::not_supported(other)
            .into_response_with_status(StatusCode::NOT_FOUND)),
    }
}
```

| Field          | Meaning                                                                                                       |
| -------------- | ------------------------------------------------------------------------------------------------------------- |
| `mode`         | `Read` or `Write`; write actions require approval/allowlisting where applicable                               |
| `source_types` | Restricts an action to specific source types; empty means all source types for the connector                  |
| `admin_only`   | Restricts chat/tool exposure to admins and affects credential resolution                                      |
| `hidden`       | Keeps setup/internal actions out of every chat and agent tool surface while leaving them dispatchable by name |

## Search Operators

Connectors can register inline search operators that map to document attributes.

```rust theme={null}
use omni_connector_sdk::SearchOperator;

fn search_operators(&self) -> Vec<SearchOperator> {
    vec![SearchOperator {
        operator: "project".to_string(),
        attribute_key: "project_id".to_string(),
        value_type: "text".to_string(),
    }]
}
```

After registration, users can filter with queries like `roadmap project:alpha` in Search and Chat.

## OAuth and MCP

Rust connectors can also expose:

* `oauth_config()` for Omni's generic OAuth flow
* `mcp_server()` for stdio or Streamable HTTP MCP servers
* `prepare_mcp_env()` / `prepare_mcp_headers()` to inject credentials into MCP calls
* `serve_with_extra_routes()` for provider-specific callback/enrichment routes when the standard SDK routes are not enough

Use the GitHub, Google, Atlassian, and Darwinbox connectors as references for advanced actions, OAuth manifests, and MCP integration patterns.

## Testing

Rust connectors should use integration tests wherever possible. Existing Rust connector tests under `connectors/*/tests/` use testcontainers-backed ParadeDB, Redis, and connector-manager helpers from the repo's shared test infrastructure.

Recommended coverage:

* Config and credential deserialization failures
* Full and incremental sync behavior
* Checkpoint/resume behavior
* Permission and group membership events
* Search operators and attributes
* Action success/failure cases
* Cancellation for long-running syncs

## Reference Connectors

| Connector               | Good reference for                                                                 |
| ----------------------- | ---------------------------------------------------------------------------------- |
| `connectors/filesystem` | Minimal Rust connector, local config, realtime sync, hidden setup action           |
| `connectors/darwinbox`  | Typed config/credentials/checkpoint, many actions, search operators                |
| `connectors/google`     | Service-account auth, multiple source types, OAuth manifest, Drive/Gmail/Chat sync |
| `connectors/slack`      | Realtime/socket behavior and message/file indexing                                 |
| `connectors/atlassian`  | Multiple products, attachments, permission mapping                                 |

## What's Next

<CardGroup cols={2}>
  <Card title="SDK Overview" icon="code" href="/developers/sdk-overview">
    Compare SDKs and connector lifecycle concepts
  </Card>

  <Card title="Connector Management" icon="gear" href="/admin/connector-management">
    Learn how connector-manager schedules and monitors sources
  </Card>
</CardGroup>
