Implementation Guide
This guide covers how to actually build the system described in the AI Agent Plan. Start with Phase 1 — it delivers a working agent in ~2 weeks and everything after is additive.
Prerequisites
Before writing any agent code:
- ✅ Access to the AquaGen production API (https://prod-aquagen.azurewebsites.net)
- ✅ One real customer's credentials for testing
- ✅ Existing React dashboard (the chat widget plugs into this)
- ✅ Existing backend (the agent gateway runs alongside it)
- ✅ Anthropic API key: Claude Sonnet as the primary agent model, Claude Haiku for the intent classifier
Phase 1 — Foundation
Goal: Working agent answering real customer questions in ~2 weeks.
Step 1 — Summary API Adapter
This is the most important step. Build it before anything else. Every agent calls summary endpoints — never raw data.
The adapter sits between the agent and the AquaGen API. It calls the real endpoints, aggregates the data, and returns a clean JSON summary.
Build one summary function per module:
- Water Flow
- Energy
- Alerts
# summary_adapter/water_flow.py
from aquagen_client import AquaGenClient
from helpers import parse_date, extract_units
def get_water_flow_summary(client: AquaGenClient, period: str = "today") -> dict:
"""
Returns a clean summary for the Water Flow agent.
    period: "today" | "yesterday" | "this_month" | "last_month" | "DD/MM/YYYY"
"""
date1 = parse_date(period)
    type_param = "MONTH" if period in ("this_month", "last_month") else "DAY"
response = client.get("/api/user/deviceDataV2", params={
"date1": date1,
"category": "SOURCE_CATEGORY",
"type": type_param,
"formattedOutput": True
})
units = extract_units(response, exclude_virtual=True)
    total = sum(u.get("value", 0) for u in units)
offline = [u["displayName"] for u in units if not u.get("online", True)]
over_threshold = [
u["displayName"] for u in units
if u.get("meta", {}).get("threshold")
and u.get("value", 0) > u["meta"]["threshold"]
]
# Build breakdown by subcategory
breakdown = {}
for u in units:
sub = u.get("subCategoryName", "Other")
breakdown.setdefault(sub, 0)
breakdown[sub] += u.get("value", 0)
return {
"module": "water-flow",
"period": period,
"total_kl": round(total, 2),
"unit_count": len(units),
"offline_devices": offline,
"over_threshold_devices": over_threshold,
"breakdown": [
{"name": k, "value_kl": round(v, 2)}
for k, v in sorted(breakdown.items(), key=lambda x: -x[1])
]
}
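`parse_date` and `extract_units` come from a shared `helpers` module that isn't shown here. As an illustration of the contract, here is a minimal `parse_date` sketch, assuming the API accepts DD/MM/YYYY date strings (confirm the real format against the AquaGen API before relying on it):

```python
from datetime import date, timedelta

def parse_date(period: str) -> str:
    """Map a period keyword to the date1 string the API expects (assumed DD/MM/YYYY)."""
    today = date.today()
    if period == "today":
        return today.strftime("%d/%m/%Y")
    if period == "yesterday":
        return (today - timedelta(days=1)).strftime("%d/%m/%Y")
    if period == "this_month":
        return today.replace(day=1).strftime("%d/%m/%Y")
    if period == "last_month":
        last_of_prev = today.replace(day=1) - timedelta(days=1)
        return last_of_prev.replace(day=1).strftime("%d/%m/%Y")
    return period  # already an explicit DD/MM/YYYY string
```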
# summary_adapter/energy.py
from helpers import parse_date, extract_units

def get_energy_summary(client, period="today"):
date1 = parse_date(period)
    type_param = "MONTH" if period in ("this_month", "last_month") else "DAY"
response = client.get("/api/user/deviceDataV2", params={
"date1": date1,
"category": "ENERGY_CATEGORY",
"type": type_param,
"formattedOutput": True
})
units = extract_units(response, exclude_virtual=True)
total_kwh = sum(u.get("value", 0) for u in units)
offline = [u["displayName"] for u in units if not u.get("online", True)]
top_consumers = sorted(units, key=lambda u: -u.get("value", 0))[:5]
return {
"module": "energy",
"period": period,
"total_kwh": round(total_kwh, 2),
"offline_meters": offline,
"top_consumers": [
{"name": u["displayName"], "kwh": round(u.get("value", 0), 2)}
for u in top_consumers
]
}
# summary_adapter/alerts.py
from collections import Counter
from helpers import get_all_alerts, get_todays_alerts

def get_alerts_summary(client, period="today"):
    response = client.get("/api/user/alerts")
    if period == "today":
        alerts = get_todays_alerts(response)
    else:
        alerts = get_all_alerts(response)  # merges today + currentMonth, deduplicates
    severity_counts = Counter(a.get("severity", "UNKNOWN") for a in alerts)
device_counts = Counter(a.get("deviceName") for a in alerts)
top_devices = device_counts.most_common(5)
return {
"module": "alerts",
"period": period,
"total_alerts": len(alerts),
"unread": sum(1 for a in alerts if not a.get("isRead", True)),
"by_severity": dict(severity_counts),
"top_devices": [{"name": name, "count": count} for name, count in top_devices]
}
Build a summary for all 12 modules. The pattern is always the same: call the real API → aggregate → return a clean dict. The LLM never sees raw rows.
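Because the aggregate step repeats across modules, a shared helper can carry the common part. This is a sketch with illustrative names, not part of the adapter above; each module summary still adds its own fields on top of the returned dict:

```python
def summarise_units(module: str, period: str, total_key: str, units: list[dict]) -> dict:
    """Generic aggregate step: total, unit count, and offline names over extracted units."""
    total = sum(u.get("value", 0) for u in units)
    offline = [u["displayName"] for u in units if not u.get("online", True)]
    return {
        "module": module,
        "period": period,
        total_key: round(total, 2),
        "unit_count": len(units),
        "offline_devices": offline,
    }
```

Each module function then reduces to: fetch and extract units, call `summarise_units`, merge in module-specific fields like `top_consumers` or `breakdown`.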
Step 2 — Pre-fetch Service (Redis Cache)
When a customer opens the dashboard, pre-fetch all module snapshots in parallel before the first question is asked. Store in Redis with a 2-minute TTL.
# prefetch_service.py
import asyncio
import redis
import json
from summary_adapter import (
get_water_flow_summary, get_energy_summary, get_alerts_summary,
get_water_stock_summary, get_water_quality_summary,
get_water_balance_summary, get_groundwater_summary,
get_water_neutrality_summary, get_rainwater_summary, get_uwi_summary
)
SNAPSHOT_TTL_SECONDS = 120 # 2 minutes
async def prefetch_all_snapshots(customer_id: str, client):
"""
Call all enabled module summary APIs in parallel.
Called when customer opens dashboard (JWT auth event).
"""
tasks = {
"alerts": asyncio.to_thread(get_alerts_summary, client, "today"),
"water_flow": asyncio.to_thread(get_water_flow_summary, client, "today"),
"energy": asyncio.to_thread(get_energy_summary, client, "today"),
"water_stock": asyncio.to_thread(get_water_stock_summary, client, "today"),
"water_balance": asyncio.to_thread(get_water_balance_summary, client, "today"),
"water_neutrality": asyncio.to_thread(get_water_neutrality_summary, client, "this_month"),
"groundwater": asyncio.to_thread(get_groundwater_summary, client, "today"),
"water_quality": asyncio.to_thread(get_water_quality_summary, client, "today"),
"rainwater": asyncio.to_thread(get_rainwater_summary, client, "today"),
"uwi": asyncio.to_thread(get_uwi_summary, client, "today"),
}
results = await asyncio.gather(*tasks.values(), return_exceptions=True)
snapshots = {}
for key, result in zip(tasks.keys(), results):
if not isinstance(result, Exception):
snapshots[key] = result
r = redis.Redis()
r.setex(
f"snapshot:{customer_id}",
SNAPSHOT_TTL_SECONDS,
json.dumps(snapshots)
)
return snapshots
def get_snapshot(customer_id: str) -> dict | None:
r = redis.Redis()
data = r.get(f"snapshot:{customer_id}")
return json.loads(data) if data else None
Step 3 — Agent Gateway (FastAPI)
The agent gateway is a new service that sits next to your existing backend. It handles the AI chat stream endpoint.
# agent_gateway/main.py
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from intent_classifier import classify_intent
from agent_pool import run_agent
from prefetch_service import get_snapshot, prefetch_all_snapshots
from context_store import get_context, save_context
import json

# Assumed helpers defined alongside the gateway (not shown in this guide):
#   make_client(jwt_token)      -> authenticated AquaGen API client
#   queue_background_job(...)   -> job id for long-running report jobs
app = FastAPI()
@app.post("/agent/chat")
async def chat(request: Request):
body = await request.json()
customer_id = body["customer_id"]
question = body["question"]
jwt_token = request.headers.get("Authorization")
# Get snapshot (pre-fetched on dashboard open, or fetch now)
snapshot = get_snapshot(customer_id)
if not snapshot:
client = make_client(jwt_token)
snapshot = await prefetch_all_snapshots(customer_id, client)
# Classify intent → which modules needed
intent = classify_intent(question)
# intent = { "modules": ["page-water-flow"], "complexity": "single" }
# Get conversation context (last 5 turns)
context = get_context(customer_id)
async def event_stream():
yield f"data: {json.dumps({'type': 'start'})}\n\n"
if intent["complexity"] == "background":
# Queue as background job, return immediately
job_id = queue_background_job(customer_id, question, intent)
yield f"data: {json.dumps({'type': 'queued', 'job_id': job_id})}\n\n"
else:
# Run agent(s) and stream tokens
async for event in run_agent(
modules=intent["modules"],
question=question,
snapshot=snapshot,
context=context,
customer_id=customer_id,
jwt_token=jwt_token
):
yield f"data: {json.dumps(event)}\n\n"
yield f"data: {json.dumps({'type': 'done'})}\n\n"
return StreamingResponse(event_stream(), media_type="text/event-stream")
@app.post("/agent/dashboard-open")
async def on_dashboard_open(request: Request):
"""Called when a customer opens the dashboard — triggers snapshot pre-fetch."""
body = await request.json()
customer_id = body["customer_id"]
jwt_token = request.headers.get("Authorization")
client = make_client(jwt_token)
await prefetch_all_snapshots(customer_id, client)
return {"status": "ok", "message": "Snapshots pre-fetched"}
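Any client of /agent/chat has to parse the `data:`-framed SSE lines the gateway emits. A small parser like this (a hypothetical helper, handy for integration tests of the gateway) mirrors what the chat widget does in the browser:

```python
import json

def parse_sse_events(body_text: str) -> list[dict]:
    """Extract JSON event dicts from complete 'data: {...}' SSE lines."""
    events = []
    for line in body_text.split("\n"):
        if line.startswith("data: "):
            events.append(json.loads(line[len("data: "):]))
    return events
```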
Step 4 — Dashboard Agent + Alerts Agent
Build these two first. They cover the most common questions and are the simplest to implement.
# agents/dashboard_agent.py
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import SystemMessage, HumanMessage
from agent_prompts import DASHBOARD_AGENT_PROMPT
llm = ChatAnthropic(model="claude-sonnet-4-6", temperature=0.2, max_tokens=2000)
def run_dashboard_agent(question: str, snapshot: dict, context: list, session_ctx: dict):
"""
Single agent — no tools needed for Phase 1.
Snapshot already contains all data. Agent just reasons over it.
"""
system_prompt = DASHBOARD_AGENT_PROMPT.format(
session_context=format_session_context(session_ctx)
)
# Inject snapshot as context (already summarised — low token cost)
snapshot_text = format_snapshot_for_llm(snapshot)
messages = [
SystemMessage(content=system_prompt),
*context, # Last 5 turns
HumanMessage(content=f"Available data snapshot:\n{snapshot_text}\n\nQuestion: {question}")
]
response = llm.invoke(messages)
return response.content
def format_snapshot_for_llm(snapshot: dict) -> str:
"""Render snapshot as compact text for LLM context."""
lines = []
for module, data in snapshot.items():
lines.append(f"[{module.upper()}]")
for k, v in data.items():
if k != "module":
lines.append(f" {k}: {v}")
return "\n".join(lines)
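`format_session_context` is referenced above but not defined. A minimal sketch, assuming session context is a flat dict of customer metadata:

```python
def format_session_context(session_ctx: dict) -> str:
    """Render session metadata (customer name, industry, ...) as compact key: value lines."""
    return "\n".join(f"{key}: {value}" for key, value in session_ctx.items())
```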
Step 5 — Intent Classifier
# intent_classifier.py
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage
import json, re
# Use Haiku — fast and cheap for classification
classifier_llm = ChatAnthropic(model="claude-haiku-4-5-20251001", temperature=0, max_tokens=200)
CLASSIFIER_PROMPT = """
You are an intent classifier for AquaGen water management platform.
Given a user question, return JSON only:
{
"modules": ["page-water-flow"],
"complexity": "single"
}
complexity: "single" (1 module), "multi" (2+ modules), "background" (reports/30+ day analysis)
Modules: page-dashboard, page-alerts, page-reports, page-water-flow, page-water-quality,
page-water-balance, page-water-stock, page-water-neutrality, page-rainwater, page-groundwater,
page-energy, page-uwi
"""
def classify_intent(question: str) -> dict:
response = classifier_llm.invoke([
HumanMessage(content=f"{CLASSIFIER_PROMPT}\n\nQuestion: {question}")
])
try:
match = re.search(r'\{.*\}', response.content, re.DOTALL)
return json.loads(match.group()) if match else {"modules": ["page-dashboard"], "complexity": "single"}
except Exception:
return {"modules": ["page-dashboard"], "complexity": "single"}
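Even at temperature 0, the classifier can emit a module ID that doesn't exist. A cheap validation pass (a hypothetical helper, not shown elsewhere in this guide) keeps routing safe before the result reaches the agent pool:

```python
VALID_MODULES = {
    "page-dashboard", "page-alerts", "page-reports", "page-water-flow",
    "page-water-quality", "page-water-balance", "page-water-stock",
    "page-water-neutrality", "page-rainwater", "page-groundwater",
    "page-energy", "page-uwi",
}
FALLBACK_INTENT = {"modules": ["page-dashboard"], "complexity": "single"}

def validate_intent(intent: dict) -> dict:
    """Drop unknown module IDs; fall back to the dashboard on anything malformed."""
    modules = [m for m in intent.get("modules", []) if m in VALID_MODULES]
    if not modules or intent.get("complexity") not in ("single", "multi", "background"):
        return FALLBACK_INTENT
    return {"modules": modules, "complexity": intent["complexity"]}
```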
Step 6 — Chat Widget (React)
Plug into your existing React dashboard. The widget calls the new agent gateway endpoint.
// components/AquaGenChat.tsx
import { useState, useEffect } from 'react';
export function AquaGenChat({ customerId, jwtToken }: Props) {
const [messages, setMessages] = useState<Message[]>([]);
const [streaming, setStreaming] = useState(false);
const sendMessage = async (question: string) => {
setStreaming(true);
const response = await fetch('/agent/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${jwtToken}`
},
body: JSON.stringify({ customer_id: customerId, question })
});
    const reader = response.body!.getReader();
    const decoder = new TextDecoder();
    let currentAnswer = '';
    let buffer = '';
    // Placeholder assistant message that streaming tokens will fill in
    setMessages(prev => [...prev, { role: 'assistant', content: '' }]);
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // Buffer partial chunks: an SSE event can be split across reads
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() ?? '';  // keep the trailing partial line for the next read
      for (const line of lines) {
        if (!line.startsWith('data: ')) continue;
        const event = JSON.parse(line.slice(6));
        if (event.type === 'token') {
          currentAnswer += event.content;
          // Update the last (assistant) message in real time
          setMessages(prev => [
            ...prev.slice(0, -1),
            { role: 'assistant', content: currentAnswer }
          ]);
        } else if (event.type === 'done') {
          setStreaming(false);
        }
      }
    }
};
  // Trigger snapshot pre-fetch on mount (dashboard open)
  useEffect(() => {
    fetch('/agent/dashboard-open', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${jwtToken}`
      },
      body: JSON.stringify({ customer_id: customerId })
    });
  }, [customerId]);
return <ChatUI messages={messages} onSend={sendMessage} streaming={streaming} />;
}
Phase 1 Checklist
Before going to Phase 2, verify each item:
- Summary API adapter built for all 12 modules
- Pre-fetch service caches snapshots on dashboard open (Redis, 2-min TTL)
- Agent gateway running at /agent/chat (SSE streaming)
- Intent Classifier routing correctly (test 20 sample questions)
- Dashboard Agent answering overview questions from snapshot
- Alerts Agent answering alert history questions
- Chat widget embedded in dashboard, SSE streaming working
- Token refresh working (4-hour token lifespan handled)
- Tested with one real customer's data end-to-end
Customers can ask dashboard overview and alert questions and get instant, accurate answers streamed in real time. No tool calls needed — the pre-fetched snapshot is enough for 80% of common questions.
Token Budget
Keep LLM context lean. For Phase 1 (snapshot-based, no live API calls):
| Component | Tokens |
|---|---|
| System prompt + agent persona | ~300 |
| Session context (customer/industry) | ~100 |
| Snapshot (all 12 modules summarised) | ~600 |
| Conversation history (last 3 turns) | ~300 |
| User question | ~50 |
| Total input | ~1,350 tokens |
At 1,350 tokens input + ~400 output, a question costs under $0.01 with Claude Haiku, and under $0.05 with Claude Sonnet 4.6.
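The budget arithmetic can be sanity-checked in a few lines. The per-million-token rates below are illustrative placeholders, not quoted prices; substitute current Anthropic pricing:

```python
budget = {
    "system_prompt": 300,
    "session_context": 100,
    "snapshot": 600,
    "history": 300,
    "question": 50,
}
input_tokens = sum(budget.values())   # 1,350, matching the table above
output_tokens = 400

# Placeholder $/million-token rates - check current Anthropic pricing
input_rate, output_rate = 1.00, 5.00
cost = input_tokens / 1e6 * input_rate + output_tokens / 1e6 * output_rate
print(f"{input_tokens} input tokens, ~${cost:.4f} per question")
```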
Adding Tool Calls (Phase 2+)
In Phase 2, agents gain the ability to make live API calls when the snapshot isn't enough. The pattern is:
import json

from langchain_core.tools import tool
from summary_adapter import get_water_flow_summary
@tool
def fetch_water_flow(period: str) -> str:
"""Fetch water flow data for a specific period.
period: "today" | "yesterday" | "this_month" | "last_month" | "DD/MM/YYYY"
"""
    client = get_authenticated_client()  # assumed helper: returns a client bound to the caller's JWT
summary = get_water_flow_summary(client, period)
return json.dumps(summary)
Bind tools to the LLM and let the agent decide when to call them. The agent will use the snapshot for fast questions and call tools only when the snapshot doesn't have the answer.
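The bind-and-dispatch loop looks the same regardless of framework. Here is a framework-agnostic sketch, where the `invoke` callable stands in for the tool-bound LLM (with LangChain you would bind via `llm.bind_tools([...])` and read `tool_calls` off the response message):

```python
def run_tool_loop(invoke, tools: dict, messages: list) -> str:
    """Keep invoking the model, executing any requested tools, until it answers in text.

    invoke(messages) -> {"content": str, "tool_calls": [{"name": str, "args": dict}]}
    """
    while True:
        reply = invoke(messages)
        calls = reply.get("tool_calls") or []
        if not calls:
            return reply["content"]  # plain text answer, no tools requested
        messages.append(reply)
        for call in calls:
            # Execute the requested tool and feed its result back to the model
            result = tools[call["name"]](**call["args"])
            messages.append({"role": "tool", "name": call["name"], "content": result})
```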