«1. Обзор

В этой статье мы продемонстрируем, как создать простой сервер и его клиент, используя API канала Java 7 NIO.2.

Мы рассмотрим классы AsynchronousServerSocketChannel и AsynchronousSocketChannel, которые являются ключевыми классами, используемыми при реализации сервера и клиента соответственно.

Если вы новичок в API канала NIO.2, у нас есть вводная статья на этом сайте. Вы можете прочитать его, перейдя по этой ссылке.

Все классы, необходимые для использования API канала NIO.2, объединены в пакет java.nio.channels:

import java.nio.channels.*;

2. Сервер с будущим

Экземпляр AsynchronousServerSocketChannel создается путем вызова static open API для своего класса:

AsynchronousServerSocketChannel server
  = AsynchronousServerSocketChannel.open();

Вновь созданный асинхронный канал сокета сервера открыт, но еще не связан, поэтому мы должны привязать его к локальному адресу и дополнительно выбрать порт:

server.bind(new InetSocketAddress("127.0.0.1", 4555));

Мы могли бы Точно так же передали значение null, чтобы использовать локальный адрес и выполнить привязку к произвольному порту:

server.bind(null);

После привязки API-интерфейс accept используется для инициации приема подключений к сокету канала:

Future<AsynchronousSocketChannel> acceptFuture = server.accept();

~ ~~ Как и в случае с асинхронными операциями канала, вышеприведенный вызов сразу же возвращается, и выполнение продолжается.

Далее мы можем использовать API get для запроса ответа от объекта Future:

AsynchronousSocketChannel worker = future.get();

Этот вызов будет заблокирован, если необходимо дождаться запроса на подключение от клиента. При желании мы можем указать тайм-аут, если мы не хотим ждать вечно:

AsynchronousSocketChannel worker = acceptFuture.get(10, TimeUnit.SECONDS);

После того, как вышеуказанный вызов вернется и операция прошла успешно, мы можем создать цикл, в котором мы прослушиваем входящие сообщения и возвращаем их обратно. клиенту.

Давайте создадим метод с именем runServer, в котором мы будем ожидать и обрабатывать любые входящие сообщения:

public void runServer() {
    clientChannel = acceptResult.get();
    if ((clientChannel != null) && (clientChannel.isOpen())) {
        while (true) {
            ByteBuffer buffer = ByteBuffer.allocate(32);
            Future<Integer> readResult  = clientChannel.read(buffer);
            
            // perform other computations
            
            readResult.get();
            
            buffer.flip();
            Future<Integer> writeResult = clientChannel.write(buffer);
 
            // perform other computations
 
            writeResult.get();
            buffer.clear();
        } 
        clientChannel.close();
        serverChannel.close();
    }
}

Внутри цикла все, что мы делаем, это создаем буфер для чтения и записи в зависимости от операции.

Затем, каждый раз, когда мы делаем чтение или запись, мы можем продолжить выполнение любого другого кода, и когда мы будем готовы обработать результат, мы вызовем API get() для объекта Future.

Чтобы запустить сервер, мы вызываем его конструктор, а затем метод runServer внутри main:

public static void main(String[] args) {
    AsyncEchoServer server = new AsyncEchoServer();
    server.runServer();
}

3. Сервер с CompletionHandler

В этом разделе мы увидим, как реализовать тот же сервер с помощью Подход CompletionHandler, а не подход Future.

Внутри конструктора мы создаем AsynchronousServerSocketChannel и привязываем его к локальному адресу так же, как делали это раньше:

serverChannel = AsynchronousServerSocketChannel.open();
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999);
serverChannel.bind(hostAddress);

Далее, все еще внутри конструктора, мы создаем цикл while, в котором мы принимаем любое входящее соединение. от клиента. Этот цикл while используется строго для предотвращения выхода сервера до установления соединения с клиентом.

Чтобы предотвратить бесконечный цикл, мы вызываем System.in.read() в его конце, чтобы заблокировать выполнение, пока входящее соединение не будет прочитано из стандартного потока ввода:

while (true) {
    serverChannel.accept(
      null, new CompletionHandler<AsynchronousSocketChannel,Object>() {

        @Override
        public void completed(
          AsynchronousSocketChannel result, Object attachment) {
            if (serverChannel.isOpen()){
                serverChannel.accept(null, this);
            }

            clientChannel = result;
            if ((clientChannel != null) && (clientChannel.isOpen())) {
                ReadWriteHandler handler = new ReadWriteHandler();
                ByteBuffer buffer = ByteBuffer.allocate(32);

                Map<String, Object> readInfo = new HashMap<>();
                readInfo.put("action", "read");
                readInfo.put("buffer", buffer);

                clientChannel.read(buffer, readInfo, handler);
             }
         }
         @Override
         public void failed(Throwable exc, Object attachment) {
             // process error
         }
    });
    System.in.read();
}

Когда соединение установлено, вызывается завершенный метод обратного вызова в CompletionHandler операции принятия.

Тип возвращаемого значения — экземпляр AsynchronousSocketChannel. Если канал сокета сервера все еще открыт, мы снова вызываем API приема, чтобы подготовиться к другому входящему соединению, повторно используя тот же обработчик.

Далее мы назначаем возвращенный канал сокета глобальному экземпляру. Затем мы проверяем, что он не равен нулю и что он открыт, прежде чем выполнять над ним операции.

Точка, в которой мы можем начать операции чтения и записи, находится внутри завершенного API обратного вызова обработчика операции принятия. Этот шаг заменяет предыдущий подход, когда мы опрашивали канал с помощью get API.

Обратите внимание, что сервер больше не будет выходить после установления соединения, если мы не закроем его явным образом.

Обратите внимание, что мы создали отдельный внутренний класс для обработки операций чтения и записи; Обработчик чтения и записи. Мы увидим, как объект вложения пригодится на этом этапе.

Во-первых, давайте посмотрим на класс ReadWriteHandler:

class ReadWriteHandler implements 
  CompletionHandler<Integer, Map<String, Object>> {
    
    @Override
    public void completed(
      Integer result, Map<String, Object> attachment) {
        Map<String, Object> actionInfo = attachment;
        String action = (String) actionInfo.get("action");

        if ("read".equals(action)) {
            ByteBuffer buffer = (ByteBuffer) actionInfo.get("buffer");
            buffer.flip();
            actionInfo.put("action", "write");

            clientChannel.write(buffer, actionInfo, this);
            buffer.clear();

        } else if ("write".equals(action)) {
            ByteBuffer buffer = ByteBuffer.allocate(32);

            actionInfo.put("action", "read");
            actionInfo.put("buffer", buffer);

            clientChannel.read(buffer, actionInfo, this);
        }
    }
    
    @Override
    public void failed(Throwable exc, Map<String, Object> attachment) {
        // 
    }
}

«

«Общий тип нашего вложения в классе ReadWriteHandler — это карта. Нам специально нужно передать через него два важных параметра — тип операции (действие) и буфер.

Далее мы увидим, как используются эти параметры.

Первая операция, которую мы выполняем, это чтение, так как это эхо-сервер, который реагирует только на клиентские сообщения. Внутри завершенного метода обратного вызова ReadWriteHandler мы извлекаем прикрепленные данные и решаем, что делать соответственно.

Если это операция чтения, которая завершилась, мы извлекаем буфер, меняем параметр действия вложения и сразу же выполняем операцию записи, чтобы отобразить сообщение клиенту.

Если это операция записи, которая только что завершилась, мы снова вызываем API чтения, чтобы подготовить сервер к приему другого входящего сообщения.

4. Клиент

AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999)
Future<Void> future = client.connect(hostAddress);

После настройки сервера мы можем настроить клиент, вызвав открытый API в классе AsyncronousSocketChannel. Этот вызов создает новый экземпляр канала клиентского сокета, который мы затем используем для подключения к серверу:

В случае успеха операция подключения ничего не возвращает. Однако мы по-прежнему можем использовать объект Future для отслеживания состояния асинхронной операции.

future.get()

Давайте вызовем get API для ожидания подключения:

public String sendMessage(String message) {
    byte[] byteMsg = new String(message).getBytes();
    ByteBuffer buffer = ByteBuffer.wrap(byteMsg);
    Future<Integer> writeResult = client.write(buffer);

    // do some computation

    writeResult.get();
    buffer.flip();
    Future<Integer> readResult = client.read(buffer);
    
    // do some computation

    readResult.get();
    String echo = new String(buffer.array()).trim();
    buffer.clear();
    return echo;
}

После этого шага мы можем начать отправлять сообщения на сервер и получать для них эхо. Метод sendMessage выглядит следующим образом:

5. Тест

@Test
public void givenServerClient_whenServerEchosMessage_thenCorrect() {
    String resp1 = client.sendMessage("hello");
    String resp2 = client.sendMessage("world");

    assertEquals("hello", resp1);
    assertEquals("world", resp2);
}

Чтобы убедиться, что наши серверные и клиентские приложения работают в соответствии с ожиданиями, мы можем использовать тест:

6. Заключение ~ ~~ В этой статье мы рассмотрели API асинхронных каналов сокетов Java NIO.2. Мы смогли пройти через процесс создания сервера и клиента с помощью этих новых API.

Вы можете получить доступ к полному исходному коду этой статьи в проекте Github.