
Trigger an Agent from a Webhook

Out of the box, Aether Forge agents run on a tick interval. To drive them from external events such as webhooks, queue messages, or cron jobs, run a single tick per event and pass the event data in through scenario_inputs.

Pattern 1: HTTP Server + Per-Request Tick

    # webhook_server.py
    from pathlib import Path
    from threading import Lock

    from flask import Flask, jsonify, request

    from aether_forge import AgentRunner, RunnerConfig

    app = Flask(__name__)
    tick_lock = Lock()  # serialize ticks so only one request runs the agent at a time

    runner = AgentRunner(
        Path("./my-agent"),
        config=RunnerConfig(
            environment="sandbox",
            auto_approve=True,
            persist_memory=True,
            tick_timeout_seconds=60,
        ),
    )


    @app.post("/trigger")
    def trigger():
        # force=True parses the body as JSON even without a JSON Content-Type header.
        payload = request.get_json(force=True)
        with tick_lock:
            result = runner.tick(scenario_inputs={"webhookEvent": payload})
        return jsonify({
            "tick": result.tick_number,
            "status": result.session_status,
            "steps": result.steps_executed,
            "pendingApprovals": result.pending_approvals,
        })


    if __name__ == "__main__":
        app.run(host="0.0.0.0", port=8000)

Send a test event with curl:

    curl -X POST http://localhost:8000/trigger \
      -H "Content-Type: application/json" \
      -d '{"event":"price_alert","token":"ETH","threshold":2500}'

Pattern 2: Queue Consumer

    import json
    from pathlib import Path

    import redis

    from aether_forge import AgentRunner, RunnerConfig

    r = redis.Redis(host="localhost", port=6379)
    runner = AgentRunner(
        Path("./my-agent"),
        config=RunnerConfig(environment="sandbox", auto_approve=True),
    )

    while True:
        # Block until an event arrives; timeout=0 waits indefinitely.
        _, raw = r.blpop("agent-events", timeout=0)
        event = json.loads(raw)
        result = runner.tick(scenario_inputs={"queueEvent": event})
        # Publish the outcome next to the event that triggered it.
        r.rpush("agent-results", json.dumps({
            "event": event,
            "result": {
                "tick": result.tick_number,
                "status": result.session_status,
                "steps": result.steps_executed,
            },
        }))

This is the safest pattern for bursty traffic because one consumer serializes writes to the agent’s SQLite memory store.
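
The serialization property can be seen without Redis. The following is a minimal sketch, not Aether Forge code: it assumes a stand-in function `fake_tick` in place of `runner.tick` and the standard-library `queue.Queue` in place of the Redis list (both are illustrative choices). Several producers enqueue events concurrently, while one consumer thread handles them strictly one at a time.

```python
import queue
import threading

events = queue.Queue()
results = []
active = 0       # number of ticks currently running
max_active = 0   # highest concurrency observed across the run


def fake_tick(scenario_inputs):
    """Hypothetical stand-in for runner.tick(); records how many ticks overlap."""
    global active, max_active
    active += 1
    max_active = max(max_active, active)
    result = {"event": scenario_inputs["queueEvent"], "status": "ok"}
    active -= 1
    return result


def consumer():
    # Single consumer: events are taken off the queue and ticked one after another.
    while True:
        event = events.get()
        if event is None:  # sentinel value: stop consuming
            break
        results.append(fake_tick({"queueEvent": event}))


def producer(source, count):
    # Producers only enqueue; they never touch the agent directly.
    for seq in range(count):
        events.put({"source": source, "seq": seq})


worker = threading.Thread(target=consumer)
worker.start()

producers = [threading.Thread(target=producer, args=(f"p{n}", 5)) for n in range(4)]
for p in producers:
    p.start()
for p in producers:
    p.join()

events.put(None)  # every event is queued by now; tell the consumer to stop
worker.join()

print(len(results), max_active)
```

Because only the consumer thread ever calls the tick function, bursts from the producers lengthen the queue rather than producing overlapping writes.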

Pattern 3: GitHub Webhook to Agent Action

    import hashlib
    import hmac
    import os
    from pathlib import Path
    from threading import Lock

    from flask import Flask, request

    from aether_forge import AgentRunner, RunnerConfig

    app = Flask(__name__)
    tick_lock = Lock()

    runner = AgentRunner(
        Path("./my-agent"),
        config=RunnerConfig(environment="sandbox", auto_approve=True),
    )

    SECRET = os.environ["GITHUB_WEBHOOK_SECRET"]


    @app.post("/github")
    def github_webhook():
        # Verify the HMAC-SHA256 signature over the raw request body.
        # get_data() returns the raw bytes regardless of the content type.
        signature = request.headers.get("X-Hub-Signature-256", "")
        expected = "sha256=" + hmac.new(
            SECRET.encode(),
            request.get_data(),
            hashlib.sha256,
        ).hexdigest()
        # compare_digest runs in constant time to avoid leaking the signature.
        if not hmac.compare_digest(signature, expected):
            return "Forbidden", 403

        event = request.headers.get("X-GitHub-Event")
        payload = request.get_json(force=True)

        # Only push events trigger a tick; everything else is acknowledged and ignored.
        if event == "push":
            with tick_lock:
                runner.tick(scenario_inputs={"githubEvent": event, "payload": payload})

        return "ok"
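
The signature check in the handler above can be pulled into a small helper so it can be unit-tested without Flask. This is a sketch under stated assumptions: the function names `sign` and `is_valid_signature` are hypothetical, and the secret and body are made-up test values. It compares bytes rather than strings so that a header containing non-ASCII characters is rejected instead of raising `TypeError`.

```python
import hashlib
import hmac


def sign(secret: str, body: bytes) -> str:
    """Build the value expected in the X-Hub-Signature-256 header for this body."""
    digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return "sha256=" + digest


def is_valid_signature(secret: str, body: bytes, header: str) -> bool:
    """Constant-time comparison of the received header with the expected value."""
    return hmac.compare_digest(header.encode(), sign(secret, body).encode())


# Illustrative values only; a real secret comes from configuration.
body = b'{"ref":"refs/heads/main"}'
good = sign("s3cret", body)

print(is_valid_signature("s3cret", body, good))         # True
print(is_valid_signature("s3cret", body + b" ", good))  # False: body was altered
print(is_valid_signature("s3cret", body, ""))           # False: header missing
```

The signature must be computed over the raw request bytes, before any JSON parsing, since re-serialized JSON may differ from what the sender signed.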

Pattern 4: A2A as the Webhook

Other agents can call your agent through A2A. This gives you cryptographic authentication and optional payment on top of request-style execution.

forge run ./my-agent --a2a-port 9001 --auto-approve

Then another agent or service can send a task:

forge agent-send http://your-host:9001 --capability your-cap --payload '{"event":"x"}'

Pattern 5: Scheduled Tick

Serverless platforms work best when each invocation runs one tick and persists state between invocations.

    # lambda_handler.py
    from pathlib import Path

    import boto3

    from aether_forge import AgentRunner, RunnerConfig

    s3 = boto3.client("s3")


    def handler(event, context):
        # Restore agent memory from the previous invocation.
        # download_file raises if memory.db is not in the bucket yet,
        # so the object has to exist before the first run.
        s3.download_file("agent-state", "memory.db", "/tmp/memory.db")

        runner = AgentRunner(
            Path("/var/task/agent"),
            config=RunnerConfig(
                environment="production",
                memory_db_path="/tmp/memory.db",
                persist_replays=False,
            ),
        )

        result = runner.tick(scenario_inputs={"scheduleEvent": event})

        # Persist the updated memory for the next invocation.
        s3.upload_file("/tmp/memory.db", "agent-state", "memory.db")

        return {
            "statusCode": 200,
            "body": {
                "tick": result.tick_number,
                "status": result.session_status,
            },
        }

Important: Handle Concurrency

If multiple events arrive at once, sequential ticks are safest. SQLite supports concurrent readers, but a single writer per tenant or agent state file keeps behavior predictable.

Use one of these:

  • a queue and single consumer,
  • a process-local lock around runner.tick(...),
  • separate memory and replay paths per tenant or event stream.
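
The second and third options can be combined. The sketch below is illustrative only: the helper names (`tenant_lock`, `tenant_paths`, `run_tick`), the directory layout, and the `tick` callable passed in are assumptions, not part of the Aether Forge API. It keeps one lock and one state directory per tenant, so ticks for the same tenant run sequentially while different tenants do not block each other.

```python
import re
import threading
from collections import defaultdict
from pathlib import Path

_registry_lock = threading.Lock()
_tenant_locks = defaultdict(threading.Lock)


def tenant_lock(tenant_id: str):
    # Guard the registry so two threads never create two locks for one tenant.
    with _registry_lock:
        return _tenant_locks[tenant_id]


def tenant_paths(base: Path, tenant_id: str) -> dict:
    """Hypothetical layout: one memory file and one replay directory per tenant."""
    # Reject ids that could escape the base directory (for example "../other").
    if not re.fullmatch(r"[A-Za-z0-9_-]+", tenant_id):
        raise ValueError(f"invalid tenant id: {tenant_id!r}")
    root = base / tenant_id
    return {"memory": root / "memory.db", "replays": root / "replays"}


def run_tick(tenant_id: str, event: dict, tick):
    # `tick` stands in for that tenant's runner.tick; it is injected by the caller.
    with tenant_lock(tenant_id):
        return tick(scenario_inputs={"webhookEvent": event})


paths = tenant_paths(Path("/data"), "acme")
print(paths["memory"].name, paths["replays"].name)
```

A process-local lock only protects a single process. With several worker processes or hosts, a queue with a single consumer remains the simpler way to get one writer per state file.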

Don’t Forget

  • Validate webhook signatures.
  • Rate limit incoming requests.
  • Keep request payloads out of policy and credential configuration.
  • Set a conservative tick_timeout_seconds.
  • Monitor /ready; if the agent enters its circuit-breaker state, your webhook handler should return 503.
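
Two of these checks, rate limiting and the 503 response, can run before any tick is attempted. The following is a rough sketch rather than a prescribed implementation: `TokenBucket` and `admit` are hypothetical names, the token-bucket algorithm is one common choice among several, and how `agent_ready` is obtained (for example by polling /ready) is left to the caller because the response format is not covered here.

```python
import time


class TokenBucket:
    """Allow `rate` requests per second on average, with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: int, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock  # injectable so the limiter can be tested deterministically
        self.tokens = float(capacity)
        self.updated = clock()

    def allow(self) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, capped at the bucket size.
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


def admit(bucket: TokenBucket, agent_ready: bool) -> int:
    """Return the HTTP status to use before running a tick."""
    if not agent_ready:
        return 503  # agent unavailable, e.g. in its circuit-breaker state
    if not bucket.allow():
        return 429  # caller is sending events too quickly
    return 200


bucket = TokenBucket(rate=5, capacity=10)
print(admit(bucket, agent_ready=True))   # 200
print(admit(bucket, agent_ready=False))  # 503
```

A handler would call `admit` first and only acquire the tick lock when it returns 200, so rejected requests never wait behind a running tick.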

For SaaS products with tenant-specific state, see Multi-Tenant Integration.
