从BIO到Netty的演变
前言
计算机网络可以说是每个学计算机的都绕不过去的一道坎。计算机网络到底有多么重要,你走到大学图书馆的计算机部分,翻开那些什么《从零开始:黑客XXX》,《黑客攻防从入门到放弃》等书籍,基本第一部分都是在谈论网络。你去一些X客论坛,上面的教程帖也基本都是从网络部分开始的。
相信每一位科班出身的,都学习过《计算机网络》这样书籍, 上过这样的课程。当然教师资源如何,我这里就不谈论,那一定又会引出一顿苦水。但是学习完这样的课程,我们还是对计算机网络感到十分迷茫。这时候的我们可以背下网络七层模型,网络五层模型等,了解局域网,IP等基本地概念,但是计算机网络对于我们来说,还是一个十分空荡荡的名词。
为了更好地了解网络(绝对不是因为那时候很迷黑客的缘故),我决定参加高级网络工程师的考试。通过网络工程师的我对计算机网络有了更为深入的理解,开始将自己的计算机网络体系从概念上勾连起来。也许我可以看懂其中的一些路由规则,甚至看懂一些路由分发的论文。但是我依旧只是站在理论的角度,去理解计算机网络。
到了工作的时候,开始了解Socket编程,开始参与各种实际生产环境的编程。这个时候的我开始对网络有了虽然简单,但是十分真实的接触。计算机网络不再只是停留在书本中的概念,而是我用以实现业务目标的切实手段。
随着工作中开始负责物联网项目的建设,我对网络中的各种协议开始有了自己的认识,甚至可以自己实现网络协议规范的代码落地。于此同时,由于对网络交互的性能要求,我不再只是会使用BIO网络编程,我开始使用NIO网络编程。
为了自己的知识储备,也是为了满足自己的好奇心,我查找了许多的资料,也报了许多课程,去学习网络编程。而我正好周六完成了软考的又一次考试,所以接下来有一定空闲时间的我,接下来会继续整理我的知识,并将它写成博客。
这篇博客的主要内容就是按照演变的顺序,写下BIO->NIO->Reactor->Netty这样的四个里程碑。这也是大佬们推荐的计算机网络编程的学习路线。不过这次只是给个整体的认识以及demo,更为深入的原理探究,会放在后面。
BIO
介绍
几乎每个人都是BIO开始的计算机网络编程,而其中大部分也永远地留在了这个计算机网络编程的模型。
优点
理解简单
实现简单
要求较低
缺点
性能低
瓶颈低
扩展难
代码示例(BIO下TCP)
这里给出一些简单的demo,供大家认识。
BIO_Client
package tech.jarry.learning.netease; import java.io.IOException; import java.io.OutputStream; import java.net.Inet4Address; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.charset.Charset; import java.util.Scanner; /** * @Description: * @Author: jarry */ public class BIOClient { private static final Charset charset = Charset.forName("utf-8"); public static void main(String[] args) throws IOException { Socket socket = new Socket(); // Socket socket = new Socket("localhost", 8080); // 我还以为可以的。但是貌似上面的8080表示目标端口,而下面的8080表示源端口(发送端口) // socket.bind(new InetSocketAddress("localhost", 8080)); // 后来才去确定,.bind是用于绑定源信息,而.connect是用于绑定目标信息 socket.connect(new InetSocketAddress(Inet4Address.getLocalHost(), 8080)); OutputStream outputStream = socket.getOutputStream(); Scanner scanner = new Scanner(System.in); System.out.println("please input: "); String msg = scanner.nextLine(); outputStream.write(msg.getBytes(charset)); scanner.close(); outputStream.close(); socket.close(); } }
BIO_ServerV1
package tech.jarry.learning.netease; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; /** * @Description: BIO模型中Server端的简单实现 * @Author: jarry */ public class BIOServer { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(8080)); System.out.println("server has started"); while (!serverSocket.isClosed()) { Socket requestClient = serverSocket.accept(); System.out.println("server get a connection: " + requestClient.toString()); InputStream requestInputStream = requestClient.getInputStream(); BufferedReader reader = new BufferedReader(new InputStreamReader(requestInputStream)); String msg; while ((msg = reader.readLine()) != null) { if (msg.length() == 0) { break; } System.out.println(msg); } System.out.println("server has receive a message from: " + requestClient.toString()); requestInputStream.close(); requestClient.close(); } serverSocket.close(); } }
BIO_ServerV2
package tech.jarry.learning.netease; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @Description: 直接对原有代码BIOServer进行暴力修改,将其阻塞部分,通过多线程实现异步处理 * @Author: jarry */ public class BIOServer1 { private static ExecutorService executorService = Executors.newCachedThreadPool(); public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(8080)); System.out.println("server has started"); while (!serverSocket.isClosed()) { Socket requestClient = serverSocket.accept(); System.out.println("server get a connection: " + requestClient.toString()); executorService.submit(new Runnable() { @Override public void run() { InputStream requestInputStream = null; try { requestInputStream = requestClient.getInputStream(); } catch (IOException e) { e.printStackTrace(); } BufferedReader reader = new BufferedReader(new InputStreamReader(requestInputStream)); String msg = null; while (true) { try { if (!((msg = reader.readLine()) != null)) { break; } } catch (IOException e) { e.printStackTrace(); } if (msg.length() == 0) { break; } System.out.println(msg); } System.out.println("server has receive a message from: " + requestClient.toString()); try { requestInputStream.close(); requestClient.close(); } catch (IOException e) { e.printStackTrace(); } } }); } serverSocket.close(); } /** * 运行结果分析: * server has started * server get a connection: Socket[addr=/10.0.75.1,port=64042,localport=8080] * server get a connection: Socket[addr=/10.0.75.1,port=64052,localport=8080] * server get a connection: Socket[addr=/10.0.75.1,port=64061,localport=8080] * 123 * server has receive a message from: Socket[addr=/10.0.75.1,port=64042,localport=8080] * 456 * server has receive a message from: Socket[addr=/10.0.75.1,port=64052,localport=8080] * 789 * server has receive a message from: Socket[addr=/10.0.75.1,port=64061,localport=8080] */ }
BIO_ServerV3
package tech.jarry.learning.netease; import java.io.*; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; /** * @Description: 直接对原有代码BIOServer进行暴力修改,增加了其http格式的返回,确保浏览器可以正常访问 * @Author: jarry */ public class BIOServer2 { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(8080)); System.out.println("server has started"); while (!serverSocket.isClosed()) { Socket requestClient = serverSocket.accept(); System.out.println("server get a connection: " + requestClient.toString()); InputStream requestInputStream = requestClient.getInputStream(); BufferedReader reader = new BufferedReader(new InputStreamReader(requestInputStream)); String msg; while ((msg = reader.readLine()) != null) { if (msg.length() == 0) { break; } System.out.println(msg); } System.out.println("server has receive a message from: " + requestClient.toString()); // 返回数据,并确保可以被http协议理解 OutputStream outputStream = requestClient.getOutputStream(); outputStream.write("HTTP/1.1 200 OKrr".getBytes("utf-8")); outputStream.write("Content-Length: 11rnrn".getBytes("utf-8")); outputStream.write("Hello World".getBytes("utf-8")); outputStream.flush(); requestInputStream.close(); outputStream.close(); requestClient.close(); } serverSocket.close(); } /** * 运行结果分析: */ // server has started // server get a connection: Socket[addr=/0:0:0:0:0:0:0:1,port=63008,localport=8080] // GET / HTTP/1.1 // Host: localhost:8080 // Connection: keep-alive // Cache-Control: max-age=0 // Upgrade-Insecure-Requests: 1 // User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36 // Sec-Fetch-Mode: navigate // Sec-Fetch-User: ?1 // Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3 // Sec-Fetch-Site: none // Accept-Encoding: gzip, deflate, br // Accept-Language: en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7 // Cookie: Webstorm-c7a2b5a2=b5e53f87-54cc-41d5-a21f-c7be3056dfe8; centcontrol_login_token=09E8A6B6888CB0B7A9F89AB3DB5FAFE4 // server has receive a message from: Socket[addr=/0:0:0:0:0:0:0:1,port=63008,localport=8080] // server get a connection: Socket[addr=/0:0:0:0:0:0:0:1,port=63009,localport=8080] // GET /favicon.ico HTTP/1.1 // Host: localhost:8080 // Connection: keep-alive // Pragma: no-cache // Cache-Control: no-cache // Sec-Fetch-Mode: no-cors // User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36 // Accept: image/webp,image/apng,image/*,*/*;q=0.8 // Sec-Fetch-Site: same-origin // Referer: http://localhost:8080/ // Accept-Encoding: gzip, deflate, br // Accept-Language: en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7 // Cookie: Webstorm-c7a2b5a2=b5e53f87-54cc-41d5-a21f-c7be3056dfe8; centcontrol_login_token=09E8A6B6888CB0B7A9F89AB3DB5FAFE4 // server has receive a message from: Socket[addr=/0:0:0:0:0:0:0:1,port=63009,localport=8080] }
上面的代码是一套的,可以进行Server与Client的通信,功能较为简单。
所以这里再給一个,可以进行通信的版本。简单的业务场景可以直接修改,应用。
BIO2_Client
package self; import java.io.*; import java.net.*; /** * @Description: * @Author: jarry */ public class Client { public static void main(String[] args) throws IOException { Socket socket = new Socket(); socket.setSoTimeout(2000); socket.connect(new InetSocketAddress(Inet4Address.getLocalHost(),2000),2000); System.out.println("client startup"); dealMsg(socket); socket.close(); } private static void dealMsg(Socket clientSocket) throws IOException { // 1.获取键盘输入流 InputStream systemInputStream = System.in; // 2.将systemInputStream转化为具有缓存功能的字符输入流BufferedReader BufferedReader systemBufferedReader = new BufferedReader(new InputStreamReader(systemInputStream)); // 3.获取Socket输入流 InputStream socketInputStream = clientSocket.getInputStream(); // 4.将socketInputStream转换为具有缓存能力的字符输入流 BufferedReader socketBufferedReader = new BufferedReader(new InputStreamReader(socketInputStream)); // 5.获取Socket输出流 OutputStream socketOutputStream = clientSocket.getOutputStream(); // 6.将socketOutputStream转换为打印流(用于发送String) PrintStream socketPrintStream = new PrintStream(socketOutputStream); // 用于确立连接状态的标识符 boolean flag = true; // 7.利用循环,client与server进行交互 do { // 从键盘等系统输入流获取输入字符串 String str = systemBufferedReader.readLine(); // 将str写入到socketClient的打印流(本质是输出流)。socketClient的输出流是连接Server的,用于向Server发送数据的 socketPrintStream.println(str); // 从Server获得回写(Server的回写,一定会发送到socketClient的输入流中(输入的“入”是指入socketClient) String echo = socketBufferedReader.readLine(); // 建立一个用于关闭的方式 if ("bye".equalsIgnoreCase(echo)){ flag = false; }else{ // 在控制台打印server的echo System.out.println("server echo:"+echo); } }while (flag); // 8.退出交互,需要关闭与Server连接的两个资源(输入与输出) 考虑一下lombok的@Cleanup socketBufferedReader.close(); socketPrintStream.close(); } }
BIO2_Server
package self; import java.io.*; import java.net.ServerSocket; import java.net.Socket; /** * @Description: * @Author: jarry */ public class Server { public static void main(String[] args) throws IOException { // 建立Server的Socket,服务端不需要设置IP,以及Port // IP采用本地IP ServerSocket serverSocket = new ServerSocket(2000); System.out.println("server startup"); // 通过循环,对client的请求进行监听 while (true){ // 获得client的请求 Socket clientRequest = serverSocket.accept(); // 异步处理client的请求 ClientHandler clientHandler = new ClientHandler(clientRequest); clientHandler.start(); } } private static class ClientHandler extends Thread { Socket socketClient = null; boolean flag = true; ClientHandler(Socket socketClient){ this.socketClient = socketClient; } @Override public void run() { super.run(); // 构建系统输入流 InputStream systemInputStream = System.in; // 将系统输入流转换为字符输入流 BufferedReader systemBufferedReader = new BufferedReader(new InputStreamReader(systemInputStream)); try { // 构建socketClient的输入流(即客户端中,写入client输出流的数据) InputStream clientInputStream = socketClient.getInputStream(); // 将client的输入流转为具有存储能力的BufferedReader BufferedReader clientBufferedReader = new BufferedReader(new InputStreamReader(clientInputStream)); // 构建socketClient的输出流(用于发送数据,即客户端中,从client输入流读取的数据) OutputStream clientOutputStream = socketClient.getOutputStream(); // 将client的输出流转换为打印流,便于输出数据 PrintStream clientPrintStream = new PrintStream(clientOutputStream); // 通过循环,与客户端进行交互 do { // 读取从客户端发送来的数据,即读取socketClient的输入流转化的BufferedReader String str = clientBufferedReader.readLine(); if ("bye".equalsIgnoreCase(str)){ flag = false; clientPrintStream.println("connect interrupt"); }else{ System.out.println(str); // 发送回写数据,即将回写数据写入socketClient的输出流(客户端的输入流会获取相关数据) clientPrintStream.println(str.length()); } // 从系统输入中获取想要发送的数据 String servStr = systemBufferedReader.readLine(); // 发送到客户端 clientPrintStream.println(servStr); }while (flag); // 同样的,关闭连接资源 clientBufferedReader.close(); clientPrintStream.close(); } catch (IOException e) { e.printStackTrace(); }finally { // 无论发生什么,最后都要关闭socket连接 try { socketClient.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
为了使得代码结构更有优雅,并且为了更好地处理字符编码问题(demo中保留了各种数据类型的处理方式)。我们将上述版本更新一下。
BIO2_ClientV2
package example; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Inet4Address; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; import java.nio.ByteBuffer; public class Client { // 连接到远程服务器的远程端口 private static final int PORT = 20000; // 本地端口 private static final int LOCAL_PORT = 20001; public static void main(String[] args) throws IOException { // 创建Socket的操作,可以选择不同的创建方式 Socket socket = createSocket(); // Socket初始化操作 initSocket(socket); // 链接到本地20000端口,超时时间3秒,超过则抛出超时异常 socket.connect(new InetSocketAddress(Inet4Address.getLocalHost(), PORT), 3000); System.out.println("已发起服务器连接,并进入后续流程~"); System.out.println("客户端信息:" + socket.getLocalAddress() + " P:" + socket.getLocalPort()); System.out.println("服务器信息:" + socket.getInetAddress() + " P:" + socket.getPort()); try { // 发送接收数据 todo(socket); } catch (Exception e) { System.out.println("异常关闭"); } // 释放资源 socket.close(); System.out.println("客户端已退出~"); } /** * 创建Socket * @return * @throws IOException */ private static Socket createSocket() throws IOException { /* // 无代理模式,等效于空构造函数 Socket socket = new Socket(Proxy.NO_PROXY); // 新建一份具有HTTP代理的套接字,传输数据将通过www.baidu.com:8080端口转发 Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(Inet4Address.getByName("www.baidu.com"), 8800)); socket = new Socket(proxy); // 新建一个套接字,并且直接链接到本地20000的服务器上 socket = new Socket("localhost", PORT); // 新建一个套接字,并且直接链接到本地20000的服务器上 socket = new Socket(Inet4Address.getLocalHost(), PORT); // 新建一个套接字,并且直接链接到本地20000的服务器上,并且绑定到本地20001端口上 socket = new Socket("localhost", PORT, Inet4Address.getLocalHost(), LOCAL_PORT); socket = new Socket(Inet4Address.getLocalHost(), PORT, Inet4Address.getLocalHost(), LOCAL_PORT); */ // 推荐无参构造,因为其它(上面)的构造方法都是包含构造,设参,以及connect操作。而socket一旦connect后,设置参数的操作就无效了。不便于灵活使用 Socket socket = new Socket(); // 绑定到本地20001端口 socket.bind(new InetSocketAddress(Inet4Address.getLocalHost(), LOCAL_PORT)); return socket; } private static void initSocket(Socket socket) throws SocketException { // 设置读取超时时间为2秒 socket.setSoTimeout(2000); // 是否复用未完全关闭的Socket地址,对于指定bind操作后的套接字有效(正常Socket关闭后,对应端口在两分钟内将不再复用。而这个设置将可以直接使用对应空置端口) socket.setReuseAddress(true); // 是否开启Nagle算法(开启后,两点:第一,会对收到的每次数据进行ACK,另一端只有在接收到对应ACK,才会继续发送数据。第二,如果有数据堆积,会一次将所有堆积数据发出去(毕竟这种模式有数据堆积是正常的) // 开启后,更为严谨,严格,安全(默认开启) socket.setTcpNoDelay(true); // 是否需要在长时无数据响应时发送确认数据(类似心跳包),时间大约为2小时 socket.setKeepAlive(true); // 对于close关闭操作行为进行怎样的处理;默认为false,0 // false、0:默认情况,关闭时立即返回,底层系统接管输出流,将缓冲区内的数据发送完成 // true、0:关闭时立即返回,缓冲区数据抛弃,直接发送RST结束命令到对方,并无需经过2MSL等待 // true、200:关闭时最长阻塞200毫秒,随后按第二情况处理 socket.setSoLinger(true, 20); // 是否让紧急数据内敛,默认false;紧急数据通过 socket.sendUrgentData(1);发送 // 只有设置为true,才会暴露到上层(逻辑层) socket.setOOBInline(true); // 设置接收发送缓冲器大小 socket.setReceiveBufferSize(64 * 1024 * 1024); socket.setSendBufferSize(64 * 1024 * 1024); // 设置性能参数:短链接,延迟,带宽的相对重要性(权重) socket.setPerformancePreferences(1, 1, 0); } private static void todo(Socket client) throws IOException { // 得到Socket输出流 OutputStream outputStream = client.getOutputStream(); // 得到Socket输入流 InputStream inputStream = client.getInputStream(); byte[] buffer = new byte[256]; ByteBuffer byteBuffer = ByteBuffer.wrap(buffer); // 等同于上两行代码(ByteBuffer是NIO提供的一个工具,allocate就是分配内存地址,ByteBuffer处理的是byte) // ByteBuffer byteBuffer = ByteBuffer.allocate(256); // 尝试各种数据传输,发出 // byte byteBuffer.put((byte) 126); // char char c = 'a'; byteBuffer.putChar(c); // int int i = 2323123; byteBuffer.putInt(i); // bool boolean b = true; byteBuffer.put(b ? (byte) 1 : (byte) 0); // Long long l = 298789739; byteBuffer.putLong(l); // float float f = 12.345f; byteBuffer.putFloat(f); // double double d = 13.31241248782973; byteBuffer.putDouble(d); // String String str = "Hello你好!"; byteBuffer.put(str.getBytes()); // 发送到服务器(长度等于index+1) outputStream.write(buffer, 0, byteBuffer.position() + 1); // 接收服务器返回 int read = inputStream.read(buffer); System.out.println("收到数量:" + read); // 资源释放 outputStream.close(); inputStream.close(); } /** * 扩展-MSL * MSL是Maximum Segment Lifetime的英文缩写,可译为“最长报文段寿命”, * 它是任何报文在网络上存在的最长的最长时间,超过这个时间报文将被丢弃。 * 我们都知道IP头部中有个TTL字段,TTL是time to live的缩写,可译为“生存时间”, * 这个生存时间是由源主机设置设置初始值但不是但不是存在的具体时间,而是一个IP数据报可以经过的最大路由数,每经过一个路由器,它的值就减1, * 当此值为0则数据报被丢弃,同时发送ICMP报文通知源主机。 * RFC793中规定MSL为2分钟,但这完全是从工程上来考虑,对于现在的网络,MSL=2分钟可能太长了一些。 * 因此TCP允许不同的实现可根据具体情况使用更小的MSL值。TTL与MSL是有关系的但不是简单的相等关系,MSL要大于TTL。 */ }
BIO2_ServerV2
package example; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Inet4Address; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.nio.ByteBuffer; public class Server { private static final int PORT = 20000; public static void main(String[] args) throws IOException { ServerSocket server = createServerSocket(); initServerSocket(server); // 绑定到本地端口上 backlog标识等待队列中等待数量(超出,则在对应的客户端触发异常) server.bind(new InetSocketAddress(Inet4Address.getLocalHost(), PORT), 50); System.out.println("服务器准备就绪~"); System.out.println("服务器信息:" + server.getInetAddress() + " P:" + server.getLocalPort()); // 等待客户端连接 for (; ; ) { // 得到客户端 Socket client = server.accept(); // 客户端构建异步线程 ClientHandler clientHandler = new ClientHandler(client); // 启动线程 clientHandler.start(); } } private static ServerSocket createServerSocket() throws IOException { // 创建基础的ServerSocket ServerSocket serverSocket = new ServerSocket(); // 绑定到本地端口20000上,并且设置当前可允许等待链接的队列为50个 //server.bind(new InetSocketAddress(Inet4Address.getLocalHost(), PORT), 50); //serverSocket = new ServerSocket(PORT); // 等效于上面的方案,队列设置为50个 //serverSocket = new ServerSocket(PORT, 50); // 与上面等同 // serverSocket = new ServerSocket(PORT, 50, Inet4Address.getLocalHost()); return serverSocket; } private static void initServerSocket(ServerSocket serverSocket) throws IOException { // 是否复用未完全关闭的地址端口 serverSocket.setReuseAddress(true); // 等效Socket#setReceiveBufferSize(针对的是accept()接收到的clientSocket。毕竟在accept时就已经接收到了一定的数据了) serverSocket.setReceiveBufferSize(64 * 1024 * 1024); // 设置serverSocket#accept超时时间 // serverSocket.setSoTimeout(2000); // 设置性能参数:短链接,延迟,带宽的相对重要性(针对的是accept()接收到的clientSocket) serverSocket.setPerformancePreferences(1, 1, 1); } /** * 客户端消息处理 */ private static class ClientHandler extends Thread { private Socket socket; ClientHandler(Socket socket) { this.socket = socket; } @Override public void run() { super.run(); System.out.println("新客户端连接:" + socket.getInetAddress() + " P:" + socket.getPort()); try { // 得到套接字流 OutputStream outputStream = socket.getOutputStream(); InputStream inputStream = socket.getInputStream(); byte[] buffer = new byte[256]; int readCount = inputStream.read(buffer); ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, 0, readCount); // 按客户端发送的顺序读取 // byte byte be = byteBuffer.get(); // char char c = byteBuffer.getChar(); // int int i = byteBuffer.getInt(); // bool boolean b = byteBuffer.get() == 1; // Long long l = byteBuffer.getLong(); // float float f = byteBuffer.getFloat(); // double double d = byteBuffer.getDouble(); // String int pos = byteBuffer.position(); String str = new String(buffer, pos, readCount - pos - 1); System.out.println("收到数量:" + readCount + " 数据:" + be + "n" + c + "n" + i + "n" + b + "n" + l + "n" + f + "n" + d + "n" + str + "n"); outputStream.write(buffer, 0, readCount); outputStream.close(); inputStream.close(); } catch (Exception e) { System.out.println("连接异常断开"); } finally { // 连接关闭 try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } System.out.println("客户端已退出:" + socket.getInetAddress() + " P:" + socket.getPort()); } } }
BIO2_Tool
这里的tool,表明了两点:如何实现int与byte之间的转换,可以自定义实现数据的转换
package example; /** * 过渡一下,简述int与byte之间的转换。 * 进而明确各种数据类型与byte之间的转化。 * 最终引申出NIO包下的ByteBuffer工具,实现不同数据类型与byte类型的相互转换 */ public class Tools { public static int byteArrayToInt(byte[] b) { return b[3] & 0xFF | (b[2] & 0xFF) << 8 | (b[1] & 0xFF) << 16 | (b[0] & 0xFF) << 24; } public static byte[] intToByteArray(int a) { return new byte[]{ (byte) ((a >> 24) & 0xFF), (byte) ((a >> 16) & 0xFF), (byte) ((a >> 8) & 0xFF), (byte) (a & 0xFF) }; } }
代码示例扩展(BIO下UDP)
由于实际工作中UDP使用得比较少,所以这里只给出了BIO中UDP的使用。不过也基本满足了UDP的使用入门了,可以实现局域网搜索(起码对我目前的工作来说是够用了)。至于UDP用于音视频数据传输,得读者自己寻找,或者等我了解之后,更新。
BIO_UDP_Searcher
package self; import java.io.IOException; import java.net.*; /** * @Description: * @Author: jarry */ public class UDPSearcher { public static void main(String[] args) throws IOException { System.out.println("UDPSearcher started."); // 构建UDP的Socket(由于是searcher,即数据的率先发送者,所以可以不用指定port,用于监听) DatagramSocket datagramSocket = new DatagramSocket(); // 构建请求消息的实体(包含目标ip及port) String requestMsg = "just a joke."; byte[] requestBytes = requestMsg.getBytes(); DatagramPacket requestPacket = new DatagramPacket(requestBytes, requestBytes.length); requestPacket.setAddress(Inet4Address.getLocalHost()); requestPacket.setPort(20000); // 发送请求数据 System.out.println("UDPSearcher has send msg."); datagramSocket.send(requestPacket); // 接收回送数据 byte[] buf = new byte[512]; DatagramPacket receivePacket = new DatagramPacket(buf,buf.length); datagramSocket.receive(receivePacket); String sourceIp = receivePacket.getAddress().getHostAddress(); int sourcePort = receivePacket.getPort(); int dataLength = receivePacket.getLength(); String receiveData = new String(receivePacket.getData(),0,receivePacket.getData().length); // 显示接收到的数据 System.out.println("UDPSearcher has received data with source:"+sourceIp+":"+sourcePort+" with length "+dataLength+". data:"+receiveData); // 由于是demo,所以不用循环,就此结束 System.out.println("UDPSearcher finished."); datagramSocket.close(); } }
BIO_UDP_Provider
package self; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.SocketException; /** * @Description: * @Author: jarry */ public class UDPProvider { public static void main(String[] args) throws IOException { System.out.println("UDPProvider started."); // 新建DatagramSocekt,并设定在本机20000端口监听,并接收消息 DatagramSocket datagramSocket = new DatagramSocket(20000); // 新建DatagramPacket实体 byte[] buf = new byte[512]; DatagramPacket datagramPacket = new DatagramPacket(buf,buf.length); // 接收数据 datagramSocket.receive(datagramPacket); // 处理接受到的数据 String sourceIp = datagramPacket.getAddress().getHostAddress(); int sourcePort = datagramPacket.getPort(); String data = new String(datagramPacket.getData(),0,datagramPacket.getLength()); // 显示接收到的数据 System.out.println("UDPProvider has received data with source:"+sourceIp+":"+sourcePort+" with length "+data.length()+". data:"+data); // 准备发送回送数据 String responseData = "UDPProvider has received data with length:"+data.length(); byte[] responseBytes = responseData.getBytes(); // 构建回送数据实体(别玩了,设置目标ip与port) DatagramPacket responsePacket = new DatagramPacket(responseBytes, responseBytes.length ,datagramPacket.getAddress(),datagramPacket.getPort()); // 发送回送数据 System.out.println("UDPProvider has sended data."); datagramSocket.send(responsePacket); // 由于是demo,所以不用循环,就此结束 System.out.println("UDPProvider finished."); datagramSocket.close(); } }
为了网络监听的clear,以及权限问题,需要对上述代码进行一次升级。
BIO_UDP2_MessageCreator
package self.v2; /** * @Description: 自定义通信数据格式(这可能是最简单的应用层协议了) * @Author: jarry */ public class MessageCreator { private static final String SN_HEADER = "收到暗号,我是(SN):"; private static final String PORT_HEADER = "发送暗号,请回电端口(PORT):"; public static String buildWithPort(int port){ return PORT_HEADER + port; } public static int parsePort(String data){ if (data.startsWith(PORT_HEADER)){ return Integer.parseInt(data.substring(PORT_HEADER.length())); } return -1; } public static String buildWithSN(String sn){ return SN_HEADER + sn; } public static String parseSN(String data){ if (data.startsWith(SN_HEADER)){ return data.substring(SN_HEADER.length()); } return null; } }
BIO_UDP2_Searcher
package self.v2; import java.io.IOException; import java.net.*; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; /** * @Description: * @Author: jarry */ public class UDPSearcher { // 监听端口号 private static final int LISTEN_PORT = 30000; public static void main(String[] args) throws IOException, InterruptedException { System.out.println("UDPSearcher Started"); Listener listener = listen(); sendBroadcast(); // 读取任意键盘信息后退出 System.in.read(); List<Device> devices = listener.getDevicesAndClose(); for (Device device : devices) { System.out.println("Device:"+device.toString()); } // 完成 System.out.println("UDPSearcher Finished"); } private static Listener listen() throws InterruptedException { System.out.println("UDPSearcher start listen."); CountDownLatch countDownLatch = new CountDownLatch(1); Listener listener = new Listener(LISTEN_PORT, countDownLatch); listener.start(); countDownLatch.await(); return listener; } /** * 用于发送广播消息 * @throws IOException */ private static void sendBroadcast() throws IOException { System.out.println("UDPSearcher sendBroadcast started."); // 作为一个搜索者(发送请求),无需指定一个端口,由系统自动分配 DatagramSocket datagramSocket = new DatagramSocket(); // 构建一份请求数据 String requestData = MessageCreator.buildWithPort(LISTEN_PORT); byte[] requestDataBytes = requestData.getBytes(); // 构建发送数据实体 DatagramPacket requestPacket = new DatagramPacket(requestDataBytes, requestDataBytes.length); // 设置目标地址(采用广播地址) requestPacket.setAddress(Inet4Address.getByName("255.255.255.255")); requestPacket.setPort(20000); // 发送构建好的消息 datagramSocket.send(requestPacket); System.out.println("start send data."); // 发送结束 System.out.println("UDPSearcher sendBroadcast finished."); datagramSocket.close(); } private static class Device { final int port; final String ip; final String sn; public Device(int port, String ip, String sn) { this.port = port; this.ip = ip; this.sn = sn; } @Override public String toString() { return "Device{" + "port=" + port + ", ip='" + ip + ''' + ", sn='" + sn + ''' + '}'; } } private static class Listener extends Thread{ private final int listenPort; private final CountDownLatch countDownLatch; private final List<Device> devices = new ArrayList<Device>(); private boolean done = false; private DatagramSocket ds = null; public Listener(int listenPort, CountDownLatch countDownLatch){ super(); this.listenPort = listenPort; this.countDownLatch = countDownLatch; } @Override public void run() { super.run(); // 通知已启动 countDownLatch.countDown(); // 开始实际数据监听部分 try { // 监听回送端口 ds = new DatagramSocket(listenPort); while (!done){ // 接收消息的实体 final byte[] buf = new byte[512]; DatagramPacket receivePack = new DatagramPacket(buf, buf.length); // 开始接收数据 ds.receive(receivePack); // 打印接收到的信息 String ip = receivePack.getAddress().getHostAddress(); int port = receivePack.getPort(); int dataLength = receivePack.getLength(); String data = new String(receivePack.getData(),0,dataLength); System.out.println("UDPSearcher receive form ip:" + ip + "tport:" + port + "tdata:" + data); String sn = MessageCreator.parseSN(data); if (sn != null){ Device device = new Device(port, ip ,sn); devices.add(device); } } }catch (Exception e){ }finally { close(); } System.out.println("UDPSearcher listner finished"); } private void close(){ if (ds != null){ ds.close(); ds = null; } } List<Device> getDevicesAndClose(){ done = true; close(); return devices; } } }
BIO_UDP_Provider
package self.v2; /** * @Description: * @Author: jarry */ import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.util.UUID; /** * UDP 提供者, 用于提供UDP服务 */ public class UDPProvider { public static void main(String[] args) throws IOException { String sn = UUID.randomUUID().toString(); Provider provider = new Provider(sn); provider.start(); // 读取任意字符,退出 System.in.read(); provider.exit(); } private static class Provider extends Thread { private final String sn; private boolean done = false; private DatagramSocket datagramSocket = null; public Provider(String sn){ super(); this.sn = sn; } @Override public void run() { super.run(); System.out.println("UDPProvider started."); try { // 作为一个接收者(接受请求),需要指定一个端口用来接收消息 datagramSocket = new DatagramSocket(20000); // 通过一个循环,不断监听,接收数据 while (true) { // 接收消息的实体 final byte[] buf = new byte[512]; DatagramPacket receivePack = new DatagramPacket(buf, buf.length); // 开始接收数据 datagramSocket.receive(receivePack); // 打印接收到的信息 String ip = receivePack.getAddress().getHostAddress(); int port = receivePack.getPort(); int dataLength = receivePack.getLength(); String data = new String(receivePack.getData(), 0, dataLength); System.out.println("UDPProvider receive form ip:" + ip + "tport:" + port + "tdata:" + data); // 获得目标端口 int responsePort = MessageCreator.parsePort(data); if (responsePort != -1){ // 构建一份回送数据 String responseData = MessageCreator.buildWithSN(sn); byte[] reponseDataBytes = responseData.getBytes(); // 直接根据发送者,构