pythonスレッドプールの使用方法

スレッドプールの使用

スレッドプールの基本クラスは、concurrent.futuresモジュールのExecutorです。Executorは、ThreadPoolExecutorとProcessPoolExecutorの2つのサブクラスを提供します。ここで、ThreadPoolExecutorはスレッドプールの作成に使用され、ProcessPoolExecutorはプロセスプールの作成に使用されます。

スレッドプール/プロセスプールを使用して同時プログラミングを管理する場合、対応するタスク機能がスレッドプール/プロセスプールに送信される限り、残りはスレッドプール/プロセスプールによって処理されます。

Exectuorは、次の一般的な方法を提供します。

プログラムがタスク関数をスレッドプールに送信した後、submitメソッドはFutureオブジェクトを返します。Futureクラスは主にスレッドタスク関数の戻り値を取得するために使用されます。スレッドタスクは新しいスレッドで非同期に実行されるため、スレッドによって実行される関数は「将来の完了」タスクと同等であり、PythonはFutureを使用してそれを表します。

実際、JavaのマルチスレッドプログラミングにはFutureもあります。ここでのFutureは、JavaのFutureに似ています。

Futureは、次の方法を提供します。

スレッドプールが使い果たされた後、スレッドプールのshutdown()メソッドを呼び出す必要があります。これにより、スレッドプールのシャットダウンシーケンスが開始されます。 shutdown()メソッドを呼び出した後、スレッドプールは新しいタスクを受信しなくなりますが、以前に送信されたすべてのタスクの実行を完了します。スレッドプール内のすべてのタスクが実行されると、スレッドプール内のすべてのスレッドが停止します。

スレッドプールを使用してスレッドタスクを実行する手順は次のとおりです。

a。ThreadPoolExecutorクラスのコンストラクターを呼び出して、スレッドプールを作成します。

b。通常の関数をスレッドタスクとして定義します。

c。ThreadPoolExecutorオブジェクトのsubmit()メソッドを呼び出して、スレッドタスクを送信します。

d。タスクを送信しない場合は、ThreadPoolExecutorオブジェクトのshutdown()メソッドを呼び出してスレッドプールを閉じます。

次のプログラムは、スレッドプールを使用してスレッドタスクを実行する方法を示しています。

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# スレッドタスクとして準備された関数を定義します
def action(max):
 my_sum =0for i inrange(max):print(threading.current_thread().name +' '+str(i))
 my_sum += i
 return my_sum
# 2つのスレッドを含むスレッドプールを作成します
pool =ThreadPoolExecutor(max_workers=2)
# スレッドプールにタスクを送信する,50がアクションになります()機能パラメータ
future1 = pool.submit(action,50)
# 別のタスクをスレッドプールに送信する,100がアクションになります()機能パラメータ
future2 = pool.submit(action,100)
# future1で表されるタスクが終了したかどうかを確認します
print(future1.done())
time.sleep(3)
# future2で表されるタスクが終了したかどうかを確認します
print(future2.done())
# future1で表されるタスクによって返された結果を表示する
print(future1.result())
# future2で表されるタスクによって返された結果を表示する
print(future2.result())
# スレッドプールを閉じる
pool.shutdown()

上記のプログラムでは、13行目のコードで2つのスレッドを含むスレッドプールが作成されます。次の2行のコードでは、action()関数をスレッドプールに送信するだけで済み、スレッドプールはスレッドを開始してアクションを実行します。 () 関数。スレッドを開始するこの方法は、エレガントで効率的です。

プログラムがaction()関数をスレッドプールに送信すると、submit()メソッドはタスクに対応するFutureオブジェクトを返し、プログラムはすぐにfuturelのdone()メソッドを決定します。このメソッドはFalseを返します(タスクがまだ残っていることを示します)。元に戻す)。次に、メインプログラムは3秒間一時停止し、future2のdone()メソッドを判断します。この時点でタスクが完了している場合、メソッドはTrueを返します。

プログラムは最終的に、Futureのresult()メソッドを介して2つの非同期タスクによって返された結果を取得します。

読者はこのコードを実行して操作の結果を表示できますが、ここでは説明しません。

プログラムがFutureのresult()メソッドを使用して結果を取得する場合、このメソッドは現在のスレッドをブロックします。timeoutパラメーターが指定されていない場合、Futureで表されるタスクが戻るまで、現在のスレッドはブロックされたままになります。

実行結果を取得

前のプログラムはFutureのresult()メソッドを呼び出してスレッドタスクの戻り値を取得しますが、このメソッドは現在のメインスレッドをブロックします。result()メソッドのブロックは、Qianchengタスクが完了した後にのみ解放されます。

プログラムがresult()メソッドを直接呼び出してスレッドをブロックしたくない場合は、Futureのadd_done_callback()メソッドを使用してコールバック関数を追加できます。コールバック関数はfn(future)の形式です。スレッドタスクが完了すると、プログラムは自動的にコールバック関数をトリガーし、対応するFutureオブジェクトをパラメーターとしてコールバック関数に渡します。

次のプログラムは、add_done_callback()メソッドを使用して、スレッドタスクの戻り値を取得します。

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# スレッドタスクとして準備された関数を定義します
def action(max):
 my_sum =0for i inrange(max):print(threading.current_thread().name +' '+str(i))
 my_sum += i
 return my_sum
# 2つのスレッドを含むスレッドプールを作成します
withThreadPoolExecutor(max_workers=2)as pool:
 # スレッドプールにタスクを送信する,50がアクションになります()機能パラメータ
 future1 = pool.submit(action,50)
 # 別のタスクをスレッドプールに送信する,100がアクションになります()機能パラメータ
 future2 = pool.submit(action,100)
 def get_result(future):print(future.result())
 # future1のスレッド完了用のコールバック関数を追加します
 future1.add_done_callback(get_result)
 # future2のスレッド完了用のコールバック関数を追加します
 future2.add_done_callback(get_result)print('--------------')

上記のメインプログラムは、future1とfuture2に同じコールバック関数を追加します。これらは、スレッドタスクが終了したときに戻り値を取得します。

メインプログラムの最後の行に横線が印刷されます。プログラムはfuture1とfuture2のresult()メソッドを直接呼び出さないため、メインスレッドがブロックされることはなく、メインスレッドによって印刷された水平線がすぐに表示されます。次に、2つの新しいスレッドが同時に実行されます。スレッドタスクが実行されると、get_result()関数がトリガーされ、スレッドタスクの戻り値が出力されます。

さらに、スレッドプールはContext Manage Protocolを実装しているため、プログラムはwithステートメントを使用してスレッドプールを管理できます。これにより、上記のプログラムに示すように、スレッドプールを手動で閉じる必要がなくなります。

さらに、Executorはmap(func、* iterables、timeout = None、chunksize = 1)メソッドも提供します。このメソッドの関数はグローバル関数map()に似ています。違いは、スレッドプールのmap()メソッドがすべてのiterableになることです。各要素は、func関数を同時に実行するスレッドを開始します。この方法は、len(iterables)スレッドを開始し、各スレッドの実行結果を収集することと同じです。

たとえば、次のプログラムは、Executorのmap()メソッドを使用してスレッドを開始し、スレッドタスクの戻り値を収集します。

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# スレッドタスクとして準備された関数を定義します
def action(max):
 my_sum =0for i inrange(max):print(threading.current_thread().name +' '+str(i))
 my_sum += i
 return my_sum
# 4つのスレッドでスレッドプールを作成します
withThreadPoolExecutor(max_workers=4)as pool:
 # スレッドを使用してマップ計算を実行する
 # 次のタプルには3つの要素があるため、プログラムは3つのスレッドを開始してアクション関数を実行します
 results = pool.map(action,(50,100,150))print('--------------')for r in results:print(r)

上記のプログラムは、map()メソッドを使用して3つのスレッドを開始します(このプログラムのスレッドプールには4つのスレッドが含まれます。2つのスレッドのみを含むスレッドプールを引き続き使用すると、タスクが待機状態になり、いずれかのタスクを待機する必要があります。完了後、スレッドは空いているときに実行する機会があります)、map()メソッドの戻り値は、各スレッドタスクの戻り結果を収集します。

上記のプログラムを実行すると、3つのスレッドの同時実行の結果も確認でき、最後に、結果を通じて3つのスレッドタスクの戻り結果を確認できます。

上記のプログラムから、map()メソッドを使用してスレッドを開始し、スレッドの実行結果を収集することには、単純なコードの利点があるだけでなく、プログラムがaction()関数を同時に実行するにもかかわらず、最終的に収集されたaction( )関数の実行結果は、渡されたパラメーターの結果と一致しています。つまり、上記の結果の最初の要素はaction(50)の結果、2番目の要素はaction(100)の結果、3番目の要素はaction(150)の結果です。

拡張の例:

# coding:utf-8import Queue
import threading
import sys
import time
import math

classWorkThread(threading.Thread):

def __init__(self, task_queue):
threading.Thread.__init__(self)
self.setDaemon(True)
self.task_queue = task_queue
self.start()
self.idle = True

def run(self):
sleep_time =0.01 #初めて行うタスクがない場合は、10ミリ秒休憩します
multiply =0while True:try:
# キューからタスクを取得します
func, args, kwargs = self.task_queue.get(block=False)
self.idle = False
multiply =0
# 実行する
func(*args,**kwargs)
except Queue.Empty:
time.sleep(sleep_time * math.pow(2, multiply))
self.idle = True
multiply +=1continue
except:
print sys.exc_info()
raise

classThreadPool:

def __init__(self, thread_num=10, max_queue_len=1000):
self.max_queue_len = max_queue_len
self.task_queue = Queue.Queue(max_queue_len) #タスク待機キュー
self.threads =[]
self.__create_pool(thread_num)

def __create_pool(self, thread_num):for i inxrange(thread_num):
thread =WorkThread(self.task_queue)
self.threads.append(thread)

def add_task(self, func,*args,**kwargs):'''タスクを追加し、タスク待機キューの長さを返します
このメソッドを呼び出す前に、最後にisSafeを呼び出します()送信されたタスクが拒否されないようにするために、待機中のタスクが多数あるかどうかを判断します
'''
try:
self.task_queue.put((func, args, kwargs))
except Queue.Full:
raise #キューがいっぱいになると、実行せずに直接例外がスローされます
return self.task_queue.qsize()

def isSafe(self):'''待機中のタスクの数はまだ警察のラインから遠く離れています
'''
return self.task_queue.qsize()<0.9* self.max_queue_len

def wait_for_complete(self):'''スレッドプールに送信されたすべてのタスクが実行されるのを待機しています
'''
# まず、タスク待機キューを空にする必要があります
while not self.task_queue.empty():
time.sleep(1)
# 次に、コンピューティングスレッドはアイドル状態になる必要があります
while True:
all_idle = True
for th in self.threads:if not th.idle:
all_idle = False
breakif all_idle:breakelse:
time.sleep(1)if __name__ =='__main__':
def foo(a, b):
print a + b
time.sleep(0.01)
thread_pool =ThreadPool(10,100)'''テストはWindowsで失敗し、キューはWindowsで失敗しました.キューはスレッドセーフではありません'''
size =0for i inxrange(10000):try:
size = thread_pool.add_task(foo, i,2* i)
except Queue.Full:
print 'queue full, queue size is ', size
time.sleep(2)

これまで、pythonスレッドプールの使用方法に関するこの記事を紹介しました。pythonでのスレッドプールの詳細な説明については、ZaLou.Cnを検索してください。

Recommended Posts

pythonスレッドプールの使用方法
パイソンタプルの使い方
PythonでSQLiteを使用する方法
およびおよびまたはPythonでの使用方法
PYTHONを使用してニュース記事をクロールする方法
pythonでround関数を使用する方法
Pythonでzip関数を使用する方法
pythonでformat関数を使用する方法
pythonでアシスタントを実行するコードを使用する方法
pythonコードにコメントする方法
pythonをすばやく学ぶ方法
pythonプラグインをアンインストールする方法
pythonオブジェクトを理解する方法
Pythonのフィルター機能の使い方
pythonのヘルプ機能の使い方
pythonウェブページコードの表示方法
ubuntuでhanlpを使用する方法
pythonを使用してOracleデータベースにクエリを実行します
C ++を使用してPython3拡張機能を作成する
python設定ファイルの書き方
pythonを使用して段階的な回帰を実現します
pythonコードでラップする方法
pythonプログラムを保存する方法
Pythonで括弧を省略する方法
CentOS8にPython3.8をインストールする方法
Ubuntu18.04にPython3.8をインストールする方法
pythonでクラスを書く方法
pythonで数値をフィルタリングする方法
PythonでExcelを読む方法
CentOS8にPythonをインストールする方法
python dict garbledを解決する方法
pythonでエラーを表示する方法
pythonでreturnを書く方法
pythonモジュールを表示する方法
Pythonで変数を理解する方法
pythonで変数をクリアする方法
pythonオブジェクト指向プログラミングを理解する方法
pythonのインストールが成功したことを確認する方法
Pythonで地球を作る方法
pythonでキャッシュファイルを削除する方法
Pythonでサードパーティモジュールを導入する方法
pythonでnull値を表す方法
pythonでテキストファイルを保存する方法
Pythonを使用して飛行機の戦争ゲームを作成する
pythonでwinプログラムを書く方法
pythonでid関数を実行する方法
Pythonでサードパーティモジュールをインストールする方法
pythonでエラーをカスタムキャッチする方法
pythonでtryステートメントを書く方法
Python |コレクションはとても使いやすいです! !
Pythonでプライベート属性を定義する方法
Ubuntu16.04でSambaサーバーを使用する方法
Pythonを使用してDouyinキャラクタービデオを生成してください!
R&D:CentOS7にPython3をインストールする方法
Pythonでカスタムモジュールを追加する方法
pythonでexcelテーブルを処理する方法
Pythonでグローバル変数を理解する方法
インストールされているモジュールをpythonで表示する方法
Python初心者はライブラリの使い方を学びます
Ubuntu20.04 ubuntu / focal64にPython2をインストールする方法
Pythonを使用して画像をすばやく切り取る