Sample Storm Trial Project

Build a complete Temperature Monitoring and Alert System from scratch using all the patterns and best practices from our Storm implementation. This hands-on tutorial will guide you through creating a real working Storm topology.

Learning Objectives

By the end of this tutorial, you'll be able to:

  • Create typed data models for Storm tuples
  • Build custom bolts with state management
  • Implement alert detection logic
  • Stub out integrations with external systems (notification services, databases)
  • Test and deploy a complete topology

Project Overview

What We're Building

A Real-Time Temperature Monitoring System that:

  1. Receives temperature sensor data from IoT devices
  2. Processes readings in real-time
  3. Detects high temperature alerts (> 35°C)
  4. Calculates hourly averages
  5. Emits averages for storage in a database (the database bolt itself is left as an extension; see Next Steps)
  6. Sends notifications when alerts trigger

Use Case

Scenario: A warehouse has 50 temperature sensors monitoring refrigerated storage units. We need to:

  • Track temperature in real-time
  • Alert when temperature exceeds safe threshold
  • Maintain hourly temperature averages for compliance reporting
  • Detect sensor failures (no data for 5 minutes); this tutorial leaves it as an extension under Next Steps
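
The sensor-failure requirement (no data for 5 minutes) can be modeled without Storm. The sketch below is a minimal illustration, assuming a hypothetical SensorLivenessTracker helper that is not part of the tutorial's code. In a topology, the check would typically be driven by a periodic trigger such as Storm tick tuples.

```kotlin
/**
 * Tracks when each sensor last reported and lists the ones that have gone quiet.
 * Hypothetical helper for illustration; not part of the tutorial's code.
 */
class SensorLivenessTracker(private val timeoutMillis: Long = 5 * 60 * 1000L) {

    // Key: deviceId, Value: timestamp (ms) of the most recent reading
    private val lastSeen = HashMap<String, Long>()

    /** Records a reading; keeps the latest timestamp if readings arrive out of order. */
    fun record(deviceId: String, timestampMillis: Long) {
        val previous = lastSeen[deviceId]
        if (previous == null || timestampMillis > previous) {
            lastSeen[deviceId] = timestampMillis
        }
    }

    /** Returns the devices whose last reading is older than the timeout, sorted by ID. */
    fun silentDevices(nowMillis: Long): List<String> =
        lastSeen.filterValues { nowMillis - it > timeoutMillis }.keys.sorted()
}
```

A device counts as silent only once strictly more than the timeout has elapsed, which mirrors the strict comparison used by the alert cooldown later in this tutorial.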

Step 1: Create Data Models

First, let's create our data classes following the exact patterns used in the main application.

1.1 TemperatureReading Data Class

Create src/main/kotlin/dataclass/TemperatureReading.kt:

package dataclass

import com.fasterxml.jackson.annotation.JsonProperty
import java.io.Serializable

/**
* Represents a single temperature reading from an IoT sensor
*/
data class TemperatureReading(
@JsonProperty("deviceId")
val deviceId: String,

@JsonProperty("temperature")
val temperature: Double,

@JsonProperty("unit")
val unit: String = "Celsius",

@JsonProperty("timestamp")
val timestamp: Long,

@JsonProperty("location")
val location: String,

@JsonProperty("sensorType")
val sensorType: String = "DHT22"
) : Serializable {

/**
* Validates if the temperature reading is within expected range
*/
fun isValid(): Boolean {
return temperature in -50.0..100.0 && deviceId.isNotBlank()
}

/**
* Checks if temperature exceeds safe threshold
*/
fun isHighTemperature(threshold: Double = 35.0): Boolean {
return temperature > threshold
}

companion object {
private const val serialVersionUID = 1L
}
}

1.2 TemperatureAlert Data Class

Create src/main/kotlin/dataclass/TemperatureAlert.kt:

package dataclass

import com.fasterxml.jackson.annotation.JsonProperty
import java.io.Serializable

/**
* Represents a temperature alert event
*/
data class TemperatureAlert(
@JsonProperty("alertId")
val alertId: String,

@JsonProperty("deviceId")
val deviceId: String,

@JsonProperty("temperature")
val temperature: Double,

@JsonProperty("threshold")
val threshold: Double,

@JsonProperty("location")
val location: String,

@JsonProperty("triggeredAt")
val triggeredAt: Long,

@JsonProperty("severity")
val severity: AlertSeverity,

@JsonProperty("message")
val message: String
) : Serializable {

companion object {
private const val serialVersionUID = 1L

fun fromReading(reading: TemperatureReading, threshold: Double): TemperatureAlert {
val severity = when {
reading.temperature > threshold + 10 -> AlertSeverity.CRITICAL
reading.temperature > threshold + 5 -> AlertSeverity.HIGH
else -> AlertSeverity.MEDIUM
}

return TemperatureAlert(
alertId = "TEMP_${reading.deviceId}_${reading.timestamp}",
deviceId = reading.deviceId,
temperature = reading.temperature,
threshold = threshold,
location = reading.location,
triggeredAt = reading.timestamp,
severity = severity,
message = "Temperature ${reading.temperature}°C exceeds threshold ${threshold}°C at ${reading.location}"
)
}
}
}

enum class AlertSeverity {
MEDIUM, HIGH, CRITICAL
}
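
To make the severity bands concrete, here is a small stand-alone sketch of the same comparison logic. Severity and severityFor are hypothetical stand-ins for AlertSeverity and the when block in fromReading, so the snippet runs without the rest of the project.

```kotlin
// Stand-in for AlertSeverity so the example is self-contained
enum class Severity { MEDIUM, HIGH, CRITICAL }

/**
 * Mirrors the banding in TemperatureAlert.fromReading:
 * more than 10 degrees over the threshold is CRITICAL,
 * more than 5 degrees over is HIGH, anything else is MEDIUM.
 */
fun severityFor(temperature: Double, threshold: Double): Severity = when {
    temperature > threshold + 10 -> Severity.CRITICAL
    temperature > threshold + 5 -> Severity.HIGH
    else -> Severity.MEDIUM
}
```

With the default threshold of 35.0, a reading of 38.2 is MEDIUM, 40.1 is HIGH, and 45.5 is CRITICAL. The comparisons are strict, so exactly 40.0 is still MEDIUM. The function does not check that the reading exceeds the threshold at all; the caller is expected to do that first, as TemperatureAlertBolt does.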

Step 2: Create Typed Output Fields

Following the pattern from 03-our-storm.md, create type-safe output field classes.

2.1 TemperatureProcessingBoltOutputField

Create src/main/kotlin/model/TemperatureProcessingBoltOutputField.kt:

package model

import dataclass.TemperatureReading
import org.apache.storm.tuple.Fields
import org.apache.storm.tuple.Values

/**
* Type-safe output fields for TemperatureProcessingBolt
* ✅ GOOD: Type-safe, prevents errors
*
* deviceId is emitted as its own field so downstream bolts can use
* fieldsGrouping on "deviceId"; Storm can only group on declared fields.
*/
data class TemperatureProcessingBoltOutputField(
val partitionId: Int,
val reading: TemperatureReading,
val timestamp: Long,
val isValid: Boolean
) {
/**
* Convert to Storm Values for emitting (deviceId is taken from the reading)
*/
val values: Values
get() = Values(reading.deviceId, partitionId, reading, timestamp, isValid)

companion object {
/**
* Define field names (order must match values)
*/
val fields: Fields = Fields("deviceId", "partitionId", "reading", "timestamp", "isValid")

/**
* Type-safe extraction from tuple
* Index 0 (deviceId) duplicates reading.deviceId and is skipped
*/
fun fromValues(values: List<Any>): TemperatureProcessingBoltOutputField {
return TemperatureProcessingBoltOutputField(
partitionId = values[1] as Int,
reading = values[2] as TemperatureReading,
timestamp = values[3] as Long,
isValid = values[4] as Boolean
)
}
}
}
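
The round trip between a typed class and a positional value list can be exercised without Storm. The sketch below uses a hypothetical SampleOutputField with a plain List in place of Storm's Values (which is itself a list of objects), and adds a size check that the tutorial classes do not have.

```kotlin
/**
 * Hypothetical output field class for illustration only.
 * A plain List stands in for org.apache.storm.tuple.Values.
 */
data class SampleOutputField(
    val deviceId: String,
    val temperature: Double,
    val isValid: Boolean
) {
    // Positional values, in the same order as fieldNames
    val values: List<Any>
        get() = listOf(deviceId, temperature, isValid)

    companion object {
        val fieldNames: List<String> = listOf("deviceId", "temperature", "isValid")

        /** Rebuilds the typed object; fails fast on a wrong size or a wrong type. */
        fun fromValues(values: List<Any?>): SampleOutputField {
            require(values.size == fieldNames.size) {
                "Expected ${fieldNames.size} values but got ${values.size}"
            }
            return SampleOutputField(
                deviceId = values[0] as String,
                temperature = values[1] as Double,
                isValid = values[2] as Boolean
            )
        }
    }
}
```

The casts still happen at runtime, so a mismatch surfaces as a ClassCastException. What the pattern buys is that the positions and casts live in one place instead of being repeated in every downstream bolt.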

2.2 TemperatureAlertBoltOutputField

Create src/main/kotlin/model/TemperatureAlertBoltOutputField.kt:

package model

import dataclass.TemperatureAlert
import org.apache.storm.tuple.Fields
import org.apache.storm.tuple.Values

/**
* Type-safe output fields for TemperatureAlertBolt
*/
data class TemperatureAlertBoltOutputField(
val deviceId: String,
val alert: TemperatureAlert,
val triggeredAt: Long,
val notificationChannels: List<String>
) {
val values: Values
get() = Values(deviceId, alert, triggeredAt, notificationChannels)

companion object {
val fields: Fields = Fields("deviceId", "alert", "triggeredAt", "notificationChannels")

fun fromValues(values: List<Any>): TemperatureAlertBoltOutputField {
return TemperatureAlertBoltOutputField(
deviceId = values[0] as String,
alert = values[1] as TemperatureAlert,
triggeredAt = values[2] as Long,
@Suppress("UNCHECKED_CAST")
notificationChannels = values[3] as List<String>
)
}
}
}

Step 3: Create Stream and Field Enums

3.1 Add to StreamGroup Enum

Edit src/main/kotlin/enum/StreamGroup.kt and add:

enum class StreamGroup(val value: String) {
// ... existing streams ...

// Temperature monitoring streams
TEMPERATURE_DATA_STREAM("temperature-data-stream"),
TEMPERATURE_ALERT_STREAM("temperature-alert-stream"),
TEMPERATURE_AVERAGE_STREAM("temperature-average-stream");
}

3.2 Add to FieldGroup Enum

Edit src/main/kotlin/enum/FieldGroup.kt and add:

enum class FieldGroup(val value: String) {
// ... existing fields ...

// Temperature monitoring fields
DEVICE_ID("deviceId"),
TEMPERATURE("temperature"),
LOCATION("location");
}

Step 4: Build the Processing Bolt

Create src/main/kotlin/bolt/temperature/TemperatureProcessingBolt.kt:

package bolt.temperature

import dataclass.TemperatureReading
import model.TemperatureProcessingBoltOutputField
import org.apache.storm.task.OutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichBolt
import org.apache.storm.tuple.Tuple
import enum.StreamGroup
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue

/**
* Processes raw temperature sensor data
* - Validates readings
* - Filters junk data
* - Emits to downstream bolts
*/
class TemperatureProcessingBolt : BaseRichBolt() {

private lateinit var collector: OutputCollector
private val objectMapper = jacksonObjectMapper()

// Metrics
private var processedCount = 0
private var invalidCount = 0

companion object {
const val ID = "temperature-processing-bolt"
}

override fun prepare(
stormConf: MutableMap<String, Any>?,
context: TopologyContext?,
collector: OutputCollector?
) {
this.collector = collector!!
}

override fun execute(tuple: Tuple?) {
try {
// Extract partition ID from tuple
val partitionId = tuple?.getIntegerByField("partitionId") ?: 0

// Parse JSON message to TemperatureReading
val message = tuple?.getStringByField("message") ?: return
val reading = objectMapper.readValue<TemperatureReading>(message)

val timestamp = System.currentTimeMillis()

// Validate the reading
val isValid = reading.isValid()

if (isValid) {
processedCount++

// ✅ GOOD: Use typed output field
collector.emit(
StreamGroup.TEMPERATURE_DATA_STREAM.value,
tuple,
TemperatureProcessingBoltOutputField(
partitionId = partitionId,
reading = reading,
timestamp = timestamp,
isValid = true
).values
)

// Ack the tuple
collector.ack(tuple)
} else {
invalidCount++
// Log invalid reading
println("Invalid temperature reading from device ${reading.deviceId}: ${reading.temperature}")
collector.ack(tuple)
}

} catch (e: Exception) {
println("Error processing temperature data: ${e.message}")
collector.fail(tuple)
}
}

override fun declareOutputFields(declarer: OutputFieldsDeclarer?) {
// ✅ GOOD: Use typed output field class
declarer?.declareStream(
StreamGroup.TEMPERATURE_DATA_STREAM.value,
TemperatureProcessingBoltOutputField.fields
)
}

override fun cleanup() {
println("TemperatureProcessingBolt Stats:")
println(" Processed: $processedCount")
println(" Invalid: $invalidCount")
}
}

Step 5: Build the Alert Bolt with State Management

Create src/main/kotlin/bolt/temperature/TemperatureAlertBolt.kt:

package bolt.temperature

import dataclass.TemperatureAlert
import dataclass.TemperatureReading
import model.TemperatureProcessingBoltOutputField
import model.TemperatureAlertBoltOutputField
import org.apache.storm.task.OutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichBolt
import org.apache.storm.tuple.Tuple
import enum.StreamGroup
import java.util.concurrent.ConcurrentHashMap

/**
* Detects temperature alerts with state management
* - Tracks alert state per device
* - Prevents duplicate alerts
* - Emits only when crossing threshold
*/
class TemperatureAlertBolt : BaseRichBolt() {

private lateinit var collector: OutputCollector

// State management: Track alert state per device
// Key: deviceId, Value: AlertState
private val deviceAlertState = ConcurrentHashMap<String, AlertState>()

// Configuration
private val temperatureThreshold = 35.0
private val alertCooldownMillis = 300_000L // 5 minutes, Long to match the millisecond timestamps it is compared with

// Metrics
private var alertsTriggered = 0
private var alertsSuppressed = 0

companion object {
const val ID = "temperature-alert-bolt"
}

override fun prepare(
stormConf: MutableMap<String, Any>?,
context: TopologyContext?,
collector: OutputCollector?
) {
this.collector = collector!!
}

override fun execute(tuple: Tuple?) {
try {
// ✅ GOOD: Type-safe extraction using output field class
val data = TemperatureProcessingBoltOutputField.fromValues(tuple?.values ?: return)

// Only process valid readings
if (!data.isValid) {
collector.ack(tuple)
return
}

val reading = data.reading
val deviceId = reading.deviceId

// Check if temperature exceeds threshold
if (reading.isHighTemperature(temperatureThreshold)) {

// Get or create alert state for this device
val state = deviceAlertState.getOrPut(deviceId) { AlertState() }

// Check if we should trigger alert (not in cooldown)
val now = System.currentTimeMillis()
val canTriggerAlert = !state.isInAlert ||
(now - state.lastAlertTime) > alertCooldownMillis

if (canTriggerAlert) {
// Create alert
val alert = TemperatureAlert.fromReading(reading, temperatureThreshold)

// Update state
state.isInAlert = true
state.lastAlertTime = now
state.alertCount++

alertsTriggered++

// ✅ GOOD: Use typed output field
collector.emit(
StreamGroup.TEMPERATURE_ALERT_STREAM.value,
tuple,
TemperatureAlertBoltOutputField(
deviceId = deviceId,
alert = alert,
triggeredAt = now,
notificationChannels = listOf("email", "sms", "dashboard")
).values
)

println("🚨 ALERT: ${alert.message}")
} else {
alertsSuppressed++
}

} else {
// Temperature normal - clear alert state if exists
deviceAlertState[deviceId]?.let { state ->
if (state.isInAlert) {
state.isInAlert = false
println("✅ Normal: Temperature back to normal for device $deviceId")
}
}
}

collector.ack(tuple)

} catch (e: Exception) {
println("Error in TemperatureAlertBolt: ${e.message}")
collector.fail(tuple)
}
}

override fun declareOutputFields(declarer: OutputFieldsDeclarer?) {
// ✅ GOOD: Use typed output field class
declarer?.declareStream(
StreamGroup.TEMPERATURE_ALERT_STREAM.value,
TemperatureAlertBoltOutputField.fields
)
}

override fun cleanup() {
println("TemperatureAlertBolt Stats:")
println(" Alerts Triggered: $alertsTriggered")
println(" Alerts Suppressed (cooldown): $alertsSuppressed")
println(" Devices Tracked: ${deviceAlertState.size}")
}

/**
* State class to track alert status per device
*/
data class AlertState(
var isInAlert: Boolean = false,
var lastAlertTime: Long = 0L,
var alertCount: Int = 0
)
}
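
The alert decision is easier to reason about when separated from Storm and from the system clock. The sketch below restates the same rules for a single device in a hypothetical AlertGate class, with the current time passed in as a parameter so the behavior can be checked deterministically.

```kotlin
/**
 * Single-device version of the alert/cooldown rules used by TemperatureAlertBolt.
 * Hypothetical class for illustration; time is injected instead of read from the clock.
 */
class AlertGate(
    private val threshold: Double = 35.0,
    private val cooldownMillis: Long = 300_000L
) {
    private var inAlert = false
    private var lastAlertTime = 0L

    /** Returns true when a new alert should be emitted for this reading. */
    fun shouldAlert(temperature: Double, nowMillis: Long): Boolean {
        if (temperature <= threshold) {
            // Back to normal: the next breach alerts immediately
            inAlert = false
            return false
        }
        // Alert on the first breach, or again once the cooldown has fully elapsed
        val canTrigger = !inAlert || (nowMillis - lastAlertTime) > cooldownMillis
        if (canTrigger) {
            inAlert = true
            lastAlertTime = nowMillis
        }
        return canTrigger
    }
}
```

Note the consequence of clearing the state on a normal reading: a device that oscillates around the threshold alerts on every upward crossing, because the cooldown only suppresses repeats while the temperature stays high.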

Step 6: Build the Average Calculation Bolt

Create src/main/kotlin/bolt/temperature/TemperatureAverageBolt.kt:

package bolt.temperature

import model.TemperatureProcessingBoltOutputField
import org.apache.storm.task.OutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichBolt
import org.apache.storm.tuple.Tuple
import org.apache.storm.tuple.Fields
import org.apache.storm.tuple.Values
import enum.StreamGroup
import java.util.concurrent.ConcurrentHashMap

/**
* Calculates hourly temperature averages per device
* Uses time-windowed state management
*/
class TemperatureAverageBolt : BaseRichBolt() {

private lateinit var collector: OutputCollector

// State: Track readings per device per hour
// Key: "deviceId:hourBucket", Value: ReadingAccumulator
private val hourlyData = ConcurrentHashMap<String, ReadingAccumulator>()

// Window size: 1 hour in milliseconds
private val windowSizeMillis = 3600000L

companion object {
const val ID = "temperature-average-bolt"
}

override fun prepare(
stormConf: MutableMap<String, Any>?,
context: TopologyContext?,
collector: OutputCollector?
) {
this.collector = collector!!
}

override fun execute(tuple: Tuple?) {
try {
// ✅ GOOD: Type-safe extraction
val data = TemperatureProcessingBoltOutputField.fromValues(tuple?.values ?: return)

if (!data.isValid) {
collector.ack(tuple)
return
}

val reading = data.reading
val deviceId = reading.deviceId

// Calculate hour bucket (unix timestamp / 1 hour)
val hourBucket = reading.timestamp / windowSizeMillis
val key = "$deviceId:$hourBucket"

// Get or create accumulator for this hour
val accumulator = hourlyData.getOrPut(key) {
ReadingAccumulator(
deviceId = deviceId,
hourBucket = hourBucket,
location = reading.location
)
}

// Add reading to accumulator
accumulator.addReading(reading.temperature)

// Emit the running average every 10 readings (an end-of-hour flush is not implemented here)
if (accumulator.count % 10 == 0) {
val average = accumulator.getAverage()

// Emit average to database bolt
collector.emit(
StreamGroup.TEMPERATURE_AVERAGE_STREAM.value,
tuple,
Values(
deviceId,
hourBucket,
average,
accumulator.count,
accumulator.min,
accumulator.max,
reading.location
)
)

println("📊 Average for $deviceId (hour $hourBucket): $average°C (${accumulator.count} readings)")
}

// Clean old data (older than 24 hours)
cleanOldData(hourBucket)

collector.ack(tuple)

} catch (e: Exception) {
println("Error in TemperatureAverageBolt: ${e.message}")
collector.fail(tuple)
}
}

override fun declareOutputFields(declarer: OutputFieldsDeclarer?) {
declarer?.declareStream(
StreamGroup.TEMPERATURE_AVERAGE_STREAM.value,
Fields("deviceId", "hourBucket", "average", "count", "min", "max", "location")
)
}

/**
* Remove data older than 24 hours
*/
private fun cleanOldData(currentHourBucket: Long) {
val cutoffBucket = currentHourBucket - 24 // 24 hours ago
// Compare the bucket stored in the accumulator instead of parsing the key,
// so a deviceId containing ":" cannot break cleanup
hourlyData.values.removeIf { it.hourBucket < cutoffBucket }
}

/**
* Accumulator for hourly readings
*/
data class ReadingAccumulator(
val deviceId: String,
val hourBucket: Long,
val location: String,
var sum: Double = 0.0,
var count: Int = 0,
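var min: Double = Double.POSITIVE_INFINITY,
var max: Double = Double.NEGATIVE_INFINITY // Double.MIN_VALUE is the smallest positive double, so it breaks for sub-zero readings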
) {
fun addReading(temperature: Double) {
sum += temperature
count++
if (temperature < min) min = temperature
if (temperature > max) max = temperature
}

fun getAverage(): Double = if (count > 0) sum / count else 0.0
}
}
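
The bucketing arithmetic and the accumulator can be checked in isolation. The sketch below uses hypothetical helpers (hourBucket, bucketStartMillis, RunningStats) that restate the same integer-division windowing, and shows why the min/max seeds matter for refrigerated storage, where readings are often below zero.

```kotlin
/** Maps a millisecond timestamp to its window index; the default window is one hour. */
fun hourBucket(timestampMillis: Long, windowMillis: Long = 3_600_000L): Long =
    timestampMillis / windowMillis

/** Start of a window in milliseconds, useful when storing the average with a real timestamp. */
fun bucketStartMillis(bucket: Long, windowMillis: Long = 3_600_000L): Long =
    bucket * windowMillis

/** Minimal running aggregate; hypothetical stand-in for ReadingAccumulator. */
class RunningStats {
    var count: Int = 0
        private set
    var sum: Double = 0.0
        private set
    var min: Double = Double.POSITIVE_INFINITY
        private set
    var max: Double = Double.NEGATIVE_INFINITY
        private set

    fun add(value: Double) {
        sum += value
        count++
        if (value < min) min = value
        if (value > max) max = value
    }

    val average: Double
        get() = if (count > 0) sum / count else 0.0
}
```

For example, the timestamp 1699876543123 falls in bucket 472187, which starts at 1699873200000. Feeding the readings -20.0, -18.0 and -22.0 into RunningStats gives a minimum of -22.0, a maximum of -18.0 and an average of -20.0.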

Step 7: Create the Notification Bolt

Create src/main/kotlin/bolt/temperature/TemperatureNotificationBolt.kt:

package bolt.temperature

import model.TemperatureAlertBoltOutputField
import org.apache.storm.task.OutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichBolt
import org.apache.storm.tuple.Tuple

/**
* Sends notifications for temperature alerts
* In production, this would integrate with:
* - Email service (SendGrid, AWS SES)
* - SMS gateway (Twilio)
* - Push notification service (FCM)
* - Dashboard websocket
*/
class TemperatureNotificationBolt : BaseRichBolt() {

private lateinit var collector: OutputCollector
private var notificationsSent = 0

companion object {
const val ID = "temperature-notification-bolt"
}

override fun prepare(
stormConf: MutableMap<String, Any>?,
context: TopologyContext?,
collector: OutputCollector?
) {
this.collector = collector!!
}

override fun execute(tuple: Tuple?) {
try {
// ✅ GOOD: Type-safe extraction
val alertData = TemperatureAlertBoltOutputField.fromValues(tuple?.values ?: return)

val alert = alertData.alert

// Send notifications to each channel
alertData.notificationChannels.forEach { channel ->
when (channel) {
"email" -> sendEmailNotification(alert.message, alert.deviceId)
"sms" -> sendSmsNotification(alert.message, alert.deviceId)
"dashboard" -> updateDashboard(alert)
else -> println("Unknown channel: $channel")
}
}

notificationsSent++
collector.ack(tuple)

} catch (e: Exception) {
println("Error sending notification: ${e.message}")
collector.fail(tuple)
}
}

override fun declareOutputFields(declarer: OutputFieldsDeclarer?) {
// Terminal bolt - no output streams
}

// Mock notification methods (replace with actual integrations)

private fun sendEmailNotification(message: String, deviceId: String) {
println("📧 EMAIL: Alert for device $deviceId - $message")
// TODO: Integrate with email service
// emailService.send(to = "admin@example.com", subject = "Temperature Alert", body = message)
}

private fun sendSmsNotification(message: String, deviceId: String) {
println("📱 SMS: Alert for device $deviceId")
// TODO: Integrate with SMS gateway
// smsGateway.send(to = "+1234567890", message = message)
}

private fun updateDashboard(alert: dataclass.TemperatureAlert) {
println("📊 DASHBOARD: Updated with alert ${alert.alertId}")
// TODO: Publish to websocket or push to dashboard API
// dashboardClient.publishAlert(alert)
}

override fun cleanup() {
println("TemperatureNotificationBolt Stats:")
println(" Notifications Sent: $notificationsSent")
}
}
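
In the bolt above, an exception from one channel fails the whole tuple, and a replay would resend the channels that had already succeeded. One possible alternative, sketched here under the assumption that channels are independent, is to isolate failures per channel and report them together. The dispatch function and the senders map are hypothetical and not part of the tutorial's code.

```kotlin
/**
 * Sends a message to each channel and collects failures instead of stopping at the first one.
 * Returns a map of channel name to failure reason; an empty map means every channel succeeded.
 */
fun dispatch(
    channels: List<String>,
    message: String,
    senders: Map<String, (String) -> Unit>
): Map<String, String> {
    val failures = LinkedHashMap<String, String>()
    for (channel in channels) {
        val sender = senders[channel]
        if (sender == null) {
            failures[channel] = "unknown channel"
            continue
        }
        try {
            sender(message)
        } catch (e: Exception) {
            // Record the failure and keep going so other channels still receive the alert
            failures[channel] = e.message ?: e.javaClass.simpleName
        }
    }
    return failures
}
```

The caller can then decide what to do with the returned failures, for example retrying only the failed channels rather than failing the tuple.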

Step 8: Build the Topology

Create src/main/kotlin/topology/TemperatureTopologyBuilder.kt:

package topology

import bolt.temperature.*
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.Fields
import enum.StreamGroup
import enum.FieldGroup

/**
* Builds the Temperature Monitoring Topology
*/
object TemperatureTopologyBuilder {

fun build(
spout: BaseRichSpout,
partitionCount: Int = 4
): TopologyBuilder {

val builder = TopologyBuilder()

// 1. Set the spout (data source)
builder.setSpout("temperature-spout", spout, partitionCount)

// 2. Temperature Processing Bolt
// - Parallelism: 4 (one per partition)
// - Grouping: Shuffle (stateless, distribute evenly)
builder.setBolt(
TemperatureProcessingBolt.ID,
TemperatureProcessingBolt(),
partitionCount
)
.shuffleGrouping("temperature-spout")
.setNumTasks(partitionCount)

// 3. Temperature Alert Bolt
// - Parallelism: 4
// - Grouping: Fields by deviceId (stateful - same device to same bolt instance)
builder.setBolt(
TemperatureAlertBolt.ID,
TemperatureAlertBolt(),
partitionCount
)
.fieldsGrouping(
TemperatureProcessingBolt.ID,
StreamGroup.TEMPERATURE_DATA_STREAM.value,
Fields(FieldGroup.DEVICE_ID.value) // ✅ GOOD: Group by device for state management
)
.setNumTasks(partitionCount)

// 4. Temperature Average Bolt
// - Parallelism: 4
// - Grouping: Fields by deviceId (stateful - time windows per device)
builder.setBolt(
TemperatureAverageBolt.ID,
TemperatureAverageBolt(),
partitionCount
)
.fieldsGrouping(
TemperatureProcessingBolt.ID,
StreamGroup.TEMPERATURE_DATA_STREAM.value,
Fields(FieldGroup.DEVICE_ID.value)
)
.setNumTasks(partitionCount)

// 5. Notification Bolt
// - Parallelism: 2 (less CPU intensive)
// - Grouping: Shuffle (no state needed)
builder.setBolt(
TemperatureNotificationBolt.ID,
TemperatureNotificationBolt(),
2
)
.shuffleGrouping(
TemperatureAlertBolt.ID,
StreamGroup.TEMPERATURE_ALERT_STREAM.value
)
.setNumTasks(2)

return builder
}
}
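
The reason fields grouping suits stateful bolts is that the routing depends only on the key. The sketch below is a simplified model of hash-based routing, not Storm's actual implementation; taskIndexFor is a hypothetical function used only to illustrate the idea.

```kotlin
/**
 * Simplified model of key-based routing: the same key always maps to the same task index.
 * Math.floorMod keeps the result non-negative even when hashCode() is negative.
 */
fun taskIndexFor(key: String, numTasks: Int): Int {
    require(numTasks > 0) { "numTasks must be positive" }
    return Math.floorMod(key.hashCode(), numTasks)
}
```

Because every reading for a given deviceId lands on the same task, that task's in-memory map holds the complete alert state or hourly accumulator for the device. With shuffle grouping, the readings would be spread across tasks and each map would see only a fraction of them. Note also that changing the number of tasks changes the mapping, so in-memory state does not carry over.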

Step 9: Create a Test Spout (Data Generator)

For testing, create a spout that generates sample temperature data:

Create src/main/kotlin/spout/TemperatureTestSpout.kt:

package spout

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import dataclass.TemperatureReading
import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.Fields
import org.apache.storm.tuple.Values
import kotlin.random.Random

/**
* Test spout that generates simulated temperature sensor data
*/
class TemperatureTestSpout : BaseRichSpout() {

private lateinit var collector: SpoutOutputCollector
private val objectMapper = jacksonObjectMapper()

// Simulated devices
private val devices = listOf(
"SENSOR_001" to "Warehouse A - Zone 1",
"SENSOR_002" to "Warehouse A - Zone 2",
"SENSOR_003" to "Warehouse B - Zone 1",
"SENSOR_004" to "Warehouse B - Zone 2",
"SENSOR_005" to "Warehouse C - Zone 1"
)

private var emitCount = 0L

override fun open(
conf: MutableMap<String, Any>?,
context: TopologyContext?,
collector: SpoutOutputCollector?
) {
this.collector = collector!!
}

override fun nextTuple() {
try {
// Sleep briefly to simulate real sensor data rate
Thread.sleep(1000) // one reading per second per spout task (a random device each time)

// Pick a random device
val (deviceId, location) = devices.random()

// Generate temperature reading
// Base temperature: 20-30°C
// 10% chance of high temperature (35-45°C) for testing alerts
val temperature = if (Random.nextDouble() < 0.1) {
Random.nextDouble(35.0, 45.0) // High temperature
} else {
Random.nextDouble(20.0, 30.0) // Normal temperature
}

val reading = TemperatureReading(
deviceId = deviceId,
temperature = temperature,
unit = "Celsius",
timestamp = System.currentTimeMillis(),
location = location,
sensorType = "DHT22"
)

// Convert to JSON
val json = objectMapper.writeValueAsString(reading)

// Emit tuple
val partitionId = devices.indexOf(deviceId to location) % 4
collector.emit(
Values(partitionId, json),
emitCount++ // Message ID for acking
)

} catch (e: Exception) {
println("Error in TemperatureTestSpout: ${e.message}")
}
}

override fun declareOutputFields(declarer: OutputFieldsDeclarer?) {
declarer?.declare(Fields("partitionId", "message"))
}

override fun ack(msgId: Any?) {
// Successfully processed
}

override fun fail(msgId: Any?) {
// Failed to process - could retry here
println("Failed to process message: $msgId")
}
}
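
For repeatable tests, the temperature generation can be pulled out of the spout and driven by a seeded random source. This is a minimal sketch; nextTemperature is a hypothetical helper, and the ranges and the 10% alert probability are taken from the spout above.

```kotlin
import kotlin.random.Random

/**
 * Generates one simulated temperature.
 * With probability alertProbability the value is in [35.0, 45.0), otherwise in [20.0, 30.0).
 */
fun nextTemperature(random: Random, alertProbability: Double = 0.1): Double =
    if (random.nextDouble() < alertProbability) {
        random.nextDouble(35.0, 45.0) // High temperature
    } else {
        random.nextDouble(20.0, 30.0) // Normal temperature
    }
```

Passing Random(42) (or any fixed seed) produces the same sequence on every run, so a test can assert on alert counts, while the spout can keep passing Random.Default for varied data.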

Step 10: Create Main Entry Point

Create src/main/kotlin/TemperatureMonitoringMain.kt:

import org.apache.storm.Config
import org.apache.storm.LocalCluster
import org.apache.storm.StormSubmitter
import spout.TemperatureTestSpout
import topology.TemperatureTopologyBuilder

fun main(args: Array<String>) {
// Build topology
val spout = TemperatureTestSpout()
val topology = TemperatureTopologyBuilder.build(spout, partitionCount = 4).createTopology()

// Configure Storm
val config = Config()
config.setDebug(false)
config.setNumWorkers(2)
config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1000

// Determine environment
val isLocal = args.isEmpty() || args[0] == "local"

if (isLocal) {
// Run locally for testing
println("🚀 Starting Temperature Monitoring Topology (LOCAL MODE)")
config.setMaxTaskParallelism(2)

val cluster = LocalCluster()
cluster.submitTopology("temperature-monitoring", config, topology)

// Run for 2 minutes then shutdown
Thread.sleep(120000)
cluster.shutdown()

println("✅ Topology stopped")
} else {
// Submit to remote cluster
val topologyName = args[0]
println("🚀 Submitting Temperature Monitoring Topology: $topologyName")

StormSubmitter.submitTopology(topologyName, config, topology)
println("✅ Topology submitted successfully")
}
}
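
The argument handling in main can be isolated into a small function that is easy to test. The sketch below restates the same rule (no arguments or "local" means local mode; anything else is treated as the topology name). RunMode and resolveRunMode are hypothetical names introduced for illustration.

```kotlin
/** Result of interpreting the command-line arguments. */
data class RunMode(val isLocal: Boolean, val topologyName: String)

/**
 * No arguments, or "local" as the first argument, selects local mode with the default name.
 * Any other first argument is used as the topology name for cluster submission.
 */
fun resolveRunMode(
    args: Array<String>,
    defaultName: String = "temperature-monitoring"
): RunMode {
    val first = args.firstOrNull()
    return if (first == null || first == "local") {
        RunMode(isLocal = true, topologyName = defaultName)
    } else {
        RunMode(isLocal = false, topologyName = first)
    }
}
```

One consequence of this rule is that a topology cannot be named "local" on a remote cluster, since that value is reserved for selecting local mode.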

Step 11: Build and Run

Build the Project

# Clean and build
./gradlew clean build

# Create fat JAR
./gradlew shadowJar

Run Locally

# Run with Gradle
./gradlew run --args="local"

# Or run JAR directly
java -cp build/libs/aquagen-storm-1.0-d7-shaded.jar TemperatureMonitoringMainKt local

Expected Output

You should see output like the following (values are rounded here for readability; the actual output prints full Double precision):

🚀 Starting Temperature Monitoring Topology (LOCAL MODE)
📊 Average for SENSOR_001 (hour 472187): 24.5°C (10 readings)
🚨 ALERT: Temperature 38.2°C exceeds threshold 35.0°C at Warehouse A - Zone 1
📧 EMAIL: Alert for device SENSOR_001 - Temperature 38.2°C exceeds threshold 35.0°C at Warehouse A - Zone 1
📱 SMS: Alert for device SENSOR_001
📊 DASHBOARD: Updated with alert TEMP_SENSOR_001_1699876543123
✅ Normal: Temperature back to normal for device SENSOR_001
📊 Average for SENSOR_003 (hour 472187): 26.1°C (10 readings)
...

Step 12: Add to Main Topology (Optional)

To integrate into the main application topology, edit StormTopologyBuilder.kt:

// Add temperature monitoring bolts
builder.setBolt(
TemperatureProcessingBolt.ID,
TemperatureProcessingBolt(),
config.partitionCount
)
.fieldsGrouping(
HubLoggerBolt.ID,
StreamGroup.TEMPERATURE_DATA_STREAM.value, // Assuming Event Hub sends temp data
Fields(FieldGroup.PARTITION_ID.value)
)

// Add alert bolt
builder.setBolt(
TemperatureAlertBolt.ID,
TemperatureAlertBolt(),
config.partitionCount
)
.fieldsGrouping(
TemperatureProcessingBolt.ID,
StreamGroup.TEMPERATURE_DATA_STREAM.value,
Fields(FieldGroup.DEVICE_ID.value)
)

// Add average bolt
builder.setBolt(
TemperatureAverageBolt.ID,
TemperatureAverageBolt(),
config.partitionCount
)
.fieldsGrouping(
TemperatureProcessingBolt.ID,
StreamGroup.TEMPERATURE_DATA_STREAM.value,
Fields(FieldGroup.DEVICE_ID.value)
)

// Add notification bolt
builder.setBolt(
TemperatureNotificationBolt.ID,
TemperatureNotificationBolt(),
2
)
.shuffleGrouping(
TemperatureAlertBolt.ID,
StreamGroup.TEMPERATURE_ALERT_STREAM.value
)

Key Takeaways

What You Learned

Data Modeling

  • Created type-safe data classes with validation
  • Used @JsonProperty for JSON serialization
  • Implemented companion object factory methods

Typed Output Fields

  • Created output field classes with .fields, .values, and fromValues()
  • Used type-safe emitting instead of raw Values()
  • Confined positional casts to fromValues(), so field mix-ups surface in one place

Bolt Development

  • Implemented prepare(), execute(), declareOutputFields(), cleanup()
  • Used collector.emit(), collector.ack(), collector.fail()
  • Added error handling and metrics

State Management

  • Used ConcurrentHashMap for stateful processing
  • Implemented alert cooldown logic
  • Created time-windowed aggregations

Stream Groupings

  • Shuffle Grouping: For stateless bolts (even distribution)
  • Fields Grouping: For stateful bolts (same key → same instance)
  • Understood parallelism and task allocation

Best Practices

  • Type-safe tuple handling
  • Proper anchoring for reliability
  • Cleanup of old state data
  • Comprehensive logging and metrics

Next Steps

Extend the Project

Try these enhancements:

  1. Add Database Integration

    • Store readings in Cosmos DB
    • Query historical data
    • Implement data retention policies
  2. Add More Alert Types

    • Low temperature alerts (freezer failure)
    • Rapid temperature change detection
    • Sensor offline detection (no data for X minutes)
  3. Add Real Event Hub Integration

    • Replace test spout with Event Hub spout
    • Configure consumer groups
    • Handle partitioning
  4. Add Dashboard

    • Create REST API to query Storm state
    • WebSocket for real-time updates
    • Visualization with charts
  5. Improve Notifications

    • Integrate with SendGrid for emails
    • Use Twilio for SMS
    • Add FCM push notifications
    • Implement notification preferences per user
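
As a starting point for the rapid temperature change enhancement, the sketch below compares each reading with the previous one for the same device. RateOfChangeDetector is a hypothetical class, and the default limit of 2.0 degrees per minute is an arbitrary assumption to be tuned for real sensors.

```kotlin
import kotlin.math.abs

/**
 * Flags a reading when the temperature moved faster than maxDegreesPerMinute
 * since the previous reading from the same device.
 */
class RateOfChangeDetector(private val maxDegreesPerMinute: Double = 2.0) {

    // Key: deviceId, Value: (timestamp in ms, temperature) of the last accepted reading
    private val last = HashMap<String, Pair<Long, Double>>()

    fun isRapidChange(deviceId: String, timestampMillis: Long, temperature: Double): Boolean {
        val previous = last[deviceId]
        if (previous == null) {
            last[deviceId] = timestampMillis to temperature
            return false // Nothing to compare with yet
        }
        // Ignore duplicate or out-of-order readings rather than computing a negative interval
        if (timestampMillis <= previous.first) return false

        last[deviceId] = timestampMillis to temperature
        val elapsedMinutes = (timestampMillis - previous.first) / 60_000.0
        return abs(temperature - previous.second) / elapsedMinutes > maxDegreesPerMinute
    }
}
```

Like the alert and average bolts, a bolt built around this detector keeps per-device state, so it would need fields grouping on deviceId.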

Deploy to Production

Follow these steps:

  1. Change environment in StormConstants.kt to Environment.DEV
  2. Build JAR: ./gradlew shadowJar
  3. Copy to VM: scp build/libs/*.jar stream@20.197.21.30:~/storm-setup
  4. Deploy using initstorm.sh (Option 4)
  5. Monitor via Storm UI: http://20.197.21.30:6800

Summary

You've successfully built a complete Storm topology with:

  • ✅ 4 data classes and 1 enum (readings, alerts, state, severity)
  • ✅ 2 typed output field classes
  • ✅ 4 bolts (processing, alert, average, notification)
  • ✅ 1 test spout
  • ✅ 1 topology builder
  • ✅ State management with maps
  • ✅ Multiple stream groupings
  • ✅ Type-safe tuple handling

This example demonstrates all the core patterns used in the main Storm application. You can now confidently build your own Storm topologies!