Our Storm Implementation
Now that you understand Apache Storm basics, let's see how we've implemented it in our water management system. This guide walks through real examples from our codebase, showing how we create bolts, build topologies, pass data, and manage state.
This page uses real code examples from our project. You can find these files under src/main/kotlin/ if you want to explore further.
Building Our Topology
The Topology Builder
Our topology is built in StormTopologyBuilder.kt. It creates a complex DAG (Directed Acyclic Graph) that processes water sensor data through multiple stages.
Real Code Example (StormTopologyBuilder.kt:39-64):
fun buildTopology(): StormTopology {
    val topologyBuilder = TopologyBuilder()

    // Step 1: Add all spouts to topology
    spoutsConfig.forEach { (id, config) ->
        topologyBuilder.setSpout(
            id.value,               // Spout ID (e.g., "EventHubSpout")
            EventHubSpout(config),  // Spout instance
            config.partitionCount   // Parallelism
        ).setNumTasks(config.partitionCount)
    }

    // Step 2: Add HubLogger bolt - receives from all spouts
    val bolt = topologyBuilder.setBolt(
        HubLoggerBolt.ID,
        HubLoggerBolt(),
        registeredSpouts.sumOf { it.partitionCount }
    ).setNumTasks(registeredSpouts.sumOf { it.partitionCount })

    // Connect HubLogger to all spouts
    spoutsConfig.keys.forEach {
        bolt.localOrShuffleGrouping(it.value)
    }

    // Step 3: Build the rest of the topology
    spoutsConfig.forEach { (id, config) ->
        when (id) {
            SpoutsIdentifier.EVENT_HUB_SPOUT ->
                buildHardwareDataTopology(config, topologyBuilder)
            SpoutsIdentifier.EVENT_HUB_SPOUT_EXTERNAL -> {}
        }
    }

    return topologyBuilder.createTopology()
}
- Parallelism: Number of executor threads (e.g., config.partitionCount)
- NumTasks: Number of task instances (can be > parallelism for future scaling)
- Grouping: How tuples are distributed to bolt instances
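To make the parallelism/tasks distinction concrete, here is a minimal sketch (with a hypothetical bolt and topology name, not from our codebase): tasks are fixed at submit time, while executors can be rebalanced up to that task count later.

// Hypothetical example: start with 2 executors but reserve 8 tasks for scale-out.
topologyBuilder.setBolt("MyBolt", MyBolt(), 2)  // parallelism hint = 2 executor threads
    .shuffleGrouping("SourceSpout")
    .setNumTasks(8)                             // 8 task instances share the 2 executors

// Later, without redeploying, executors can grow up to the task count:
//   storm rebalance my-topology -e MyBolt=8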
Data Models
Before we process data, we need to define what the data looks like. We use dedicated Kotlin classes for type-safe data handling.
InstantaneousData
Used for real-time sensor readings such as water quality and level.
Real Code (supporter/InstantaneousData.kt:6-29):
class InstantaneousData : Serializable {
    // Stores multiple sensor parameters (pH, TDS, turbidity, etc.)
    private var paramsData: Map<String, Double?>? = hashMapOf()

    // Single sensor value (for flow, level)
    var sensorValue: Double? = null

    // Calculated instantaneous value
    var calcInstantaneousValue: Double? = null

    constructor(data: Map<String, Double>) {
        this.paramsData = data
    }

    constructor(value: Double) {
        this.calcInstantaneousValue = value
    }

    constructor()

    fun getParamData(): Map<String, Double?>? {
        return paramsData
    }

    fun setParamData(data: Map<String, Double?>?) {
        this.paramsData = data
    }
}
Usage Example:
// Water quality data with multiple parameters
val qualityData = InstantaneousData(mapOf(
    "pH" to 7.2,
    "TDS" to 350.5,
    "turbidity" to 2.1,
    "temperature" to 25.0
))

// Single level sensor value
val levelData = InstantaneousData(1250.75) // Tank level in cm
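Reading a single parameter back out is null-safe, since both the map and its values are nullable (a small usage sketch based on the accessors shown above):

// Null-safe lookup of one parameter from the map shown above
val ph: Double? = qualityData.getParamData()?.get("pH")
if (ph != null && ph > 7.5) {
    println("pH above range: $ph")
}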
All data classes must implement Serializable because Storm sends data between JVM processes over the network.
Creating Bolts
Bolt Anatomy
Every bolt in our system extends BaseRichBolt and implements three key methods:
class MyBolt : BaseRichBolt() {
    // 1. Initialize bolt (called once per task)
    override fun prepare(
        conf: Map<String, Any>,
        context: TopologyContext,
        collector: OutputCollector
    ) { }

    // 2. Process each tuple (called for every incoming tuple)
    override fun execute(tuple: Tuple) { }

    // 3. Declare what fields this bolt emits
    override fun declareOutputFields(declarer: OutputFieldsDeclarer) { }
}
Real Example: QualityBolt
Let's look at how QualityBolt processes water quality data.
Real Code (bolt/quality/QualityBolt.kt:19-63):
class QualityBolt : BaseRichBolt() {
    private var collector: OutputCollector? = null

    // 1. PREPARE - Initialize resources
    override fun prepare(
        topoConf: Map<String, Any>,
        context: TopologyContext,
        collector: OutputCollector
    ) {
        this.collector = collector
        // Could also initialize database connections, caches, etc.
    }

    // 2. EXECUTE - Process incoming tuple
    override fun execute(tuple: Tuple) {
        try {
            // Extract input data from tuple
            val inputData = HubLoggerBoltQualityStreamOutputField.fromValues(
                tuple.values
            )

            // Convert timestamp to epoch
            val createdOnEpoch = UDF.createdOnToEpoch(inputData.createdOn)

            // Emit processed data to next bolt
            collector!!.emit(
                tuple, // Anchor to input tuple for reliability
                CommonLevelQualityBoltOutputField(
                    "telemetry",
                    inputData.instantaneousData,
                    createdOnEpoch,
                    inputData.deviceId,
                    inputData.deviceId + "_" + inputData.unitId, // Partition ID
                    inputData.systemEpoch,
                    inputData.bpRerun
                ).values
            )
        } catch (pe: Exception) {
            pe.printStackTrace()
            logger.error("anomaly parsing error: ${pe.message}")
        }

        // 3. ACK - Mark tuple as successfully processed
        collector!!.ack(tuple)
    }

    // 4. DECLARE OUTPUT - Define output schema
    override fun declareOutputFields(declarer: OutputFieldsDeclarer) {
        declarer.declare(CommonLevelQualityBoltOutputField.fields)
    }

    companion object {
        private val logger: Logger = LoggerFactory.getLogger(QualityBolt::class.java)
        const val ID: String = "QualityBolt"
    }
}
Step-by-Step Breakdown:
- Extract Data: Parse incoming tuple into typed objects
- Transform: Process, calculate, or enrich the data
- Emit: Send result to downstream bolts
- Ack: Confirm successful processing (enables Storm's reliability)
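The real bolt above acks unconditionally; when you want Storm to replay on error instead, fail the tuple in the catch block. A minimal sketch (the input/output field classes and enrich() helper are hypothetical):

override fun execute(tuple: Tuple) {
    try {
        val input = MyBoltInputField.fromValues(tuple.values) // hypothetical helper
        val output = enrich(input)                            // hypothetical transform
        collector.emit(tuple, output.values)                  // anchored emit
        collector.ack(tuple)                                  // success: ack
    } catch (e: Exception) {
        logger.error("Processing failed: ${e.message}", e)
        collector.fail(tuple)                                 // failure: Storm replays
    }
}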
Emitting Data
Basic Emit
// Emit to default stream using typed output field
collector.emit(
    CommonLevelQualityBoltOutputField(
        dataType = "telemetry",
        instantaneousData = instantData,
        createdOnEpoch = epoch,
        deviceId = "DEVICE_001",
        partitionId = "DEVICE_001_UNIT_A",
        systemEpoch = sysEpoch,
        bpRerun = false
    ).values
)
Anchored Emit (Reliable Processing)
// Anchor output to input tuple - creates tuple tree for tracking
collector.emit(
    tuple, // Anchor to input
    CommonLevelQualityBoltOutputField(
        dataType = "telemetry",
        instantaneousData = processedData,
        createdOnEpoch = epoch,
        deviceId = deviceId,
        partitionId = partitionId,
        systemEpoch = systemEpoch,
        bpRerun = false
    ).values
)
Why Anchor?
- Storm tracks the entire tuple tree
- If any downstream bolt fails, the original tuple is replayed
- Guarantees at-least-once processing
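Anchoring isn't limited to a single parent. Storm's OutputCollector also accepts a collection of anchors, which is useful in join or aggregation bolts (a sketch with hypothetical variables):

// Both input tuples become parents of the emitted tuple; if processing
// fails downstream, both originals are replayed.
collector.emit(listOf(tupleA, tupleB), joinedOutput.values)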
Named Streams
Bolts can emit to multiple streams for routing different data types.
Real Example - HubLoggerBolt emits to different streams based on data type:
override fun execute(tuple: Tuple) {
    val deviceType = parseDeviceType(tuple)

    when (deviceType) {
        "GSM_FLOW" -> {
            // Emit to UNIT_STREAM for flow data
            collector.emit(
                StreamGroup.UNIT_STREAM.value,
                tuple,
                flowData.values
            )
        }
        "TELTONIKA" -> {
            // Emit to TELTONIKA_STREAM for teltonika devices
            collector.emit(
                StreamGroup.TELTONIKA_STREAM.value,
                tuple,
                teltonikaData.values
            )
        }
        "QUALITY" -> {
            // Emit to QUALITY_DATA_STREAM for quality sensors
            collector.emit(
                StreamGroup.QUALITY_DATA_STREAM.value,
                tuple,
                qualityData.values
            )
        }
    }
}

override fun declareOutputFields(declarer: OutputFieldsDeclarer) {
    // Declare multiple named streams
    declarer.declareStream(
        StreamGroup.UNIT_STREAM.value,
        HubLoggerBoltUnitStreamOutputField.fields
    )
    declarer.declareStream(
        StreamGroup.TELTONIKA_STREAM.value,
        HubLoggerBoltTeltonikaStreamOutputField.fields
    )
    declarer.declareStream(
        StreamGroup.QUALITY_DATA_STREAM.value,
        HubLoggerBoltQualityStreamOutputField.fields
    )
}
Stream Groupings
Stream groupings determine how tuples are distributed among bolt instances.
1. Fields Grouping (Most Common in Our System)
Routes tuples with the same field value to the same bolt instance. Essential for stateful processing.
Real Example (StormTopologyBuilder.kt:110-116):
// Quality data grouped by unitId
topologyBuilder.setBolt(QualityBolt.ID, QualityBolt(), config.partitionCount)
    .fieldsGrouping(
        HubLoggerBolt.ID,                       // Source bolt
        StreamGroup.QUALITY_DATA_STREAM.value,  // Source stream
        Fields(FieldGroup.UNIT_ID.value)        // Group by unitId
    )
    .setNumTasks(4)
Why Fields Grouping?
- All data for Unit_A always goes to the same bolt instance
- Enables stateful tracking (previous values, thresholds, etc.)
- Prevents state corruption from concurrent updates
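Conceptually, fields grouping chooses the target task by hashing the grouping field, roughly like this (an illustrative sketch, not Storm's actual internals):

// The same unitId always maps to the same task index, which is what
// makes per-key in-memory state safe.
fun targetTask(unitId: String, numTasks: Int): Int =
    Math.floorMod(unitId.hashCode(), numTasks)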
2. Shuffle Grouping (Random Distribution)
Distributes tuples randomly for load balancing when state isn't needed.
Real Example (StormTopologyBuilder.kt:320-326):
// Alert responses don't need state - distribute randomly
topologyBuilder.setBolt(
    AlertResponseHandlerBolt.ID,
    AlertResponseHandlerBolt(),
    config.partitionCount
)
    .shuffleGrouping(FlowConsumptionAlertBolt.ID)
    .shuffleGrouping(LevelAlertBolt.ID)
    .shuffleGrouping(QualityAlertBolt.ID)
    .shuffleGrouping(EnergyAlertBolt.ID)
    .shuffleGrouping(DeviceStatusAlertBolt.ID)
    .setNumTasks(config.partitionCount)
3. All Grouping (Broadcast)
Sends tuple to all bolt instances. Used for configuration updates.
Real Example (StormTopologyBuilder.kt:135):
topologyBuilder.setBolt(InitConsumptionBolt.ID, InitConsumptionBolt(), parallelism)
    .fieldsGrouping(SingularBolt.ID, Fields(FieldGroup.PARTITION_ID.value))
    .fieldsGrouping(TeltonikaBolt.ID, Fields(FieldGroup.PARTITION_ID.value))
    .allGrouping(HubLoggerBolt.ID, StreamGroup.INIT_STREAM.value) // Config broadcast
Use Case: When HubLogger emits configuration updates, ALL InitConsumptionBolt instances receive it.
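On the receiving side, a bolt subscribed to several streams can tell broadcast config tuples apart from data tuples by checking the source stream. A sketch (updateLocalConfig and processData are hypothetical helpers):

override fun execute(tuple: Tuple) {
    if (tuple.sourceStreamId == StreamGroup.INIT_STREAM.value) {
        updateLocalConfig(tuple)  // hypothetical: refresh this task's cached config
    } else {
        processData(tuple)        // hypothetical: normal data path
    }
    collector.ack(tuple)
}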
4. Local or Shuffle Grouping (Performance Optimization)
Prefers same-worker tasks, falls back to shuffle if not available.
Real Example (StormTopologyBuilder.kt:53):
spoutsConfig.keys.forEach {
    bolt.localOrShuffleGrouping(it.value) // Optimize for same JVM
}
Receiving Data
Extracting Tuple Values
We use type-safe data classes with fromValues() helper methods.
Real Example (QualityAlertBolt.kt:41):
override fun execute(tuple: Tuple) {
    // Type-safe extraction
    val inputData = PreProcessingInstantaneousBoltOutputField.fromValues(
        tuple.values
    )

    // Now we have strongly-typed access
    val industryId = inputData.industryId
    val unitId = inputData.referenceData.unitId
    val createdOn = inputData.createdOnEpoch
    val qualityData = inputData.instantaneousData
}
Manual Tuple Access
You can also access tuple fields by index or name:
// By index
val field0 = tuple.getValue(0)
val field1 = tuple.getValue(1)
// By field name
val unitId = tuple.getStringByField("unitId")
val value = tuple.getDoubleByField("value")
val timestamp = tuple.getLongByField("timestamp")
State Management in Bolts
Bolts maintain in-memory state using HashMaps keyed by partition ID.
Real Example: QualityAlertBolt State
Real Code (QualityAlertBolt.kt:29, 71-79):
class QualityAlertBolt : BaseAlertBolt() {
    // State HashMap - one entry per partition
    private var state: HashMap<String, QualityThresholdState?> = HashMap()

    override fun execute(tuple: Tuple) {
        val inputData = PreProcessingInstantaneousBoltOutputField.fromValues(tuple.values)
        val partitionId = inputData.partitionId

        // Get state for this partition
        var qualityState = state[partitionId]

        // Initialize state if first time seeing this partition
        if (qualityState == null) {
            qualityState = QualityThresholdState()
            qualityState.lastCreatedOnEpoch = inputData.createdOnEpoch
            qualityState.lastParamsData = HashMap()
            state[partitionId] = qualityState
        }

        // Use state to detect threshold violations
        val processor = QualityAlertProcessor(
            inletData = inputData,
            state = qualityState, // Pass existing state
            dbClient = notificationDBClient,
            alertConfig = alertConfig
        )
        val alerts = processor.run()

        // Update state after processing
        state[partitionId] = qualityState
    }
}
QualityThresholdState tracks:
- Last timestamp
- Previous parameter values (pH, TDS, etc.)
- Active alerts
- Cooldown periods
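Based on the bullets above and the fields visible in the code, the state class has roughly this shape (the last two fields are assumptions for illustration; check QualityThresholdState for the exact definition):

class QualityThresholdState : Serializable {
    var lastCreatedOnEpoch: Long? = null                      // last timestamp (set in the code above)
    var lastParamsData: HashMap<String, Double?> = HashMap()  // previous pH, TDS, ... values
    var activeAlerts: MutableSet<String> = mutableSetOf()     // hypothetical: currently firing alerts
    var cooldownUntilEpoch: Long? = null                      // hypothetical: suppress repeats until then
}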
State management requires fields grouping! The same partition must always route to the same bolt instance, otherwise state will be split across instances and won't work correctly.
Complete Bolt Example: QualityAlertBolt
Let's put it all together with a real-world example that handles water quality alerts.
Full Flow:
Real Code (QualityAlertBolt.kt:27-100):
class QualityAlertBolt : BaseAlertBolt() {
    // 1. STATE MANAGEMENT
    private var state: HashMap<String, QualityThresholdState?> = HashMap()
    private lateinit var dbClient: CosmosConnector
    private lateinit var notificationDBClient: CosmosConnector

    // 2. INITIALIZATION
    override fun prepare(
        topoConf: Map<String, Any>,
        context: TopologyContext,
        collector: OutputCollector
    ) {
        this.collector = collector
        this.dbClient = CosmosConnector(ContainerRef.DEVICES_DATA)
        this.notificationDBClient = CosmosConnector(ContainerRef.NOTIFICATION)
    }

    // 3. MAIN PROCESSING LOGIC
    override fun execute(tuple: Tuple) {
        try {
            // RECEIVE: Extract typed data from tuple
            val inputData = PreProcessingInstantaneousBoltOutputField.fromValues(
                tuple.values
            )
            val partitionId = inputData.partitionId
            val referenceData = inputData.referenceData as ReferenceObjectInstant

            // GET CONFIG: Fetch alert configuration for this unit
            val alertConfig = getAlertConfigOrAck(
                inputData.industryId,
                referenceData.unitId,
                tuple
            ) ?: return

            // PROCESS: Check quality thresholds and generate alerts
            checkAndUpdateAlerts(
                listOf(
                    processQualityAlerts(inputData, partitionId, alertConfig)
                ),
                inputData.partitionId
            )
        } catch (e: Exception) {
            logger.error("Error processing tuple: ${e.message}", e)
            Sentry.captureException(e)
        }

        // ACK: Mark tuple as successfully processed
        collector.ack(tuple)
    }

    // 4. ALERT PROCESSING LOGIC
    private fun processQualityAlerts(
        inputData: PreProcessingInstantaneousBoltOutputField,
        partitionId: String,
        alertConfig: IndustryUnitAlertConfig
    ): List<JSONObject> {
        // Get existing state or create new
        var qualityState = state[partitionId]
        if (qualityState == null) {
            qualityState = QualityThresholdState()
            qualityState.lastCreatedOnEpoch = inputData.createdOnEpoch
            qualityState.lastParamsData = HashMap()
            state[partitionId] = qualityState
        }

        // Create processor with state
        val processor = QualityAlertProcessor(
            inletData = inputData,
            state = qualityState,
            dbClient = notificationDBClient,
            alertConfig = alertConfig
        )

        // Run processor - returns formatted alerts
        val alerts = processor.run()

        // Save updated state
        state[partitionId] = qualityState

        return alerts
    }

    // 5. OUTPUT DECLARATION
    override fun declareOutputFields(declarer: OutputFieldsDeclarer) {
        declarer.declare(Fields(
            FieldGroup.PARTITION_ID.value,
            "formattedData"
        ))
    }

    companion object {
        private val logger = LoggerFactory.getLogger(QualityAlertBolt::class.java)
        const val ID: String = "QualityAlertBolt"
    }
}
Registering Serializable Classes
Storm needs to know about custom classes that flow through the topology.
Real Example (Main.kt:72-82):
val config = Config()
// Register all custom data classes for serialization
config.registerSerialization(ReferenceObject::class.java)
config.registerSerialization(EnergyReferenceObject::class.java)
config.registerSerialization(ReferenceObjectInstant::class.java)
config.registerSerialization(InstantaneousData::class.java)
config.registerSerialization(java.util.LinkedHashMap::class.java)
config.registerSerialization(java.util.ArrayList::class.java)
config.registerSerialization(java.util.HashMap::class.java)
If you add a new data class that gets emitted in tuples, you must register it or you'll get serialization errors at runtime!
Real-World Data Flow Example
Let's trace a quality sensor reading through the entire topology:
Step 1: Sensor Data Arrives
{
  "deviceId": "SENSOR_001",
  "unitId": "UNIT_A",
  "timestamp": "2025-01-09T10:30:00Z",
  "parameters": {
    "pH": 8.5,
    "TDS": 450,
    "turbidity": 3.2
  }
}
Step 2: Event Hub Spout Ingests
// EventHubSpout receives message
spout.emit(Values(messageBytes), messageId)
Step 3: HubLogger Parses and Routes
// HubLoggerBolt parses and determines it's quality data
collector.emit(
    StreamGroup.QUALITY_DATA_STREAM.value, // Route to quality stream
    tuple,
    qualityData.values
)
Step 4: QualityBolt Processes
// QualityBolt formats for instantaneous processing
collector.emit(
    tuple,
    CommonLevelQualityBoltOutputField(
        "telemetry",
        instantaneousData, // Contains pH, TDS, turbidity
        createdOnEpoch,
        "SENSOR_001",
        "SENSOR_001_UNIT_A", // partitionId
        systemEpoch,
        false
    ).values
)
Step 5: Fields Grouping Routes by PartitionId
// Storm ensures all SENSOR_001_UNIT_A data goes to same instance
.fieldsGrouping(
    QualityBolt.ID,
    Fields(FieldGroup.PARTITION_ID.value) // Using enum for field name
)
Step 6: InitInstantaneousBolt
// Initializes reference object for state tracking
val refObj = ReferenceObjectInstant()
refObj.unitId = "UNIT_A"
refObj.currentInstantaneousData = instantaneousData

// Emit using typed output field
collector.emit(
    tuple,
    InitInstantaneousBoltOutputField(
        partitionId = partitionId,
        referenceData = refObj,
        industryId = industryId
    ).values
)
Step 7: TimeFilter & PreProcessing
// TimeFilterInstantaneousBolt filters old data
// PreProcessingInstantaneousBolt aggregates and enriches
collector.emit(
    StreamGroup.INSTANT_NOTIFICATION_STREAM.value,
    tuple,
    processedData.values
)
Step 8: AlertsHandler Routes to QualityAlertBolt
// AlertsHandlerBolt routes to quality alert stream
collector.emit(
    StreamGroup.QUALITY_ALERT_STREAM.value,
    tuple,
    inputData.values
)
Step 9: QualityAlertBolt Detects Violation
// pH threshold: 6.5 - 7.5
// Current: 8.5 → VIOLATION!
val qualityState = state[partitionId]
val processor = QualityAlertProcessor(inputData, qualityState, ...)

// Returns alert JSON
val alerts = processor.run() // [{type: "QUALITY", param: "pH", ...}]

// Emit to response handler
checkAndUpdateAlerts(alerts, partitionId)
Step 10: AlertResponseHandler Saves to Database
// Saves formatted alert to NOTIFICATION container
dbClient.createItem(alertJson, PartitionKey(industryId))
Complete Flow Diagram:
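Putting the ten steps together (reconstructed from the sequence above):

EventHubSpout
  → HubLoggerBolt (parse + route to QUALITY_DATA_STREAM)
  → QualityBolt (format, build partitionId)
  → InitInstantaneousBolt (reference object)
  → TimeFilterInstantaneousBolt → PreProcessingInstantaneousBolt
  → AlertsHandlerBolt (route to QUALITY_ALERT_STREAM)
  → QualityAlertBolt (stateful threshold check)
  → AlertResponseHandlerBolt (save to NOTIFICATION container)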
Key Takeaways
1. Use Typed Output Fields (Not Raw Values)
// ✅ GOOD: Type-safe using output field classes
collector.emit(
    tuple,
    MyBoltOutputField(
        partitionId = partitionId,
        data = processedData,
        timestamp = epoch
    ).values
)

// ❌ BAD: Raw Values - no type safety
collector.emit(tuple, Values(partitionId, processedData, epoch))
2. Use Field Enums for Groupings
// ✅ GOOD: Using enum for field names
.fieldsGrouping(SourceBolt.ID, Fields(FieldGroup.PARTITION_ID.value))
// ❌ BAD: Hard-coded strings
.fieldsGrouping(SourceBolt.ID, Fields("partitionId"))
3. Use fromValues() for Extraction
// ✅ GOOD: Type-safe extraction
val inputData = MyBoltOutputField.fromValues(tuple.values)
val partitionId = inputData.partitionId
// ❌ BAD: Manual extraction by index
val partitionId = tuple.getValue(0) as String
4. Always Use Fields Grouping for State
.fieldsGrouping(SourceBolt.ID, Fields(FieldGroup.PARTITION_ID.value))
5. Always Ack Tuples
collector.ack(tuple) // Success
collector.fail(tuple) // Failure - will replay
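A related, standard Storm knob: tuples that are neither acked nor failed within the topology's message timeout are treated as failed and replayed automatically.

// Standard Storm config: replay tuples that aren't acked within 30 seconds
config.setMessageTimeoutSecs(30)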
6. Anchor Emits for Reliability
collector.emit(tuple, outputField.values) // Creates tuple tree
7. Register Custom Classes
config.registerSerialization(MyDataClass::class.java)
Next Steps
Now that you understand how our Storm implementation works, you can:
- Explore Project Structure to understand codebase organization
- Jump to Adding a New Bolt to implement your own processing logic
- Read Alert System for deep dive on alerts
Related Documentation
- What is Apache Storm - Storm fundamentals
- Topology Design - Our topology architecture
- Bolts Overview - All bolt types
- State Management - Advanced state patterns