Python DSA for Data Engineering: Stacks & Queues
Master LIFO and FIFO data structures for task processing and event handling
Introduction
Stacks and Queues are fundamental data structures that model real-world scenarios. They're powerful tools for managing data flow, processing tasks, and handling system events in data engineering pipelines.
Why These Matter for Data Engineers
- Task Management: Processing jobs in order (task queues)
- Event Processing: Handling streaming data and events
- Undo/Redo Operations: Managing state changes
- Backtracking: Recursive problem solving
- Resource Management: Connection pools, buffer management
Core Concepts
1. Stack (LIFO - Last In, First Out)
Think of a stack like a pile of plates - you add to the top and remove from the top.
# Simple stack using Python list
stack = []
stack.append(1) # Push
stack.append(2)
stack.append(3)
top = stack.pop() # Pop -> 3
peek = stack[-1] # Peek -> 2
# Push, pop, and peek are all O(1) (append is amortized O(1))
2. Queue (FIFO - First In, First Out)
Think of a queue like a line at a store - first person in line gets served first.
from collections import deque
# Efficient queue using deque
queue = deque()
queue.append(1) # Enqueue
queue.append(2)
queue.append(3)
first = queue.popleft() # Dequeue -> 1
# All operations are O(1)
3. Priority Queue
Elements are dequeued based on priority, not insertion order:
import heapq
class PriorityQueue:
    def __init__(self):
        self.items = []

    def push(self, item, priority):
        """Add item with priority - Time: O(log n)"""
        heapq.heappush(self.items, (priority, item))

    def pop(self):
        """Remove highest-priority item - Time: O(log n)"""
        return heapq.heappop(self.items)[1]

# Usage
pq = PriorityQueue()
pq.push("task1", priority=3)
pq.push("task2", priority=1)  # Lower number = higher priority (min-heap)
print(pq.pop())  # Returns "task2"
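One caveat with the `(priority, item)` tuple approach: when two items share a priority, `heapq` falls back to comparing the items themselves, which raises `TypeError` for non-comparable types such as dicts. A common fix is a monotonically increasing counter as a tie-breaker; the class name `StablePriorityQueue` below is illustrative, not a standard library API.

```python
import heapq
import itertools

class StablePriorityQueue:
    """Priority queue that preserves insertion order for equal priorities.

    A counter breaks ties, so items themselves never need to be comparable.
    """
    def __init__(self):
        self.items = []
        self.counter = itertools.count()

    def push(self, item, priority):
        # (priority, sequence, item): the sequence number resolves ties
        # before the comparison ever reaches the item itself
        heapq.heappush(self.items, (priority, next(self.counter), item))

    def pop(self):
        return heapq.heappop(self.items)[2]

pq = StablePriorityQueue()
pq.push({"job": "a"}, priority=1)  # dicts are not comparable...
pq.push({"job": "b"}, priority=1)  # ...but ties resolve via the counter
print(pq.pop())  # {'job': 'a'}
```

As a bonus, the counter makes ordering deterministic: equal-priority items come out in the order they went in.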
Practice Questions
Question 1: Process Task Queue with Priority (Easy-Medium)
Problem: Implement a simple task queue processor with priority.
import heapq
from dataclasses import dataclass
@dataclass
class Task:
    task_id: str
    priority: int
    data: dict

    def __lt__(self, other):
        return self.priority < other.priority

class TaskQueueProcessor:
    """
    Process tasks based on priority
    Time: O(log n) per operation
    """
    def __init__(self):
        self.queue = []
        self.processed = []

    def add_task(self, task_id, priority, data):
        """Add task to queue"""
        task = Task(task_id, priority, data)
        heapq.heappush(self.queue, task)
        print(f"Added task {task_id} with priority {priority}")

    def process_next(self):
        """Process highest-priority task"""
        if not self.queue:
            return None
        task = heapq.heappop(self.queue)
        print(f"Processing task {task.task_id}")
        self.processed.append(task)
        return task
# Test
processor = TaskQueueProcessor()
processor.add_task("T001", priority=3, data={"file": "data1.csv"})
processor.add_task("T002", priority=1, data={"file": "urgent.csv"})
processor.process_next() # Processes T002 first
Explanation: The priority queue ensures critical tasks are processed first. This pattern is common in data pipeline orchestrators such as Airflow and Prefect.
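Orchestrators also have to cope with failures. A common extension of the pattern above is to re-queue a failed task with a demoted priority and a retry count, dead-lettering it once retries are exhausted. The sketch below is self-contained and illustrative; `run` is a caller-supplied callable, not part of any framework.

```python
import heapq

def process_with_retries(tasks, run, max_retries=2):
    """tasks: iterable of (priority, task_id).
    run(task_id) -> bool (True on success).
    Failed tasks are re-queued until max_retries is exhausted."""
    heap = [(priority, 0, task_id) for priority, task_id in tasks]
    heapq.heapify(heap)
    completed, dead = [], []
    while heap:
        priority, retries, task_id = heapq.heappop(heap)
        if run(task_id):
            completed.append(task_id)
        elif retries < max_retries:
            # Demote on failure so healthy tasks are not starved
            heapq.heappush(heap, (priority + 1, retries + 1, task_id))
        else:
            dead.append(task_id)  # retries exhausted: dead-letter it
    return completed, dead

# Simulate a task that fails once, then succeeds
attempts = {}
def run(task_id):
    attempts[task_id] = attempts.get(task_id, 0) + 1
    return not (task_id == "flaky" and attempts[task_id] == 1)

print(process_with_retries([(1, "a"), (2, "flaky")], run))  # (['a', 'flaky'], [])
```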
Question 2: Validate Configuration Brackets (Easy)
Problem: Check if brackets in configuration files are properly balanced.
def validate_config_brackets(config_text):
    """
    Validate brackets in configuration text
    Time Complexity: O(n)
    Space Complexity: O(n)
    """
    stack = []
    pairs = {
        '(': ')',
        '[': ']',
        '{': '}'
    }
    for char in config_text:
        if char in pairs:
            # Opening bracket
            stack.append(char)
        elif char in pairs.values():
            # Closing bracket
            if not stack:
                return False
            if pairs[stack.pop()] != char:
                return False
    return len(stack) == 0
# Test
valid_config = """
{
"pipeline": {
"stages": ["extract", "transform", "load"]
}
}
"""
print("Valid:", validate_config_brackets(valid_config))
Explanation: This is the classic stack application behind parsers for JSON, XML, and source code, and it is handy for sanity-checking configuration files before a pipeline runs.
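A small variant makes the validator more useful in practice: by pushing the character's index alongside the bracket, it can report where the problem is instead of just returning False. The function name below is illustrative.

```python
def find_bracket_error(text):
    """Return the index of the first unbalanced bracket, or -1 if balanced."""
    pairs = {'(': ')', '[': ']', '{': '}'}
    stack = []  # holds (char, index) so errors can be located
    for i, ch in enumerate(text):
        if ch in pairs:
            stack.append((ch, i))
        elif ch in pairs.values():
            if not stack or pairs[stack.pop()[0]] != ch:
                return i  # stray or mismatched closing bracket
    return stack[0][1] if stack else -1  # leftover opener, if any

print(find_bracket_error("{[()]}"))   # -1 (balanced)
print(find_bracket_error("{[(])}"))   # 3  (']' closes '(' incorrectly)
```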
Question 3: Process Events in Order (Queue) (Medium)
Problem: Process streaming events maintaining order.
from collections import deque
from datetime import datetime
class EventProcessor:
    """
    Process events in FIFO order
    Time: O(1) per event
    """
    def __init__(self, batch_size=5):
        self.queue = deque()
        self.batch_size = batch_size
        self.processed_count = 0

    def add_event(self, event):
        """Add event to processing queue"""
        event['queued_at'] = datetime.now()
        self.queue.append(event)

    def process_batch(self):
        """Process a batch of events"""
        if not self.queue:
            return []
        processed = []
        batch_size = min(self.batch_size, len(self.queue))
        for _ in range(batch_size):
            event = self.queue.popleft()
            event['processed_at'] = datetime.now()
            processed.append(event)
            self.processed_count += 1
        return processed
# Test - Simulate streaming events
processor = EventProcessor(batch_size=3)
events = [
    {"type": "user_login", "data": {"user_id": "U001"}},
    {"type": "page_view", "data": {"page": "/home"}},
    {"type": "purchase", "data": {"item": "Laptop"}}
]
for event in events:
    processor.add_event(event)
processor.process_batch()
Explanation: The queue guarantees FIFO processing order, the same contract offered by streaming platforms like Kafka and Kinesis, while batching amortizes per-event overhead.
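In a real stream, producers can outrun consumers. `deque` accepts a `maxlen` argument that turns it into a bounded, drop-oldest buffer; the sketch below assumes that discarding the oldest events is acceptable for the workload (if not, you would block or spill instead).

```python
from collections import deque

buffer = deque(maxlen=3)  # bounded buffer: oldest entries are evicted

for i in range(5):
    buffer.append({"event_id": i})  # appends beyond maxlen evict from the left

# Only the 3 most recent events remain
print([e["event_id"] for e in buffer])  # [2, 3, 4]
```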
Summary
Key Takeaways
- Stack (LIFO): Last in, first out - recursion, undo operations
- Queue (FIFO): First in, first out - task processing, events
- Priority Queue: Process by importance - job scheduling
- Deque: Efficient operations at both ends - sliding windows
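The "sliding windows" use case from the list above deserves a concrete sketch: a monotonic deque of indices yields the maximum of each fixed-size window in O(n) total, useful for rolling peaks over a metric stream.

```python
from collections import deque

def sliding_window_max(values, k):
    """Max of each window of size k in O(n) using a monotonic deque of indices."""
    dq = deque()  # indices of candidates; their values are kept decreasing
    out = []
    for i, v in enumerate(values):
        while dq and values[dq[-1]] <= v:
            dq.pop()          # drop smaller candidates from the back
        dq.append(i)
        if dq[0] == i - k:
            dq.popleft()      # front index slid out of the window
        if i >= k - 1:
            out.append(values[dq[0]])
    return out

print(sliding_window_max([1, 3, 2, 5, 4], 3))  # [3, 5, 5]
```

Each index is appended and popped at most once, which is why the total cost is O(n) rather than the naive O(n*k).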
Common Patterns
| Pattern | Data Structure | Use Case |
|---|---|---|
| Validation | Stack | Bracket matching, parsing |
| Task Processing | Queue | FIFO job processing |
| Scheduling | Priority Queue | Task prioritization |
⚠️ Important: Use deque for queues, NOT lists! list.pop(0) is O(n), but deque.popleft() is O(1).
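To see the O(n) vs O(1) difference from the warning above, a quick `timeit` comparison (absolute numbers will vary by machine):

```python
import timeit

n = 10_000
list_time = timeit.timeit(
    "while q: q.pop(0)",
    setup=f"q = list(range({n}))",
    number=1,
)
deque_time = timeit.timeit(
    "while q: q.popleft()",
    setup=f"from collections import deque; q = deque(range({n}))",
    number=1,
)
print(f"list.pop(0):    {list_time:.4f}s")   # shifts remaining items each pop
print(f"deque.popleft:  {deque_time:.4f}s")  # constant time per pop
```

Draining 10,000 items, the list version does tens of millions of element shifts while the deque does 10,000 constant-time removals.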
Tags: Python, DSA, Data Engineering, Stack, Queue, Priority Queue, Deque, Task Processing