ugo Tech Blog

ugoの日々の開発・生産について

NATS クライアントを pytest でテストする

ロボット開発部の佐々木です。
今回は pytest を利用して、サーバーを介した NATS クライアントの pub / sub をテストしたいと思います。

NATS について

NATS は分散システムのためのオープンソースのメッセージブローカーです。
弊社ではロボット内のサービス間通信に利用しています。
今回、NATS の詳細については深堀りしませんので、詳しく知りたい方は公式 HP (https://nats.io/) をご覧ください。

実行環境

本記事の実行環境・使用ライブラリは下記の通りです。

Publisher のテスト

まずはテスト対象となる簡単な publisher を実装します。

import nats

class SimplePublisher:
    def __init__(self, nc):
        self._nc = nc

    async def publish_message(self):
        await self._nc.publish('foo', b'Hello!')

次に pytest でこの publisher のテストを書いていきます。
ここでは SimplePublisher を simple_publisher というモジュールに配置しました。

import pytest
import nats
from nats_tools import NATSD
from simple_publisher import SimplePublisher

@pytest.mark.asyncio
async def test_publish_message():
    natsd = NATSD(config_file='')
    natsd.start(wait=True)
    
    nc = await nats.connect()
    sub = await nc.subscribe('foo')

    simple_pub = SimplePublisher(nc)
    await simple_pub.publish_message()

    msg = await sub.next_msg()

    assert msg.data.decode() == 'Hello!'
    
    await nc.drain()
    natsd.stop()

pytest で非同期関数をテストする際は、関数に @pytest.mark.asyncio というデコレータを付けてあげます。
pytest 単体では非同期関数をテストできないため、別途 pytest-asyncio ライブラリが必要です。

...
@pytest.mark.asyncio
async def test_publish_message():
    ...

NATSD のインスタンスを生成したあと、start() メソッドを呼び出すことでサーバーが起動します。
start メソッドの wait 引数に True を渡してあげることで、サーバーの起動を待機します。

    ...
    natsd = NATSD(config_file='')
    natsd.start(wait=True)
    ...

こちらではテストコード側で NATS クライアントのインスタンスを生成し、サブスクライバーを作成しています。

    ...
    nc = await nats.connect()
    sub = await nc.subscribe('foo')
    ...

テスト対象となる SimplePublisher のインスタンスを生成し、publish_message() メソッドでメッセージを発行します。
その後、サブスクライバーが受信したデータを取り出して評価することで、メッセージの発行が正しく行われているかテストしています。

    ...
    simple_pub = SimplePublisher(nc)
    await simple_pub.publish_message()

    msg = await sub.next_msg()

    assert msg.data.decode() == 'Hello!'
    ...

最後にクライアントとサーバーそれぞれの終了処理を行っています。

    ...    
    await nc.drain()
    natsd.stop()

pytest コマンドを走らせると、無事テストが通りました。

% python3 -m pytest test_simple_publisher.py
============================================ test session starts =============================================
platform darwin -- Python 3.12.3, pytest-8.2.0, pluggy-1.5.0
rootdir: ...
plugins: anyio-4.3.0, nats-tools-1.0.2, asyncio-0.23.6
asyncio: mode=Mode.STRICT
collected 1 item                                                                                             

test_simple_publiser.py .                                                                                   [100%]

============================================= 1 passed in 0.32s ==============================================

Subscriber のテスト

同様に、今度は簡単な Subscriber を実装します。

class SimpleSubscriber:
    def __init__(self, nc):
        self._nc = nc
        self._message = None

    @property
    def message(self):
        return self._message

    async def subscribe_message(self):
        await self._nc.subscribe('foo', cb=self._callback)

    async def _callback(self, msg):
        self._message = msg.data.decode()

subscribe のテストはこのように書けます。

import pytest
import asyncio
import nats
from nats_tools import NATSD
from simple_subscriber import SimpleSubscriber

@pytest.mark.asyncio
async def test_subscribe_message():
    natsd = NATSD(config_file='')
    natsd.start(wait=True)
    nc = await nats.connect()

    simple_sub = SimpleSubscriber(nc)
    await simple_sub.subscribe_message()

    await nc.publish('foo', b'Hello!')

    await asyncio.sleep(0.1)

    assert simple_sub.message == 'Hello!'

    await nc.drain()
    natsd.stop()

先ほどとは反対に、テスト側に publish の処理を記述し、SimpleSubscriber が発行されたメッセージを受信しているかどうかをテストしています。

このテストも無事通りました。

% python3 -m pytest test_simple_subscriber.py
============================= test session starts ==============================
platform darwin -- Python 3.12.3, pytest-8.2.0, pluggy-1.5.0
rootdir: ...
plugins: anyio-4.3.0, nats-tools-1.0.2, asyncio-0.23.6
asyncio: mode=Mode.STRICT
collected 1 item                                                               

test_simple_subscriber.py .                                              [100%]

============================== 1 passed in 0.33s ===============================

まとめ

以上、pytest でサーバーを介した NATS クライアントの pub / sub をテストできました。
今回は扱いませんでしたが、request / reply も同様にテストできます。

なお、今回のコードでは簡単のため例外処理などは省略しています。
また、実際のテストではサーバーの起動やクライアントの生成はフィクスチャにしてあげるといいと思います。

おわりに

今回は pytest で NATS クライアントのテストを書いてみました。
Python による NATS クライアントアプリケーション開発にお役立ていただければ幸いです。

ugoでは、一緒にロボットを社会実装していく仲間を絶賛募集中です。

詳しくはこちら👇まで。

herp.careers