最終更新日:180909 原本2018/06/03 

Nettyを使ってサーバーを実装してみた

初めに

前回はwebSocketやSocketChannelなどのライブラリを使ってTCP/IP+NIO/BIOの転送路を作ってみましたが、かなりコードが複雑になることが分かりました。 今回は改めてNettyを使ってサーバーを実現してみます。

Nettyとは

簡単に言うと、Nettyとは、非同期通信を行うアプリケーションを開発するためのフレームワークです。SocketChannelで実現したNIO処理に比べると、低レイヤーのAPI(select(),read()など)を直接操作する必要がなくなり、Netty側で隠蔽されました。

実装

ServerSocketChannelでNIO実装

まず、前回のServerSocketChannelでNIO実装を再掲させていただきます。

➀ServerSocketChannelを生成し、クライアント側のアクセスを待ち受けます

public class ChannelServer {

    public static void main(String[] args) throws IOException {
        // I/Oリクエストを処理するスレッドプールを用意
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 10, 1000, TimeUnit.MILLISECONDS, 
                new ArrayBlockingQueue<Runnable>(100));

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(1234));

        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept();

            if (socketChannel != null) {
                // リクエストをスレッドプールにコミット
                executor.submit(new ChannelServerThread(socketChannel));
            }
        }
    }
}

②リクエストを処理するスレッド

public class ChannelServerThread implements Runnable {

    private SocketChannel socketChannel;
    private String remoteName;

    public ChannelServerThread(SocketChannel socketChannel) throws IOException {
        this.socketChannel = socketChannel;
        this.remoteName = socketChannel.getRemoteAddress().toString();
        System.out.println("client:" + remoteName + " access successfully!");
    }

    // I/Oリクエストを処理
    @Override
    public void run() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
        StringBuilder sb = new StringBuilder();
        byte b[];
        // socketChannelからデータと長さを読み込んで、標準出力に出力する
        while(true) {
            try {
                sizeBuffer.clear();
                int read = socketChannel.read(sizeBuffer);
                if (read != -1) {
                    sb.setLength(0);
                    sizeBuffer.flip();
                    int size = sizeBuffer.getInt();
                    int readCount = 0;
                    b = new byte[1024];
                    while (readCount < size) {
                        buffer.clear();
                        read = socketChannel.read(buffer);
                        if (read != -1) {
                            readCount += read;
                            buffer.flip();
                            int index = 0 ;
                            while(buffer.hasRemaining()) {
                                b[index++] = buffer.get();
                                if (index >= b.length) {
                                    index = 0;
                                    sb.append(new String(b,"UTF-8"));
                                }
                            }
                            if (index > 0) {
                                sb.append(new String(b,"UTF-8"));
                            }
                        }
                    }
                    System.out.println(remoteName +  ":" + sb.toString());
                }
            } catch (Exception e) {
                System.out.println(remoteName + "access colsed");
                try {
                    socketChannel.close();
                } catch (IOException ex) {
                }
                break;
            }
        }
    }
}

NettyでNIO実装

➀クライアント側のアクセスを待ち受ける処理

public class EchoServer {
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            // スレッド プールバインディング
            sb.group(group)
                    .channel(NioServerSocketChannel.class)
                     // ポート番号バインディング
                    .localAddress(this.port)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            System.out.println("connected...; Client:" + ch.remoteAddress());
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            // サーバーの非同期バインディング
            ChannelFuture cf = sb.bind().sync();
            System.out.println(EchoServer.class + " started and listen on " + cf.channel().localAddress());
            // サーバーチャンネルを閉じる
            cf.channel().closeFuture().sync();
        } finally {
            // スレッド プールのバインディングを解き消し
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        // 入口
        new EchoServer(66666).start();
    }
}

②リクエストを処理するやつ

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server channelRead...; received:" + msg);
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("server channelReadComplete..");
        // 空バッファーにWriteAndFlush,接続をクローズする
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server occur exception:" + cause.getMessage());
        cause.printStackTrace();
        ctx.close();
    }
}

変更点

コード量が大幅減りました。
それはread()、Buffer操作などのAPIをNetty側から隠蔽し、便利な高レイヤーのAPIを提供するためです。例えば、EchoServerHandler クラスのchannelRead()メソッド:クライアント側からのデータを受け付けると、内部的にでStringに変換してくれるため、このメソッドが呼び出されます。

まとめ

今回はNettyを使ってサーバーを実装してみました。クライアント側との連携は次回に紹介します。