Heap

A heap is a specialized tree-based data structure that satisfies the heap property. In a min-heap, the value of every node is less than or equal to the values of its children. In a max-heap, the value of every node is greater than or equal to the values of its children. This implementation stores the tree in an array, which avoids per-node pointers and keeps elements contiguous in memory.
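The heap property can be checked directly on the array form. The sketch below is illustrative only: `satisfies_heap_property` is a hypothetical helper, not part of this library, and it assumes the usual layout in which the parent of index i sits at index (i-1)//2.

```python
from typing import List


def satisfies_heap_property(items: List[int], is_min_heap: bool = True) -> bool:
    """Return True if every parent is ordered correctly relative to its children."""
    for child in range(1, len(items)):
        parent = (child - 1) // 2
        if is_min_heap and items[parent] > items[child]:
            return False
        if not is_min_heap and items[parent] < items[child]:
            return False
    return True


print(satisfies_heap_property([1, 3, 2, 7, 4]))                     # True
print(satisfies_heap_property([9, 5, 8, 1, 2], is_min_heap=False))  # True
print(satisfies_heap_property([1, 3, 2, 7, 4, 0]))                  # False (0 sits below 2)
```

Note that a valid heap is only partially ordered: siblings may appear in either order, which is why the array form is generally not sorted.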

Features

  • Heap property maintenance - automatically maintains min/max heap property
  • Efficient operations - O(log n) insertion and extraction
  • Array-based structure - complete binary tree stored in array
  • Dynamic sizing - automatically grows and shrinks as needed
  • Type safe - generic implementation with type checking
  • Key function support - custom comparison logic
  • Memory efficient - compact array representation
  • Build heap operation - O(n) heap construction from array

Time Complexities

| Operation | Time Complexity | Description |
|---|---|---|
| Insert | O(log n) | Add element maintaining heap property |
| Extract | O(log n) | Remove root element |
| Peek | O(1) | View root element |
| Build Heap | O(n) | Create heap from array |
| Size/Length | O(1) | Get number of elements |
| Clear | O(1) | Remove all elements |
| Search/Contains | O(n) | Find element in heap |

Space Complexity: O(n) where n is the number of elements

Basic Usage

Creating and Initializing

from algo.data_structs.trees.heap import Heap

# Create empty min-heap (default)
min_heap = Heap()
print(min_heap.is_empty())    # True
print(len(min_heap))          # 0
print(min_heap.size())        # 0

# Create empty max-heap
max_heap = Heap(is_min_heap=False)
print(max_heap.is_empty())    # True

# Heap is generic - can specify type
int_heap = Heap[int]()
str_heap = Heap[str](is_min_heap=False)

Understanding Heap Types

# Min-heap: parent ≤ children (smallest element at root)
min_heap = Heap(is_min_heap=True)

# Max-heap: parent ≥ children (largest element at root)
max_heap = Heap(is_min_heap=False)

# The heap property is maintained automatically
print(f"Min-heap type: {min_heap.is_min_heap}")  # True
print(f"Max-heap type: {max_heap.is_min_heap}")  # False

Adding Elements (Insert)

# Insert into min-heap
min_heap = Heap()
min_heap.insert(10)
min_heap.insert(5)
min_heap.insert(15)
min_heap.insert(3)
min_heap.insert(8)

print(len(min_heap))       # 5
print(min_heap.peek())     # 3 (minimum element at root)

# Insert into max-heap
max_heap = Heap(is_min_heap=False)
max_heap.insert(10)
max_heap.insert(5)
max_heap.insert(15)
max_heap.insert(3)
max_heap.insert(8)

print(len(max_heap))       # 5
print(max_heap.peek())     # 15 (maximum element at root)

# Insert preserves heap property automatically
elements = [20, 1, 25, 30]
for element in elements:
    min_heap.insert(element)

print(min_heap.peek())     # 1 (new minimum)

Viewing Elements

# Peek at root element without removing
root_element = min_heap.peek()
print(root_element)        # 1 (minimum in min-heap)
print(len(min_heap))       # Size unchanged

# Check if heap contains element
print(10 in min_heap)      # True
print(99 in min_heap)      # False

# Convert to list (array order, not sorted)
elements = min_heap.to_list()
print(elements)            # e.g. [1, 5, 3, 10, 8, 20, 15, 25, 30] (heap order, not sorted)
print(f"Array representation: {elements}")

Removing Elements (Extract)

# Extract elements from min-heap (ascending order)
min_heap = Heap()
for value in [10, 5, 15, 3, 8, 12, 20]:
    min_heap.insert(value)

extracted = []
while not min_heap.is_empty():
    extracted.append(min_heap.extract())

print(extracted)           # [3, 5, 8, 10, 12, 15, 20] (sorted ascending)

# Extract elements from max-heap (descending order)
max_heap = Heap(is_min_heap=False)
for value in [10, 5, 15, 3, 8, 12, 20]:
    max_heap.insert(value)

extracted = []
while not max_heap.is_empty():
    extracted.append(max_heap.extract())

print(extracted)           # [20, 15, 12, 10, 8, 5, 3] (sorted descending)

Clearing the Heap

# Add some elements
heap = Heap()
heap.insert(1)
heap.insert(2)
heap.insert(3)
print(len(heap))           # 3

# Clear all elements
heap.clear()
print(len(heap))           # 0
print(heap.is_empty())     # True

Building Heaps from Arrays

Efficient Heap Construction

# Build heap from existing array (O(n) time complexity)
heap = Heap()
array = [10, 5, 15, 3, 8, 12, 20]

# Build heap efficiently
heap.build_heap(array)
print(len(heap))           # 7
print(heap.peek())         # 3 (minimum element)

# Original array is preserved (heap makes a copy)
print(array)               # [10, 5, 15, 3, 8, 12, 20] (unchanged)

# Extract to verify heap property
sorted_elements = []
while not heap.is_empty():
    sorted_elements.append(heap.extract())
print(sorted_elements)     # [3, 5, 8, 10, 12, 15, 20]
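A linear-time build is typically done bottom-up: every leaf is already a valid one-element heap, so only the internal nodes need to be sifted down, starting from the last one. The sketch below shows that idea on a plain list. It is an assumption that `build_heap` works this way internally, and `build_min_heap` is a hypothetical name used only for illustration.

```python
from typing import List


def build_min_heap(values: List[int]) -> List[int]:
    """Return a new list arranged as a min-heap; the input is left untouched."""
    heap = list(values)
    size = len(heap)
    # Leaves are already valid heaps, so start from the last internal node.
    for start in range(size // 2 - 1, -1, -1):
        index = start
        while True:
            smallest = index
            for child in (2 * index + 1, 2 * index + 2):
                if child < size and heap[child] < heap[smallest]:
                    smallest = child
            if smallest == index:
                break
            heap[index], heap[smallest] = heap[smallest], heap[index]
            index = smallest
    return heap


array = [10, 5, 15, 3, 8, 12, 20]
heap = build_min_heap(array)
print(heap[0])   # 3
print(array)     # [10, 5, 15, 3, 8, 12, 20]
```

The total work is O(n) rather than O(n log n) because most nodes sit near the bottom of the tree and can only move down a few levels.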

Building Different Heap Types

# Build max-heap from array
max_heap = Heap(is_min_heap=False)
max_heap.build_heap([10, 5, 15, 3, 8, 12, 20])

print(max_heap.peek())     # 20 (maximum element)

# Extract to verify max-heap property
descending = []
while not max_heap.is_empty():
    descending.append(max_heap.extract())
print(descending)          # [20, 15, 12, 10, 8, 5, 3]

# Build from empty array
empty_heap = Heap()
empty_heap.build_heap([])
print(empty_heap.is_empty())  # True

Working with Different Types

String Elements

# Heap with strings (lexicographic ordering)
str_heap = Heap()
words = ["apple", "banana", "cherry", "date", "elderberry"]

for word in words:
    str_heap.insert(word)

print(str_heap.peek())     # "apple" (lexicographically first)

# Extract in alphabetical order
alphabetical = []
while not str_heap.is_empty():
    alphabetical.append(str_heap.extract())
print(alphabetical)        # ['apple', 'banana', 'cherry', 'date', 'elderberry']

Numeric Elements

# Heap with floating-point numbers
float_heap = Heap()
values = [3.14, 2.71, 1.41, 4.67, 0.57]

for value in values:
    float_heap.insert(value)

print(f"Minimum: {float_heap.peek()}")  # 0.57

# Mixed integers and floats
mixed_heap = Heap()
mixed_heap.insert(5)
mixed_heap.insert(3.14)
mixed_heap.insert(7)
mixed_heap.insert(2.5)

print(f"Minimum: {mixed_heap.peek()}")  # 2.5

Custom Objects with Key Function

class Task:
    def __init__(self, name: str, priority: int):
        self.name = name
        self.priority = priority

    def __repr__(self):
        return f"Task({self.name}, p={self.priority})"

# Use key function to extract comparison value
task_heap = Heap(key=lambda task: task.priority)

# Add tasks with different priorities
tasks = [
    Task("urgent", 1),
    Task("normal", 5),
    Task("low", 10),
    Task("critical", 0)
]

for task in tasks:
    task_heap.insert(task)

# Extract in priority order (lower number = higher priority)
while not task_heap.is_empty():
    task = task_heap.extract()
    print(f"Processing: {task}")
# Output:
# Processing: Task(critical, p=0)
# Processing: Task(urgent, p=1)
# Processing: Task(normal, p=5)
# Processing: Task(low, p=10)

Custom Objects with Natural Ordering

class Student:
    def __init__(self, name: str, grade: float):
        self.name = name
        self.grade = grade

    def __lt__(self, other):
        return self.grade < other.grade  # Compare by grade

    def __eq__(self, other):
        return isinstance(other, Student) and self.grade == other.grade

    def __repr__(self):
        return f"Student({self.name}, {self.grade})"

# Heap with custom objects (uses __lt__ for comparison)
student_heap = Heap()
students = [
    Student("Alice", 85.5),
    Student("Bob", 92.0),
    Student("Charlie", 78.5),
    Student("Diana", 96.5)
]

for student in students:
    student_heap.insert(student)

# Extract students by grade (lowest first in min-heap)
print("Students by grade (lowest first):")
while not student_heap.is_empty():
    student = student_heap.extract()
    print(f"  {student}")

Heap Iteration and Inspection

Iteration

heap = Heap()
heap.build_heap([5, 2, 8, 1, 9, 3])

# Iterate over heap elements (array order, not sorted)
print("Heap elements (array order):")
for element in heap:
    print(element, end=" ")  # 1 2 3 5 9 8
print()

# Convert to list for inspection
array_representation = list(heap)
print(f"Array representation: {array_representation}")

# Note: iteration doesn't consume the heap
print(f"Heap size after iteration: {len(heap)}")  # Still 6

String Representation

# Empty heap
empty_heap = Heap()
print(str(empty_heap))     # MinHeap([])

# Non-empty min-heap
min_heap = Heap()
min_heap.build_heap([3, 1, 4, 1, 5])
print(str(min_heap))       # MinHeap([1, 1, 4, 3, 5])

# Non-empty max-heap
max_heap = Heap(is_min_heap=False)
max_heap.build_heap([3, 1, 4, 1, 5])
print(str(max_heap))       # MaxHeap([5, 3, 4, 1, 1])

# Detailed representation for debugging
print(repr(min_heap))      # Heap(type=min, size=5, data=[1, 1, 4, 3, 5])
print(repr(max_heap))      # Heap(type=max, size=5, data=[5, 3, 4, 1, 1])

Error Handling

heap = Heap()

# Safe operations on empty heap
print(heap.is_empty())     # True
print(len(heap))           # 0
print(42 in heap)          # False

# Operations that raise errors on empty heap
try:
    heap.extract()
except IndexError as e:
    print(f"Error: {e}")   # Error: Heap is empty

try:
    heap.peek()
except IndexError as e:
    print(f"Error: {e}")   # Error: Heap is empty

# Single element operations
heap.insert(42)
print(heap.peek())         # 42
print(heap.extract())      # 42
print(heap.is_empty())     # True

Common Patterns

Priority Queue Implementation

class PriorityQueue:
    """Priority queue using min-heap (lower number = higher priority)."""

    def __init__(self):
        self.heap = Heap()

    def enqueue(self, item, priority):
        """Add item with priority."""
        self.heap.insert((priority, item))

    def dequeue(self):
        """Remove and return highest priority item."""
        if self.heap.is_empty():
            raise IndexError("Priority queue is empty")
        priority, item = self.heap.extract()
        return item

    def peek(self):
        """View highest priority item without removing."""
        if self.heap.is_empty():
            raise IndexError("Priority queue is empty")
        priority, item = self.heap.peek()
        return item

    def is_empty(self):
        return self.heap.is_empty()

    def size(self):
        return len(self.heap)

# Usage
pq = PriorityQueue()
pq.enqueue("Low priority task", 10)
pq.enqueue("High priority task", 1)
pq.enqueue("Medium priority task", 5)

while not pq.is_empty():
    task = pq.dequeue()
    print(f"Processing: {task}")
# Output:
# Processing: High priority task
# Processing: Medium priority task
# Processing: Low priority task
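Because the queue above stores `(priority, item)` tuples, two entries with the same priority are ordered by comparing the items themselves, which raises `TypeError` for items that define no ordering (dicts, for example). A common remedy is to add an increasing sequence number as a tie-breaker. The sketch below uses the standard library's `heapq` as a stand-in so that it runs without this package; `StablePriorityQueue` is a hypothetical name.

```python
import heapq
import itertools
from typing import Any, List, Tuple


class StablePriorityQueue:
    """Priority queue that returns equal-priority items in insertion order."""

    def __init__(self) -> None:
        self._entries: List[Tuple[int, int, Any]] = []
        # Unique per entry, so comparison never reaches the item itself.
        self._sequence = itertools.count()

    def enqueue(self, item: Any, priority: int) -> None:
        heapq.heappush(self._entries, (priority, next(self._sequence), item))

    def dequeue(self) -> Any:
        if not self._entries:
            raise IndexError("Priority queue is empty")
        _priority, _sequence, item = heapq.heappop(self._entries)
        return item

    def __len__(self) -> int:
        return len(self._entries)


pq = StablePriorityQueue()
pq.enqueue({"job": "first"}, 1)    # dicts are not orderable
pq.enqueue({"job": "second"}, 1)   # same priority as the entry above
pq.enqueue({"job": "urgent"}, 0)
print([pq.dequeue()["job"] for _ in range(3)])  # ['urgent', 'first', 'second']
```

The same three-field tuple can be inserted into the `Heap` class from this page, assuming it compares elements with the standard ordering operators.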

Heap Sort Implementation

def heap_sort(arr):
    """Sort array using heap sort algorithm."""
    # Build max-heap from array
    max_heap = Heap(is_min_heap=False)
    max_heap.build_heap(arr.copy())

    # Extract elements in descending order
    sorted_desc = []
    while not max_heap.is_empty():
        sorted_desc.append(max_heap.extract())

    # Reverse to get ascending order
    return sorted_desc[::-1]

# Test heap sort
unsorted = [64, 34, 25, 12, 22, 11, 90]
sorted_array = heap_sort(unsorted)
print(f"Original: {unsorted}")
print(f"Sorted:   {sorted_array}")

# Verify it's sorted
print(f"Is sorted: {sorted_array == sorted(unsorted)}")

Finding K Largest/Smallest Elements

def find_k_largest(arr, k):
    """Find k largest elements using min-heap."""
    if k <= 0:
        return []

    min_heap = Heap()  # Min-heap to track k largest

    for num in arr:
        if len(min_heap) < k:
            min_heap.insert(num)
        elif num > min_heap.peek():
            min_heap.extract()  # Remove smallest of the k largest
            min_heap.insert(num)

    # Extract elements and reverse to get descending order
    result = []
    while not min_heap.is_empty():
        result.append(min_heap.extract())

    return result[::-1]  # Reverse for descending order

def find_k_smallest(arr, k):
    """Find k smallest elements using max-heap."""
    if k <= 0:
        return []

    max_heap = Heap(is_min_heap=False)  # Max-heap to track k smallest

    for num in arr:
        if len(max_heap) < k:
            max_heap.insert(num)
        elif num < max_heap.peek():
            max_heap.extract()  # Remove largest of the k smallest
            max_heap.insert(num)

    # Extract elements and reverse to get ascending order
    result = []
    while not max_heap.is_empty():
        result.append(max_heap.extract())

    return result[::-1]  # Reverse for ascending order

# Test finding k largest/smallest
numbers = [3, 2, 1, 5, 6, 4, 7, 8]

k_largest = find_k_largest(numbers, 3)
print(f"3 largest: {k_largest}")      # [8, 7, 6]

k_smallest = find_k_smallest(numbers, 3)
print(f"3 smallest: {k_smallest}")    # [1, 2, 3]

Running Median

class RunningMedian:
    """Maintain running median using two heaps."""

    def __init__(self):
        self.max_heap = Heap(is_min_heap=False)  # Lower half
        self.min_heap = Heap(is_min_heap=True)   # Upper half

    def add_number(self, num):
        """Add a number and maintain median property."""
        # Add to appropriate heap
        if self.max_heap.is_empty() or num <= self.max_heap.peek():
            self.max_heap.insert(num)
        else:
            self.min_heap.insert(num)

        # Balance heaps (difference should be at most 1)
        if len(self.max_heap) > len(self.min_heap) + 1:
            # Move from max_heap to min_heap
            self.min_heap.insert(self.max_heap.extract())
        elif len(self.min_heap) > len(self.max_heap) + 1:
            # Move from min_heap to max_heap
            self.max_heap.insert(self.min_heap.extract())

    def get_median(self):
        """Get current median."""
        if len(self.max_heap) == len(self.min_heap):
            if self.max_heap.is_empty():
                return None
            return (self.max_heap.peek() + self.min_heap.peek()) / 2
        elif len(self.max_heap) > len(self.min_heap):
            return self.max_heap.peek()
        else:
            return self.min_heap.peek()

# Usage
median_tracker = RunningMedian()
numbers = [5, 15, 1, 3, 9, 8, 7, 12]

print("Adding numbers and tracking median:")
for num in numbers:
    median_tracker.add_number(num)
    print(f"Added {num}, median: {median_tracker.get_median()}")

Memory-Efficient Merge K Sorted Arrays

def merge_k_sorted_arrays(arrays):
    """Merge k sorted arrays using min-heap."""
    if not arrays:
        return []

    # Heap contains tuples: (value, array_index, element_index)
    min_heap = Heap()

    # Initialize heap with first element from each array
    for i, arr in enumerate(arrays):
        if arr:  # Skip empty arrays
            min_heap.insert((arr[0], i, 0))

    result = []

    while not min_heap.is_empty():
        value, array_idx, elem_idx = min_heap.extract()
        result.append(value)

        # Add next element from the same array
        if elem_idx + 1 < len(arrays[array_idx]):
            next_value = arrays[array_idx][elem_idx + 1]
            min_heap.insert((next_value, array_idx, elem_idx + 1))

    return result

# Test merging sorted arrays
sorted_arrays = [
    [1, 4, 5],
    [1, 3, 4],
    [2, 6]
]

merged = merge_k_sorted_arrays(sorted_arrays)
print(f"Merged: {merged}")  # [1, 1, 2, 3, 4, 4, 5, 6]
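For comparison, Python's standard library provides `heapq.merge`, which applies the same heap-based idea and yields results lazily instead of building the whole output list. The wrapper name `merge_sorted` below is hypothetical; only `heapq.merge` itself is a standard-library function.

```python
import heapq
from typing import Iterable, Iterator


def merge_sorted(*iterables: Iterable[int]) -> Iterator[int]:
    """Lazily yield values from already-sorted inputs in ascending order."""
    return heapq.merge(*iterables)


merged = merge_sorted([1, 4, 5], [1, 3, 4], [2, 6])
print(next(merged))   # 1 (values are produced on demand)
print(list(merged))   # [1, 2, 3, 4, 4, 5, 6]
```

Lazy merging matters when the inputs are large files or streams, since only one pending element per input needs to be held at a time.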

Practical Applications

Task Scheduling System

import time
from datetime import datetime, timedelta

class ScheduledTask:
    def __init__(self, name: str, execute_time: datetime, priority: int = 0):
        self.name = name
        self.execute_time = execute_time
        self.priority = priority

    def __lt__(self, other):
        # First compare by execution time, then by priority
        if self.execute_time != other.execute_time:
            return self.execute_time < other.execute_time
        return self.priority < other.priority

    def __repr__(self):
        return f"Task({self.name}, {self.execute_time.strftime('%H:%M:%S')}, p={self.priority})"

class TaskScheduler:
    def __init__(self):
        self.task_heap = Heap()  # Min-heap for earliest tasks first

    def schedule_task(self, name: str, delay_seconds: int, priority: int = 0):
        """Schedule a task to run after delay_seconds."""
        execute_time = datetime.now() + timedelta(seconds=delay_seconds)
        task = ScheduledTask(name, execute_time, priority)
        self.task_heap.insert(task)
        print(f"Scheduled: {task}")

    def run_scheduler(self, duration_seconds: int = 30):
        """Run scheduler for specified duration."""
        end_time = datetime.now() + timedelta(seconds=duration_seconds)

        print(f"Starting scheduler for {duration_seconds} seconds...")

        while datetime.now() < end_time:
            if not self.task_heap.is_empty():
                next_task = self.task_heap.peek()

                if datetime.now() >= next_task.execute_time:
                    task = self.task_heap.extract()
                    print(f"Executing: {task.name} at {datetime.now().strftime('%H:%M:%S')}")
                else:
                    # Sleep until next task or check interval
                    sleep_time = min(1, (next_task.execute_time - datetime.now()).total_seconds())
                    if sleep_time > 0:
                        time.sleep(sleep_time)
            else:
                time.sleep(1)  # No tasks, wait

        print("Scheduler stopped.")

# Example usage (commented out to avoid actual delays in documentation)
# scheduler = TaskScheduler()
# scheduler.schedule_task("Send email", 5, priority=1)
# scheduler.schedule_task("Backup data", 10, priority=2)
# scheduler.schedule_task("Update cache", 3, priority=0)
# scheduler.run_scheduler(15)

Load Balancing System

class Server:
    def __init__(self, name: str, capacity: int):
        self.name = name
        self.capacity = capacity
        self.current_load = 0

    def __lt__(self, other):
        # Compare by current load (for min-heap)
        return self.current_load < other.current_load

    def add_load(self, load: int):
        self.current_load += load

    def remove_load(self, load: int):
        self.current_load = max(0, self.current_load - load)

    def utilization(self):
        return self.current_load / self.capacity if self.capacity > 0 else 0

    def __repr__(self):
        return f"Server({self.name}, load={self.current_load}/{self.capacity})"

class LoadBalancer:
    def __init__(self):
        self.server_heap = Heap()  # Min-heap for least loaded server
        self.servers = {}  # name -> server mapping

    def add_server(self, name: str, capacity: int):
        """Add a server to the load balancer."""
        server = Server(name, capacity)
        self.servers[name] = server
        self.server_heap.insert(server)
        print(f"Added server: {server}")

    def assign_request(self, request_load: int):
        """Assign request to least loaded server."""
        if self.server_heap.is_empty():
            print("No servers available")
            return None

        # Get least loaded server
        server = self.server_heap.extract()

        if server.current_load + request_load <= server.capacity:
            server.add_load(request_load)
            print(f"Assigned load {request_load} to {server.name}")

            # Re-insert server back into heap with updated load
            self.server_heap.insert(server)
            return server.name
        else:
            # The least-loaded server cannot fit this request; put it back.
            # This simplified version does not try the remaining servers.
            self.server_heap.insert(server)
            print(f"Server {server.name} at capacity")
            return None

    def complete_request(self, server_name: str, request_load: int):
        """Mark request as completed, reducing server load."""
        if server_name in self.servers:
            server = self.servers[server_name]
            server.remove_load(request_load)
            print(f"Completed request on {server_name}, load now {server.current_load}")

            # Rebuild heap to maintain correct ordering
            # In practice, you'd use a more sophisticated approach
            self._rebuild_heap()

    def _rebuild_heap(self):
        """Rebuild heap with current server states."""
        servers = list(self.servers.values())
        self.server_heap.clear()
        for server in servers:
            self.server_heap.insert(server)

    def show_status(self):
        """Show current load balancer status."""
        print("\nLoad Balancer Status:")
        for server in self.servers.values():
            util = server.utilization() * 100
            print(f"  {server} - Utilization: {util:.1f}%")

# Example usage
lb = LoadBalancer()
lb.add_server("Server-A", 100)
lb.add_server("Server-B", 150)
lb.add_server("Server-C", 120)

# Simulate request assignments
requests = [30, 40, 25, 60, 35, 20]
for i, load in enumerate(requests):
    print(f"\nRequest {i+1} (load={load}):")
    lb.assign_request(load)

lb.show_status()

# Simulate request completion
lb.complete_request("Server-A", 30)
lb.complete_request("Server-B", 40)
lb.show_status()

Stock Price Monitoring

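class StockPrice:
    def __init__(self, symbol: str, price: float, timestamp: int):
        self.symbol = symbol
        self.price = price
        self.timestamp = timestamp

    def __lt__(self, other):
        # Tie-breaker: keeps (value, StockPrice) tuples comparable
        # when two entries share the same value
        return self.symbol < other.symbol

    def __repr__(self):
        return f"{self.symbol}@{self.price}"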

class StockMonitor:
    def __init__(self):
        # Track top gainers (max-heap by price change)
        self.gainers_heap = Heap(is_min_heap=False)
        # Track top losers (min-heap by price change)
        self.losers_heap = Heap(is_min_heap=True)
        # Track highest prices (max-heap)
        self.high_prices_heap = Heap(is_min_heap=False)

        self.current_prices = {}  # symbol -> price
        self.price_changes = {}   # symbol -> change amount

    def update_price(self, symbol: str, new_price: float):
        """Update stock price and maintain monitoring heaps."""
        old_price = self.current_prices.get(symbol, new_price)
        change = new_price - old_price

        self.current_prices[symbol] = new_price
        self.price_changes[symbol] = change

        # Update heaps (simplified - in practice you'd handle duplicates better)
        stock = StockPrice(symbol, new_price, 0)

        # For gainers/losers, we use the change amount
        if change > 0:
            self.gainers_heap.insert((change, stock))
        elif change < 0:
            self.losers_heap.insert((change, stock))

        # For high prices, use the actual price
        self.high_prices_heap.insert((new_price, stock))

        print(f"Updated {symbol}: ${old_price:.2f} -> ${new_price:.2f} ({change:+.2f})")

    def get_top_gainers(self, n=3):
        """Get top n gainers."""
        gainers = []
        temp_heap = Heap(is_min_heap=False)

        # Extract top gainers
        count = 0
        while not self.gainers_heap.is_empty() and count < n:
            change, stock = self.gainers_heap.extract()
            gainers.append((stock.symbol, change))
            temp_heap.insert((change, stock))
            count += 1

        # Restore heap
        while not temp_heap.is_empty():
            self.gainers_heap.insert(temp_heap.extract())

        return gainers

    def get_top_losers(self, n=3):
        """Get top n losers."""
        losers = []
        temp_heap = Heap(is_min_heap=True)

        # Extract top losers
        count = 0
        while not self.losers_heap.is_empty() and count < n:
            change, stock = self.losers_heap.extract()
            losers.append((stock.symbol, change))
            temp_heap.insert((change, stock))
            count += 1

        # Restore heap
        while not temp_heap.is_empty():
            self.losers_heap.insert(temp_heap.extract())

        return losers

    def get_highest_prices(self, n=3):
        """Get stocks with highest prices."""
        high_prices = []
        temp_heap = Heap(is_min_heap=False)

        # Extract highest prices
        count = 0
        while not self.high_prices_heap.is_empty() and count < n:
            price, stock = self.high_prices_heap.extract()
            high_prices.append((stock.symbol, price))
            temp_heap.insert((price, stock))
            count += 1

        # Restore heap
        while not temp_heap.is_empty():
            self.high_prices_heap.insert(temp_heap.extract())

        return high_prices

# Example usage
monitor = StockMonitor()

# Initial prices
initial_prices = {
    "AAPL": 150.00,
    "GOOGL": 2800.00,
    "MSFT": 330.00,
    "TSLA": 800.00,
    "AMZN": 3200.00
}

for symbol, price in initial_prices.items():
    monitor.update_price(symbol, price)

print("\n--- Price Updates ---")
# Simulate price changes
monitor.update_price("AAPL", 155.50)  # +5.50
monitor.update_price("GOOGL", 2750.00)  # -50.00
monitor.update_price("MSFT", 340.00)  # +10.00
monitor.update_price("TSLA", 780.00)  # -20.00
monitor.update_price("AMZN", 3250.00)  # +50.00

print("\n--- Market Summary ---")
print(f"Top Gainers: {monitor.get_top_gainers()}")
print(f"Top Losers: {monitor.get_top_losers()}")
print(f"Highest Prices: {monitor.get_highest_prices()}")

CPU Process Scheduling

class Process:
    def __init__(self, pid: int, priority: int, burst_time: int, arrival_time: int = 0):
        self.pid = pid
        self.priority = priority
        self.burst_time = burst_time
        self.arrival_time = arrival_time
        self.remaining_time = burst_time
        self.start_time = None
        self.completion_time = None

    def __lt__(self, other):
        # Higher priority number = lower priority (for min-heap)
        if self.priority != other.priority:
            return self.priority < other.priority
        # If same priority, shorter job first
        return self.remaining_time < other.remaining_time

    def __repr__(self):
        return f"P{self.pid}(pri={self.priority}, time={self.remaining_time})"

class CPUScheduler:
    def __init__(self):
        self.ready_queue = Heap()  # Min-heap for highest priority first
        self.current_time = 0
        self.completed_processes = []

    def add_process(self, process: Process):
        """Add process to ready queue."""
        self.ready_queue.insert(process)
        print(f"Added process: {process}")

    def run_scheduler(self, time_quantum: int = 2):
        """Run priority-based preemptive scheduler."""
        print(f"\nStarting scheduler (time quantum: {time_quantum})")

        while not self.ready_queue.is_empty():
            # Get highest priority process
            current_process = self.ready_queue.extract()

            if current_process.start_time is None:
                current_process.start_time = self.current_time

            # Execute for time quantum or remaining time, whichever is smaller
            execution_time = min(time_quantum, current_process.remaining_time)

            print(f"Time {self.current_time}: Executing {current_process} for {execution_time} units")

            current_process.remaining_time -= execution_time
            self.current_time += execution_time

            if current_process.remaining_time == 0:
                # Process completed
                current_process.completion_time = self.current_time
                self.completed_processes.append(current_process)
                print(f"  Process P{current_process.pid} completed at time {self.current_time}")
            else:
                # Process not completed, put back in queue
                self.ready_queue.insert(current_process)
                print(f"  Process P{current_process.pid} preempted, {current_process.remaining_time} time remaining")

    def show_statistics(self):
        """Show scheduling statistics."""
        if not self.completed_processes:
            print("No completed processes")
            return

        print("\n--- Scheduling Statistics ---")
        total_turnaround = 0
        total_waiting = 0

        for process in self.completed_processes:
            turnaround_time = process.completion_time - process.arrival_time
            waiting_time = turnaround_time - process.burst_time

            total_turnaround += turnaround_time
            total_waiting += waiting_time

            print(f"P{process.pid}: Turnaround={turnaround_time}, Waiting={waiting_time}")

        n = len(self.completed_processes)
        avg_turnaround = total_turnaround / n
        avg_waiting = total_waiting / n

        print(f"\nAverage Turnaround Time: {avg_turnaround:.2f}")
        print(f"Average Waiting Time: {avg_waiting:.2f}")

# Example usage
scheduler = CPUScheduler()

# Add processes with different priorities and burst times
processes = [
    Process(1, priority=2, burst_time=6),   # Medium priority, long job
    Process(2, priority=1, burst_time=3),   # High priority, short job
    Process(3, priority=3, burst_time=4),   # Low priority, medium job
    Process(4, priority=1, burst_time=2),   # High priority, short job
    Process(5, priority=2, burst_time=5),   # Medium priority, medium job
]

for process in processes:
    scheduler.add_process(process)

scheduler.run_scheduler(time_quantum=3)
scheduler.show_statistics()

Use Cases

When to Use Heaps

Good for:

  • Priority queues and task scheduling
  • Finding k largest/smallest elements
  • Heap sort algorithm
  • Running median calculation
  • Load balancing systems
  • Dijkstra's shortest path algorithm
  • Huffman coding
  • Event-driven simulations

Examples:

# Priority queue for task scheduling
task_queue = Heap(key=lambda task: task.priority)

# Finding top k elements (sample data for illustration)
scores = [72, 95, 88, 61, 99, 80]
k = 3
top_scores = Heap()
for score in scores:
    if len(top_scores) < k:
        top_scores.insert(score)
    elif score > top_scores.peek():
        top_scores.extract()
        top_scores.insert(score)
print(sorted(top_scores.to_list(), reverse=True))  # [99, 95, 88]

# Event simulation
event_queue = Heap(key=lambda event: event.timestamp)
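Dijkstra's shortest path algorithm, listed above, is a typical min-heap consumer: the heap always yields the closest unsettled node. The sketch below uses the standard library's `heapq` as a stand-in so it runs without this package, and it assumes non-negative edge weights. The `Graph` alias, the sample graph, and the function name are illustrative, not part of this library.

```python
import heapq
from typing import Dict, List, Tuple

Graph = Dict[str, List[Tuple[str, int]]]


def dijkstra(graph: Graph, source: str) -> Dict[str, int]:
    """Return the shortest distance from source to every reachable node."""
    distances: Dict[str, int] = {source: 0}
    frontier: List[Tuple[int, str]] = [(0, source)]  # min-heap of (distance, node)

    while frontier:
        distance, node = heapq.heappop(frontier)
        if distance > distances.get(node, float("inf")):
            continue  # stale entry: a shorter path was already found
        for neighbor, weight in graph.get(node, []):
            candidate = distance + weight
            if candidate < distances.get(neighbor, float("inf")):
                distances[neighbor] = candidate
                heapq.heappush(frontier, (candidate, neighbor))
    return distances


graph: Graph = {
    "A": [("B", 4), ("C", 1)],
    "C": [("B", 2), ("D", 5)],
    "B": [("D", 1)],
    "D": [],
}
print(dijkstra(graph, "A"))  # {'A': 0, 'B': 3, 'C': 1, 'D': 4}
```

Instead of updating an entry in place, this version pushes a new entry and skips stale ones when they surface, which fits heaps that offer no decrease-key operation.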

When NOT to Use Heaps

Avoid for:

  • Random access to elements (use arrays/lists)
  • Searching for arbitrary elements (use trees/hash tables)
  • Maintaining sorted order with frequent middle insertions
  • LIFO/FIFO operations (use stacks/queues)

Better alternatives:

# For random access, use list
random_access = [1, 2, 3, 4, 5]
print(random_access[2])  # Direct access

# For searching, use set or dict
search_set = {1, 2, 3, 4, 5}
print(3 in search_set)  # O(1) search

# For sorted operations with insertions, use bisect
import bisect
sorted_list = []
bisect.insort(sorted_list, 3)  # Maintains sorted order

Performance Characteristics

Time Complexity Analysis

  • Insert operations: O(log n) - bubble up in heap
  • Extract operations: O(log n) - bubble down after removing root
  • Peek operations: O(1) - root element access
  • Build heap: O(n) - bottom-up heapification
  • Search operations: O(n) - no ordering for arbitrary elements
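The bubble-up and bubble-down steps named above can be sketched on a plain list. This is a minimal min-heap illustration under the usual index layout, not this library's source; the names `sift_up`, `sift_down`, `push`, and `pop_min` are hypothetical.

```python
from typing import List


def sift_up(heap: List[int], index: int) -> None:
    """Move heap[index] up until its parent is no larger."""
    while index > 0:
        parent = (index - 1) // 2
        if heap[parent] <= heap[index]:
            break
        heap[parent], heap[index] = heap[index], heap[parent]
        index = parent


def sift_down(heap: List[int], index: int) -> None:
    """Move heap[index] down until both children are no smaller."""
    size = len(heap)
    while True:
        smallest = index
        for child in (2 * index + 1, 2 * index + 2):
            if child < size and heap[child] < heap[smallest]:
                smallest = child
        if smallest == index:
            return
        heap[index], heap[smallest] = heap[smallest], heap[index]
        index = smallest


def push(heap: List[int], value: int) -> None:
    """Insert: append at the end, then bubble up."""
    heap.append(value)
    sift_up(heap, len(heap) - 1)


def pop_min(heap: List[int]) -> int:
    """Extract: move the last element to the root, then bubble down."""
    if not heap:
        raise IndexError("Heap is empty")
    root = heap[0]
    last = heap.pop()
    if heap:
        heap[0] = last
        sift_down(heap, 0)
    return root


heap: List[int] = []
for value in [10, 5, 15, 3, 8]:
    push(heap, value)
print(heap)                               # [3, 5, 15, 10, 8]
print([pop_min(heap) for _ in range(5)])  # [3, 5, 8, 10, 15]
```

Each loop moves one level per iteration, and a complete binary tree with n nodes has about log2(n) levels, which is where the O(log n) bounds come from.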

Space Complexity

  • Storage: O(n) for n elements in array
  • No pointer overhead: Unlike tree structures
  • Cache-friendly: Array-based storage has good locality

Comparison with Other Data Structures

| Operation | Heap | Binary Search Tree | Sorted Array | Priority Queue (Unsorted List) |
|---|---|---|---|---|
| Insert | O(log n) | O(log n) avg | O(n) | O(1) |
| Extract Min/Max | O(log n) | O(log n) | O(1) | O(n) |
| Peek Min/Max | O(1) | O(log n) | O(1) | O(n) |
| Search Arbitrary | O(n) | O(log n) | O(log n) | O(n) |
| Build from Array | O(n) | O(n log n) | O(n log n) | O(n) |
| Memory Overhead | Low | Medium | Low | Low |
| Cache Performance | Excellent | Poor | Excellent | Good |

API Reference

Heap Implementation

Key Features:

  • Complete binary tree structure
  • Heap property maintenance
  • Efficient insertion and extraction
  • In-place array representation
  • Automatic resizing

Common Use Cases:

  • Priority queues
  • Heap sort algorithm
  • Finding k largest/smallest elements
  • Dijkstra's shortest path algorithm
  • Graph algorithms

Time Complexities:

  • Insert: O(log n)
  • Extract min/max: O(log n)
  • Peek: O(1)
  • Build heap: O(n)
  • Heapify (sift a single node into place): O(log n)

Space Complexity: O(n)

Heap(is_min_heap=True, key=None)

Bases: Generic[T]

A binary heap implementation with support for both min-heap and max-heap.

Uses an array-based representation where, for a node at index i:

  • Parent is at index (i-1)//2
  • Left child is at index 2i+1
  • Right child is at index 2i+2
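The index arithmetic above can be tried out in isolation. The helper names below (`parent_index`, `left_child_index`, `right_child_index`, `path_to_root`) are hypothetical and exist only for this illustration.

```python
from typing import List


def parent_index(i: int) -> int:
    return (i - 1) // 2


def left_child_index(i: int) -> int:
    return 2 * i + 1


def right_child_index(i: int) -> int:
    return 2 * i + 2


def path_to_root(i: int) -> List[int]:
    """List the indices visited when walking from index i up to the root."""
    path = [i]
    while i > 0:
        i = parent_index(i)
        path.append(i)
    return path


print(left_child_index(1), right_child_index(1))  # 3 4
print(parent_index(3), parent_index(4))           # 1 1
print(path_to_root(8))                            # [8, 3, 1, 0]
```

The path from any index to the root has at most about log2(n) steps, so no explicit pointers are needed to move through the tree.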

Initialize a heap.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| is_min_heap | bool | True for min-heap, False for max-heap | True |
| key | Optional[Callable[[T], any]] | Optional function to extract comparison key from elements | None |

__contains__(value)

Check if value exists in heap.

__iter__()

Return iterator over heap elements (array order, not sorted).

__len__()

Return the number of elements in the heap.

__repr__()

Return detailed representation for debugging.

__str__()

Return string representation of the heap.

build_heap(array)

Build heap from an array in O(n) time.

Time Complexity: O(n)

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| array | List[T] | Array to build heap from | required |

clear()

Remove all elements from the heap.

Time Complexity: O(1)

extract()

Remove and return the root element (min/max).

Time Complexity: O(log n)

Returns:

| Type | Description |
|---|---|
| T | Root element |

Raises:

| Type | Description |
|---|---|
| IndexError | If heap is empty |

insert(value)

Insert a new value into the heap.

Time Complexity: O(log n)

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| value | T | Value to insert | required |

is_empty()

Check if the heap is empty.

Time Complexity: O(1)

Returns:

| Type | Description |
|---|---|
| bool | True if empty, False otherwise |

peek()

Return the root element without removing it.

Time Complexity: O(1)

Returns:

| Type | Description |
|---|---|
| T | Root element |

Raises:

| Type | Description |
|---|---|
| IndexError | If heap is empty |

size()

Return the number of elements in the heap.

Time Complexity: O(1)

Returns:

| Type | Description |
|---|---|
| int | Number of elements |

to_list()

Return heap as a list.

Time Complexity: O(n), since the internal array is copied

Returns:

| Type | Description |
|---|---|
| List[T] | Copy of internal heap array |