Data Class Streaming Guide

A comprehensive guide to implementing type-safe data classes for Apache Storm tuple handling, with automatic validation and conversion methods.

Table of Contents

  1. Overview
  2. Decorator Architecture
  3. Creating Data Classes
  4. Stream Configuration
  5. Example Topologies
  6. Advanced Patterns
  7. Best Practices
  8. Troubleshooting

Overview

The data class streaming framework provides a type-safe, declarative approach to handling Apache Storm tuples using Python dataclasses with automatic validation, serialization, and conversion methods.

Key Benefits

  • Type Safety: Compile-time type checking with IDE support
  • Automatic Validation: Built-in tuple validation with meaningful error messages
  • Seamless Conversion: Automatic conversion between Storm tuples and Python objects
  • Reduced Boilerplate: Auto-generated methods eliminate repetitive code
  • Optional Field Support: Flexible handling of optional and default values
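
The typical round trip looks like this (a minimal sketch; MyDataClass is the example class defined in the next section):

# Incoming Storm tuple -> typed object -> outgoing tuple
values = ["required_value", "optional_value", {"key": "value"}]
MyDataClass.validate(values)                # raises ValueError on bad input
instance = MyDataClass.from_values(values)  # typed access: instance.required_field
out_values = instance.to_values()           # back to a plain list for emit()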

Decorator Architecture

@inlet_data_class Decorator

The core decorator that transforms standard Python dataclasses into Storm-compatible data transfer objects.

from dataclasses import dataclass
from typing import Optional, Dict, Any, List
from decorators.inlet_data_decorator import inlet_data_class

@inlet_data_class
@dataclass
class MyDataClass:
    required_field: str
    optional_field: Optional[str] = None
    data_field: Optional[Dict[str, Any]] = None

Auto-Generated Methods

The decorator automatically adds three essential methods to your data class:

1. validate(values: List[Any]) -> bool

# Validates tuple field count and content
# Raises ValueError with descriptive messages
values = ["test", "123", "data"]
MyDataClass.validate(values) # Returns True or raises ValueError

2. from_values(values: List[Any]) -> MyDataClass

# Converts Storm tuple to data class instance
tuple_values = ["required_value", "optional_value", {"key": "value"}]
instance = MyDataClass.from_values(tuple_values)
print(instance.required_field) # "required_value"

3. to_values() -> List[Any]

# Converts data class instance back to Storm tuple
instance = MyDataClass(required_field="test", optional_field="optional")
tuple_values = instance.to_values()
# Returns: ["test", "optional"]

Creating Data Classes

Basic Data Class Structure

from dataclasses import dataclass
from typing import Optional, Dict, Any, List
from decorators.inlet_data_decorator import inlet_data_class

@inlet_data_class
@dataclass
class ProcessingRequest:
    """Data class for processing requests"""
    event_id: str
    industry_id: str
    timestamp: str
    processing_type: str
    additional_data: Optional[Dict[str, Any]] = None

    @staticmethod
    def validate(values: List[Any]) -> bool:
        """Auto-generated validation method - implementation provided by decorator"""
        pass

    @staticmethod
    def from_values(values: List[Any]) -> 'ProcessingRequest':
        """Auto-generated from_values method - implementation provided by decorator"""
        pass

    def to_values(self) -> List[Any]:
        """Auto-generated to_values method - implementation provided by decorator"""
        pass

    # Optional: Add custom validation logic
    def __post_init__(self):
        if not self.event_id:
            raise ValueError("event_id cannot be empty")

Supported Field Types

Required Fields

@inlet_data_class
@dataclass
class RequiredFieldsExample:
    string_field: str      # Always required
    numeric_field: str     # Stored as string, convert as needed
    identifier_field: str  # UUID, IDs, etc.

Optional Fields

@inlet_data_class
@dataclass
class OptionalFieldsExample:
    required_field: str
    optional_string: Optional[str] = None
    optional_data: Optional[Dict[str, Any]] = None
    optional_list: Optional[List[str]] = None
    default_value: str = "default"

Special Field: additional_data

@inlet_data_class
@dataclass
class WithAdditionalData:
    core_field: str
    additional_data: Optional[Dict[str, Any]] = None

# additional_data receives special handling:
# - Automatically excluded from required field validation
# - Only included in to_values() if not None
# - Supports nested JSON structures
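
For example, assuming the behavior described above, a round trip with and without additional_data would look like:

with_extra = WithAdditionalData(core_field="abc", additional_data={"sendReport": True})
with_extra.to_values()     # ["abc", {"sendReport": True}]

without_extra = WithAdditionalData(core_field="abc")
without_extra.to_values()  # ["abc"], the None additional_data is dropped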

Real-World Examples

Batch Processing Input

from datetime import datetime

@inlet_data_class
@dataclass
class BatchProcessingInput:
    """Input data for batch processing operations"""
    time: str         # ISO timestamp
    industry_id: str  # Industry identifier
    event_id: str     # Unique event ID
    additional_data: Optional[Dict[str, Any]] = None  # Extra parameters

    @staticmethod
    def validate(values: List[Any]) -> bool:
        """Auto-generated validation method - implementation provided by decorator"""
        pass

    @staticmethod
    def from_values(values: List[Any]) -> 'BatchProcessingInput':
        """Auto-generated from_values method - implementation provided by decorator"""
        pass

    def to_values(self) -> List[Any]:
        """Auto-generated to_values method - implementation provided by decorator"""
        pass

    def get_datetime(self) -> datetime:
        """Convert time string to datetime object"""
        return datetime.fromisoformat(self.time)

    def get_send_report_flag(self) -> bool:
        """Extract sendReport flag from additional_data"""
        if self.additional_data:
            return self.additional_data.get('sendReport', False)
        return False
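
A typical consumption pattern in a bolt, assuming tuple values arrive in the order the fields are declared above:

tuple_values = ["2024-01-01T00:00:00", "industry-42", "evt-001", {"sendReport": True}]
batch_input = BatchProcessingInput.from_values(tuple_values)
batch_input.get_datetime()          # datetime(2024, 1, 1, 0, 0)
batch_input.get_send_report_flag()  # True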

Report Generation Request

@inlet_data_class
@dataclass
class ReportRequest:
    """Request for automated report generation"""
    industry_id: str
    time: str
    event_id: str
    report_types: Optional[List[str]] = None  # daily, monthly, yearly
    email_addresses: Optional[List[str]] = None
    test_mode: bool = False

    @staticmethod
    def validate(values: List[Any]) -> bool:
        """Auto-generated validation method - implementation provided by decorator"""
        pass

    @staticmethod
    def from_values(values: List[Any]) -> 'ReportRequest':
        """Auto-generated from_values method - implementation provided by decorator"""
        pass

    def to_values(self) -> List[Any]:
        """Auto-generated to_values method - implementation provided by decorator"""
        pass

    def __post_init__(self):
        # Set default report types if not provided
        if self.report_types is None:
            self.report_types = ["daily", "monthly"]

Stream Configuration

Defining Output Streams

Configure multiple output streams in your bolt to emit different types of data:

from streamparse import Bolt, Stream
from bolt_data_class.my_data_classes import InputDataClass, ProcessingResult, ErrorInfo

class ProcessingBolt(Bolt):
    outputs = [
        Stream(fields=["industry_id", "result_data", "event_id"], name="success"),
        Stream(fields=["industry_id", "error_message", "event_id"], name="error"),
        Stream(fields=["industry_id", "time", "event_id"], name="report_request")
    ]

    def process(self, tup):
        try:
            # Process input using data class
            input_data = InputDataClass.from_values(tup.values)

            # Perform processing
            result = self.perform_processing(input_data)

            # Emit to success stream
            self.emit([
                input_data.industry_id,
                result.to_json(),
                input_data.event_id
            ], stream="success")

            # Conditionally emit to report stream
            if input_data.needs_report():
                self.emit([
                    input_data.industry_id,
                    input_data.time,
                    input_data.event_id
                ], stream="report_request")

        except Exception as e:
            # Emit to error stream
            self.emit([
                tup.values[1],  # industry_id
                str(e),
                tup.values[2]   # event_id
            ], stream="error")
            self.fail(tup)

Connecting Streams in Topology

from streamparse import Grouping, Topology, Stream
from bolts.processing_bolt import ProcessingBolt
from bolts.report_bolt import ReportBolt
from bolts.error_handler_bolt import ErrorHandlerBolt
from spouts.event_spout import EventSpout

class MultiStreamTopology(Topology):
    event_spout = EventSpout.spec()

    processing_bolt = ProcessingBolt.spec(
        inputs={event_spout: Grouping.fields("industry_id")},
        par=4
    )

    report_bolt = ReportBolt.spec(
        inputs={processing_bolt["report_request"]: Grouping.fields("industry_id")},
        par=2
    )

    error_bolt = ErrorHandlerBolt.spec(
        inputs={processing_bolt["error"]: Grouping.fields("industry_id")},
        par=1
    )

Example Topologies

Simple Linear Processing

from streamparse import Bolt, Grouping, Spout, Topology

# Data Classes
@inlet_data_class
@dataclass
class EventData:
    timestamp: str
    user_id: str
    action: str
    event_id: str

@inlet_data_class
@dataclass
class ProcessedEvent:
    user_id: str
    action_count: int
    last_action: str
    event_id: str

# Spout
class EventSpout(Spout):
    outputs = ["timestamp", "user_id", "action", "event_id"]

    def next_tuple(self):
        event_data = self.get_next_event()
        self.emit([
            event_data['timestamp'],
            event_data['user_id'],
            event_data['action'],
            event_data['event_id']
        ])

# Bolt
class EventProcessorBolt(Bolt):
    outputs = ["user_id", "action_count", "last_action", "event_id"]

    def process(self, tup):
        # Convert tuple to data class
        event = EventData.from_values(tup.values)

        # Process the event
        processed = self.process_event(event)

        # Convert back to tuple and emit
        self.emit([
            processed.user_id,
            processed.action_count,
            processed.last_action,
            processed.event_id
        ])

# Topology
class EventProcessingTopology(Topology):
    event_spout = EventSpout.spec()
    processor_bolt = EventProcessorBolt.spec(
        inputs={event_spout: Grouping.fields("user_id")},
        par=3
    )
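
EventProcessorBolt delegates to a process_event helper that the example does not define. A minimal sketch, assuming per-user action counts are kept in bolt-local state (the self.action_counts dict below is hypothetical, not part of the framework):

# On EventProcessorBolt:
def initialize(self, conf, ctx):
    self.action_counts = {}

def process_event(self, event: EventData) -> ProcessedEvent:
    # Track a running count of actions per user
    count = self.action_counts.get(event.user_id, 0) + 1
    self.action_counts[event.user_id] = count
    return ProcessedEvent(
        user_id=event.user_id,
        action_count=count,
        last_action=event.action,
        event_id=event.event_id
    )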

Complex Multi-Stream Processing

# Data Classes for different streams
@inlet_data_class
@dataclass
class IncomingOrder:
    order_id: str
    customer_id: str
    amount: str
    timestamp: str
    additional_data: Optional[Dict[str, Any]] = None

@inlet_data_class
@dataclass
class ValidatedOrder:
    order_id: str
    customer_id: str
    amount: float
    validation_status: str
    timestamp: str

@inlet_data_class
@dataclass
class PaymentRequest:
    order_id: str
    customer_id: str
    amount: float
    payment_method: str

@inlet_data_class
@dataclass
class NotificationRequest:
    customer_id: str
    message: str
    notification_type: str
    order_id: str

from datetime import datetime

# Order Processing Bolt with multiple outputs
class OrderProcessingBolt(Bolt):
    outputs = [
        Stream(fields=["order_id", "customer_id", "amount", "status", "timestamp"],
               name="validated"),
        Stream(fields=["order_id", "customer_id", "amount", "payment_method"],
               name="payment"),
        Stream(fields=["customer_id", "message", "type", "order_id"],
               name="notification"),
        Stream(fields=["order_id", "error_message", "timestamp"],
               name="error")
    ]

    def process(self, tup):
        try:
            # Parse incoming order
            order = IncomingOrder.from_values(tup.values)

            # Validate order
            if self.validate_order(order):
                validated = ValidatedOrder(
                    order_id=order.order_id,
                    customer_id=order.customer_id,
                    amount=float(order.amount),
                    validation_status="valid",
                    timestamp=order.timestamp
                )

                # Emit validated order
                self.emit([
                    validated.order_id,
                    validated.customer_id,
                    validated.amount,
                    validated.validation_status,
                    validated.timestamp
                ], stream="validated")

                # Create payment request
                payment_method = (order.additional_data.get('payment_method', 'card')
                                  if order.additional_data else 'card')

                payment_req = PaymentRequest(
                    order_id=order.order_id,
                    customer_id=order.customer_id,
                    amount=validated.amount,
                    payment_method=payment_method
                )

                # Emit payment request
                self.emit([
                    payment_req.order_id,
                    payment_req.customer_id,
                    payment_req.amount,
                    payment_req.payment_method
                ], stream="payment")

                # Create notification
                notification = NotificationRequest(
                    customer_id=order.customer_id,
                    message=f"Order {order.order_id} is being processed",
                    notification_type="order_update",
                    order_id=order.order_id
                )

                # Emit notification
                self.emit([
                    notification.customer_id,
                    notification.message,
                    notification.notification_type,
                    notification.order_id
                ], stream="notification")

            else:
                # Emit error for invalid orders
                self.emit([
                    order.order_id,
                    "Order validation failed",
                    order.timestamp
                ], stream="error")

        except Exception as e:
            # Handle processing errors
            self.emit([
                "unknown",
                f"Processing error: {str(e)}",
                datetime.now().isoformat()
            ], stream="error")
            self.fail(tup)
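
    def validate_order(self, order: IncomingOrder) -> bool:
        # Hypothetical helper referenced in process() above; a minimal sketch,
        # not part of the framework. Replace with real business rules.
        try:
            return bool(order.order_id) and bool(order.customer_id) and float(order.amount) > 0
        except (TypeError, ValueError):
            return False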

# Topology connecting multiple streams
class OrderProcessingTopology(Topology):
    config = {
        "topology.message.timeout.secs": 300,
        "topology.max.spout.pending": 1000
    }

    order_spout = OrderSpout.spec()

    order_processor = OrderProcessingBolt.spec(
        inputs={order_spout: Grouping.fields("customer_id")},
        par=4
    )

    payment_processor = PaymentProcessorBolt.spec(
        inputs={order_processor["payment"]: Grouping.fields("customer_id")},
        par=2
    )

    notification_sender = NotificationBolt.spec(
        inputs={order_processor["notification"]: Grouping.fields("customer_id")},
        par=3
    )

    error_handler = ErrorHandlerBolt.spec(
        inputs={order_processor["error"]: Grouping.shuffle()},
        par=1
    )

Advanced Patterns

Conditional Stream Routing

@inlet_data_class
@dataclass
class ConditionalData:
    data_type: str
    payload: str
    priority: str
    event_id: str

    def is_high_priority(self) -> bool:
        return self.priority == "high"

    def get_routing_key(self) -> str:
        return f"{self.data_type}_{self.priority}"

class ConditionalRoutingBolt(Bolt):
    outputs = [
        Stream(fields=["payload", "event_id"], name="high_priority"),
        Stream(fields=["payload", "event_id"], name="normal_priority"),
        Stream(fields=["data_type", "payload", "event_id"], name="special_processing")
    ]

    def process(self, tup):
        data = ConditionalData.from_values(tup.values)

        # Route based on data characteristics
        if data.is_high_priority():
            self.emit([data.payload, data.event_id], stream="high_priority")
        elif data.data_type == "special":
            self.emit([data.data_type, data.payload, data.event_id],
                      stream="special_processing")
        else:
            self.emit([data.payload, data.event_id], stream="normal_priority")
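
Wiring the routed streams follows the same pattern as the earlier topologies. A sketch with hypothetical DataSpout, FastPathBolt, and StandardBolt classes:

class RoutingTopology(Topology):
    data_spout = DataSpout.spec()

    router = ConditionalRoutingBolt.spec(
        inputs={data_spout: Grouping.fields("event_id")},
        par=2
    )

    fast_path = FastPathBolt.spec(
        inputs={router["high_priority"]: Grouping.shuffle()},
        par=2
    )

    standard_path = StandardBolt.spec(
        inputs={router["normal_priority"]: Grouping.shuffle()},
        par=1
    )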

Aggregation with State Management

@inlet_data_class
@dataclass
class MetricEvent:
    metric_name: str
    value: float
    timestamp: str
    tags: Optional[Dict[str, str]] = None

@inlet_data_class
@dataclass
class AggregatedMetric:
    metric_name: str
    count: int
    sum_value: float
    avg_value: float
    min_value: float
    max_value: float
    window_start: str
    window_end: str

class MetricAggregatorBolt(Bolt):
    outputs = ["metric_name", "count", "sum", "avg", "min", "max", "start", "end"]

    def initialize(self, conf, ctx):
        self.metrics_state = {}
        self.window_size = 60  # 60 seconds

    def process(self, tup):
        metric = MetricEvent.from_values(tup.values)

        # Update aggregation state
        if metric.metric_name not in self.metrics_state:
            self.metrics_state[metric.metric_name] = {
                'count': 0,
                'sum': 0.0,
                'min': float('inf'),
                'max': float('-inf'),
                'window_start': metric.timestamp
            }

        state = self.metrics_state[metric.metric_name]
        state['count'] += 1
        state['sum'] += metric.value
        state['min'] = min(state['min'], metric.value)
        state['max'] = max(state['max'], metric.value)

        # Check if window is complete
        if self.should_emit_window(state, metric.timestamp):
            aggregated = AggregatedMetric(
                metric_name=metric.metric_name,
                count=state['count'],
                sum_value=state['sum'],
                avg_value=state['sum'] / state['count'],
                min_value=state['min'],
                max_value=state['max'],
                window_start=state['window_start'],
                window_end=metric.timestamp
            )

            # Emit aggregated result
            aggregated_values = aggregated.to_values()
            self.emit(aggregated_values)

            # Reset state for next window
            self.metrics_state[metric.metric_name] = {
                'count': 0,
                'sum': 0.0,
                'min': float('inf'),
                'max': float('-inf'),
                'window_start': metric.timestamp
            }
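
The bolt above delegates window boundaries to a should_emit_window helper that is not part of the framework. A minimal sketch, assuming ISO-format timestamps, `from datetime import datetime`, and the 60-second window configured in initialize():

# On MetricAggregatorBolt:
def should_emit_window(self, state: Dict[str, Any], current_timestamp: str) -> bool:
    """Emit when the current event falls outside the active window."""
    start = datetime.fromisoformat(state['window_start'])
    current = datetime.fromisoformat(current_timestamp)
    return (current - start).total_seconds() >= self.window_size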

Error Handling and Recovery

@inlet_data_class
@dataclass
class ProcessingTask:
    task_id: str
    input_data: str
    retry_count: int = 0
    max_retries: int = 3

    @staticmethod
    def validate(values: List[Any]) -> bool:
        """Auto-generated validation method - implementation provided by decorator"""
        pass

    @staticmethod
    def from_values(values: List[Any]) -> 'ProcessingTask':
        """Auto-generated from_values method - implementation provided by decorator"""
        pass

    def to_values(self) -> List[Any]:
        """Auto-generated to_values method - implementation provided by decorator"""
        pass

    def can_retry(self) -> bool:
        return self.retry_count < self.max_retries

    def increment_retry(self) -> 'ProcessingTask':
        return ProcessingTask(
            task_id=self.task_id,
            input_data=self.input_data,
            retry_count=self.retry_count + 1,
            max_retries=self.max_retries
        )

@inlet_data_class
@dataclass
class FailedTask:
    task_id: str
    error_message: str
    retry_count: int
    final_failure: bool

    @staticmethod
    def validate(values: List[Any]) -> bool:
        """Auto-generated validation method - implementation provided by decorator"""
        pass

    @staticmethod
    def from_values(values: List[Any]) -> 'FailedTask':
        """Auto-generated from_values method - implementation provided by decorator"""
        pass

    def to_values(self) -> List[Any]:
        """Auto-generated to_values method - implementation provided by decorator"""
        pass

class ResilientProcessingBolt(Bolt):
    outputs = [
        Stream(fields=["task_id", "result"], name="success"),
        Stream(fields=["task_id", "input_data", "retry_count", "max_retries"],
               name="retry"),
        Stream(fields=["task_id", "error", "retry_count", "final"], name="failed")
    ]

    def process(self, tup):
        try:
            task = ProcessingTask.from_values(tup.values)

            # Attempt processing
            result = self.risky_processing(task.input_data)

            # Emit success
            self.emit([task.task_id, result], stream="success")

        except Exception as e:
            task = ProcessingTask.from_values(tup.values)

            if task.can_retry():
                # Emit for retry
                retry_task = task.increment_retry()
                retry_values = retry_task.to_values()
                self.emit(retry_values, stream="retry")
            else:
                # Emit final failure
                failed = FailedTask(
                    task_id=task.task_id,
                    error_message=str(e),
                    retry_count=task.retry_count,
                    final_failure=True
                )
                failed_values = failed.to_values()
                self.emit(failed_values, stream="failed")
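
Note that the retry stream still needs a consumer in the topology. Because a bolt spec cannot reference itself while it is being defined in the class-body DSL, retries are typically routed through an intermediate delay bolt or an external queue that feeds back into the processing path.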

Best Practices

1. Data Class Design

# ✅ Good: Clear, descriptive field names
@inlet_data_class
@dataclass
class UserActivityEvent:
    user_id: str
    session_id: str
    action_type: str
    timestamp: str
    page_url: Optional[str] = None
    additional_metadata: Optional[Dict[str, Any]] = None

    @staticmethod
    def validate(values: List[Any]) -> bool:
        """Auto-generated validation method - implementation provided by decorator"""
        pass

    @staticmethod
    def from_values(values: List[Any]) -> 'UserActivityEvent':
        """Auto-generated from_values method - implementation provided by decorator"""
        pass

    def to_values(self) -> List[Any]:
        """Auto-generated to_values method - implementation provided by decorator"""
        pass

# ❌ Avoid: Unclear field names
@inlet_data_class
@dataclass
class Event:
    id: str
    data: str
    info: str

    @staticmethod
    def validate(values: List[Any]) -> bool:
        """Auto-generated validation method - implementation provided by decorator"""
        pass

    @staticmethod
    def from_values(values: List[Any]) -> 'Event':
        """Auto-generated from_values method - implementation provided by decorator"""
        pass

    def to_values(self) -> List[Any]:
        """Auto-generated to_values method - implementation provided by decorator"""
        pass

2. Validation and Error Handling

@inlet_data_class
@dataclass
class ValidatedInput:
    email: str
    age: str
    country: str

    @staticmethod
    def validate(values: List[Any]) -> bool:
        """Auto-generated validation method - implementation provided by decorator"""
        pass

    @staticmethod
    def from_values(values: List[Any]) -> 'ValidatedInput':
        """Auto-generated from_values method - implementation provided by decorator"""
        pass

    def to_values(self) -> List[Any]:
        """Auto-generated to_values method - implementation provided by decorator"""
        pass

    def __post_init__(self):
        # Add custom validation
        if not self.email or '@' not in self.email:
            raise ValueError("Invalid email format")

        # Convert first, then range-check, so the range error is not
        # swallowed by the conversion handler
        try:
            age_int = int(self.age)
        except ValueError:
            raise ValueError("Age must be a valid number")
        if age_int < 0 or age_int > 150:
            raise ValueError("Invalid age range")

    def get_age_int(self) -> int:
        return int(self.age)

3. Optional Field Handling

@inlet_data_class
@dataclass
class FlexibleData:
    required_field: str
    optional_field: Optional[str] = None
    config_field: Optional[Dict[str, Any]] = None

    @staticmethod
    def validate(values: List[Any]) -> bool:
        """Auto-generated validation method - implementation provided by decorator"""
        pass

    @staticmethod
    def from_values(values: List[Any]) -> 'FlexibleData':
        """Auto-generated from_values method - implementation provided by decorator"""
        pass

    def to_values(self) -> List[Any]:
        """Auto-generated to_values method - implementation provided by decorator"""
        pass

    def get_config_value(self, key: str, default=None):
        """Safely extract configuration values"""
        if self.config_field:
            return self.config_field.get(key, default)
        return default

    def has_optional_data(self) -> bool:
        """Check if optional field has meaningful data"""
        return self.optional_field is not None and self.optional_field.strip() != ""

4. Type Conversion Utilities

from datetime import datetime
from typing import Union
import json

@inlet_data_class
@dataclass
class TypedData:
    timestamp_str: str
    numeric_str: str
    json_str: Optional[str] = None

    @staticmethod
    def validate(values: List[Any]) -> bool:
        """Auto-generated validation method - implementation provided by decorator"""
        pass

    @staticmethod
    def from_values(values: List[Any]) -> 'TypedData':
        """Auto-generated from_values method - implementation provided by decorator"""
        pass

    def to_values(self) -> List[Any]:
        """Auto-generated to_values method - implementation provided by decorator"""
        pass

    def get_timestamp(self) -> datetime:
        """Convert timestamp string to datetime"""
        return datetime.fromisoformat(self.timestamp_str)

    def get_numeric_value(self) -> Union[int, float]:
        """Convert numeric string to appropriate type"""
        if '.' in self.numeric_str:
            return float(self.numeric_str)
        return int(self.numeric_str)

    def get_json_data(self) -> Optional[Dict]:
        """Parse JSON string safely"""
        if self.json_str:
            try:
                return json.loads(self.json_str)
            except json.JSONDecodeError:
                return None
        return None
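
For example:

data = TypedData(timestamp_str="2024-01-01T12:30:00", numeric_str="3.14")
data.get_timestamp()      # datetime(2024, 1, 1, 12, 30)
data.get_numeric_value()  # 3.14 (float); "42" would yield the int 42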

5. Stream Naming Conventions

class WellNamedStreamsBolt(Bolt):
    outputs = [
        # Use descriptive stream names
        Stream(fields=["user_id", "result"], name="user_processing_success"),
        Stream(fields=["user_id", "error"], name="user_processing_error"),
        Stream(fields=["user_id", "notification"], name="user_notification_queue"),
        # Avoid generic names like "output", "stream1", etc.
    ]

Troubleshooting

Common Issues and Solutions

1. Tuple Length Mismatch

# Error: "Insufficient tuple values - expected at least 3"
# Solution: Check field count in data class vs tuple emission

# Data class expects 3 fields
@inlet_data_class
@dataclass
class ThreeFieldClass:
    field1: str
    field2: str
    field3: str

    @staticmethod
    def validate(values: List[Any]) -> bool:
        """Auto-generated validation method - implementation provided by decorator"""
        pass

    @staticmethod
    def from_values(values: List[Any]) -> 'ThreeFieldClass':
        """Auto-generated from_values method - implementation provided by decorator"""
        pass

    def to_values(self) -> List[Any]:
        """Auto-generated to_values method - implementation provided by decorator"""
        pass

# But only emitting 2 values
self.emit(["value1", "value2"]) # ❌ Wrong

# Fix: Emit correct number of values
self.emit(["value1", "value2", "value3"]) # ✅ Correct

2. Optional Field Handling

# Error: Optional fields causing validation issues
@inlet_data_class
@dataclass
class OptionalFieldIssue:
    required: str
    optional: Optional[str] = None

    @staticmethod
    def validate(values: List[Any]) -> bool:
        """Auto-generated validation method - implementation provided by decorator"""
        pass

    @staticmethod
    def from_values(values: List[Any]) -> 'OptionalFieldIssue':
        """Auto-generated from_values method - implementation provided by decorator"""
        pass

    def to_values(self) -> List[Any]:
        """Auto-generated to_values method - implementation provided by decorator"""
        pass

# Problem: Emitting None explicitly
self.emit(["required_value", None]) # May cause issues

# Solution: Don't emit None for optional fields
self.emit(["required_value"]) # ✅ Better approach

3. Type Conversion Errors

# Error: Cannot convert tuple value to expected type
import logging

logger = logging.getLogger(__name__)

@inlet_data_class
@dataclass
class TypeIssue:
    numeric_field: str  # Note: Store as string, convert when needed

    @staticmethod
    def validate(values: List[Any]) -> bool:
        """Auto-generated validation method - implementation provided by decorator"""
        pass

    @staticmethod
    def from_values(values: List[Any]) -> 'TypeIssue':
        """Auto-generated from_values method - implementation provided by decorator"""
        pass

    def to_values(self) -> List[Any]:
        """Auto-generated to_values method - implementation provided by decorator"""
        pass

    def get_numeric(self) -> int:
        try:
            return int(self.numeric_field)
        except ValueError:
            # Handle conversion error gracefully
            logger.warning(f"Cannot convert {self.numeric_field} to int")
            return 0

4. Additional Data Field Issues

# Error: additional_data field not behaving as expected
@inlet_data_class
@dataclass
class AdditionalDataIssue:
    main_field: str
    additional_data: Optional[Dict[str, Any]] = None

    @staticmethod
    def validate(values: List[Any]) -> bool:
        """Auto-generated validation method - implementation provided by decorator"""
        pass

    @staticmethod
    def from_values(values: List[Any]) -> 'AdditionalDataIssue':
        """Auto-generated from_values method - implementation provided by decorator"""
        pass

    def to_values(self) -> List[Any]:
        """Auto-generated to_values method - implementation provided by decorator"""
        pass

# Problem: Expecting additional_data in every tuple
# Solution: Handle missing additional_data gracefully (in your bolt's process method)
def process(self, tup):
    data = AdditionalDataIssue.from_values(tup.values)

    # Safe access to additional_data
    extra_info = data.additional_data or {}
    send_report = extra_info.get('sendReport', False)

Debugging Tips

1. Enable Detailed Logging

import logging
from utils.unified_logger import UnifiedLogger

class DebuggingBolt(Bolt):
    def initialize(self, conf, ctx):
        self.unified_logger = UnifiedLogger()
        logging.basicConfig(level=logging.DEBUG)

    def process(self, tup):
        # Log incoming tuple structure
        self.logger.debug(f"Received tuple: {tup.values}")

        try:
            data = MyDataClass.from_values(tup.values)
            self.logger.debug(f"Parsed data: {data}")

            # Log processing steps
            self.unified_logger.debug(
                event_id=data.event_id,
                source="DEBUG_BOLT",
                event_type="PROCESSING_STEP",
                message=f"Processing data for {data.identifier}"
            )

        except Exception as e:
            self.logger.error(f"Processing failed: {e}")
            self.logger.error(f"Tuple values: {tup.values}")
            raise

2. Validate Data Class Design

# Test your data classes outside Storm
def test_data_class():
    # Test with valid data
    test_values = ["field1", "field2", "field3"]

    try:
        MyDataClass.validate(test_values)
        instance = MyDataClass.from_values(test_values)
        converted_back = instance.to_values()

        print(f"Original: {test_values}")
        print(f"Instance: {instance}")
        print(f"Converted: {converted_back}")

    except Exception as e:
        print(f"Data class test failed: {e}")

# Run test before deployment
if __name__ == "__main__":
    test_data_class()

3. Monitor Field Mapping

# Add debugging to understand field mapping
@inlet_data_class
@dataclass
class DebuggingDataClass:
    field1: str
    field2: str
    optional_field: Optional[str] = None

    @staticmethod
    def validate(values: List[Any]) -> bool:
        """Auto-generated validation method - implementation provided by decorator"""
        pass

    @staticmethod
    def from_values(values: List[Any]) -> 'DebuggingDataClass':
        """Auto-generated from_values method - implementation provided by decorator"""
        pass

    def to_values(self) -> List[Any]:
        """Auto-generated to_values method - implementation provided by decorator"""
        pass

    @classmethod
    def debug_from_values(cls, values):
        """Debug version of from_values with detailed logging"""
        print(f"Input values: {values}")
        print(f"Expected fields: {list(cls.__annotations__.keys())}")
        print(f"Field count: expected={len(cls.__annotations__)}, actual={len(values)}")

        # Call original method
        return cls.from_values(values)

Performance Considerations

1. Minimize Data Class Overhead

# ✅ Efficient: Simple data classes
@inlet_data_class
@dataclass
class EfficientDataClass:
    id: str
    data: str

    @staticmethod
    def validate(values: List[Any]) -> bool:
        """Auto-generated validation method - implementation provided by decorator"""
        pass

    @staticmethod
    def from_values(values: List[Any]) -> 'EfficientDataClass':
        """Auto-generated from_values method - implementation provided by decorator"""
        pass

    def to_values(self) -> List[Any]:
        """Auto-generated to_values method - implementation provided by decorator"""
        pass

# ❌ Inefficient: Complex post-processing in __post_init__
@inlet_data_class
@dataclass
class InefficientDataClass:
    id: str
    data: str

    @staticmethod
    def validate(values: List[Any]) -> bool:
        """Auto-generated validation method - implementation provided by decorator"""
        pass

    @staticmethod
    def from_values(values: List[Any]) -> 'InefficientDataClass':
        """Auto-generated from_values method - implementation provided by decorator"""
        pass

    def to_values(self) -> List[Any]:
        """Auto-generated to_values method - implementation provided by decorator"""
        pass

    def __post_init__(self):
        # Avoid heavy processing here
        self.expensive_calculation()  # ❌ Bad

2. Cache Frequently Used Objects

class OptimizedBolt(Bolt):
    def initialize(self, conf, ctx):
        self.data_class_cache = {}

    def process(self, tup):
        # Cache data class instances if appropriate
        cache_key = str(tup.values)

        if cache_key in self.data_class_cache:
            data = self.data_class_cache[cache_key]
        else:
            data = MyDataClass.from_values(tup.values)
            # Only cache if safe to do so
            if len(self.data_class_cache) < 1000:
                self.data_class_cache[cache_key] = data
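
Note that this simple cache never evicts entries once the 1000-entry cap is reached, so keys seen early stay cached while later ones are recomputed forever. For long-running topologies, consider an LRU policy instead, for example functools.lru_cache around a pure conversion function.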

This guide covers the core aspects of implementing data class streaming in your Storm topology. The framework provides type safety, automatic validation, and seamless integration with Storm's tuple-based architecture while keeping the code clean and maintainable.