初心者向け:Pythonの非同期処理を理解しよう

Python

~駆け出しエンジニアからF/Eへの道~

こんにちは。KeSです。

前回、「駆け出しエンジニアでもフルスタックエンジニアになりたい!!#準備編」を
書かせていただいたので、その続きとなる実践編を書いていこうと思います。

完全未経験に向けた、Python完全基礎編も要望が出てきたら作成しようと思うので、
XのDMや、ブログのコメントでどしどしお願いします。

今回は「Pythonの非同期処理を理解しよう」です。
頑張っていきやしょう。

同期処理と非同期処理の違い(カフェの例え話)

非同期処理について理解するために、カフェの注文を例にとって説明してみましょう。

同期処理

ウェイター(CPU/実行スレッド)が一人で、すべての作業を順番にこなします。

特徴説明
処理の流れ順番待ち。一つの作業が完了するまで次の作業に進まない。
カフェの例ウェイターが一人で、注文を受けてから(タスクA)、コーヒーを淹れる(タスクB)。タスクBが完了するまで、次の客(タスクC)の注文は受けられない。
コンピューターI/O待ち(ネットワーク通信やファイル読み書き)が発生しても、CPUはアイドル状態で待機し続ける。

表にするとこんな感じですね。

ステップ客Aの注文
(タスクA)
客Bの注文
(タスクC)
待ち時間
注文受け待機
コーヒー淹れ開始待機3分
コーヒー淹れ完了待機
提供待機
注文受け
コーヒー淹れ開始3分
コーヒー淹れ完了
8提供

総処理時間: 6分 (客A完了まで3分 + 客B完了まで3分) ウェイターはコーヒーを淹れている間(I/O待ちの間)、完全にアイドルです。

非同期処理

ウェイター(CPU/実行スレッド)は一人ですが、待ち時間を活用して複数のタスクを切り替えながらこなします。

特徴説明
処理の流れ待ち時間を有効活用。一つの作業が完了するのを待たずに次の作業に取り掛かる。
カフェの例ウェイターが一人(または少数)だが、客の注文(タスクA)を受けたら、すぐに次の客の注文(タスクC)を受ける。コーヒー(タスクB)が淹れ終わったら、その客に提供する。タスクBの「コーヒーが淹れ終わるまでの待ち時間」を、他のタスク(タスクC)の処理に充てている。
コンピューターI/O待ちが発生したら、そのタスクを一時中断し、別のタスクにCPUを切り替える。処理が完了したら元のタスクに戻る。CPUを効率的に利用できる。

こちらを表にするとこんな感じです。

ステップ客Aの注文
(タスクA)
客Bの注文
(タスクC)
待ち時間
注文受け待機
コーヒー淹れ開始
(バックグラウンド)
注文受け3分
コーヒー淹れ中コーヒー淹れ開始
(バックグラウンド)
3分
コーヒー淹れ完了コーヒー淹れ中
提供コーヒー淹れ完了提供待機1分
提供

総処理時間: 約4分 (客Aの待ち時間に客Bの注文・淹れ作業を並行して実行) ウェイターはコーヒーが淹れ終わるまでの待ち時間を、客Bのタスクの処理に充てています。

非同期処理では、時間のかかるタスク(コーヒーを淹れる、APIからの応答を待つ)を await で「一時停止」し、その空いた時間を他のタスク(別の客の注文を受ける)に充てることで、全体の完了時間を短縮します。

async/awaitの基本文法

Pythonで非同期処理を行うための主要なキーワードがasyncawaitです。

async def(コルーチン関数の定義)

関数の前にasyncをつけることで、その関数がコルーチン(Coroutine)であることを示します。
※コルーチンとは?・・・実行を一時停止し、後で再開できる、特殊な関数です。

async def my_async_function():
    print("非同期処理を開始")
    # await がある場所で処理が一時停止し、他のタスクに制御が移る
    await asyncio.sleep(1) # 例として1秒間待機
    print("非同期処理を終了")

await (処理の待機)

awaitの特徴をまとめていきます。

  • awaitはコルーチン関数の中でのみ使用できます。
  • awaitの後ろには、別のコルーチン(あるいは「待機可能オブジェクト」)を指定します。
  • awaitが実行されると、コルーチンが完了するまで現在のタスクの実行を一時停止し、イベントループに制御を戻します。これにより、他のタスクを実行する機会が生まれます。

実行方法

コルーチンを実際に動かすには、asyncio.run() を使ってエントリポイントとなるコルーチンを実行します。

import asyncio

async def main():
    await my_async_function()

# 非同期処理の実行を開始
# Python 3.7以降で推奨される方法
asyncio.run(main())

実践:複数のAPIを並行して呼び出す

ここから、実際の処理の例を見ていきましょう。

複数の時間のかかるI/O処理(ここでは仮想のAPI呼び出し)を非同期で並行して実行することで、処理時間を短縮する例です。

import asyncio
import time

# 仮想的なAPI呼び出し(I/O待ちをシミュレート)
async def fetch_data(api_name, delay):
    print(f"[{api_name}] 処理開始: {time.strftime('%X')}")
    # 実際は aiohttp などを使って await response.json() などを行う
    await asyncio.sleep(delay)
    print(f"[{api_name}] 処理完了: {time.strftime('%X')}")
    return f"データ from {api_name}"

async def main_parallel():
    start_time = time.time()
    
    # 複数のコルーチンを並行して実行するためにリストにまとめる
    tasks = [
        fetch_data("API_A", 3), # 3秒かかる処理
        fetch_data("API_B", 1), # 1秒かかる処理
        fetch_data("API_C", 2)  # 2秒かかる処理
    ]
    
    # asyncio.gather を使って、全てのコルーチンが完了するのを待つ
    results = await asyncio.gather(*tasks)
    
    end_time = time.time()
    
    print("\n--- 結果 ---")
    for result in results:
        print(result)
    print(f"\n総実行時間: {end_time - start_time:.2f}秒")

# 実行
# 同期処理の場合、総実行時間は 3 + 1 + 2 = 6秒程度かかる
# 非同期処理の場合、最も時間のかかるタスク(3秒)で完了する
asyncio.run(main_parallel())

リストにまとめられたコルーチンを非同期処理で実行すると、
同期処理の際に6秒かかっていたタスクが3秒で完了します。(拍手)

asyncioの主要な関数(gather, create_task)

非同期処理を制御するための、asyncio モジュールの重要な関数です。

asyncio.gather(*coros)

  • 役割: 複数のコルーチンを並行して実行し、全てが完了するのを待ちます。
  • 返り値: 各コルーチンの返り値を、コルーチンが指定された順序で格納したリストとして返します。
  • 使用例: 上記のAPI呼び出しの例のように、複数の独立したタスクを同時に実行したい場合に最適です。

asyncio.create_task(coro)

  • 役割: 指定されたコルーチンをイベントループに登録し、即座に実行を開始させます。
  • 返り値: 実行中のコルーチンを参照するための Task オブジェクトを返します。
  • 使用例:
    1. タスクをバックグラウンドで実行させ、その完了を待たずにawaitせずに)次の処理に進みたい場合。
    2. 後で await task とすることで、特定のタイミングで結果を受け取るのを待ちたい場合。
import asyncio

async def say_hello(delay, what):
    await asyncio.sleep(delay)
    print(what)
    return what

async def main_create_task():
    # say_helloの実行を開始し、Taskオブジェクトを取得
    task1 = asyncio.create_task(say_hello(3, "Hello (3s)")) 
    task2 = asyncio.create_task(say_hello(1, "World (1s)"))
    
    # この間に他の処理を挟むことができる
    print("タスクを開始しました。待機せずに次の処理へ...") 
    await asyncio.sleep(0.5)
    print("中間処理を実行中...")

    # await でタスクの完了を待機し、結果を取得
    # task2 は既に完了している可能性が高いが、task1 の完了は待つ
    result1 = await task1
    result2 = await task2
    
    print(f"結果: {result1}, {result2}")

# asyncio.run(main_create_task())

注意点:blocking処理の扱い

非同期処理の仕組みは、「I/O待ちでタスクが一時停止している間に、CPUを他のタスクに切り替える」ことで成り立っています。

しかし、ブロッキング処理(CPUを使い続ける処理)がコルーチン内にあると、イベントループ全体がブロックされ、他の全ての非同期タスクも待たされてしまいます。

処理の種類非同期での対応策
I/O Boundネットワーク通信、DBアクセス、ファイル読み書きasyncioaiohttp など、非同期対応のライブラリを使用する(await 可能)。
CPU Bound大規模な計算処理、複雑なデータ解析、画像処理asyncio.to_thread(func, *args) を使用し、別のOSスレッドで実行する。

asyncio.to_thread() の利用 (Python 3.9以降)

ブロッキング処理を安全に実行するために、asyncio.to_thread() を使用して、その処理を別のスレッドプールに任せます。

import asyncio
import time

# CPUを長時間占有するブロッキング関数をシミュレート
def cpu_intensive_task(name):
    print(f"[{name}] ブロッキング処理開始: {time.strftime('%X')}")
    # 意図的にCPUを占有させる(同期的な処理)
    _ = sum(i for i in range(10**7)) 
    print(f"[{name}] ブロッキング処理完了: {time.strftime('%X')}")
    return f"結果 from {name}"

async def main_blocking():
    start_time = time.time()
    
    print("--- asyncio.to_thread でブロッキング処理を実行 ---")
    
    # to_thread を使って、ブロッキング関数を別のスレッドで実行
    task1 = asyncio.to_thread(cpu_intensive_task, "Task_X")
    task2 = asyncio.to_thread(cpu_intensive_task, "Task_Y")
    
    # gather で並行実行
    results = await asyncio.gather(task1, task2)
    
    end_time = time.time()
    
    print(f"\n総実行時間: {end_time - start_time:.2f}秒")

# asyncio.run(main_blocking())

これにより、CPUバウンドな処理がイベントループをブロックすることなく、非同期処理の恩恵を最大限に受けられるようになります。

今日のナレッジ

ウェイターが、非同期用語を聞いたらどう思うのかインタビューしておきました。

  • async def :「よっしゃ!この注文は並行して処理できるコルーチン使用だぜ!」
  • await :「よし、コーヒーマシン(API)に任せて、私は次の客(タスク)の相手をするぞ!」
  • asyncio.gather :「VIP席の注文(複数タスク)を同時にさばくぞ!全員終わるまで、俺は動かない!」
  • ブロッキング処理 :「あっ、ヤバい!コーヒーマシンが故障して、私が手動で豆から煎ってる!誰も動けない!」

非同期処理とは「待つ時間こそが仕事」と悟った賢いウェイターの働き方です。
皆さんのプログラムもこれで、無駄な待ち時間(I/O待ち)に突っ立ってるウェイターから、
テキパキ複数タスクを裁く非同期型ウェイターになりますね!

以上、「初心者向け:Pythonの非同期処理を理解しよう」の話題でした。
最後まで読んでくれてありがとうございます!
次回もお楽しみに~

コメント

タイトルとURLをコピーしました