马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
×
1. 消费者(客户端)
/**
 * Blocking client for the division demo.
 *
 * <p>The client connects to 127.0.0.1:8000 and then alternates between
 * printing whatever the server sends and answering with a random positive
 * integer: read, send (below 1000), read, pause five seconds, send
 * (below 10), read.
 */
public class CallbackClient {

    /**
     * Runs one complete exchange with the server and then closes the
     * connection.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // try-with-resources guarantees the channel is closed even when the
        // exchange is aborted half-way by an exception.
        try (SocketChannel socketChannel = SocketChannel.open()) {
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));
            ByteBuffer writeBuffer = ByteBuffer.allocate(32);
            ByteBuffer readBuffer = ByteBuffer.allocate(32);

            getMessage(readBuffer, socketChannel);
            sendRandomInt(writeBuffer, socketChannel, 1000);
            getMessage(readBuffer, socketChannel);
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // Restore the flag and stop: blocking channel operations would
                // fail on an interrupted thread anyway.
                Thread.currentThread().interrupt();
                return;
            }
            sendRandomInt(writeBuffer, socketChannel, 10);
            getMessage(readBuffer, socketChannel);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Picks a random integer in {@code [1, bound)} (a drawn 0 is replaced by
     * 1), prints it, and sends its decimal text to the server.
     *
     * @param writeBuffer   scratch buffer used to stage the outgoing bytes
     * @param socketChannel connected channel to write to
     * @param bound         exclusive upper bound; must be positive
     */
    public static void sendRandomInt(ByteBuffer writeBuffer, SocketChannel socketChannel, int bound) {
        Random random = new Random();
        int value = random.nextInt(bound);
        if (value == 0) {
            // Never send zero: the second number is used as a divisor.
            value = 1;
        }
        System.out.println(value);

        writeBuffer.clear();
        writeBuffer.put(String.valueOf(value).getBytes(java.nio.charset.StandardCharsets.UTF_8));
        writeBuffer.flip();
        try {
            socketChannel.write(writeBuffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Performs one read from the server and prints the received text.
     *
     * <p>An empty line is printed when the read fails or the server has
     * closed the connection.
     *
     * @param readBuffer    scratch buffer that receives the bytes
     * @param socketChannel connected channel to read from
     */
    public static void getMessage(ByteBuffer readBuffer, SocketChannel socketChannel) {
        readBuffer.clear();
        try {
            socketChannel.read(readBuffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
        readBuffer.flip();
        // Size the array from what was actually received so a reply longer
        // than a fixed scratch array cannot overflow it, and no padding NUL
        // characters end up in the printed text.
        byte[] received = new byte[readBuffer.remaining()];
        readBuffer.get(received);
        System.out.println(new String(received, java.nio.charset.StandardCharsets.UTF_8));
    }
}
2. 服务提供者(服务端)
- package com.example.demo.callback;
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.nio.charset.StandardCharsets;
- import java.util.Iterator;
- import java.util.Set;
-
- public class NioServer {
-
- public static void main(String[] args) throws IOException {
- // 打开服务器套接字通道
- ServerSocketChannel serverSocket = ServerSocketChannel.open();
- serverSocket.configureBlocking(false);
- serverSocket.socket().bind(new InetSocketAddress(8000));
-
- // 打开多路复用器
- Selector selector = Selector.open();
-
- // 注册服务器通道到多路复用器上,并监听接入事件
- serverSocket.register(selector, SelectionKey.OP_ACCEPT);
-
- final ByteBuffer buffer = ByteBuffer.allocate(1024);
-
- while (true) {
- // 非阻塞地等待注册的通道事件
- selector.select();
-
- // 获取发生事件的selectionKey集合
- Set<SelectionKey> selectedKeys = selector.selectedKeys();
- Iterator<SelectionKey> it = selectedKeys.iterator();
-
- // 遍历所有发生事件的selectionKey
- while (it.hasNext()) {
- SelectionKey key = it.next();
- it.remove();
-
- // 处理接入请求
- if (key.isAcceptable()) {
-
- ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
- SocketChannel socketChannel = ssc.accept();
- socketChannel.configureBlocking(false);
- SelectionKey newKey = socketChannel.register(selector, SelectionKey.OP_WRITE, ByteBuffer.allocate(1024));
-
- //添加后可使用处理方法2处理
- CommonClient client = new CommonClient(socketChannel, newKey);
- newKey.attach(client);
- }
-
- // 处理读事件
- if (key.isReadable()) {
- SocketChannel socketChannel = (SocketChannel) key.channel();
- buffer.clear();
- while (socketChannel.read(buffer) > 0) {
- buffer.flip();
- String receivedMessage = StandardCharsets.UTF_8.decode(buffer).toString();
- handleReceivedMessage(socketChannel, receivedMessage);
- buffer.clear();
- }
- //处理方法2
- CommonClient client = (CommonClient) key.attachment();
- client.onRead();
- }
-
- // 处理写事件
- if (key.isWritable()) {
- //处理方法1可以仿照方法2的格式写
- //处理方法2
- CommonClient client = (CommonClient) key.attachment();
- client.onWrite();
- }
- }
- }
- }
-
- // 回调函数,处理接收到的数据
- private static void handleReceivedMessage(SocketChannel socketChannel, String message) throws IOException {
- System.out.println("Received message: " + message);
- // 回复客户端
- socketChannel.write(ByteBuffer.wrap("Server received the message".getBytes(StandardCharsets.UTF_8)));
- }
- }
复制代码- public class CommonClient {
- private SocketChannel clientSocket;
- private ByteBuffer recvBuffer;
- private SelectionKey key;
- private Callback callback;
- private String msg;
- public CommonClient(SocketChannel clientSocket, SelectionKey key) {
- this.clientSocket = clientSocket;
- this.key = key;
- recvBuffer = ByteBuffer.allocate(8);
- try {
- this.clientSocket.configureBlocking(false);
- key.interestOps(SelectionKey.OP_WRITE);
- } catch (IOException e) {
- }
- }
- public void close() {
- try {
- clientSocket.close();
- key.cancel();
- }
- catch (IOException e){};
- }
- // an rpc to notify client to send a number
- public void sendMessage(String msg, Callback cback) {
- this.callback = cback;
- try {
- try {
- recvBuffer.clear();
- recvBuffer.put(msg.getBytes());
- recvBuffer.flip();
- clientSocket.write(recvBuffer);
- key.interestOps(SelectionKey.OP_READ);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- catch (Exception e) {
- }
- }
- // when key is writable, resume the fiber to continue
- // to write.
- public void onWrite() {
- sendMessage("divident", new Callback() {
- @Override
- public void onSucceed(int data) {
- int a = data;
- sendMessage("divisor", new Callback() {
- @Override
- public void onSucceed(int data) {
- int b = data;
- sendMessage(String.valueOf(a / b), null);
- }
- });
- }
- });
- }
- public void onRead() {
- int res = 0;
- try {
- try {
- recvBuffer.clear();
- // read may fail even SelectionKey is readable
- // when read fails, the fiber should suspend, waiting for next
- // time the key is ready.
- int n = clientSocket.read(recvBuffer);
- while (n == 0) {
- n = clientSocket.read(recvBuffer);
- }
- if (n == -1) {
- close();
- return;
- }
- System.out.println("received " + n + " bytes from client");
- } catch (IOException e) {
- e.printStackTrace();
- }
- recvBuffer.flip();
- res = getInt(recvBuffer);
- // when read ends, we are no longer interested in reading,
- // but in writing.
- key.interestOps(SelectionKey.OP_WRITE);
- } catch (Exception e) {
- }
- this.callback.onSucceed(res);
- }
- public int getInt(ByteBuffer buf) {
- int r = 0;
- while (buf.hasRemaining()) {
- r *= 10;
- r += buf.get() - '0';
- }
- return r;
- }
- }
/**
 * Continuation that receives an integer result once it becomes available.
 */
@FunctionalInterface
public interface Callback {
    /**
     * Invoked with the value the caller was waiting for.
     *
     * @param data the delivered integer
     */
    void onSucceed(int data);
}
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金 |