Usage Guide
Overview
freethreading shields you from differences between GIL-enabled Python builds and free-threaded builds. At import
time it decides whether to route calls through multiprocessing (GIL-enabled Python) or threading
(GIL-disabled Python). The public surface mirrors the common subset shared by both backends, so you can write portable
code once and run it efficiently everywhere.
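The import-time decision described above can be approximated with the standard library alone. The sketch below is an assumption about how such a check might look, not freethreading's actual implementation, and `select_backend` is a hypothetical helper. It relies on `sys._is_gil_enabled()`, which exists on Python 3.13 and later, and treats older interpreters as GIL-enabled.

```python
import sys


def select_backend():
    """Return "threading" when the GIL is disabled, else "multiprocessing"."""
    # sys._is_gil_enabled() was added in Python 3.13; older interpreters
    # always run with the GIL, so default to True when it is missing.
    gil_enabled = getattr(sys, "_is_gil_enabled", lambda: True)()
    return "multiprocessing" if gil_enabled else "threading"


if __name__ == "__main__":
    print(select_backend())
```

On a standard build this prints the process-based backend name; only a free-threaded build running with the GIL disabled would select threads.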
Parallel Execution
freethreading offers low-level Worker objects for direct task control,
WorkerPool objects for pool-based parallelism, and high-level
WorkerPoolExecutor objects for managed execution.
Worker
The Worker class represents an activity that is run in a separate thread or process. It is a
wrapper that carries over the shared controls from Thread and Process, so
that you can name workers, set daemon, and call familiar methods like
start(), join(), and
is_alive(), without worrying about the underlying implementation. Here’s a quick example:
from freethreading import Worker, current_worker

def greet():
    print(f"Hello from {current_worker().name}!")

if __name__ == "__main__":
    worker = Worker(name="Worker", target=greet, daemon=False)
    worker.start()
    worker.join()
Output:
Hello from Worker!
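To illustrate what the shared controls are, the standard-library sketch below builds either a Thread or a Process from the same arguments. `make_worker` and `record` are hypothetical helpers, not part of freethreading; the point is only that `name`, `target`, `args`, and `daemon` are accepted by both constructors and that `start()`, `join()`, and `is_alive()` behave alike.

```python
import multiprocessing
import threading


def make_worker(backend, target, name=None, args=(), daemon=None):
    """Build a Thread or a Process from the arguments both constructors share."""
    cls = threading.Thread if backend == "threading" else multiprocessing.Process
    return cls(target=target, name=name, args=args, daemon=daemon)


def record(results):
    # Runs inside the worker and records the name of the current thread.
    results.append(threading.current_thread().name)


if __name__ == "__main__":
    results = []
    worker = make_worker("threading", record, name="Worker", args=(results,))
    worker.start()
    worker.join()
    print(results, worker.is_alive())
```

The demo uses the thread backend because a plain list is only shared between threads; a process-based worker would need a queue or similar channel to report back.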
WorkerPool
WorkerPool wraps Pool and
ThreadPool into a single interface. Here’s an example of how to use it:
from freethreading import WorkerPool

def square(x):
    return x * x

if __name__ == "__main__":
    with WorkerPool(workers=4) as pool:
        print(pool.map(square, range(10)))
Output:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
WorkerPoolExecutor
WorkerPoolExecutor provides a unified drop-in replacement for
ThreadPoolExecutor and ProcessPoolExecutor. For instance:
from freethreading import WorkerPoolExecutor

def square(x):
    return x * x

if __name__ == "__main__":
    with WorkerPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(square, range(10)))
    print(results)
Output:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
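Because WorkerPoolExecutor is described as a drop-in replacement for the concurrent.futures executors, the submit-and-collect pattern can be tried against ThreadPoolExecutor directly. The sketch below uses only the standard library; that WorkerPoolExecutor supports `submit()` in the same way is an assumption based on that description, and `squares_via_submit` is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def square(x):
    return x * x


def squares_via_submit(values, max_workers=4):
    """Submit one task per value and collect results as they complete."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(square, v): v for v in values}
        # as_completed yields futures in completion order, so key by input.
        return {futures[f]: f.result() for f in as_completed(futures)}


if __name__ == "__main__":
    print(sorted(squares_via_submit(range(5)).items()))
```

Unlike `map()`, this pattern does not preserve input order, which is why the results are keyed by the submitted value.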
Data Exchange
Workers can exchange data through Queue for structured coordination or
SimpleQueue for lightweight messaging.
Queue
freethreading.Queue wraps queue.Queue and multiprocessing.JoinableQueue into a single
interface that behaves identically on both threading and multiprocessing backends. As an example:
from freethreading import Queue, Worker

def producer(queue):
    for value in range(3):
        print(f"Producing {value}")
        queue.put(value)
    queue.put(None)  # Sentinel marks completion

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"Consuming {item}")
        queue.task_done()

if __name__ == "__main__":
    queue = Queue()
    producer_worker = Worker(name="Producer", target=producer, args=(queue,))
    consumer_worker = Worker(name="Consumer", target=consumer, args=(queue,))
    producer_worker.start()
    consumer_worker.start()
    queue.join()
    producer_worker.join()
    consumer_worker.join()
Output:
Producing 0
Producing 1
Producing 2
Consuming 0
Consuming 1
Consuming 2
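The `join()` and `task_done()` handshake used above follows the contract of `queue.Queue`: `join()` returns only after `task_done()` has been called once for every item that was put. A minimal standard-library sketch of that accounting, using threads only (`drain` and `run_once` are hypothetical names):

```python
import queue
import threading


def drain(tasks, seen):
    """Consume items until the None sentinel, acknowledging each one."""
    while True:
        item = tasks.get()
        try:
            if item is None:
                break
            seen.append(item)
        finally:
            # Every get() must be matched by task_done(), sentinel included.
            tasks.task_done()


def run_once(values):
    tasks, seen = queue.Queue(), []
    consumer = threading.Thread(target=drain, args=(tasks, seen))
    consumer.start()
    for value in values:
        tasks.put(value)
    tasks.put(None)  # Sentinel marks completion
    tasks.join()     # Blocks until every item has been acknowledged
    consumer.join()
    return seen


if __name__ == "__main__":
    print(run_once([0, 1, 2]))
```

Placing `task_done()` in a `finally` clause keeps the count balanced even if processing an item raises.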
SimpleQueue
Similarly, freethreading.SimpleQueue wraps the unbounded, lightweight queue.SimpleQueue and
multiprocessing.SimpleQueue into a single interface that behaves identically on both backends. For example:
from freethreading import SimpleQueue, Worker

def fill_queue(queue):
    queue.put("hello")
    queue.put("world")

if __name__ == "__main__":
    queue = SimpleQueue()
    worker = Worker(target=fill_queue, args=(queue,))
    worker.start()
    print(queue.get())
    print(queue.get())
    worker.join()
    print(queue.empty())
Output:
hello
world
True
Synchronization Primitives
freethreading offers the synchronization primitives common to threading and multiprocessing,
enabling worker coordination and control of shared resources. Below are examples of how to use them.
Locks and Reentrant Locks
Lock and RLock wrap the lock types found in threading and
multiprocessing. A lock ensures only one worker enters a critical section at a time, and a reentrant lock allows
the same worker to acquire it multiple times without deadlocking. Here’s how they work:
from freethreading import Lock, RLock, Worker, current_worker

def critical(lock):
    with lock:
        print(f"'{current_worker().name}' acquired the lock")

def countdown(rlock, n):
    with rlock:
        if n > 0:
            print(f"'{current_worker().name}': {n}...")
            countdown(rlock, n - 1)
        else:
            print(f"'{current_worker().name}': go!")

if __name__ == "__main__":
    lock = Lock()
    workers = [
        Worker(name=f"Worker-{i}", target=critical, args=(lock,)) for i in range(2)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print()
    rlock = RLock()
    workers = [
        Worker(name=f"Worker-{i}", target=countdown, args=(rlock, 3)) for i in range(2)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
Output:
'Worker-0' acquired the lock
'Worker-1' acquired the lock
'Worker-0': 3...
'Worker-0': 2...
'Worker-0': 1...
'Worker-0': go!
'Worker-1': 3...
'Worker-1': 2...
'Worker-1': 1...
'Worker-1': go!
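The difference between the two lock types can be checked without risking a deadlock by making the second acquisition non-blocking. The sketch below uses the `threading` primitives that the wrappers are described as covering; `can_reacquire` is a hypothetical helper.

```python
import threading


def can_reacquire(lock):
    """Return True if the calling thread can take `lock` a second time."""
    lock.acquire()
    try:
        # blocking=False returns immediately instead of deadlocking.
        second = lock.acquire(blocking=False)
        if second:
            lock.release()
        return second
    finally:
        lock.release()


if __name__ == "__main__":
    print(can_reacquire(threading.Lock()))
    print(can_reacquire(threading.RLock()))
```

A plain lock refuses the second acquisition from its current holder, while a reentrant lock grants it and expects one release per acquire.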
Semaphores and Conditions
Semaphore, BoundedSemaphore, and Condition
provide wrappers over their threading and multiprocessing equivalents. Semaphores control how many
workers can access a resource at once, and conditions allow workers to wait until notified. The example below puts all
of them to work:
from freethreading import Condition, Queue, Semaphore, Worker, current_worker

def restricted(semaphore):
    with semaphore:
        print(f"'{current_worker().name}' in restricted section")

def producer(condition, queue, data):
    with condition:
        queue.put(data)
        print(f"'{current_worker().name}' sent: {data}")
        condition.notify()

def consumer(condition, queue):
    with condition:
        # A bare wait() misses a notify() sent before it starts waiting;
        # robust code re-checks a shared predicate in a loop.
        condition.wait()
        print(f"'{current_worker().name}' received: {queue.get()}")

if __name__ == "__main__":
    semaphore = Semaphore(2)
    workers = [
        Worker(name=f"Worker-{i}", target=restricted, args=(semaphore,))
        for i in range(3)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print()
    condition = Condition()
    queue = Queue()
    c = Worker(name="Consumer", target=consumer, args=(condition, queue))
    p = Worker(name="Producer", target=producer, args=(condition, queue, 42))
    c.start()
    p.start()
    c.join()
    p.join()
Output:
'Worker-0' in restricted section
'Worker-1' in restricted section
'Worker-2' in restricted section
'Producer' sent: 42
'Consumer' received: 42
BoundedSemaphore behaves like Semaphore but prevents over-releasing by
raising ValueError if release() is called more times than
acquire(). The following example shows the difference:
from freethreading import Semaphore, BoundedSemaphore
s = Semaphore(1)
s.acquire() # Decreases counter to 0
s.release() # Increases counter to 1
s.release() # Increases counter to 2
b = BoundedSemaphore(1)
b.acquire() # Decreases counter to 0
b.release() # Increases counter to 1
b.release() # Raises ValueError
Output:
Traceback (most recent call last):
...
ValueError: Semaphore released too many times
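In practice the over-release is usually handled rather than left to crash the program. The sketch below shows the same contrast with the `threading` classes and catches the error; `release_twice` is a hypothetical helper, and it is an assumption that the freethreading wrappers raise the same exception type on both backends.

```python
import threading


def release_twice(semaphore):
    """Acquire once, release twice; report whether the extra release failed."""
    semaphore.acquire()
    semaphore.release()
    try:
        semaphore.release()
    except ValueError:
        return "rejected"
    return "accepted"


if __name__ == "__main__":
    print(release_twice(threading.Semaphore(1)))
    print(release_twice(threading.BoundedSemaphore(1)))
```

A bounded semaphore is therefore the safer default when the counter guards a fixed number of resources, since a stray release surfaces as an error instead of silently raising the limit.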
Events and Barriers
Event and Barrier wrap their threading and
multiprocessing counterparts. Events broadcast a signal that unblocks waiting workers, while barriers hold
workers until a fixed number have arrived. Below is a quick example:
import time

from freethreading import Barrier, Event, Worker, current_worker

def runner(start_signal, checkpoint):
    print(f"'{current_worker().name}' waiting for start signal")
    start_signal.wait()  # All runners wait for the event
    print(f"'{current_worker().name}' started")
    checkpoint.wait()  # Synchronize at the checkpoint
    print(f"'{current_worker().name}' passed checkpoint")

if __name__ == "__main__":
    start_signal = Event()
    checkpoint = Barrier(3)
    workers = [
        Worker(name=f"Runner-{i}", target=runner, args=(start_signal, checkpoint))
        for i in range(3)
    ]
    for w in workers:
        w.start()
    # Give workers time to reach wait state, then signal
    time.sleep(0.1)
    start_signal.set()
    for w in workers:
        w.join()
Output:
'Runner-0' waiting for start signal
'Runner-1' waiting for start signal
'Runner-2' waiting for start signal
'Runner-2' started
'Runner-0' started
'Runner-1' started
'Runner-1' passed checkpoint
'Runner-0' passed checkpoint
'Runner-2' passed checkpoint
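The order in which runners print is not deterministic, but the barrier itself gives each participant a distinct arrival index. The thread-only sketch below collects those indices using the `threading` primitives; `run_race` is a hypothetical helper, and the 5-second timeout is an arbitrary safety margin chosen for the example.

```python
import threading


def run_race(parties=3):
    """Release all runners with an event, then gather their barrier indices."""
    start_signal = threading.Event()
    checkpoint = threading.Barrier(parties)
    arrivals = []
    arrivals_lock = threading.Lock()

    def runner():
        start_signal.wait()                 # Block until the event is set
        index = checkpoint.wait(timeout=5)  # Unique int in range(parties)
        with arrivals_lock:
            arrivals.append(index)

    threads = [threading.Thread(target=runner) for _ in range(parties)]
    for t in threads:
        t.start()
    start_signal.set()
    for t in threads:
        t.join()
    return sorted(arrivals)


if __name__ == "__main__":
    print(run_race())
```

The returned index is handy for electing a single worker to do follow-up work once everyone has passed the checkpoint.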
Utility Functions
freethreading provides a collection of commonly used functions from both threading and
multiprocessing. Here’s a quick overview of how to use them:
from freethreading import (
    Worker,
    active_children,
    active_count,
    current_worker,
    enumerate,
    get_ident,
)

def busy_wait():
    while True:
        pass

if __name__ == "__main__":
    Worker(target=busy_wait, name="Daemon", daemon=True).start()
    # MainThread or MainProcess
    print(current_worker().name)
    # Thread or process identifier
    print(get_ident())
    # Number of active workers
    print(active_count())
    # List all active workers
    print([worker.name for worker in enumerate()])
    # List active child workers
    print([child.name for child in active_children()])
Output (Standard Python):
MainProcess
601133
2
['Daemon', 'MainProcess']
['Daemon']
Output (Free-threaded Python):
MainThread
135793751029632
2
['MainThread', 'Daemon']
['Daemon']
In addition, freethreading offers a get_backend() function that returns the selected
parallelism backend, which can be useful for debugging. Here’s how to use it:
from freethreading import get_backend
print(get_backend())
Output (Standard Python):
multiprocessing
Output (Free-threaded Python):
threading
End-to-End Example: Parallel Primes
The following examples demonstrate finding primes in parallel using the low-level worker-queue pattern and the simpler executor pattern.
Using Worker and Queue
This example uses workers and queues for fine-grained control over task distribution:
from freethreading import Queue, Worker

def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(n ** 0.5) + 1):
        if n % i == 0:
            return False
    return True

def worker(tasks, results):
    while True:
        number = tasks.get()
        if number is None:
            tasks.task_done()
            break
        if is_prime(number):
            results.put(number)
        tasks.task_done()

if __name__ == "__main__":
    tasks = Queue()
    results = Queue()
    pool = [Worker(target=worker, args=(tasks, results)) for _ in range(4)]
    for w in pool:
        w.start()
    for candidate in range(500, 600):
        tasks.put(candidate)
    # Sentinel per worker to signal completion
    for _ in pool:
        tasks.put(None)
    tasks.join()
    for w in pool:
        w.join()
    primes = []
    while not results.empty():
        primes.append(results.get())
    primes.sort()
    print(f"Found {len(primes)} primes: {primes}")
Output:
Found 14 primes: [503, 509, 521, 523, 541, 547, 557, 563, 569, 571, 577, 587, 593, 599]
Using WorkerPoolExecutor
The following example achieves the same result using WorkerPoolExecutor for simplicity:
from freethreading import WorkerPoolExecutor

def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(n ** 0.5) + 1):
        if n % i == 0:
            return False
    return True

if __name__ == "__main__":
    with WorkerPoolExecutor() as executor:
        candidates = range(500, 600)
        primes = [
            n
            for n, prime in zip(candidates, executor.map(is_prime, candidates))
            if prime
        ]
    print(f"Found {len(primes)} primes: {primes}")
Output:
Found 14 primes: [503, 509, 521, 523, 541, 547, 557, 563, 569, 571, 577, 587, 593, 599]