Priority Queue

A priority queue is an abstract data type where each element has a priority, and elements with higher priority are served before elements with lower priority. This implementation uses a binary heap for efficient operations and supports both min and max priority queues.

Features

  • Efficient operations - O(log n) insertion and extraction using binary heap
  • Flexible priority modes - Support for both min and max priority queues
  • Custom key functions - Use any function to extract priority from elements
  • Priority updates - Efficiently update element priorities
  • Item removal - Remove specific items while maintaining heap property
  • Type safety - Generic implementation with full type checking
  • Two interfaces - Regular and simplified priority queue variants
  • Convenience aliases - Easy creation functions for common use cases

Time Complexities

Operation         Time Complexity   Description
Put/Insert        O(log n)          Add element with priority
Get/Extract       O(log n)          Remove highest priority element
Peek              O(1)              View highest priority element
Update Priority   O(log n)          Change element's priority
Remove            O(log n)          Remove specific element
Size/Length       O(1)              Get number of elements
Clear             O(1)              Remove all elements
Contains          O(1)              Check if element exists

Space Complexity: O(n) where n is the number of elements
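
The O(log n) bounds above follow from the height of a binary heap: an insert or extract moves one entry along a single root-to-leaf path. The sketch below illustrates that idea only. `TinyMinHeap` is a hypothetical name, and the code is not taken from this library's implementation.

```python
class TinyMinHeap:
    """Array-backed binary min-heap storing (priority, item) pairs."""

    def __init__(self):
        self._data = []

    def __len__(self):
        return len(self._data)

    def put(self, item, priority):
        # Append at the bottom, then sift up: at most one swap per tree level.
        self._data.append((priority, item))
        i = len(self._data) - 1
        while i > 0:
            parent = (i - 1) // 2
            if self._data[i][0] >= self._data[parent][0]:
                break
            self._data[i], self._data[parent] = self._data[parent], self._data[i]
            i = parent

    def peek(self):
        if not self._data:
            raise IndexError("heap is empty")
        return self._data[0][1]  # the root is always the minimum

    def get(self):
        if not self._data:
            raise IndexError("heap is empty")
        top = self._data[0]
        last = self._data.pop()
        if self._data:
            # Move the last entry to the root, then sift it down.
            self._data[0] = last
            i, n = 0, len(self._data)
            while True:
                left, right, smallest = 2 * i + 1, 2 * i + 2, i
                if left < n and self._data[left][0] < self._data[smallest][0]:
                    smallest = left
                if right < n and self._data[right][0] < self._data[smallest][0]:
                    smallest = right
                if smallest == i:
                    break
                self._data[i], self._data[smallest] = self._data[smallest], self._data[i]
                i = smallest
        return top[1]


heap = TinyMinHeap()
for name, priority in [("normal", 5), ("urgent", 1), ("low", 10)]:
    heap.put(name, priority)
print(heap.get(), heap.get(), heap.get())  # urgent normal low
```

A max priority queue can reuse the same structure by flipping the comparisons or by negating numeric priorities.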

Priority Queue Types

Regular Priority Queue

The main PriorityQueue class stores items with explicit or computed priorities:

from algo.data_structs.trees.priority_queue import PriorityQueue

# Min priority queue (lower numbers = higher priority)
min_pq = PriorityQueue(is_min_priority=True)

# Max priority queue (higher numbers = higher priority)  
max_pq = PriorityQueue(is_min_priority=False)

# With custom key function
pq = PriorityQueue(key=lambda x: x.priority)

Simple Priority Queue

The SimplePriorityQueue class accepts priority-item pairs directly:

from algo.data_structs.trees.priority_queue import SimplePriorityQueue

# Min priority queue
spq = SimplePriorityQueue(is_min_priority=True)

# Max priority queue
spq = SimplePriorityQueue(is_min_priority=False)

Convenience Aliases

from algo.data_structs.trees.priority_queue import (
    MinPriorityQueue, MaxPriorityQueue,
    SimpleMinPriorityQueue, SimpleMaxPriorityQueue
)

# Quick creation functions
min_pq = MinPriorityQueue()
max_pq = MaxPriorityQueue()
simple_min = SimpleMinPriorityQueue()
simple_max = SimpleMaxPriorityQueue()

Basic Usage

Creating and Initializing

from algo.data_structs.trees.priority_queue import PriorityQueue

# Create empty priority queue
pq = PriorityQueue()  # Default: min priority queue

# Check if empty
print(pq.is_empty())     # True
print(len(pq))           # 0
print(pq.size())         # 0

# Create with specific mode
min_pq = PriorityQueue(is_min_priority=True)
max_pq = PriorityQueue(is_min_priority=False)

print(min_pq.is_min_priority)   # True
print(max_pq.is_min_priority)   # False

Adding Elements (Put)

# Add items with explicit priorities
pq = PriorityQueue()
pq.put("urgent", 1)      # Lower number = higher priority
pq.put("normal", 5)
pq.put("low", 10)

print(len(pq))           # 3
print(pq.peek())         # "urgent" (highest priority)

# Add numeric items (use value as priority)
numbers_pq = PriorityQueue()
numbers_pq.put(10)       # Priority = 10
numbers_pq.put(5)        # Priority = 5  
numbers_pq.put(15)       # Priority = 15

print(numbers_pq.peek()) # 5 (lowest number in min queue)

Viewing Elements

pq = PriorityQueue()
pq.put("item1", 5)
pq.put("item2", 1) 
pq.put("item3", 3)

# Peek at highest priority element
top_item = pq.peek()
print(top_item)          # "item2" (priority 1)
print(len(pq))           # Still 3 (peek doesn't remove)

# Check if item exists
print("item2" in pq)     # True
print("item4" in pq)     # False

# Get all items with priorities
items_with_priorities = pq.items_with_priorities()
print(items_with_priorities)  # [("item2", 1), ("item3", 3), ("item1", 5)]

# Get just the items
items = pq.to_list()
print(items)             # ["item1", "item2", "item3"] (no particular order)

Removing Elements (Get)

pq = PriorityQueue()
pq.put("urgent", 1)
pq.put("normal", 5)
pq.put("low", 10)

# Get elements in priority order
first = pq.get()         # "urgent" (priority 1)
second = pq.get()        # "normal" (priority 5)
third = pq.get()         # "low" (priority 10)

print(first, second, third)  # urgent normal low
print(pq.is_empty())         # True

Max Priority Queue

# Higher numbers = higher priority
max_pq = PriorityQueue(is_min_priority=False)

max_pq.put("low", 1)
max_pq.put("high", 10)
max_pq.put("medium", 5)

# Get in descending priority order
print(max_pq.get())      # "high" (priority 10)
print(max_pq.get())      # "medium" (priority 5)  
print(max_pq.get())      # "low" (priority 1)

Advanced Operations

Priority Updates

pq = PriorityQueue()
pq.put("task1", 5)
pq.put("task2", 10)
pq.put("task3", 1)

print(pq.peek())         # "task3" (priority 1)

# Update priority of task3 to lower priority
pq.update_priority("task3", 15)

print(pq.peek())         # "task1" (now highest priority)

# Get items in new order
results = []
while not pq.is_empty():
    results.append(pq.get())
print(results)           # ["task1", "task2", "task3"]

Item Removal

pq = PriorityQueue()
pq.put("keep1", 1)
pq.put("remove", 5)
pq.put("keep2", 10)

print(len(pq))           # 3
print("remove" in pq)    # True

# Remove specific item
pq.remove("remove")

print(len(pq))           # 2
print("remove" in pq)    # False

# Remaining items in priority order
print(pq.get())          # "keep1"
print(pq.get())          # "keep2"

Clearing the Queue

pq = PriorityQueue()
pq.put("item1", 1)
pq.put("item2", 2)
pq.put("item3", 3)

print(len(pq))           # 3

# Clear all elements
pq.clear()
print(len(pq))           # 0
print(pq.is_empty())     # True

Custom Key Functions

Numeric Items with Key

# Use negative key for max behavior in min queue
pq = PriorityQueue(key=lambda x: -x)

pq.put(10)
pq.put(5)
pq.put(15)
pq.put(1)

# Get in descending order (max behavior)
results = []
while not pq.is_empty():
    results.append(pq.get())
print(results)           # [15, 10, 5, 1]

Custom Objects with Key

class Task:
    def __init__(self, name: str, priority: int):
        self.name = name
        self.priority = priority

    def __repr__(self):
        return f"Task({self.name}, {self.priority})"

# Use priority attribute as key
pq = PriorityQueue(key=lambda task: task.priority)

task1 = Task("urgent", 1)
task2 = Task("normal", 5)
task3 = Task("low", 10)
task4 = Task("critical", 0)

pq.put(task1)
pq.put(task2)
pq.put(task3)
pq.put(task4)

# Get tasks in priority order
results = []
while not pq.is_empty():
    results.append(pq.get())

for task in results:
    print(task)  # Tasks in order: critical, urgent, normal, low

Complex Key Functions

class Job:
    def __init__(self, name: str, deadline: int, importance: int):
        self.name = name
        self.deadline = deadline
        self.importance = importance

    def __repr__(self):
        return f"Job({self.name}, deadline={self.deadline}, importance={self.importance})"

# Prioritize by importance first, then by deadline
def job_priority(job):
    # Lower importance number = higher priority
    # Lower deadline = higher priority
    return (job.importance, job.deadline)

pq = PriorityQueue(key=job_priority)

jobs = [
    Job("Report", 5, 2),      # Medium importance, medium deadline
    Job("Presentation", 1, 1), # High importance, urgent deadline
    Job("Email", 10, 3),      # Low importance, long deadline
    Job("Meeting", 2, 1),     # High importance, soon deadline
]

for job in jobs:
    pq.put(job)

print("Jobs in priority order:")
while not pq.is_empty():
    job = pq.get()
    print(f"  {job}")

Simple Priority Queue

Basic Usage

from algo.data_structs.trees.priority_queue import SimplePriorityQueue

# Create simple priority queue
spq = SimplePriorityQueue(is_min_priority=True)

# Add items (priority first, then item)
spq.put(10, "low priority")
spq.put(1, "high priority")
spq.put(5, "medium priority")

print(len(spq))          # 3

# Peek at highest priority
print(spq.peek())        # "high priority"

# Peek with priority value
priority, item = spq.peek_with_priority()
print(f"Priority: {priority}, Item: {item}")  # Priority: 1, Item: high priority

# Get items in priority order
while not spq.is_empty():
    item = spq.get()
    print(item)  # high priority, medium priority, low priority

Simple Max Priority Queue

spq = SimplePriorityQueue(is_min_priority=False)

spq.put(1, "low priority")
spq.put(10, "high priority")
spq.put(5, "medium priority")

# Get in descending priority order
print(spq.get())         # "high priority" (priority 10)
print(spq.get())         # "medium priority" (priority 5)
print(spq.get())         # "low priority" (priority 1)

Iteration and Traversal

Consuming Iteration

pq = PriorityQueue()
pq.put("item1", 10)
pq.put("item2", 1)
pq.put("item3", 5)

# Iteration consumes the queue (returns items in priority order)
items = list(pq)
print(items)             # ["item2", "item3", "item1"]
print(pq.is_empty())     # True (queue consumed)

# Alternative: use while loop
pq.put("a", 3)
pq.put("b", 1)
pq.put("c", 2)

results = []
while not pq.is_empty():
    results.append(pq.get())
print(results)           # ["b", "c", "a"]

Non-Destructive Inspection

pq = PriorityQueue()
pq.put("item1", 10)
pq.put("item2", 1)
pq.put("item3", 5)

# Use to_list() for non-destructive inspection
items = pq.to_list()
print(f"Items: {items}")         # Items in queue (no order guarantee)
print(f"Size: {len(pq)}")        # Still 3

# Use items_with_priorities() to see priorities
items_with_priorities = pq.items_with_priorities()
print(f"With priorities: {items_with_priorities}")

# Queue still has all items
print(f"Queue still has: {len(pq)} items")
print(f"Next item: {pq.peek()}")

Working with Different Types

String Items

pq = PriorityQueue()
pq.put("apple", 3)
pq.put("banana", 1)
pq.put("cherry", 2)

# Get in priority order
fruits = []
while not pq.is_empty():
    fruits.append(pq.get())
print(fruits)            # ["banana", "cherry", "apple"]

Mixed Numeric Types

pq = PriorityQueue()
pq.put("item1", 5)       # int priority
pq.put("item2", 3.5)     # float priority
pq.put("item3", 7.2)     # float priority
pq.put("item4", 1)       # int priority

# Get in priority order
while not pq.is_empty():
    item = pq.get()
    print(item)  # item4, item2, item1, item3

Complex Objects

class Person:
    def __init__(self, name: str, age: int):
        self.name = name
        self.age = age

    def __repr__(self):
        return f"Person({self.name}, {self.age})"

# Use age as priority (younger = higher priority)
pq = PriorityQueue(key=lambda person: person.age)

people = [
    Person("Alice", 30),
    Person("Bob", 25),
    Person("Charlie", 35),
    Person("Diana", 20),
]

for person in people:
    pq.put(person)

print("People by age (youngest first):")
while not pq.is_empty():
    person = pq.get()
    print(f"  {person}")

Error Handling

pq = PriorityQueue()

# Safe operations on empty queue
print(pq.is_empty())     # True
print(len(pq))           # 0
print("item" in pq)      # False

# Operations that raise errors on empty queue
try:
    pq.get()
except IndexError as e:
    print(f"Error: {e}")  # Error: Priority queue is empty

try:
    pq.peek()
except IndexError as e:
    print(f"Error: {e}")  # Error: Priority queue is empty

# Invalid operations
pq.put("item", 5)

try:
    pq.update_priority("nonexistent", 10)
except KeyError as e:
    print(f"Error: {e}")  # Error: Item nonexistent not found in priority queue

try:
    pq.remove("nonexistent")
except KeyError as e:
    print(f"Error: {e}")  # Error: Item nonexistent not found in priority queue

# Invalid priority specification
try:
    pq_no_key = PriorityQueue()  # No key function
    pq_no_key.put("string")      # No priority given for non-numeric
except ValueError as e:
    print(f"Error: {e}")  # Error: Priority must be provided

String Representation

# Empty queue
pq = PriorityQueue()
print(str(pq))           # MinPriorityQueue([])
print(repr(pq))          # PriorityQueue(type=min, size=0)

# Non-empty min queue
pq.put("item1", 5)
pq.put("item2", 1)
print(str(pq))           # MinPriorityQueue([('item2', 1), ('item1', 5)])

# Max queue
max_pq = PriorityQueue(is_min_priority=False)
max_pq.put("item1", 5)
max_pq.put("item2", 10)
print(str(max_pq))       # MaxPriorityQueue([('item2', 10), ('item1', 5)])
print(repr(max_pq))      # PriorityQueue(type=max, size=2)

# Simple queue
spq = SimplePriorityQueue()
spq.put(5, "item1")
spq.put(1, "item2")
print(str(spq))          # SimpleMinPriorityQueue([(1, 'item2'), (5, 'item1')])
print(repr(spq))         # SimplePriorityQueue(type=min, size=2)

Common Patterns

Building from Data

# From list of tuples (item, priority)
data = [("task1", 5), ("task2", 1), ("task3", 3)]
pq = PriorityQueue()

for item, priority in data:
    pq.put(item, priority)

# From dictionary
task_priorities = {
    "urgent": 1,
    "normal": 5,
    "low": 10
}

pq = PriorityQueue()
for task, priority in task_priorities.items():
    pq.put(task, priority)

Priority-Based Processing

def process_tasks_by_priority(tasks):
    """Process tasks in priority order."""
    pq = PriorityQueue()

    # Add all tasks
    for task, priority in tasks:
        pq.put(task, priority)

    # Process in priority order
    processed = []
    while not pq.is_empty():
        task = pq.get()
        processed.append(f"Processed: {task}")
        print(f"Processing: {task}")

    return processed

tasks = [
    ("Clean desk", 5),
    ("Answer emails", 2),
    ("Finish report", 1),
    ("Schedule meeting", 3),
]

result = process_tasks_by_priority(tasks)

Dynamic Priority Updates

class DynamicTaskQueue:
    def __init__(self):
        self.pq = PriorityQueue()
        self.task_count = 0

    def add_task(self, name: str, priority: int):
        task_id = f"{name}_{self.task_count}"
        self.pq.put(task_id, priority)
        self.task_count += 1
        print(f"Added: {task_id} (priority {priority})")

    def update_task_priority(self, task_name: str, new_priority: int):
        # Find task by name prefix
        for item in self.pq.to_list():
            if item.startswith(task_name):
                try:
                    self.pq.update_priority(item, new_priority)
                    print(f"Updated: {item} to priority {new_priority}")
                    return
                except KeyError:
                    continue
        print(f"Task {task_name} not found")

    def process_next(self):
        if not self.pq.is_empty():
            task = self.pq.get()
            print(f"Processing: {task}")
            return task
        return None

    def show_queue(self):
        items = self.pq.items_with_priorities()
        print("Current queue:")
        for item, priority in sorted(items, key=lambda x: x[1]):
            print(f"  {item}: priority {priority}")

# Usage
queue = DynamicTaskQueue()
queue.add_task("email", 5)
queue.add_task("report", 1)
queue.add_task("meeting", 3)
queue.show_queue()

queue.update_task_priority("email", 0)  # Make urgent
queue.show_queue()

queue.process_next()  # Should process email first now

Practical Applications

Task Scheduling

class TaskScheduler:
    def __init__(self):
        self.task_queue = PriorityQueue()
        self.completed = []

    def schedule_task(self, task_name: str, priority: int, duration: int = 1):
        task = {
            'name': task_name,
            'priority': priority,
            'duration': duration,
            'scheduled_time': len(self.completed)
        }
        self.task_queue.put(task, priority)
        print(f"Scheduled: {task_name} (priority {priority})")

    def reschedule_task(self, task_name: str, new_priority: int):
        # Find and update task priority
        found = False
        for task in self.task_queue.to_list():
            if task['name'] == task_name:
                self.task_queue.update_priority(task, new_priority)
                print(f"Rescheduled: {task_name} to priority {new_priority}")
                found = True
                break

        if not found:
            print(f"Task {task_name} not found in queue")

    def execute_tasks(self, time_available: int):
        time_used = 0
        executed = []

        while not self.task_queue.is_empty() and time_used < time_available:
            task = self.task_queue.get()

            if time_used + task['duration'] <= time_available:
                time_used += task['duration']
                executed.append(task['name'])
                self.completed.append(task)
                print(f"Executed: {task['name']} (took {task['duration']} units)")
            else:
                # Put task back if we can't complete it
                self.task_queue.put(task, task['priority'])
                break

        print(f"Total time used: {time_used}/{time_available}")
        return executed

    def show_pending(self):
        if self.task_queue.is_empty():
            print("No pending tasks")
        else:
            print("Pending tasks (by priority):")
            # Create temporary copy to show order
            temp_pq = PriorityQueue()
            items = self.task_queue.items_with_priorities()

            for task, priority in items:
                temp_pq.put(task, priority)

            while not temp_pq.is_empty():
                task = temp_pq.get()
                print(f"  {task['name']} (priority {task['priority']}, duration {task['duration']})")

# Usage
scheduler = TaskScheduler()

# Schedule various tasks
scheduler.schedule_task("Write report", 2, 3)
scheduler.schedule_task("Answer emails", 5, 1)
scheduler.schedule_task("Prepare presentation", 1, 4)
scheduler.schedule_task("Team meeting", 3, 2)
scheduler.schedule_task("Code review", 4, 1)

scheduler.show_pending()

# Execute some tasks
print("\nExecuting tasks (5 time units available):")
executed = scheduler.execute_tasks(5)

print(f"\nExecuted tasks: {executed}")
scheduler.show_pending()

# Reschedule a task to higher priority
scheduler.reschedule_task("Answer emails", 1)
scheduler.show_pending()

Dijkstra's Algorithm Simulation

import math

class DijkstraSimulation:
    def __init__(self, graph):
        self.graph = graph  # adjacency dict: {node: [(neighbor, weight), ...]}
        self.distances = {}
        self.previous = {}
        self.visited = set()

    def find_shortest_path(self, start, end):
        # Initialize distances
        for node in self.graph:
            self.distances[node] = math.inf
            self.previous[node] = None

        self.distances[start] = 0

        # Priority queue: (distance, node)
        pq = SimplePriorityQueue(is_min_priority=True)
        pq.put(0, start)

        while not pq.is_empty():
            current_distance, current = pq.peek_with_priority()
            pq.get()

            if current in self.visited:
                continue

            self.visited.add(current)

            if current == end:
                break

            # Check neighbors
            for neighbor, weight in self.graph.get(current, []):
                if neighbor in self.visited:
                    continue

                new_distance = current_distance + weight

                if new_distance < self.distances[neighbor]:
                    self.distances[neighbor] = new_distance
                    self.previous[neighbor] = current
                    pq.put(new_distance, neighbor)

        return self._reconstruct_path(start, end)

    def _reconstruct_path(self, start, end):
        path = []
        current = end

        while current is not None:
            path.append(current)
            current = self.previous[current]

        path.reverse()

        if path[0] != start:
            return None, math.inf  # No path found

        return path, self.distances[end]

# Example graph
graph = {
    'A': [('B', 4), ('C', 2)],
    'B': [('C', 1), ('D', 5)],
    'C': [('D', 8), ('E', 10)],
    'D': [('E', 2)],
    'E': []
}

dijkstra = DijkstraSimulation(graph)
path, distance = dijkstra.find_shortest_path('A', 'E')

print(f"Shortest path from A to E: {' -> '.join(path)}")
print(f"Total distance: {distance}")

Event Simulation

import random
from typing import NamedTuple

class Event(NamedTuple):
    time: float
    event_type: str
    description: str
    data: dict = {}

class EventSimulator:
    def __init__(self):
        self.event_queue = PriorityQueue(key=lambda event: event.time)
        self.current_time = 0.0
        self.event_log = []

    def schedule_event(self, event: Event):
        self.event_queue.put(event)
        print(f"Scheduled: {event.description} at time {event.time}")

    def schedule_random_events(self, count: int, max_time: float):
        event_types = ["customer_arrival", "order_completion", "system_maintenance", "delivery"]

        for i in range(count):
            event_time = random.uniform(0, max_time)
            event_type = random.choice(event_types)
            event = Event(
                time=event_time,
                event_type=event_type,
                description=f"{event_type}_{i}",
                data={'id': i}
            )
            self.schedule_event(event)

    def run_simulation(self, max_time: float = None):
        print("\nRunning simulation...")

        while not self.event_queue.is_empty():
            event = self.event_queue.get()

            if max_time is not None and event.time > max_time:
                # Put event back and stop
                self.event_queue.put(event)
                break

            self.current_time = event.time
            self.process_event(event)
            self.event_log.append(event)

        print(f"Simulation completed at time {self.current_time}")

    def process_event(self, event: Event):
        print(f"Time {event.time:.2f}: Processing {event.description}")

        # Simulate event processing and potentially schedule new events
        if event.event_type == "customer_arrival":
            # Schedule order completion
            completion_time = event.time + random.uniform(1, 5)
            completion_event = Event(
                time=completion_time,
                event_type="order_completion",
                description=f"complete_order_{event.data['id']}",
                data=event.data
            )
            self.event_queue.put(completion_event)

        elif event.event_type == "order_completion":
            # Schedule delivery
            delivery_time = event.time + random.uniform(0.5, 2)
            delivery_event = Event(
                time=delivery_time,
                event_type="delivery",
                description=f"deliver_order_{event.data['id']}",
                data=event.data
            )
            self.event_queue.put(delivery_event)

    def show_statistics(self):
        event_types = {}
        for event in self.event_log:
            event_types[event.event_type] = event_types.get(event.event_type, 0) + 1

        print("\nSimulation Statistics:")
        print(f"Total events processed: {len(self.event_log)}")
        print("Events by type:")
        for event_type, count in event_types.items():
            print(f"  {event_type}: {count}")

        if self.event_log:
            avg_time = sum(e.time for e in self.event_log) / len(self.event_log)
            print(f"Average event time: {avg_time:.2f}")

# Usage
simulator = EventSimulator()

# Schedule some initial events
simulator.schedule_random_events(10, 20.0)

# Run simulation
simulator.run_simulation(max_time=15.0)

# Show results
simulator.show_statistics()

Huffman Coding Tree Construction

from collections import Counter
from typing import Optional

class HuffmanNode:
    def __init__(self, char: Optional[str], freq: int, left=None, right=None):
        self.char = char
        self.freq = freq
        self.left = left
        self.right = right

    def is_leaf(self):
        return self.char is not None

    def __repr__(self):
        if self.is_leaf():
            return f"'{self.char}'({self.freq})"
        return f"Internal({self.freq})"

def build_huffman_tree(text: str):
    if not text:
        return None

    # Count character frequencies
    freq_counter = Counter(text)

    # Create priority queue with character nodes
    # Use frequency as priority (min heap for lowest frequency first)
    pq = PriorityQueue(key=lambda node: node.freq)

    for char, freq in freq_counter.items():
        node = HuffmanNode(char, freq)
        pq.put(node)

    print(f"Initial characters: {len(pq)} unique")

    # Build tree by merging nodes
    while len(pq) > 1:
        # Get two nodes with lowest frequency
        left = pq.get()
        right = pq.get()

        # Create internal node with combined frequency
        merged_freq = left.freq + right.freq
        internal = HuffmanNode(None, merged_freq, left, right)

        pq.put(internal)
        print(f"Merged {left} + {right} -> {internal}")

    # Root of Huffman tree
    return pq.get() if not pq.is_empty() else None

def generate_codes(root, code="", codes=None):
    if codes is None:
        codes = {}

    if root is None:
        return codes

    if root.is_leaf():
        codes[root.char] = code if code else "0"  # Single char gets "0"
        return codes

    generate_codes(root.left, code + "0", codes)
    generate_codes(root.right, code + "1", codes)

    return codes

def huffman_encode(text: str):
    if not text:
        return "", {}

    # Build Huffman tree
    root = build_huffman_tree(text)

    # Generate codes
    codes = generate_codes(root)

    # Encode text
    encoded = "".join(codes[char] for char in text)

    return encoded, codes

# Example usage
text = "hello world! this is a test message for huffman coding."
print(f"Original text: '{text}'")
print(f"Original length: {len(text)} characters ({len(text) * 8} bits)")

encoded, codes = huffman_encode(text)

print("\nHuffman codes:")
for char, code in sorted(codes.items()):
    char_display = repr(char) if char.isprintable() else f"'\\x{ord(char):02x}'"
    print(f"  {char_display}: {code}")

print(f"\nEncoded: {encoded}")
print(f"Encoded length: {len(encoded)} bits")
print(f"Compression ratio: {len(encoded) / (len(text) * 8):.2%}")
print(f"Space saved: {100 * (1 - len(encoded) / (len(text) * 8)):.1f}%")
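
Because Huffman codes are prefix-free, the bit string produced above can be decoded from the code table alone. The following is a minimal sketch of that step. `huffman_decode` is a hypothetical helper that does not appear in the original example, and it assumes `codes` maps each character to its bit string as returned by `huffman_encode`.

```python
def huffman_decode(encoded: str, codes: dict) -> str:
    """Decode a bit string using a {char: code} table of prefix-free codes."""
    reverse = {code: char for char, code in codes.items()}
    decoded = []
    buffer = ""
    for bit in encoded:
        buffer += bit
        # Prefix-freeness guarantees the first match is the only possible one.
        if buffer in reverse:
            decoded.append(reverse[buffer])
            buffer = ""
    if buffer:
        raise ValueError("encoded data ends in the middle of a code")
    return "".join(decoded)


# Small hand-written code table, used only for illustration
example_codes = {"a": "0", "b": "10", "c": "11"}
print(huffman_decode("010110", example_codes))  # abca
```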

Load Balancing

class Server:
    def __init__(self, name: str, capacity: int):
        self.name = name
        self.capacity = capacity
        self.current_load = 0
        self.requests = []

    def available_capacity(self):
        return self.capacity - self.current_load

    def add_request(self, request_size: int, request_id: str):
        if self.current_load + request_size <= self.capacity:
            self.current_load += request_size
            self.requests.append((request_id, request_size))
            return True
        return False

    def remove_request(self, request_size: int, request_id: str):
        self.current_load -= request_size
        self.requests = [(rid, size) for rid, size in self.requests if rid != request_id]

    def __repr__(self):
        return f"Server({self.name}, load={self.current_load}/{self.capacity})"

class LoadBalancer:
    def __init__(self):
        # Use negative available capacity for max-heap behavior
        self.server_queue = PriorityQueue(key=lambda server: -server.available_capacity())
        self.servers = {}

    def add_server(self, server: Server):
        self.servers[server.name] = server
        self.server_queue.put(server)
        print(f"Added server: {server}")

    def assign_request(self, request_id: str, request_size: int):
        if self.server_queue.is_empty():
            print(f"No servers available for request {request_id}")
            return None

        # Get server with most available capacity
        best_server = self.server_queue.get()

        if best_server.add_request(request_size, request_id):
            # Update server in queue with new capacity
            self.server_queue.put(best_server)
            print(f"Assigned request {request_id} (size {request_size}) to {best_server.name}")
            return best_server.name
        else:
            # Server can't handle request, put it back
            self.server_queue.put(best_server)
            print(f"Request {request_id} (size {request_size}) too large for any server")
            return None

    def complete_request(self, request_id: str, server_name: str, request_size: int):
        if server_name in self.servers:
            server = self.servers[server_name]
            server.remove_request(request_size, request_id)

            # Update server priority in queue
            # Remove and re-add to update priority
            temp_servers = []
            while not self.server_queue.is_empty():
                s = self.server_queue.get()
                if s.name != server_name:
                    temp_servers.append(s)

            # Re-add all servers including updated one
            for s in temp_servers:
                self.server_queue.put(s)
            self.server_queue.put(server)

            print(f"Completed request {request_id} on {server_name}")

    def show_status(self):
        print("\nLoad Balancer Status:")
        print("Servers (by available capacity):")

        # Show servers in priority order
        temp_servers = []
        while not self.server_queue.is_empty():
            server = self.server_queue.get()
            temp_servers.append(server)
            available = server.available_capacity()
            utilization = (server.current_load / server.capacity) * 100
            print(f"  {server.name}: {available} available, {utilization:.1f}% utilized")

        # Restore queue
        for server in temp_servers:
            self.server_queue.put(server)

# Usage
lb = LoadBalancer()

# Add servers with different capacities
servers = [
    Server("web-01", 100),
    Server("web-02", 150),
    Server("web-03", 120),
    Server("api-01", 200),
]

for server in servers:
    lb.add_server(server)

lb.show_status()

# Simulate request assignments
requests = [
    ("req-001", 30),
    ("req-002", 50),
    ("req-003", 20),
    ("req-004", 80),
    ("req-005", 25),
]

print("\nAssigning requests:")
assignments = {}
for req_id, size in requests:
    server_name = lb.assign_request(req_id, size)
    if server_name:
        assignments[req_id] = (server_name, size)

lb.show_status()

# Complete some requests
print("\nCompleting requests:")
for req_id in ["req-001", "req-003"]:
    if req_id in assignments:
        server_name, size = assignments[req_id]
        lb.complete_request(req_id, server_name, size)

lb.show_status()

Use Cases

When to Use Priority Queues

Good for:

  • Task scheduling based on priority
  • Graph algorithms (Dijkstra, Prim's MST)
  • Event simulation and discrete event systems
  • Huffman coding and compression algorithms
  • A* pathfinding and best-first search
  • Operating system process scheduling
  • Load balancing and resource allocation
  • Real-time systems with deadlines

Examples:

# Process scheduling
process_queue = PriorityQueue(key=lambda p: p.priority)

# Event simulation
event_queue = PriorityQueue(key=lambda e: e.timestamp)

# Pathfinding
frontier = PriorityQueue(key=lambda node: node.f_score)

# Resource allocation
resource_requests = PriorityQueue(key=lambda r: r.urgency)
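
Prim's MST is listed above but not demonstrated elsewhere on this page. The sketch below shows the role the priority queue plays in it: the queue always yields the cheapest edge leaving the tree built so far. It uses the standard library's `heapq` rather than this package's `PriorityQueue` so that it runs on its own. The function name `prim_mst` and the example graph are assumptions made for illustration.

```python
import heapq


def prim_mst(graph, start):
    """Return (total_weight, edges) of a minimum spanning tree.

    graph is an undirected adjacency dict: {node: [(neighbor, weight), ...]}.
    """
    visited = {start}
    # Frontier entries are (weight, from_node, to_node); heapq orders by weight.
    frontier = [(weight, start, neighbor) for neighbor, weight in graph[start]]
    heapq.heapify(frontier)

    total = 0
    edges = []
    while frontier and len(visited) < len(graph):
        weight, u, v = heapq.heappop(frontier)
        if v in visited:
            continue  # stale edge: both endpoints are already in the tree
        visited.add(v)
        total += weight
        edges.append((u, v, weight))
        for neighbor, w in graph[v]:
            if neighbor not in visited:
                heapq.heappush(frontier, (w, v, neighbor))
    return total, edges


undirected = {
    "A": [("B", 4), ("C", 2)],
    "B": [("A", 4), ("C", 1), ("D", 5)],
    "C": [("A", 2), ("B", 1)],
    "D": [("B", 5)],
}
total, edges = prim_mst(undirected, "A")
print(total)  # 8
print(edges)  # [('A', 'C', 2), ('C', 'B', 1), ('B', 'D', 5)]
```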

When NOT to Use Priority Queues

Avoid for:

  • Simple FIFO processing (use regular queues)
  • Random access to elements (use arrays/lists)
  • Frequent priority changes (consider other data structures)
  • When order doesn't matter (use sets or lists)

Better alternatives:

# For FIFO, use regular queue
from collections import deque
fifo_queue = deque()

# For frequent updates, a sorted list is one option (each insert is O(n))
import bisect
sorted_list = []
bisect.insort(sorted_list, 5)  # Insert while keeping the list sorted

# For random access, use list
random_access = ["a", "b", "c"]
print(random_access[1])  # "b" (direct access by index)

Performance Characteristics

Time Complexity Analysis

  • Insertion (put): O(log n) - heap insertion
  • Extraction (get): O(log n) - heap extraction with reheapify
  • Peek: O(1) amortized - view root element
  • Priority update: O(log n) - mark old entry + insert new entry
  • Remove: O(log n) amortized - mark for removal; the marked entry is discarded later
  • Search/Contains: O(1) - hash table lookup
  • Size: O(1) - counter maintained
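
The "mark old + insert new" and "mark for removal" notes above describe lazy deletion. The sketch below shows that pattern on top of heapq with an entry-finder dictionary. It is an assumption about the general technique, not this library's source; LazyHeap and REMOVED are hypothetical names.

```python
import heapq
import itertools

REMOVED = object()  # sentinel marking a lazily deleted entry (hypothetical name)


class LazyHeap:
    """Illustrative min-priority queue with lazy deletion."""

    def __init__(self):
        self._heap = []                    # entries are [priority, count, item]
        self._entry_finder = {}            # item -> live entry, O(1) lookup
        self._counter = itertools.count()  # tie-breaker, keeps insertion order

    def put(self, item, priority):
        if item in self._entry_finder:
            self.remove(item)              # update = mark old + insert new
        entry = [priority, next(self._counter), item]
        self._entry_finder[item] = entry
        heapq.heappush(self._heap, entry)

    def remove(self, item):
        entry = self._entry_finder.pop(item)  # raises KeyError if absent
        entry[2] = REMOVED                    # O(1) mark, heap left untouched

    def get(self):
        while self._heap:
            _, _, item = heapq.heappop(self._heap)
            if item is not REMOVED:           # skip entries marked earlier
                del self._entry_finder[item]
                return item
        raise IndexError("get from an empty queue")

    def __contains__(self, item):
        return item in self._entry_finder

    def __len__(self):
        return len(self._entry_finder)


q = LazyHeap()
q.put("write docs", 3)
q.put("fix bug", 1)
q.put("refactor", 2)
q.put("write docs", 0)  # priority update
q.remove("fix bug")
order = [q.get(), q.get()]
print(order)
```

Marking is O(1); the O(log n) cost is paid when a marked entry is eventually popped, which is why remove is described as amortized.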

Memory Efficiency

# Priority queue memory usage:
# - Heap array: O(n) for elements
# - Entry finder dictionary: O(n) for fast lookups
# - Minimal overhead per element

pq = PriorityQueue()
print(f"Queue starts empty: {pq.is_empty()}")

# Memory scales linearly with elements
for i in range(1000):
    pq.put(f"item_{i}", i)
# No pre-allocation, grows as needed

Comparison with Other Structures

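| Operation | Priority Queue | Sorted List | Regular Queue | Stack |
|-----------|----------------|-------------|---------------|-------|
| Insert | O(log n) | O(n) | O(1) | O(1) |
| Extract Max/Min | O(log n) | O(1) | O(1) | O(1) |
| Peek | O(1) | O(1) | O(1) | O(1) |
| Update Priority | O(log n) | O(n) | N/A | N/A |
| Random Access | O(n) | O(1) | O(n) | O(n) |
| Memory Overhead | Medium | Low | Low | Low |
| Use Case | Priority-based | Sorted access | FIFO | LIFO |

Note: For the regular queue and stack, Extract and Peek refer to the front or top element, not the minimum or maximum; finding the minimum or maximum in those structures takes O(n).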
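
To make the Insert and Random Access rows concrete, here is a small side-by-side sketch using the standard library's heapq and bisect. Both produce the same extraction order, but only the sorted list can be indexed by rank. The variable names are illustrative.

```python
import bisect
import heapq

priorities = [5, 1, 4, 2, 3]

# Binary heap: O(log n) per insert, O(log n) per extract-min
heap = []
for p in priorities:
    heapq.heappush(heap, p)
heap_order = [heapq.heappop(heap) for _ in range(len(priorities))]

# Sorted list: O(log n) search plus O(n) element shift per insert
sorted_list = []
for p in priorities:
    bisect.insort(sorted_list, p)

# The sorted list supports O(1) access by rank; a heap array does not,
# because only its root is guaranteed to be in sorted position.
median = sorted_list[len(sorted_list) // 2]

print(heap_order, sorted_list, median)
```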

API Reference

Priority Queue Implementation

A priority queue is an abstract data type where each element has a priority. Elements with higher priority are served before elements with lower priority. This implementation uses a binary heap for efficient operations.

Key Features:

  • Efficient insertion and extraction
  • Support for both min and max priority queues
  • Custom priority functions
  • Thread-safe operations (optional)
  • Iterator support

Common Use Cases:

  • Task scheduling
  • Dijkstra's algorithm
  • A* pathfinding
  • Huffman coding
  • Event simulation
  • Load balancing

Time Complexities:

  • Insert: O(log n)
  • Pop: O(log n)
  • Peek: O(1)
  • Size: O(1)

Space Complexity: O(n)

PriorityItem(item, priority)

Bases: Generic[T]

Wrapper class for items in priority queue with explicit priority values.

Initialize priority item.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| item | T | The actual item | required |
| priority | float | Priority value (lower number = higher priority for min queue) | required |

__eq__(other)

Check equality by priority.

__lt__(other)

Compare by priority.

__repr__()

String representation.
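
Comparing wrappers by priority alone lets a heap order items that are not themselves comparable. The sketch below illustrates the idea with a hypothetical PrioritizedEntry dataclass; it mirrors the documented `__lt__` and `__eq__` behavior but is not the library's PriorityItem.

```python
import heapq
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


@dataclass(eq=False)  # eq=False: we define priority-only equality ourselves
class PrioritizedEntry(Generic[T]):
    """Hypothetical stand-in for a priority wrapper."""

    item: T
    priority: float

    def __lt__(self, other: "PrioritizedEntry") -> bool:
        return self.priority < other.priority

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, PrioritizedEntry):
            return NotImplemented
        return self.priority == other.priority


# Dicts cannot be ordered with <, but the wrappers can.
heap = []
heapq.heappush(heap, PrioritizedEntry({"job": "backup"}, 2.0))
heapq.heappush(heap, PrioritizedEntry({"job": "deploy"}, 1.0))
first = heapq.heappop(heap)
print(first)
```

One consequence of priority-only equality is that two wrappers holding different items compare equal when their priorities match.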

PriorityQueue(is_min_priority=True, key=None)

Bases: Generic[T]

Priority Queue implementation using a binary heap.

Supports both min-priority queue (lower numbers = higher priority) and max-priority queue (higher numbers = higher priority).

Initialize priority queue.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| is_min_priority | bool | True for min-priority queue, False for max-priority queue | True |
| key | Optional[Callable[[T], float]] | Optional function to extract priority from items | None |
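
One common way to support both modes on a single min-heap is to negate priorities for the max case and fall back to the key function when no explicit priority is given. The sketch below assumes that approach; TinyPriorityQueue is a hypothetical class and may differ from how PriorityQueue is actually implemented.

```python
import heapq
import itertools


class TinyPriorityQueue:
    """Illustrative only: min/max mode via sign flip on a min-heap."""

    def __init__(self, is_min_priority=True, key=None):
        self._sign = 1 if is_min_priority else -1
        self._key = key
        self._heap = []
        self._counter = itertools.count()  # tie-breaker so items are never compared

    def put(self, item, priority=None):
        if priority is None:
            # Use the key function if given, otherwise the item itself
            priority = self._key(item) if self._key else item
        heapq.heappush(self._heap, (self._sign * priority, next(self._counter), item))

    def get(self):
        if not self._heap:
            raise IndexError("get from an empty queue")
        return heapq.heappop(self._heap)[2]


max_pq = TinyPriorityQueue(is_min_priority=False, key=len)
for word in ["heap", "queue", "priority"]:
    max_pq.put(word)
longest = max_pq.get()

min_pq = TinyPriorityQueue()
for n in [3, 1, 2]:
    min_pq.put(n)
smallest = min_pq.get()

print(longest, smallest)
```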

__bool__()

Return True if queue is not empty.

__contains__(item)

Check if item exists in queue.

__iter__()

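Return iterator over items in priority order.

Note: Iterating consumes the queue, so it is empty afterwards. Use to_list() for a non-destructive snapshot, keeping in mind that to_list() does not return items in priority order.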
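
The difference between destructive iteration and a snapshot can be seen in a small heapq-based sketch. DrainingQueue and its snapshot method are hypothetical names used for illustration; they only mimic the documented behavior of `__iter__` and to_list().

```python
import heapq


class DrainingQueue:
    """Illustrative queue whose iterator pops items as it goes."""

    def __init__(self, priorities):
        self._heap = list(priorities)
        heapq.heapify(self._heap)

    def __len__(self):
        return len(self._heap)

    def __iter__(self):
        # Each step removes the current minimum, so iteration empties the queue
        while self._heap:
            yield heapq.heappop(self._heap)

    def snapshot(self):
        # Copy of the internal array: heap order, not sorted order
        return list(self._heap)


q = DrainingQueue([3, 1, 2])
snap = q.snapshot()
size_before = len(q)
drained = list(q)
size_after = len(q)
print(sorted(snap), drained, size_before, size_after)
```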

__len__()

Return the number of items in the queue.

__repr__()

Return detailed representation for debugging.

__str__()

Return string representation.

clear()

Remove all items from the queue.

Time Complexity: O(1)

get()

Remove and return the highest priority item.

Time Complexity: O(log n)

Returns:

| Type | Description |
|------|-------------|
| T | Highest priority item |

Raises:

| Type | Description |
|------|-------------|
| IndexError | If queue is empty |

is_empty()

Check if the queue is empty.

Time Complexity: O(1)

Returns:

| Type | Description |
|------|-------------|
| bool | True if empty, False otherwise |

items_with_priorities()

Return all items with their priorities.

Time Complexity: O(n)

Returns:

| Type | Description |
|------|-------------|
| List[Tuple[T, float]] | List of (item, priority) tuples |

peek()

Return the highest priority item without removing it.

Time Complexity: O(1) amortized

Returns:

| Type | Description |
|------|-------------|
| T | Highest priority item |

Raises:

| Type | Description |
|------|-------------|
| IndexError | If queue is empty |

put(item, priority=None)

Add an item to the priority queue.

Time Complexity: O(log n)

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| item | T | Item to add | required |
| priority | Optional[float] | Priority value. If None, uses key function or item itself | None |

remove(item)

Remove an item from the priority queue.

Time Complexity: O(log n) amortized

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| item | T | Item to remove | required |

Raises:

| Type | Description |
|------|-------------|
| KeyError | If item not found in queue |

size()

Return the number of items in the queue.

Time Complexity: O(1)

Returns:

| Type | Description |
|------|-------------|
| int | Number of items |

to_list()

Return all items in the queue (not in priority order).

Time Complexity: O(n)

Returns:

| Type | Description |
|------|-------------|
| List[T] | List of items |

update_priority(item, new_priority)

Update the priority of an existing item.

Time Complexity: O(log n)

Parameters:

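| Name | Type | Description | Default |
|------|------|-------------|---------|
| item | T | Item to update | required |
| new_priority | float | New priority value | required |

Raises:

| Type | Description |
|------|-------------|
| KeyError | If item not found in queue |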

SimplePriorityQueue(is_min_priority=True)

Bases: Generic[T]

Simplified priority queue that accepts (priority, item) tuples directly. More straightforward interface for simple use cases.

Initialize simple priority queue.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| is_min_priority | bool | True for min-priority queue, False for max-priority queue | True |

__bool__()

Return True if queue is not empty.

__len__()

Return the number of items in the queue.

__repr__()

Return detailed representation for debugging.

__str__()

Return string representation.

clear()

Remove all items from the queue.

get()

Remove and return the highest priority item.

Returns:

| Type | Description |
|------|-------------|
| T | Highest priority item |

is_empty()

Check if the queue is empty.

peek()

Return the highest priority item without removing it.

Returns:

| Type | Description |
|------|-------------|
| T | Highest priority item |

peek_with_priority()

Return the highest priority item and its priority without removing it.

Returns:

| Type | Description |
|------|-------------|
| Tuple[float, T] | Tuple of (priority, item) |

put(priority, item)

Add an item with priority.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| priority | float | Priority value | required |
| item | T | Item to add | required |
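
A (priority, item) interface maps naturally onto tuples in a heap, with one caveat: when two priorities tie, plain tuples fall back to comparing the items, which fails for types such as dicts. The sketch below shows one way around that with an insertion counter. The module-level functions are hypothetical and only imitate the put, peek_with_priority, and get signatures documented here; whether SimplePriorityQueue handles ties this way is not stated in the source.

```python
import heapq
import itertools

heap = []
counter = itertools.count()  # insertion index breaks ties before items are compared


def put(priority, item):
    heapq.heappush(heap, (priority, next(counter), item))


def peek_with_priority():
    priority, _, item = heap[0]  # root of the heap, not removed
    return priority, item


def get():
    return heapq.heappop(heap)[2]


put(2.0, {"task": "email"})
put(1.0, {"task": "deploy"})
put(1.0, {"task": "backup"})  # same priority as "deploy"

top = peek_with_priority()
first = get()
second = get()
print(top, first, second)
```

With the counter in place, equal priorities come out in insertion order.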

size()

Return the number of items in the queue.

create_max_priority_queue()

Create a max-priority queue.

create_min_priority_queue()

Create a min-priority queue.

create_simple_max_priority_queue()

Create a simple max-priority queue.

create_simple_min_priority_queue()

Create a simple min-priority queue.