import {
ActionDefinition,
Connector,
Document,
DocumentMetadata,
DocumentPermissions,
SyncContext,
} from '@getomnico/connector';
/**
 * A single feed entry as produced by the feed parser.
 *
 * Every field is optional because feeds routinely omit elements; consumers
 * must supply their own fallbacks.
 */
interface FeedEntry {
  /** Identifier of the entry; used (before `link`) as the document's external id. */
  id?: string;
  /** Display title of the entry. */
  title?: string;
  /** URL of the entry. */
  link?: string;
  /** Author of the entry. */
  author?: string;
  /** Publication timestamp, kept as the unparsed string found in the feed. */
  published?: string;
  /** Body text of the entry; preferred source for the indexed content. */
  content?: string;
  /** Short summary; used as the indexed content when `content` is empty. */
  summary?: string;
}
/** A parsed feed: its entries plus the feed-level title, when one was found. */
interface Feed {
  /** Entries of the feed, in the order the parser produced them. */
  entries: FeedEntry[];
  /** Title of the feed itself. */
  title?: string;
}
/**
 * Connector that indexes the entries of an RSS (2.0 / RDF) or Atom feed.
 *
 * The feed is fetched over HTTP and parsed with a small regex-based reader.
 * This is deliberately dependency-free; it does not handle every XML
 * construct (for example nested elements of the same name), so prefer a real
 * feed parser if the deployment allows one.
 */
class RSSConnector extends Connector {
  get name(): string {
    return 'rss';
  }

  get version(): string {
    return '1.0.0';
  }

  get syncModes(): string[] {
    return ['full', 'incremental'];
  }

  get actions(): ActionDefinition[] {
    return [
      {
        name: 'validate_feed',
        description: 'Validate that a feed URL is accessible',
        parameters: [
          {
            name: 'url',
            type: 'string',
            required: true,
            description: 'The RSS/Atom feed URL to validate',
          },
        ],
      },
    ];
  }

  /**
   * Fetches the configured feed and emits one document per entry.
   *
   * @param sourceConfig - Must contain a non-empty string `feed_url`.
   * @param credentials - Unused; feeds are fetched anonymously.
   * @param state - State from the previous run. When it carries a valid
   *   `last_sync_time`, entries published at or before that time are skipped.
   * @param ctx - Sync context used to report progress, store content, emit
   *   documents and finish the run.
   */
  async sync(
    sourceConfig: Record<string, unknown>,
    credentials: Record<string, unknown>,
    state: Record<string, unknown> | null,
    ctx: SyncContext
  ): Promise<void> {
    const feedUrl = sourceConfig.feed_url;
    if (typeof feedUrl !== 'string' || !feedUrl) {
      await ctx.fail('Missing feed_url in source configuration');
      return;
    }

    // Stamp the run BEFORE fetching: an entry published while this run is in
    // progress must still be newer than the saved state on the next run.
    const syncStartedAt = new Date().toISOString();

    let feed: Feed;
    try {
      feed = await this.parseFeed(feedUrl);
    } catch (error) {
      // Report fetch/HTTP failures the same way as configuration errors.
      await ctx.fail(`Failed to load feed ${feedUrl}: ${String(error)}`);
      return;
    }

    // Only present (and valid) when a previous run completed.
    const lastSyncTime = this.parseDate(state?.last_sync_time);

    for (const entry of feed.entries) {
      if (ctx.isCancelled()) {
        return;
      }
      await ctx.incrementScanned();

      // Skip old entries in incremental mode. Entries without a parseable
      // date are always processed because their age cannot be determined.
      const publishedAt = this.parseDate(entry.published);
      if (
        lastSyncTime &&
        publishedAt &&
        publishedAt.getTime() <= lastSyncTime.getTime()
      ) {
        continue;
      }

      // An entry with neither id nor link has no stable identity; emitting it
      // with an empty external id would make all such entries collide.
      const externalId = entry.id || entry.link;
      if (!externalId) {
        continue;
      }

      // `||` (not `??`) on purpose: an empty string should fall through to
      // the next candidate.
      const content = entry.content || entry.summary || entry.title || '';
      const contentId = await ctx.contentStorage.save(content);

      const doc: Document = {
        external_id: externalId,
        title: entry.title || 'Untitled',
        content_id: contentId,
        metadata: {
          author: entry.author,
          url: entry.link,
          created_at: entry.published,
          extra: {
            feed_title: feed.title,
            feed_url: feedUrl,
          },
        },
        permissions: {
          public: true,
        },
      };
      await ctx.emit(doc);
    }

    await ctx.complete({ last_sync_time: syncStartedAt });
  }

  /**
   * Runs a connector action.
   *
   * @param action - Action name; only `validate_feed` is supported.
   * @param params - Action parameters; `validate_feed` requires a string `url`.
   * @param credentials - Unused.
   * @returns `{ status: 'success', result }` or `{ status: 'error', error }`.
   *   Failures are reported in the result instead of being thrown.
   */
  async executeAction(
    action: string,
    params: Record<string, unknown>,
    credentials: Record<string, unknown>
  ): Promise<Record<string, unknown>> {
    if (action === 'validate_feed') {
      const url = params.url;
      if (typeof url !== 'string' || !url) {
        return { status: 'error', error: 'Missing URL parameter' };
      }
      try {
        const feed = await this.parseFeed(url);
        return {
          status: 'success',
          result: {
            title: feed.title,
            entry_count: feed.entries.length,
          },
        };
      } catch (error) {
        return { status: 'error', error: String(error) };
      }
    }
    return { status: 'error', error: `Unknown action: ${action}` };
  }

  /**
   * Downloads `url` and parses the response body as a feed.
   *
   * @throws Error when the request fails or the server answers with a
   *   non-2xx status, so an error page is never mistaken for an empty feed.
   */
  private async parseFeed(url: string): Promise<Feed> {
    const response = await fetch(url);
    if (!response.ok) {
      throw new Error(
        `Failed to fetch feed ${url}: HTTP ${response.status} ${response.statusText}`
      );
    }
    return this.parseFeedText(await response.text());
  }

  /**
   * Parses feed XML. RSS `<item>` elements are tried first; when there are
   * none, Atom `<entry>` elements are used instead.
   */
  private parseFeedText(text: string): Feed {
    // `(?:\s[^>]*)?` accepts attributes (RSS 1.0 uses <item rdf:about="...">)
    // without also matching longer tag names such as <items>.
    let blockPattern = /<item(?:\s[^>]*)?>([\s\S]*?)<\/item>/g;
    let entries: FeedEntry[] = Array.from(text.matchAll(blockPattern), (m) =>
      this.parseRssItem(m[1] ?? '')
    );

    if (entries.length === 0) {
      blockPattern = /<entry(?:\s[^>]*)?>([\s\S]*?)<\/entry>/g;
      entries = Array.from(text.matchAll(blockPattern), (m) =>
        this.parseAtomEntry(m[1] ?? '')
      );
    }

    // Look for the feed title outside the entries so that an entry's title is
    // never reported as the feed's title.
    const feedLevelXml = text.replace(blockPattern, '');
    return {
      title: this.extractTag(feedLevelXml, 'title'),
      entries,
    };
  }

  /** Maps the inner XML of one RSS `<item>` to a FeedEntry. */
  private parseRssItem(item: string): FeedEntry {
    const link = this.extractTag(item, 'link');
    return {
      title: this.extractTag(item, 'title'),
      link,
      content:
        this.extractTag(item, 'content:encoded') ||
        this.extractTag(item, 'description'),
      author:
        this.extractTag(item, 'author') || this.extractTag(item, 'dc:creator'),
      published:
        this.extractTag(item, 'pubDate') || this.extractTag(item, 'dc:date'),
      id: this.extractTag(item, 'guid') || link,
    };
  }

  /** Maps the inner XML of one Atom `<entry>` to a FeedEntry. */
  private parseAtomEntry(entry: string): FeedEntry {
    const link = this.extractAtomLink(entry);
    // Atom nests the author's name: <author><name>...</name></author>.
    const authorXml = this.extractRawTag(entry, 'author');
    return {
      title: this.extractTag(entry, 'title'),
      link,
      content: this.extractTag(entry, 'content'),
      summary: this.extractTag(entry, 'summary'),
      author: authorXml ? this.extractTag(authorXml, 'name') : undefined,
      published:
        this.extractTag(entry, 'published') ||
        this.extractTag(entry, 'updated'),
      id: this.extractTag(entry, 'id') || link,
    };
  }

  /**
   * Returns the `href` of the entry's link: the first `<link>` that has no
   * `rel` or `rel="alternate"`, otherwise the first `<link>` with an `href`.
   */
  private extractAtomLink(xml: string): string | undefined {
    let fallback: string | undefined;
    for (const match of xml.matchAll(/<link\s([^>]*)>/gi)) {
      const attrs = match[1] ?? '';
      const href = this.extractAttribute(attrs, 'href');
      if (!href) {
        continue;
      }
      const rel = this.extractAttribute(attrs, 'rel');
      if (rel === undefined || rel === 'alternate') {
        return href;
      }
      if (fallback === undefined) {
        fallback = href;
      }
    }
    return fallback;
  }

  /** Reads a quoted attribute value out of a tag's attribute string. */
  private extractAttribute(attrs: string, name: string): string | undefined {
    const regex = new RegExp(
      `(?:^|\\s)${name}\\s*=\\s*(?:"([^"]*)"|'([^']*)')`,
      'i'
    );
    const match = regex.exec(attrs);
    if (!match) {
      return undefined;
    }
    return this.decodeEntities(match[1] ?? match[2] ?? '').trim();
  }

  /**
   * Returns the decoded, trimmed text of the first `<tag>` element in `xml`
   * (CDATA unwrapped, XML entities decoded), or `undefined` when absent.
   */
  private extractTag(xml: string, tag: string): string | undefined {
    const raw = this.extractRawTag(xml, tag);
    return raw === undefined ? undefined : this.decodeXmlText(raw);
  }

  /** Returns the undecoded inner XML of the first `<tag>` element in `xml`. */
  private extractRawTag(xml: string, tag: string): string | undefined {
    const name = tag.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
    // `(?:\s[^>]*)?` keeps `link` from matching `<linkfoo>`; `(?<!/)` rejects
    // self-closing tags, which have no content to capture.
    const regex = new RegExp(
      `<${name}(?:\\s[^>]*)?(?<!/)>([\\s\\S]*?)</${name}\\s*>`,
      'i'
    );
    const match = regex.exec(xml);
    return match ? match[1] : undefined;
  }

  /**
   * Converts element content to plain text: CDATA sections are taken
   * verbatim, everything else has its XML entities decoded.
   */
  private decodeXmlText(raw: string): string {
    const cdataPattern = /<!\[CDATA\[([\s\S]*?)\]\]>/g;
    let decoded = '';
    let cursor = 0;
    for (const match of raw.matchAll(cdataPattern)) {
      const start = match.index ?? 0;
      decoded += this.decodeEntities(raw.slice(cursor, start)) + (match[1] ?? '');
      cursor = start + match[0].length;
    }
    decoded += this.decodeEntities(raw.slice(cursor));
    return decoded.trim();
  }

  /**
   * Decodes the predefined XML entities and numeric character references.
   * Done in a single pass so `&amp;lt;` becomes `&lt;`, not `<`.
   */
  private decodeEntities(text: string): string {
    const named: Record<string, string> = {
      amp: '&',
      lt: '<',
      gt: '>',
      quot: '"',
      apos: "'",
    };
    return text.replace(
      /&(#x[0-9a-f]+|#\d+|amp|lt|gt|quot|apos);/gi,
      (whole: string, body: string) => {
        if (!body.startsWith('#')) {
          return named[body.toLowerCase()] ?? whole;
        }
        const isHex = body.charAt(1).toLowerCase() === 'x';
        const codePoint = isHex
          ? parseInt(body.slice(2), 16)
          : parseInt(body.slice(1), 10);
        // Leave out-of-range references untouched instead of throwing.
        return codePoint >= 0 && codePoint <= 0x10ffff
          ? String.fromCodePoint(codePoint)
          : whole;
      }
    );
  }

  /** Parses a date string; returns `null` for non-strings and invalid dates. */
  private parseDate(value: unknown): Date | null {
    if (typeof value !== 'string' || !value) {
      return null;
    }
    const parsed = new Date(value);
    return Number.isNaN(parsed.getTime()) ? null : parsed;
  }
}
// Process entry point: build the connector and start serving it on port 8000.
const serveOptions = { port: 8000 };
const connector = new RSSConnector();
connector.serve(serveOptions);