Add a Custom Data Source
The DataRouter supports HTTP, x402, MCP, and WebSocket sources out of the box. To add a new source — proprietary API, internal feed, alternative oracle — implement the DataSource interface.
1. Inherit from DataSource
# src/strategy/sources/my_oracle.py
from typing import Any, Callable
from aether_forge.data_layer import DataSource, DataResult, DataSourceCost
class MyOracleSource(DataSource):
name = "my-oracle"
def __init__(self, api_key: str, base_url: str = "https://my-oracle.example.com"):
self.api_key = api_key
self.base_url = base_url
self._calls = 0
self._errors = 0
def supports(self, capability: str) -> bool:
return capability in {
"my-oracle-price",
"my-oracle-depth",
"my-oracle-volume",
}
def fetch(self, capability: str, **params: Any) -> DataResult:
import json
from urllib import request
token = params.get("token", "ETH")
url = f"{self.base_url}/{capability}?token={token}"
req = request.Request(url, headers={"X-API-Key": self.api_key})
try:
self._calls += 1
with request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read())
except Exception as error:
self._errors += 1
raise RuntimeError(f"my-oracle fetch failed: {error}") from error
return DataResult(
source=self.name,
capability=capability,
data=data,
cost=DataSourceCost(amount_usd=0.0, paid=False, payment_method="subscription"),
)
def status(self) -> dict[str, Any]:
return {
"name": self.name,
"calls": self._calls,
"errors": self._errors,
}2. Wire into your scaffold
In your generated agent’s src/strategy/router.py:
from aether_forge.data_layer import DataRouter
from .sources.my_oracle import MyOracleSource
def build_router(config) -> DataRouter:
sources = [
MyOracleSource(api_key=config.get("my_oracle_key")),
# ... other sources
]
return DataRouter(sources=sources, fallback_chain=True)3. Declare in aether-forge.json
{
"data_sources": [
{
"name": "my-oracle",
"type": "custom",
"module": "src.strategy.sources.my_oracle:MyOracleSource",
"config": {
"api_key_env": "MY_ORACLE_KEY",
"base_url": "https://my-oracle.example.com"
},
"priority": 1
}
]
}4. Reference in capability-manifest
{
"capabilities": [
{
"id": "my-oracle-price",
"name": "Get oracle price",
"kind": "read",
"riskLevel": "low",
"effectSemantics": "none",
"credentialHandle": "my-oracle"
}
],
"credentials": [
{
"handle": "my-oracle",
"provider": "my-oracle",
"scope": "read-only"
}
]
}5. Test it
def test_my_oracle_source(monkeypatch):
from src.strategy.sources.my_oracle import MyOracleSource
source = MyOracleSource(api_key="test-key")
# mock urlopen ...
result = source.fetch("my-oracle-price", token="ETH")
assert result.source == "my-oracle"
assert result.data is not NoneTips
- Always set
costcorrectly — the router tracks spend - Use
DataSourceCost(payment_method="x402")for paid sources to surface in budget reports - Implement
subscribe()if your source supports streaming (WebSocket, SSE) - Return raw responses; don’t pre-process — the planner handles that
- Set timeouts; the runtime has
tick_timeout_secondsbut per-source timeouts give better error messages
Last updated on