How to use python thread pool

Use of thread pool

The base class of thread pool is Executor in the concurrent.futures module. Executor provides two subclasses, namely ThreadPoolExecutor and ProcessPoolExecutor, where ThreadPoolExecutor is used to create thread pools, and ProcessPoolExecutor is used to create process pools.

If the thread pool/process pool is used to manage concurrent programming, then as long as the corresponding task function is submitted to the thread pool/process pool, the rest will be handled by the thread pool/process pool.

Exectuor provides the following common methods:

After the program submits the task function to the thread pool, the submit method returns a Future object. The Future class is mainly used to obtain the return value of the thread task function. Since the thread task will be executed asynchronously in the new thread, the function executed by the thread is equivalent to a "future completion" task, so Python uses Future to represent it.

In fact, there is also a Future in Java's multithreaded programming. The Future here is similar to Java's Future.

Future provides the following methods:

After a thread pool is used up, the shutdown() method of the thread pool should be called, which will initiate the shutdown sequence of the thread pool. After calling the shutdown() method, the thread pool will no longer receive new tasks, but will complete the execution of all previously submitted tasks. When all tasks in the thread pool are executed, all threads in the thread pool will die.

The steps to use the thread pool to perform thread tasks are as follows:

a. Call the constructor of the ThreadPoolExecutor class to create a thread pool.

b. Define an ordinary function as a thread task.

c. Call the submit() method of the ThreadPoolExecutor object to submit thread tasks.

d. When you do not want to submit any tasks, call the shutdown() method of the ThreadPoolExecutor object to close the thread pool.

The following program demonstrates how to use the thread pool to perform thread tasks:

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# Define a function prepared as a thread task
def action(max):
 my_sum =0for i inrange(max):print(threading.current_thread().name +' '+str(i))
 my_sum += i
 return my_sum
# Create a thread pool containing 2 threads
pool =ThreadPoolExecutor(max_workers=2)
# Submit a task to the thread pool,50 will be an action()Function parameters
future1 = pool.submit(action,50)
# Submit another task to the thread pool,100 will be the action()Function parameters
future2 = pool.submit(action,100)
# Determine whether the task represented by future1 is over
print(future1.done())
time.sleep(3)
# Determine whether the task represented by future2 is over
print(future2.done())
# View the results returned by the task represented by future1
print(future1.result())
# View the results returned by the task represented by future2
print(future2.result())
# Close thread pool
pool.shutdown()

In the above program, the 13th line of code creates a thread pool containing two threads. The next two lines of code only need to submit the action() function to the thread pool, and the thread pool will be responsible for starting the thread to execute the action. () function. This method of starting a thread is both elegant and more efficient.

When the program submits the action() function to the thread pool, the submit() method will return the Future object corresponding to the task, and the program will immediately determine the done() method of futurel. This method will return False (indicating that the task is still undone). Next, the main program pauses for 3 seconds, and then judges the done() method of future2. If the task has been completed at this time, the method will return True.

The program finally obtains the results returned by the two asynchronous tasks through the result() method of Future.

Readers can run this code to view the results of the operation, which will not be demonstrated here.

When the program uses the result() method of Future to get the result, this method will block the current thread. If the timeout parameter is not specified, the current thread will remain blocked until the task represented by the Future returns.

Get execution result

The previous program calls the result() method of Future to obtain the return value of the thread task, but this method will block the current main thread. The blocking of the result() method will be released only after the Qiancheng task is completed.

If the program does not want to directly call the result() method to block the thread, you can add a callback function through Future's add_done_callback() method. The callback function is in the form of fn(future). When the thread task is completed, the program will automatically trigger the callback function and pass the corresponding Future object as a parameter to the callback function.

The following program uses the add_done_callback() method to get the return value of the thread task:

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# Define a function prepared as a thread task
def action(max):
 my_sum =0for i inrange(max):print(threading.current_thread().name +' '+str(i))
 my_sum += i
 return my_sum
# Create a thread pool containing 2 threads
withThreadPoolExecutor(max_workers=2)as pool:
 # Submit a task to the thread pool,50 will be an action()Function parameters
 future1 = pool.submit(action,50)
 # Submit another task to the thread pool,100 will be the action()Function parameters
 future2 = pool.submit(action,100)
 def get_result(future):print(future.result())
 # Add a callback function for thread completion for future1
 future1.add_done_callback(get_result)
 # Add a callback function for thread completion for future2
 future2.add_done_callback(get_result)print('--------------')

The above main program adds the same callback function for future1 and future2, which will get its return value when the thread task ends.

A horizontal line is printed on the last line of the main program. Since the program does not directly call the result() method of future1 and future2, the main thread will not be blocked, and the horizontal line printed by the main thread can be seen immediately. Next, you will see two new threads execute concurrently. When the thread task is executed, the get_result() function is triggered to output the return value of the thread task.

In addition, because the thread pool implements the Context Manage Protocol, the program can use the with statement to manage the thread pool, so that you can avoid manually closing the thread pool, as shown in the above program.

In addition, Exectuor also provides a map(func, *iterables, timeout=None, chunksize=1) method. The function of this method is similar to the global function map(). The difference is that the map() method of the thread pool will be every iterables. Each element starts a thread to execute the func function concurrently. This method is equivalent to starting len(iterables) threads and collecting the execution results of each thread.

For example, the following program uses the map() method of Executor to start the thread and collects the return value of the thread task:

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# Define a function prepared as a thread task
def action(max):
 my_sum =0for i inrange(max):print(threading.current_thread().name +' '+str(i))
 my_sum += i
 return my_sum
# Create a thread pool with 4 threads
withThreadPoolExecutor(max_workers=4)as pool:
 # Use threads to perform map calculations
 # The following tuple has 3 elements, so the program starts 3 threads to execute the action function
 results = pool.map(action,(50,100,150))print('--------------')for r in results:print(r)

The above program uses the map() method to start 3 threads (the thread pool of this program contains 4 threads. If you continue to use the thread pool that contains only two threads, there will be a task in the waiting state, and you must wait for one of the tasks. After completion, the thread will get a chance to execute when it is free), the return value of the map() method will collect the return result of each thread task.

Running the above program, you can also see the results of the concurrent execution of the 3 threads, and finally you can see the return results of the 3 thread tasks through the results.

It can be seen from the above program that using the map() method to start the thread and collect the execution results of the thread not only has the advantage of simple code, but also although the program will execute the action() function concurrently, the final collected action( ) The execution result of the function is still consistent with the result of the passed parameter. In other words, the first element of the above results is the result of action(50), the second element is the result of action(100), and the third element is the result of action(150).

Example extension:

# coding:utf-8import Queue
import threading
import sys
import time
import math

classWorkThread(threading.Thread):

def __init__(self, task_queue):
threading.Thread.__init__(self)
self.setDaemon(True)
self.task_queue = task_queue
self.start()
self.idle = True

def run(self):
sleep_time =0.01 #Rest 10 milliseconds when there is no task to do the first time
multiply =0while True:try:
# Take a task from the queue
func, args, kwargs = self.task_queue.get(block=False)
self.idle = False
multiply =0
# Execute it
func(*args,**kwargs)
except Queue.Empty:
time.sleep(sleep_time * math.pow(2, multiply))
self.idle = True
multiply +=1continue
except:
print sys.exc_info()
raise

classThreadPool:

def __init__(self, thread_num=10, max_queue_len=1000):
self.max_queue_len = max_queue_len
self.task_queue = Queue.Queue(max_queue_len) #Task waiting queue
self.threads =[]
self.__create_pool(thread_num)

def __create_pool(self, thread_num):for i inxrange(thread_num):
thread =WorkThread(self.task_queue)
self.threads.append(thread)

def add_task(self, func,*args,**kwargs):'''Add a task, return the length of the task waiting queue
Call isSafe last before calling this method()Determine if there are many waiting tasks to prevent the submitted tasks from being rejected
'''
try:
self.task_queue.put((func, args, kwargs))
except Queue.Full:
raise #When the queue is full, an exception will be thrown directly without execution
return self.task_queue.qsize()

def isSafe(self):'''The number of waiting tasks is still far from the police line
'''
return self.task_queue.qsize()<0.9* self.max_queue_len

def wait_for_complete(self):'''Waiting for all tasks submitted to the thread pool to be executed
'''
# First, the task waiting queue must become empty
while not self.task_queue.empty():
time.sleep(1)
# Second, so the computing thread has to become idle
while True:
all_idle = True
for th in self.threads:if not th.idle:
all_idle = False
breakif all_idle:breakelse:
time.sleep(1)if __name__ =='__main__':
def foo(a, b):
print a + b
time.sleep(0.01)
thread_pool =ThreadPool(10,100)'''Test failed on Windows, Queue on Windows.Queue is not thread safe'''
size =0for i inxrange(10000):try:
size = thread_pool.add_task(foo, i,2* i)
except Queue.Full:
print 'queue full, queue size is ', size
time.sleep(2)

So far, this article on how to use the python thread pool is introduced. For more detailed explanations of the thread pool in python, please search ZaLou.Cn

Recommended Posts

How to use python thread pool
How to use python tuples
How to use SQLite in Python
How to use and and or in Python
How to use PYTHON to crawl news articles
How to use the round function in python
How to use the zip function in Python
How to use the format function in python
How to use code running assistant in python
How to comment python code
How to learn python quickly
How to uninstall python plugin
How to understand python objects
How to use Python&#39;s filter function
How to use python&#39;s help function
python how to view webpage code
How to use hanlp in ubuntu
Use python to query Oracle database
Use C++ to write Python3 extensions
How to write python configuration file
Use python to achieve stepwise regression
How to wrap in python code
How to save the python program
How to omit parentheses in Python
How to install Python 3.8 on CentOS 8
How to install Python 3.8 on Ubuntu 18.04
How to write classes in python
How to filter numbers in python
How to read Excel in Python
How to install Python on CentOS 8
How to solve python dict garbled
How to view errors in python
How to write return in python
How to view the python module
How to understand variables in Python
How to clear variables in python
How to understand python object-oriented programming
How to verify successful installation of python
How to make a globe with Python
How to delete cache files in python
How to introduce third-party modules in Python
How to represent null values in python
How to save text files in python
Use Python to make airplane war games
How to write win programs in python
How to run id function in python
How to install third-party modules in Python
How to custom catch errors in python
How to write try statement in python
Python | So collections are so easy to use! !
How to define private attributes in Python
How to use Samba server on Ubuntu 16.04
Use Python to generate Douyin character videos!
R&amp;D: How To Install Python 3 on CentOS 7
How to add custom modules in Python
How to process excel table with python
How to understand global variables in Python
How to view installed modules in python
Python novice learns to use the library
How to install Python2 on Ubuntu20.04 ubuntu/focal64
Use Python to quickly cut out images