import asyncio
from datetime import datetime, timezone
from typing import Any

import feedparser
from omni_connector import (
    ActionDefinition,
    ActionParameter,
    Connector,
    Document,
    DocumentMetadata,
    DocumentPermissions,
    SyncContext,
)
class RSSConnector(Connector):
    """Connector that indexes RSS/Atom feeds.

    Each feed entry is emitted as one public ``Document``. Incremental syncs
    rely on a ``last_sync_time`` value kept in the sync state: entries whose
    publication date is at or before that instant are skipped.
    """

    @property
    def name(self) -> str:
        """Return the connector identifier."""
        return "rss"

    @property
    def version(self) -> str:
        """Return the connector version string."""
        return "1.0.0"

    @property
    def sync_modes(self) -> list[str]:
        """Return the sync modes this connector declares."""
        return ["full", "incremental"]

    @property
    def actions(self) -> list[ActionDefinition]:
        """Return the actions handled by :meth:`execute_action`."""
        return [
            ActionDefinition(
                name="validate_feed",
                description="Validate that a feed URL is accessible",
                parameters=[
                    ActionParameter(
                        name="url",
                        type="string",
                        required=True,
                        description="The RSS/Atom feed URL to validate",
                    ),
                ],
            ),
        ]

    async def sync(
        self,
        source_config: dict[str, Any],
        credentials: dict[str, Any],
        state: dict[str, Any] | None,
        ctx: SyncContext,
    ) -> None:
        """Sync articles from an RSS feed.

        Args:
            source_config: Must contain ``feed_url``; the sync fails without it.
            credentials: Unused by this connector.
            state: State from the previous run. When it carries a usable
                ``last_sync_time``, entries published at or before that time
                are skipped.
            ctx: Sync context used to report progress, store content, emit
                documents and signal completion or failure.
        """
        feed_url = source_config.get("feed_url")
        if not feed_url:
            await ctx.fail("Missing feed_url in source configuration")
            return

        # Take the checkpoint *before* fetching: an entry published while this
        # run is in progress must not fall behind the stored timestamp, or the
        # next incremental run would skip it. Stored as naive UTC so it can be
        # compared with the naive datetimes built from ``published_parsed``.
        sync_started_at = datetime.now(timezone.utc).replace(tzinfo=None)

        # feedparser.parse blocks on network I/O, so run it off the event loop.
        feed = await asyncio.to_thread(feedparser.parse, feed_url)
        # A "bozo" feed that still yielded entries is processed anyway.
        if feed.bozo and not feed.entries:
            await ctx.fail(f"Failed to parse feed: {feed.bozo_exception}")
            return

        last_sync_time = self._load_last_sync_time(state)
        feed_title = feed.feed.get("title")

        for entry in feed.entries:
            if ctx.is_cancelled():
                return
            await ctx.increment_scanned()

            # Skip old entries in incremental mode. Entries without a usable
            # publication date are always emitted.
            published_at = self._parse_date(entry)
            if last_sync_time and published_at and published_at <= last_sync_time:
                continue

            # Prefer the entry id and fall back to its link. Without either
            # there is no stable identifier for the document, so skip it.
            external_id = entry.get("id") or entry.get("link")
            if not external_id:
                continue

            content = self._build_content(entry)
            content_id = await ctx.content_storage.save(content)

            doc = Document(
                external_id=external_id,
                title=entry.get("title", "Untitled"),
                content_id=content_id,
                metadata=DocumentMetadata(
                    author=entry.get("author"),
                    url=entry.get("link"),
                    created_at=published_at,
                    extra={
                        "feed_title": feed_title,
                        "feed_url": feed_url,
                    },
                ),
                permissions=DocumentPermissions(public=True),
            )
            await ctx.emit(doc)

        # Complete with updated state
        await ctx.complete({"last_sync_time": sync_started_at.isoformat()})

    @staticmethod
    def _load_last_sync_time(state: dict[str, Any] | None) -> datetime | None:
        """Read ``last_sync_time`` from *state* as a naive UTC datetime.

        Returns ``None`` when the state is missing, has no timestamp, or the
        timestamp cannot be parsed; the caller then skips nothing.
        """
        if not state:
            return None
        raw = state.get("last_sync_time")
        if not raw:
            return None
        try:
            parsed = datetime.fromisoformat(raw)
        except (TypeError, ValueError):
            return None
        # Normalise aware timestamps so comparisons with naive entry dates
        # do not raise TypeError.
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
        return parsed

    def _build_content(self, entry) -> str:
        """Extract content from feed entry.

        Uses the first ``content`` item's value when present, otherwise the
        ``summary``, otherwise the ``title`` (empty string if absent).
        """
        if hasattr(entry, "content") and entry.content:
            return entry.content[0].get("value", "")
        if hasattr(entry, "summary"):
            return entry.summary
        return entry.get("title", "")

    def _parse_date(self, entry) -> datetime | None:
        """Parse date from feed entry.

        Returns a naive datetime built from the first six fields of
        ``published_parsed``, or ``None`` when that field is missing or empty.
        """
        published = entry.get("published_parsed")
        if published:
            return datetime(*published[:6])
        return None

    async def execute_action(
        self,
        action: str,
        params: dict[str, Any],
        credentials: dict[str, Any],
    ) -> dict[str, Any]:
        """Run a connector action and return a status dictionary.

        Only ``validate_feed`` is supported: it parses ``params["url"]`` and
        reports the feed title and entry count. Every failure, including an
        unknown action, is reported as ``{"status": "error", "error": ...}``.
        ``credentials`` is unused.
        """
        if action == "validate_feed":
            url = params.get("url")
            if not url:
                return {"status": "error", "error": "Missing URL parameter"}

            # Blocking network fetch; keep it off the event loop.
            feed = await asyncio.to_thread(feedparser.parse, url)
            if feed.bozo and not feed.entries:
                return {"status": "error", "error": str(feed.bozo_exception)}

            return {
                "status": "success",
                "result": {
                    "title": feed.feed.get("title"),
                    "entry_count": len(feed.entries),
                },
            }

        return {"status": "error", "error": f"Unknown action: {action}"}
if __name__ == "__main__":
    # Run as a script: instantiate the connector and serve it on port 8000.
    connector = RSSConnector()
    connector.serve(port=8000)