NIO的callback调用方式

[复制链接]
发表于 2026-1-15 02:58:24 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

×
1.消耗者
  1. public class CallbackClient {
  2.     public static void main(String[] args) {
  3.         try {
  4.             SocketChannel socketChannel = SocketChannel.open();
  5.             socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));
  6.             ByteBuffer writeBuffer = ByteBuffer.allocate(32);
  7.             ByteBuffer readBuffer = ByteBuffer.allocate(32);
  8.             getMessage(readBuffer, socketChannel);
  9.             sendRandomInt(writeBuffer, socketChannel, 1000);
  10.             getMessage(readBuffer, socketChannel);
  11.             try {
  12.                 Thread.sleep(5000);
  13.             } catch (InterruptedException e) {
  14.                 e.printStackTrace();
  15.             }
  16.             sendRandomInt(writeBuffer, socketChannel, 10);
  17.             getMessage(readBuffer, socketChannel);
  18.             socketChannel.close();
  19.         } catch (IOException e) {
  20.         }
  21.     }
  22.     public static void sendRandomInt(ByteBuffer writeBuffer, SocketChannel socketChannel, int bound) {
  23.         Random r = new Random();
  24.         int d = 0;
  25.         d = r.nextInt(bound);
  26.         if (d == 0)
  27.             d = 1;
  28.         System.out.println(d);
  29.         writeBuffer.clear();
  30.         writeBuffer.put(String.valueOf(d).getBytes());
  31.         writeBuffer.flip();
  32.         try {
  33.             socketChannel.write(writeBuffer);
  34.         } catch (IOException e) {
  35.             e.printStackTrace();
  36.         }
  37.     }
  38.     public static void getMessage(ByteBuffer readBuffer, SocketChannel socketChannel) {
  39.         readBuffer.clear();
  40.         byte[] buf = new byte[16];
  41.         try {
  42.             socketChannel.read(readBuffer);
  43.         } catch (IOException e) {
  44.         }
  45.         readBuffer.flip();
  46.         readBuffer.get(buf, 0, readBuffer.remaining());
  47.         System.out.println(new String(buf));
  48.     }
  49. }
复制代码
2.服务提供者
  1. package com.example.demo.callback;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.ServerSocketChannel;
  8. import java.nio.channels.SocketChannel;
  9. import java.nio.charset.StandardCharsets;
  10. import java.util.Iterator;
  11. import java.util.Set;
  12. public class NioServer {
  13.     public static void main(String[] args) throws IOException {
  14.         // 打开服务器套接字通道
  15.         ServerSocketChannel serverSocket = ServerSocketChannel.open();
  16.         serverSocket.configureBlocking(false);
  17.         serverSocket.socket().bind(new InetSocketAddress(8000));
  18.         // 打开多路复用器
  19.         Selector selector = Selector.open();
  20.         // 注册服务器通道到多路复用器上,并监听接入事件
  21.         serverSocket.register(selector, SelectionKey.OP_ACCEPT);
  22.         final ByteBuffer buffer = ByteBuffer.allocate(1024);
  23.         while (true) {
  24.             // 非阻塞地等待注册的通道事件
  25.             selector.select();
  26.             // 获取发生事件的selectionKey集合
  27.             Set<SelectionKey> selectedKeys = selector.selectedKeys();
  28.             Iterator<SelectionKey> it = selectedKeys.iterator();
  29.             // 遍历所有发生事件的selectionKey
  30.             while (it.hasNext()) {
  31.                 SelectionKey key = it.next();
  32.                 it.remove();
  33.                 // 处理接入请求
  34.                 if (key.isAcceptable()) {
  35.                        
  36.                     ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
  37.                     SocketChannel socketChannel = ssc.accept();
  38.                     socketChannel.configureBlocking(false);
  39.                     SelectionKey newKey = socketChannel.register(selector, SelectionKey.OP_WRITE, ByteBuffer.allocate(1024));
  40.                     
  41.                     //添加后可使用处理方法2处理
  42.                                         CommonClient client = new CommonClient(socketChannel, newKey);
  43.                     newKey.attach(client);
  44.                 }
  45.                 // 处理读事件
  46.                 if (key.isReadable()) {
  47.                     SocketChannel socketChannel = (SocketChannel) key.channel();
  48.                     buffer.clear();
  49.                     while (socketChannel.read(buffer) > 0) {
  50.                         buffer.flip();
  51.                         String receivedMessage = StandardCharsets.UTF_8.decode(buffer).toString();
  52.                         handleReceivedMessage(socketChannel, receivedMessage);
  53.                         buffer.clear();
  54.                     }
  55.                                         //处理方法2
  56.                                          CommonClient client = (CommonClient) key.attachment();
  57.                      client.onRead();
  58.                 }
  59.                 // 处理写事件
  60.                 if (key.isWritable()) {
  61.                                                 //处理方法1可以仿照方法2的格式写
  62.                                                 //处理方法2
  63.                         CommonClient client = (CommonClient) key.attachment();
  64.                         client.onWrite();
  65.                     }
  66.             }
  67.         }
  68.     }
  69.     // 回调函数,处理接收到的数据
  70.     private static void handleReceivedMessage(SocketChannel socketChannel, String message) throws IOException {
  71.         System.out.println("Received message: " + message);
  72.         // 回复客户端
  73.         socketChannel.write(ByteBuffer.wrap("Server received the message".getBytes(StandardCharsets.UTF_8)));
  74.     }
  75. }
复制代码
  1. public class CommonClient {
  2.         private SocketChannel clientSocket;
  3.         private ByteBuffer recvBuffer;
  4.         private SelectionKey key;
  5.         private Callback callback;
  6.         private String msg;
  7.         public CommonClient(SocketChannel clientSocket, SelectionKey key) {
  8.             this.clientSocket = clientSocket;
  9.             this.key = key;
  10.             recvBuffer = ByteBuffer.allocate(8);
  11.             try {
  12.                 this.clientSocket.configureBlocking(false);
  13.                 key.interestOps(SelectionKey.OP_WRITE);
  14.             } catch (IOException e) {
  15.             }
  16.         }
  17.         public void close() {
  18.             try {
  19.                 clientSocket.close();
  20.                 key.cancel();
  21.             }
  22.             catch (IOException e){};
  23.         }
  24.         // an rpc to notify client to send a number
  25.         public void sendMessage(String msg, Callback cback)  {
  26.             this.callback = cback;
  27.             try {
  28.                 try {
  29.                     recvBuffer.clear();
  30.                     recvBuffer.put(msg.getBytes());
  31.                     recvBuffer.flip();
  32.                     clientSocket.write(recvBuffer);
  33.                     key.interestOps(SelectionKey.OP_READ);
  34.                 } catch (IOException e) {
  35.                     e.printStackTrace();
  36.                 }
  37.             }
  38.             catch (Exception e) {
  39.             }
  40.         }
  41.         // when key is writable, resume the fiber to continue
  42.         // to write.
  43.         public void onWrite() {
  44.             sendMessage("divident", new Callback() {
  45.                 @Override
  46.                 public void onSucceed(int data) {
  47.                     int a = data;
  48.                     sendMessage("divisor", new Callback() {
  49.                         @Override
  50.                         public void onSucceed(int data) {
  51.                             int b = data;
  52.                             sendMessage(String.valueOf(a / b), null);
  53.                         }
  54.                     });
  55.                 }
  56.             });
  57.         }
  58.         public void onRead() {
  59.             int res = 0;
  60.             try {
  61.                 try {
  62.                     recvBuffer.clear();
  63.                     // read may fail even SelectionKey is readable
  64.                     // when read fails, the fiber should suspend, waiting for next
  65.                     // time the key is ready.
  66.                     int n = clientSocket.read(recvBuffer);
  67.                     while (n == 0) {
  68.                         n = clientSocket.read(recvBuffer);
  69.                     }
  70.                     if (n == -1) {
  71.                         close();
  72.                         return;
  73.                     }
  74.                     System.out.println("received " + n + " bytes from client");
  75.                 } catch (IOException e) {
  76.                     e.printStackTrace();
  77.                 }
  78.                 recvBuffer.flip();
  79.                 res = getInt(recvBuffer);
  80.                 // when read ends, we are no longer interested in reading,
  81.                 // but in writing.
  82.                 key.interestOps(SelectionKey.OP_WRITE);
  83.             } catch (Exception e) {
  84.             }
  85.             this.callback.onSucceed(res);
  86.         }
  87.         public int getInt(ByteBuffer buf) {
  88.             int r = 0;
  89.             while (buf.hasRemaining()) {
  90.                 r *= 10;
  91.                 r += buf.get() - '0';
  92.             }
  93.             return r;
  94.         }
  95.     }
复制代码
  1.     public interface Callback {
  2.         public void onSucceed(int data);
  3.     }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表