Sample Storm Trial Project
Build a complete Temperature Monitoring and Alert System from scratch using all the patterns and best practices from our Storm implementation. This hands-on tutorial will guide you through creating a real working Storm topology.
By the end of this tutorial, you'll be able to:
- Create typed data models for Storm tuples
- Build custom bolts with state management
- Implement alert detection logic
- Integrate with external systems (databases)
- Test and deploy a complete topology
Project Overview
What We're Building
A Real-Time Temperature Monitoring System that:
- Receives temperature sensor data from IoT devices
- Processes readings in real-time
- Detects high temperature alerts (> 35°C)
- Calculates hourly averages
- Stores data in a database
- Sends notifications when alerts trigger
Use Case
Scenario: A warehouse has 50 temperature sensors monitoring refrigerated storage units. We need to:
- Track temperature in real-time
- Alert when temperature exceeds safe threshold
- Maintain hourly temperature averages for compliance reporting
- Detect sensor failures (no data for 5 minutes)
Step 1: Create Data Models
First, let's create our data classes following the exact patterns used in the main application.
1.1 TemperatureReading Data Class
Create src/main/kotlin/dataclass/TemperatureReading.kt:
package dataclass
import com.fasterxml.jackson.annotation.JsonProperty
import java.io.Serializable
/**
* Represents a single temperature reading from an IoT sensor
*/
data class TemperatureReading(
@JsonProperty("deviceId")
val deviceId: String,
@JsonProperty("temperature")
val temperature: Double,
@JsonProperty("unit")
val unit: String = "Celsius",
@JsonProperty("timestamp")
val timestamp: Long,
@JsonProperty("location")
val location: String,
@JsonProperty("sensorType")
val sensorType: String = "DHT22"
) : Serializable {
/**
* Validates if the temperature reading is within expected range
*/
fun isValid(): Boolean {
return temperature in -50.0..100.0 && deviceId.isNotBlank()
}
/**
* Checks if temperature exceeds safe threshold
*/
fun isHighTemperature(threshold: Double = 35.0): Boolean {
return temperature > threshold
}
companion object {
private const val serialVersionUID = 1L
}
}
1.2 TemperatureAlert Data Class
Create src/main/kotlin/dataclass/TemperatureAlert.kt:
package dataclass
import com.fasterxml.jackson.annotation.JsonProperty
import java.io.Serializable
/**
* Represents a temperature alert event
*/
data class TemperatureAlert(
@JsonProperty("alertId")
val alertId: String,
@JsonProperty("deviceId")
val deviceId: String,
@JsonProperty("temperature")
val temperature: Double,
@JsonProperty("threshold")
val threshold: Double,
@JsonProperty("location")
val location: String,
@JsonProperty("triggeredAt")
val triggeredAt: Long,
@JsonProperty("severity")
val severity: AlertSeverity,
@JsonProperty("message")
val message: String
) : Serializable {
companion object {
private const val serialVersionUID = 1L
fun fromReading(reading: TemperatureReading, threshold: Double): TemperatureAlert {
val severity = when {
reading.temperature > threshold + 10 -> AlertSeverity.CRITICAL
reading.temperature > threshold + 5 -> AlertSeverity.HIGH
else -> AlertSeverity.MEDIUM
}
return TemperatureAlert(
alertId = "TEMP_${reading.deviceId}_${reading.timestamp}",
deviceId = reading.deviceId,
temperature = reading.temperature,
threshold = threshold,
location = reading.location,
triggeredAt = reading.timestamp,
severity = severity,
message = "Temperature ${"%.1f".format(reading.temperature)}°C exceeds threshold ${threshold}°C at ${reading.location}"
)
}
}
}
enum class AlertSeverity {
MEDIUM, HIGH, CRITICAL
}
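A worked example of the severity bands, assuming the default 35.0 °C threshold: readings in (35, 40] are MEDIUM, (40, 45] are HIGH, and anything above 45 is CRITICAL. Every comparison uses >, so a reading of exactly 40.0 °C is still MEDIUM. The sketch below restates the when from fromReading as a standalone function so the boundaries can be tested without Jackson or Storm. AlertLevel and classify are illustrative names, not part of the project.

```kotlin
// Stand-in for AlertSeverity so this file compiles on its own.
enum class AlertLevel { MEDIUM, HIGH, CRITICAL }

// Mirrors the severity rules in TemperatureAlert.fromReading.
// Returns null when the reading does not exceed the threshold at all.
fun classify(temperature: Double, threshold: Double = 35.0): AlertLevel? = when {
    temperature <= threshold -> null
    temperature > threshold + 10 -> AlertLevel.CRITICAL
    temperature > threshold + 5 -> AlertLevel.HIGH
    else -> AlertLevel.MEDIUM
}
```

Usage: classify(40.0) is MEDIUM and classify(45.1) is CRITICAL; a test over these edges is a cheap guard if the thresholds ever change.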
Step 2: Create Typed Output Fields
Following the pattern from 03-our-storm.md, create type-safe output field classes.
2.1 TemperatureProcessingBoltOutputField
Create src/main/kotlin/model/TemperatureProcessingBoltOutputField.kt:
package model
import dataclass.TemperatureReading
import org.apache.storm.tuple.Fields
import org.apache.storm.tuple.Values
/**
* Type-safe output fields for TemperatureProcessingBolt
* ✅ GOOD: Type-safe, prevents errors
*/
data class TemperatureProcessingBoltOutputField(
val partitionId: Int,
val reading: TemperatureReading,
val timestamp: Long,
val isValid: Boolean
) {
/**
* Device ID, emitted as its own field so downstream bolts can use
* fieldsGrouping on "deviceId" (Storm can only group on declared fields)
*/
val deviceId: String
get() = reading.deviceId
/**
* Convert to Storm Values for emitting (order must match `fields`)
*/
val values: Values
get() = Values(partitionId, deviceId, reading, timestamp, isValid)
companion object {
/**
* Define field names
*/
val fields: Fields = Fields("partitionId", "deviceId", "reading", "timestamp", "isValid")
/**
* Type-safe extraction from tuple
*/
fun fromValues(values: List<Any>): TemperatureProcessingBoltOutputField {
return TemperatureProcessingBoltOutputField(
partitionId = values[0] as Int,
// values[1] is deviceId, which is derived from reading
reading = values[2] as TemperatureReading,
timestamp = values[3] as Long,
isValid = values[4] as Boolean
)
}
}
}
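Storm pairs declared field names with emitted values by position, so the one invariant these classes must keep is that fields and values list the same entries in the same order. A round-trip unit test catches drift when someone adds a field to one list and forgets the other. The sketch below assumes plain Kotlin lists in place of Storm's Fields and Values so it runs without Storm on the classpath; ReadingOutput and FIELD_NAMES are hypothetical names.

```kotlin
// Hypothetical stand-in for a typed output field class. Plain lists replace
// Storm's Fields/Values so the pattern can be tested in isolation.
data class ReadingOutput(
    val partitionId: Int,
    val deviceId: String,
    val temperature: Double,
    val isValid: Boolean
) {
    // Order must match FIELD_NAMES exactly: fields and values are paired by position.
    val values: List<Any>
        get() = listOf(partitionId, deviceId, temperature, isValid)

    companion object {
        val FIELD_NAMES = listOf("partitionId", "deviceId", "temperature", "isValid")

        fun fromValues(values: List<Any>): ReadingOutput {
            // Fail fast with a clear message instead of an IndexOutOfBoundsException.
            require(values.size == FIELD_NAMES.size) {
                "Expected ${FIELD_NAMES.size} values, got ${values.size}"
            }
            return ReadingOutput(
                partitionId = values[0] as Int,
                deviceId = values[1] as String,
                temperature = values[2] as Double,
                isValid = values[3] as Boolean
            )
        }
    }
}
```

With the real class, the equivalent test would compare TemperatureProcessingBoltOutputField.fields.size() with the size of .values and check that fromValues(x.values) == x.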
2.2 TemperatureAlertBoltOutputField
Create src/main/kotlin/model/TemperatureAlertBoltOutputField.kt:
package model
import dataclass.TemperatureAlert
import org.apache.storm.tuple.Fields
import org.apache.storm.tuple.Values
/**
* Type-safe output fields for TemperatureAlertBolt
*/
data class TemperatureAlertBoltOutputField(
val deviceId: String,
val alert: TemperatureAlert,
val triggeredAt: Long,
val notificationChannels: List<String>
) {
val values: Values
get() = Values(deviceId, alert, triggeredAt, notificationChannels)
companion object {
val fields: Fields = Fields("deviceId", "alert", "triggeredAt", "notificationChannels")
fun fromValues(values: List<Any>): TemperatureAlertBoltOutputField {
return TemperatureAlertBoltOutputField(
deviceId = values[0] as String,
alert = values[1] as TemperatureAlert,
triggeredAt = values[2] as Long,
@Suppress("UNCHECKED_CAST")
notificationChannels = values[3] as List<String>
)
}
}
}
Step 3: Create Stream and Field Enums
3.1 Add to StreamGroup Enum
Edit src/main/kotlin/enum/StreamGroup.kt and add:
enum class StreamGroup(val value: String) {
// ... existing streams ...
// Temperature monitoring streams
TEMPERATURE_DATA_STREAM("temperature-data-stream"),
TEMPERATURE_ALERT_STREAM("temperature-alert-stream"),
TEMPERATURE_AVERAGE_STREAM("temperature-average-stream");
}
3.2 Add to FieldGroup Enum
Edit src/main/kotlin/enum/FieldGroup.kt and add:
enum class FieldGroup(val value: String) {
// ... existing fields ...
// Temperature monitoring fields
DEVICE_ID("deviceId"),
TEMPERATURE("temperature"),
LOCATION("location");
}
Step 4: Build the Processing Bolt
Create src/main/kotlin/bolt/temperature/TemperatureProcessingBolt.kt:
package bolt.temperature
import dataclass.TemperatureReading
import model.TemperatureProcessingBoltOutputField
import org.apache.storm.task.OutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichBolt
import org.apache.storm.tuple.Tuple
import enum.StreamGroup
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
/**
* Processes raw temperature sensor data
* - Validates readings
* - Filters junk data
* - Emits to downstream bolts
*/
class TemperatureProcessingBolt : BaseRichBolt() {
private lateinit var collector: OutputCollector
private val objectMapper = jacksonObjectMapper()
// Metrics
private var processedCount = 0
private var invalidCount = 0
companion object {
const val ID = "temperature-processing-bolt"
}
override fun prepare(
stormConf: MutableMap<String, Any>?,
context: TopologyContext?,
collector: OutputCollector?
) {
this.collector = collector!!
}
override fun execute(tuple: Tuple?) {
try {
// Extract partition ID from tuple
val partitionId = tuple?.getIntegerByField("partitionId") ?: 0
// Parse JSON message to TemperatureReading
val message = tuple?.getStringByField("message") ?: return
val reading = objectMapper.readValue<TemperatureReading>(message)
val timestamp = System.currentTimeMillis()
// Validate the reading
val isValid = reading.isValid()
if (isValid) {
processedCount++
// ✅ GOOD: Use typed output field
collector.emit(
StreamGroup.TEMPERATURE_DATA_STREAM.value,
tuple,
TemperatureProcessingBoltOutputField(
partitionId = partitionId,
reading = reading,
timestamp = timestamp,
isValid = true
).values
)
// Ack the tuple
collector.ack(tuple)
} else {
invalidCount++
// Log invalid reading
println("Invalid temperature reading from device ${reading.deviceId}: ${reading.temperature}")
collector.ack(tuple)
}
} catch (e: Exception) {
println("Error processing temperature data: ${e.message}")
collector.fail(tuple)
}
}
override fun declareOutputFields(declarer: OutputFieldsDeclarer?) {
// ✅ GOOD: Use typed output field class
declarer?.declareStream(
StreamGroup.TEMPERATURE_DATA_STREAM.value,
TemperatureProcessingBoltOutputField.fields
)
}
override fun cleanup() {
println("TemperatureProcessingBolt Stats:")
println(" Processed: $processedCount")
println(" Invalid: $invalidCount")
}
}
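The bolt fails the tuple on any exception, including a JSON parse error. With a spout that replays failed tuples (an Event Hub or Kafka spout, for example), a malformed message would be replayed indefinitely, because the same bytes will never parse. One common alternative is to ack, drop, and count permanently bad input, and to fail only on errors that a retry might fix. Below is a sketch of that decision as a pure function. Outcome, ParsedReading, and decide are hypothetical names, and IllegalArgumentException stands in for Jackson's JsonProcessingException, which is what the real bolt would catch.

```kotlin
// Hypothetical result of handling one incoming message; not part of Storm's API.
sealed class Outcome {
    data class Emit(val deviceId: String) : Outcome() // valid: emit, then ack
    data class Drop(val reason: String) : Outcome()   // permanently bad: ack without emitting
    data class Retry(val cause: String) : Outcome()   // possibly transient: fail so the spout can replay
}

// Stand-in for TemperatureReading, reduced to the fields the checks need.
data class ParsedReading(val deviceId: String, val temperature: Double)

fun decide(parse: () -> ParsedReading): Outcome = try {
    val reading = parse()
    // Same validation rules as TemperatureReading.isValid()
    if (reading.deviceId.isNotBlank() && reading.temperature in -50.0..100.0) {
        Outcome.Emit(reading.deviceId)
    } else {
        Outcome.Drop("out of range or missing deviceId")
    }
} catch (e: IllegalArgumentException) {
    // Stand-in for a JSON parse error: replaying the same payload cannot succeed.
    Outcome.Drop("malformed: ${e.message}")
} catch (e: Exception) {
    Outcome.Retry(e.message ?: e.javaClass.simpleName)
}
```

In execute(), Emit would map to emit plus ack, Drop to ack plus invalidCount++, and Retry to fail.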
Step 5: Build the Alert Bolt with State Management
Create src/main/kotlin/bolt/temperature/TemperatureAlertBolt.kt:
package bolt.temperature
import dataclass.TemperatureAlert
import dataclass.TemperatureReading
import model.TemperatureProcessingBoltOutputField
import model.TemperatureAlertBoltOutputField
import org.apache.storm.task.OutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichBolt
import org.apache.storm.tuple.Tuple
import enum.StreamGroup
import java.util.concurrent.ConcurrentHashMap
/**
* Detects temperature alerts with state management
* - Tracks alert state per device
* - Prevents duplicate alerts
* - Emits when a device crosses the threshold, then again only after the cooldown while it stays high
*/
class TemperatureAlertBolt : BaseRichBolt() {
private lateinit var collector: OutputCollector
// State management: Track alert state per device
// Key: deviceId, Value: AlertState
private val deviceAlertState = ConcurrentHashMap<String, AlertState>()
// Configuration
private val temperatureThreshold = 35.0
private val alertCooldownMillis = 300_000L // 5 minutes
// Metrics
private var alertsTriggered = 0
private var alertsSuppressed = 0
companion object {
const val ID = "temperature-alert-bolt"
}
override fun prepare(
stormConf: MutableMap<String, Any>?,
context: TopologyContext?,
collector: OutputCollector?
) {
this.collector = collector!!
}
override fun execute(tuple: Tuple?) {
try {
// ✅ GOOD: Type-safe extraction using output field class
val data = TemperatureProcessingBoltOutputField.fromValues(tuple?.values ?: return)
// Only process valid readings
if (!data.isValid) {
collector.ack(tuple)
return
}
val reading = data.reading
val deviceId = reading.deviceId
// Check if temperature exceeds threshold
if (reading.isHighTemperature(temperatureThreshold)) {
// Get or create alert state for this device
val state = deviceAlertState.getOrPut(deviceId) { AlertState() }
// Check if we should trigger alert (not in cooldown)
val now = System.currentTimeMillis()
val canTriggerAlert = !state.isInAlert ||
(now - state.lastAlertTime) > alertCooldownMillis
if (canTriggerAlert) {
// Create alert
val alert = TemperatureAlert.fromReading(reading, temperatureThreshold)
// Update state
state.isInAlert = true
state.lastAlertTime = now
state.alertCount++
alertsTriggered++
// ✅ GOOD: Use typed output field
collector.emit(
StreamGroup.TEMPERATURE_ALERT_STREAM.value,
tuple,
TemperatureAlertBoltOutputField(
deviceId = deviceId,
alert = alert,
triggeredAt = now,
notificationChannels = listOf("email", "sms", "dashboard")
).values
)
println("🚨 ALERT: ${alert.message}")
} else {
alertsSuppressed++
}
} else {
// Temperature normal - clear alert state if exists
deviceAlertState[deviceId]?.let { state ->
if (state.isInAlert) {
state.isInAlert = false
println("✅ Normal: Temperature back to normal for device $deviceId")
}
}
}
collector.ack(tuple)
} catch (e: Exception) {
println("Error in TemperatureAlertBolt: ${e.message}")
collector.fail(tuple)
}
}
override fun declareOutputFields(declarer: OutputFieldsDeclarer?) {
// ✅ GOOD: Use typed output field class
declarer?.declareStream(
StreamGroup.TEMPERATURE_ALERT_STREAM.value,
TemperatureAlertBoltOutputField.fields
)
}
override fun cleanup() {
println("TemperatureAlertBolt Stats:")
println(" Alerts Triggered: $alertsTriggered")
println(" Alerts Suppressed (cooldown): $alertsSuppressed")
println(" Devices Tracked: ${deviceAlertState.size}")
}
/**
* State class to track alert status per device
*/
data class AlertState(
var isInAlert: Boolean = false,
var lastAlertTime: Long = 0L,
var alertCount: Int = 0
)
}
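The cooldown rule is easiest to reason about, and to test, when the current time is passed in instead of read from System.currentTimeMillis(). The sketch below models the same state transitions as TemperatureAlertBolt without any Storm types; AlertTracker is a hypothetical name. It uses a plain HashMap on the assumption that each bolt task's execute() is called from a single executor thread, which is Storm's default behavior.

```kotlin
// Storm-free model of the per-device alert state in TemperatureAlertBolt.
class AlertTracker(
    private val threshold: Double = 35.0,
    private val cooldownMillis: Long = 300_000L // 5 minutes, as in the bolt
) {
    private class State(var isInAlert: Boolean = false, var lastAlertTime: Long = 0L)

    private val states = HashMap<String, State>()

    /** Returns true when an alert should be emitted for this reading. */
    fun onReading(deviceId: String, temperature: Double, now: Long): Boolean {
        if (temperature <= threshold) {
            // Back to normal: the next high reading alerts immediately.
            states[deviceId]?.isInAlert = false
            return false
        }
        val state = states.getOrPut(deviceId) { State() }
        val canAlert = !state.isInAlert || now - state.lastAlertTime > cooldownMillis
        if (canAlert) {
            state.isInAlert = true
            state.lastAlertTime = now
        }
        return canAlert
    }
}
```

A typical sequence: the first reading above 35 °C alerts, further high readings within 5 minutes are suppressed, a high reading after the cooldown alerts again, and a dip back to normal re-arms the device so the next crossing alerts at once.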
Step 6: Build the Average Calculation Bolt
Create src/main/kotlin/bolt/temperature/TemperatureAverageBolt.kt:
package bolt.temperature
import dataclass.TemperatureReading
import model.TemperatureProcessingBoltOutputField
import org.apache.storm.task.OutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichBolt
import org.apache.storm.topology.IRichBolt
import org.apache.storm.tuple.Tuple
import org.apache.storm.tuple.Fields
import org.apache.storm.tuple.Values
import enum.StreamGroup
import java.util.concurrent.ConcurrentHashMap
/**
* Calculates hourly temperature averages per device
* Uses time-windowed state management
*/
class TemperatureAverageBolt : BaseRichBolt() {
private lateinit var collector: OutputCollector
// State: Track readings per device per hour
// Key: "deviceId:hourBucket", Value: ReadingAccumulator
private val hourlyData = ConcurrentHashMap<String, ReadingAccumulator>()
// Window size: 1 hour in milliseconds
private val windowSizeMillis = 3600000L
companion object {
const val ID = "temperature-average-bolt"
}
override fun prepare(
stormConf: MutableMap<String, Any>?,
context: TopologyContext?,
collector: OutputCollector?
) {
this.collector = collector!!
}
override fun execute(tuple: Tuple?) {
try {
// ✅ GOOD: Type-safe extraction
val data = TemperatureProcessingBoltOutputField.fromValues(tuple?.values ?: return)
if (!data.isValid) {
collector.ack(tuple)
return
}
val reading = data.reading
val deviceId = reading.deviceId
// Calculate hour bucket (epoch millis / millis per hour)
val hourBucket = reading.timestamp / windowSizeMillis
val key = "$deviceId:$hourBucket"
// Get or create accumulator for this hour
val accumulator = hourlyData.getOrPut(key) {
ReadingAccumulator(
deviceId = deviceId,
hourBucket = hourBucket,
location = reading.location
)
}
// Add reading to accumulator
accumulator.addReading(reading.temperature)
// Emit a running average every 10 readings (no end-of-hour emission is implemented here)
if (accumulator.count % 10 == 0) {
val average = accumulator.getAverage()
// Emit the running average on TEMPERATURE_AVERAGE_STREAM (no bolt subscribes yet; a database bolt could)
collector.emit(
StreamGroup.TEMPERATURE_AVERAGE_STREAM.value,
tuple,
Values(
deviceId,
hourBucket,
average,
accumulator.count,
accumulator.min,
accumulator.max,
reading.location
)
)
println("📊 Average for $deviceId (hour $hourBucket): ${"%.1f".format(average)}°C (${accumulator.count} readings)")
}
// Clean old data (older than 24 hours)
cleanOldData(hourBucket)
collector.ack(tuple)
} catch (e: Exception) {
println("Error in TemperatureAverageBolt: ${e.message}")
collector.fail(tuple)
}
}
override fun declareOutputFields(declarer: OutputFieldsDeclarer?) {
declarer?.declareStream(
StreamGroup.TEMPERATURE_AVERAGE_STREAM.value,
Fields("deviceId", "hourBucket", "average", "count", "min", "max", "location")
)
}
/**
* Remove data older than 24 hours
*/
private fun cleanOldData(currentHourBucket: Long) {
val cutoffBucket = currentHourBucket - 24 // 24 hours ago
hourlyData.entries.removeIf { (_, accumulator) ->
// Compare the stored bucket instead of parsing the key, which breaks if a deviceId contains ':'
accumulator.hourBucket < cutoffBucket
}
}
/**
* Accumulator for hourly readings
*/
data class ReadingAccumulator(
val deviceId: String,
val hourBucket: Long,
val location: String,
var sum: Double = 0.0,
var count: Int = 0,
var min: Double = Double.POSITIVE_INFINITY,
var max: Double = Double.NEGATIVE_INFINITY // not Double.MIN_VALUE, which is the smallest positive double
) {
fun addReading(temperature: Double) {
sum += temperature
count++
if (temperature < min) min = temperature
if (temperature > max) max = temperature
}
fun getAverage(): Double = if (count > 0) sum / count else 0.0
}
}
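The hour bucket is plain integer division of the reading's epoch-millisecond timestamp by 3,600,000 (milliseconds per hour). For example, 1699876543123 / 3600000 = 472187, so every reading from 1,699,873,200,000 through 1,699,876,799,999 ms shares that bucket; buckets line up with UTC hours because the epoch is UTC. The sketch below reproduces the bucketing and an accumulator whose min and max start at the infinities, and shows why Double.MIN_VALUE is the wrong starting point for max when a freezer reports sub-zero readings. hourBucket and Accumulator are hypothetical names; no Storm types are used.

```kotlin
const val HOUR_MILLIS = 3_600_000L

// Epoch milliseconds -> whole hours since 1970-01-01T00:00Z.
fun hourBucket(timestampMillis: Long): Long = timestampMillis / HOUR_MILLIS

class Accumulator {
    var sum = 0.0
        private set
    var count = 0
        private set
    // Start from the infinities so the first reading always replaces them.
    var min = Double.POSITIVE_INFINITY
        private set
    var max = Double.NEGATIVE_INFINITY
        private set

    fun add(temperature: Double) {
        sum += temperature
        count++
        min = minOf(min, temperature)
        max = maxOf(max, temperature)
    }

    val average: Double
        get() = if (count > 0) sum / count else 0.0
}
```

Usage: adding -18.0, -20.5, and -19.0 gives min -20.5, max -18.0, and an average of about -19.17, whereas a max seeded with Double.MIN_VALUE (about 4.9e-324) would never drop below zero.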
Step 7: Create the Notification Bolt
Create src/main/kotlin/bolt/temperature/TemperatureNotificationBolt.kt:
package bolt.temperature
import model.TemperatureAlertBoltOutputField
import org.apache.storm.task.OutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichBolt
import org.apache.storm.tuple.Tuple
/**
* Sends notifications for temperature alerts
* In production, this would integrate with:
* - Email service (SendGrid, AWS SES)
* - SMS gateway (Twilio)
* - Push notification service (FCM)
* - Dashboard websocket
*/
class TemperatureNotificationBolt : BaseRichBolt() {
private lateinit var collector: OutputCollector
private var notificationsSent = 0
companion object {
const val ID = "temperature-notification-bolt"
}
override fun prepare(
stormConf: MutableMap<String, Any>?,
context: TopologyContext?,
collector: OutputCollector?
) {
this.collector = collector!!
}
override fun execute(tuple: Tuple?) {
try {
// ✅ GOOD: Type-safe extraction
val alertData = TemperatureAlertBoltOutputField.fromValues(tuple?.values ?: return)
val alert = alertData.alert
// Send notifications to each channel
alertData.notificationChannels.forEach { channel ->
when (channel) {
"email" -> sendEmailNotification(alert.message, alert.deviceId)
"sms" -> sendSmsNotification(alert.message, alert.deviceId)
"dashboard" -> updateDashboard(alert)
else -> println("Unknown channel: $channel")
}
}
notificationsSent++
collector.ack(tuple)
} catch (e: Exception) {
println("Error sending notification: ${e.message}")
collector.fail(tuple)
}
}
override fun declareOutputFields(declarer: OutputFieldsDeclarer?) {
// Terminal bolt - no output streams
}
// Mock notification methods (replace with actual integrations)
private fun sendEmailNotification(message: String, deviceId: String) {
println("📧 EMAIL: Alert for device $deviceId - $message")
// TODO: Integrate with email service
// emailService.send(to = "admin@example.com", subject = "Temperature Alert", body = message)
}
private fun sendSmsNotification(message: String, deviceId: String) {
println("📱 SMS: Alert for device $deviceId")
// TODO: Integrate with SMS gateway
// smsGateway.send(to = "+1234567890", message = message)
}
private fun updateDashboard(alert: dataclass.TemperatureAlert) {
println("📊 DASHBOARD: Updated with alert ${alert.alertId}")
// TODO: Publish to websocket or push to dashboard API
// dashboardClient.publishAlert(alert)
}
override fun cleanup() {
println("TemperatureNotificationBolt Stats:")
println(" Notifications Sent: $notificationsSent")
}
}
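The when on channel strings works, but every new channel means editing the bolt. One alternative, sketched here as an assumption rather than the project's design, is to build a map from channel name to notifier in prepare() and look channels up at runtime, reporting any channel that has no handler. Notifier and NotificationDispatcher are hypothetical; nothing below calls a real SendGrid, Twilio, or FCM API.

```kotlin
// Hypothetical notifier abstraction; real implementations would wrap
// an email, SMS, or websocket client.
fun interface Notifier {
    fun send(deviceId: String, message: String)
}

class NotificationDispatcher(private val notifiers: Map<String, Notifier>) {
    /** Sends to every known channel; returns the channels that had no notifier. */
    fun dispatch(channels: List<String>, deviceId: String, message: String): List<String> {
        val unknown = mutableListOf<String>()
        for (channel in channels) {
            val notifier = notifiers[channel]
            if (notifier == null) {
                unknown += channel
            } else {
                notifier.send(deviceId, message)
            }
        }
        return unknown
    }
}
```

A design note: the bolt fails the whole tuple if any channel throws, so a spout that replays failed tuples would resend to channels that already succeeded. Returning per-channel results from a dispatcher like this is one way to retry only the channels that failed.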
Step 8: Build the Topology
Create src/main/kotlin/topology/TemperatureTopologyBuilder.kt:
package topology
import bolt.temperature.*
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.Fields
import enum.StreamGroup
import enum.FieldGroup
/**
* Builds the Temperature Monitoring Topology
*/
object TemperatureTopologyBuilder {
fun build(
spout: BaseRichSpout,
partitionCount: Int = 4
): TopologyBuilder {
val builder = TopologyBuilder()
// 1. Set the spout (data source)
builder.setSpout("temperature-spout", spout, partitionCount)
// 2. Temperature Processing Bolt
// - Parallelism: 4 (one per partition)
// - Grouping: Shuffle (stateless, distribute evenly)
builder.setBolt(
TemperatureProcessingBolt.ID,
TemperatureProcessingBolt(),
partitionCount
)
.shuffleGrouping("temperature-spout")
.setNumTasks(partitionCount)
// 3. Temperature Alert Bolt
// - Parallelism: 4
// - Grouping: Fields by deviceId (stateful - same device to same bolt instance)
builder.setBolt(
TemperatureAlertBolt.ID,
TemperatureAlertBolt(),
partitionCount
)
.fieldsGrouping(
TemperatureProcessingBolt.ID,
StreamGroup.TEMPERATURE_DATA_STREAM.value,
Fields(FieldGroup.DEVICE_ID.value) // ✅ GOOD: Group by device for state management
)
.setNumTasks(partitionCount)
// 4. Temperature Average Bolt
// - Parallelism: 4
// - Grouping: Fields by deviceId (stateful - time windows per device)
builder.setBolt(
TemperatureAverageBolt.ID,
TemperatureAverageBolt(),
partitionCount
)
.fieldsGrouping(
TemperatureProcessingBolt.ID,
StreamGroup.TEMPERATURE_DATA_STREAM.value,
Fields(FieldGroup.DEVICE_ID.value)
)
.setNumTasks(partitionCount)
// 5. Notification Bolt
// - Parallelism: 2 (less CPU intensive)
// - Grouping: Shuffle (no state needed)
builder.setBolt(
TemperatureNotificationBolt.ID,
TemperatureNotificationBolt(),
2
)
.shuffleGrouping(
TemperatureAlertBolt.ID,
StreamGroup.TEMPERATURE_ALERT_STREAM.value
)
.setNumTasks(2)
return builder
}
}
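Fields grouping is what makes the per-device maps in the alert and average bolts safe: every tuple with the same deviceId is routed to the same task, so exactly one task owns each device's state. Conceptually, the target task is a hash of the grouping values modulo the number of tasks. The sketch below illustrates that idea with a hypothetical chooseTask helper; treat it as a model of the "same key, same task" property, since Storm's internal hashing may differ in detail.

```kotlin
// Model of fieldsGrouping routing: hash the grouping values, then take the
// result modulo the task count. floorMod keeps the index non-negative.
fun chooseTask(groupingValues: List<Any>, numTasks: Int): Int =
    Math.floorMod(groupingValues.hashCode(), numTasks)
```

Usage: chooseTask(listOf("SENSOR_001"), 4) returns the same index on every call, which is exactly what the alert cooldown and hourly averages rely on. Shuffle grouping gives no such guarantee, which is why the stateless processing and notification bolts can use it.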
Step 9: Create a Test Spout (Data Generator)
For testing, create a spout that generates sample temperature data:
Create src/main/kotlin/spout/TemperatureTestSpout.kt:
package spout
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import dataclass.TemperatureReading
import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.Fields
import org.apache.storm.tuple.Values
import kotlin.random.Random
/**
* Test spout that generates simulated temperature sensor data
*/
class TemperatureTestSpout : BaseRichSpout() {
private lateinit var collector: SpoutOutputCollector
private val objectMapper = jacksonObjectMapper()
// Simulated devices
private val devices = listOf(
"SENSOR_001" to "Warehouse A - Zone 1",
"SENSOR_002" to "Warehouse A - Zone 2",
"SENSOR_003" to "Warehouse B - Zone 1",
"SENSOR_004" to "Warehouse B - Zone 2",
"SENSOR_005" to "Warehouse C - Zone 1"
)
private var emitCount = 0L
override fun open(
conf: MutableMap<String, Any>?,
context: TopologyContext?,
collector: SpoutOutputCollector?
) {
this.collector = collector!!
}
override fun nextTuple() {
try {
// Sleep briefly to simulate real sensor data rate
Thread.sleep(1000) // ~1 reading per second per spout instance, for a random device
// Pick a random device
val (deviceId, location) = devices.random()
// Generate temperature reading
// Base temperature: 20-30°C
// 10% chance of high temperature (35-45°C) for testing alerts
val temperature = if (Random.nextDouble() < 0.1) {
Random.nextDouble(35.0, 45.0) // High temperature
} else {
Random.nextDouble(20.0, 30.0) // Normal temperature
}
val reading = TemperatureReading(
deviceId = deviceId,
temperature = temperature,
unit = "Celsius",
timestamp = System.currentTimeMillis(),
location = location,
sensorType = "DHT22"
)
// Convert to JSON
val json = objectMapper.writeValueAsString(reading)
// Emit tuple
val partitionId = devices.indexOf(deviceId to location) % 4
collector.emit(
Values(partitionId, json),
emitCount++ // Message ID for acking
)
} catch (e: Exception) {
println("Error in TemperatureTestSpout: ${e.message}")
}
}
override fun declareOutputFields(declarer: OutputFieldsDeclarer?) {
declarer?.declare(Fields("partitionId", "message"))
}
override fun ack(msgId: Any?) {
// Successfully processed
}
override fun fail(msgId: Any?) {
// Failed to process - could retry here
println("Failed to process message: $msgId")
}
}
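Random test data is harder to debug than reproducible test data. One option, sketched here on the assumption that you want deterministic unit tests, is to pass a seeded kotlin.random.Random into the generator instead of using the global Random. simulatedTemperature is a hypothetical helper that keeps the spout's distribution: about 10% of readings fall in 35 to 45 °C and the rest in 20 to 30 °C.

```kotlin
import kotlin.random.Random

// Same distribution as TemperatureTestSpout, with an injectable Random
// so tests can fix the seed and get the same sequence every run.
fun simulatedTemperature(random: Random): Double =
    if (random.nextDouble() < 0.1) {
        random.nextDouble(35.0, 45.0) // high temperature, triggers alerts
    } else {
        random.nextDouble(20.0, 30.0) // normal temperature
    }
```

In the spout, the generator could keep a Random created in open(), seeded from configuration in tests and unseeded in production.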
Step 10: Create Main Entry Point
Create src/main/kotlin/TemperatureMonitoringMain.kt:
import org.apache.storm.Config
import org.apache.storm.LocalCluster
import org.apache.storm.StormSubmitter
import spout.TemperatureTestSpout
import topology.TemperatureTopologyBuilder
fun main(args: Array<String>) {
// Build topology
val spout = TemperatureTestSpout()
val topology = TemperatureTopologyBuilder.build(spout, partitionCount = 4).createTopology()
// Configure Storm
val config = Config()
config.setDebug(false)
config.setNumWorkers(2)
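config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1000
// Tuples that cross worker boundaries are serialized; allow Java serialization
// for the Serializable data classes (alternatively, register Kryo serializers)
config.setFallBackOnJavaSerialization(true)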
// Determine environment
val isLocal = args.isEmpty() || args[0] == "local"
if (isLocal) {
// Run locally for testing
println("🚀 Starting Temperature Monitoring Topology (LOCAL MODE)")
config.setMaxTaskParallelism(2)
val cluster = LocalCluster()
cluster.submitTopology("temperature-monitoring", config, topology)
// Run for 2 minutes then shutdown
Thread.sleep(120000)
cluster.shutdown()
println("✅ Topology stopped")
} else {
// Submit to remote cluster
val topologyName = args[0]
println("🚀 Submitting Temperature Monitoring Topology: $topologyName")
StormSubmitter.submitTopology(topologyName, config, topology)
println("✅ Topology submitted successfully")
}
}
Step 11: Build and Run
Build the Project
# Clean and build
./gradlew clean build
# Create fat JAR
./gradlew shadowJar
Run Locally
# Run with Gradle
./gradlew run --args="local"
# Or run JAR directly
java -cp build/libs/aquagen-storm-1.0-d7-shaded.jar TemperatureMonitoringMainKt local
Expected Output
You should see output like:
🚀 Starting Temperature Monitoring Topology (LOCAL MODE)
📊 Average for SENSOR_001 (hour 491234): 24.5°C (10 readings)
🚨 ALERT: Temperature 38.2°C exceeds threshold 35.0°C at Warehouse A - Zone 1
📧 EMAIL: Alert for device SENSOR_001 - Temperature 38.2°C exceeds threshold 35.0°C at Warehouse A - Zone 1
📱 SMS: Alert for device SENSOR_001
📊 DASHBOARD: Updated with alert TEMP_SENSOR_001_1699876543123
✅ Normal: Temperature back to normal for device SENSOR_001
📊 Average for SENSOR_003 (hour 491234): 26.1°C (10 readings)
...
Step 12: Add to Main Topology (Optional)
To integrate into the main application topology, edit StormTopologyBuilder.kt:
// Add temperature monitoring bolts
builder.setBolt(
TemperatureProcessingBolt.ID,
TemperatureProcessingBolt(),
config.partitionCount
)
.fieldsGrouping(
HubLoggerBolt.ID,
StreamGroup.TEMPERATURE_DATA_STREAM.value, // Assuming Event Hub sends temp data
Fields(FieldGroup.PARTITION_ID.value)
)
// Add alert bolt
builder.setBolt(
TemperatureAlertBolt.ID,
TemperatureAlertBolt(),
config.partitionCount
)
.fieldsGrouping(
TemperatureProcessingBolt.ID,
StreamGroup.TEMPERATURE_DATA_STREAM.value,
Fields(FieldGroup.DEVICE_ID.value)
)
// Add average bolt
builder.setBolt(
TemperatureAverageBolt.ID,
TemperatureAverageBolt(),
config.partitionCount
)
.fieldsGrouping(
TemperatureProcessingBolt.ID,
StreamGroup.TEMPERATURE_DATA_STREAM.value,
Fields(FieldGroup.DEVICE_ID.value)
)
// Add notification bolt
builder.setBolt(
TemperatureNotificationBolt.ID,
TemperatureNotificationBolt(),
2
)
.shuffleGrouping(
TemperatureAlertBolt.ID,
StreamGroup.TEMPERATURE_ALERT_STREAM.value
)
Key Takeaways
What You Learned
✅ Data Modeling
- Created type-safe data classes with validation
- Used @JsonProperty for JSON serialization
- Implemented companion object factory methods
✅ Typed Output Fields
- Created output field classes with .fields, .values, and fromValues()
- Emitted typed objects instead of assembling raw Values() by hand
- Got compile-time checks at every emit site, with the remaining runtime casts confined to fromValues()
✅ Bolt Development
- Implemented prepare(), execute(), declareOutputFields(), and cleanup()
- Used collector.emit(), collector.ack(), and collector.fail()
- Added error handling and metrics
✅ State Management
- Used ConcurrentHashMap for stateful processing
- Implemented alert cooldown logic
- Created time-windowed aggregations
✅ Stream Groupings
- Shuffle Grouping: For stateless bolts (even distribution)
- Fields Grouping: For stateful bolts (same key → same instance)
- Understood parallelism and task allocation
✅ Best Practices
- Type-safe tuple handling
- Proper anchoring for reliability
- Cleanup of old state data
- Comprehensive logging and metrics
Next Steps
Extend the Project
Try these enhancements:
- Add Database Integration
  - Store readings in Cosmos DB
  - Query historical data
  - Implement data retention policies
- Add More Alert Types
  - Low temperature alerts (freezer failure)
  - Rapid temperature change detection
  - Sensor offline detection (no data for X minutes)
- Add Real Event Hub Integration
  - Replace test spout with Event Hub spout
  - Configure consumer groups
  - Handle partitioning
- Add Dashboard
  - Create REST API to query Storm state
  - WebSocket for real-time updates
  - Visualization with charts
- Improve Notifications
  - Integrate with SendGrid for emails
  - Use Twilio for SMS
  - Add FCM push notifications
  - Implement notification preferences per user
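The use case asks for sensor-failure detection (no data for 5 minutes), which the topology does not implement yet. One way to approach it, sketched here as an assumption rather than the project's design: keep a last-seen timestamp per device and scan it periodically, for example on Storm tick tuples enabled through Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS in the bolt's component configuration. If the bolt is fields-grouped on deviceId, each task only tracks its own devices. OfflineDetector is a hypothetical class; it cannot notice a device that has never sent a reading, so a full solution would seed it with the expected device list.

```kotlin
// Hypothetical last-seen tracker for "no data for 5 minutes" detection.
// onReading() runs for every reading; findOffline() runs on a periodic trigger.
// Both take `now` explicitly so the logic is testable without a real clock.
class OfflineDetector(private val timeoutMillis: Long = 5 * 60_000L) {
    private val lastSeen = HashMap<String, Long>()
    private val reportedOffline = HashSet<String>()

    fun onReading(deviceId: String, now: Long) {
        lastSeen[deviceId] = now
        reportedOffline.remove(deviceId) // device is back online
    }

    /** Devices silent for longer than the timeout, each reported once per outage. */
    fun findOffline(now: Long): List<String> =
        lastSeen.filter { (id, seen) -> now - seen > timeoutMillis && id !in reportedOffline }
            .keys
            .sorted()
            .also { reportedOffline.addAll(it) }
}
```

Reporting each outage once keeps a silent sensor from producing an alert on every tick; a new reading clears the flag so the next outage is reported again.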
Deploy to Production
Follow these steps:
- Change the environment in StormConstants.kt to Environment.DEV
- Build the JAR: ./gradlew shadowJar
- Copy it to the VM: scp build/libs/*.jar stream@20.197.21.30:~/storm-setup
- Deploy using initstorm.sh (Option 4)
- Monitor via the Storm UI: http://20.197.21.30:6800
Summary
You've successfully built a complete Storm topology with:
- ✅ 4 data classes (reading, alert, alert state, hourly accumulator) plus the AlertSeverity enum
- ✅ 2 typed output field classes
- ✅ 4 bolts (processing, alert, average, notification)
- ✅ 1 test spout
- ✅ 1 topology builder
- ✅ State management with maps
- ✅ Multiple stream groupings
- ✅ Type-safe tuple handling
This example demonstrates all the core patterns used in the main Storm application. You can now confidently build your own Storm topologies!
Related Documentation
- Our Storm Implementation - Core Storm patterns
- Project Structure - Codebase organization
- Quick Start Guide - Setup and deployment
- Adding a New Bolt - Advanced bolt patterns