«1. Обзор

В этой статье мы рассмотрим вводные части компонента Selector Java NIO.

Селектор предоставляет механизм для мониторинга одного или нескольких каналов NIO и распознавания, когда один или несколько становятся доступными для передачи данных.

Таким образом, один поток может использоваться для управления несколькими каналами и, следовательно, несколькими сетевыми подключениями.

2. Зачем использовать селектор?

С помощью селектора мы можем использовать один поток вместо нескольких для управления несколькими каналами. Переключение контекста между потоками дорого обходится операционной системе, и, кроме того, каждый поток занимает память.

Поэтому чем меньше потоков мы используем, тем лучше. Тем не менее, важно помнить, что современные операционные системы и ЦП продолжают улучшать многозадачность, поэтому накладные расходы на многопоточность со временем уменьшаются.

Здесь мы рассмотрим, как мы можем обрабатывать несколько каналов в одном потоке с помощью селектора.

Обратите внимание, что селекторы не только помогают вам читать данные; они также могут прослушивать входящие сетевые подключения и записывать данные по медленным каналам.

3. Настройка

Чтобы использовать селектор, нам не нужна никакая специальная настройка. Все классы, которые нам нужны, находятся в основном пакете java.nio, и нам просто нужно импортировать то, что нам нужно.

После этого мы можем зарегистрировать несколько каналов с помощью объекта-селектора. Когда активность ввода-вывода происходит на любом из каналов, селектор уведомляет нас. Вот как мы можем читать из большого количества источников данных в одном потоке.

Любой канал, который мы регистрируем с помощью селектора, должен быть подклассом SelectableChannel. Это особый тип каналов, которые можно перевести в неблокирующий режим.

4. Создание селектора

Селектор можно создать, вызвав статический открытый метод класса Selector, который будет использовать системный поставщик селектора по умолчанию для создания нового селектора:

Selector selector = Selector.open();

5. Регистрация Выбираемые каналы

Чтобы селектор мог отслеживать какие-либо каналы, мы должны зарегистрировать эти каналы в селекторе. Мы делаем это, вызывая метод регистрации выбираемого канала.

Но прежде чем канал будет зарегистрирован с помощью селектора, он должен быть в неблокирующем режиме:

channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

Это означает, что мы не можем использовать FileChannels с селектором, так как они не могут быть переключены в неблокирующий режим способом делаем с сокетными каналами.

Первый параметр — это объект Selector, который мы создали ранее, второй параметр определяет набор интересов, то есть, какие события мы заинтересованы прослушивать в отслеживаемом канале через селектор.

Есть четыре разных события, которые мы можем прослушивать, каждое из которых представлено константой в классе SelectionKey:

    Connect — когда клиент пытается подключиться к серверу. Представлен SelectionKey.OP_CONNECT Accept — когда сервер принимает соединение от клиента. Представляется SelectionKey.OP_ACCEPT Read — когда сервер готов читать из канала. Представлено SelectionKey.OP_READ. Запись — когда сервер готов к записи в канал. Представлен SelectionKey.OP_WRITE

Возвращаемый объект SelectionKey представляет собой регистрацию выбираемого канала в селекторе. Мы рассмотрим это далее в следующем разделе.

6. Объект SelectionKey

Как мы видели в предыдущем разделе, когда мы регистрируем канал с помощью селектора, мы получаем объект SelectionKey. Этот объект содержит данные, представляющие регистрацию канала.

Он содержит некоторые важные свойства, которые мы должны хорошо понимать, чтобы иметь возможность использовать селектор на канале. Мы рассмотрим эти свойства в следующих подразделах.

6.1. Набор интересов

Набор интересов определяет набор событий, за которыми должен следить селектор на этом канале. Это целочисленное значение; мы можем получить эту информацию следующим образом.

Во-первых, у нас есть процентный набор, возвращаемый методом интересаOps SelectionKey. Затем у нас есть константа события в SelectionKey, которую мы рассмотрели ранее.

«Когда мы И эти два значения, мы получаем логическое значение, которое говорит нам, отслеживается ли событие или нет:

int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

6.2. Готовый набор

Готовый набор определяет набор событий, к которым канал готов. Это также целочисленное значение; мы можем получить эту информацию следующим образом.

У нас есть готовый набор, возвращенный методом readyOps SelectionKey. Когда мы И это значение с константами событий, как мы сделали в случае набора процентов, мы получаем логическое значение, представляющее, готов ли канал к определенному значению или нет.

Другой альтернативный и более короткий способ сделать это — использовать для этой же цели удобные методы SelectionKey:

selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWriteable();

6.3. Канал

Доступ к просматриваемому каналу из объекта SelectionKey очень прост. Мы просто вызываем метод канала:

Channel channel = key.channel();

6.4. Селектор

Так же, как и получение канала, очень просто получить объект Selector из объекта SelectionKey:

Selector selector = key.selector();

6.5. Присоединение объектов

Мы можем прикрепить объект к SelectionKey. Иногда мы можем захотеть дать каналу пользовательский идентификатор или прикрепить любой объект Java, который мы можем отслеживать.

Присоединение объектов — удобный способ сделать это. Вот как вы прикрепляете и получаете объекты из SelectionKey:

key.attach(Object);

Object object = key.attachment();

Кроме того, мы можем выбрать прикрепление объекта во время регистрации канала. Мы добавляем его в качестве третьего параметра в метод регистрации канала, например:

SelectionKey key = channel.register(
  selector, SelectionKey.OP_ACCEPT, object);

7. Выбор ключа канала

До сих пор мы рассматривали, как создать селектор, зарегистрировать в нем каналы и проверить свойства. объекта SelectionKey, который представляет регистрацию канала в селекторе.

Это только половина процесса, теперь нам предстоит выполнить непрерывный процесс выбора готового набора, который мы рассмотрели ранее. Мы делаем выбор, используя метод select селектора, например:

int channels = selector.select();

Этот метод блокируется до тех пор, пока хотя бы один канал не будет готов к операции. Возвращаемое целое число представляет собой количество ключей, чьи каналы готовы к операции.

Далее мы обычно получаем набор выбранных ключей для обработки:

Set<SelectionKey> selectedKeys = selector.selectedKeys();

Полученный набор состоит из объектов SelectionKey, каждый ключ представляет собой зарегистрированный канал, который готов к операции.

После этого мы обычно перебираем этот набор и для каждого ключа получаем канал и выполняем над ним любые операции, которые появляются в нашем наборе интересов.

За время существования канала он может быть выбран несколько раз, так как его ключ появляется в наборе готовых для разных событий. Вот почему у нас должен быть непрерывный цикл для захвата и обработки событий канала по мере их возникновения.

8. Полный пример

Чтобы закрепить знания, полученные в предыдущих разделах, мы создадим полный пример клиент-сервер.

Для простоты тестирования нашего кода мы создадим эхо-сервер и эхо-клиент. В такой настройке клиент подключается к серверу и начинает отправлять ему сообщения. Сервер возвращает сообщения, отправленные каждым клиентом.

Когда сервер встречает определенное сообщение, например end, он интерпретирует его как конец связи и закрывает соединение с клиентом.

8.1. Сервер

Вот наш код для EchoServer.java:

public class EchoServer {

    private static final String POISON_PILL = "POISON_PILL";

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress("localhost", 5454));
        serverSocket.configureBlocking(false);
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        ByteBuffer buffer = ByteBuffer.allocate(256);

        while (true) {
            selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> iter = selectedKeys.iterator();
            while (iter.hasNext()) {

                SelectionKey key = iter.next();

                if (key.isAcceptable()) {
                    register(selector, serverSocket);
                }

                if (key.isReadable()) {
                    answerWithEcho(buffer, key);
                }
                iter.remove();
            }
        }
    }

    private static void answerWithEcho(ByteBuffer buffer, SelectionKey key)
      throws IOException {
 
        SocketChannel client = (SocketChannel) key.channel();
        client.read(buffer);
        if (new String(buffer.array()).trim().equals(POISON_PILL)) {
            client.close();
            System.out.println("Not accepting client messages anymore");
        }
        else {
            buffer.flip();
            client.write(buffer);
            buffer.clear();
        }
    }

    private static void register(Selector selector, ServerSocketChannel serverSocket)
      throws IOException {
 
        SocketChannel client = serverSocket.accept();
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
    }

    public static Process start() throws IOException, InterruptedException {
        String javaHome = System.getProperty("java.home");
        String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
        String classpath = System.getProperty("java.class.path");
        String className = EchoServer.class.getCanonicalName();

        ProcessBuilder builder = new ProcessBuilder(javaBin, "-cp", classpath, className);

        return builder.start();
    }
}

Вот что происходит; мы создаем объект Selector, вызывая статический метод открытия. Затем мы также создаем канал, вызывая его статический метод открытия, в частности, экземпляр ServerSocketChannel.

Это связано с тем, что ServerSocketChannel доступен для выбора и подходит для потокового сокета прослушивания.

Затем мы привязываем его к порту по нашему выбору. Помните, мы говорили ранее, что перед регистрацией выбираемого канала в селекторе мы должны сначала установить его в неблокирующий режим. Затем мы делаем это, а затем регистрируем канал в селекторе.

Экземпляр SelectionKey этого канала на данном этапе нам не нужен, поэтому мы не будем его запоминать.

«Java NIO использует модель, ориентированную на буфер, отличную от модели, ориентированной на поток. Таким образом, связь через сокеты обычно осуществляется путем записи в буфер и чтения из него.

Поэтому мы создаем новый ByteBuffer, в который сервер будет записывать и читать. Мы инициализируем его на 256 байт, это просто произвольное значение, в зависимости от того, сколько данных мы планируем передавать туда и обратно.

Наконец, мы выполняем процесс выбора. Мы выбираем готовые каналы, получаем их ключи выбора, перебираем ключи и выполняем операции, для которых готов каждый канал.

Мы делаем это в бесконечном цикле, так как серверы обычно должны продолжать работать независимо от того, есть активность или нет.

Единственная операция, которую может обрабатывать ServerSocketChannel, — это операция ACCEPT. Когда мы принимаем соединение от клиента, мы получаем объект SocketChannel, для которого мы можем выполнять чтение и запись. Мы устанавливаем его в неблокирующий режим и регистрируем для операции READ в селекторе.

Во время одного из последующих выборов этот новый канал станет готовым к чтению. Мы извлекаем его и читаем его содержимое в буфер. Правда, это как эхо-сервер, мы должны записать этот контент обратно клиенту.

Когда мы хотим записать в буфер, из которого мы читали, мы должны вызвать метод flip().

Наконец, мы устанавливаем буфер в режим записи, вызывая метод flip и просто пишем в него.

Метод start() определен таким образом, что эхо-сервер может быть запущен как отдельный процесс во время модульного тестирования.

8.2. Клиент

Вот наш код для EchoClient.java:

public class EchoClient {
    private static SocketChannel client;
    private static ByteBuffer buffer;
    private static EchoClient instance;

    public static EchoClient start() {
        if (instance == null)
            instance = new EchoClient();

        return instance;
    }

    public static void stop() throws IOException {
        client.close();
        buffer = null;
    }

    private EchoClient() {
        try {
            client = SocketChannel.open(new InetSocketAddress("localhost", 5454));
            buffer = ByteBuffer.allocate(256);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public String sendMessage(String msg) {
        buffer = ByteBuffer.wrap(msg.getBytes());
        String response = null;
        try {
            client.write(buffer);
            buffer.clear();
            client.read(buffer);
            response = new String(buffer.array()).trim();
            System.out.println("response=" + response);
            buffer.clear();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return response;

    }
}

Клиент проще сервера.

Мы используем одноэлементный шаблон, чтобы создать его экземпляр внутри статического метода start. Мы вызываем частный конструктор из этого метода.

В приватном конструкторе мы открываем соединение на том же порту, на который был привязан канал сервера и все еще на том же хосте.

Затем мы создаем буфер, в который мы можем писать и из которого мы можем читать.

Наконец, у нас есть метод sendMessage, который считывает и переносит любую строку, которую мы ему передаем, в байтовый буфер, который передается по каналу на сервер.

Затем мы читаем из клиентского канала, чтобы получить сообщение, отправленное сервером. Мы возвращаем это как эхо нашего сообщения.

8.3. Тестирование

Внутри класса с именем EchoTest.java мы собираемся создать тестовый пример, который запускает сервер, отправляет сообщения на сервер и проходит только тогда, когда те же сообщения возвращаются с сервера. В качестве последнего шага тестовый пример останавливает сервер перед завершением.

Теперь мы можем запустить тест:

public class EchoTest {

    Process server;
    EchoClient client;

    @Before
    public void setup() throws IOException, InterruptedException {
        server = EchoServer.start();
        client = EchoClient.start();
    }

    @Test
    public void givenServerClient_whenServerEchosMessage_thenCorrect() {
        String resp1 = client.sendMessage("hello");
        String resp2 = client.sendMessage("world");
        assertEquals("hello", resp1);
        assertEquals("world", resp2);
    }

    @After
    public void teardown() throws IOException {
        server.destroy();
        EchoClient.stop();
    }
}

9. Selector.wakeup()

Как мы видели ранее, вызов selector.select() блокирует текущий поток до тех пор, пока один из отслеживаемых каналов не станет рабочим. готов. Мы можем переопределить это, вызвав selector.wakeup() из другого потока.

В результате блокирующий поток немедленно возвращается, а не продолжает ждать, независимо от того, готов канал или нет.

Мы можем продемонстрировать это, используя CountDownLatch и отслеживая этапы выполнения кода:

@Test
public void whenWakeUpCalledOnSelector_thenBlockedThreadReturns() {
    Pipe pipe = Pipe.open();
    Selector selector = Selector.open();
    SelectableChannel channel = pipe.source();
    channel.configureBlocking(false);
    channel.register(selector, OP_READ);

    List<String> invocationStepsTracker = Collections.synchronizedList(new ArrayList<>());

    CountDownLatch latch = new CountDownLatch(1);

    new Thread(() -> {
        invocationStepsTracker.add(">> Count down");
        latch.countDown();
        try {
            invocationStepsTracker.add(">> Start select");
            selector.select();
            invocationStepsTracker.add(">> End select");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }).start();

    invocationStepsTracker.add(">> Start await");
    latch.await();
    invocationStepsTracker.add(">> End await");

    invocationStepsTracker.add(">> Wakeup thread");
    selector.wakeup();
    //clean up
    channel.close();

    assertThat(invocationStepsTracker)
      .containsExactly(
        ">> Start await",
        ">> Count down",
        ">> Start select",
        ">> End await",
        ">> Wakeup thread",
        ">> End select"
    );
}

В этом примере мы используем класс Java NIO Pipe для открытия канала в целях тестирования. Мы отслеживаем шаги выполнения кода в потокобезопасном списке. Анализируя эти шаги, мы можем видеть, как selector.wakeup() освобождает поток, заблокированный selector.select().

10. Заключение

В этой статье мы рассмотрели основы использования компонента Java NIO Selector.

Полный исходный код и все фрагменты кода для этой статьи доступны в моем проекте GitHub.