Skip to main content
The TypeScript SDK (@getomnico/connector) provides everything you need to build custom connectors that integrate with Omni’s search and AI platform.

Installation

npm install @getomnico/connector
Requirements:
  • Node.js >= 20.0.0
Dependencies installed:
  • express - Web framework
  • pg - PostgreSQL client
  • zod - Schema validation

Quick Start

import {
  Connector,
  Document,
  DocumentMetadata,
  DocumentPermissions,
  SyncContext,
} from '@getomnico/connector';

class MyConnector extends Connector {
  get name(): string {
    return 'my-connector';
  }

  get version(): string {
    return '1.0.0';
  }

  get syncModes(): string[] {
    return ['full', 'incremental'];
  }

  async sync(
    sourceConfig: Record<string, unknown>,
    credentials: Record<string, unknown>,
    state: Record<string, unknown> | null,
    ctx: SyncContext
  ): Promise<void> {
    // Your sync logic here
    await ctx.complete();
  }
}

const connector = new MyConnector();
connector.serve({ port: 8000 });

Core Concepts

Connector Class

The Connector abstract class defines the interface for all connectors. Required Properties:
PropertyTypeDescription
namestringUnique connector identifier (e.g., "my-connector")
versionstringSemantic version (e.g., "1.0.0")
Optional Properties:
PropertyTypeDefaultDescription
syncModesstring[]["full"]Supported modes: "full", "incremental"
actionsActionDefinition[][]Custom actions the connector supports
Required Methods:
async sync(
  sourceConfig: Record<string, unknown>,  // Configuration from Omni UI
  credentials: Record<string, unknown>,   // Stored credentials
  state: Record<string, unknown> | null,  // Persisted state
  ctx: SyncContext                        // Sync context utilities
): Promise<void>
Optional Methods:
cancel(syncRunId: string): void
// Handle sync cancellation request

async executeAction(
  action: string,
  params: Record<string, unknown>,
  credentials: Record<string, unknown>
): Promise<Record<string, unknown>>
// Execute a custom action

SyncContext

The SyncContext object is passed to your sync method and provides utilities for the sync operation. Properties:
PropertyTypeDescription
syncRunIdstringUnique identifier for this sync run
sourceIdstringSource identifier
stateRecord<string, unknown> | nullState from previous sync
contentStorageContentStorageStorage for document content
documentsEmittednumberCount of emitted documents
documentsScannednumberCount of scanned items
Methods:
// Emit a new document
await ctx.emit(document: Document): Promise<void>

// Emit an updated document
await ctx.emitUpdated(document: Document): Promise<void>

// Mark a document as deleted
await ctx.emitDeleted(externalId: string): Promise<void>

// Report a non-fatal error for a document
await ctx.emitError(externalId: string, error: string): Promise<void>

// Increment scanned counter (also sends heartbeat)
await ctx.incrementScanned(): Promise<void>

// Save state checkpoint for resumability
await ctx.saveState(state: Record<string, unknown>): Promise<void>

// Mark sync as completed
await ctx.complete(newState?: Record<string, unknown>): Promise<void>

// Mark sync as failed
await ctx.fail(error: string): Promise<void>

// Check if sync was cancelled
ctx.isCancelled(): boolean

ContentStorage

The ContentStorage class handles storing document content.
// Store text content
const contentId = await ctx.contentStorage.save(
  content: string,
  contentType?: string  // default: "text/plain"
): Promise<string>

// Store binary content (base64 encoded)
const contentId = await ctx.contentStorage.saveBinary(
  content: Buffer,
  contentType: string
): Promise<string>
The returned contentId is a ULID that references the stored content.

Document Model

The Document interface represents a searchable document.
import { Document, DocumentMetadata, DocumentPermissions } from '@getomnico/connector';

const document: Document = {
  external_id: 'unique-id-from-source',
  title: 'Document Title',
  content_id: contentId,  // From contentStorage.save()
  metadata: {
    title: 'Document Title',
    author: 'Author Name',
    created_at: new Date().toISOString(),
    updated_at: new Date().toISOString(),
    mime_type: 'text/plain',
    size: 1234,
    url: 'https://source.com/doc/123',
    path: '/folder/document.txt',
    extra: { custom_field: 'value' },
  },
  permissions: {
    public: true,
    users: ['[email protected]'],
    groups: ['engineering'],
  },
  attributes: { category: 'documentation' },
};
Document Fields:
FieldTypeRequiredDescription
external_idstringYesUnique ID from the source system
titlestringYesDocument title for display
content_idstringYesReference to stored content
metadataDocumentMetadataNoDocument metadata
permissionsDocumentPermissionsNoAccess control
attributesRecord<string, unknown>NoCustom searchable attributes

Actions

Connectors can define custom actions that users can trigger from the Omni UI.
import { ActionDefinition, ActionParameter } from '@getomnico/connector';

class MyConnector extends Connector {
  get actions(): ActionDefinition[] {
    return [
      {
        name: 'validate_connection',
        description: 'Test the API connection',
        parameters: [
          {
            name: 'timeout',
            type: 'integer',
            required: false,
            description: 'Connection timeout in seconds',
          },
        ],
      },
    ];
  }

  async executeAction(
    action: string,
    params: Record<string, unknown>,
    credentials: Record<string, unknown>
  ): Promise<Record<string, unknown>> {
    if (action === 'validate_connection') {
      // Perform validation
      return { status: 'success', message: 'Connection valid' };
    }
    throw new Error(`Unknown action: ${action}`);
  }
}

RSS Connector Example

Here’s a complete example of an RSS feed connector:
import {
  ActionDefinition,
  Connector,
  Document,
  DocumentMetadata,
  DocumentPermissions,
  SyncContext,
} from '@getomnico/connector';

interface FeedEntry {
  id?: string;
  link?: string;
  title?: string;
  content?: string;
  summary?: string;
  author?: string;
  published?: string;
}

interface Feed {
  title?: string;
  entries: FeedEntry[];
}

class RSSConnector extends Connector {
  get name(): string {
    return 'rss';
  }

  get version(): string {
    return '1.0.0';
  }

  get syncModes(): string[] {
    return ['full', 'incremental'];
  }

  get actions(): ActionDefinition[] {
    return [
      {
        name: 'validate_feed',
        description: 'Validate that a feed URL is accessible',
        parameters: [
          {
            name: 'url',
            type: 'string',
            required: true,
            description: 'The RSS/Atom feed URL to validate',
          },
        ],
      },
    ];
  }

  async sync(
    sourceConfig: Record<string, unknown>,
    credentials: Record<string, unknown>,
    state: Record<string, unknown> | null,
    ctx: SyncContext
  ): Promise<void> {
    const feedUrl = sourceConfig.feed_url as string;
    if (!feedUrl) {
      await ctx.fail('Missing feed_url in source configuration');
      return;
    }

    // Fetch and parse the feed
    const feed = await this.parseFeed(feedUrl);

    // Get last sync time for incremental sync
    const lastSyncTime = state?.last_sync_time
      ? new Date(state.last_sync_time as string)
      : null;

    for (const entry of feed.entries) {
      if (ctx.isCancelled()) {
        return;
      }

      await ctx.incrementScanned();

      // Skip old entries in incremental mode
      if (lastSyncTime && entry.published) {
        const entryDate = new Date(entry.published);
        if (entryDate <= lastSyncTime) {
          continue;
        }
      }

      // Build content from entry
      const content = entry.content || entry.summary || entry.title || '';
      const contentId = await ctx.contentStorage.save(content);

      // Create document
      const doc: Document = {
        external_id: entry.id || entry.link || '',
        title: entry.title || 'Untitled',
        content_id: contentId,
        metadata: {
          author: entry.author,
          url: entry.link,
          created_at: entry.published,
          extra: {
            feed_title: feed.title,
            feed_url: feedUrl,
          },
        },
        permissions: {
          public: true,
        },
      };

      await ctx.emit(doc);
    }

    // Complete with updated state
    await ctx.complete({ last_sync_time: new Date().toISOString() });
  }

  async executeAction(
    action: string,
    params: Record<string, unknown>,
    credentials: Record<string, unknown>
  ): Promise<Record<string, unknown>> {
    if (action === 'validate_feed') {
      const url = params.url as string;
      if (!url) {
        return { status: 'error', error: 'Missing URL parameter' };
      }

      try {
        const feed = await this.parseFeed(url);
        return {
          status: 'success',
          result: {
            title: feed.title,
            entry_count: feed.entries.length,
          },
        };
      } catch (error) {
        return { status: 'error', error: String(error) };
      }
    }

    return { status: 'error', error: `Unknown action: ${action}` };
  }

  private async parseFeed(url: string): Promise<Feed> {
    // Simple XML parsing (use rss-parser library in production)
    const response = await fetch(url);
    const text = await response.text();

    const entries: FeedEntry[] = [];
    const itemRegex = /<item>([\s\S]*?)<\/item>/g;
    let match;

    while ((match = itemRegex.exec(text)) !== null) {
      const item = match[1];
      entries.push({
        title: this.extractTag(item, 'title'),
        link: this.extractTag(item, 'link'),
        content: this.extractTag(item, 'content:encoded') ||
                 this.extractTag(item, 'description'),
        author: this.extractTag(item, 'author') ||
                this.extractTag(item, 'dc:creator'),
        published: this.extractTag(item, 'pubDate'),
        id: this.extractTag(item, 'guid') ||
            this.extractTag(item, 'link'),
      });
    }

    return {
      title: this.extractTag(text, 'title'),
      entries,
    };
  }

  private extractTag(xml: string, tag: string): string | undefined {
    const regex = new RegExp(`<${tag}[^>]*>([\\s\\S]*?)</${tag}>`, 'i');
    const match = regex.exec(xml);
    return match ? match[1].trim() : undefined;
  }
}

const connector = new RSSConnector();
connector.serve({ port: 8000 });

Error Handling

The SDK provides custom error classes:
import {
  ConnectorError,      // Base error
  SdkClientError,      // Communication errors (includes statusCode)
  SyncCancelledError,  // Sync was cancelled
  ConfigurationError,  // Configuration issues
} from '@getomnico/connector';
Handling errors in sync:
async sync(
  sourceConfig: Record<string, unknown>,
  credentials: Record<string, unknown>,
  state: Record<string, unknown> | null,
  ctx: SyncContext
): Promise<void> {
  try {
    // Your sync logic
    await ctx.complete();
  } catch (error) {
    if (error instanceof SyncCancelledError) {
      // Handle cancellation gracefully
      return;
    }
    await ctx.fail(String(error));
  }
}

Development

Project Setup

# Install dependencies
npm install

# Build
npm run build

# Run tests
npm test

# Watch mode for tests
npm run test:watch

# Lint
npm run lint

TypeScript Configuration

The SDK uses strict TypeScript settings. Your tsconfig.json should include:
{
  "compilerOptions": {
    "target": "ES2022",
    "module": "ESNext",
    "moduleResolution": "bundler",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true
  }
}

API Reference

Exports

The SDK exports the following: Core Classes:
  • Connector - Abstract base class
  • SyncContext - Sync operation context
  • ContentStorage - Content storage interface
  • SdkClient - SDK client for connector-manager
Data Models (with Zod schemas):
  • Document, DocumentMetadata, DocumentPermissions
  • ConnectorEvent, EventType
  • ActionDefinition, ActionParameter
  • ActionRequest, ActionResponse
  • ConnectorManifest, SyncMode
  • SyncRequest, SyncResponse
  • CancelRequest, CancelResponse
Error Classes:
  • ConnectorError, SdkClientError, SyncCancelledError, ConfigurationError

Zod Schemas

All data models include Zod schemas for runtime validation:
import { DocumentSchema, SyncRequestSchema } from '@getomnico/connector';

// Validate incoming data
const result = SyncRequestSchema.safeParse(data);
if (!result.success) {
  console.error('Validation failed:', result.error);
}

What’s Next