複数のストリームやソケットのI/Oを同時待機したい場合、すぐに思いつくのはマルチスレッド化だが、プログラムが読みにくくなるし何よりデバッグが大変になる。そんな時、selectを用いればマルチスレッド化せずにI/Oの同時待機が出来る。
今回は、selectを使ってソケットとストリームのI/O同時待機をするPythonプログラムを作ってみた。
I/Oを複数同時に待機するのは意外と難しい
ノンブロッキングI/Oはリソースの無駄遣い
ノンブロッキングI/Oを使えば、1つのスレッドの中で複数の複数I/Oの待ちを一応出来はする。しかし、タイムアウト付きで読み書きを繰り返すような書き方になって煩雑になるためオーバーヘッドが大きくリソースの無駄遣いになってしまう。
ブロッキングI/Oの一択だが…
リソースを効率的に使うためにはブロッキングI/Oの一択だろう。しかし、ブロッキングI/Oでは一度待ちに入るとスレッドが停止(ブロック)して、他の動作を一切受け付けなくなる。特に読み出しや接続待ちは、向こう側のタイミングで発生する書き込みや接続要求といったイベントを待つことであり、待ち時間をこちらの都合で決められないから厄介だ。
ブロッキングI/O×マルチスレッド化も微妙
そこで「待ちの数だけスレッドを立てる」という素朴な解決法に行き着くわけだが、待つべきI/Oが複数あるとプログラムを希望通り動作させるためにスレッド間の同期を行う必要が出てくる。PythonにはEventやSemaphoreなどのスレッド間同期を行うためのオブジェクトが用意されているのでやって出来なくもないが、デバッグが地獄になることは目に見えている。それに、そもそも沢山のスレッドを走らせるのはリソースの無駄遣いだから、リソースの節約の筈が本末転倒だ。
selectでI/Oの待ちをスッキリ
このような時にスッキリと解を与えてくれるのが、selectだ。selectを使うと、一つのスレッドで複数のI/Oを多重化させて待つことが出来るため、ブロッキングI/Oを一つのスレッドで効率的に扱うことが出来る。
このあたりの話は、PythonヘルプページのソケットプログラミングHOWTOにも解説されている。
ポーリングオブジェクトを使う
selectにはいくつかのオブジェクトが定義されているが、今回はポーリングオブジェクトを使う。詳細はselectのPythonヘルプページを参照。
ポーリングオブジェクトのおおまかな利用手順は以下の通り。
- ポーリングオブジェクトを生成する: select.poll()
- 監視したいI/Oを登録する: poll.register()
- 待ちに入りスレッド停止。I/Oイベント発生でスレッド再開: poll.poll()
- 監視が不要になったI/Oの登録を解除する: poll.unregister()
3.で「複数I/Oイベントの待ちを同時に行い、どれかにイベントが発生するとスレッドが再開する」という動作がプログラム簡略化のポイントだ。
例えば、TCPソケットの読み出しを待つ場合は以下のような感じ。whileによる無限ループで回っているが、実際にはpoll.poll()の呼び出しでスレッドが停止してしまうため、必要な時だけ動作する無駄の無い造りとなっている。
import select
# ポーリングオブジェクト生成
poll = select.poll()
# ソケットを登録
poll.register(sock, select.POLLIN | select.POLLRDHUP)
fin = False
while True:
# イベント発生まで待つ
rdy = poll.poll();
if not rdy:
break
for fno, ev in rdy:
if fno == sock.fileno():
if ev == select.POLLIN:
data = sock.recv(SOCK_BUFSIZE)
: (受け取ったデータの処理)
else:
# 接続が切れたか、エラーの時
fin = True
if fin:
# whileループから抜ける
break
# 登録解除
poll.unregister(sock);
selectモジュールで定義されるオブジェクトはOS依存の物がいくつかある。ポーリングオブジェクトはLinuxなどUNIX系OSではソケットとストリームの両方で問題なく動作するが、Windowsではソケットでしか動作しない。
selectによる監視対象がバッファリングされている場合は要注意
ソケットの場合は通常バッファリングされておらず、TCPのパケットが到着する毎にイベントが発生するので問題無いのだが、ファイルI/Oなどストリームの場合はバッファリングのせいでselectによる監視が期待通りにならないケースがある。
例えば、BufferdReaderクラスを使ったストリーム読み出しの場合は、通常のreadメソッドを使うと通知対象となったデータ量を超えて(デフォルトではEOFまで)読み出しを行おうとしてブロックしてしまい、結果プログラムの動作がそこで止まってしまう。BufferdReaderでこれを防ぐためには、バッファリングを行わないメソッドであるread1メソッドを使う。BufferdReaderなのにバッファリングしないなんてクラス名と矛盾した動作だが、read1についてはPythonヘルプページに「rawストリームの読み込み」との表現で記述されている。
ソケットとストリームの同時待機によるコマンドサーバ
もう少し具体的なプログラム例として、クライアントからのTCP接続を受けてクライアントから指定されたコマンドをサーバ側で実行するコマンドサーバを作る。このプログラムでは、以下の3つのブロッキング読み出しによる待ちを1つのスレッドで実現してる。
- ソケットへのクライアントからの送信
- パイプの標準出力受信
- パイプの標準エラー出力受信
プログラムリストは以下の通り。
cmdServer.py
from socketserver import BaseRequestHandler, TCPServer
from select import poll, POLLIN, POLLRDHUP
from shlex import split
from subprocess import Popen, PIPE
from os import write
SOCK_BUFSIZE = 256
STREAM_BUFSIZE = 1024
class CmdHandler(BaseRequestHandler):
def handle(self):
# クライアントから送られてくるコマンド文字列を取得
cmdline = self.request.recv(SOCK_BUFSIZE).strip().decode('utf-8')
cmd = split(cmdline)
print("Received command: [{}] ...".format(cmdline))
# サブプロセスを生成
pipe = Popen(cmd, stdout=PIPE, stderr=PIPE, stdin=PIPE)
# ポーリングオブジェクトの設定
pol = poll()
pol.register(self.request, POLLIN | POLLRDHUP)
pol.register(pipe.stdout, POLLIN)
pol.register(pipe.stderr, POLLIN)
fin = False
while True:
rdy = pol.poll();
if not rdy:
break
for fno, ev in rdy:
# サブプロセス標準出力の処理
if fno == pipe.stdout.fileno():
if ev == POLLIN:
data = pipe.stdout.read1(STREAM_BUFSIZE)
self.request.send(data)
else:
# サブプロセスが終了した時
pol.unregister(pipe.stdout)
pol.unregister(pipe.stderr)
fin = True
print("Subprocess terminated.")
# サブプロセス標準エラー出力の処理
if fno == pipe.stderr.fileno():
if ev == POLLIN:
data = pipe.stderr.read1(STREAM_BUFSIZE)
self.request.send(data)
# クライアントからのデータ取得
if fno == self.request.fileno():
if ev == POLLIN:
data = self.request.recv(SOCK_BUFSIZE)
# バッファリングさせないようにos.write()を使う
write(pipe.stdin.fileno(), data)
print("Received string: " + data.strip().decode('utf-8'))
else:
# 接続が切れたか、エラー時はunregisterする
pol.unregister(self.request);
# サブプロセスを終了させる
pipe.terminate()
print("Client connection closed.")
if fin:
# サブプロセス終了→ループから抜ける
break
print("Terminating handler for [{}].".format(cmdline))
if __name__ == "__main__":
SVHOST, SVPORT = "0.0.0.0", 12345
with TCPServer((SVHOST, SVPORT), CmdHandler) as server:
server.serve_forever()
以下、いくつかポイント。
subprocess.Popenでサブプロセスを生成
subprocess.Popenでサブプロセスを生成し(19行目)、標準入出力と標準エラー出力のストリームをsubprocess.PIPE指定で取り出せるようにしている。
select.POLLRDHUPフラグで切断を検出
poll.registerメソッド呼び出し(23行目)でselect.POLLRDHUPフラグを立てているのは、ソケットの相手先の接続断を検出しやすくするため(60行目に飛んでくれる)。このフラグを立てないと、相手の接続断があってもPOLLINイベントが来たかのような挙動になって具合が悪い
ストリームの読み出しはBufferedReader.read1メソッドで
サブプロセスの標準出力と標準エラー出力はBufferedReaderオブジェクトというバッファリング付きのストリームのため、上述の通り通常のreadメソッドだと具合が悪い。このため、サブプロセスの出力の読み出しはread1メソッドで行うことによりブロックしないようにしている(38,50行目)。
ストリームの書き込みはos.writeメソッドで
サブプロセスの標準入力は標準出力と同様バッファリングされており、デフォルトのメソッドを使うと具合の悪いことが起こるため、低レベル操作であるos.writeメソッドを使うようにする(58行目)。
実行結果
コマンドサーバをLinuxマシンで起動させ、同じマシンからtelnetで接続してコマンドを実行。
クライアント側から’ls -F’を実行した場合。サーバー側ではプログラムが一瞬で終わり、サブプロセスの終了と同時に接続がサーバ側から切られる。
(クライアント側)
$ telnet localhost 12345
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
ls -F
cmdServer.py*
cmdServer.py~*
Connection closed by foreign host.
(サーバ側)
$ python3 cmdServer.py
Received command: [ls -F] ...
Subprocess terminated.
Terminating handler for [ls -F].
まとめ
というわけでselectのポーリングオブジェクトを使ってプログラムを作ってみた。
その特徴は、
- マルチスレッド化せずに複数のストリームやソケットを待つことが出来る
- キャッチしたい対象のイベントをフラグで指定出来る
- 対象のイベントが発生した時だけプログラムが動くので無駄がない
といったところだろうか。
コードも非常にスッキリと書けるので、I/Oの待ちが必要な場合は安易にマルチスレッド化に走る前に検討するべきだろう。
コメント