Skip to Content
DocumentationCookbookAdd a Custom Data Source

Add a Custom Data Source

The DataRouter supports HTTP, x402, MCP, and WebSocket sources out of the box. To add a new source — proprietary API, internal feed, alternative oracle — implement the DataSource interface.

1. Inherit from DataSource

# src/strategy/sources/my_oracle.py from typing import Any, Callable from aether_forge.data_layer import DataSource, DataResult, DataSourceCost class MyOracleSource(DataSource): name = "my-oracle" def __init__(self, api_key: str, base_url: str = "https://my-oracle.example.com"): self.api_key = api_key self.base_url = base_url self._calls = 0 self._errors = 0 def supports(self, capability: str) -> bool: return capability in { "my-oracle-price", "my-oracle-depth", "my-oracle-volume", } def fetch(self, capability: str, **params: Any) -> DataResult: import json from urllib import request token = params.get("token", "ETH") url = f"{self.base_url}/{capability}?token={token}" req = request.Request(url, headers={"X-API-Key": self.api_key}) try: self._calls += 1 with request.urlopen(req, timeout=10) as resp: data = json.loads(resp.read()) except Exception as error: self._errors += 1 raise RuntimeError(f"my-oracle fetch failed: {error}") from error return DataResult( source=self.name, capability=capability, data=data, cost=DataSourceCost(amount_usd=0.0, paid=False, payment_method="subscription"), ) def status(self) -> dict[str, Any]: return { "name": self.name, "calls": self._calls, "errors": self._errors, }

2. Wire into your scaffold

In your generated agent’s src/strategy/router.py:

from aether_forge.data_layer import DataRouter from .sources.my_oracle import MyOracleSource def build_router(config) -> DataRouter: sources = [ MyOracleSource(api_key=config.get("my_oracle_key")), # ... other sources ] return DataRouter(sources=sources, fallback_chain=True)

3. Declare in aether-forge.json

{ "data_sources": [ { "name": "my-oracle", "type": "custom", "module": "src.strategy.sources.my_oracle:MyOracleSource", "config": { "api_key_env": "MY_ORACLE_KEY", "base_url": "https://my-oracle.example.com" }, "priority": 1 } ] }

4. Reference in capability-manifest

{ "capabilities": [ { "id": "my-oracle-price", "name": "Get oracle price", "kind": "read", "riskLevel": "low", "effectSemantics": "none", "credentialHandle": "my-oracle" } ], "credentials": [ { "handle": "my-oracle", "provider": "my-oracle", "scope": "read-only" } ] }

5. Test it

def test_my_oracle_source(monkeypatch): from src.strategy.sources.my_oracle import MyOracleSource source = MyOracleSource(api_key="test-key") # mock urlopen ... result = source.fetch("my-oracle-price", token="ETH") assert result.source == "my-oracle" assert result.data is not None

Tips

  • Always set cost correctly — the router tracks spend
  • Use DataSourceCost(payment_method="x402") for paid sources to surface in budget reports
  • Implement subscribe() if your source supports streaming (WebSocket, SSE)
  • Return raw responses; don’t pre-process — the planner handles that
  • Set timeouts; the runtime has tick_timeout_seconds but per-source timeouts give better error messages
Last updated on