Project Structure

Understanding the project's organization is essential for navigating the codebase and making contributions. This guide provides a comprehensive overview of the directory structure, key files, and code organization patterns.

Quick Navigation

Use your IDE's file search (Cmd/Ctrl + P) to quickly jump to files mentioned in this guide.


Directory Overview

storm/
├── src/main/kotlin/          # Main source code
│   ├── Main.kt               # Application entry point
│   ├── bolt/                 # Storm bolt implementations
│   ├── topology/             # Topology builder
│   ├── database/             # Cosmos DB integration
│   ├── supporter/            # State management & utilities
│   ├── model/                # Output field models
│   ├── enum/                 # Enumerations & constants
│   ├── connector/            # External service connectors
│   ├── constants/            # Configuration constants
│   ├── dataclass/            # Data transfer objects
│   ├── util/                 # Utility functions
│   └── shaded/               # Shaded dependencies
│
├── processor/                # Annotation processor module
├── build.gradle.kts          # Build configuration
├── settings.gradle.kts       # Gradle settings
└── docs/                     # Documentation (you are here!)

Main Source Directory (src/main/kotlin/)

Entry Point

Main.kt

The application's entry point that configures and submits the Storm topology.

Key Responsibilities:

  • Initialize Event Hub spouts
  • Register serialization classes
  • Configure Storm topology
  • Submit to local or remote cluster

Location: src/main/kotlin/Main.kt

fun main(args: Array<String>) {
    HelperUtil.initSentry()
    val topology = Main()
    topology.runScenario(args)
}

Core Packages

1. bolt/ - Processing Units

Contains all Storm bolt implementations organized by functionality.

bolt/
├── alerts/                                  # Alert detection and handling
│   ├── AlertsHandlerBolt.kt                 # Routes data to specific alert bolts
│   ├── QualityAlertBolt.kt                  # Quality parameter alerts
│   ├── EnergyAlertBolt.kt                   # Energy consumption alerts
│   ├── LevelAlertBolt.kt                    # Tank level alerts
│   ├── FlowConsumptionAlertBolt.kt          # Flow & consumption alerts
│   ├── DeviceStatusAlertBolt.kt             # Device online/offline alerts
│   ├── AlertResponseHandlerBolt.kt          # Saves alerts to database
│   └── processor/                           # Alert processing logic
│       ├── quality/                         # Quality alert processors
│       ├── energy/                          # Energy alert processors
│       ├── level/                           # Level alert processors
│       └── flow/                            # Flow alert processors
│
├── data/                                    # Data persistence bolts
│   ├── DeviceDataPostBolt.kt                # Writes processed data to DB
│   └── IndustryMetrics.kt                   # Industry-level metrics
│
├── energy/                                  # Energy processing pipeline
│   ├── InitEnergyBolt.kt                    # Initialize energy data
│   ├── TimeFilterEnergyBolt.kt              # Filter old energy data
│   ├── PreProcessingEnergyBolt.kt           # Aggregate energy data
│   └── supporter/                           # Energy-specific helpers
│
├── flow/                                    # Flow/consumption processing
│   ├── SingularBolt.kt                      # GSM flow meter data
│   ├── TeltonikaBolt.kt                     # Teltonika device data
│   ├── LoraPandaBolt.kt                     # LoRa device data
│   ├── InitConsumptionBolt.kt               # Initialize consumption
│   ├── TimeFilterBolt.kt                    # Filter old flow data
│   └── PreProcessingConsumptionBolt.kt      # Aggregate consumption
│
├── level/                                   # Level sensor processing
│   ├── SingularLevelBolt.kt                 # GSM level sensors
│   └── TeltonikaLevelBolt.kt                # Teltonika level sensors
│
├── quality/                                 # Water quality processing
│   └── QualityBolt.kt                       # Process quality parameters
│
├── instantaneous/                           # Real-time metrics pipeline
│   ├── InitInstantaneousBolt.kt             # Initialize instant data
│   ├── TimeFilterInstantaneousBolt.kt       # Filter old data
│   └── PreProcessingInstantaneousBolt.kt    # Aggregate instant data
│
├── metrics/                                 # Industry-wide metrics
│   └── IndustryMetrics.kt                   # Calculate industry metrics
│
├── hubLogger/                               # Data routing
│   └── HubLoggerBolt.kt                     # Parse and route incoming data
│
└── notification/                            # (Legacy - migrated to alerts/)

Bolt Naming Convention:

  • Init*Bolt - Initialize and set up reference objects
  • TimeFilter*Bolt - Filter outdated data
  • PreProcessing*Bolt - Aggregate and enrich data
  • *AlertBolt - Detect and generate alerts

Example Bolt Structure:

// bolt/quality/QualityBolt.kt
class QualityBolt : BaseRichBolt() {
    companion object {
        const val ID: String = "QualityBolt" // Used in topology
    }

    override fun prepare(...) { }
    override fun execute(tuple: Tuple) { }
    override fun declareOutputFields(...) { }
}

2. topology/ - Topology Builder

Constructs the Storm topology by connecting spouts and bolts.

topology/
└── StormTopologyBuilder.kt

Key File: StormTopologyBuilder.kt

Responsibilities:

  • Define topology structure
  • Connect bolts with appropriate groupings
  • Set parallelism and task counts
  • Configure named streams

Usage in Main.kt:

val topology = StormTopologyBuilder(
    spoutsConfig,
    registeredSpouts
).buildTopology()

3. database/ - Database Integration

Handles all interactions with Azure Cosmos DB.

database/
├── CosmosDataSource.kt       # Singleton connection manager
├── CosmosDBQueries.kt        # Database query operations
└── model/                    # Database entity models
    ├── IndustryData.kt
    ├── IndustryUnitData.kt
    ├── IndustryUnitAlertConfig.kt
    ├── AlertsConfig.kt
    └── UnitConfig.kt

CosmosDataSource.kt

  • Manages database connection lifecycle
  • Provides singleton access to database client
  • Handles connection pooling
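
Singleton access of this kind is commonly written in Kotlin as an `object` with a lazily initialized property. The sketch below assumes a stand-in `FakeClient` in place of the real Cosmos client; `DataSourceSketch` and `FakeClient` are hypothetical names and do not reflect the actual contents of CosmosDataSource.kt.

```kotlin
// Stand-in for the real database client (assumption for illustration).
class FakeClient(val endpoint: String) {
    companion object {
        // Counts constructions so the single-instance behaviour is observable.
        var instancesCreated = 0
    }

    init {
        instancesCreated++
    }
}

// Hypothetical singleton: the client is built once, on first access.
// `by lazy` is thread-safe by default (synchronized initialization).
object DataSourceSketch {
    val client: FakeClient by lazy { FakeClient("https://example.invalid") }
}

fun main() {
    val first = DataSourceSketch.client
    val second = DataSourceSketch.client
    println("same instance: ${first === second}")
    println("clients created: ${FakeClient.instancesCreated}")
}
```

Deferring construction to first access means that merely loading the class does not open a connection.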

CosmosDBQueries.kt

  • CRUD operations for all containers
  • Batch operations
  • Query builders

Data Models (under database/model/):

// Example: IndustryUnitAlertConfig.kt
data class IndustryUnitAlertConfig(
    var id: String = "",
    var industryId: String = "",
    var unitId: String = "",
    var alertsConfig: Map<AlertType, AlertConfig> = emptyMap()
)

4. supporter/ - State Management & Utilities

Contains state classes and utility functions used across bolts.

supporter/
├── ReferenceObject.kt            # Flow/consumption state tracking
├── ReferenceObjectInstant.kt     # Instantaneous data state
├── EnergyReferenceObject.kt      # Energy state tracking
├── InstantaneousData.kt          # Sensor readings container
├── ThresholdState.kt             # Generic threshold state
├── QualityThresholdState.kt      # Quality alert state
├── LevelThresholdState.kt        # Level alert state
├── EnergyThresholdState.kt       # Energy alert state
├── LeakAlertState.kt             # Leak detection state
├── StableFlowAlertState.kt       # Stable flow state
├── FlowRateTriggerState.kt       # Flow rate state
├── ConsumptionTriggerState.kt    # Consumption trigger state
├── PreviousState.kt              # Generic previous state
├── UDF.kt                        # User-defined functions
└── ...

Reference Objects: Track stateful information across tuples for each partition.

// ReferenceObject.kt - Tracks flow/consumption
class ReferenceObject {
    var currentData: Double? = null
    var previousData: Double? = null
    var lastProcessedTime: Long = 0
    var totaliser: TotaliserData? = null
}
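
To illustrate how a reference object carries state from one tuple to the next, here is a minimal sketch that keeps one state object per partition ID and derives a delta between consecutive readings. `RefSketch` and `PartitionState` are hypothetical, simplified stand-ins; how the project's bolts actually store and update ReferenceObject is not shown in this guide.

```kotlin
// Simplified stand-in for ReferenceObject (only the fields used here).
class RefSketch {
    var currentData: Double? = null
    var previousData: Double? = null
    var lastProcessedTime: Long = 0
}

// Hypothetical per-partition store: one state object per partition ID.
class PartitionState {
    private val refs = HashMap<String, RefSketch>()

    // Records a reading and returns the delta from the previous reading,
    // or null when this is the first reading seen for the partition.
    fun update(partitionId: String, reading: Double, epochMillis: Long): Double? {
        val ref = refs.getOrPut(partitionId) { RefSketch() }
        ref.previousData = ref.currentData
        ref.currentData = reading
        ref.lastProcessedTime = epochMillis
        return ref.previousData?.let { previous -> reading - previous }
    }
}

fun main() {
    val state = PartitionState()
    println(state.update("p1", 100.0, 1_000L)) // first reading for p1
    println(state.update("p1", 112.5, 2_000L)) // delta against the previous p1 reading
    println(state.update("p2", 7.0, 2_000L))   // p2 is tracked independently
}
```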

Threshold States: Maintain alert state to prevent alert fatigue.

// QualityThresholdState.kt
class QualityThresholdState {
    var lastCreatedOnEpoch: Long = 0
    var lastParamsData: HashMap<String, Double?> = HashMap()
    var activeAlerts: HashMap<String, AlertInfo> = HashMap()
    var cooldownPeriods: HashMap<String, Long> = HashMap()
}
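
One common way such state limits alert fatigue is a per-parameter cooldown: after an alert is raised, further alerts for the same parameter are suppressed until the cooldown has elapsed. The sketch below shows that idea only. `CooldownState` and `shouldRaise` are hypothetical, and the real semantics of `cooldownPeriods` in QualityThresholdState are an assumption here.

```kotlin
// Hypothetical, simplified state: last alert time and cooldown per parameter.
class CooldownState {
    val lastAlertEpoch = HashMap<String, Long>()
    val cooldownPeriods = HashMap<String, Long>()
}

// Returns true (and records the alert time) when the parameter is outside
// its cooldown window; returns false while the cooldown is still running.
fun shouldRaise(state: CooldownState, param: String, nowEpoch: Long): Boolean {
    val last = state.lastAlertEpoch[param]
    val cooldown = state.cooldownPeriods[param] ?: 0L
    if (last != null && nowEpoch - last < cooldown) return false
    state.lastAlertEpoch[param] = nowEpoch
    return true
}

fun main() {
    val state = CooldownState()
    state.cooldownPeriods["pH"] = 60L // same time unit as the epoch values

    println(shouldRaise(state, "pH", 1_000L)) // first breach
    println(shouldRaise(state, "pH", 1_030L)) // 30 units later, inside the cooldown
    println(shouldRaise(state, "pH", 1_060L)) // cooldown has elapsed
}
```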

UDF (User-Defined Functions): Utility functions for data processing.

object UDF {
    fun createdOnToEpoch(createdOn: String): Long { }
    fun calculateConsumption(current: Double, previous: Double): Double { }
    fun interpolateValue(value1: Double, value2: Double, ratio: Double): Double { }
}
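
The bodies of these functions are elided above. As a rough illustration of what functions with these signatures might do, here is a sketch under three stated assumptions: `createdOn` is an ISO-8601 UTC instant and the result is in milliseconds, consumption is a plain difference between two readings, and interpolation is linear. `UdfSketch` is a hypothetical name; the project's actual UDF.kt may differ on all three points.

```kotlin
import java.time.Instant

object UdfSketch {
    // Assumes an ISO-8601 UTC instant such as "2024-01-01T00:00:00Z"
    // and returns epoch milliseconds.
    fun createdOnToEpoch(createdOn: String): Long =
        Instant.parse(createdOn).toEpochMilli()

    // Assumes consumption is the difference between two cumulative readings.
    fun calculateConsumption(current: Double, previous: Double): Double =
        current - previous

    // Linear interpolation: ratio 0.0 yields value1, ratio 1.0 yields value2.
    fun interpolateValue(value1: Double, value2: Double, ratio: Double): Double =
        value1 + (value2 - value1) * ratio
}

fun main() {
    println(UdfSketch.createdOnToEpoch("1970-01-01T00:00:01Z"))
    println(UdfSketch.calculateConsumption(150.0, 120.0))
    println(UdfSketch.interpolateValue(10.0, 20.0, 0.25))
}
```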

5. model/ - Output Field Models

Type-safe field declarations for tuples flowing between bolts.

model/
├── common/                    # Shared output fields
│   └── CommonLevelQualityBoltOutputField.kt
│
├── hubLogger/                 # HubLogger output fields
│   ├── HubLoggerBoltUnitStreamOutputField.kt
│   ├── HubLoggerBoltQualityStreamOutputField.kt
│   ├── HubLoggerBoltEnergyStreamOutputField.kt
│   └── ...
│
├── flow/                      # Flow bolt output fields
│   └── PreProcessingConsumptionBoltOutputStreamField.kt
│
├── energy/                    # Energy bolt output fields
│   └── PreProcessingEnergyBoltOutputStreamField.kt
│
├── unknown/                   # Instantaneous output fields
│   └── PreProcessingInstantaneousBoltOutputField.kt
│
├── alert/                     # Alert output fields
│   └── AlertOutputField.kt
│
└── notification/              # Notification fields

Output Field Pattern:

Each model provides:

  • fields - Storm Fields declaration
  • values - List of values for emission
  • fromValues() - Type-safe extraction

data class MyBoltOutputField(
    val partitionId: String,
    val data: InstantaneousData,
    val timestamp: Long
) {
    companion object {
        val fields = Fields("partitionId", "data", "timestamp")

        fun fromValues(values: List<Any>): MyBoltOutputField {
            return MyBoltOutputField(
                partitionId = values[0] as String,
                data = values[1] as InstantaneousData,
                timestamp = values[2] as Long
            )
        }
    }

    val values: List<Any>
        get() = listOf(partitionId, data, timestamp)
}
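
The pattern pays off because the emitting bolt and the receiving bolt share a single definition of field order. Below is a minimal round-trip sketch that runs without Storm on the classpath: `Reading` stands in for InstantaneousData and a plain list of names stands in for Storm's `Fields`. Both are assumptions for illustration, as are the names `SketchOutputField` and the added size check.

```kotlin
// Stand-in for InstantaneousData (assumption for illustration).
data class Reading(val value: Double)

data class SketchOutputField(
    val partitionId: String,
    val data: Reading,
    val timestamp: Long
) {
    companion object {
        // Stands in for Storm's Fields; the order must match `values`.
        val fieldNames = listOf("partitionId", "data", "timestamp")

        // Rebuilds the typed object from a positional value list.
        fun fromValues(values: List<Any>): SketchOutputField {
            require(values.size == fieldNames.size) {
                "expected ${fieldNames.size} values, got ${values.size}"
            }
            return SketchOutputField(
                partitionId = values[0] as String,
                data = values[1] as Reading,
                timestamp = values[2] as Long
            )
        }
    }

    // Positional values in the same order as `fieldNames`.
    val values: List<Any>
        get() = listOf(partitionId, data, timestamp)
}

fun main() {
    val emitted = SketchOutputField("p1", Reading(4.2), 1_700_000_000_000L)
    val received = SketchOutputField.fromValues(emitted.values)
    println("round trip preserved the object: ${received == emitted}")
}
```

The `require` guard is an addition of this sketch: it turns a length mismatch into a clear error instead of an index-out-of-bounds failure further down.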

6. enum/ - Enumerations & Constants

Centralized enums for type-safe references.

enum/
├── StreamGroup.kt         # Named stream identifiers
├── FieldGroup.kt          # Common field names
├── SpoutsIdentifier.kt    # Spout identifiers
├── ContainerRef.kt        # Cosmos DB container names
└── Environment.kt         # Deployment environments

StreamGroup.kt:

enum class StreamGroup(val value: String) {
    UNIT_STREAM("unitStream"),
    QUALITY_DATA_STREAM("qualityDataStream"),
    ENERGY_STREAM("energyStream"),
    QUALITY_ALERT_STREAM("qualityAlertStream"),
    FLOW_ALERT_STREAM("flowAlertStream"),
    // ...
    ; // A semicolon must end the constant list before any member declarations

    val fields: Fields // Associated Storm Fields
        get() = when (this) {
            UNIT_STREAM -> HubLoggerBoltUnitStreamOutputField.fields
            QUALITY_DATA_STREAM -> HubLoggerBoltQualityStreamOutputField.fields
            // ...
        }
}

FieldGroup.kt:

enum class FieldGroup(val value: String) {
    UNIT_ID("unitId"),
    DEVICE_ID("deviceId"),
    PARTITION_ID("partitionId"),
    INDUSTRY_ID("industryId")
}

ContainerRef.kt:

enum class ContainerRef(val value: String) {
    DEVICES_DATA("DEVICES_DATA"),
    INDUSTRY_UNIT_DATA("INDUSTRY_UNIT_DATA"),
    NOTIFICATION("NOTIFICATION"),
    ALERTS_CONFIG("ALERTS_CONFIG")
}

7. connector/ - External Service Connectors

Manages connections to external services.

connector/
└── CosmosConnector.kt    # Cosmos DB connector wrapper

CosmosConnector.kt:

class CosmosConnector(private val containerRef: ContainerRef) {
    fun createItem(item: Any, partitionKey: PartitionKey) { }
    fun readItem(id: String, partitionKey: PartitionKey): CosmosItemResponse { }
    fun queryItems(query: String): List<CosmosItem> { }
}

8. constants/ - Configuration Constants

Application-wide configuration values.

constants/
└── StormConfig.kt    # Storm and Event Hub configuration

StormConfig.kt:

object StormConfig {
    val deviceDataEventHubMeta = SpoutMeta(
        id = SpoutsIdentifier.EVENT_HUB_SPOUT,
        namespace = "...",
        entityPath = "...",
        username = "...",
        password = "...",
        partitionCount = 8
    )
}

9. dataclass/ - Data Transfer Objects

Simple data classes for configuration and metadata.

dataclass/
└── SpoutMeta.kt    # Event Hub spout configuration

SpoutMeta.kt:

data class SpoutMeta(
    val id: SpoutsIdentifier,
    val namespace: String,
    val entityPath: String,
    val username: String,
    val password: String,
    val partitionCount: Int
)

10. util/ - Utility Functions

General-purpose helper functions.

util/
└── HelperUtil.kt    # Miscellaneous utilities

HelperUtil.kt:

object HelperUtil {
    fun initSentry() { }
    fun formatTimestamp(epoch: Long): String { }
    fun validateSensorData(data: InstantaneousData): Boolean { }
}

Build Configuration

build.gradle.kts

Defines project dependencies, plugins, and build tasks.

Key Sections:

plugins {
    kotlin("jvm") version "1.9.22"
    id("com.github.johnrengelman.shadow") version "7.1.2"
    id("io.sentry.jvm.gradle") version "5.3.0"
}

dependencies {
    // Storm
    implementation("org.apache.storm:storm-core:2.7.0")
    implementation("org.apache.storm:storm-eventhubs:2.5.0")
    implementation("org.apache.storm:storm-redis:2.7.0")

    // Database
    implementation("com.azure:azure-cosmos:4.65.0")

    // Utilities
    implementation("org.apache.commons:commons-math3:3.6.1")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.18.1")

    // Monitoring
    implementation("io.sentry:sentry:5.3.0")

    // Annotation processing
    implementation(project(":processor"))
    ksp(project(":processor"))
}

Build Tasks:

  • gradle build - Compile and test
  • gradle shadowJar - Create fat JAR with dependencies
  • gradle clean - Clean build directory

Processor Module

Annotation processor for generating boilerplate code.

processor/
├── build.gradle.kts
└── src/main/kotlin/
    └── BoltProcessor.kt    # KSP annotation processor

Used for generating output field classes automatically (if implemented).


Code Organization Patterns

1. Package by Feature

Bolts are organized by their domain functionality (alerts, flow, energy, etc.) rather than technical type.

GOOD:

bolt/alerts/QualityAlertBolt.kt
bolt/alerts/EnergyAlertBolt.kt
bolt/alerts/processor/quality/QualityAlertProcessor.kt

BAD:

bolt/QualityAlertBolt.kt
bolt/EnergyAlertBolt.kt
processor/QualityAlertProcessor.kt

2. Companion Object for Constants

Bolt IDs and other constants are stored in companion objects.

class QualityBolt : BaseRichBolt() {
    companion object {
        const val ID: String = "QualityBolt"
        private val logger = LoggerFactory.getLogger(QualityBolt::class.java)
    }
}

3. State Classes in supporter/

All stateful data structures are centralized in supporter/ so that bolts can reuse them.

// supporter/QualityThresholdState.kt
class QualityThresholdState {
    var lastCreatedOnEpoch: Long = 0
    var lastParamsData: HashMap<String, Double?> = HashMap()
}

4. Enum-Based Configuration

Use enums instead of magic strings for configuration keys.

// GOOD
.fieldsGrouping(source, Fields(FieldGroup.PARTITION_ID.value))

// BAD
.fieldsGrouping(source, Fields("partitionId"))
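
The practical benefit is that a misspelled enum constant fails at compile time, whereas a misspelled string literal only fails at runtime, if it is noticed at all. The sketch below illustrates this without a Storm dependency: a plain list of names stands in for an upstream bolt's declared `Fields`, and `FieldGroupSketch`, `declaredFields`, and `isValidGrouping` are hypothetical names.

```kotlin
// Reduced copy of the enum idea, renamed to make clear it is a sketch.
enum class FieldGroupSketch(val value: String) {
    PARTITION_ID("partitionId"),
    UNIT_ID("unitId")
}

// Field names declared by an upstream bolt (stand-in for Storm Fields).
val declaredFields = listOf(
    FieldGroupSketch.PARTITION_ID.value,
    FieldGroupSketch.UNIT_ID.value
)

// A grouping is only valid if every grouping field was declared upstream.
fun isValidGrouping(groupingFields: List<String>): Boolean =
    declaredFields.containsAll(groupingFields)

fun main() {
    // Enum-based name: always matches what the upstream bolt declared.
    println(isValidGrouping(listOf(FieldGroupSketch.PARTITION_ID.value)))

    // Magic string with a casing typo: compiles, but does not match.
    println(isValidGrouping(listOf("partitionID")))
}
```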

5. Output Field Models

Each bolt's output is defined as a data class in model/.

// model/common/CommonLevelQualityBoltOutputField.kt
data class CommonLevelQualityBoltOutputField(...) {
    companion object {
        val fields = Fields(...)
        fun fromValues(values: List<Any>): CommonLevelQualityBoltOutputField { }
    }
    val values: List<Any> get() = listOf(...)
}

Key Files Reference

| File                    | Purpose                  | Location                       |
|-------------------------|--------------------------|--------------------------------|
| Main.kt                 | Application entry point  | src/main/kotlin/               |
| StormTopologyBuilder.kt | Topology construction    | src/main/kotlin/topology/      |
| HubLoggerBolt.kt        | Data parsing and routing | src/main/kotlin/bolt/hubLogger/ |
| AlertsHandlerBolt.kt    | Alert routing            | src/main/kotlin/bolt/alerts/   |
| QualityAlertBolt.kt     | Quality alerts           | src/main/kotlin/bolt/alerts/   |
| CosmosDataSource.kt     | Database connection      | src/main/kotlin/database/      |
| CosmosDBQueries.kt      | Database operations      | src/main/kotlin/database/      |
| StreamGroup.kt          | Stream definitions       | src/main/kotlin/enum/          |
| FieldGroup.kt           | Field name constants     | src/main/kotlin/enum/          |
| build.gradle.kts        | Build configuration      | Root directory                 |

Finding Bolts

# Find all bolt implementations
find src/main/kotlin/bolt -name "*Bolt.kt"

# Find alert-related bolts
find src/main/kotlin/bolt/alerts -name "*AlertBolt.kt"

Finding Models

# Find output field models
find src/main/kotlin/model -name "*OutputField.kt"

# Find database models
find src/main/kotlin/database/model -name "*.kt"

Finding State Classes

# Find all state classes
find src/main/kotlin/supporter -name "*State.kt"

Documentation References

The following documents are located in the root directory:

  • ALERTS_CONFIG_MIGRATION_GUIDE.md - Alert configuration migration
  • BOLT_MIGRATION_SUMMARY.md - Bolt refactoring summary
  • REFERENCE_OBJECT_USAGE_SUMMARY.md - Reference object patterns
  • STANDARD_NOTIFICATION_FORMAT.md - Alert notification format
  • ALERT_STANDARDIZATION_STATUS.md - Alert migration status

Summary

The project follows a clear organizational structure:

  1. bolt/ - Organized by domain (alerts, flow, energy, etc.)
  2. topology/ - Central topology builder
  3. database/ - Cosmos DB integration and models
  4. supporter/ - State management and utilities
  5. model/ - Type-safe output field definitions
  6. enum/ - Constants and enumerations
  7. connector/ - External service connectors
  8. constants/ - Configuration
  9. util/ - Helper functions

Key Principles:

  • Package by feature, not by layer
  • Use enums for type safety
  • Centralize state management
  • Type-safe data models
  • Clear naming conventions
Next Steps