Data Class Streaming Guide
A comprehensive guide to implementing type-safe data classes for Apache Storm tuple handling, with automatic validation and conversion methods.
Table of Contents
- Overview
- Decorator Architecture
- Creating Data Classes
- Stream Configuration
- Example Topologies
- Advanced Patterns
- Best Practices
- Troubleshooting
Overview
The data class streaming framework provides a type-safe, declarative approach to handling Apache Storm tuples using Python dataclasses with automatic validation, serialization, and conversion methods.
Key Benefits
- Type Safety: Static type checking with IDE and type-checker support
- Automatic Validation: Built-in tuple validation with meaningful error messages
- Seamless Conversion: Automatic conversion between Storm tuples and Python objects
- Reduced Boilerplate: Auto-generated methods eliminate repetitive code
- Optional Field Support: Flexible handling of optional and default values
Decorator Architecture
@inlet_data_class Decorator
The core decorator that transforms standard Python dataclasses into Storm-compatible data transfer objects.
from dataclasses import dataclass
from typing import Optional, Dict, Any, List
from decorators.inlet_data_decorator import inlet_data_class
@inlet_data_class
@dataclass
class MyDataClass:
required_field: str
optional_field: Optional[str] = None
data_field: Optional[Dict[str, Any]] = None
Auto-Generated Methods
The decorator automatically adds three essential methods to your data class. (Examples in this guide declare pass-bodied stubs for these methods so that IDEs and type checkers can see their signatures; the decorator supplies the real implementations.)
1. validate(values: List[Any]) -> bool
# Validates tuple field count and content
# Raises ValueError with descriptive messages
values = ["test", "123", "data"]
MyDataClass.validate(values) # Returns True or raises ValueError
2. from_values(values: List[Any]) -> MyDataClass
# Converts Storm tuple to data class instance
tuple_values = ["required_value", "optional_value", {"key": "value"}]
instance = MyDataClass.from_values(tuple_values)
print(instance.required_field) # "required_value"
3. to_values() -> List[Any]
# Converts data class instance back to Storm tuple
instance = MyDataClass(required_field="test", optional_field="optional")
tuple_values = instance.to_values()
# Returns: ["test", "optional"]
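How might the decorator provide these methods? The sketch below is illustrative only - the actual decorators.inlet_data_decorator implementation may differ - but it captures the documented behavior: required-field validation, declaration-order mapping, and omission of unset trailing optionals.

import dataclasses
from typing import Any, List

def inlet_data_class(cls):
    fields = dataclasses.fields(cls)
    required = [f for f in fields
                if f.default is dataclasses.MISSING
                and f.default_factory is dataclasses.MISSING]

    def validate(values: List[Any]) -> bool:
        # Field-count check; raises with a descriptive message
        if len(values) < len(required):
            raise ValueError(
                f"Insufficient tuple values - expected at least {len(required)}")
        return True

    def from_values(values: List[Any]):
        # Map tuple values onto fields in declaration order
        validate(values)
        return cls(**{f.name: v for f, v in zip(fields, values)})

    def to_values(self) -> List[Any]:
        values = [getattr(self, f.name) for f in fields]
        # Trailing None optionals (including additional_data) are omitted
        while values and values[-1] is None:
            values.pop()
        return values

    cls.validate = staticmethod(validate)
    cls.from_values = staticmethod(from_values)
    cls.to_values = to_values
    return cls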
Creating Data Classes
Basic Data Class Structure
from dataclasses import dataclass
from typing import Optional, Dict, Any, List
from decorators.inlet_data_decorator import inlet_data_class
@inlet_data_class
@dataclass
class ProcessingRequest:
"""Data class for processing requests"""
event_id: str
industry_id: str
timestamp: str
processing_type: str
additional_data: Optional[Dict[str, Any]] = None
@staticmethod
def validate(values: List[Any]) -> bool:
"""Auto-generated validation method - implementation provided by decorator"""
pass
@staticmethod
    def from_values(values: List[Any]) -> 'ProcessingRequest':
"""Auto-generated from_values method - implementation provided by decorator"""
pass
def to_values(self) -> List[Any]:
"""Auto-generated to_values method - implementation provided by decorator"""
pass
# Optional: Add custom validation logic
def __post_init__(self):
if not self.event_id:
raise ValueError("event_id cannot be empty")
Supported Field Types
Required Fields
@inlet_data_class
@dataclass
class RequiredFieldsExample:
string_field: str # Always required
numeric_field: str # Stored as string, convert as needed
identifier_field: str # UUID, IDs, etc.
Optional Fields
@inlet_data_class
@dataclass
class OptionalFieldsExample:
required_field: str
optional_string: Optional[str] = None
optional_data: Optional[Dict[str, Any]] = None
optional_list: Optional[List[str]] = None
default_value: str = "default"
Special Field: additional_data
@inlet_data_class
@dataclass
class WithAdditionalData:
core_field: str
additional_data: Optional[Dict[str, Any]] = None
# additional_data receives special handling:
# - Automatically excluded from required field validation
# - Only included in to_values() if not None
# - Supports nested JSON structures
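For example (illustrative values), the same class serializes differently depending on whether additional_data is set:

with_extra = WithAdditionalData(core_field="abc",
                                additional_data={"sendReport": True})
with_extra.to_values()     # ["abc", {"sendReport": True}]

without_extra = WithAdditionalData(core_field="abc")
without_extra.to_values()  # ["abc"] - additional_data omitted when None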
Real-World Examples
Batch Processing Input
from datetime import datetime

@inlet_data_class
@dataclass
class BatchProcessingInput:
"""Input data for batch processing operations"""
time: str # ISO timestamp
industry_id: str # Industry identifier
event_id: str # Unique event ID
additional_data: Optional[Dict[str, Any]] = None # Extra parameters
@staticmethod
def validate(values: List[Any]) -> bool:
"""Auto-generated validation method - implementation provided by decorator"""
pass
@staticmethod
    def from_values(values: List[Any]) -> 'BatchProcessingInput':
"""Auto-generated from_values method - implementation provided by decorator"""
pass
def to_values(self) -> List[Any]:
"""Auto-generated to_values method - implementation provided by decorator"""
pass
def get_datetime(self) -> datetime:
"""Convert time string to datetime object"""
return datetime.fromisoformat(self.time)
def get_send_report_flag(self) -> bool:
"""Extract sendReport flag from additional_data"""
if self.additional_data:
return self.additional_data.get('sendReport', False)
return False
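A typical round trip with this class (values are illustrative):

values = ["2024-01-15T10:30:00", "industry-42", "evt-001",
          {"sendReport": True}]
request = BatchProcessingInput.from_values(values)
request.get_datetime()          # datetime(2024, 1, 15, 10, 30)
request.get_send_report_flag()  # True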
Report Generation Request
@inlet_data_class
@dataclass
class ReportRequest:
"""Request for automated report generation"""
industry_id: str
time: str
event_id: str
report_types: Optional[List[str]] = None # daily, monthly, yearly
email_addresses: Optional[List[str]] = None
test_mode: bool = False
@staticmethod
def validate(values: List[Any]) -> bool:
"""Auto-generated validation method - implementation provided by decorator"""
pass
@staticmethod
    def from_values(values: List[Any]) -> 'ReportRequest':
"""Auto-generated from_values method - implementation provided by decorator"""
pass
def to_values(self) -> List[Any]:
"""Auto-generated to_values method - implementation provided by decorator"""
pass
def __post_init__(self):
# Set default report types if not provided
if self.report_types is None:
self.report_types = ["daily", "monthly"]
Stream Configuration
Defining Output Streams
Configure multiple output streams in your bolt to emit different types of data:
from streamparse import Bolt, Stream
from bolt_data_class.my_data_classes import InputDataClass, ProcessingResult
class ProcessingBolt(Bolt):
outputs = [
Stream(fields=["industry_id", "result_data", "event_id"], name="success"),
Stream(fields=["industry_id", "error_message", "event_id"], name="error"),
Stream(fields=["industry_id", "time", "event_id"], name="report_request")
]
def process(self, tup):
try:
# Process input using data class
input_data = InputDataClass.from_values(tup.values)
# Perform processing
result = self.perform_processing(input_data)
# Emit to success stream
self.emit([
input_data.industry_id,
result.to_json(),
input_data.event_id
], stream="success")
# Conditionally emit to report stream
if input_data.needs_report():
self.emit([
input_data.industry_id,
input_data.time,
input_data.event_id
], stream="report_request")
except Exception as e:
# Emit to error stream
self.emit([
tup.values[1], # industry_id
str(e),
tup.values[2] # event_id
], stream="error")
self.fail(tup)
Connecting Streams in Topology
from streamparse import Grouping, Topology
from bolts.processing_bolt import ProcessingBolt
from bolts.report_bolt import ReportBolt
from bolts.error_handler_bolt import ErrorHandlerBolt
from spouts.event_spout import EventSpout
class MultiStreamTopology(Topology):
event_spout = EventSpout.spec()
processing_bolt = ProcessingBolt.spec(
inputs={event_spout: Grouping.fields("industry_id")},
par=4
)
report_bolt = ReportBolt.spec(
inputs={processing_bolt["report_request"]: Grouping.fields("industry_id")},
par=2
)
error_bolt = ErrorHandlerBolt.spec(
inputs={processing_bolt["error"]: Grouping.fields("industry_id")},
par=1
)
Example Topologies
Simple Linear Processing
# Data Classes
@inlet_data_class
@dataclass
class EventData:
timestamp: str
user_id: str
action: str
event_id: str
@inlet_data_class
@dataclass
class ProcessedEvent:
user_id: str
action_count: int
last_action: str
event_id: str
# Spout
class EventSpout(Spout):
outputs = ["timestamp", "user_id", "action", "event_id"]
def next_tuple(self):
event_data = self.get_next_event()
self.emit([
event_data['timestamp'],
event_data['user_id'],
event_data['action'],
event_data['event_id']
])
# Bolt
class EventProcessorBolt(Bolt):
outputs = ["user_id", "action_count", "last_action", "event_id"]
def process(self, tup):
# Convert tuple to data class
event = EventData.from_values(tup.values)
# Process the event
processed = self.process_event(event)
# Convert back to tuple and emit
self.emit([
processed.user_id,
processed.action_count,
processed.last_action,
processed.event_id
])
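    def process_event(self, event: EventData) -> ProcessedEvent:
        # Illustrative implementation only: keeps per-user action counts in
        # bolt-local memory; a production bolt might use an external store
        counts = getattr(self, "_action_counts", None)
        if counts is None:
            counts = self._action_counts = {}
        counts[event.user_id] = counts.get(event.user_id, 0) + 1
        return ProcessedEvent(
            user_id=event.user_id,
            action_count=counts[event.user_id],
            last_action=event.action,
            event_id=event.event_id
        )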
# Topology
class EventProcessingTopology(Topology):
event_spout = EventSpout.spec()
processor_bolt = EventProcessorBolt.spec(
inputs={event_spout: Grouping.fields("user_id")},
par=3
)
Complex Multi-Stream Processing
from datetime import datetime

# Data Classes for different streams
@inlet_data_class
@dataclass
class IncomingOrder:
order_id: str
customer_id: str
amount: str
timestamp: str
additional_data: Optional[Dict[str, Any]] = None
@inlet_data_class
@dataclass
class ValidatedOrder:
order_id: str
customer_id: str
amount: float
validation_status: str
timestamp: str
@inlet_data_class
@dataclass
class PaymentRequest:
order_id: str
customer_id: str
amount: float
payment_method: str
@inlet_data_class
@dataclass
class NotificationRequest:
customer_id: str
message: str
notification_type: str
order_id: str
# Order Processing Bolt with multiple outputs
class OrderProcessingBolt(Bolt):
outputs = [
Stream(fields=["order_id", "customer_id", "amount", "status", "timestamp"],
name="validated"),
Stream(fields=["order_id", "customer_id", "amount", "payment_method"],
name="payment"),
Stream(fields=["customer_id", "message", "type", "order_id"],
name="notification"),
Stream(fields=["order_id", "error_message", "timestamp"],
name="error")
]
def process(self, tup):
try:
# Parse incoming order
order = IncomingOrder.from_values(tup.values)
# Validate order
if self.validate_order(order):
validated = ValidatedOrder(
order_id=order.order_id,
customer_id=order.customer_id,
amount=float(order.amount),
validation_status="valid",
timestamp=order.timestamp
)
# Emit validated order
self.emit([
validated.order_id,
validated.customer_id,
validated.amount,
validated.validation_status,
validated.timestamp
], stream="validated")
# Create payment request
                payment_method = (order.additional_data.get('payment_method', 'card')
                                  if order.additional_data else 'card')
payment_req = PaymentRequest(
order_id=order.order_id,
customer_id=order.customer_id,
amount=validated.amount,
payment_method=payment_method
)
# Emit payment request
self.emit([
payment_req.order_id,
payment_req.customer_id,
payment_req.amount,
payment_req.payment_method
], stream="payment")
# Create notification
notification = NotificationRequest(
customer_id=order.customer_id,
message=f"Order {order.order_id} is being processed",
notification_type="order_update",
order_id=order.order_id
)
# Emit notification
self.emit([
notification.customer_id,
notification.message,
notification.notification_type,
notification.order_id
], stream="notification")
else:
# Emit error for invalid orders
self.emit([
order.order_id,
"Order validation failed",
order.timestamp
], stream="error")
except Exception as e:
# Handle processing errors
self.emit([
"unknown",
f"Processing error: {str(e)}",
datetime.now().isoformat()
], stream="error")
self.fail(tup)
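    def validate_order(self, order: IncomingOrder) -> bool:
        # Illustrative validation only; real rules depend on your domain.
        # Assumes amount arrives as a numeric string.
        try:
            return bool(order.customer_id) and float(order.amount) > 0
        except (TypeError, ValueError):
            return False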
# Topology connecting multiple streams
class OrderProcessingTopology(Topology):
config = {
"topology.message.timeout.secs": 300,
"topology.max.spout.pending": 1000
}
order_spout = OrderSpout.spec()
order_processor = OrderProcessingBolt.spec(
inputs={order_spout: Grouping.fields("customer_id")},
par=4
)
payment_processor = PaymentProcessorBolt.spec(
inputs={order_processor["payment"]: Grouping.fields("customer_id")},
par=2
)
notification_sender = NotificationBolt.spec(
inputs={order_processor["notification"]: Grouping.fields("customer_id")},
par=3
)
error_handler = ErrorHandlerBolt.spec(
inputs={order_processor["error"]: Grouping.shuffle()},
par=1
)
Advanced Patterns
Conditional Stream Routing
@inlet_data_class
@dataclass
class ConditionalData:
data_type: str
payload: str
priority: str
event_id: str
def is_high_priority(self) -> bool:
return self.priority == "high"
def get_routing_key(self) -> str:
return f"{self.data_type}_{self.priority}"
class ConditionalRoutingBolt(Bolt):
outputs = [
Stream(fields=["payload", "event_id"], name="high_priority"),
Stream(fields=["payload", "event_id"], name="normal_priority"),
Stream(fields=["data_type", "payload", "event_id"], name="special_processing")
]
def process(self, tup):
data = ConditionalData.from_values(tup.values)
# Route based on data characteristics
if data.is_high_priority():
self.emit([data.payload, data.event_id], stream="high_priority")
elif data.data_type == "special":
self.emit([data.data_type, data.payload, data.event_id],
stream="special_processing")
else:
self.emit([data.payload, data.event_id], stream="normal_priority")
Aggregation with State Management
@inlet_data_class
@dataclass
class MetricEvent:
metric_name: str
value: float
timestamp: str
tags: Optional[Dict[str, str]] = None
@inlet_data_class
@dataclass
class AggregatedMetric:
metric_name: str
count: int
sum_value: float
avg_value: float
min_value: float
max_value: float
window_start: str
window_end: str
class MetricAggregatorBolt(Bolt):
outputs = ["metric_name", "count", "sum", "avg", "min", "max", "start", "end"]
def initialize(self, conf, ctx):
self.metrics_state = {}
self.window_size = 60 # 60 seconds
def process(self, tup):
metric = MetricEvent.from_values(tup.values)
# Update aggregation state
if metric.metric_name not in self.metrics_state:
self.metrics_state[metric.metric_name] = {
'count': 0,
'sum': 0.0,
'min': float('inf'),
'max': float('-inf'),
'window_start': metric.timestamp
}
state = self.metrics_state[metric.metric_name]
state['count'] += 1
state['sum'] += metric.value
state['min'] = min(state['min'], metric.value)
state['max'] = max(state['max'], metric.value)
# Check if window is complete
if self.should_emit_window(state, metric.timestamp):
aggregated = AggregatedMetric(
metric_name=metric.metric_name,
count=state['count'],
sum_value=state['sum'],
avg_value=state['sum'] / state['count'],
min_value=state['min'],
max_value=state['max'],
window_start=state['window_start'],
window_end=metric.timestamp
)
# Emit aggregated result
aggregated_values = aggregated.to_values()
self.emit(aggregated_values)
# Reset state for next window
self.metrics_state[metric.metric_name] = {
'count': 0,
'sum': 0.0,
'min': float('inf'),
'max': float('-inf'),
'window_start': metric.timestamp
}
Error Handling and Recovery
@inlet_data_class
@dataclass
class ProcessingTask:
task_id: str
input_data: str
retry_count: int = 0
max_retries: int = 3
@staticmethod
def validate(values: List[Any]) -> bool:
"""Auto-generated validation method - implementation provided by decorator"""
pass
@staticmethod
    def from_values(values: List[Any]) -> 'ProcessingTask':
"""Auto-generated from_values method - implementation provided by decorator"""
pass
def to_values(self) -> List[Any]:
"""Auto-generated to_values method - implementation provided by decorator"""
pass
def can_retry(self) -> bool:
return self.retry_count < self.max_retries
def increment_retry(self) -> 'ProcessingTask':
return ProcessingTask(
task_id=self.task_id,
input_data=self.input_data,
retry_count=self.retry_count + 1,
max_retries=self.max_retries
)
@inlet_data_class
@dataclass
class FailedTask:
task_id: str
error_message: str
retry_count: int
final_failure: bool
@staticmethod
def validate(values: List[Any]) -> bool:
"""Auto-generated validation method - implementation provided by decorator"""
pass
@staticmethod
    def from_values(values: List[Any]) -> 'FailedTask':
"""Auto-generated from_values method - implementation provided by decorator"""
pass
def to_values(self) -> List[Any]:
"""Auto-generated to_values method - implementation provided by decorator"""
pass
class ResilientProcessingBolt(Bolt):
outputs = [
Stream(fields=["task_id", "result"], name="success"),
Stream(fields=["task_id", "input_data", "retry_count", "max_retries"],
name="retry"),
Stream(fields=["task_id", "error", "retry_count", "final"], name="failed")
]
    def process(self, tup):
        # Parse once, before the try block, so a tuple that fails to parse
        # is not re-parsed (and re-raised) inside the error handler
        task = ProcessingTask.from_values(tup.values)
        try:
            # Attempt processing
            result = self.risky_processing(task.input_data)
            # Emit success
            self.emit([task.task_id, result], stream="success")
        except Exception as e:
if task.can_retry():
# Emit for retry
retry_task = task.increment_retry()
retry_values = retry_task.to_values()
self.emit(retry_values, stream="retry")
else:
# Emit final failure
failed = FailedTask(
task_id=task.task_id,
error_message=str(e),
retry_count=task.retry_count,
final_failure=True
)
failed_values = failed.to_values()
self.emit(failed_values, stream="failed")
Best Practices
1. Data Class Design
# ✅ Good: Clear, descriptive field names
@inlet_data_class
@dataclass
class UserActivityEvent:
user_id: str
session_id: str
action_type: str
timestamp: str
page_url: Optional[str] = None
additional_metadata: Optional[Dict[str, Any]] = None
@staticmethod
def validate(values: List[Any]) -> bool:
"""Auto-generated validation method - implementation provided by decorator"""
pass
@staticmethod
    def from_values(values: List[Any]) -> 'UserActivityEvent':
"""Auto-generated from_values method - implementation provided by decorator"""
pass
def to_values(self) -> List[Any]:
"""Auto-generated to_values method - implementation provided by decorator"""
pass
# ❌ Avoid: Unclear field names
@inlet_data_class
@dataclass
class Event:
id: str
data: str
info: str
@staticmethod
def validate(values: List[Any]) -> bool:
"""Auto-generated validation method - implementation provided by decorator"""
pass
@staticmethod
    def from_values(values: List[Any]) -> 'Event':
"""Auto-generated from_values method - implementation provided by decorator"""
pass
def to_values(self) -> List[Any]:
"""Auto-generated to_values method - implementation provided by decorator"""
pass
2. Validation and Error Handling
@inlet_data_class
@dataclass
class ValidatedInput:
email: str
age: str
country: str
@staticmethod
def validate(values: List[Any]) -> bool:
"""Auto-generated validation method - implementation provided by decorator"""
pass
@staticmethod
    def from_values(values: List[Any]) -> 'ValidatedInput':
"""Auto-generated from_values method - implementation provided by decorator"""
pass
def to_values(self) -> List[Any]:
"""Auto-generated to_values method - implementation provided by decorator"""
pass
    def __post_init__(self):
        # Add custom validation
        if not self.email or '@' not in self.email:
            raise ValueError("Invalid email format")
        try:
            age_int = int(self.age)
        except ValueError:
            raise ValueError("Age must be a valid number")
        if age_int < 0 or age_int > 150:
            raise ValueError("Invalid age range")
def get_age_int(self) -> int:
return int(self.age)
3. Optional Field Handling
@inlet_data_class
@dataclass
class FlexibleData:
required_field: str
optional_field: Optional[str] = None
config_field: Optional[Dict[str, Any]] = None
@staticmethod
def validate(values: List[Any]) -> bool:
"""Auto-generated validation method - implementation provided by decorator"""
pass
@staticmethod
    def from_values(values: List[Any]) -> 'FlexibleData':
"""Auto-generated from_values method - implementation provided by decorator"""
pass
def to_values(self) -> List[Any]:
"""Auto-generated to_values method - implementation provided by decorator"""
pass
def get_config_value(self, key: str, default=None):
"""Safely extract configuration values"""
if self.config_field:
return self.config_field.get(key, default)
return default
def has_optional_data(self) -> bool:
"""Check if optional field has meaningful data"""
return self.optional_field is not None and self.optional_field.strip() != ""
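Usage (illustrative values):

data = FlexibleData(required_field="core",
                    config_field={"batch_size": 500})
data.get_config_value("batch_size", 100)  # 500
data.get_config_value("timeout", 30)      # 30 (key absent, default used)
data.has_optional_data()                  # False (optional_field is None)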
4. Type Conversion Utilities
from datetime import datetime
from typing import Union
import json
@inlet_data_class
@dataclass
class TypedData:
timestamp_str: str
numeric_str: str
json_str: Optional[str] = None
@staticmethod
def validate(values: List[Any]) -> bool:
"""Auto-generated validation method - implementation provided by decorator"""
pass
@staticmethod
    def from_values(values: List[Any]) -> 'TypedData':
"""Auto-generated from_values method - implementation provided by decorator"""
pass
def to_values(self) -> List[Any]:
"""Auto-generated to_values method - implementation provided by decorator"""
pass
def get_timestamp(self) -> datetime:
"""Convert timestamp string to datetime"""
return datetime.fromisoformat(self.timestamp_str)
def get_numeric_value(self) -> Union[int, float]:
"""Convert numeric string to appropriate type"""
if '.' in self.numeric_str:
return float(self.numeric_str)
return int(self.numeric_str)
def get_json_data(self) -> Optional[Dict]:
"""Parse JSON string safely"""
if self.json_str:
try:
return json.loads(self.json_str)
except json.JSONDecodeError:
return None
return None
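Usage (illustrative values):

data = TypedData(timestamp_str="2024-01-15T10:30:00",
                 numeric_str="42",
                 json_str='{"region": "eu"}')
data.get_timestamp()      # datetime(2024, 1, 15, 10, 30)
data.get_numeric_value()  # 42 (an int; "42.5" would yield a float)
data.get_json_data()      # {"region": "eu"}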
5. Stream Naming Conventions
class WellNamedStreamsBolt(Bolt):
outputs = [
# Use descriptive stream names
Stream(fields=["user_id", "result"], name="user_processing_success"),
Stream(fields=["user_id", "error"], name="user_processing_error"),
Stream(fields=["user_id", "notification"], name="user_notification_queue"),
# Avoid generic names like "output", "stream1", etc.
]
Troubleshooting
Common Issues and Solutions
1. Tuple Length Mismatch
# Error: "Insufficient tuple values - expected at least 3"
# Solution: Check field count in data class vs tuple emission
# Data class expects 3 fields
@inlet_data_class
@dataclass
class ThreeFieldClass:
field1: str
field2: str
field3: str
@staticmethod
def validate(values: List[Any]) -> bool:
"""Auto-generated validation method - implementation provided by decorator"""
pass
@staticmethod
    def from_values(values: List[Any]) -> 'ThreeFieldClass':
"""Auto-generated from_values method - implementation provided by decorator"""
pass
def to_values(self) -> List[Any]:
"""Auto-generated to_values method - implementation provided by decorator"""
pass
# But only emitting 2 values
self.emit(["value1", "value2"]) # ❌ Wrong
# Fix: Emit correct number of values
self.emit(["value1", "value2", "value3"]) # ✅ Correct
2. Optional Field Handling
# Error: Optional fields causing validation issues
@inlet_data_class
@dataclass
class OptionalFieldIssue:
required: str
optional: Optional[str] = None
@staticmethod
def validate(values: List[Any]) -> bool:
"""Auto-generated validation method - implementation provided by decorator"""
pass
@staticmethod
    def from_values(values: List[Any]) -> 'OptionalFieldIssue':
"""Auto-generated from_values method - implementation provided by decorator"""
pass
def to_values(self) -> List[Any]:
"""Auto-generated to_values method - implementation provided by decorator"""
pass
# Problem: Emitting None explicitly
self.emit(["required_value", None]) # May cause issues
# Solution: Don't emit None for optional fields
self.emit(["required_value"]) # ✅ Better approach
3. Type Conversion Errors
# Error: Cannot convert tuple value to expected type
@inlet_data_class
@dataclass
class TypeIssue:
numeric_field: str # Note: Store as string, convert when needed
@staticmethod
def validate(values: List[Any]) -> bool:
"""Auto-generated validation method - implementation provided by decorator"""
pass
@staticmethod
    def from_values(values: List[Any]) -> 'TypeIssue':
"""Auto-generated from_values method - implementation provided by decorator"""
pass
def to_values(self) -> List[Any]:
"""Auto-generated to_values method - implementation provided by decorator"""
pass
    def get_numeric(self) -> int:
        try:
            return int(self.numeric_field)
        except ValueError:
            # Handle conversion error gracefully
            # (assumes a module-level logger: logger = logging.getLogger(__name__))
            logger.warning(f"Cannot convert {self.numeric_field} to int")
            return 0
4. Additional Data Field Issues
# Error: additional_data field not behaving as expected
@inlet_data_class
@dataclass
class AdditionalDataIssue:
main_field: str
additional_data: Optional[Dict[str, Any]] = None
@staticmethod
def validate(values: List[Any]) -> bool:
"""Auto-generated validation method - implementation provided by decorator"""
pass
@staticmethod
    def from_values(values: List[Any]) -> 'AdditionalDataIssue':
"""Auto-generated from_values method - implementation provided by decorator"""
pass
def to_values(self) -> List[Any]:
"""Auto-generated to_values method - implementation provided by decorator"""
pass
# Problem: Expecting additional_data in every tuple
# Solution: Handle missing additional_data gracefully
def process(self, tup):
data = AdditionalDataIssue.from_values(tup.values)
# Safe access to additional_data
extra_info = data.additional_data or {}
send_report = extra_info.get('sendReport', False)
Debugging Tips
1. Enable Detailed Logging
import logging
from utils.unified_logger import UnifiedLogger
class DebuggingBolt(Bolt):
def initialize(self, conf, ctx):
self.unified_logger = UnifiedLogger()
logging.basicConfig(level=logging.DEBUG)
def process(self, tup):
# Log incoming tuple structure
self.logger.debug(f"Received tuple: {tup.values}")
try:
data = MyDataClass.from_values(tup.values)
self.logger.debug(f"Parsed data: {data}")
# Log processing steps
self.unified_logger.debug(
event_id=data.event_id,
source="DEBUG_BOLT",
event_type="PROCESSING_STEP",
message=f"Processing data for {data.identifier}"
)
except Exception as e:
self.logger.error(f"Processing failed: {e}")
self.logger.error(f"Tuple values: {tup.values}")
raise
2. Validate Data Class Design
# Test your data classes outside Storm
def test_data_class():
# Test with valid data
test_values = ["field1", "field2", "field3"]
try:
MyDataClass.validate(test_values)
instance = MyDataClass.from_values(test_values)
converted_back = instance.to_values()
print(f"Original: {test_values}")
print(f"Instance: {instance}")
print(f"Converted: {converted_back}")
except Exception as e:
print(f"Data class test failed: {e}")
# Run test before deployment
if __name__ == "__main__":
test_data_class()
3. Monitor Field Mapping
# Add debugging to understand field mapping
@inlet_data_class
@dataclass
class DebuggingDataClass:
field1: str
field2: str
optional_field: Optional[str] = None
@staticmethod
def validate(values: List[Any]) -> bool:
"""Auto-generated validation method - implementation provided by decorator"""
pass
@staticmethod
    def from_values(values: List[Any]) -> 'DebuggingDataClass':
"""Auto-generated from_values method - implementation provided by decorator"""
pass
def to_values(self) -> List[Any]:
"""Auto-generated to_values method - implementation provided by decorator"""
pass
@classmethod
def debug_from_values(cls, values):
"""Debug version of from_values with detailed logging"""
print(f"Input values: {values}")
print(f"Expected fields: {list(cls.__annotations__.keys())}")
print(f"Field count: expected={len(cls.__annotations__)}, actual={len(values)}")
# Call original method
return cls.from_values(values)
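Example output (illustrative):

DebuggingDataClass.debug_from_values(["a", "b"])
# Input values: ['a', 'b']
# Expected fields: ['field1', 'field2', 'optional_field']
# Field count: expected=3, actual=2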
Performance Considerations
1. Minimize Data Class Overhead
# ✅ Efficient: Simple data classes
@inlet_data_class
@dataclass
class EfficientDataClass:
id: str
data: str
@staticmethod
def validate(values: List[Any]) -> bool:
"""Auto-generated validation method - implementation provided by decorator"""
pass
@staticmethod
def from_values(values: List[Any]) -> 'BPBoltInletData':
"""Auto-generated from_values method - implementation provided by decorator"""
pass
def to_values(self) -> List[Any]:
"""Auto-generated to_values method - implementation provided by decorator"""
pass
# ❌ Inefficient: Complex post-processing in __post_init__
@inlet_data_class
@dataclass
class InefficientDataClass:
id: str
data: str
@staticmethod
def validate(values: List[Any]) -> bool:
"""Auto-generated validation method - implementation provided by decorator"""
pass
@staticmethod
def from_values(values: List[Any]) -> 'BPBoltInletData':
"""Auto-generated from_values method - implementation provided by decorator"""
pass
def to_values(self) -> List[Any]:
"""Auto-generated to_values method - implementation provided by decorator"""
pass
def __post_init__(self):
# Avoid heavy processing here
self.expensive_calculation() # ❌ Bad
2. Cache Frequently Used Objects
class OptimizedBolt(Bolt):
def initialize(self, conf, ctx):
self.data_class_cache = {}
def process(self, tup):
# Cache data class instances if appropriate
cache_key = str(tup.values)
if cache_key in self.data_class_cache:
data = self.data_class_cache[cache_key]
else:
data = MyDataClass.from_values(tup.values)
            # Only cache if safe to do so (cached instances are shared
            # across tuples - treat them as read-only)
if len(self.data_class_cache) < 1000:
self.data_class_cache[cache_key] = data
This guide has covered the core patterns for data class streaming in a Storm topology. The framework provides type safety, automatic validation, and seamless integration with Storm's tuple-based architecture while keeping topology code clean and maintainable.