Concurrent Task Processing with a Worker Pool
Worker pools are a powerful technique for running tasks concurrently in Python, and they are especially effective for I/O-bound operations: a thread blocked on I/O releases the global interpreter lock (GIL), so other threads can make progress in the meantime. This challenge asks you to implement a basic worker pool that distributes tasks across a fixed number of worker threads, improving throughput and responsiveness compared to sequential processing. A well-implemented worker pool can significantly speed up applications that perform many independent I/O-bound operations.
Problem Description
You are to implement a WorkerPool class in Python. This class should manage a pool of worker threads that execute tasks submitted to it. The WorkerPool should have the following functionalities:
- Initialization: The constructor should accept the number of worker threads to create (num_workers).
- Task Submission: A submit(task) method should accept a callable task (a function or lambda expression) and add it to a queue for execution by a worker thread.
- Task Execution: Worker threads should continuously retrieve tasks from the queue and execute them.
- Shutdown: A shutdown() method should gracefully terminate the worker threads and prevent new tasks from being submitted. The shutdown() method should wait for all currently queued tasks to complete before returning.
- Thread Safety: The task queue should be thread-safe to prevent race conditions when multiple threads access it.
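To see why queue.Queue satisfies the thread-safety requirement, the illustration below (not part of the required API) has four producer threads call put() concurrently on one queue and then drains it. Queue serializes access with an internal lock, so no item is lost or duplicated. The helper names and item counts are arbitrary assumptions chosen for the demo.

```python
import queue
import threading


def producer(q: queue.Queue, start: int, count: int) -> None:
    # Each producer puts a disjoint range of integers; put() is thread-safe.
    for i in range(start, start + count):
        q.put(i)


def drain(q: queue.Queue) -> list:
    # Collect everything currently in the queue without blocking.
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


q: queue.Queue = queue.Queue()
producers = [
    threading.Thread(target=producer, args=(q, n * 1000, 1000)) for n in range(4)
]
for t in producers:
    t.start()
for t in producers:
    t.join()

items = drain(q)
print(len(items), len(set(items)))  # 4000 4000
```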
Key Requirements:
- Use the threading module for thread management.
- Use a queue.Queue for thread-safe task queuing.
- The submit method should not block.
- The shutdown method should wait for all tasks to complete.
- The worker threads should terminate gracefully when the pool is shut down.
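The following is one possible design that meets these requirements. It is a minimal sketch and makes several assumptions the challenge does not state:

- Shutdown is signalled with one sentinel object per worker (the hypothetical _STOP).
- The queue is unbounded, so submit() never blocks.
- Task exceptions are logged with logger.exception and otherwise discarded.

Other designs, such as a stop Event, are equally valid.

```python
import functools
import logging
import queue
import threading
from typing import Callable

logger = logging.getLogger(__name__)

# Sentinel (hypothetical name): a worker that dequeues this object exits its loop.
_STOP = object()


class WorkerPool:
    def __init__(self, num_workers: int) -> None:
        # Unbounded queue, so put() in submit() never blocks.
        self._tasks: queue.Queue = queue.Queue()
        # Guards _accepting so no task can slip in behind the stop sentinels.
        self._lock = threading.Lock()
        # Edge case: with no workers, the pool never accepts tasks.
        self._accepting = num_workers > 0
        # daemon=True lets the interpreter exit even if shutdown() is never called.
        self._threads = [
            threading.Thread(target=self._worker, daemon=True)
            for _ in range(max(num_workers, 0))
        ]
        for t in self._threads:
            t.start()

    def submit(self, task: Callable[[], object]) -> None:
        # Tasks submitted after shutdown() (or to an empty pool) are ignored.
        with self._lock:
            if self._accepting:
                self._tasks.put(task)

    def _worker(self) -> None:
        while True:
            task = self._tasks.get()
            if task is _STOP:
                return
            try:
                task()
            except Exception:
                # Log and continue: one failing task must not kill the worker.
                logger.exception("Task %r raised an exception", task)

    def shutdown(self) -> None:
        with self._lock:
            if not self._accepting:
                return  # already shut down, or never had workers
            self._accepting = False
        # FIFO order puts one sentinel per worker behind every queued task,
        # so all pending work is taken before any worker exits.
        for _ in self._threads:
            self._tasks.put(_STOP)
        for t in self._threads:
            t.join()


# Usage: tasks take no arguments, so bind each input with functools.partial.
results = [None] * 4


def store_double(index: int, value: int) -> None:
    results[index] = value * 2


pool = WorkerPool(num_workers=2)
for i, v in enumerate([1, 2, 3, 4]):
    pool.submit(functools.partial(store_double, i, v))
pool.shutdown()
pool.submit(functools.partial(store_double, 0, 100))  # ignored after shutdown
print(results)  # [2, 4, 6, 8]
```

The lock in submit() is what makes the sentinel approach safe. Once shutdown() flips _accepting, no later task can be queued behind the sentinels. Every task accepted earlier is therefore executed before the workers exit.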
Expected Behavior:
When a task is submitted, it should be executed by one of the worker threads in the pool. The order of execution is not guaranteed. The shutdown() method should ensure that all submitted tasks are completed before the worker pool terminates.
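Because execution order is not guaranteed, results are usually collected by index rather than appended. This keeps the final list in submission order however the threads interleave. The sketch below uses one plain thread per task for brevity instead of a pool. The random sleep stands in for variable I/O latency and is an assumption of the demo.

```python
import random
import threading
import time

values = [1, 2, 3, 4, 5, 6]
results = [None] * len(values)  # one preallocated slot per task
completion_order = []
order_lock = threading.Lock()


def run(index: int, value: int) -> None:
    time.sleep(random.uniform(0, 0.01))  # simulate variable I/O latency
    results[index] = value * 2  # each thread writes only its own slot
    with order_lock:
        completion_order.append(index)


threads = [threading.Thread(target=run, args=(i, v)) for i, v in enumerate(values)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(completion_order)  # varies from run to run
print(results)           # [2, 4, 6, 8, 10, 12]
```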
Edge Cases to Consider:
- num_workers is 0 or negative: The pool should not create any threads, and submitted tasks should be ignored.
- Submitting tasks after shutdown() has been called: These tasks should be ignored.
- Tasks that raise exceptions: The worker thread should catch the exception and log it (or handle it appropriately; see Notes). The exception should not crash the entire worker pool.
- Empty task queue: Worker threads should gracefully exit when the queue is empty and the pool is shutting down.
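A sentinel per worker is not the only way to handle the last two edge cases. The sketch below uses a threading.Event plus a get() timeout instead. A worker exits only when shutdown has been requested *and* the queue is empty, and a task that raises is reported without ending the worker. The names worker, stopping and poll are hypothetical. Polling adds up to poll seconds of latency at shutdown.

```python
import queue
import threading
from typing import Callable


def worker(tasks: queue.Queue, stopping: threading.Event, poll: float = 0.05) -> None:
    """Run tasks until shutdown is requested and the queue is empty."""
    while True:
        try:
            task: Callable[[], object] = tasks.get(timeout=poll)
        except queue.Empty:
            # Queue empty: exit only if shutdown has been requested.
            if stopping.is_set():
                return
            continue
        try:
            task()
        except Exception as exc:
            # Report and keep going; a real pool would use the logging module.
            print(f"task failed: {exc!r}")


tasks: queue.Queue = queue.Queue()
stopping = threading.Event()
done: list = []
workers = [threading.Thread(target=worker, args=(tasks, stopping)) for _ in range(2)]
for w in workers:
    w.start()

for n in range(5):
    tasks.put(lambda n=n: done.append(n))
tasks.put(lambda: 1 / 0)  # raises inside a worker; the worker survives
stopping.set()  # request shutdown only after every task has been queued
for w in workers:
    w.join()
print(sorted(done))  # [0, 1, 2, 3, 4]
```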
Examples
Example 1:
Input:
num_workers = 2
tasks = [lambda x: x * 2, lambda x: x + 5, lambda x: x - 3, lambda x: x / 2]
input_values = [1, 2, 3, 4]
# Each task is applied to the input value at the same index:
# tasks[0](input_values[0]) = 1 * 2 = 2
# tasks[1](input_values[1]) = 2 + 5 = 7
# ...
Output:
[2, 7, 0, 2.0]
Explanation: The worker pool executes the tasks concurrently. The order in which tasks finish may vary with thread scheduling, but every task is executed, and the results are listed here in submission order.
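When checking a WorkerPool against this example, it can help to compute the expected list with the standard library's concurrent.futures.ThreadPoolExecutor. This is a different, ready-made pool, not the class the challenge asks for. Its map() method runs calls concurrently but yields results in input order. Note that x / 2 is true division in Python 3, so the last value is the float 2.0.

```python
from concurrent.futures import ThreadPoolExecutor

tasks = [lambda x: x * 2, lambda x: x + 5, lambda x: x - 3, lambda x: x / 2]
input_values = [1, 2, 3, 4]


def apply(task, value):
    # Pair each task with the input value at the same index.
    return task(value)


with ThreadPoolExecutor(max_workers=2) as executor:
    # map() may run calls in any order but returns results in input order.
    output = list(executor.map(apply, tasks, input_values))

print(output)  # [2, 7, 0, 2.0]
```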
Example 2:
Input:
num_workers = 4
tasks = [lambda x: x * x, lambda x: x + 1, lambda x: x - 1]
input_values = [5, 10, 15]
Output:
[25, 11, 14]
Explanation: The worker pool distributes the three tasks across its four worker threads; since there are fewer tasks than workers, at least one worker stays idle.
Example 3: (Edge Case - Exception Handling)
Input:
num_workers = 2
tasks = [lambda x: x / 0, lambda x: x * 2]
input_values = [1, 2]
Output:
[None, 4]
Explanation: The first task raises a ZeroDivisionError. The worker thread catches this exception and continues. The second task executes successfully. The exact output depends on how you handle exceptions (see Notes).
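One way to produce [None, 4] is the "log and return a default" policy from the Notes, sketched below as a hypothetical wrapper, call_or_none. For clarity it runs sequentially; inside a pool, each worker would call the same wrapper. The logger name is an arbitrary assumption.

```python
import logging

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger("worker_pool_example")  # hypothetical logger name


def call_or_none(task, value):
    """Apply task to value; on failure, log the traceback and return None."""
    try:
        return task(value)
    except Exception:
        logger.exception("task failed for input %r", value)
        return None


tasks = [lambda x: x / 0, lambda x: x * 2]
input_values = [1, 2]

# Shown sequentially for clarity; in a pool, each worker would call the wrapper.
output = [call_or_none(task, value) for task, value in zip(tasks, input_values)]
print(output)  # [None, 4]
```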
Constraints
- num_workers must be a non-negative integer.
- Tasks submitted to the pool must be callable (functions or lambda expressions).
- At most 1000 tasks can be submitted before shutdown (to prevent unbounded memory usage in testing).
- The shutdown() method should complete within 5 seconds after all tasks are submitted (a performance constraint for testing purposes).
Notes
- Consider using a logging mechanism (e.g., the logging module) to record exceptions raised by tasks. This is crucial for debugging and monitoring.
- You can choose how to handle exceptions raised by tasks. Options include:
  - Logging the exception and returning a default value (e.g., None).
  - Re-raising the exception (which terminates the worker thread; generally not recommended).
  - Storing the exception and making it available after the task completes (more complex).
- The provided examples assume that tasks are simple and do not require any external resources. In a real-world scenario, you might need to handle resource contention and synchronization more carefully.
- Focus on the core functionality of the worker pool. Error handling and advanced features (e.g., task prioritization) can be added later.
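- The input values are not part of the WorkerPool class itself. The example tasks each accept a single argument. Since submit(task) receives only the callable, the input value has to be bound to the task before submission (for example, with functools.partial or a closure). The examples only illustrate how tasks might be used.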
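Two of these notes combine naturally in a small sketch. functools.partial binds a task's single input value, producing the zero-argument callable that submit(task) expects. A hypothetical TaskResult wrapper then implements the "store the exception" option, so the caller can inspect failures after the task completes. The class and helper names are illustrative assumptions, and plain threads stand in for a pool. The standard library's concurrent.futures.Future is a fuller version of the same idea.

```python
import functools
import threading
from typing import Any, Callable, Optional


class TaskResult:
    """Hypothetical holder for one task's outcome: a value or an exception."""

    def __init__(self) -> None:
        self.value: Any = None
        self.error: Optional[BaseException] = None
        self.done = threading.Event()  # set once the task has finished either way

    def run(self, task: Callable[[], Any]) -> None:
        try:
            self.value = task()
        except Exception as exc:
            self.error = exc  # stored for the caller instead of being lost
        finally:
            self.done.set()


def double(x: float) -> float:
    return x * 2


def divide_by_zero(x: float) -> float:
    return x / 0


ok, bad = TaskResult(), TaskResult()
# The inner partial binds the input value; the outer one adds result capture,
# yielding a zero-argument callable suitable for submit(task).
jobs = [
    functools.partial(ok.run, functools.partial(double, 2)),
    functools.partial(bad.run, functools.partial(divide_by_zero, 1)),
]
for job in jobs:
    threading.Thread(target=job).start()  # stand-in for pool.submit(job)

ok.done.wait()
bad.done.wait()
print(ok.value, ok.error)                   # 4 None
print(bad.value, type(bad.error).__name__)  # None ZeroDivisionError
```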