Project Structure
Understanding the project's organization is essential for navigating the codebase and making contributions. This guide provides a comprehensive overview of the directory structure, key files, and code organization patterns.
Use your IDE's file search (Cmd/Ctrl + P) to quickly jump to files mentioned in this guide.
Directory Overview
storm/
├── src/main/kotlin/  # Main source code
│   ├── Main.kt  # Application entry point
│   ├── bolt/  # Storm bolt implementations
│   ├── topology/  # Topology builder
│   ├── database/  # Cosmos DB integration
│   ├── supporter/  # State management & utilities
│   ├── model/  # Output field models
│   ├── enum/  # Enumerations & constants
│   ├── connector/  # External service connectors
│   ├── constants/  # Configuration constants
│   ├── dataclass/  # Data transfer objects
│   ├── util/  # Utility functions
│   └── shaded/  # Shaded dependencies
├── processor/  # Annotation processor module
├── build.gradle.kts  # Build configuration
├── settings.gradle.kts  # Gradle settings
└── docs/  # Documentation (you are here!)
Main Source Directory (src/main/kotlin/)
Entry Point
Main.kt
The application's entry point that configures and submits the Storm topology.
Key Responsibilities:
- Initialize Event Hub spouts
- Register serialization classes
- Configure Storm topology
- Submit to local or remote cluster
Location: src/main/kotlin/Main.kt
fun main(args: Array<String>) {
    HelperUtil.initSentry()
    val topology = Main()
    topology.runScenario(args)
}
Core Packages
1. bolt/ - Processing Units
Contains all Storm bolt implementations organized by functionality.
bolt/
├── alerts/  # Alert detection and handling
│   ├── AlertsHandlerBolt.kt  # Routes data to specific alert bolts
│   ├── QualityAlertBolt.kt  # Quality parameter alerts
│   ├── EnergyAlertBolt.kt  # Energy consumption alerts
│   ├── LevelAlertBolt.kt  # Tank level alerts
│   ├── FlowConsumptionAlertBolt.kt  # Flow & consumption alerts
│   ├── DeviceStatusAlertBolt.kt  # Device online/offline alerts
│   ├── AlertResponseHandlerBolt.kt  # Saves alerts to database
│   └── processor/  # Alert processing logic
│       ├── quality/  # Quality alert processors
│       ├── energy/  # Energy alert processors
│       ├── level/  # Level alert processors
│       └── flow/  # Flow alert processors
├── data/  # Data persistence bolts
│   ├── DeviceDataPostBolt.kt  # Writes processed data to DB
│   └── IndustryMetrics.kt  # Industry-level metrics
├── energy/  # Energy processing pipeline
│   ├── InitEnergyBolt.kt  # Initialize energy data
│   ├── TimeFilterEnergyBolt.kt  # Filter old energy data
│   ├── PreProcessingEnergyBolt.kt  # Aggregate energy data
│   └── supporter/  # Energy-specific helpers
├── flow/  # Flow/consumption processing
│   ├── SingularBolt.kt  # GSM flow meter data
│   ├── TeltonikaBolt.kt  # Teltonika device data
│   ├── LoraPandaBolt.kt  # LoRa device data
│   ├── InitConsumptionBolt.kt  # Initialize consumption
│   ├── TimeFilterBolt.kt  # Filter old flow data
│   └── PreProcessingConsumptionBolt.kt  # Aggregate consumption
├── level/  # Level sensor processing
│   ├── SingularLevelBolt.kt  # GSM level sensors
│   └── TeltonikaLevelBolt.kt  # Teltonika level sensors
├── quality/  # Water quality processing
│   └── QualityBolt.kt  # Process quality parameters
├── instantaneous/  # Real-time metrics pipeline
│   ├── InitInstantaneousBolt.kt  # Initialize instant data
│   ├── TimeFilterInstantaneousBolt.kt  # Filter old data
│   └── PreProcessingInstantaneousBolt.kt  # Aggregate instant data
├── metrics/  # Industry-wide metrics
│   └── IndustryMetrics.kt  # Calculate industry metrics
├── hubLogger/  # Data routing
│   └── HubLoggerBolt.kt  # Parse and route incoming data
└── notification/  # (Legacy - migrated to alerts/)
Bolt Naming Convention:
- Init*Bolt - Initialize and set up reference objects
- TimeFilter*Bolt - Filter outdated data
- PreProcessing*Bolt - Aggregate and enrich data
- *AlertBolt - Detect and generate alerts
Example Bolt Structure:
// bolt/quality/QualityBolt.kt
class QualityBolt : BaseRichBolt() {
    companion object {
        const val ID: String = "QualityBolt" // Used in topology
    }

    override fun prepare(...) { }
    override fun execute(tuple: Tuple) { }
    override fun declareOutputFields(...) { }
}
2. topology/ - Topology Builder
Constructs the Storm topology by connecting spouts and bolts.
topology/
└── StormTopologyBuilder.kt
Key File: StormTopologyBuilder.kt
Responsibilities:
- Define topology structure
- Connect bolts with appropriate groupings
- Set parallelism and task counts
- Configure named streams
Usage in Main.kt:
val topology = StormTopologyBuilder(
    spoutsConfig,
    registeredSpouts
).buildTopology()
3. database/ - Database Integration
Handles all interactions with Azure Cosmos DB.
database/
├── CosmosDataSource.kt  # Singleton connection manager
├── CosmosDBQueries.kt  # Database query operations
└── model/  # Database entity models
    ├── IndustryData.kt
    ├── IndustryUnitData.kt
    ├── IndustryUnitAlertConfig.kt
    ├── AlertsConfig.kt
    └── UnitConfig.kt
CosmosDataSource.kt
- Manages database connection lifecycle
- Provides singleton access to database client
- Handles connection pooling
CosmosDBQueries.kt
- CRUD operations for all containers
- Batch operations
- Query builders
Data Models (under database/model/):
// Example: IndustryUnitAlertConfig.kt
data class IndustryUnitAlertConfig(
    var id: String = "",
    var industryId: String = "",
    var unitId: String = "",
    var alertsConfig: Map<AlertType, AlertConfig> = emptyMap()
)
4. supporter/ - State Management & Utilities
Contains state classes and utility functions used across bolts.
supporter/
├── ReferenceObject.kt  # Flow/consumption state tracking
├── ReferenceObjectInstant.kt  # Instantaneous data state
├── EnergyReferenceObject.kt  # Energy state tracking
├── InstantaneousData.kt  # Sensor readings container
├── ThresholdState.kt  # Generic threshold state
├── QualityThresholdState.kt  # Quality alert state
├── LevelThresholdState.kt  # Level alert state
├── EnergyThresholdState.kt  # Energy alert state
├── LeakAlertState.kt  # Leak detection state
├── StableFlowAlertState.kt  # Stable flow state
├── FlowRateTriggerState.kt  # Flow rate state
├── ConsumptionTriggerState.kt  # Consumption trigger state
├── PreviousState.kt  # Generic previous state
├── UDF.kt  # User-defined functions
└── ...
Reference Objects: Track stateful information across tuples for each partition.
// ReferenceObject.kt - Tracks flow/consumption
class ReferenceObject {
    var currentData: Double? = null
    var previousData: Double? = null
    var lastProcessedTime: Long = 0
    var totaliser: TotaliserData? = null
}
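To make the "state across tuples" idea concrete, here is a minimal sketch of how such an object might be updated as readings arrive. FlowReference and its update() method are hypothetical simplifications for illustration (the totaliser field is left out); the project's actual update logic is not shown in this guide and may differ.

```kotlin
// Hypothetical simplification of ReferenceObject (the totaliser field is omitted).
class FlowReference {
    var currentData: Double? = null
    var previousData: Double? = null
    var lastProcessedTime: Long = 0

    // Shifts the current reading into previousData, stores the new reading,
    // and returns the delta, or null when there is no earlier reading yet.
    fun update(reading: Double, timestamp: Long): Double? {
        previousData = currentData
        currentData = reading
        lastProcessedTime = timestamp
        return previousData?.let { reading - it }
    }
}
```

In this sketch the first tuple for a partition yields null because there is nothing to compare against; each later tuple yields the difference from the one before it.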
Threshold States: Maintain alert state to prevent alert fatigue.
// QualityThresholdState.kt
class QualityThresholdState {
    var lastCreatedOnEpoch: Long = 0
    var lastParamsData: HashMap<String, Double?> = HashMap()
    var activeAlerts: HashMap<String, AlertInfo> = HashMap()
    var cooldownPeriods: HashMap<String, Long> = HashMap()
}
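One way a cooldown can suppress repeated alerts is sketched below. CooldownTracker and shouldAlert() are hypothetical names, and the single fixed cooldown is an assumption made for brevity; the project's threshold states keep per-parameter maps and may apply different rules.

```kotlin
// Hypothetical sketch: the cooldown logic is an assumption for illustration,
// not the project's implementation.
class CooldownTracker(private val cooldownMillis: Long) {
    // Epoch millis at which each parameter last raised an alert.
    private val lastAlertAt = HashMap<String, Long>()

    // Returns true (and records the alert time) only if no alert for this
    // parameter was raised within the last cooldownMillis.
    fun shouldAlert(param: String, nowMillis: Long): Boolean {
        val last = lastAlertAt[param]
        if (last != null && nowMillis - last < cooldownMillis) return false
        lastAlertAt[param] = nowMillis
        return true
    }
}
```

With a 60-second cooldown, a breach at t = 0 alerts, a second breach at t = 30 s is suppressed, and a breach at t = 60 s alerts again. Parameters are tracked independently of each other.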
UDF (User-Defined Functions): Utility functions for data processing.
// Signatures only; function bodies are omitted in this overview
object UDF {
    fun createdOnToEpoch(createdOn: String): Long { }
    fun calculateConsumption(current: Double, previous: Double): Double { }
    fun interpolateValue(value1: Double, value2: Double, ratio: Double): Double { }
}
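For orientation, the signatures above could be filled in along the following lines. Every body here is an assumption: UdfSketch is a hypothetical name, the timestamp is assumed to be an ISO-8601 UTC instant converted to epoch milliseconds, and a falling totaliser is assumed to mean a meter reset. Check UDF.kt for the real behaviour.

```kotlin
import java.time.Instant

// Hypothetical bodies for the UDF signatures; the real logic may differ.
object UdfSketch {
    // Assumes createdOn is an ISO-8601 UTC instant such as "2024-01-01T00:00:00Z"
    // and that the result is wanted in epoch milliseconds.
    fun createdOnToEpoch(createdOn: String): Long =
        Instant.parse(createdOn).toEpochMilli()

    // Assumes a monotonically increasing totaliser; a drop is treated as a
    // meter reset and yields 0.0 rather than a negative consumption.
    fun calculateConsumption(current: Double, previous: Double): Double =
        if (current >= previous) current - previous else 0.0

    // Linear interpolation: ratio 0.0 returns value1, ratio 1.0 returns value2.
    fun interpolateValue(value1: Double, value2: Double, ratio: Double): Double =
        value1 + (value2 - value1) * ratio
}
```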
5. model/ - Output Field Models
Type-safe field declarations for tuples flowing between bolts.
model/
├── common/  # Shared output fields
│   └── CommonLevelQualityBoltOutputField.kt
├── hubLogger/  # HubLogger output fields
│   ├── HubLoggerBoltUnitStreamOutputField.kt
│   ├── HubLoggerBoltQualityStreamOutputField.kt
│   ├── HubLoggerBoltEnergyStreamOutputField.kt
│   └── ...
├── flow/  # Flow bolt output fields
│   └── PreProcessingConsumptionBoltOutputStreamField.kt
├── energy/  # Energy bolt output fields
│   └── PreProcessingEnergyBoltOutputStreamField.kt
├── unknown/  # Instantaneous output fields
│   └── PreProcessingInstantaneousBoltOutputField.kt
├── alert/  # Alert output fields
│   └── AlertOutputField.kt
└── notification/  # Notification fields
Output Field Pattern:
Each model provides:
- fields - Storm Fields declaration
- values - List of values for emission
- fromValues() - Type-safe extraction
data class MyBoltOutputField(
    val partitionId: String,
    val data: InstantaneousData,
    val timestamp: Long
) {
    companion object {
        val fields = Fields("partitionId", "data", "timestamp")

        fun fromValues(values: List<Any>): MyBoltOutputField {
            return MyBoltOutputField(
                partitionId = values[0] as String,
                data = values[1] as InstantaneousData,
                timestamp = values[2] as Long
            )
        }
    }

    val values: List<Any>
        get() = listOf(partitionId, data, timestamp)
}
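The point of the pattern is that values and fromValues() are inverses: whatever one bolt emits, the next can rebuild without hand-written index lookups. The sketch below shows that round trip with a stand-in class. ReadingOutputField, its level property, and fieldNames are hypothetical, and Storm's Fields is replaced by a plain list so the snippet compiles without the Storm dependency; the size check is an optional safeguard, not something the pattern above includes.

```kotlin
// Stand-in for a model/ class. Storm's Fields is replaced by a plain list so
// the sketch compiles without the Storm dependency; all names are hypothetical.
data class ReadingOutputField(
    val partitionId: String,
    val level: Double,
    val timestamp: Long
) {
    companion object {
        // Field names in emission order; a real model would wrap these in Fields(...).
        val fieldNames = listOf("partitionId", "level", "timestamp")

        fun fromValues(values: List<Any>): ReadingOutputField {
            // Fail early with a clear message if the tuple has the wrong arity.
            require(values.size == fieldNames.size) {
                "expected ${fieldNames.size} values, got ${values.size}"
            }
            return ReadingOutputField(
                partitionId = values[0] as String,
                level = values[1] as Double,
                timestamp = values[2] as Long
            )
        }
    }

    // Values in the same order as fieldNames, ready for emission.
    val values: List<Any>
        get() = listOf(partitionId, level, timestamp)
}
```

Because the order of values must match the order of the declared fields, keeping both in one class means a change to either is visible next to the other.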
6. enum/ - Enumerations & Constants
Centralized enums for type-safe references.
enum/
├── StreamGroup.kt  # Named stream identifiers
├── FieldGroup.kt  # Common field names
├── SpoutsIdentifier.kt  # Spout identifiers
├── ContainerRef.kt  # Cosmos DB container names
└── Environment.kt  # Deployment environments
StreamGroup.kt:
enum class StreamGroup(val value: String) {
    UNIT_STREAM("unitStream"),
    QUALITY_DATA_STREAM("qualityDataStream"),
    ENERGY_STREAM("energyStream"),
    QUALITY_ALERT_STREAM("qualityAlertStream"),
    FLOW_ALERT_STREAM("flowAlertStream"),
    // ...
    ; // Kotlin requires a semicolon between the entries and the members

    val fields: Fields // Associated Storm Fields
        get() = when (this) {
            UNIT_STREAM -> HubLoggerBoltUnitStreamOutputField.fields
            QUALITY_DATA_STREAM -> HubLoggerBoltQualityStreamOutputField.fields
            // ...
        }
}
FieldGroup.kt:
enum class FieldGroup(val value: String) {
    UNIT_ID("unitId"),
    DEVICE_ID("deviceId"),
    PARTITION_ID("partitionId"),
    INDUSTRY_ID("industryId")
}
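When a field name arrives as a raw string (for example from configuration), a reverse lookup keeps the rest of the code on the enum. The sketch below repeats the FieldGroup entries and adds a fromValue() helper; that helper is hypothetical and is not known to exist in the project.

```kotlin
// Copy of the FieldGroup entries, plus a hypothetical reverse lookup.
enum class FieldGroup(val value: String) {
    UNIT_ID("unitId"),
    DEVICE_ID("deviceId"),
    PARTITION_ID("partitionId"),
    INDUSTRY_ID("industryId");

    companion object {
        // Returns null for unknown names instead of throwing.
        fun fromValue(value: String): FieldGroup? =
            values().firstOrNull { it.value == value }
    }
}
```

Returning null rather than throwing leaves the caller to decide whether an unknown field name is an error.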
ContainerRef.kt:
enum class ContainerRef(val value: String) {
    DEVICES_DATA("DEVICES_DATA"),
    INDUSTRY_UNIT_DATA("INDUSTRY_UNIT_DATA"),
    NOTIFICATION("NOTIFICATION"),
    ALERTS_CONFIG("ALERTS_CONFIG")
}
7. connector/ - External Service Connectors
Manages connections to external services.
connector/
└── CosmosConnector.kt  # Cosmos DB connector wrapper
CosmosConnector.kt:
class CosmosConnector(private val containerRef: ContainerRef) {
    fun createItem(item: Any, partitionKey: PartitionKey) { }
    fun readItem(id: String, partitionKey: PartitionKey): CosmosItemResponse { }
    fun queryItems(query: String): List<CosmosItem> { }
}
8. constants/ - Configuration Constants
Application-wide configuration values.
constants/
└── StormConfig.kt  # Storm and Event Hub configuration
StormConfig.kt:
object StormConfig {
    val deviceDataEventHubMeta = SpoutMeta(
        id = SpoutsIdentifier.EVENT_HUB_SPOUT,
        namespace = "...",
        entityPath = "...",
        username = "...",
        password = "...",
        partitionCount = 8
    )
}
9. dataclass/ - Data Transfer Objects
Simple data classes for configuration and metadata.
dataclass/
└── SpoutMeta.kt  # Event Hub spout configuration
SpoutMeta.kt:
data class SpoutMeta(
    val id: SpoutsIdentifier,
    val namespace: String,
    val entityPath: String,
    val username: String,
    val password: String,
    val partitionCount: Int
)
10. util/ - Utility Functions
General-purpose helper functions.
util/
└── HelperUtil.kt  # Miscellaneous utilities
HelperUtil.kt:
object HelperUtil {
    fun initSentry() { }
    fun formatTimestamp(epoch: Long): String { }
    fun validateSensorData(data: InstantaneousData): Boolean { }
}
Build Configuration
build.gradle.kts
Defines project dependencies, plugins, and build tasks.
Key Sections:
plugins {
    kotlin("jvm") version "1.9.22"
    id("com.github.johnrengelman.shadow") version "7.1.2"
    id("io.sentry.jvm.gradle") version "5.3.0"
}

dependencies {
    // Storm
    implementation("org.apache.storm:storm-core:2.7.0")
    implementation("org.apache.storm:storm-eventhubs:2.5.0")
    implementation("org.apache.storm:storm-redis:2.7.0")

    // Database
    implementation("com.azure:azure-cosmos:4.65.0")

    // Utilities
    implementation("org.apache.commons:commons-math3:3.6.1")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.18.1")

    // Monitoring
    implementation("io.sentry:sentry:5.3.0")

    // Annotation processing
    implementation(project(":processor"))
    ksp(project(":processor"))
}
Build Tasks:
- gradle build - Compile and test
- gradle shadowJar - Create fat JAR with dependencies
- gradle clean - Clean build directory
Processor Module
Annotation processor for generating boilerplate code.
processor/
├── build.gradle.kts
└── src/main/kotlin/
    └── BoltProcessor.kt  # KSP annotation processor
Used for generating output field classes automatically (if implemented).
Code Organization Patterns
1. Package by Feature
Bolts are organized by their domain functionality (alerts, flow, energy, etc.) rather than technical type.
GOOD:
bolt/alerts/QualityAlertBolt.kt
bolt/alerts/EnergyAlertBolt.kt
bolt/alerts/processor/quality/QualityAlertProcessor.kt
BAD:
bolt/QualityAlertBolt.kt
bolt/EnergyAlertBolt.kt
processor/QualityAlertProcessor.kt
2. Companion Object for Constants
Bolt IDs and other constants are stored in companion objects.
class QualityBolt : BaseRichBolt() {
    companion object {
        const val ID: String = "QualityBolt"
        private val logger = LoggerFactory.getLogger(QualityBolt::class.java)
    }
}
3. State Classes in supporter/
All stateful data structures are centralized in supporter/ so that bolts can reuse them.
// supporter/QualityThresholdState.kt
class QualityThresholdState {
    var lastCreatedOnEpoch: Long = 0
    var lastParamsData: HashMap<String, Double?> = HashMap()
}
4. Enum-Based Configuration
Use enums instead of magic strings for configuration keys.
// GOOD
.fieldsGrouping(source, Fields(FieldGroup.PARTITION_ID.value))
// BAD
.fieldsGrouping(source, Fields("partitionId"))
5. Output Field Models
Each bolt's output is defined as a data class in model/.
// model/common/CommonLevelQualityBoltOutputField.kt
data class CommonLevelQualityBoltOutputField(...) {
    companion object {
        val fields = Fields(...)
        fun fromValues(values: List<Any>): CommonLevelQualityBoltOutputField { }
    }

    val values: List<Any> get() = listOf(...)
}
Key Files Reference
| File | Purpose | Location |
|---|---|---|
| Main.kt | Application entry point | src/main/kotlin/ |
| StormTopologyBuilder.kt | Topology construction | src/main/kotlin/topology/ |
| HubLoggerBolt.kt | Data parsing and routing | src/main/kotlin/bolt/hubLogger/ |
| AlertsHandlerBolt.kt | Alert routing | src/main/kotlin/bolt/alerts/ |
| QualityAlertBolt.kt | Quality alerts | src/main/kotlin/bolt/alerts/ |
| CosmosDataSource.kt | Database connection | src/main/kotlin/database/ |
| CosmosDBQueries.kt | Database operations | src/main/kotlin/database/ |
| StreamGroup.kt | Stream definitions | src/main/kotlin/enum/ |
| FieldGroup.kt | Field name constants | src/main/kotlin/enum/ |
| build.gradle.kts | Build configuration | Root directory |
Navigation Tips
Finding Bolts
# Find all bolt implementations
find src/main/kotlin/bolt -name "*Bolt.kt"
# Find alert-related bolts
find src/main/kotlin/bolt/alerts -name "*AlertBolt.kt"
Finding Models
# Find output field models
find src/main/kotlin/model -name "*OutputField.kt"
# Find database models
find src/main/kotlin/database/model -name "*.kt"
Finding State Classes
# Find all state classes
find src/main/kotlin/supporter -name "*State.kt"
Documentation References
Located in the root directory:
- ALERTS_CONFIG_MIGRATION_GUIDE.md - Alert configuration migration
- BOLT_MIGRATION_SUMMARY.md - Bolt refactoring summary
- REFERENCE_OBJECT_USAGE_SUMMARY.md - Reference object patterns
- STANDARD_NOTIFICATION_FORMAT.md - Alert notification format
- ALERT_STANDARDIZATION_STATUS.md - Alert migration status
Summary
The project follows a clear organizational structure:
- bolt/ - Organized by domain (alerts, flow, energy, etc.)
- topology/ - Central topology builder
- database/ - Cosmos DB integration and models
- supporter/ - State management and utilities
- model/ - Type-safe output field definitions
- enum/ - Constants and enumerations
- connector/ - External service connectors
- constants/ - Configuration
- util/ - Helper functions
Key Principles:
- Package by feature, not by layer
- Use enums for type safety
- Centralize state management
- Type-safe data models
- Clear naming conventions
Next Steps
- Explore Quick Start to run the project
- Read Adding a New Bolt to implement features
- See Topology Design for architecture details
Related Documentation
- Overview - Project introduction
- Our Storm Implementation - How we use Storm
- Bolts Overview - Detailed bolt documentation
- Data Models - All data structures