What is Apache Storm?

Apache Storm is a free, open-source, distributed real-time computation system designed to process unbounded streams of data with guaranteed reliability and horizontal scalability.

Overview

Apache Storm makes it easy to reliably process unbounded streams of data, doing for real-time processing what Hadoop did for batch processing. Storm is simple, can be used with any programming language, and is a pleasure to use!

Storm has many use cases: real-time analytics, online machine learning, continuous computation, distributed RPC, ETL, and more. Storm is fast: a benchmark clocked it at over a million tuples processed per second per node.


Why Storm for Real-Time Processing?

Key Advantages

1. Guaranteed Data Processing

  • With acking enabled, Storm guarantees that every tuple is processed at least once
  • Tuples that fail or time out are replayed from the spout
  • No data is lost on failure, provided the spout's source can replay messages

2. Horizontal Scalability

  • Scale by adding more machines to the cluster
  • No code changes required for scaling
  • Linear scalability for most workloads

3. Fault Tolerance

  • Automatic task reassignment on worker failures
  • Supervisor processes monitor and restart workers
  • Cluster coordination via Apache ZooKeeper

4. Low Latency

  • Sub-second response times
  • Ideal for real-time alerts and monitoring
  • Optimized for low latency rather than maximum throughput

5. Programming Language Agnostic

  • Write bolts and spouts in any language
  • Native API for JVM languages (Java, Kotlin); Python, Ruby, and other non-JVM languages plug in through the multi-lang protocol
  • Easy integration with existing systems

6. Simple API

  • Easy to learn and use
  • Clear separation of concerns
  • Extensive documentation and community support

Why We Chose Storm for Water Management

Our Requirements Met by Storm:

  • Process thousands of sensor readings per second
  • Detect critical conditions (leaks, contamination) in real-time
  • Guarantee no data loss from expensive IoT infrastructure
  • Scale across multiple water treatment facilities
  • Maintain 24/7 uptime with automatic recovery

Core Storm Concepts

1. Topology

A topology is a directed acyclic graph (DAG) of computation. It's the Storm equivalent of a MapReduce job but runs forever (until you kill it).

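// Example: Simple topology definition
import org.apache.storm.Config
import org.apache.storm.StormSubmitter
import org.apache.storm.topology.TopologyBuilder

val builder = TopologyBuilder()

// Add spout (data source) with a parallelism hint of 1
builder.setSpout("sensor-spout", SensorDataSpout(), 1)

// Add bolts (processing units) with a parallelism hint of 2 each
builder.setBolt("metrics-bolt", MetricsBolt(), 2)
    .shuffleGrouping("sensor-spout")

builder.setBolt("alert-bolt", AlertBolt(), 2)
    .shuffleGrouping("metrics-bolt")

// Submit topology with a default configuration
val config = Config()
StormSubmitter.submitTopology("water-management", config, builder.createTopology())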

Topology Components:

  • Spouts: Sources of data streams
  • Bolts: Processing units that transform data
  • Streams: Unbounded sequences of tuples flowing between components

2. Spouts

Spouts are the source of streams in a topology. They read data from external sources and emit tuples into the topology.

Characteristics:

  • Entry point for data into Storm
  • Can emit multiple streams
  • Can be reliable (replay on failure) or unreliable
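  • Run continuously in a loop

// Example: Simple spout that reads from Event Hub
// EventHubClient is shown as a simplified client; its method names are
// illustrative rather than the Azure SDK's API. connectionString is assumed
// to come from project configuration (not shown).
class EventHubSpout : BaseRichSpout() {
    private lateinit var collector: SpoutOutputCollector
    private lateinit var eventHubClient: EventHubClient

    // The conf parameter type depends on the Storm version; Storm 2.x declares
    // it as Map<String, Object>, which is Map<String, Any> in Kotlin.
    override fun open(conf: Map<*, *>, context: TopologyContext, collector: SpoutOutputCollector) {
        this.collector = collector
        this.eventHubClient = EventHubClient.create(connectionString)
    }

    override fun nextTuple() {
        // Read next message from Event Hub
        val message = eventHubClient.receive()

        if (message != null) {
            val messageId = message.systemProperties.sequenceNumber
            val data = parseMessage(message)

            // Emit tuple with message ID for tracking
            collector.emit(Values(data), messageId)
        }
    }

    override fun ack(msgId: Any?) {
        // Message processed successfully - checkpoint
        eventHubClient.acknowledge(msgId)
    }

    override fun fail(msgId: Any?) {
        // Message failed - will be replayed
        eventHubClient.nack(msgId)
    }

    // Required by BaseRichSpout; the field name "data" is an assumption
    override fun declareOutputFields(declarer: OutputFieldsDeclarer) {
        declarer.declare(Fields("data"))
    }
}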

In Our System:

  • Event Hub Spout reads IoT sensor data
  • Emits structured InstantaneousData objects
  • Tracks message IDs for guaranteed processing

3. Bolts

Bolts are the processing units in Storm. They consume streams, process data, and optionally emit new streams.

Capabilities:

  • Filter, transform, aggregate, join data
  • Perform computations and analytics
  • Write to databases or external systems
  • Emit results to downstream bolts

// Example: Bolt that processes sensor data
class MetricsBolt : BaseRichBolt() {
    private lateinit var collector: OutputCollector
    private lateinit var database: CosmosDataSource

    override fun prepare(conf: Map<*, *>, context: TopologyContext, collector: OutputCollector) {
        this.collector = collector
        this.database = CosmosDataSource.getInstance()
    }

    override fun execute(tuple: Tuple) {
        try {
            // Extract data from incoming tuple
            val sensorData = tuple.getValue(0) as InstantaneousData

            // Process the data
            val metrics = calculateMetrics(sensorData)

            // Store in database
            database.saveMetrics(metrics)

            // Emit to next bolt
            collector.emit(tuple, Values(metrics))

            // Acknowledge successful processing
            collector.ack(tuple)

        } catch (e: Exception) {
            // Report failure - tuple will be replayed
            collector.fail(tuple)
        }
    }

    override fun declareOutputFields(declarer: OutputFieldsDeclarer) {
        declarer.declare(Fields("metrics"))
    }
}

Bolt Types in Our System:

  • MetricsBolt: Calculates and aggregates sensor metrics
  • QualityBolt: Monitors water quality parameters
  • EnergyBolt: Tracks power consumption
  • LevelBolt: Monitors tank levels
  • FlowBolt: Analyzes flow rates and detects leaks
  • NotificationBolt: Sends alerts to users

4. Streams

Streams are the core abstraction in Storm - unbounded sequences of tuples flowing between spouts and bolts.

Stream Characteristics:

  • Defined with a schema (named fields)
  • Can be partitioned across multiple tasks
  • Support different grouping strategies

// Declaring output streams
override fun declareOutputFields(declarer: OutputFieldsDeclarer) {
    // Default stream
    declarer.declare(Fields("industryId", "unitId", "timestamp", "value"))

    // Named stream for alerts
    declarer.declareStream("alerts", Fields("alertType", "severity", "message"))
}

// Emitting to different streams
collector.emit(Values(industryId, unitId, timestamp, value))
collector.emit("alerts", Values("QUALITY", "CRITICAL", "TDS exceeded threshold"))

Stream Groupings:

// Shuffle Grouping - random distribution
builder.setBolt("process-bolt", ProcessBolt(), 4)
    .shuffleGrouping("input-spout")

// Fields Grouping - group by field value (same field values → same task)
builder.setBolt("aggregate-bolt", AggregateBolt(), 4)
    .fieldsGrouping("process-bolt", Fields("industryId", "unitId"))

// All Grouping - send to all tasks
builder.setBolt("broadcast-bolt", BroadcastBolt(), 4)
    .allGrouping("config-spout")

// Global Grouping - send to a single task (the task with the lowest ID)
builder.setBolt("singleton-bolt", SingletonBolt(), 1)
    .globalGrouping("input-spout")
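
The routing idea behind fields grouping can be sketched in plain Kotlin. This is a simplified illustration, not Storm's internal code: `chooseTask` is a hypothetical helper, and Storm's actual hash function differs in detail. What it shows is that the target task depends only on the grouping field values, so equal values always land on the same task.

```kotlin
// Illustrative sketch of fields grouping: hash the grouping values and map
// the hash onto one of the bolt's tasks. Not Storm's actual implementation.
fun chooseTask(groupingValues: List<Any?>, numTasks: Int): Int {
    require(numTasks > 0) { "numTasks must be positive" }
    // floorMod keeps the result in 0 until numTasks, even for negative hashes.
    return Math.floorMod(groupingValues.hashCode(), numTasks)
}
```

For example, `chooseTask(listOf("INDUSTRY_001", "UNIT_A"), 4)` returns the same index on every call, which is why per-unit aggregation state can live inside a single task.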

5. Tuples

Tuples are the data units that flow through Storm topologies - named lists of values.

// Creating and emitting tuple values
// The number of values must match the fields declared for the stream; this
// example assumes fields industryId, unitId, timestamp, value, metadata.
val values = Values(
    "INDUSTRY_001",     // industryId
    "UNIT_A",           // unitId
    Instant.now(),      // timestamp
    150.5,              // TDS value
    mapOf("pH" to 7.2)  // metadata
)
collector.emit(values)

// Accessing tuple values in a bolt's execute(tuple: Tuple)
val industryId = tuple.getStringByField("industryId")
val unitId = tuple.getStringByField("unitId")
val timestamp = tuple.getValueByField("timestamp") as Instant
val value = tuple.getDoubleByField("value")

Tuple Tree & Anchoring:

Storm tracks the lineage of tuples to guarantee processing:

// Anchoring - link output to input for reliability
collector.emit(inputTuple, Values(processedData)) // Anchored to inputTuple
collector.ack(inputTuple) // Ack once this bolt has finished with the input

// Unanchored emit - downstream failures will not trigger a replay
collector.emit(Values(processedData)) // Unanchored
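
Storm's acker tracks a whole tuple tree in constant memory by XOR-ing 64-bit tuple IDs: each ID is folded in once when the tuple is created and once when it is acked, so the running value returns to zero when every tuple in the tree has been acked. The class below is a minimal sketch of that idea; `AckTracker` and its method names are hypothetical, and the real acker receives combined updates from bolts rather than one call per event.

```kotlin
// Simplified sketch of XOR-based tuple tree tracking (names are illustrative).
class AckTracker {
    private var ackVal = 0L

    // Fold in the ID of a tuple that was emitted into the tree.
    fun emitted(tupleId: Long) { ackVal = ackVal xor tupleId }

    // Fold in the same ID again when that tuple is acked; x xor x == 0.
    fun acked(tupleId: Long) { ackVal = ackVal xor tupleId }

    // Zero means every emitted ID has been matched by an ack.
    // (A fresh tracker with nothing emitted also reports complete.)
    val isComplete: Boolean
        get() = ackVal == 0L
}
```

Because Storm draws tuple IDs at random from the 64-bit space, the chance of the value hitting zero before the tree is actually finished is negligible.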

Storm Architecture Basics

Cluster Components

Component Roles:

  1. Nimbus (Master Node)

    • Distributes code across cluster
    • Assigns tasks to machines
    • Monitors for failures
    • Stateless - all state in ZooKeeper
  2. Supervisor (Worker Nodes)

    • Manages worker processes on each machine
    • Starts/stops workers based on Nimbus assignments
    • Reports heartbeat to ZooKeeper
  3. Worker (JVM Processes)

    • Executes subset of topology tasks
    • Runs multiple executors (threads)
    • Handles message passing between tasks
  4. ZooKeeper

    • Coordinates the cluster
    • Stores cluster state
    • Enables Nimbus and Supervisors to discover each other

How Storm Ensures Reliability

1. Acking Mechanism

Storm tracks every tuple through the entire topology to ensure it's fully processed.

How It Works:

  1. Spout emits tuple with unique message ID
  2. Each bolt that processes the tuple must ack or fail it
  3. If all bolts in the tuple tree ack, spout receives success
  4. If any bolt fails or timeout occurs, spout replays the tuple

// In Spout
override fun nextTuple() {
    val data = readFromSource()
    val msgId = generateUniqueId()
    collector.emit(Values(data), msgId) // Track with msgId
}

override fun ack(msgId: Any?) {
    // Tuple fully processed successfully
    markAsProcessed(msgId)
}

override fun fail(msgId: Any?) {
    // Tuple failed - replay it
    replayMessage(msgId)
}

// In Bolt
override fun execute(tuple: Tuple) {
    try {
        process(tuple)
        collector.ack(tuple) // Success
    } catch (e: Exception) {
        collector.fail(tuple) // Failure - will replay
    }
}
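
Storm only tells the spout which message ID succeeded or failed; keeping the payload around so it can be re-emitted is the spout's job. The following is one possible sketch of that bookkeeping, independent of Storm's API. `PendingMessages` and its methods are hypothetical names, and a production spout would more often rely on the source system's own offsets or checkpoints.

```kotlin
// Sketch of the bookkeeping a reliable spout might keep (illustrative names).
class PendingMessages<T> {
    private val pending = HashMap<Long, T>()              // emitted, not yet acked
    private val retryQueue = ArrayDeque<Pair<Long, T>>()  // failed, awaiting re-emit

    // Call when emitting a tuple with this message ID.
    fun track(msgId: Long, payload: T) { pending[msgId] = payload }

    // Call from the spout's ack(): the tuple tree completed, forget the payload.
    fun ack(msgId: Long) { pending.remove(msgId) }

    // Call from the spout's fail(): queue the payload for another attempt.
    fun fail(msgId: Long) {
        val payload = pending.remove(msgId) ?: return
        retryQueue.addLast(msgId to payload)
    }

    // Call from nextTuple() before reading new data; track() it again on re-emit.
    fun nextRetry(): Pair<Long, T>? = retryQueue.removeFirstOrNull()

    val inFlight: Int
        get() = pending.size
}
```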

2. Fault Tolerance

Storm automatically recovers from failures at multiple levels:

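Worker Failure:

  • Supervisor detects the dead worker process and restarts it
  • If the worker keeps failing, Nimbus reassigns its tasks to another machine

Supervisor Failure:

  • Nimbus detects missing heartbeats
  • Reassigns tasks to other supervisors
  • Processing resumes on the new workers; in-flight tuples that time out are replayed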
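
Heartbeat-based failure detection comes down to comparing each node's last report against a timeout. The sketch below illustrates that check in isolation; `HeartbeatMonitor`, the node IDs, and the timeout value are assumptions for illustration and do not mirror Nimbus's internal classes or defaults.

```kotlin
// Illustrative heartbeat check: a node is considered dead when its last
// heartbeat is older than the timeout. Not Nimbus's actual implementation.
class HeartbeatMonitor(private val timeoutMillis: Long) {
    private val lastSeen = HashMap<String, Long>()

    // Record a heartbeat from a node at the given time.
    fun heartbeat(nodeId: String, nowMillis: Long) { lastSeen[nodeId] = nowMillis }

    // Nodes whose most recent heartbeat is older than the timeout.
    fun deadNodes(nowMillis: Long): Set<String> =
        lastSeen.filterValues { nowMillis - it > timeoutMillis }.keys
}
```

With a 60-second timeout, a node last seen at time 0 is reported dead at 70 seconds, while one last seen at 50 seconds is still considered alive.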

Nimbus Failure:

  • Topologies continue running
  • Supervisors and workers unaffected
  • Restart Nimbus to regain management capability

Tip: Storm is designed for the master (Nimbus) to fail without affecting running topologies. Workers continue processing independently.


Storm Processing Guarantees

Storm offers different levels of processing guarantees based on your configuration:

1. At-Least-Once Processing (Default)

Guarantee: Every tuple will be processed at least once, but may be processed multiple times on failure.

// Configuration for at-least-once
config.setNumAckers(1) // Enable tracking
config.setMaxSpoutPending(1000) // Limit in-flight tuples
config.setMessageTimeoutSecs(30) // Replay timeout
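
The max-spout-pending setting caps how many tuples each spout task may have in flight, which also caps throughput: by Little's Law, sustained throughput cannot exceed in-flight tuples divided by average completion latency. The helper below is a back-of-the-envelope sketch; the function name and the 50 ms latency used in the note are assumptions, not measurements from this system.

```kotlin
// Rough upper bound on tuples per second for one spout task, derived from
// Little's Law: throughput <= in-flight tuples / average complete latency.
fun maxThroughputPerSpoutTask(maxSpoutPending: Int, avgCompleteLatencyMs: Double): Double {
    require(maxSpoutPending > 0) { "maxSpoutPending must be positive" }
    require(avgCompleteLatencyMs > 0.0) { "latency must be positive" }
    return maxSpoutPending * 1000.0 / avgCompleteLatencyMs
}
```

With 1000 pending tuples and an assumed 50 ms complete latency, the bound is 20,000 tuples per second per spout task. If measured throughput sits at this ceiling, raising the pending limit or lowering latency is the lever to pull.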

Use Case: When duplicate processing is acceptable, either because operations are idempotent or because duplicates are tolerable

Example: Upserting the latest reading for a sensor, sending alerts (duplicates tolerable)


2. At-Most-Once Processing

Guarantee: Every tuple is processed at most once; tuples may be lost on failure.

// Configuration for at-most-once
config.setNumAckers(0) // Disable tracking - no acking

Use Case: When performance is more important than reliability

Example: Real-time metrics where occasional loss is acceptable


3. Exactly-Once Processing (with Trident)

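Guarantee: With Storm Trident, each tuple's effect on persisted state is applied exactly once, even when a batch is replayed.

// Trident topology with exactly-once state updates
// spout, ProcessFunction, stateFactory and Updater are application-defined;
// the spout must be able to replay batches for the guarantee to hold.
val topology = TridentTopology()
val state = topology.newStream("spout", spout)
    .each(Fields("data"), ProcessFunction(), Fields("result"))
    .partitionPersist(stateFactory, Fields("result"), Updater())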

Use Case: When you need strict guarantees (financial transactions, critical alerts)

Trade-off: Higher latency and complexity
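
The core trick behind Trident's transactional state is to store the ID of the last applied batch next to the value, and to skip any batch whose ID has already been applied. The class below sketches that idea outside Trident; `TransactionalCounter` is a hypothetical name, and the sketch assumes batches arrive in order and that a replayed batch contains the same tuples as the original.

```kotlin
// Sketch of a transactional state update: remember the last applied batch ID
// so that a replayed batch does not change the value twice.
class TransactionalCounter {
    var count: Long = 0L
        private set
    private var lastTxId: Long? = null

    fun applyBatch(txId: Long, increment: Long) {
        if (txId == lastTxId) return  // replay of a batch already applied
        count += increment
        lastTxId = txId
    }
}
```

Applying batch 1 with an increment of 5 twice and then batch 2 with an increment of 3 leaves the counter at 8 rather than 13.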


Our Implementation: At-Least-Once

We use at-least-once processing because:

  • Alert duplicates are better than missed alerts
  • Database operations are idempotent (upserts with timestamps)
  • Performance is critical for real-time monitoring
  • Simplicity over Trident's complexity

// Our configuration
config.setNumAckers(2)            // sets Config.TOPOLOGY_ACKER_EXECUTORS
config.setMaxSpoutPending(5000)
config.setMessageTimeoutSecs(30)
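
To see why upserts keyed by timestamp make replays harmless, consider this small in-memory sketch. `Reading` and `ReadingStore` are hypothetical stand-ins for the real database layer: writing the same reading twice overwrites the same key, so a replayed tuple leaves the stored state unchanged.

```kotlin
// In-memory stand-in for an idempotent, timestamp-keyed upsert (illustrative).
data class Reading(val unitId: String, val timestampMillis: Long, val value: Double)

class ReadingStore {
    private val rows = HashMap<Pair<String, Long>, Double>()

    // Same (unitId, timestamp) key means the same row: a replay overwrites it.
    fun upsert(r: Reading) { rows[r.unitId to r.timestampMillis] = r.value }

    fun get(unitId: String, timestampMillis: Long): Double? =
        rows[unitId to timestampMillis]

    val size: Int
        get() = rows.size
}
```

An operation such as incrementing a running total does not have this property, since a replayed tuple would be counted twice.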

Storm vs Other Streaming Frameworks

| Feature | Apache Storm | Kafka Streams | Apache Flink |
|---|---|---|---|
| Processing Model | Tuple-at-a-time | Stream processing | Stream processing |
| Latency | Sub-second | Low (seconds) | Low (seconds) |
| Throughput | High | Very High | Very High |
| State Management | Manual | Built-in | Built-in |
| Exactly-Once | Trident only | Yes | Yes |
| Language Support | Any | Java/Scala | Java/Scala/Python |
| Learning Curve | Easy | Medium | Steep |
| Best For | Low-latency alerts | Event processing | Complex analytics |

Why Storm for Our Project:

  • Lowest latency for critical alerts (< 1 second)
  • Simple, proven architecture
  • Flexibility with any JVM language (Kotlin)
  • Mature ecosystem and documentation

Quick Example: Complete Topology

Here's a complete example of a simple Storm topology:

fun main(args: Array<String>) {
    // Create topology builder
    val builder = TopologyBuilder()

    // Add spout (data source)
    builder.setSpout("event-hub-spout", EventHubSpout(), 1)

    // Add processing bolts
    builder.setBolt("parse-bolt", ParseBolt(), 4)
        .shuffleGrouping("event-hub-spout")

    builder.setBolt("metrics-bolt", MetricsBolt(), 4)
        .fieldsGrouping("parse-bolt", Fields("industryId", "unitId"))

    builder.setBolt("alert-bolt", AlertBolt(), 2)
        .shuffleGrouping("metrics-bolt")

    builder.setBolt("database-bolt", DatabaseBolt(), 2)
        .shuffleGrouping("metrics-bolt")

    // Configure topology
    val config = Config()
    config.setDebug(false)
    config.setNumWorkers(3)
    config.setNumAckers(2)
    config.setMaxSpoutPending(5000)

    // Submit to cluster
    StormSubmitter.submitTopology(
        "water-management-topology",
        config,
        builder.createTopology()
    )
}
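
As a sanity check on the parallelism settings above, it helps to count how many executors (threads) each worker process is likely to host. The helper below is a rough sketch that assumes an even spread across workers and ignores Storm's internal system executors; `executorsPerWorker` is a hypothetical function, not part of Storm.

```kotlin
// Rough estimate of executors per worker, assuming an even distribution.
// parallelismHints holds one entry per spout or bolt in the topology.
fun executorsPerWorker(parallelismHints: List<Int>, numAckers: Int, numWorkers: Int): List<Int> {
    require(numWorkers > 0) { "numWorkers must be positive" }
    val total = parallelismHints.sum() + numAckers
    val base = total / numWorkers
    val extra = total % numWorkers
    // The first `extra` workers take one additional executor each.
    return List(numWorkers) { i -> base + (if (i < extra) 1 else 0) }
}
```

For the topology above, the hints 1, 4, 4, 2, 2 plus 2 ackers give 15 executors, so `executorsPerWorker(listOf(1, 4, 4, 2, 2), 2, 3)` yields 5 executors on each of the 3 workers.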

Summary

Apache Storm provides:

  • Real-time processing with sub-second latency
  • Guaranteed delivery through acking mechanism
  • Fault tolerance with automatic recovery
  • Horizontal scalability by adding nodes
  • Simple programming model with spouts and bolts

Next Steps

Now that you understand Apache Storm fundamentals, continue to Project Structure to see how our codebase is organized, or jump to Core Concepts for a deeper dive.