
Batch Processing & Automation

This guide explains how to use AquaGen API's batch processing capabilities for automated report generation, bulk operations, and scheduled tasks.

📋 Overview

Batch processing in AquaGen API allows you to:

  • Automated Report Generation: Generate reports for multiple industries/units automatically
  • Scheduled Execution: Run tasks at specific times (daily, weekly, monthly)
  • Bulk Operations: Process multiple units, dates, or configurations in one job
  • Email Delivery: Automatically email reports to stakeholders

🎯 Automated Report System

Architecture

Components

1. Automated Report Service (app/services/admin/automated_report/automated_report.py)

```python
import logging

from app.services.admin.automated_report.report_processor import ReportProcessor

logger = logging.getLogger(__name__)


class AutomatedReportService:
    """
    Main service for automated report generation
    """

    def __init__(self):
        self.report_processor = ReportProcessor()

    def generate_daily_reports(self):
        """
        Generate daily reports for all configured industries
        """
        # Fetch industries with auto-report enabled
        # (get_industries_with_auto_reports is not shown in this excerpt)
        industries = self.get_industries_with_auto_reports()

        for industry in industries:
            try:
                self.process_industry_reports(industry)
            except Exception as e:
                # One failing industry must not stop the rest of the batch
                logger.error(f"Error processing reports for {industry['name']}: {e}")

    def process_industry_reports(self, industry):
        """
        Process all configured reports for an industry
        """
        report_configs = industry['autoReportConfigs']

        for config in report_configs:
            self.report_processor.generate_and_send(
                industry_id=industry['id'],
                config=config
            )
```
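The service fans out over industries, then over each industry's report configs. The dependency-free sketch below isolates that loop so its failure handling can be tested on its own. `run_batch` is a hypothetical name, and the callable stands in for `ReportProcessor.generate_and_send`. Skipping configs whose `"enabled"` flag is false is an assumption, because the excerpt above does not show where that flag is checked.

```python
import logging
from typing import Callable, Iterable

logger = logging.getLogger(__name__)


def run_batch(industries: Iterable[dict],
              generate_and_send: Callable[[str, dict], None]) -> dict:
    """Fan out over industries and their report configs.

    Hypothetical helper: `generate_and_send` stands in for
    ReportProcessor.generate_and_send, and skipping configs whose
    "enabled" flag is false is an assumption based on the config schema.
    """
    summary = {"sent": 0, "skipped": 0, "failed": []}
    for industry in industries:
        try:
            for config in industry.get("autoReportConfigs", []):
                if not config.get("enabled", True):
                    summary["skipped"] += 1
                    continue
                generate_and_send(industry["id"], config)
                summary["sent"] += 1
        except Exception as exc:
            # Isolate failures per industry, like generate_daily_reports()
            logger.error("Error processing reports for %s: %s",
                         industry.get("name"), exc)
            summary["failed"].append(industry["id"])
    return summary
```

A bound method such as `processor.generate_and_send` fits the callable slot, since it accepts `(industry_id, config)` positionally.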

2. Report Processor (app/services/admin/automated_report/report_processor.py)

```python
import time


class ReportProcessor:
    """
    Processes individual report generation and delivery
    """

    def generate_and_send(self, industry_id, config):
        """
        Generate a report and send it via email

        Args:
            industry_id: Industry identifier
            config: Report configuration dict, for example
                {
                    'reportType': 'daily',
                    'reportFormat': 'pdf',
                    'service': 'water',
                    'recipients': ['user@example.com'],
                    'schedule': 'daily',
                    'time': '06:00'
                }
        """
        started = time.monotonic()

        # Step 1: Generate report
        report = self.generate_report(industry_id, config)

        # Step 2: Store in blob storage
        report_url = self.store_report(report)

        # Step 3: Send email
        self.send_email(
            recipients=config['recipients'],
            subject=self.create_subject(config),
            attachment_url=report_url
        )

        # Step 4: Log execution. log_execution() (see Logging & Monitoring)
        # reads 'status', 'reportUrl' and 'executionTime' from a result dict
        self.log_execution(industry_id, config, {
            'status': 'success',
            'reportUrl': report_url,
            'executionTime': time.monotonic() - started,
        })

    def generate_report(self, industry_id, config):
        """
        Generate the actual report
        """
        from app.services.report.report import ReportService
        from app.data_class.report import ReportRequestData
        # ReportType, ReportFormat and ServiceType are the project's enums;
        # their module is not shown in this guide

        # Calculate the report date from the schedule (store_report,
        # send_email, create_subject and calculate_date are omitted here)
        date = self.calculate_date(config['schedule'])

        # Create request
        request = ReportRequestData(
            report_type=ReportType(config['reportType']),
            report_format=ReportFormat(config['reportFormat']),
            service_type=ServiceType(config['service']),
            industry_id=industry_id,
            start_date=date,
            # ... other parameters
        )

        # Generate report
        service = ReportService(request)
        return service.get_report()
```
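The code above calls `calculate_date()` but does not show it. Below is a hypothetical version built on three assumptions. Daily reports cover yesterday, which matches the API's default `targetDate`. Weekly reports start seven days back. Monthly reports cover the previous calendar month. The function also accepts both `schedule` shapes that appear in this guide: the plain string in the docstring and the object in the industry configuration.

```python
from datetime import date, timedelta
from typing import Optional, Union


def calculate_date(schedule: Union[str, dict], today: Optional[date] = None) -> str:
    """Return the report start date as DD/MM/YYYY (hypothetical helper).

    Assumed mapping: daily -> yesterday, weekly -> 7 days ago,
    monthly -> 1st of the previous month.
    """
    # Accept both 'daily' and {'frequency': 'daily', ...}
    frequency = schedule["frequency"] if isinstance(schedule, dict) else schedule
    today = today or date.today()

    if frequency == "daily":
        target = today - timedelta(days=1)
    elif frequency == "weekly":
        target = today - timedelta(days=7)
    elif frequency == "monthly":
        # Step back to the last day of the previous month, then to its 1st
        last_of_previous_month = today.replace(day=1) - timedelta(days=1)
        target = last_of_previous_month.replace(day=1)
    else:
        raise ValueError(f"Unsupported frequency: {frequency!r}")

    return target.strftime("%d/%m/%Y")
```

Passing `today` explicitly keeps the function deterministic, so reruns and backfills produce the same dates.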

🔧 Configuration

Industry-Level Configuration

Industries can configure automated reports in their metadata:

```json
{
  "industryId": "industry-123",
  "name": "Acme Industries",
  "autoReportConfigs": [
    {
      "enabled": true,
      "reportType": "daily",
      "reportFormat": "pdf",
      "service": "water",
      "schedule": {
        "frequency": "daily",
        "time": "06:00",
        "timezone": "Asia/Kolkata",
        "daysOfWeek": null
      },
      "recipients": [
        {
          "email": "operations@acme.com",
          "name": "Operations Team"
        },
        {
          "email": "manager@acme.com",
          "name": "Plant Manager"
        }
      ],
      "emailTemplate": {
        "subject": "Daily Water Report - {date}",
        "body": "Please find attached the daily water consumption report."
      },
      "filters": {
        "unitIds": ["unit-1", "unit-2"],
        "useShift": true
      }
    },
    {
      "enabled": true,
      "reportType": "monthly",
      "reportFormat": "xlsx",
      "service": "energy",
      "schedule": {
        "frequency": "monthly",
        "dayOfMonth": 1,
        "time": "08:00"
      },
      "recipients": [
        {
          "email": "billing@acme.com",
          "name": "Billing Department"
        }
      ]
    }
  ]
}
```
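A malformed entry is easier to catch before the scheduler picks it up than at 6 AM. The pre-flight check below is a hypothetical sketch. Its rules mirror the examples in this guide and are not a schema the API is known to enforce. `validate_auto_report_config` is an illustrative name.

```python
import re

REQUIRED_KEYS = ("reportType", "reportFormat", "service", "schedule", "recipients")
TIME_PATTERN = re.compile(r"^([01]\d|2[0-3]):[0-5]\d$")


def validate_auto_report_config(config: dict) -> list:
    """Return the problems found in one autoReportConfigs entry.

    Hypothetical pre-flight check: the rules mirror the examples in this
    guide, not a schema the API is known to enforce.
    """
    errors = [f"missing '{key}'" for key in REQUIRED_KEYS if key not in config]
    schedule = config.get("schedule") or {}
    frequency = schedule.get("frequency")

    if frequency in ("daily", "weekly", "monthly"):
        if not TIME_PATTERN.match(str(schedule.get("time", ""))):
            errors.append("schedule.time must be HH:MM (24-hour)")

    if frequency == "weekly":
        days = schedule.get("daysOfWeek") or []
        if not days or not all(isinstance(d, int) and 1 <= d <= 7 for d in days):
            errors.append("schedule.daysOfWeek must list days 1-7")
    elif frequency == "monthly":
        day = schedule.get("dayOfMonth")
        if not isinstance(day, int) or not 1 <= day <= 31:
            errors.append("schedule.dayOfMonth must be 1-31")
    elif frequency == "custom":
        # A standard cron expression has five fields
        if len(str(schedule.get("cron", "")).split()) != 5:
            errors.append("schedule.cron must have five fields")
    elif frequency != "daily":
        errors.append(f"unknown schedule.frequency: {frequency!r}")

    for recipient in config.get("recipients", []):
        # Recipients appear both as {"email": ...} objects and plain strings
        email = recipient.get("email", "") if isinstance(recipient, dict) else str(recipient)
        if "@" not in email:
            errors.append(f"invalid recipient: {recipient!r}")
    return errors
```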

Schedule Frequencies

| Frequency | Configuration | Example |
|---|---|---|
| Daily | `{ "frequency": "daily", "time": "06:00" }` | Every day at 6 AM |
| Weekly | `{ "frequency": "weekly", "daysOfWeek": [1,5], "time": "08:00" }` | Monday and Friday at 8 AM |
| Monthly | `{ "frequency": "monthly", "dayOfMonth": 1, "time": "09:00" }` | 1st of the month at 9 AM |
| Custom | `{ "frequency": "custom", "cron": "0 6 * * *" }` | Any cron expression (this one: every day at 6 AM) |
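A scheduler needs to turn these fields into a concrete next run time. The hypothetical helper below does that for the three fixed frequencies. It assumes `daysOfWeek` uses ISO numbering (1 = Monday through 7 = Sunday), which agrees with the table's `[1,5]` meaning Monday and Friday. It also assumes `now` is already expressed in the schedule's timezone. `next_run` is an illustrative name.

```python
from datetime import datetime, timedelta


def next_run(schedule: dict, now: datetime) -> datetime:
    """Next run time for a daily, weekly or monthly schedule.

    Hypothetical helper. Assumes daysOfWeek uses 1 = Monday ... 7 = Sunday
    (so [1, 5] is Monday and Friday, as in the table) and that `now` is
    already in the schedule's timezone. Custom cron schedules are not handled.
    """
    hour, minute = map(int, schedule["time"].split(":"))
    frequency = schedule["frequency"]
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)

    # Step forward one day at a time (at most a year) until a day matches
    for _ in range(366):
        if candidate > now:
            if frequency == "daily":
                return candidate
            if frequency == "weekly" and candidate.isoweekday() in schedule["daysOfWeek"]:
                return candidate
            if frequency == "monthly" and candidate.day == schedule["dayOfMonth"]:
                return candidate
        candidate += timedelta(days=1)
    raise ValueError(f"No run time found for schedule {schedule!r}")
```

Custom schedules would be delegated to a cron parser (for example the third-party `croniter` package) rather than reimplemented here.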

🚀 Running Batch Jobs

Method 1: Using the Script

The auto_reports.sh script triggers automated report generation:

```bash
#!/bin/bash
# scripts/auto_reports.sh
set -euo pipefail

API_URL="http://localhost:5001/api/admin/automated/report"
ADMIN_TOKEN="YOUR_ADMIN_TOKEN"

# Trigger daily reports; --fail makes curl exit non-zero on HTTP errors
# so failed runs show up in the cron log
curl -sS --fail -X POST "${API_URL}" \
  -H "Authorization: Bearer ${ADMIN_TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{
    "reportType": "daily",
    "reportFormat": "pdf",
    "service": "water"
  }'
```

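Usage:

```bash
# Make the script executable (the cron entry below runs it directly)
chmod +x scripts/auto_reports.sh

# Run manually
bash scripts/auto_reports.sh

# Schedule with cron
crontab -e

# In the editor, add this line for daily execution at 6 AM
# (cron uses the server's local timezone, not the report config's timezone)
0 6 * * * /path/to/aquagenapi/scripts/auto_reports.sh >> /var/log/auto_reports.log 2>&1
```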

Method 2: API Endpoint

Endpoint: POST /api/admin/automated/report

Request Body:

```json
{
  "reportType": "daily",
  "reportFormat": "pdf",
  "service": "water",
  "targetDate": "09/11/2024",
  "industries": ["industry-123", "industry-456"],
  "testMode": false
}
```

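Parameters:

| Parameter | Type | Description | Default |
|---|---|---|---|
| reportType | string | Report type (e.g. `daily`, `monthly`) | Required |
| reportFormat | string | Output format (e.g. `pdf`, `xlsx`) | Required |
| service | string | Service type (e.g. `water`, `energy`) | Required |
| targetDate | string | Specific date (DD/MM/YYYY) | Yesterday |
| industries | array | Industry IDs to process | All industries with automated reports enabled |
| testMode | boolean | Run without sending emails | false |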

Response:

```json
{
  "status": "success",
  "message": "Automated reports triggered",
  "data": {
    "jobId": "job-789",
    "industriesProcessed": 15,
    "reportsGenerated": 15,
    "emailsSent": 15,
    "errors": []
  }
}
```
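To call the endpoint from Python instead of `curl`, the standard library is enough. `build_payload` and `trigger_reports` are hypothetical names, and the base URL would be something like the `http://localhost:5001` used in the script. Treating a non-empty `errors` array as a failure is a design choice of this sketch, not documented API behaviour. It fits schedulers that only look at exit status.

```python
import json
import urllib.request
from datetime import date
from typing import Optional


def build_payload(report_type: str, report_format: str, service: str,
                  target_date: Optional[date] = None,
                  industries: Optional[list] = None,
                  test_mode: bool = False) -> dict:
    """Build a request body for POST /api/admin/automated/report."""
    payload = {
        "reportType": report_type,
        "reportFormat": report_format,
        "service": service,
        "testMode": test_mode,
    }
    if target_date is not None:
        # The endpoint expects DD/MM/YYYY; omitting it means "yesterday"
        payload["targetDate"] = target_date.strftime("%d/%m/%Y")
    if industries:
        payload["industries"] = list(industries)
    return payload


def trigger_reports(base_url: str, token: str, payload: dict) -> dict:
    """POST the payload and raise if the job reports any errors."""
    request = urllib.request.Request(
        f"{base_url}/api/admin/automated/report",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Authorization": f"Bearer {token}",
                 "Content-Type": "application/json"},
        method="POST",
    )
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses
    with urllib.request.urlopen(request, timeout=300) as response:
        body = json.load(response)
    errors = (body.get("data") or {}).get("errors", [])
    if body.get("status") != "success" or errors:
        raise RuntimeError(f"Automated report job failed: {errors or body}")
    return body["data"]
```

Starting with `test_mode=True` exercises generation for the selected industries without emailing anyone.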

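Method 3: Programmatic

```python
from app.services.admin.automated_report.automated_report import AutomatedReportService

# Initialize service
service = AutomatedReportService()

# Generate daily reports for all industries
service.generate_daily_reports()

# Generate for a specific industry: process_industry_reports() takes the
# industry document (with 'id', 'name' and 'autoReportConfigs'), not an ID
industry = next(
    i for i in service.get_industries_with_auto_reports()
    if i['id'] == 'industry-123'
)
service.process_industry_reports(industry)

# Generate a specific report type
# (generate_reports_by_type is not part of the class excerpt shown above)
service.generate_reports_by_type(
    report_type='monthly',
    report_format='xlsx',
    service='energy'
)
```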

📊 Batch Report Generation

Scenario 1: Generate Monthly Reports for All Units

```python
from app.services.report.report import ReportService
from app.data_class.report import ReportRequestData
# Also assumed to be in scope: DatabaseSupporter, the ReportType /
# ReportFormat / ServiceType enums, and your own save_report(report, filename)

def generate_monthly_reports_for_all_units(industry_id, month, year):
    """
    Generate monthly reports for all units in an industry
    """
    # Fetch all units
    units = DatabaseSupporter.get_all_units(industry_id)

    results = []

    for unit in units:
        try:
            # Create report request (start_date is DD/MM/YYYY)
            request = ReportRequestData(
                report_type=ReportType.MONTHLY,
                report_format=ReportFormat.PDF,
                service_type=ServiceType.WATER,
                industry_id=industry_id,
                unit_id=unit['unitId'],
                start_date=f"01/{month:02d}/{year}",
                use_shift=True
            )

            # Generate report
            service = ReportService(request)
            report = service.get_report()

            # Save report (zero-padded month keeps filenames sortable)
            filename = f"{unit['unitName']}_monthly_{month:02d}_{year}.pdf"
            save_report(report, filename)

            results.append({
                'unitId': unit['unitId'],
                'status': 'success',
                'filename': filename
            })

        except Exception as e:
            # Record the failure and continue with the next unit
            results.append({
                'unitId': unit['unitId'],
                'status': 'error',
                'error': str(e)
            })

    return results

# Usage
results = generate_monthly_reports_for_all_units('industry-123', 11, 2024)
succeeded = sum(1 for r in results if r['status'] == 'success')
print(f"Generated {succeeded} of {len(results)} reports")
```
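Backfilling several months means calling the function once per month, which requires handling the December-to-January rollover. `months_between` is a hypothetical helper that yields the `(month, year)` pairs the function expects.

```python
def months_between(start: tuple, end: tuple) -> list:
    """(month, year) pairs from start to end inclusive, e.g. for a backfill.

    Hypothetical helper; start and end are (month, year) tuples.
    """
    (month, year), (end_month, end_year) = start, end
    if (year, month) > (end_year, end_month):
        raise ValueError("start must not be after end")
    months = []
    while (year, month) <= (end_year, end_month):
        months.append((month, year))
        # Roll over to January of the next year after December
        month, year = (1, year + 1) if month == 12 else (month + 1, year)
    return months
```

Usage: `for month, year in months_between((11, 2024), (2, 2025)): generate_monthly_reports_for_all_units('industry-123', month, year)`.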

Scenario 2: Bulk Date Range Processing

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

from app.services.report.report import ReportService
from app.data_class.report import ReportRequestData
# Also assumed to be in scope: the ReportType / ReportFormat / ServiceType
# enums, get_date_range(start, end) returning DD/MM/YYYY strings, and
# store_in_blob_storage(report, filename) returning the blob URL

def generate_reports_for_date_range(industry_id, start_date, end_date):
    """
    Generate daily reports for a date range (bulk processing)
    """
    dates = get_date_range(start_date, end_date)

    results = []

    # Use a thread pool for parallel processing; a small max_workers
    # keeps the database and report service from being overloaded
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = {
            executor.submit(generate_daily_report, industry_id, date): date
            for date in dates
        }

        # Collect results as they finish (completion order, not date order)
        for future in as_completed(futures):
            date = futures[future]
            try:
                results.append({
                    'date': date,
                    'status': 'success',
                    'reportUrl': future.result()
                })
            except Exception as e:
                results.append({
                    'date': date,
                    'status': 'error',
                    'error': str(e)
                })

    return results

def generate_daily_report(industry_id, date):
    """
    Generate a single daily report
    """
    request = ReportRequestData(
        report_type=ReportType.DAILY,
        report_format=ReportFormat.PDF,
        service_type=ServiceType.WATER,
        industry_id=industry_id,
        start_date=date
    )

    service = ReportService(request)
    report = service.get_report()

    # Store in blob storage; replace the '/' in DD/MM/YYYY so the date
    # does not turn into nested blob paths
    return store_in_blob_storage(report, f"daily_{date.replace('/', '-')}.pdf")

# Usage: Generate reports for November 2024
results = generate_reports_for_date_range(
    'industry-123',
    '01/11/2024',
    '30/11/2024'
)
```
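The scenario calls `get_date_range()` without defining it. The implementation below is a hypothetical one that uses the guide's DD/MM/YYYY format and includes both endpoints.

```python
from datetime import datetime, timedelta


def get_date_range(start_date: str, end_date: str) -> list:
    """Every date from start_date to end_date inclusive, as DD/MM/YYYY strings.

    Hypothetical implementation of the helper used in Scenario 2.
    """
    start = datetime.strptime(start_date, "%d/%m/%Y").date()
    end = datetime.strptime(end_date, "%d/%m/%Y").date()
    if start > end:
        raise ValueError(f"start_date {start_date} is after end_date {end_date}")
    return [
        (start + timedelta(days=offset)).strftime("%d/%m/%Y")
        for offset in range((end - start).days + 1)
    ]
```

Parsing with `strptime` rejects malformed or impossible dates such as `31/11/2024` before any report work starts.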

Scenario 3: Multi-Service Report Generation

```python
from app.services.report.report import ReportService
from app.data_class.report import ReportRequestData
# ReportType, ReportFormat and ServiceType come from the project's enum
# module (not shown in this guide)

def generate_all_service_reports(industry_id, date):
    """
    Generate reports for all services (water, energy, level, quality)
    """
    services = [
        ServiceType.WATER,
        ServiceType.ENERGY,
        ServiceType.LEVEL,
        ServiceType.QUALITY
    ]

    reports = {}

    for service in services:
        try:
            request = ReportRequestData(
                report_type=ReportType.DAILY,
                report_format=ReportFormat.PDF,
                service_type=service,
                industry_id=industry_id,
                start_date=date
            )

            report_service = ReportService(request)
            report = report_service.get_report()

            reports[service.value] = {
                'status': 'success',
                'data': report
            }

        except Exception as e:
            # A failure in one service does not block the others
            reports[service.value] = {
                'status': 'error',
                'error': str(e)
            }

    return reports

# Usage
all_reports = generate_all_service_reports('industry-123', '09/11/2024')
```
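Scenarios 2 and 3 can be combined into one job per (service, date) pair. In the sketch below, `run_matrix` is a hypothetical name. The `generate` callable stands in for a wrapper that builds a `ReportRequestData` and calls `ReportService(...).get_report()`, and services are passed as their string values (`'water'`, `'energy'`, ...).

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import product
from typing import Callable, Iterable


def run_matrix(services: Iterable[str], dates: Iterable[str],
               generate: Callable[[str, str], object],
               max_workers: int = 5) -> dict:
    """Run generate(service, date) for every pair; map (service, date) -> outcome."""
    jobs = list(product(services, dates))
    outcomes = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {job: executor.submit(generate, *job) for job in jobs}
        for job, future in futures.items():
            try:
                outcomes[job] = {"status": "success", "data": future.result()}
            except Exception as exc:
                # Keep going: one failed pair must not hide the others
                outcomes[job] = {"status": "error", "error": str(exc)}
    return outcomes
```

Keying outcomes by `(service, date)` makes it easy to rerun only the failed pairs.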

📧 Email Delivery

Email Configuration

```python
# app/services/admin/automated_report/email_service.py
import smtplib
from datetime import datetime
from email.message import EmailMessage

# Config is the application's configuration object (SMTP_* settings)


class EmailService:
    def send_report_email(self, recipients, subject, body, attachment_url):
        """
        Send an email with the report attached

        recipients is a list of email address strings
        """
        # Download report bytes from blob storage
        attachment = self.download_from_blob(attachment_url)

        # Create email
        email = EmailMessage()
        email['From'] = Config.SMTP_FROM
        email['To'] = ', '.join(recipients)
        email['Subject'] = subject

        # Email body (HTML)
        email.set_content(body, subtype='html')

        # Attach report (this example assumes a PDF report)
        email.add_attachment(
            attachment,
            maintype='application',
            subtype='pdf',
            filename=f"report_{datetime.now().strftime('%Y%m%d')}.pdf"
        )

        # Send via SMTP, upgrading the connection to TLS before logging in
        with smtplib.SMTP(Config.SMTP_HOST, Config.SMTP_PORT) as smtp:
            smtp.starttls()
            smtp.login(Config.SMTP_USER, Config.SMTP_PASSWORD)
            smtp.send_message(email)
```
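`send_report_email()` always attaches a `.pdf` with MIME subtype `pdf`, but the configuration also allows `xlsx` reports. The sketch below builds the message so the attachment type follows the file extension, and sending stays with the SMTP code above. `build_report_email` and `REPORT_MIME_TYPES` are hypothetical names.

```python
from email.message import EmailMessage

# MIME types for the report formats used in this guide
REPORT_MIME_TYPES = {
    "pdf": ("application", "pdf"),
    "xlsx": ("application", "vnd.openxmlformats-officedocument.spreadsheetml.sheet"),
}


def build_report_email(sender: str, recipients: list, subject: str,
                       html_body: str, report: bytes, filename: str) -> EmailMessage:
    """Build a report email whose attachment type follows the file extension."""
    message = EmailMessage()
    message["From"] = sender
    message["To"] = ", ".join(recipients)
    message["Subject"] = subject
    message.set_content(html_body, subtype="html")

    extension = filename.rsplit(".", 1)[-1].lower()
    # Unknown extensions fall back to a generic binary attachment
    maintype, subtype = REPORT_MIME_TYPES.get(extension, ("application", "octet-stream"))
    message.add_attachment(report, maintype=maintype, subtype=subtype, filename=filename)
    return message
```

Because the function only builds the message, it can be tested without an SMTP server and then passed to `smtp.send_message()`.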

Email Templates

```html
<!-- Daily Report Email Template -->
<!DOCTYPE html>
<html>
<head>
  <style>
    body { font-family: Arial, sans-serif; }
    .header { background: #0066cc; color: white; padding: 20px; }
    .content { padding: 20px; }
    .footer { background: #f5f5f5; padding: 10px; text-align: center; }
  </style>
</head>
<body>
  <div class="header">
    <h1>{{ industry_name }}</h1>
    <p>Daily Water Consumption Report - {{ date }}</p>
  </div>

  <div class="content">
    <p>Dear {{ recipient_name }},</p>

    <p>Please find attached the daily water consumption report for {{ date }}.</p>

    <h3>Summary:</h3>
    <ul>
      <li><strong>Total Consumption:</strong> {{ total_consumption }} KL</li>
      <li><strong>Average Flow Rate:</strong> {{ avg_flow_rate }} L/min</li>
      <li><strong>Peak Hour:</strong> {{ peak_hour }}</li>
    </ul>

    <p>The detailed report is attached to this email.</p>

    <p>Best regards,<br>AquaGen Automated Reporting System</p>
  </div>

  <div class="footer">
    <small>This is an automated email. Please do not reply.</small>
  </div>
</body>
</html>
```
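The template uses `{{ name }}` placeholders. The guide does not say which engine renders them, so the function below is a minimal stand-in (hypothetical, not the project's renderer) that shows the two properties that matter for email. Values are HTML-escaped, and a missing value fails loudly instead of leaving an empty field.

```python
import html
import re

PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_template(template: str, context: dict) -> str:
    """Fill {{ name }} placeholders, HTML-escaping every value.

    Minimal stand-in for a real template engine; raises KeyError for a
    placeholder missing from `context` instead of emailing a blank.
    """
    def substitute(match: re.Match) -> str:
        return html.escape(str(context[match.group(1)]))

    return PLACEHOLDER.sub(substitute, template)
```

The subject template in the configuration (`"Daily Water Report - {date}"`) uses single braces, which matches Python's `str.format`: `"Daily Water Report - {date}".format(date="09/11/2024")`.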

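🔄 Retry Logic

```python
import logging

from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)


class ReportProcessor:
    @retry(
        stop=stop_after_attempt(3),  # give up after the 3rd attempt
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def generate_report_with_retry(self, industry_id, config):
        """
        Generate report with automatic retry on failure

        After the last attempt tenacity raises tenacity.RetryError
        (pass reraise=True to @retry to get the original exception)
        """
        try:
            return self.generate_report(industry_id, config)
        except Exception as e:
            logger.warning(f"Attempt failed for {industry_id}: {e}")
            raise  # Re-raising lets tenacity schedule the next attempt
```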
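The helper below shows how long the decorator actually waits between attempts. It mirrors the formula that recent tenacity releases use for `wait_exponential`, as we understand it: `multiplier * 2 ** (attempt - 1)`, clamped to `[min, max]`. Verify this against the version you install. `exponential_waits` is a hypothetical name.

```python
def exponential_waits(attempts: int, multiplier: float = 1, minimum: float = 4,
                      maximum: float = 10) -> list:
    """Waits (seconds) between attempts, mirroring wait_exponential's formula.

    The wait after attempt n is multiplier * 2 ** (n - 1), clamped to
    [minimum, maximum]; there is no wait after the final attempt.
    """
    return [
        max(minimum, min(multiplier * 2 ** (n - 1), maximum))
        for n in range(1, attempts)
    ]
```

Under this formula, `stop_after_attempt(3)` produces only two waits, and both hit the 4-second floor. The settings shown therefore behave like a fixed 4-second pause. The backoff only starts to grow with more attempts (for example, 5 attempts give waits of 4, 4, 4, 8).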

📊 Logging & Monitoring

Execution Logs

```python
import uuid
from datetime import datetime

# Method of ReportProcessor; DatabaseSupporter is the project's data layer

def log_execution(self, industry_id, config, result):
    """
    Log a batch execution to the database

    result: dict with 'status', 'executionTime' (seconds) and, optionally,
    'reportUrl' and 'errors'
    """
    log_entry = {
        'id': str(uuid.uuid4()),
        'industryId': industry_id,
        'reportType': config['reportType'],
        'service': config['service'],
        'timestamp': datetime.utcnow().isoformat(),
        'status': result['status'],
        'reportUrl': result.get('reportUrl'),
        'emailsSent': len(config['recipients']),  # number of recipients addressed
        'executionTime': result['executionTime'],
        'errors': result.get('errors', [])
    }

    DatabaseSupporter.create_automation_log(log_entry)
```
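`log_execution()` needs `status`, `errors` and `executionTime` even when a run fails. A context manager can fill that result dict on both paths. `tracked_execution` is a hypothetical helper whose dict shape matches what `log_execution()` reads.

```python
import time
from contextlib import contextmanager


@contextmanager
def tracked_execution():
    """Yield a result dict filled with status, errors and executionTime.

    Hypothetical helper: the dict matches what log_execution() reads, and
    it is completed even when the wrapped block raises.
    """
    result = {"status": "success", "errors": []}
    started = time.perf_counter()
    try:
        yield result
    except Exception as exc:
        result["status"] = "error"
        result["errors"].append(str(exc))
        raise
    finally:
        result["executionTime"] = round(time.perf_counter() - started, 3)
```

A caller would wrap the generate-and-send steps in `with tracked_execution() as result:`, set `result["reportUrl"]` inside the block, and call `log_execution(industry_id, config, result)` in a `finally` clause so failed runs are logged too.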

Monitoring Dashboard

Query execution logs:

```python
def get_automation_statistics(start_date, end_date):
    """
    Get automation statistics for a date range
    """
    logs = DatabaseSupporter.get_automation_logs(start_date, end_date)
    total = len(logs)

    stats = {
        'totalExecutions': total,
        'successfulExecutions': sum(1 for log in logs if log['status'] == 'success'),
        'failedExecutions': sum(1 for log in logs if log['status'] == 'error'),
        # Guard against division by zero when nothing ran in the range
        'avgExecutionTime': (
            sum(log['executionTime'] for log in logs) / total if total else 0.0
        ),
        'industriesProcessed': len({log['industryId'] for log in logs}),
        'reportsGenerated': sum(1 for log in logs if log.get('reportUrl'))
    }

    return stats
```
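A dashboard usually also needs to know which industries fail most often. The hypothetical helper below ranks industries by failed executions, using the same log fields as above.

```python
from collections import Counter


def failures_by_industry(logs: list) -> list:
    """(industryId, failure count) pairs, most failures first."""
    failures = Counter(log["industryId"] for log in logs if log["status"] == "error")
    return failures.most_common()
```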

🎯 Best Practices

1. Error Handling

```python
def process_with_error_handling(self, industry):
    """
    Process an industry with comprehensive error handling

    DatabaseError and ReportGenerationError stand for the project's own
    exception types; notify_admin(), retry_later() and escalate_to_admin()
    are helper methods not shown here. Specific exceptions are caught
    before the generic Exception fallback.
    """
    try:
        # Main processing
        result = self.process_industry_reports(industry)

        # Success logging
        logger.info(f"Successfully processed {industry['name']}")

        return result

    except DatabaseError as e:
        # Database-specific error
        logger.error(f"Database error for {industry['name']}: {e}")
        self.notify_admin(industry, "database_error", str(e))

    except ReportGenerationError as e:
        # Report generation error: schedule another attempt
        logger.error(f"Report generation failed for {industry['name']}: {e}")
        self.retry_later(industry)

    except Exception as e:
        # Unexpected error: include the traceback for diagnosis
        logger.critical(f"Unexpected error for {industry['name']}: {e}", exc_info=True)
        self.escalate_to_admin(industry, str(e))
```
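The guide does not define `DatabaseError` or `ReportGenerationError`. One hypothetical way to declare them is under a shared base class, so callers can catch all batch failures at once while still branching on the specific type. `BatchError` and `classify` are illustrative names.

```python
class BatchError(Exception):
    """Base class for batch-processing failures (hypothetical hierarchy)."""


class DatabaseError(BatchError):
    """Reading configs or writing logs failed; usually needs an admin."""


class ReportGenerationError(BatchError):
    """Building one report failed; often transient, so retry later."""


def classify(exc: Exception) -> str:
    """Map an exception to the handling branch used in process_with_error_handling."""
    # Check the specific types first, in the same order as the except clauses
    if isinstance(exc, DatabaseError):
        return "notify_admin"
    if isinstance(exc, ReportGenerationError):
        return "retry_later"
    return "escalate_to_admin"
```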

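2. Rate Limiting

```python
from ratelimit import limits, sleep_and_retry


class BatchProcessor:
    # The limit lives on the decorated function, so it is shared by all
    # BatchProcessor instances in one process (not across processes)
    @sleep_and_retry               # sleep until a call is allowed again...
    @limits(calls=10, period=60)   # ...instead of raising past 10 calls per minute
    def generate_report_rate_limited(self, industry_id, config):
        """
        Generate report with rate limiting
        """
        return self.generate_report(industry_id, config)
```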
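Without the `ratelimit` dependency, a sliding-window limiter is short enough to write by hand. This hypothetical alternative, not what the project uses, allows at most `calls` acquisitions in any `period`-second window. Its clock and sleep functions are injectable so the behaviour can be tested without waiting. It is not thread-safe as written, so guard `acquire()` with a lock if thread-pool workers share one instance.

```python
import time
from collections import deque
from typing import Callable


class SlidingWindowLimiter:
    """Allow at most `calls` acquisitions in any `period`-second window.

    Dependency-free sketch; not thread-safe as written.
    """

    def __init__(self, calls: int, period: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep):
        self.calls, self.period = calls, period
        self.clock, self.sleep = clock, sleep
        self.timestamps = deque()

    def acquire(self) -> None:
        """Block until another call is allowed, then record it."""
        while True:
            now = self.clock()
            # Forget calls that have left the window
            while self.timestamps and now - self.timestamps[0] >= self.period:
                self.timestamps.popleft()
            if len(self.timestamps) < self.calls:
                self.timestamps.append(now)
                return
            # Sleep until the oldest call in the window expires
            self.sleep(self.period - (now - self.timestamps[0]))
```

Usage: create `limiter = SlidingWindowLimiter(10, 60)` once and call `limiter.acquire()` before each `generate_report(...)` call.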

3. Progress Tracking

```python
import time

from tqdm import tqdm


def batch_process_with_progress(industries):
    """
    Process a batch with a progress bar
    """
    results = []

    for industry in tqdm(industries, desc="Processing industries"):
        result = process_industry(industry)  # your per-industry function
        results.append(result)
        time.sleep(1)  # Simple rate limiting: at most one industry per second

    return results
```

Automation Benefits

Automated batch processing saves time, ensures consistency, and reduces manual errors. Set it up once and let it run!

📚 Next Steps