服务器端使用Java的数据报套接字和数据报包来实现,部署在具有公网IP地址的服务器上。一个客户端同样使用Java的数据报套接字和数据报包,绑定端口22222,运行在Windows 11 PC上,另一个客户端使用C语言实现,绑定端口10105,运行在Ubuntu上。 整体实现流程如下: 两个客户端首先使用udp向服务器发送信息,服务器保存两个客户端的信息。然后其中一个客户端将发送连接请求,服务器将在两个客户端之间交换信息。之后,两个客户端根据收到的信息相互发送和接收udp。 两个客户端都使用移动热点提供的网络,并测试Windows 11上的NAT是全锥型NAT,而Ubuntu上的NAT是端口限制锥型NAT。
2、Ubuntu下客户端使用Wireshark抓包时,发现发送给对方的udp源端口和目标端口也不正确。后来使用C语言的htons函数改变字节序后,抓包中显示的源端口和目标端口都正常了。 3.但是Windows 11抓包中显示的端口不正确(我知道NAT转换后,端口号会改变),但是抓包中显示的源端口与公共服务器上显示的端口不同。
这是我的服务器的代码:
package com.lulu.socket;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.io.InputStream;
import java.net.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author zhanghy
*/
@Slf4j
@Component
@Data
public class SocketServer {
@Value("${socket.poolSize}")
private int threadPoolSize; // 线程池的大小
@Value("${socket.tcpPort}")
private int tcpPortNumber; // socket绑定的端口号
@Value("${socket.udpPort}")
private int udpPortNumber; // udp绑定的端口号
private static ExecutorService executorService; // 线程池
private static ConcurrentHashMap<String, DeviceInfo> luluMap; // 存储活跃的lulu机信息
private static ConcurrentHashMap<String, DeviceInfo> phoneMap; // 存储活跃的手机信息
@PostConstruct
public void init() {
// 在应用启动时执行此方法
executorService = Executors.newFixedThreadPool(threadPoolSize); // 线程池
luluMap = new ConcurrentHashMap<>(); // 存储活跃的lulu机信息
phoneMap = new ConcurrentHashMap<>(); // 存储活跃的手机信息
new Thread(this::startTcpSocketServer).start(); // 创建线程来运行tcpSocket
new Thread(this::startUdpSocketServer).start(); // 创建线程来运行udpSocket
}
private void startTcpSocketServer() {
// 创建socket server端
try (ServerSocket serverSocket = new ServerSocket(tcpPortNumber)) {
System.out.println("Socket服务器启动,监听端口:" + tcpPortNumber);
while (true) {
Socket clientSocket = serverSocket.accept(); // 从连接请求队列中取出一个客户的连接请求,没有请求则会阻塞
log.info("接收到客户的socket请求: " + clientSocket.getInetAddress().getHostAddress(), clientSocket.getPort());
// 线程池分配线程进行处理
executorService.submit(new TcpClientHandler(clientSocket));
}
} catch (IOException e) {
log.error("tcpSocket初始化失败:", e);
e.printStackTrace();
}
}
private void startUdpSocketServer() {
// 初始化udp的socket
try (DatagramSocket datagramSocket = new DatagramSocket(udpPortNumber)) {
System.out.println("DataGramSocket服务器启动,监听端口:" + udpPortNumber);
byte[] receiveData;
while (true) {
receiveData = new byte[1024];
DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
datagramSocket.receive(receivePacket);
// 线程池分配线程进行处理
executorService.submit(new UdpPunchTask(datagramSocket, receivePacket));
}
} catch (SocketException e) {
log.error("udpSocket初始化失败:", e);
e.printStackTrace();
} catch (IOException e) {
log.error("udp接受数据失败:", e);
e.printStackTrace();
}
}
private static class TcpClientHandler implements Runnable {
private final Socket clientSocket;
public TcpClientHandler(Socket socket) {
this.clientSocket = socket;
}
@Override
public void run() {
try {
InputStream inputStream = clientSocket.getInputStream();
byte[] buffer = new byte[1024];
int bytesReadLength = inputStream.read(buffer);
String receivedData = new String(buffer, 0, bytesReadLength);
processInput(receivedData);
} catch (IOException e) {
e.printStackTrace();
}
}
private void processInput(String inputLine) {
// 在这里处理客户端发送的消息
// 这里可以解析设备信息,并更新deviceMap
// 例如,假设设备信息格式为 "DeviceID:IP:Port"
// TODO: 判断是设备还是手机
log.info("收到数据:" + inputLine);
JSONObject jsonObject = JSONObject.parseObject(inputLine); // 将string转换成JSON格式
log.info("收到数据: " + jsonObject);
if(jsonObject.getString("ID")!=null) {
// 是噜噜机
log.info("收到噜噜机的tcp连接");
String state = jsonObject.getString("state");
if("on".equals(state)) {
// 噜噜机设备在线
String deviceId = jsonObject.getString("ID");
luluMap.put(deviceId, new DeviceInfo(clientSocket.getInetAddress().getHostAddress(), clientSocket.getPort()));
log.info(jsonObject.getString("connect"));
log.info(jsonObject.getString("user_ip"));
}
}
}
}
/**
* @author zhanghy
* @date 2023/10/30
* 处理每一个udp数据包
*/
private static class UdpPunchTask implements Runnable {
private final DatagramPacket datagramPacket;
private final DatagramSocket datagramSocket;
private UdpPunchTask(DatagramSocket datagramSocket, DatagramPacket datagramPacket) {
this.datagramSocket = datagramSocket;
this.datagramPacket = datagramPacket;
}
@Override
public void run() {
// 客户端的端口
int port = datagramPacket.getPort();
// 客户端的IP
String hostAddress = datagramPacket.getAddress().getHostAddress();
System.out.println("Received UDP packet from " + hostAddress + ":" + port);
// 获得客户端发送过来的信息
byte[] data = datagramPacket.getData();
String jsonStr = new String(data); // 将字节数组转换为字符串
log.info("收到的数据:" + jsonStr);
JSONObject jsonObject = JSONObject.parseObject(jsonStr); // 将string转换成JSON格式
log.info("收到的数据:" + jsonStr);
if (jsonObject.getString("connect_ID") != null) {
// 收到手机要连接的噜噜机ID,此时先检测双方设备是否在线,如果都在线,则交换双方信息
DeviceInfo deviceInfo = luluMap.getOrDefault(jsonObject.getString("connect_ID"), null);
JSONObject responseToPhoneJson = new JSONObject();
if (deviceInfo != null) {
// 如果该ID的噜噜机活跃
responseToPhoneJson.put("command", "P2Pconnect");
responseToPhoneJson.put("ID", jsonObject.getString("connect_ID"));
responseToPhoneJson.put("device_ip", deviceInfo.getIp());
responseToPhoneJson.put("device_port", deviceInfo.getPort());
// 给手机端发送数据
udpSendResponse(port, hostAddress, responseToPhoneJson);
JSONObject responseToLuluJson = new JSONObject();
responseToLuluJson.put("command", "P2Pconnect");
responseToLuluJson.put("user_name", jsonObject.getString("user_name"));
responseToLuluJson.put("device_ip", hostAddress);
responseToLuluJson.put("device_port", port);
// 给噜噜机发送数据
udpSendResponse(deviceInfo.getPort(), deviceInfo.getIp(), responseToLuluJson);
log.info("手机的connect请求,且双方在线");
log.info(String.valueOf(responseToPhoneJson), responseToLuluJson);
} else {
// 如果该ID的噜噜机不活跃
responseToPhoneJson.put("command", "P2Pconnect");
responseToPhoneJson.put("ID", jsonObject.getString("connect_ID"));
responseToPhoneJson.put("device_ip", null);
responseToPhoneJson.put("device_port", null);
// 发送数据
udpSendResponse(port, hostAddress, responseToPhoneJson);
log.info("手机的connect请求,但噜噜机不在线");
log.info(String.valueOf(responseToPhoneJson));
}
} else if (jsonObject.getString("ID") != null) {
// 噜噜机第一次发送的UDP,要返回{"ID":xxxxxxx}的回应
// 在luluMap中记录/更新活跃的噜噜机信息
luluMap.put(jsonObject.getString("ID"), new DeviceInfo(hostAddress, port));
JSONObject responseJson = new JSONObject();
// 给噜噜机返回信息
responseJson.put("recive", "ID:" + jsonObject.getString("ID"));
udpSendResponse(port, hostAddress, responseJson);
log.info("收到噜噜机的ID");
} else if (jsonObject.getString("user_name") != null) {
// 手机第一次发送的UDP,要返回{"recive":"user_name:xxxx"}的回应
// 在phoneMap中记录/更新活跃的手机信息
phoneMap.put(jsonObject.getString("user_name"), new DeviceInfo(hostAddress, port));
JSONObject responseJson = new JSONObject();
// 给手机返回信息
responseJson.put("recive", "user_name:" + jsonObject.getString("user_name"));
udpSendResponse(port, hostAddress, responseJson);
log.info("收到手机的username");
}
}
/**
* udp返回数据
*
* @param port 客户端的端口
* @param hostAddress 客户端的IP
* @param responseJson 返回的数据信息
*/
private void udpSendResponse(int port, String hostAddress, JSONObject responseJson) {
byte[] bs = responseJson.toString().getBytes();
DatagramPacket response = new DatagramPacket(bs, bs.length, new InetSocketAddress(hostAddress, port));
try {
// socket发回数据给客户端
datagramSocket.send(response);
} catch (IOException e) {
log.error("udpSocket发送数据失败:", e);
throw new RuntimeException(e);
}
}
}
private static class DeviceInfo {
private final String ip;
private final int port;
public DeviceInfo(String ip, int port) {
this.ip = ip;
this.port = port;
}
public String getIp() {
return ip;
}
public int getPort() {
return port;
}
}
}
这是我的Windows客户端的代码:
@Test
public void udpTestPhone() {
DatagramSocket datagramSocket;
DatagramPacket datagramPacket;
try {
datagramSocket = new DatagramSocket(22222);
String message1 = "{\"user_name\":\"ashuai\"}";
byte[] bytes1 = message1.getBytes();
datagramPacket = new DatagramPacket(bytes1, bytes1.length, new InetSocketAddress("8.142.10.225", 9111));
datagramSocket.send(datagramPacket);
byte[] bs2 = new byte[1024];
datagramPacket = new DatagramPacket(bs2, bs2.length);
// 接收第一次的服务器返回
datagramSocket.receive(datagramPacket);
byte[] data = datagramPacket.getData();
String jsonStr = new String(data);
log.info(jsonStr);
} catch (IOException e) {
throw new RuntimeException(e);
}
try {
boolean flag = false;
JSONObject jsonObject = null;
while(!flag) {
// 手机端发起连接请求
String message2 = "{\"connect_ID\":202310240001, \"user_name\": \"ashuai\"}";
byte[] bytes1 = message2.getBytes();
datagramPacket = new DatagramPacket(bytes1, bytes1.length, new InetSocketAddress("8.142.10.225", 9111));
datagramSocket.send(datagramPacket);
byte[] receive = new byte[1024];
datagramPacket = new DatagramPacket(receive, receive.length);
// 返回噜噜机的信息
datagramSocket.receive(datagramPacket);
byte[] tmp1 = datagramPacket.getData();
String remoteNet = new String(tmp1);
log.info("remoteNet: " + remoteNet);
jsonObject = JSONObject.parseObject(remoteNet);
// 噜噜机不在线
if(!jsonObject.containsKey("device_port") || !jsonObject.containsKey("device_ip")) {
continue;
}
flag = true;
}
int port = jsonObject.getIntValue("device_port");
String hostIp = jsonObject.getString("device_ip");
String message = "{\"phone_test\":\"ashuai\"}";
byte[] helloMessage = message.getBytes();
datagramSocket.setSoTimeout(5000);
datagramPacket = new DatagramPacket(helloMessage, helloMessage.length, new InetSocketAddress(hostIp, port));
log.info("发送一次");
log.info("对方的IP: " + hostIp + " 端口: " + port);
datagramSocket.send(datagramPacket);
// datagramSocket.send(datagramPacket);
// datagramSocket.send(datagramPacket);
log.info("开始接收");
byte[] receive1 = new byte[1024];
DatagramPacket datagramPacket1 = new DatagramPacket(receive1, receive1.length);
boolean receiveFlag = true;
while (receiveFlag) {
try {
datagramSocket.receive(datagramPacket1);
byte[] luluMessage = datagramPacket1.getData();
log.info("收到噜噜信息:" + new String(luluMessage));
receiveFlag = false;
} catch (SocketTimeoutException e) {
// resend
log.info("超时阻塞,重新发");
// datagramSocket.send(datagramPacket);
// datagramSocket.send(datagramPacket);
datagramSocket.send(datagramPacket);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}