Skip to main content
The Rust SDK (omni-connector-sdk) is used by many built-in Omni connectors, including Google Workspace, Slack, Atlassian, IMAP, Nextcloud, Filesystem, Web, and Darwinbox. Use it when you want a native Rust connector with typed configuration, high-throughput sync logic, or direct reuse of Omni’s shared Rust models.

Installation

The Rust SDK is not published to crates.io. Connectors live in the Omni monorepo and depend on the SDK as a local path dependency. Create a connector under connectors/ and add a Cargo.toml like:
[package]
name = "omni-my-connector"
version = "0.1.0"
edition = "2024"

[dependencies]
omni-connector-sdk = { path = "../../sdk/rust" }
anyhow = "1"
async-trait = "0.1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }
The connector should be part of the Omni workspace so it is built, tested, and released with the matching SDK and shared model versions.

Server Startup

The SDK provides an Axum server with the standard connector endpoints and a registration loop that advertises the connector manifest to connector-manager every 30 seconds.
use anyhow::Result;
use omni_connector_sdk::serve;

use my_connector::MyConnector;

#[tokio::main]
async fn main() -> Result<()> {
    serve(MyConnector).await
}
When running under Docker, set:
VariablePurpose
CONNECTOR_MANAGER_URLURL of omni-connector-manager
CONNECTOR_HOST_NAMEHostname connector-manager should call back, usually the Docker Compose service name
PORTPort the connector server binds and registers

Connector Trait

Implement omni_connector_sdk::Connector for your connector type.
use anyhow::{Context, Result};
use async_trait::async_trait;
use omni_connector_sdk::{
    Connector, ServiceCredential, Source, SourceType, SyncContext, SyncType,
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Deserialize, Serialize)]
struct MyConfig {
    api_url: String,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
struct MyCredentials {
    api_key: String,
}

#[derive(Debug, Clone, Default, Deserialize, Serialize)]
struct MyCheckpoint {
    cursor: Option<String>,
}

struct MyConnector;

#[async_trait]
impl Connector for MyConnector {
    type Config = MyConfig;
    type Credentials = MyCredentials;
    type State = MyCheckpoint;

    fn name(&self) -> &'static str {
        "my_connector"
    }

    fn version(&self) -> &'static str {
        env!("CARGO_PKG_VERSION")
    }

    fn display_name(&self) -> String {
        "My Connector".to_string()
    }

    fn source_types(&self) -> Vec<SourceType> {
        vec![SourceType::Web]
    }

    fn sync_modes(&self) -> Vec<SyncType> {
        vec![SyncType::Full, SyncType::Incremental]
    }

    async fn sync(
        &self,
        source: Source,
        credentials: Option<ServiceCredential>,
        checkpoint: Option<Self::State>,
        ctx: SyncContext,
    ) -> Result<()> {
        let config: MyConfig = serde_json::from_value(source.config)
            .context("failed to decode source config")?;
        let creds = credentials.context("credentials are required")?;
        let creds: MyCredentials = serde_json::from_value(creds.credentials)
            .context("failed to decode credentials")?;

        run_sync(config, creds, checkpoint.unwrap_or_default(), ctx).await
    }
}

Associated Types

Associated typeDescription
ConfigShape of source.config; the SDK validates it before dispatching /sync
CredentialsShape of service_credentials.credentials; validated before dispatch
StateCheckpoint type passed to sync and serialized by save_checkpoint
Use serde_json::Value for any associated type when you want to opt out of typed validation.

SyncContext

SyncContext is the connector’s handle back to connector-manager.
MethodPurpose
sync_run_id() / source_id() / source_type()Identify the current sync and source
sync_mode()Current mode: full, incremental, or realtime
is_resume()Whether this run resumed after interruption
is_cancelled()Whether cancellation was requested
store_content(content)Store extracted text and return a content ID
extract_and_store_content(data, mime_type, filename)Extract text from binary content through connector-manager/Docling and store it
emit_event(event)Emit document or permission events
increment_scanned(count) / increment_updated(count)Update sync progress counters
save_checkpoint(value)Persist a run checkpoint for safe resume
complete() / fail(error)Mark the sync complete or failed
heartbeat()Keep long-running syncs alive
emit_event() buffers events. Operations that checkpoint or terminate a sync flush buffered events first so the checkpoint does not race ahead of persisted events.

Emitting Documents

Rust connectors emit ConnectorEvent values directly.
use anyhow::Result;
use omni_connector_sdk::{
    ConnectorEvent, DocumentMetadata, DocumentPermissions, SourceType, SyncContext,
};
use serde_json::json;

async fn emit_one(ctx: &SyncContext) -> Result<()> {
    let content_id = ctx.store_content("Hello from my data source").await?;
    let external_id = "item-123".to_string();

    ctx.emit_event(ConnectorEvent::DocumentCreated {
        sync_run_id: ctx.sync_run_id().to_string(),
        source_id: ctx.source_id().to_string(),
        document_id: external_id.clone(),
        content_id,
        metadata: DocumentMetadata {
            title: Some("Example item".to_string()),
            author: None,
            created_at: None,
            updated_at: None,
            content_type: Some("text/plain".to_string()),
            mime_type: Some("text/plain".to_string()),
            size: None,
            url: Some("https://example.com/items/123".to_string()),
            path: None,
            extra: None,
        },
        permissions: DocumentPermissions {
            public: true,
            users: vec![],
            groups: vec![],
        },
        attributes: Some([
            ("source_type".to_string(), json!(SourceType::Web)),
        ].into_iter().collect()),
    }).await?;

    ctx.increment_scanned(1).await?;
    ctx.increment_updated(1).await?;
    Ok(())
}
For updates, emit DocumentUpdated; for removals, emit DocumentDeleted. For group-based permissions, emit GroupMembershipSync events.

Checkpoints and Resume

Use save_checkpoint after batches or remote pages that are safe to resume past:
let checkpoint = serde_json::json!({ "cursor": next_cursor });
ctx.save_checkpoint(checkpoint).await?;
On the next run, connector-manager passes the latest checkpoint as the checkpoint argument to sync. ctx.is_resume() tells you whether the current run is a resume attempt after interruption.
Checkpoint only after the documents for that cursor/page have been emitted. The SDK flushes before checkpointing, but your connector still controls where the logical cursor advances.

Actions

Expose connector-specific actions by returning ActionDefinition values and implementing execute_action.
use anyhow::{anyhow, Result};
use axum::http::StatusCode;
use omni_connector_sdk::{ActionDefinition, ActionMode, ActionResponse, Connector, SourceType};
use serde_json::{json, Value as JsonValue};

fn actions(&self) -> Vec<ActionDefinition> {
    vec![ActionDefinition {
        name: "validate_connection".to_string(),
        description: "Validate connector credentials".to_string(),
        input_schema: json!({ "type": "object", "properties": {} }),
        mode: ActionMode::Read,
        source_types: vec![SourceType::Web],
        admin_only: false,
        hidden: true,
    }]
}

async fn execute_action(
    &self,
    action: &str,
    _params: JsonValue,
    _credentials: Option<omni_connector_sdk::ServiceCredential>,
) -> Result<axum::response::Response> {
    match action {
        "validate_connection" => Ok(ActionResponse::success(json!({ "ok": true })).into_response()),
        other => Ok(ActionResponse::not_supported(other)
            .into_response_with_status(StatusCode::NOT_FOUND)),
    }
}
FieldMeaning
modeRead or Write; write actions require approval/allowlisting where applicable
source_typesRestricts an action to specific source types; empty means all source types for the connector
admin_onlyRestricts chat/tool exposure to admins and affects credential resolution
hiddenKeeps setup/internal actions out of every chat and agent tool surface while leaving them dispatchable by name

Search Operators

Connectors can register inline search operators that map to document attributes.
use omni_connector_sdk::SearchOperator;

fn search_operators(&self) -> Vec<SearchOperator> {
    vec![SearchOperator {
        operator: "project".to_string(),
        attribute_key: "project_id".to_string(),
        value_type: "text".to_string(),
    }]
}
After registration, users can filter with queries like roadmap project:alpha in Search and Chat.

OAuth and MCP

Rust connectors can also expose:
  • oauth_config() for Omni’s generic OAuth flow
  • mcp_server() for stdio or Streamable HTTP MCP servers
  • prepare_mcp_env() / prepare_mcp_headers() to inject credentials into MCP calls
  • serve_with_extra_routes() for provider-specific callback/enrichment routes when the standard SDK routes are not enough
Use the GitHub, Google, Atlassian, and Darwinbox connectors as references for advanced actions, OAuth manifests, and MCP integration patterns.

Testing

Rust connectors should use integration tests wherever possible. Existing Rust connector tests under connectors/*/tests/ use testcontainers-backed ParadeDB, Redis, and connector-manager helpers from the repo’s shared test infrastructure. Recommended coverage:
  • Config and credential deserialization failures
  • Full and incremental sync behavior
  • Checkpoint/resume behavior
  • Permission and group membership events
  • Search operators and attributes
  • Action success/failure cases
  • Cancellation for long-running syncs

Reference Connectors

ConnectorGood reference for
connectors/filesystemMinimal Rust connector, local config, realtime sync, hidden setup action
connectors/darwinboxTyped config/credentials/checkpoint, many actions, search operators
connectors/googleService-account auth, multiple source types, OAuth manifest, Drive/Gmail/Chat sync
connectors/slackRealtime/socket behavior and message/file indexing
connectors/atlassianMultiple products, attachments, permission mapping

What’s Next

SDK Overview

Compare SDKs and connector lifecycle concepts

Connector Management

Learn how connector-manager schedules and monitors sources