Project Structure & Development Guide
This guide explains the codebase organization and how to add new features to AquaGen API.
๐ Directory Structureโ
aquagenapi/
โโโ app/ # Main application
โ โโโ __init__.py # Flask app initialization
โ โโโ config.py # Configuration with Azure Key Vault
โ โโโ app.py # Application entry point
โ โ
โ โโโ apis/ # API Blueprints
โ โ โโโ user.py # User API (31 namespaces)
โ โ โโโ admin.py # Admin API (13 namespaces)
โ โ โโโ external.py # External integrations
โ โ โโโ qa.py # QA endpoints
โ โ
โ โโโ routes/ # Route handlers (33 files)
โ โ โโโ report.py # Report generation
โ โ โโโ device_data.py # Device data endpoints
โ โ โโโ device_data_v2.py # Enhanced device data
โ โ โโโ alerts_routes.py # Alert management
โ โ โโโ notification.py # Notifications
โ โ โโโ landing_page.py # Dashboard data
โ โ โโโ water_balance.py # Water balance
โ โ โโโ granular_data.py # Granular sensor data
โ โ โโโ user.py # User operations
โ โ โโโ admin/ # Admin routes (15 files)
โ โ โ โโโ admin_login.py
โ โ โ โโโ automated_report.py
โ โ โ โโโ industry.py
โ โ โ โโโ unit.py
โ โ โ โโโ user.py
โ โ โ โโโ ...
โ โ โโโ external/ # External routes
โ โ โโโ external_login.py
โ โ โโโ water_ratio_data.py
โ โ
โ โโโ services/ # Business logic (40+ services)
โ โ โโโ report/ # Report services
โ โ โ โโโ report.py # Main dispatcher
โ โ โ โโโ report_types/ # 18 report implementations
โ โ โ โโโ flow_report.py
โ โ โ โโโ energy_report.py
โ โ โ โโโ level_report.py
โ โ โ โโโ quality_report.py
โ โ โ โโโ borewell_report.py
โ โ โ โโโ alert_report.py
โ โ โ โโโ consolidated_report.py
โ โ โ โโโ water_balance_report.py
โ โ โ โโโ ...
โ โ โโโ device_data.py
โ โ โโโ device_data_v2/ # Enhanced device data
โ โ โ โโโ device_data_v2.py
โ โ โ โโโ fetch_data/
โ โ โโโ alerts/ # Alert system
โ โ โ โโโ alerts_processor.py
โ โ โ โโโ alerts_content/ # Alert templates
โ โ โ โโโ channels/ # Email, SMS, Chat
โ โ โ โโโ constants/
โ โ โโโ admin/ # Admin services (15 files)
โ โ โ โโโ admin_login.py
โ โ โ โโโ industry.py
โ โ โ โโโ unit.py
โ โ โ โโโ user.py
โ โ โ โโโ automated_report/
โ โ โ โโโ automated_report.py
โ โ โ โโโ report_processor.py
โ โ โโโ gpt/ # AI integration
โ โ โ โโโ gpt.py
โ โ โ โโโ gpt_handler.py
โ โ โ โโโ gpt_config.py
โ โ โ โโโ aquagpt/
โ โ โ โโโ templates/
โ โ โ โโโ utils/
โ โ โโโ notification.py
โ โ โโโ summary/ # Summary services
โ โ โโโ landing_page/ # Dashboard logic
โ โ โโโ water_balance.py
โ โ โโโ granula_data_service.py
โ โ โโโ water_neutrality_service.py
โ โ โโโ powerdrill/ # PowerDrill integration
โ โ โโโ quality_data_extraction/
โ โ โโโ graph_manager/
โ โ โโโ info_builders/
โ โ โโโ wri/ # WRI bathymetry
โ โ โโโ firebase/
โ โ โโโ sms/
โ โ โโโ secret_vault_service.py
โ โ โโโ web_socket.py
โ โ
โ โโโ formatters/ # Data formatting (52 files)
โ โ โโโ report/ # Report formatters
โ โ โ โโโ report.py
โ โ โ โโโ report_util.py
โ โ โ โโโ xlsx_formatter/
โ โ โ โโโ flow_formatter.py
โ โ โ โโโ energy_formatter.py
โ โ โ โโโ level_formatter.py
โ โ โ โโโ quality_formatter.py
โ โ โ โโโ ...
โ โ โโโ device_data_formatter.py
โ โ โโโ device_data_v2/ # Polymorphic formatters
โ โ โ โโโ device_data_v2_formatter.py
โ โ โ โโโ formatters/
โ โ โ โโโ flow_device_data_formatter.py
โ โ โ โโโ energy_device_data_formatter.py
โ โ โ โโโ quality_device_data_formatter.py
โ โ โ โโโ ...
โ โ โโโ alerts_formatter.py
โ โ โโโ common_data_formatter.py
โ โ โโโ graph_data_formatter.py
โ โ โโโ ...
โ โ
โ โโโ database/ # Database layer
โ โ โโโ database_config.py # Cosmos DB setup
โ โ โโโ database_supporter.py # 200+ query methods
โ โ โโโ queires_list.py # Query templates
โ โ
โ โโโ models/ # Data models (30 files)
โ โ โโโ error_models.py
โ โ โโโ success_models.py
โ โ โโโ user_models.py
โ โ โโโ industry_models.py
โ โ โโโ unit_models.py
โ โ โโโ device_data_models.py
โ โ โโโ alerts_models.py
โ โ โโโ report_models.py
โ โ โโโ ...
โ โ
โ โโโ data_class/ # Data classes & DTOs
โ โ โโโ report.py
โ โ โโโ device_data_v2_args.py
โ โ โโโ water_ratio.py
โ โ โโโ alert/
โ โ
โ โโโ enum/ # Enums (20 files)
โ โ โโโ service_type.py
โ โ โโโ report_type.py
โ โ โโโ report_format.py
โ โ โโโ category_type.py
โ โ โโโ response_status.py
โ โ โโโ ...
โ โ
โ โโโ templates/ # Jinja2 templates (55 files)
โ โ โโโ common/
โ โ โโโ flow/ # Water reports (7 templates)
โ โ โโโ energy/ # Energy reports (6 templates)
โ โ โโโ level/ # Level reports (3 templates)
โ โ โโโ quality/ # Quality reports (4 templates)
โ โ โโโ borewell/ # Borewell reports (4 templates)
โ โ โโโ consolidated/
โ โ โโโ summary/
โ โ โโโ alert/
โ โ โโโ ...
โ โ
โ โโโ validators/ # Input validation
โ โ โโโ schema.py
โ โ โโโ validator_util.py
โ โ โโโ schemas/ # JSON schemas
โ โ
โ โโโ security_checks/ # Security layer
โ โ โโโ input_validator.py # SQL injection prevention
โ โ โโโ track_user.py # IP tracking
โ โ
โ โโโ util/ # Utilities (10 files)
โ โ โโโ date_time_util.py
โ โ โโโ number_util.py
โ โ โโโ calculation_util.py
โ โ โโโ si_unit_util.py
โ โ โโโ sort_util.py
โ โ โโโ ...
โ โ
โ โโโ auth/ # Authentication
โ โ โโโ auth.py
โ โ
โ โโโ constants/ # App constants
โ โ โโโ constants.py
โ โ โโโ templates.py
โ โ
โ โโโ cachedData/ # In-memory cache
โ โ โโโ cachedData.py
โ โ
โ โโโ data/ # Static data files
โ โ โโโ bathyData.py
โ โ โโโ radhakunj_boundary.geojson
โ โ
โ โโโ urls/ # URL configuration
โ โ
โ โโโ logging_config.py
โ
โโโ scripts/ # Automation scripts
โ โโโ auto_reports.sh
โ
โโโ docs/ # Documentation
โ โโโ intro.md
โ โโโ getting-started.md
โ โโโ architecture/
โ โโโ core-concepts/
โ โโโ api-reference/
โ โโโ guides/
โ โโโ deployment/
โ โโโ advanced/
โ โโโ development/
โ
โโโ data/ # Data files
โ
โโโ requirements.txt # Dependencies
โโโ app.py # Entry point
โโโ CLAUDE.md # Development guidelines
โโโ README.md
๐๏ธ Architecture Layersโ
๐ฏ Adding New Featuresโ
1. Adding a New Report Typeโ
Step 1: Create Report Service
# app/services/report/report_types/my_custom_report.py
from app.services.report.report_types.base_report import BaseReport
class MyCustomReport(BaseReport):
def get_report(self):
# Fetch data
data = self.fetch_data()
# Process and aggregate
processed = self.process_data(data)
# Calculate metrics
metrics = self.calculate_metrics(processed)
return {
'data': processed,
'metrics': metrics,
'fileName': self.generate_filename()
}
def fetch_data(self):
# Query database
from app.database.database_supporter import DatabaseSupporter
return DatabaseSupporter.get_device_data_by_date_range(
self.report_request.industry_id,
self.report_request.start_date,
self.report_request.end_date
)
def process_data(self, data):
# Aggregate logic
pass
def calculate_metrics(self, data):
# Calculate min, max, avg, etc.
pass
Step 2: Register in ReportService
# app/services/report/report.py
from app.services.report.report_types.my_custom_report import MyCustomReport
class ReportService:
def get_report(self):
if self.report_request.serviceType == ServiceType.MY_CUSTOM:
return MyCustomReport(self.report_request).get_report()
# ... other report types
Step 3: Add ServiceType Enum
# app/enum/service_type.py
class ServiceType(Enum):
# ... existing types
MY_CUSTOM = 'my_custom'
Step 4: Create Template
<!-- app/templates/my_custom/my_custom_daily_report.html -->
<!DOCTYPE html>
<html>
<head>
<title>{{ industry_name }} - Custom Report</title>
</head>
<body>
<h1>Custom Report</h1>
<table>
{% for row in data %}
<tr>
<td>{{ row.timestamp | date_time_to_date }}</td>
<td>{{ row.value | decimal_format(2) }}</td>
</tr>
{% endfor %}
</table>
</body>
</html>
Step 5: Create Formatter (if needed)
# app/formatters/report/my_custom_formatter.py
class MyCustomFormatter:
def format_data(self, data):
# Format data for template
pass
2. Adding a New API Endpointโ
Step 1: Create Route Handler
# app/routes/my_feature.py
from flask import request
from flask_jwt_extended import jwt_required, current_user
from flask_restx import Resource, Namespace
from app.services.my_feature_service import MyFeatureService
from app.models.success_models import GenericSuccessModel
from app.models.error_models import GenericErrorModel
myFeatureNamespace = Namespace('My Feature',
description='My feature APIs',
path='/myFeature')
@myFeatureNamespace.route('')
class MyFeatureRoute(Resource):
@jwt_required()
@myFeatureNamespace.response(200, 'Success', model=GenericSuccessModel)
@myFeatureNamespace.response(1401, 'Data not found', model=GenericErrorModel)
def get(self):
# Parse parameters
args = request.args
# Call service
service = MyFeatureService(current_user['industryId'])
result = service.process()
return {
'status': 'success',
'status_code': 200,
'data': result
}
Step 2: Create Service
# app/services/my_feature_service.py
from app.database.database_supporter import DatabaseSupporter
class MyFeatureService:
def __init__(self, industry_id):
self.industry_id = industry_id
def process(self):
# Business logic
data = DatabaseSupporter.get_data(self.industry_id)
processed = self.transform(data)
return processed
def transform(self, data):
# Transform data
pass
Step 3: Register Namespace
# app/apis/user.py
from app.routes.my_feature import myFeatureNamespace
# ... existing code
api.add_namespace(myFeatureNamespace)
3. Adding Database Methodsโ
# app/database/database_supporter.py
class DatabaseSupporter:
@staticmethod
def get_my_custom_data(industry_id, filters):
"""
Get custom data from Cosmos DB
"""
try:
container = database_config.my_custom_container
query = """
SELECT * FROM c
WHERE c.industryId = @industry_id
AND c.type = @type
"""
parameters = [
{"name": "@industry_id", "value": industry_id},
{"name": "@type", "value": filters.get('type')}
]
items = container.query_items(
query=query,
parameters=parameters,
enable_cross_partition_query=True
)
return list(items)
except Exception as e:
logger.error(f"Error fetching custom data: {e}")
return []
4. Adding a New Alert Channelโ
Step 1: Create Channel Handler
# app/services/alerts/channels/my_channel.py
class MyChannelHandler:
def __init__(self, config):
self.config = config
def send(self, alert_data):
"""
Send alert via my custom channel
"""
try:
# Implement channel-specific logic
self.prepare_message(alert_data)
self.deliver()
return True
except Exception as e:
logger.error(f"Error sending via MyChannel: {e}")
return False
def prepare_message(self, alert_data):
# Format message
pass
def deliver(self):
# Send message
pass
Step 2: Register in AlertProcessor
# app/services/alerts/alerts_processor.py
from app.services.alerts.channels.my_channel import MyChannelHandler
class AlertsProcessor:
def send_notifications(self, alert, channels):
if 'my_channel' in channels:
handler = MyChannelHandler(channels['my_channel'])
handler.send(alert)
๐งช Testing Guidelinesโ
Unit Testingโ
# tests/test_report_service.py
import unittest
from app.services.report.report import ReportService
from app.data_class.report import ReportRequestData
from app.enum.report_type import ReportType
from app.enum.service_type import ServiceType
class TestReportService(unittest.TestCase):
def setUp(self):
self.report_request = ReportRequestData(
report_type=ReportType.DAILY,
service_type=ServiceType.WATER,
industry_id='test-industry',
start_date='09/11/2024'
)
def test_generate_daily_report(self):
service = ReportService(self.report_request)
result = service.get_report()
self.assertIsNotNone(result)
self.assertIn('data', result)
self.assertIn('fileName', result)
def test_invalid_service_type(self):
self.report_request.service_type = 'invalid'
with self.assertRaises(ValueError):
ReportService(self.report_request).get_report()
Integration Testingโ
# tests/integration/test_report_api.py
import unittest
import requests
class TestReportAPI(unittest.TestCase):
BASE_URL = 'http://localhost:5001'
TOKEN = 'your-test-token'
def test_daily_report_generation(self):
url = f"{self.BASE_URL}/api/user/report"
params = {
'reportType': 'daily',
'reportFormat': 'html',
'service': 'water',
'startDate': '09/11/2024'
}
headers = {
'Authorization': f'Bearer {self.TOKEN}'
}
response = requests.get(url, params=params, headers=headers)
self.assertEqual(response.status_code, 200)
self.assertIn('<!DOCTYPE html>', response.text)
๐ Code Style Guidelinesโ
Python Style (PEP 8)โ
# Good
class WaterBalanceService:
"""Service for water balance calculations."""
def calculate_balance(self, inflow: float, outflow: float) -> dict:
"""
Calculate water balance.
Args:
inflow: Total water inflow in KL
outflow: Total water outflow in KL
Returns:
dict: Balance calculation results
"""
balance = inflow - outflow
percentage = (balance / inflow) * 100 if inflow > 0 else 0
return {
'balance': balance,
'percentage': percentage,
'status': 'surplus' if balance > 0 else 'deficit'
}
Naming Conventionsโ
| Type | Convention | Example |
|---|---|---|
| Classes | PascalCase | ReportService, DeviceDataFormatter |
| Functions | snake_case | get_report(), calculate_metrics() |
| Variables | snake_case | industry_id, report_data |
| Constants | UPPER_SNAKE_CASE | MAX_RETRY_COUNT, DEFAULT_TIMEOUT |
| Files | snake_case | report_service.py, water_balance.py |
Import Organizationโ
# Standard library imports
import os
from datetime import datetime, timedelta
# Third-party imports
from flask import request
import pandas as pd
# Local application imports
from app.database.database_supporter import DatabaseSupporter
from app.util.date_time_util import DateTimeUtil
from app.enum.service_type import ServiceType
๐ Debugging Tipsโ
Enable Debug Modeโ
# app.py
if __name__ == '__main__':
app.run(debug=True, port=5001)
Loggingโ
import logging
logger = logging.getLogger(__name__)
# Log levels
logger.debug("Detailed debug information")
logger.info("General information")
logger.warning("Warning message")
logger.error("Error occurred")
logger.critical("Critical error")
Azure Application Insightsโ
from opencensus.ext.azure import metrics_exporter
# Custom metrics
exporter = metrics_exporter.new_metrics_exporter(
connection_string=Config.APPLICATION_INSIGHTS
)
# Track custom event
from opencensus.stats import aggregation as aggregation_module
from opencensus.stats import measure as measure_module
from opencensus.stats import stats as stats_module
from opencensus.stats import view as view_module
# Define measure
measure = measure_module.MeasureInt("report_generation_time",
"Time to generate report", "ms")
# Track
stats = stats_module.stats
view_manager = stats.view_manager
stats_recorder = stats.stats_recorder
mmap = stats_recorder.new_measurement_map()
tmap = mmap.measure_int_put(measure, 1500)
tmap.record()
Development Workflow
- Create feature branch
- Implement changes following architecture patterns
- Write tests
- Update documentation
- Submit pull request
๐ Next Stepsโ
- Adding Features Guide - Detailed examples
- Testing Guide - Testing strategies
- API Guidelines - API design principles