最終更新日:2023/03/11 原本2023-03-01

【Python】gRPCでUSBカメラ映像を複数台のPCにストリーミング

はじめに

gRPC は、あらゆる環境で実行できるオープンソースの高性能リモートプロシージャコール (RPC)フレームワークです。
バイナリシリアル化ツールセットおよび言語である Protocol Buffers を使用してサービスを定義します。

今回は、Pythonで gRPC を使う練習のために、サーバー側で取得したUSBカメラの映像を複数のクライアントにストリーム送信する仕組みを作ってみます。

画像3.jpg

環境

  • Windows 10
  • USBカメラ: ELP-USB4KHDR01-V100 ; deleted on 2023/03/13.
    ELP-13MAF-V75-JP
  • Python 3.9.7
    • grpcio 1.51.1
    • grpcio-tools 1.51.1
    • numpy 1.24.2
    • opencv-python 4.7.0.72
    • protobuf 4.21.11

パッケージのインストール

$ pip install grpcio
$ pip install grpcio-tools

$ pip install opencv-python

protoファイルの作成

webcam_streaming.proto
syntax = "proto3";

service WebcamStreaming {
    rpc StartWebcamStreaming(ClientName) returns (stream CaptureImage) {}
}

message ClientName {
    string clientName = 1;
}

message CaptureImage {
    bytes imageBytes = 1;
}

WebcamStreaming というサービスに StartWebcamStreaming という関数を一つ用意します。
戻り値には stream を設定し、CaptureImage というメッセージを返すようにします。
CaptureImage の imageBytes というメンバーに、サーバーで取得したカメラ画像をバイト列にしたものが入ることになります。
これをクライアント側で受け取り、デコードして表示するという流れになります。

protoファイルのコンパイル

gen.py
from grpc.tools import protoc

protoc.main(
    (
        "",
        "-I.",
        "--python_out=.",  # 書き出し先指定
        "--grpc_python_out=.",  # 書き出し先指定
        "./webcam_streaming.proto",  # 書き出し元のファイル指定
    )
)
$ python gen.py

同フォルダに webcam_streaming_pb2_grpc.py と webcam_streaming_pb2.py の二つのファイルが生成されます。

webcam_streaming_pb2_grpc.py
webcam_streaming_pb2_grpc.py
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

import webcam_streaming_pb2 as webcam__streaming__pb2


class WebcamStreamingStub(object):
    """Missing associated documentation comment in .proto file."""

    def __init__(self, channel):
        """Constructor.

        Args:
            channel: A grpc.Channel.
        """
        self.StartWebcamStreaming = channel.unary_stream(
                '/WebcamStreaming/StartWebcamStreaming',
                request_serializer=webcam__streaming__pb2.ClientName.SerializeToString,
                response_deserializer=webcam__streaming__pb2.CaptureImage.FromString,
                )


class WebcamStreamingServicer(object):
    """Missing associated documentation comment in .proto file."""

    def StartWebcamStreaming(self, request, context):
        """Missing associated documentation comment in .proto file."""
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')


def add_WebcamStreamingServicer_to_server(servicer, server):
    rpc_method_handlers = {
            'StartWebcamStreaming': grpc.unary_stream_rpc_method_handler(
                    servicer.StartWebcamStreaming,
                    request_deserializer=webcam__streaming__pb2.ClientName.FromString,
                    response_serializer=webcam__streaming__pb2.CaptureImage.SerializeToString,
            ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
            'WebcamStreaming', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))


 # This class is part of an EXPERIMENTAL API.
class WebcamStreaming(object):
    """Missing associated documentation comment in .proto file."""

    @staticmethod
    def StartWebcamStreaming(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        return grpc.experimental.unary_stream(request, target, '/WebcamStreaming/StartWebcamStreaming',
            webcam__streaming__pb2.ClientName.SerializeToString,
            webcam__streaming__pb2.CaptureImage.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
webcam_streaming_pb2.py
webcam_streaming_pb2.py
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler.  DO NOT EDIT!
# source: webcam_streaming.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()




DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x16webcam_streaming.proto\" \n\nClientName\x12\x12\n\nclientName\x18\x01 \x01(\t\"\"\n\x0c\x43\x61ptureImage\x12\x12\n\nimageBytes\x18\x01 \x01(\x0c\x32I\n\x0fWebcamStreaming\x12\x36\n\x14StartWebcamStreaming\x12\x0b.ClientName\x1a\r.CaptureImage\"\x00\x30\x01\x62\x06proto3')

_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'webcam_streaming_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:

  DESCRIPTOR._options = None
  _CLIENTNAME._serialized_start=26
  _CLIENTNAME._serialized_end=58
  _CAPTUREIMAGE._serialized_start=60
  _CAPTUREIMAGE._serialized_end=94
  _WEBCAMSTREAMING._serialized_start=96
  _WEBCAMSTREAMING._serialized_end=169
# @@protoc_insertion_point(module_scope)

サーバー側の実装

  • サービス
webcam_streaming_service.py
from queue import Queue
import time
import threading

import cv2

import webcam_streaming_pb2
import webcam_streaming_pb2_grpc


class WebcamStreamingService(
    webcam_streaming_pb2_grpc.WebcamStreamingServicer
):
    def __init__(self):
        super(WebcamStreamingService, self).__init__()
        self.__capture_data_queue_list = []
        self.__capture_thread = threading.Thread(
            target=self.__capture_image, name="Capture Thread"
        )
        self.__capture_thread.setDaemon(True)

    def __capture_image(self):
        # global cap_msmf.cpp:1759 CvCapture_MSMF::grabFrame videoio(MSMF): can't grab frame. Error: -1072873822
        # のようなエラーが出てしまったため、cv2.CAP_DSHOWオプションを追加した
        # cap = cv2.VideoCapture(0)
        cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)

        # 解像度の設定
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

        while True:
            # デコードされたビデオフレームを取得
            ret, frame = cap.read()
            if ret:
                # JPEG形式でイメージをエンコード
                _, buf = cv2.imencode(".jpg", frame)

                # 接続されているクライアントそれぞれに、エンコードされたバイト列を送るためにキューに追加
                for queue in self.__capture_data_queue_list:
                    queue.put(buf.tobytes())

                # 30fpsに設定
                time.sleep(1 / 30)

    def StartWebcamStreaming(self, request, context):
        print(f"WebcamStreaming Start! Request from {request.clientName}")
        capture_data_queue = Queue()
        self.__capture_data_queue_list.append(capture_data_queue)

        if not self.__capture_thread.is_alive():
            self.__capture_thread.start()

        while True:
            # Capture Thread内でキューが積まれるまで待機
            data = capture_data_queue.get()

            # クライアントにデータ送信
            yield webcam_streaming_pb2.CaptureImage(imageBytes=data)

            capture_data_queue.task_done()

クライアントから最初に StartWebcamStreaming 関数が呼ばれると、ループでカメラ画像を取得するための Capture Thread が立ち上がります。
また、クライアント側はデータを受け取るための Queue を登録します。
Capture Thread 内でカメラ画像を取得するたびに、そのデータをそれぞれのクライアントの Queue に積みます。
StartWebcamStreaming 関数内のループで Queue からデータを取り出し、クライアントに送信します。

  • サーバー立ち上げ用ファイル
grpc_server.py
from concurrent import futures

import grpc

from webcam_streaming_service import WebcamStreamingService
import webcam_streaming_pb2_grpc


def serve():
    # サーバの作成
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    webcam_streaming_pb2_grpc.add_WebcamStreamingServicer_to_server(
        WebcamStreamingService(), server
    )

    # ポートを開く
    server.add_insecure_port("[::]:1234")

    server.start()
    server.wait_for_termination()


if __name__ == "__main__":
    serve()

クライアント側の実装

grpc_client.py
import socket
import sys

import cv2
import grpc
import numpy as np

import webcam_streaming_pb2
import webcam_streaming_pb2_grpc


class GrpcClient:
    def __init__(
        self, connect_ip: str, port: str = "1234",
    ):
        super(GrpcClient, self).__init__()

        # gRPCサーバへのチャネルを作成
        channel = grpc.insecure_channel(f"{connect_ip}:{port}")

        self.__stub = webcam_streaming_pb2_grpc.WebcamStreamingStub(channel)

    def start_webcam_streaming(self):
        print("gRPC client: webcam streaming Start")

        # リクエストの作成
        # clientNameにコンピュータ名を設定
        request = webcam_streaming_pb2.ClientName(
            clientName=socket.gethostname()
        )

        # サーバの関数呼び出し
        responses = self.__stub.StartWebcamStreaming(request)

        for response in responses:
            frame_byte = response.imageBytes

            # サーバから受け取ったバイト列を一次元配列として解釈
            data_array = np.frombuffer(frame_byte, dtype=np.uint8)

            # 3チャネルのBGRカラーイメージに変換
            image = cv2.imdecode(data_array, cv2.IMREAD_COLOR)

            # 画像の表示
            cv2.imshow("Received Frame", image)

            # 画像がすぐに消えてしまい(?)、表示されないためwaitKeyを入れる
            cv2.waitKey(1)


if __name__ == "__main__":
    # コマンドライン引数でサーバ側のIPアドレスを設定する
    # ローカルで試すなら 127.0.0.1
    argv = sys.argv
    connect_ip = argv[1]

    client = GrpcClient(connect_ip)
    client.start_webcam_streaming()

動作確認

  • サーバー側
$ python grpc_server.py
  • クライアント側
$ python grpc_client {サーバーのIP}