Python DSA for Data Engineering: Stacks & Queues

Master LIFO and FIFO data structures for task processing and event handling

Introduction

Stacks and queues are fundamental data structures that control the order in which items are processed: a stack returns the most recently added item first, while a queue returns the oldest. In data engineering pipelines they are used to manage data flow, process tasks, and handle system events.

Why These Matter for Data Engineers

  • Task Management: Processing jobs in order (task queues)
  • Event Processing: Handling streaming data and events
  • Undo/Redo Operations: Managing state changes
  • Backtracking: Recursive problem solving
  • Resource Management: Connection pools, buffer management

Core Concepts

1. Stack (LIFO - Last In, First Out)

Think of a stack like a pile of plates - you add to the top and remove from the top.

# Simple stack using Python list
stack = []
stack.append(1)      # Push
stack.append(2)
stack.append(3)
top = stack.pop()    # Pop -> 3
peek = stack[-1]     # Peek -> 2

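# append(), pop(), and stack[-1] are all O(1) (append is amortized O(1))
# pop() and stack[-1] raise IndexError on an empty list, so check first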
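
The undo/redo use case mentioned earlier can be sketched with two stacks. This is a minimal illustration, not a production design: the class name `UndoRedoState` and its methods are hypothetical, and it assumes each state is small enough to store whole.

```python
class UndoRedoState:
    """Hypothetical helper: tracks one value with undo/redo using two stacks."""

    def __init__(self, initial):
        self.current = initial
        self._undo = []   # past states, most recent on top
        self._redo = []   # undone states, most recent on top

    def set(self, value):
        self._undo.append(self.current)
        self.current = value
        self._redo.clear()   # a new change invalidates the redo history

    def undo(self):
        if not self._undo:
            return False
        self._redo.append(self.current)
        self.current = self._undo.pop()
        return True

    def redo(self):
        if not self._redo:
            return False
        self._undo.append(self.current)
        self.current = self._redo.pop()
        return True


state = UndoRedoState({"batch_size": 100})
state.set({"batch_size": 200})
state.set({"batch_size": 500})
state.undo()
print(state.current)  # {'batch_size': 200}
state.redo()
print(state.current)  # {'batch_size': 500}
```

Both stacks only ever push and pop at the top, so every operation stays O(1).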

2. Queue (FIFO - First In, First Out)

Think of a queue like a line at a store - first person in line gets served first.

from collections import deque

# Efficient queue using deque
queue = deque()
queue.append(1)        # Enqueue
queue.append(2)
queue.append(3)
first = queue.popleft() # Dequeue -> 1

# append() and popleft() are both O(1)

3. Priority Queue

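Elements are dequeued based on priority, not insertion order. Python's heapq module implements a min-heap, so the entry with the lowest priority number is removed first: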

import heapq

class PriorityQueue:
    def __init__(self):
        self.items = []
    
    def push(self, item, priority):
        """Add item with priority - Time: O(log n)"""
        heapq.heappush(self.items, (priority, item))
    
    def pop(self):
        """Remove the item with the lowest priority number (highest priority) - Time: O(log n)"""
        return heapq.heappop(self.items)[1]

# Usage
pq = PriorityQueue()
pq.push("task1", priority=3)
pq.push("task2", priority=1)  # Lower number = higher priority
print(pq.pop())  # Prints "task2"
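
One caveat with storing (priority, item) tuples: when two priorities are equal, heapq falls back to comparing the items themselves, which raises TypeError for items such as dicts. A common workaround, sketched below, adds an insertion counter as a tie-breaker so equal-priority items come out in insertion order. The class name `StablePriorityQueue` is an assumption made for this example.

```python
import heapq
import itertools


class StablePriorityQueue:
    """Hypothetical variant: equal priorities are served in insertion order."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()   # monotonically increasing tie-breaker

    def push(self, item, priority):
        # The counter is unique, so the item itself is never compared
        heapq.heappush(self._heap, (priority, next(self._counter), item))

    def pop(self):
        _priority, _count, item = heapq.heappop(self._heap)
        return item

    def __len__(self):
        return len(self._heap)


spq = StablePriorityQueue()
spq.push({"job": "load_a"}, priority=2)
spq.push({"job": "load_b"}, priority=2)   # same priority; dicts are not orderable
spq.push({"job": "urgent"}, priority=1)

order = [spq.pop()["job"] for _ in range(len(spq))]
print(order)  # ['urgent', 'load_a', 'load_b']
```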

Practice Questions

Question 1: Process Task Queue with Priority (Easy-Medium)

Problem: Implement a simple task queue processor with priority.

import heapq
from dataclasses import dataclass

@dataclass
class Task:
    task_id: str
    priority: int
    data: dict
    
    def __lt__(self, other):
        return self.priority < other.priority

class TaskQueueProcessor:
    """
    Process tasks in priority order (lower number = higher priority)
    Time: O(log n) per add or pop
    """
    
    def __init__(self):
        self.queue = []
        self.processed = []
    
    def add_task(self, task_id, priority, data):
        """Add task to queue"""
        task = Task(task_id, priority, data)
        heapq.heappush(self.queue, task)
        print(f"Added task {task_id} with priority {priority}")
    
    def process_next(self):
        """Process highest priority task"""
        if not self.queue:
            return None
        
        task = heapq.heappop(self.queue)
        print(f"Processing task {task.task_id}")
        self.processed.append(task)
        return task

# Test
processor = TaskQueueProcessor()
processor.add_task("T001", priority=3, data={"file": "data1.csv"})
processor.add_task("T002", priority=1, data={"file": "urgent.csv"})
processor.process_next()  # Processes T002 first

Explanation: A priority queue ensures that the most critical tasks are processed first. The same idea is common in data pipeline orchestration tools such as Airflow and Prefect.

Question 2: Validate Configuration Brackets (Easy)

Problem: Check if brackets in configuration files are properly balanced.

def validate_config_brackets(config_text):
    """
    Validate brackets in configuration text
    
    Time Complexity: O(n)
    Space Complexity: O(n)
    """
    stack = []
    pairs = {
        '(': ')',
        '[': ']',
        '{': '}'
    }
    
    for char in config_text:
        if char in pairs:
            # Opening bracket
            stack.append(char)
        elif char in pairs.values():
            # Closing bracket
            if not stack:
                return False
            if pairs[stack.pop()] != char:
                return False
    
    return len(stack) == 0

# Test
valid_config = """
{
    "pipeline": {
        "stages": ["extract", "transform", "load"]
    }
}
"""

print("Valid:", validate_config_brackets(valid_config))

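Explanation: Bracket matching is a classic stack application and underlies parsers for JSON, XML, and source code, which makes it useful for configuration validation. This version also counts brackets that appear inside string values, so treat it as a quick sanity check rather than a full parser.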
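
As a possible extension, the sketch below ignores brackets inside double-quoted strings and reports where the first problem occurs. The function name `find_bracket_error` and its return shape are assumptions for illustration; it handles only double quotes with backslash escapes, which matches JSON-style configs but not every format.

```python
def find_bracket_error(text):
    """Return None if balanced, else (index, message) for the first problem.

    Brackets inside double-quoted strings are ignored; a backslash escapes
    the next character inside a string.
    """
    pairs = {'(': ')', '[': ']', '{': '}'}
    closers = set(pairs.values())
    stack = []          # (opening bracket, index) pairs
    in_string = False
    escaped = False

    for i, ch in enumerate(text):
        if in_string:
            if escaped:
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch in pairs:
            stack.append((ch, i))
        elif ch in closers:
            if not stack:
                return (i, f"unexpected '{ch}'")
            opener, _ = stack.pop()
            if pairs[opener] != ch:
                return (i, f"expected '{pairs[opener]}' but found '{ch}'")

    if stack:
        opener, pos = stack[-1]
        return (pos, f"unclosed '{opener}'")
    return None


ok_result = find_bracket_error('{"pattern": "a)b", "stages": ["extract"]}')
bad_result = find_bracket_error('{"stages": ["extract"}')
print(ok_result)   # None
print(bad_result)  # (21, "expected ']' but found '}'")
```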

Question 3: Process Events in Order (Queue) (Medium)

Problem: Process streaming events maintaining order.

from collections import deque
from datetime import datetime

class EventProcessor:
    """
    Process events in FIFO order
    Time: O(1) per event
    """
    
    def __init__(self, batch_size=5):
        self.queue = deque()
        self.batch_size = batch_size
        self.processed_count = 0
    
    def add_event(self, event):
        """Add event to processing queue"""
        event['queued_at'] = datetime.now()
        self.queue.append(event)
    
    def process_batch(self):
        """Process a batch of events"""
        if not self.queue:
            return []
        
        processed = []
        batch_size = min(self.batch_size, len(self.queue))
        
        for _ in range(batch_size):
            event = self.queue.popleft()
            event['processed_at'] = datetime.now()
            processed.append(event)
            self.processed_count += 1
        
        return processed

# Test - Simulate streaming events
processor = EventProcessor(batch_size=3)

events = [
    {"type": "user_login", "data": {"user_id": "U001"}},
    {"type": "page_view", "data": {"page": "/home"}},
    {"type": "purchase", "data": {"item": "Laptop"}}
]

for event in events:
    processor.add_event(event)

processor.process_batch()

Explanation: A queue guarantees FIFO processing, the same ordering model that streaming platforms such as Kafka and Kinesis provide within a partition or shard. Processing events in batches reduces per-event overhead.
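
To drain a backlog completely, the batching step can be wrapped in a generator. The sketch below is independent of the EventProcessor class above; the helper name `drain_in_batches` is hypothetical, and it assumes nothing else appends to the deque while it is being drained.

```python
from collections import deque


def drain_in_batches(queue, batch_size):
    """Yield lists of up to batch_size items, in FIFO order, until queue is empty."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    while queue:
        batch = []
        while queue and len(batch) < batch_size:
            batch.append(queue.popleft())
        yield batch


pending = deque(f"event-{i}" for i in range(7))
batches = list(drain_in_batches(pending, batch_size=3))
print(batches)
# [['event-0', 'event-1', 'event-2'], ['event-3', 'event-4', 'event-5'], ['event-6']]
```

The final batch may be smaller than batch_size, so downstream code should not assume a fixed length.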

Summary

Key Takeaways

  1. Stack (LIFO): Last in, first out - recursion, undo operations
  2. Queue (FIFO): First in, first out - task processing, events
  3. Priority Queue: Process by importance - job scheduling
  4. Deque: Efficient operations at both ends - sliding windows
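
The sliding-window use of a deque is not shown in the questions above, so here is a minimal sketch. It relies on the maxlen argument of collections.deque, which discards the oldest element automatically once the window is full. The function name `rolling_mean` is an assumption for this example.

```python
from collections import deque


def rolling_mean(values, window):
    """Mean of the last `window` values seen so far, one result per input value."""
    recent = deque(maxlen=window)
    means = []
    for v in values:
        recent.append(v)   # when full, the oldest value drops off the left
        means.append(sum(recent) / len(recent))
    return means


means = rolling_mean([10, 20, 30, 40], window=2)
print(means)  # [10.0, 15.0, 25.0, 35.0]
```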

Common Patterns

Pattern          Data Structure   Use Case
Validation       Stack            Bracket matching, parsing
Task Processing  Queue            FIFO job processing
Scheduling       Priority Queue   Task prioritization

⚠️ Important: Use deque for queues, NOT lists! list.pop(0) is O(n), but deque.popleft() is O(1).
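
The difference can be measured with a rough timing sketch like the one below. The helper names `drain_list` and `drain_deque` and the size n are arbitrary choices for illustration, and the absolute numbers will vary by machine; only the gap between the two matters.

```python
import timeit
from collections import deque


def drain_list(items):
    q = list(items)
    out = []
    while q:
        out.append(q.pop(0))     # shifts every remaining element left: O(n)
    return out


def drain_deque(items):
    q = deque(items)
    out = []
    while q:
        out.append(q.popleft())  # O(1)
    return out


n = 20_000
list_seconds = timeit.timeit(lambda: drain_list(range(n)), number=1)
deque_seconds = timeit.timeit(lambda: drain_deque(range(n)), number=1)
print(f"list.pop(0):     {list_seconds:.4f}s")
print(f"deque.popleft(): {deque_seconds:.4f}s")
```

Both functions return the items in the same FIFO order; the list version just does far more work per dequeue as the queue grows.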


Tags: Python, DSA, Data Engineering, Stack, Queue, Priority Queue, Deque, Task Processing
