Python Multithreading
Multithreading allows your program to run multiple operations concurrently. Learn how to create threads, synchronize them, and handle thread safety in Python using the threading module.
What is Multithreading?
Multithreading is a way of achieving multitasking: a process is divided into multiple threads that run concurrently. Python's threading module provides a high-level interface for creating and managing threads. Keep in mind that CPython's Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, so threading mainly speeds up I/O-bound work, not CPU-bound work.
"Threads allow your program to do multiple things at once, improving responsiveness and performance."
Creating Threads
Basic thread creation and execution:
import threading
import time
def worker(name, delay):
    """Function to be executed by thread"""
    print(f"Thread {name} starting")
    time.sleep(delay)
    print(f"Thread {name} finished")
# Create threads
thread1 = threading.Thread(target=worker, args=("A", 2))
thread2 = threading.Thread(target=worker, args=("B", 3))
thread3 = threading.Thread(target=worker, args=("C", 1))
# Start threads
print("Starting threads...")
thread1.start()
thread2.start()
thread3.start()
# Wait for threads to complete
thread1.join()
thread2.join()
thread3.join()
print("All threads completed")
Thread Class Inheritance
Create threads by inheriting from the Thread class:
import threading
import time
class MyThread(threading.Thread):
    def __init__(self, name, delay):
        super().__init__()
        self.name = name
        self.delay = delay

    def run(self):
        """Override run method"""
        print(f"Thread {self.name} starting")
        time.sleep(self.delay)
        print(f"Thread {self.name} finished after {self.delay} seconds")

# Create and start threads
threads = []
for i in range(3):
    thread = MyThread(f"Worker-{i+1}", i + 1)
    threads.append(thread)
    thread.start()

# Wait for all threads
for thread in threads:
    thread.join()
print("All worker threads completed")
Thread Synchronization
Use locks to prevent race conditions:
import threading
import time
# Shared resource
counter = 0
lock = threading.Lock()
def increment_counter(name, iterations):
    global counter
    for _ in range(iterations):
        # Acquire lock before accessing shared resource
        lock.acquire()
        try:
            current_value = counter
            time.sleep(0.001)  # Simulate some work
            counter = current_value + 1
            print(f"{name}: {counter}")
        finally:
            # Always release the lock
            lock.release()
# Create threads
thread1 = threading.Thread(target=increment_counter, args=("Thread-1", 5))
thread2 = threading.Thread(target=increment_counter, args=("Thread-2", 5))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(f"Final counter value: {counter}")
RLock (Reentrant Lock)
RLock allows a thread to acquire the same lock multiple times:
import threading
class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance
        self.lock = threading.RLock()

    def deposit(self, amount):
        with self.lock:
            self._update_balance(amount)

    def withdraw(self, amount):
        with self.lock:
            if self.balance >= amount:
                self._update_balance(-amount)
                return True
            return False

    def transfer(self, target_account, amount):
        # RLock allows acquiring the same lock multiple times
        with self.lock:
            if self.withdraw(amount):
                target_account.deposit(amount)
                return True
            return False

    def _update_balance(self, amount):
        # This method also acquires the lock
        with self.lock:
            self.balance += amount
            print(f"Balance updated: ${self.balance}")
# Test reentrant locking
account1 = BankAccount(1000)
account2 = BankAccount(500)
def transfer_money():
    success = account1.transfer(account2, 200)
    print(f"Transfer successful: {success}")
thread = threading.Thread(target=transfer_money)
thread.start()
thread.join()
print(f"Account 1 balance: ${account1.balance}")
print(f"Account 2 balance: ${account2.balance}")
Semaphore
Control access to a resource with limited capacity:
import threading
import time
# Semaphore with 3 permits
semaphore = threading.Semaphore(3)
def access_resource(name):
    print(f"{name} waiting for resource")
    semaphore.acquire()
    try:
        print(f"{name} acquired resource")
        time.sleep(2)  # Simulate resource usage
        print(f"{name} releasing resource")
    finally:
        semaphore.release()

# Create multiple threads
threads = []
for i in range(6):
    thread = threading.Thread(target=access_resource, args=(f"Thread-{i+1}",))
    threads.append(thread)

# Start all threads
for thread in threads:
    thread.start()

# Wait for completion
for thread in threads:
    thread.join()
print("All threads completed")
Event
Use events for thread communication:
import threading
import time
# Create an event
event = threading.Event()
def waiter(name):
    print(f"{name} waiting for event")
    event.wait()  # Wait until event is set
    print(f"{name} received event!")

def setter():
    time.sleep(3)
    print("Setting event!")
    event.set()  # Set the event
# Create threads
waiter1 = threading.Thread(target=waiter, args=("Waiter-1",))
waiter2 = threading.Thread(target=waiter, args=("Waiter-2",))
setter_thread = threading.Thread(target=setter)
# Start threads
waiter1.start()
waiter2.start()
setter_thread.start()
# Wait for completion
waiter1.join()
waiter2.join()
setter_thread.join()
print("All threads done")
Condition
Advanced synchronization with conditions:
import threading
import time
import random
class ProducerConsumer:
    def __init__(self):
        self.buffer = []
        self.max_size = 5
        self.condition = threading.Condition()

    def produce(self, item):
        with self.condition:
            while len(self.buffer) >= self.max_size:
                print("Buffer full, producer waiting...")
                self.condition.wait()
            self.buffer.append(item)
            print(f"Produced: {item}, Buffer: {self.buffer}")
            self.condition.notify()

    def consume(self):
        with self.condition:
            while len(self.buffer) == 0:
                print("Buffer empty, consumer waiting...")
                self.condition.wait()
            item = self.buffer.pop(0)
            print(f"Consumed: {item}, Buffer: {self.buffer}")
            self.condition.notify()
            return item

pc = ProducerConsumer()

def producer():
    for i in range(10):
        pc.produce(f"Item-{i+1}")
        time.sleep(random.random())

def consumer():
    for _ in range(10):
        item = pc.consume()
        time.sleep(random.random() * 2)
# Create producer and consumer threads
prod_thread = threading.Thread(target=producer)
cons_thread = threading.Thread(target=consumer)
prod_thread.start()
cons_thread.start()
prod_thread.join()
cons_thread.join()
print("Producer-consumer example completed")
Thread Pool
Use ThreadPoolExecutor for managing multiple threads:
import concurrent.futures
import time
import urllib.request
def download_file(url, filename):
    """Download a file from URL"""
    print(f"Downloading {filename}...")
    try:
        urllib.request.urlretrieve(url, filename)
        print(f"Downloaded {filename}")
        return f"Success: {filename}"
    except Exception as e:
        print(f"Failed to download {filename}: {e}")
        return f"Failed: {filename}"

def process_data(data):
    """Simulate data processing"""
    print(f"Processing {data}")
    time.sleep(1)
    return f"Processed: {data}"

# Using ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Download tasks: (url, filename) pairs
    urls = [
        ("https://httpbin.org/delay/1", "file1.txt"),
        ("https://httpbin.org/delay/1", "file2.txt"),
        ("https://httpbin.org/delay/1", "file3.txt")
    ]
    # Data items to process
    data_items = ["data1", "data2", "data3", "data4", "data5"]
    # Submit download tasks
    download_futures = [executor.submit(download_file, url, filename) for url, filename in urls]
    # Submit processing tasks
    process_futures = [executor.submit(process_data, item) for item in data_items]
    # Get results as they complete
    print("Download results:")
    for future in concurrent.futures.as_completed(download_futures):
        print(future.result())
    print("\nProcessing results:")
    for future in concurrent.futures.as_completed(process_futures):
        print(future.result())
print("Thread pool example completed")
Daemon Threads
Daemon threads run in the background and don't prevent the program from exiting:
import threading
import time
def background_task():
    """Daemon thread: runs until the program exits"""
    while True:
        print("Background task running...")
        time.sleep(1)

def main_task():
    """Foreground work for a regular (non-daemon) thread"""
    for i in range(5):
        print(f"Main task: {i+1}")
        time.sleep(1)

# Create daemon thread
daemon_thread = threading.Thread(target=background_task)
daemon_thread.daemon = True  # Mark as daemon
daemon_thread.start()

# Create a regular (non-daemon) thread for the foreground work
worker_thread = threading.Thread(target=main_task)
worker_thread.start()

# Wait for the foreground thread
worker_thread.join()
print("Foreground work finished, program exiting...")
# Daemon thread will be terminated automatically
Thread Communication
Use queues for safe inter-thread communication:
import threading
import queue
import time
import random
def producer(q, name):
    """Produce items and put them in queue"""
    for i in range(5):
        item = f"{name}-item-{i+1}"
        q.put(item)
        print(f"Produced: {item}")
        time.sleep(random.random())

def consumer(q, name):
    """Consume items from queue"""
    while True:
        try:
            item = q.get(timeout=3)  # Wait up to 3 seconds
            print(f"{name} consumed: {item}")
            q.task_done()
            time.sleep(random.random() * 2)
        except queue.Empty:
            print(f"{name} timed out waiting for items")
            break
# Create queue
work_queue = queue.Queue()
# Create producer and consumer threads
producer_thread = threading.Thread(target=producer, args=(work_queue, "Producer"))
consumer_thread = threading.Thread(target=consumer, args=(work_queue, "Consumer"))
producer_thread.start()
consumer_thread.start()
# Wait for producer to finish
producer_thread.join()
# Wait for consumer to finish processing all items
work_queue.join()
print("All work completed")
Best Practices
- Use ThreadPoolExecutor for I/O-bound tasks: Better than manual thread management
- Avoid shared state when possible: Use message passing instead
- Use locks sparingly: Can cause deadlocks and reduce performance
- Handle exceptions in threads: Use try/except in thread functions (see the sketch after this list, which also shows thread naming)
- Use daemon threads for background tasks: They won't prevent program exit
- Set thread names for debugging: Easier to identify threads
- Use queues for thread communication: Safer than shared variables
- Don't use threads for CPU-bound work: The GIL serializes Python bytecode, so use multiprocessing for CPU-intensive tasks
- Test thread safety: Use multiple runs to catch race conditions
- Document thread interactions: Complex threading logic is hard to follow without notes
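A short sketch combining two of the points above: give threads descriptive names and catch exceptions inside the thread function so failures are logged instead of disappearing (the logging setup here is just one possible choice):

import threading
import logging

logging.basicConfig(level=logging.INFO, format="%(threadName)s: %(message)s")

def safe_worker(task_id):
    try:
        if task_id == 2:
            raise ValueError("simulated failure")
        logging.info("task %s finished", task_id)
    except Exception:
        # Log the full traceback instead of letting the error vanish
        logging.exception("task %s failed", task_id)

threads = [
    threading.Thread(target=safe_worker, args=(i,), name=f"Worker-{i}")
    for i in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()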
Common Pitfalls
- Race conditions: Multiple threads reading and writing shared data without synchronization
- Deadlocks: Threads waiting for each other indefinitely
- Starvation: Some threads never get access to resources
- GIL limitations: Python's Global Interpreter Lock prevents true parallelism for CPU-bound threads
- Thread leakage: Threads not properly cleaned up
- Exception handling: Uncaught exceptions in a thread don't propagate to the main thread
- Resource sharing: Unsynchronized access to files, network connections, and other shared resources
- Thread-local storage: Forgetting threading.local() when each thread needs its own copy of data (see the sketch after this list)
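threading.local() gives each thread its own copy of an attribute, so threads cannot accidentally overwrite each other's data. A minimal sketch:

import threading

# Each thread gets its own independent 'value' attribute
local_data = threading.local()

def worker(n):
    local_data.value = n  # Not visible to other threads
    print(f"{threading.current_thread().name} sees value={local_data.value}")

threads = [threading.Thread(target=worker, args=(i,), name=f"T{i}") for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()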
Multithreading can greatly improve the performance and responsiveness of I/O-bound applications, but it requires careful synchronization to avoid race conditions and deadlocks. For CPU-bound tasks, consider using multiprocessing instead due to Python's GIL.
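For comparison, a minimal sketch of the multiprocessing alternative mentioned above, which sidesteps the GIL by running workers in separate processes (the cpu_heavy function and the numbers are illustrative):

import multiprocessing

def cpu_heavy(n):
    # A CPU-bound task: sum of squares up to n
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    # Each worker runs in its own process with its own interpreter and GIL
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(cpu_heavy, [5_000_000] * 4)
    print(results)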