スレッドプールの使用
スレッドプールの基本クラスは、concurrent.futuresモジュールのExecutorです。Executorは、ThreadPoolExecutorとProcessPoolExecutorの2つのサブクラスを提供します。ここで、ThreadPoolExecutorはスレッドプールの作成に使用され、ProcessPoolExecutorはプロセスプールの作成に使用されます。
スレッドプール/プロセスプールを使用して同時プログラミングを管理する場合、対応するタスク機能がスレッドプール/プロセスプールに送信される限り、残りはスレッドプール/プロセスプールによって処理されます。
Exectuorは、次の一般的な方法を提供します。
プログラムがタスク関数をスレッドプールに送信した後、submitメソッドはFutureオブジェクトを返します。Futureクラスは主にスレッドタスク関数の戻り値を取得するために使用されます。スレッドタスクは新しいスレッドで非同期に実行されるため、スレッドによって実行される関数は「将来の完了」タスクと同等であり、PythonはFutureを使用してそれを表します。
実際、JavaのマルチスレッドプログラミングにはFutureもあります。ここでのFutureは、JavaのFutureに似ています。
Futureは、次の方法を提供します。
スレッドプールが使い果たされた後、スレッドプールのshutdown()メソッドを呼び出す必要があります。これにより、スレッドプールのシャットダウンシーケンスが開始されます。 shutdown()メソッドを呼び出した後、スレッドプールは新しいタスクを受信しなくなりますが、以前に送信されたすべてのタスクの実行を完了します。スレッドプール内のすべてのタスクが実行されると、スレッドプール内のすべてのスレッドが停止します。
スレッドプールを使用してスレッドタスクを実行する手順は次のとおりです。
a。ThreadPoolExecutorクラスのコンストラクターを呼び出して、スレッドプールを作成します。
b。通常の関数をスレッドタスクとして定義します。
c。ThreadPoolExecutorオブジェクトのsubmit()メソッドを呼び出して、スレッドタスクを送信します。
d。タスクを送信しない場合は、ThreadPoolExecutorオブジェクトのshutdown()メソッドを呼び出してスレッドプールを閉じます。
次のプログラムは、スレッドプールを使用してスレッドタスクを実行する方法を示しています。
from concurrent.futures import ThreadPoolExecutor
import threading
import time
# スレッドタスクとして準備された関数を定義します
def action(max):
my_sum =0for i inrange(max):print(threading.current_thread().name +' '+str(i))
my_sum += i
return my_sum
# 2つのスレッドを含むスレッドプールを作成します
pool =ThreadPoolExecutor(max_workers=2)
# スレッドプールにタスクを送信する,50がアクションになります()機能パラメータ
future1 = pool.submit(action,50)
# 別のタスクをスレッドプールに送信する,100がアクションになります()機能パラメータ
future2 = pool.submit(action,100)
# future1で表されるタスクが終了したかどうかを確認します
print(future1.done())
time.sleep(3)
# future2で表されるタスクが終了したかどうかを確認します
print(future2.done())
# future1で表されるタスクによって返された結果を表示する
print(future1.result())
# future2で表されるタスクによって返された結果を表示する
print(future2.result())
# スレッドプールを閉じる
pool.shutdown()
上記のプログラムでは、13行目のコードで2つのスレッドを含むスレッドプールが作成されます。次の2行のコードでは、action()関数をスレッドプールに送信するだけで済み、スレッドプールはスレッドを開始してアクションを実行します。 () 関数。スレッドを開始するこの方法は、エレガントで効率的です。
プログラムがaction()関数をスレッドプールに送信すると、submit()メソッドはタスクに対応するFutureオブジェクトを返し、プログラムはすぐにfuturelのdone()メソッドを決定します。このメソッドはFalseを返します(タスクがまだ残っていることを示します)。元に戻す)。次に、メインプログラムは3秒間一時停止し、future2のdone()メソッドを判断します。この時点でタスクが完了している場合、メソッドはTrueを返します。
プログラムは最終的に、Futureのresult()メソッドを介して2つの非同期タスクによって返された結果を取得します。
読者はこのコードを実行して操作の結果を表示できますが、ここでは説明しません。
プログラムがFutureのresult()メソッドを使用して結果を取得する場合、このメソッドは現在のスレッドをブロックします。timeoutパラメーターが指定されていない場合、Futureで表されるタスクが戻るまで、現在のスレッドはブロックされたままになります。
実行結果を取得
前のプログラムはFutureのresult()メソッドを呼び出してスレッドタスクの戻り値を取得しますが、このメソッドは現在のメインスレッドをブロックします。result()メソッドのブロックは、Qianchengタスクが完了した後にのみ解放されます。
プログラムがresult()メソッドを直接呼び出してスレッドをブロックしたくない場合は、Futureのadd_done_callback()メソッドを使用してコールバック関数を追加できます。コールバック関数はfn(future)の形式です。スレッドタスクが完了すると、プログラムは自動的にコールバック関数をトリガーし、対応するFutureオブジェクトをパラメーターとしてコールバック関数に渡します。
次のプログラムは、add_done_callback()メソッドを使用して、スレッドタスクの戻り値を取得します。
from concurrent.futures import ThreadPoolExecutor
import threading
import time
# スレッドタスクとして準備された関数を定義します
def action(max):
my_sum =0for i inrange(max):print(threading.current_thread().name +' '+str(i))
my_sum += i
return my_sum
# 2つのスレッドを含むスレッドプールを作成します
withThreadPoolExecutor(max_workers=2)as pool:
# スレッドプールにタスクを送信する,50がアクションになります()機能パラメータ
future1 = pool.submit(action,50)
# 別のタスクをスレッドプールに送信する,100がアクションになります()機能パラメータ
future2 = pool.submit(action,100)
def get_result(future):print(future.result())
# future1のスレッド完了用のコールバック関数を追加します
future1.add_done_callback(get_result)
# future2のスレッド完了用のコールバック関数を追加します
future2.add_done_callback(get_result)print('--------------')
上記のメインプログラムは、future1とfuture2に同じコールバック関数を追加します。これらは、スレッドタスクが終了したときに戻り値を取得します。
メインプログラムの最後の行に横線が印刷されます。プログラムはfuture1とfuture2のresult()メソッドを直接呼び出さないため、メインスレッドがブロックされることはなく、メインスレッドによって印刷された水平線がすぐに表示されます。次に、2つの新しいスレッドが同時に実行されます。スレッドタスクが実行されると、get_result()関数がトリガーされ、スレッドタスクの戻り値が出力されます。
さらに、スレッドプールはContext Manage Protocolを実装しているため、プログラムはwithステートメントを使用してスレッドプールを管理できます。これにより、上記のプログラムに示すように、スレッドプールを手動で閉じる必要がなくなります。
さらに、Executorはmap(func、* iterables、timeout = None、chunksize = 1)メソッドも提供します。このメソッドの関数はグローバル関数map()に似ています。違いは、スレッドプールのmap()メソッドがすべてのiterableになることです。各要素は、func関数を同時に実行するスレッドを開始します。この方法は、len(iterables)スレッドを開始し、各スレッドの実行結果を収集することと同じです。
たとえば、次のプログラムは、Executorのmap()メソッドを使用してスレッドを開始し、スレッドタスクの戻り値を収集します。
from concurrent.futures import ThreadPoolExecutor
import threading
import time
# スレッドタスクとして準備された関数を定義します
def action(max):
my_sum =0for i inrange(max):print(threading.current_thread().name +' '+str(i))
my_sum += i
return my_sum
# 4つのスレッドでスレッドプールを作成します
withThreadPoolExecutor(max_workers=4)as pool:
# スレッドを使用してマップ計算を実行する
# 次のタプルには3つの要素があるため、プログラムは3つのスレッドを開始してアクション関数を実行します
results = pool.map(action,(50,100,150))print('--------------')for r in results:print(r)
上記のプログラムは、map()メソッドを使用して3つのスレッドを開始します(このプログラムのスレッドプールには4つのスレッドが含まれます。2つのスレッドのみを含むスレッドプールを引き続き使用すると、タスクが待機状態になり、いずれかのタスクを待機する必要があります。完了後、スレッドは空いているときに実行する機会があります)、map()メソッドの戻り値は、各スレッドタスクの戻り結果を収集します。
上記のプログラムを実行すると、3つのスレッドの同時実行の結果も確認でき、最後に、結果を通じて3つのスレッドタスクの戻り結果を確認できます。
上記のプログラムから、map()メソッドを使用してスレッドを開始し、スレッドの実行結果を収集することには、単純なコードの利点があるだけでなく、プログラムがaction()関数を同時に実行するにもかかわらず、最終的に収集されたaction( )関数の実行結果は、渡されたパラメーターの結果と一致しています。つまり、上記の結果の最初の要素はaction(50)の結果、2番目の要素はaction(100)の結果、3番目の要素はaction(150)の結果です。
拡張の例:
# coding:utf-8import Queue
import threading
import sys
import time
import math
classWorkThread(threading.Thread):
def __init__(self, task_queue):
threading.Thread.__init__(self)
self.setDaemon(True)
self.task_queue = task_queue
self.start()
self.idle = True
def run(self):
sleep_time =0.01 #初めて行うタスクがない場合は、10ミリ秒休憩します
multiply =0while True:try:
# キューからタスクを取得します
func, args, kwargs = self.task_queue.get(block=False)
self.idle = False
multiply =0
# 実行する
func(*args,**kwargs)
except Queue.Empty:
time.sleep(sleep_time * math.pow(2, multiply))
self.idle = True
multiply +=1continue
except:
print sys.exc_info()
raise
classThreadPool:
def __init__(self, thread_num=10, max_queue_len=1000):
self.max_queue_len = max_queue_len
self.task_queue = Queue.Queue(max_queue_len) #タスク待機キュー
self.threads =[]
self.__create_pool(thread_num)
def __create_pool(self, thread_num):for i inxrange(thread_num):
thread =WorkThread(self.task_queue)
self.threads.append(thread)
def add_task(self, func,*args,**kwargs):'''タスクを追加し、タスク待機キューの長さを返します
このメソッドを呼び出す前に、最後にisSafeを呼び出します()送信されたタスクが拒否されないようにするために、待機中のタスクが多数あるかどうかを判断します
'''
try:
self.task_queue.put((func, args, kwargs))
except Queue.Full:
raise #キューがいっぱいになると、実行せずに直接例外がスローされます
return self.task_queue.qsize()
def isSafe(self):'''待機中のタスクの数はまだ警察のラインから遠く離れています
'''
return self.task_queue.qsize()<0.9* self.max_queue_len
def wait_for_complete(self):'''スレッドプールに送信されたすべてのタスクが実行されるのを待機しています
'''
# まず、タスク待機キューを空にする必要があります
while not self.task_queue.empty():
time.sleep(1)
# 次に、コンピューティングスレッドはアイドル状態になる必要があります
while True:
all_idle = True
for th in self.threads:if not th.idle:
all_idle = False
breakif all_idle:breakelse:
time.sleep(1)if __name__ =='__main__':
def foo(a, b):
print a + b
time.sleep(0.01)
thread_pool =ThreadPool(10,100)'''テストはWindowsで失敗し、キューはWindowsで失敗しました.キューはスレッドセーフではありません'''
size =0for i inxrange(10000):try:
size = thread_pool.add_task(foo, i,2* i)
except Queue.Full:
print 'queue full, queue size is ', size
time.sleep(2)
これまで、pythonスレッドプールの使用方法に関するこの記事を紹介しました。pythonでのスレッドプールの詳細な説明については、ZaLou.Cnを検索してください。
Recommended Posts