Python Multithreading

Multithreading allows your program to run multiple operations concurrently. Learn how to create threads, synchronize them, and handle thread safety in Python using the threading module.

What is Multithreading?

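Multithreading is a way of running several tasks within a single process. The process starts multiple threads that share the same memory and run concurrently. In standard CPython builds, the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, so threads help most with I/O-bound work such as network requests or file access. Python's threading module provides a high-level interface for creating and managing threads.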

"Threads allow your program to do multiple things at once, improving responsiveness and performance."

Creating Threads

Basic thread creation and execution:

import threading
import time

def worker(name, delay):
    """Function to be executed by thread"""
    print(f"Thread {name} starting")
    time.sleep(delay)
    print(f"Thread {name} finished")

# Create threads
thread1 = threading.Thread(target=worker, args=("A", 2))
thread2 = threading.Thread(target=worker, args=("B", 3))
thread3 = threading.Thread(target=worker, args=("C", 1))

# Start threads
print("Starting threads...")
thread1.start()
thread2.start()
thread3.start()

# Wait for threads to complete
thread1.join()
thread2.join()
thread3.join()

print("All threads completed")
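
To make the benefit of concurrent execution visible, the following minimal sketch times three sleeping workers. The delay values are arbitrary assumptions, and time.sleep stands in for blocking I/O. Because the waits overlap, the wall-clock time should be close to the longest single delay rather than the sum of all delays.

```python
import threading
import time

def worker(delay):
    time.sleep(delay)  # stands in for a blocking I/O call

delays = [0.2, 0.2, 0.2]  # illustrative values, not from the original example

start = time.perf_counter()
threads = [threading.Thread(target=worker, args=(d,)) for d in delays]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# The threads sleep at the same time, so elapsed stays well below the sum
print(f"Sum of delays: {sum(delays):.1f}s, wall-clock time: {elapsed:.2f}s")
```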

Thread Class Inheritance

Create threads by inheriting from the Thread class and overriding its run() method:

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, name, delay):
        # Thread already has a name attribute; pass it to the base class
        super().__init__(name=name)
        self.delay = delay
    
    def run(self):
        """Override run method"""
        print(f"Thread {self.name} starting")
        time.sleep(self.delay)
        print(f"Thread {self.name} finished after {self.delay} seconds")

# Create and start threads
threads = []
for i in range(3):
    thread = MyThread(f"Worker-{i+1}", i + 1)
    threads.append(thread)
    thread.start()

# Wait for all threads
for thread in threads:
    thread.join()

print("All worker threads completed")
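
A thread's run() method cannot return a value to the caller, so a subclass is a convenient place to store a result. The sketch below is one possible approach; the SquareThread class and its result attribute are hypothetical names chosen for illustration.

```python
import threading

class SquareThread(threading.Thread):
    """Hypothetical example: compute a square and keep it on the instance."""

    def __init__(self, value):
        super().__init__(name=f"square-{value}")
        self.value = value
        self.result = None  # filled in by run()

    def run(self):
        self.result = self.value ** 2

threads = [SquareThread(n) for n in range(1, 4)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # after join(), result is safe to read

results = [t.result for t in threads]
print(results)  # [1, 4, 9]
```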

Thread Synchronization

Use locks to prevent race conditions:

import threading
import time

# Shared resource
counter = 0
lock = threading.Lock()

def increment_counter(name, iterations):
    global counter
    
    for _ in range(iterations):
        # Hold the lock for the whole read-modify-write sequence;
        # the with statement releases it even if an exception occurs
        with lock:
            current_value = counter
            time.sleep(0.001)  # Simulate some work
            counter = current_value + 1
            print(f"{name}: {counter}")

# Create threads
thread1 = threading.Thread(target=increment_counter, args=("Thread-1", 5))
thread2 = threading.Thread(target=increment_counter, args=("Thread-2", 5))

thread1.start()
thread2.start()

thread1.join()
thread2.join()

print(f"Final counter value: {counter}")
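
To see what the lock protects against, it can help to run the same increment with and without it. This is a rough sketch: the count_with_threads helper, the thread count, and the iteration count are all assumptions. The locked total always matches the expected value, while the unlocked total may come out lower, depending on the interpreter version and thread scheduling.

```python
import threading

def count_with_threads(use_lock, n_threads=4, iterations=20_000):
    """Increment a shared counter from several threads and return the total."""
    state = {"count": 0}
    lock = threading.Lock()

    def task():
        for _ in range(iterations):
            if use_lock:
                with lock:
                    state["count"] += 1
            else:
                # Unsynchronized read-modify-write: updates can be lost
                state["count"] += 1

    threads = [threading.Thread(target=task) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["count"]

expected = 4 * 20_000
locked_total = count_with_threads(use_lock=True)
unlocked_total = count_with_threads(use_lock=False)
print(f"expected={expected}, locked={locked_total}, unlocked={unlocked_total}")
```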

RLock (Reentrant Lock)

RLock allows a thread to acquire the same lock multiple times:

import threading

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance
        self.lock = threading.RLock()
    
    def deposit(self, amount):
        with self.lock:
            self._update_balance(amount)
    
    def withdraw(self, amount):
        with self.lock:
            if self.balance >= amount:
                self._update_balance(-amount)
                return True
            return False
    
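    def transfer(self, target_account, amount):
        # withdraw() re-acquires self.lock, which only works with an RLock
        with self.lock:
            if not self.withdraw(amount):
                return False
        # Deposit after releasing our own lock: holding both accounts' locks
        # at once could deadlock if two opposite transfers ran concurrently
        target_account.deposit(amount)
        return True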
    
    def _update_balance(self, amount):
        # This method also acquires the lock
        with self.lock:
            self.balance += amount
            print(f"Balance updated: ${self.balance}")

# Test reentrant locking
account1 = BankAccount(1000)
account2 = BankAccount(500)

def transfer_money():
    success = account1.transfer(account2, 200)
    print(f"Transfer successful: {success}")

thread = threading.Thread(target=transfer_money)
thread.start()
thread.join()

print(f"Account 1 balance: ${account1.balance}")
print(f"Account 2 balance: ${account2.balance}")
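
The difference from a plain Lock can be checked directly. The short sketch below uses non-blocking acquires so that it cannot hang: a thread that already holds a Lock fails to take it again, while the owner of an RLock succeeds. The variable names are illustrative.

```python
import threading

plain = threading.Lock()
reentrant = threading.RLock()

with plain:
    # A blocking second acquire would wait forever here; the
    # non-blocking form reports failure instead of hanging
    plain_reacquired = plain.acquire(blocking=False)

with reentrant:
    # The owning thread may acquire an RLock again
    reentrant_reacquired = reentrant.acquire(blocking=False)
    if reentrant_reacquired:
        reentrant.release()  # every acquire needs a matching release

print(plain_reacquired, reentrant_reacquired)  # False True
```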

Semaphore

Control access to a resource with limited capacity:

import threading
import time

# Semaphore with 3 permits
semaphore = threading.Semaphore(3)

def access_resource(name):
    print(f"{name} waiting for resource")
    # The with statement acquires a permit and always releases it
    with semaphore:
        print(f"{name} acquired resource")
        time.sleep(2)  # Simulate resource usage
        print(f"{name} releasing resource")

# Create multiple threads
threads = []
for i in range(6):
    thread = threading.Thread(target=access_resource, args=(f"Thread-{i+1}",))
    threads.append(thread)

# Start all threads
for thread in threads:
    thread.start()

# Wait for completion
for thread in threads:
    thread.join()

print("All threads completed")
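
One way to confirm that a semaphore really caps concurrency is to record how many threads are inside the protected section at once. The sketch below assumes a limit of 3 and eight short-lived threads; the counters and the use_resource function are hypothetical. It uses BoundedSemaphore, which raises an error if released more often than it was acquired.

```python
import threading
import time

LIMIT = 3
slots = threading.BoundedSemaphore(LIMIT)
state_lock = threading.Lock()  # protects the two counters below
active = 0
peak = 0

def use_resource():
    global active, peak
    with slots:
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # simulate resource usage
        with state_lock:
            active -= 1

threads = [threading.Thread(target=use_resource) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Peak concurrent users: {peak} (limit {LIMIT})")
```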

Event

Use events for thread communication:

import threading
import time

# Create an event
event = threading.Event()

def waiter(name):
    print(f"{name} waiting for event")
    event.wait()  # Wait until event is set
    print(f"{name} received event!")

def setter():
    time.sleep(3)
    print("Setting event!")
    event.set()  # Set the event

# Create threads
waiter1 = threading.Thread(target=waiter, args=("Waiter-1",))
waiter2 = threading.Thread(target=waiter, args=("Waiter-2",))
setter_thread = threading.Thread(target=setter)

# Start threads
waiter1.start()
waiter2.start()
setter_thread.start()

# Wait for completion
waiter1.join()
waiter2.join()
setter_thread.join()

print("All threads done")
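
Events are also commonly used as a stop signal for a looping worker. In this sketch, Event.wait(timeout) acts as an interruptible sleep: it returns False when the timeout expires and True as soon as the event is set. The poller function, the interval, and the run time are assumptions made for the example.

```python
import threading
import time

stop_event = threading.Event()
ticks = []

def poller(interval):
    # wait() returns False on timeout, True once stop_event is set
    while not stop_event.wait(timeout=interval):
        ticks.append(time.perf_counter())  # one unit of periodic work

thread = threading.Thread(target=poller, args=(0.02,))
thread.start()

time.sleep(0.15)   # let the poller run for a short while
stop_event.set()   # ask it to stop
thread.join(timeout=1)

print(f"Poller ran {len(ticks)} times, still alive: {thread.is_alive()}")
```

Unlike a bare time.sleep in the loop, the worker reacts to the stop request without waiting for the full interval to pass.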

Condition

Advanced synchronization with conditions:

import threading
import time
import random

class ProducerConsumer:
    def __init__(self):
        self.buffer = []
        self.max_size = 5
        self.condition = threading.Condition()
    
    def produce(self, item):
        with self.condition:
            while len(self.buffer) >= self.max_size:
                print("Buffer full, producer waiting...")
                self.condition.wait()
            
            self.buffer.append(item)
            print(f"Produced: {item}, Buffer: {self.buffer}")
            self.condition.notify()
    
    def consume(self):
        with self.condition:
            while len(self.buffer) == 0:
                print("Buffer empty, consumer waiting...")
                self.condition.wait()
            
            item = self.buffer.pop(0)
            print(f"Consumed: {item}, Buffer: {self.buffer}")
            self.condition.notify()
            return item

pc = ProducerConsumer()

def producer():
    for i in range(10):
        pc.produce(f"Item-{i+1}")
        time.sleep(random.random())

def consumer():
    for _ in range(10):
        item = pc.consume()
        time.sleep(random.random() * 2)

# Create producer and consumer threads
prod_thread = threading.Thread(target=producer)
cons_thread = threading.Thread(target=consumer)

prod_thread.start()
cons_thread.start()

prod_thread.join()
cons_thread.join()

print("Producer-consumer example completed")
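
The explicit while loops around wait() can also be written with Condition.wait_for, which takes a predicate and re-checks it after every wakeup. The following is a compact sketch of the same bounded-buffer idea; the BoundedBuffer class, its size, and the item count are illustrative assumptions. It uses notify_all so that the pattern stays correct with more than one producer or consumer.

```python
import threading
from collections import deque

class BoundedBuffer:
    """Hypothetical bounded FIFO buffer guarded by one Condition."""

    def __init__(self, max_size):
        self._items = deque()
        self._max_size = max_size
        self._cond = threading.Condition()

    def put(self, item):
        with self._cond:
            # Block until there is room in the buffer
            self._cond.wait_for(lambda: len(self._items) < self._max_size)
            self._items.append(item)
            self._cond.notify_all()

    def get(self):
        with self._cond:
            # Block until at least one item is available
            self._cond.wait_for(lambda: len(self._items) > 0)
            item = self._items.popleft()
            self._cond.notify_all()
            return item

buffer = BoundedBuffer(max_size=2)
consumed = []

def produce():
    for i in range(6):
        buffer.put(i)

def consume():
    for _ in range(6):
        consumed.append(buffer.get())

workers = [threading.Thread(target=produce), threading.Thread(target=consume)]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(consumed)  # [0, 1, 2, 3, 4, 5]
```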

Thread Pool

Use ThreadPoolExecutor for managing multiple threads:

import concurrent.futures
import time
import urllib.request

def download_file(url, filename):
    """Download a file from URL"""
    print(f"Downloading {filename}...")
    try:
        urllib.request.urlretrieve(url, filename)
        print(f"Downloaded {filename}")
        return f"Success: {filename}"
    except Exception as e:
        print(f"Failed to download {filename}: {e}")
        return f"Failed: {filename}"

def process_data(data):
    """Simulate data processing"""
    print(f"Processing {data}")
    time.sleep(1)
    return f"Processed: {data}"

# Using ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # (url, filename) pairs for the download tasks
    urls = [
        ("https://httpbin.org/delay/1", "file1.txt"),
        ("https://httpbin.org/delay/1", "file2.txt"),
        ("https://httpbin.org/delay/1", "file3.txt")
    ]
    
    # Inputs for the simulated processing tasks
    data_items = ["data1", "data2", "data3", "data4", "data5"]
    
    # Submit download tasks
    download_futures = [executor.submit(download_file, url, filename) for url, filename in urls]
    
    # Submit processing tasks
    process_futures = [executor.submit(process_data, item) for item in data_items]
    
    # Get results as they complete
    print("Download results:")
    for future in concurrent.futures.as_completed(download_futures):
        print(future.result())
    
    print("\nProcessing results:")
    for future in concurrent.futures.as_completed(process_futures):
        print(future.result())

print("Thread pool example completed")
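
When results are needed in input order rather than completion order, executor.map is a simpler alternative to submit plus as_completed. The sketch below assumes a stand-in square function whose later inputs finish sooner, to show that the output order still follows the input order.

```python
import concurrent.futures
import time

def square(n):
    # Later items sleep less, so they finish before earlier ones
    time.sleep(0.05 * (5 - n))
    return n * n

items = [1, 2, 3, 4]

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # map() yields results in the order of the inputs
    results = list(executor.map(square, items))

print(results)  # [1, 4, 9, 16]
```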

Daemon Threads

Daemon threads run in the background and do not keep the program alive once all non-daemon threads have finished:

import threading
import time

def background_task():
    """Daemon thread function"""
    while True:
        print("Background task running...")
        time.sleep(1)

def main_task():
    """Main thread function"""
    for i in range(5):
        print(f"Main task: {i+1}")
        time.sleep(1)

# Create the daemon thread
daemon_thread = threading.Thread(target=background_task, daemon=True)
daemon_thread.start()

# Run the foreground work in a regular (non-daemon) thread
main_thread = threading.Thread(target=main_task)
main_thread.start()

# Wait for the foreground work to finish
main_thread.join()

print("Main task finished, program exiting...")
# The daemon thread is stopped abruptly when the interpreter exits,
# so it should not hold resources that need a clean shutdown

Thread Communication

Use queues for safe inter-thread communication:

import threading
import queue
import time
import random

def producer(q, name):
    """Produce items and put them in queue"""
    for i in range(5):
        item = f"{name}-item-{i+1}"
        q.put(item)
        print(f"Produced: {item}")
        time.sleep(random.random())

def consumer(q, name):
    """Consume items from queue"""
    while True:
        try:
            item = q.get(timeout=3)  # Wait up to 3 seconds
            print(f"{name} consumed: {item}")
            q.task_done()
            time.sleep(random.random() * 2)
        except queue.Empty:
            print(f"{name} timed out waiting for items")
            break

# Create queue
work_queue = queue.Queue()

# Create producer and consumer threads
producer_thread = threading.Thread(target=producer, args=(work_queue, "Producer"))
consumer_thread = threading.Thread(target=consumer, args=(work_queue, "Consumer"))

producer_thread.start()
consumer_thread.start()

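# Wait for producer to finish
producer_thread.join()

# Wait until every queued item has been marked done with task_done()
work_queue.join()

# The consumer exits once q.get() times out on the empty queue
consumer_thread.join()

print("All work completed")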
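
Relying on a timeout to end the consumer means the program idles for a few seconds after the last item. A common alternative is a sentinel value that tells each consumer to stop. The sketch below assumes that None never occurs as a real work item; the doubling step and the consumer count are placeholders.

```python
import queue
import threading

SENTINEL = None  # assumption: None is never a real work item
NUM_CONSUMERS = 2

work_queue = queue.Queue()
results = []
results_lock = threading.Lock()

def consumer():
    while True:
        item = work_queue.get()
        if item is SENTINEL:
            work_queue.task_done()
            break  # no more work for this consumer
        with results_lock:
            results.append(item * 2)  # placeholder for real processing
        work_queue.task_done()

consumers = [threading.Thread(target=consumer) for _ in range(NUM_CONSUMERS)]
for c in consumers:
    c.start()

for n in range(10):
    work_queue.put(n)
for _ in consumers:
    work_queue.put(SENTINEL)  # one sentinel per consumer

work_queue.join()  # every put() has been matched by task_done()
for c in consumers:
    c.join()

print(sorted(results))
```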

Best Practices

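  • Use ThreadPoolExecutor for I/O-bound tasks: It handles thread creation, reuse, and shutdown, which is less error-prone than managing threads by hand
  • Avoid shared state when possible: Pass messages between threads instead
  • Keep locking minimal: Hold locks briefly and acquire them in a consistent order, since unnecessary locking reduces performance and invites deadlocks
  • Handle exceptions in threads: Use try/except inside thread functions, because an uncaught exception ends only that thread
  • Use daemon threads only for disposable background tasks: They won't prevent program exit, but they are stopped abruptly at shutdown
  • Set thread names for debugging: Named threads are easier to identify in logs and tracebacks
  • Use queues for thread communication: Safer than shared variables
  • Use multiprocessing for CPU-intensive tasks: The GIL prevents threads from running Python bytecode in parallel
  • Test thread safety: Run tests repeatedly, because race conditions appear only intermittently
  • Document thread interactions: Complex threading logic is hard to infer from the code alone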
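
As an illustration of the advice on exceptions and thread names, the sketch below collects errors from pooled tasks through their Future objects instead of letting them pass unnoticed. The risky_task function and its failure condition are made up for the example; thread_name_prefix gives the pool's threads recognizable names.

```python
import concurrent.futures

def risky_task(n):
    # Hypothetical task that fails for one particular input
    if n == 2:
        raise ValueError(f"bad input: {n}")
    return n * 10

outcomes = {}
with concurrent.futures.ThreadPoolExecutor(
    max_workers=3, thread_name_prefix="worker"
) as executor:
    futures = {executor.submit(risky_task, n): n for n in range(4)}
    for future in concurrent.futures.as_completed(futures):
        n = futures[future]
        error = future.exception()  # None if the task succeeded
        outcomes[n] = error if error is not None else future.result()

for n in sorted(outcomes):
    print(n, repr(outcomes[n]))
```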

Common Pitfalls

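  • Race conditions: Multiple threads reading and writing shared data without synchronization
  • Deadlocks: Threads waiting for each other indefinitely
  • Starvation: Some threads never get access to resources
  • GIL limitations: Python's Global Interpreter Lock lets only one thread execute Python bytecode at a time, so threads do not speed up CPU-bound code
  • Thread leakage: Threads that are never joined or shut down
  • Exception handling: An uncaught exception ends only the thread that raised it, so failures are easy to miss
  • Resource sharing: Files, network connections, and similar resources need the same protection as shared variables
  • Thread-local storage: Use threading.local() when each thread needs its own copy of some data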
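
For the thread-local storage point, a small sketch may help. Attributes set on a threading.local() object are visible only to the thread that set them, so each thread can keep its own context without locking. The request_id attribute and the function names are hypothetical; the lock protects only the shared dictionary used to collect what each thread saw.

```python
import threading

context = threading.local()
seen = {}
seen_lock = threading.Lock()

def record():
    # Reads the value that the current thread stored earlier
    with seen_lock:
        seen[threading.current_thread().name] = context.request_id

def handle_request(request_id):
    context.request_id = request_id  # visible only in this thread
    record()

threads = [
    threading.Thread(target=handle_request, args=(i,), name=f"req-{i}")
    for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(seen)
# The main thread never set the attribute, so it does not exist here
print(hasattr(context, "request_id"))  # False
```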

Multithreading can greatly improve the performance and responsiveness of I/O-bound applications, but it requires careful synchronization to avoid race conditions and deadlocks. For CPU-bound tasks, consider using multiprocessing instead due to Python's GIL.