背景:使用 tio-websocket-spring-boot-starter:3.6.0.v20200315-RELEASE 时,Tio 源码对 WebSocket 协议层的心跳帧(Ping/Pong)直接返回 null,不做任何应答,因此像鸿蒙这类只支持协议层心跳的 ws 客户端收不到 Pong,通常一分钟左右就会断连。
所以我们需要重写一个自己的 WsServerAioHandler。
但是查看源码发现,该类并不是通过注解注入、交由容器管理的,而是在启动类中直接 new 出来的,
所以我们需要重写该启动类,屏蔽掉自带的启动配置;
需要新增以下四个类:CustomWsServerStarter(启动类)、CustomWsServerAioHandler(消息处理)、CustomTioWebSocketServerBootstrap、CustomTioWebSocketServerAutoConfiguration,代码如下;
核心改动只有一处:在 CustomWsServerAioHandler 的 handler 方法中,对 opcode == Opcode.PING 的情况进行处理(回复 Pong)即可。
/**
 * Replacement for Tio's {@code WsServerStarter} in which the {@link WsServerAioHandler} is handed
 * in by the caller instead of being created inside the starter.
 *
 * <p>The constructors assemble the {@link ServerTioConfig} and the {@link TioServer}; {@link
 * #start()} passes the bind IP and port from the {@link WsServerConfig} to the server.
 *
 * @author Zek
 * @since 2025/7/22 on 17:11
 */
public class CustomWsServerStarter {
  private WsServerConfig wsServerConfig;
  private IWsMsgHandler wsMsgHandler;
  private WsServerAioHandler wsServerAioHandler;
  private WsServerAioListener wsServerAioListener;
  private ServerTioConfig serverTioConfig;
  private TioServer tioServer;

  /**
   * Builds a starter with a fresh {@link WsTioUuid} and the shared executors from {@link Threads}.
   *
   * @param wsServerConfig bind address and port used by {@link #start()}
   * @param wsMsgHandler business-level message handler (stored only)
   * @param wsServerAioHandler packet handler registered with the Tio config
   * @param wsServerAioListener connection listener; a default one is created when {@code null}
   * @throws IOException declared for compatibility with the delegated constructor
   */
  public CustomWsServerStarter(
      WsServerConfig wsServerConfig,
      IWsMsgHandler wsMsgHandler,
      WsServerAioHandler wsServerAioHandler,
      WsServerAioListener wsServerAioListener)
      throws IOException {
    this(
        wsServerConfig,
        wsMsgHandler,
        wsServerAioHandler,
        wsServerAioListener,
        new WsTioUuid(),
        Threads.getTioExecutor(),
        Threads.getGroupExecutor());
  }

  /**
   * Builds a starter with explicit id generator and executors.
   *
   * @param wsServerConfig bind address and port used by {@link #start()}
   * @param wsMsgHandler business-level message handler (stored only)
   * @param wsServerAioHandler packet handler registered with the Tio config
   * @param wsServerAioListener connection listener; a default one is created when {@code null}
   * @param tioUuid id generator set on the Tio config
   * @param tioExecutor Tio executor; {@link Threads#getTioExecutor()} is used when {@code null}
   * @param groupExecutor group executor; {@link Threads#getGroupExecutor()} is used when {@code
   *     null}
   * @throws IOException declared by the original starter's contract
   */
  public CustomWsServerStarter(
      WsServerConfig wsServerConfig,
      IWsMsgHandler wsMsgHandler,
      WsServerAioHandler wsServerAioHandler,
      WsServerAioListener wsServerAioListener,
      TioUuid tioUuid,
      SynThreadPoolExecutor tioExecutor,
      ThreadPoolExecutor groupExecutor)
      throws IOException {
    SynThreadPoolExecutor effectiveTioExecutor =
        tioExecutor != null ? tioExecutor : Threads.getTioExecutor();
    ThreadPoolExecutor effectiveGroupExecutor =
        groupExecutor != null ? groupExecutor : Threads.getGroupExecutor();

    this.wsServerConfig = wsServerConfig;
    this.wsMsgHandler = wsMsgHandler;
    // The handler comes from the caller; nothing is instantiated here.
    this.wsServerAioHandler = wsServerAioHandler;
    if (wsServerAioListener == null) {
      this.wsServerAioListener = new WsServerAioListener();
    } else {
      this.wsServerAioListener = wsServerAioListener;
    }

    ServerTioConfig config =
        new ServerTioConfig(
            "Tio Websocket Server",
            this.wsServerAioHandler,
            this.wsServerAioListener,
            effectiveTioExecutor,
            effectiveGroupExecutor);
    config.setHeartbeatTimeout(0);
    config.setTioUuid(tioUuid);
    config.setReadBufferSize(1024 * 30);
    this.serverTioConfig = config;
    this.tioServer = new TioServer(config);
  }

  /** Returns the {@link TioServer} created by the constructor. */
  public TioServer getTioServer() {
    return this.tioServer;
  }

  /** Returns the {@link ServerTioConfig} created by the constructor. */
  public ServerTioConfig getServerTioConfig() {
    return this.serverTioConfig;
  }

  /**
   * Starts the server on the bind IP and port taken from the {@link WsServerConfig}.
   *
   * @throws IOException if {@link TioServer#start} fails
   */
  public void start() throws IOException {
    String bindIp = wsServerConfig.getBindIp();
    tioServer.start(bindIp, wsServerConfig.getBindPort());
  }
}
/**
 * Custom packet handler that replies to protocol-level PING frames with a PONG.
 *
 * <p>Written to fix HarmonyOS clients that send their heartbeat as WebSocket PING frames; per the
 * original author, stock Tio returns without handling them. Handshake, TEXT, BINARY and CLOSE
 * packets are forwarded to the {@link IWsMsgHandler}. Decoding, encoding and the config accessors
 * are inherited unchanged from {@link WsServerAioHandler}.
 *
 * @author Zek
 * @since 2025/7/22 on 9:34
 */
@Slf4j
public class CustomWsServerAioHandler extends WsServerAioHandler {
  private final WsServerConfig wsServerConfig;
  private final IWsMsgHandler wsMsgHandler;

  /**
   * @param wsServerConfig server config; its charset is used to decode and encode text messages
   * @param wsMsgHandler business handler that receives handshake, text, binary and close events
   */
  public CustomWsServerAioHandler(WsServerConfig wsServerConfig, IWsMsgHandler wsMsgHandler) {
    super(wsServerConfig, wsMsgHandler);
    this.wsServerConfig = wsServerConfig;
    this.wsMsgHandler = wsMsgHandler;
  }

  /**
   * Handles one decoded packet: completes the handshake, answers PING with PONG, ignores PONG and
   * dispatches every other complete frame to the business handler.
   *
   * @param packet the decoded packet; cast to {@link WsRequest}
   * @param channelContext the connection the packet arrived on
   * @throws Exception propagated from the business handler or from text decoding
   */
  @Override
  public void handler(Packet packet, ChannelContext channelContext) throws Exception {
    WsRequest wsRequest = (WsRequest) packet;
    if (wsRequest.isHandShake()) {
      completeHandshake(channelContext);
      return;
    }
    if (!wsRequest.isWsEof()) {
      // Not the final fragment yet; nothing to dispatch.
      return;
    }
    Opcode opcode = wsRequest.getWsOpcode();
    if (opcode == Opcode.PING) {
      replyPong(wsRequest, channelContext);
      return;
    }
    if (opcode == Opcode.PONG) {
      // PONG received: nothing to do here; business code may handle or ignore it.
      return;
    }
    WsResponse wsResponse = h(wsRequest, wsRequest.getBody(), opcode, channelContext);
    if (wsResponse != null) {
      Tio.send(channelContext, wsResponse);
    }
  }

  /** Runs the business handshake callback and, if accepted, sends the handshake response. */
  private void completeHandshake(ChannelContext channelContext) throws Exception {
    WsSessionContext wsSessionContext = (WsSessionContext) channelContext.get();
    HttpRequest request = wsSessionContext.getHandshakeRequest();
    HttpResponse httpResponse = wsSessionContext.getHandshakeResponse();
    HttpResponse accepted = wsMsgHandler.handshake(request, httpResponse, channelContext);
    if (accepted == null) {
      // The business layer rejected the handshake.
      Tio.remove(channelContext, "业务层不同意握手");
      return;
    }
    wsSessionContext.setHandshakeResponse(accepted);
    WsResponse wsResponse = new WsResponse();
    wsResponse.setHandShake(true);
    Tio.send(channelContext, wsResponse);
    wsSessionContext.setHandshaked(true);
    wsMsgHandler.onAfterHandshaked(request, httpResponse, channelContext);
  }

  /** Sends a PONG that carries the same body as the received PING. */
  private void replyPong(WsRequest ping, ChannelContext channelContext) {
    WsResponse pongResponse = WsResponse.fromBytes(ping.getBody());
    pongResponse.setWsOpcode(Opcode.PONG);
    log.info(
        "------------》success-return{}:【{}:{}-{}】",
        GlobalUtil.bracketsFun(channelContext.userid),
        "Pong",
        "",
        "");
    Tio.send(channelContext, pongResponse);
  }

  /**
   * Dispatches a complete frame to the matching {@link IWsMsgHandler} callback.
   *
   * @return the response to send, or {@code null} when there is nothing to send
   */
  private WsResponse h(
      WsRequest websocketPacket, byte[] bytes, Opcode opcode, ChannelContext channelContext)
      throws Exception {
    if (opcode == Opcode.TEXT) {
      if (rejectEmptyBody(bytes, channelContext)) {
        return null;
      }
      String text = new String(bytes, wsServerConfig.getCharset());
      Object retObj = wsMsgHandler.onText(websocketPacket, text, channelContext);
      return processRetObj(retObj, "onText", channelContext);
    } else if (opcode == Opcode.BINARY) {
      if (rejectEmptyBody(bytes, channelContext)) {
        return null;
      }
      Object retObj = wsMsgHandler.onBytes(websocketPacket, bytes, channelContext);
      return processRetObj(retObj, "onBytes", channelContext);
    } else if (opcode == Opcode.PING || opcode == Opcode.PONG) {
      // handler() deals with these before calling h(); kept as a safety net.
      log.debug("收到{}", opcode);
      return null;
    } else if (opcode == Opcode.CLOSE) {
      Object retObj = wsMsgHandler.onClose(websocketPacket, bytes, channelContext);
      return processRetObj(retObj, "onClose", channelContext);
    } else {
      Tio.remove(channelContext, "错误的websocket包,错误的Opcode");
      return null;
    }
  }

  /** Removes the channel and returns {@code true} when a TEXT/BINARY frame has no body. */
  private boolean rejectEmptyBody(byte[] bytes, ChannelContext channelContext) {
    if (bytes == null || bytes.length == 0) {
      Tio.remove(channelContext, "错误的websocket包,body为空");
      return true;
    }
    return false;
  }

  /**
   * Converts the value returned by a business callback into a {@link WsResponse}.
   *
   * @param obj a {@code String}, {@code byte[]}, {@link ByteBuffer}, {@link WsResponse} or {@code
   *     null}
   * @param methodName name of the callback, used only in the error log
   * @return the response, or {@code null} for {@code null} input or an unsupported type
   */
  private WsResponse processRetObj(Object obj, String methodName, ChannelContext channelContext)
      throws Exception {
    if (obj == null) {
      return null;
    }
    if (obj instanceof String) {
      return WsResponse.fromText((String) obj, wsServerConfig.getCharset());
    }
    if (obj instanceof byte[]) {
      return WsResponse.fromBytes((byte[]) obj);
    }
    if (obj instanceof WsResponse) {
      return (WsResponse) obj;
    }
    if (obj instanceof ByteBuffer) {
      return WsResponse.fromBytes(toBytes((ByteBuffer) obj));
    }
    log.error(
        "{} {}.{}()方法,只允许返回String、byte[]、ByteBuffer、WsResponse或null,但是程序返回了{}",
        channelContext,
        this.getClass().getName(),
        methodName,
        obj.getClass().getName());
    return null;
  }

  /**
   * Extracts the bytes of a buffer without throwing for direct or read-only buffers.
   *
   * <p>Array-backed buffers return their backing array, exactly as before. Other buffers are
   * copied from a duplicate so the caller's position is left untouched.
   */
  private static byte[] toBytes(ByteBuffer buffer) {
    if (buffer.hasArray()) {
      return buffer.array();
    }
    ByteBuffer view = buffer.duplicate();
    byte[] copy = new byte[view.remaining()];
    view.get(copy);
    return copy;
  }
}
/**
 * A custom version of TioWebSocketServerBootstrap that uses {@link CustomWsServerStarter} so that
 * a {@link CustomWsServerAioHandler} can be plugged into the Tio server.
 *
 * <p>The constructor resolves the handler and listeners. {@link #getServerTioConfig()} builds the
 * Tio configuration on first access, and {@link #contextInitialized()} additionally starts the
 * server. This class performs no synchronization.
 *
 * @author Zek
 * @since 2025/7/22 on 17:11
 */
public class CustomTioWebSocketServerBootstrap {
  private static final Logger logger =
      LoggerFactory.getLogger(CustomTioWebSocketServerBootstrap.class);
  private static final String GROUP_CONTEXT_NAME = "tio-websocket-spring-boot-starter";

  private boolean initialized = false;
  private TioWebSocketServerProperties serverProperties;
  private TioWebSocketServerClusterProperties clusterProperties;
  private TioWebSocketServerSslProperties serverSslProperties;
  private RedissonTioClusterTopic redissonTioClusterTopic;
  private WsServerConfig wsServerConfig;
  private TioClusterConfig clusterConfig;
  private CustomWsServerStarter wsServerStarter;
  private ServerTioConfig serverTioConfig;
  private IWsMsgHandler tioWebSocketMsgHandler;
  private IpStatListener ipStatListener;
  private GroupListener groupListener;
  private WsServerAioListener serverAioListener;

  /**
   * Stores the properties and resolves the message handler and listeners.
   *
   * <p>Every explicitly supplied handler or listener is used as given. When a scanner is supplied,
   * it is asked for an instance of each type that was not supplied.
   *
   * @param serverProperties server port, heartbeat timeout and IP-stat durations
   * @param clusterProperties cluster switches; read only when a cluster topic is present
   * @param serverSslProperties SSL switch and key/trust store settings
   * @param redissonTioClusterTopic cluster topic, may be {@code null}
   * @param tioWebSocketMsgHandler business message handler, may be {@code null} if a scanner can
   *     supply one
   * @param ipStatListener optional IP statistics listener
   * @param groupListener optional group listener
   * @param serverAioListener optional connection listener
   * @param tioWebSocketClassScanner optional scanner used to look up missing instances
   * @throws TioServerMsgHandlerNotFoundException if no {@link IWsMsgHandler} could be resolved
   */
  public CustomTioWebSocketServerBootstrap(
      TioWebSocketServerProperties serverProperties,
      TioWebSocketServerClusterProperties clusterProperties,
      TioWebSocketServerSslProperties serverSslProperties,
      RedissonTioClusterTopic redissonTioClusterTopic,
      IWsMsgHandler tioWebSocketMsgHandler,
      IpStatListener ipStatListener,
      GroupListener groupListener,
      WsServerAioListener serverAioListener,
      TioWebSocketClassScanner tioWebSocketClassScanner) {
    this.serverProperties = serverProperties;
    this.clusterProperties = clusterProperties;
    this.serverSslProperties = serverSslProperties;
    this.redissonTioClusterTopic = redissonTioClusterTopic;

    // Explicit arguments always win. Previously serverAioListener was dropped when no scanner
    // was supplied.
    this.tioWebSocketMsgHandler = tioWebSocketMsgHandler;
    this.ipStatListener = ipStatListener;
    this.groupListener = groupListener;
    this.serverAioListener = serverAioListener;

    // Fall back to the scanner only for the pieces that were not supplied.
    if (tioWebSocketClassScanner != null) {
      if (this.tioWebSocketMsgHandler == null) {
        tioWebSocketClassScanner.scanInstance(
            IWsMsgHandler.class,
            instance -> this.tioWebSocketMsgHandler = (IWsMsgHandler) instance);
      }
      if (this.ipStatListener == null) {
        tioWebSocketClassScanner.scanInstance(
            IpStatListener.class, instance -> this.ipStatListener = (IpStatListener) instance);
      }
      if (this.groupListener == null) {
        tioWebSocketClassScanner.scanInstance(
            GroupListener.class, instance -> this.groupListener = (GroupListener) instance);
      }
      if (this.serverAioListener == null) {
        tioWebSocketClassScanner.scanInstance(
            WsServerAioListener.class,
            instance -> this.serverAioListener = (WsServerAioListener) instance);
      }
    }
    afterSetProperties(tioWebSocketClassScanner);
  }

  /** Validates the resolved handler, warns about missing listeners and releases the scanner. */
  private void afterSetProperties(TioWebSocketClassScanner scanner) {
    if (this.tioWebSocketMsgHandler == null) {
      throw new TioServerMsgHandlerNotFoundException(
          "No class implements IWsMsgHandler found. Please create a bean that implements it.");
    }
    if (this.ipStatListener == null) {
      logger.warn(
          "No bean of type IpStatListener found. You can create one if you need connection statistics.");
    }
    if (this.groupListener == null) {
      logger.warn(
          "No bean of type GroupListener found. You can create one if you need to listen to group events.");
    }
    if (scanner != null) {
      scanner.destroy();
    }
  }

  /**
   * Returns the Tio configuration, building it on first access.
   *
   * <p>The configuration used to be created only inside {@link #contextInitialized()}, so any
   * earlier caller received {@code null}. It is now prepared on demand; the server itself is still
   * started only by {@link #contextInitialized()}.
   *
   * @return the fully configured {@link ServerTioConfig}, never {@code null}
   * @throws IllegalStateException if the configuration cannot be built
   */
  public ServerTioConfig getServerTioConfig() {
    if (serverTioConfig == null) {
      try {
        prepare();
      } catch (Exception e) {
        throw new IllegalStateException("Failed to prepare custom Tio WebSocket server config", e);
      }
    }
    return serverTioConfig;
  }

  /**
   * Builds the configuration if necessary and starts the server. Later calls are no-ops.
   *
   * @throws RuntimeException wrapping any failure during preparation or start-up
   */
  public void contextInitialized() {
    if (initialized) {
      logger.info("Tio WebSocket Server has already been initialized.");
      return;
    }
    logger.info("Initializing Custom Tio WebSocket Server...");
    try {
      prepare();
      start();
      initialized = true;
      logger.info("Custom Tio WebSocket Server started successfully.");
    } catch (Throwable e) {
      logger.error("Failed to bootstrap custom Tio WebSocket server:", e);
      throw new RuntimeException(e);
    }
  }

  /** Builds the server config, starter and Tio config once; does not start the server. */
  private void prepare() throws Exception {
    if (serverTioConfig != null) {
      return;
    }
    initTioWebSocketConfig();
    initTioWebSocketServer();
    initTioWebSocketServerTioConfig();
  }

  /** Creates the {@link WsServerConfig} and, when clustering is enabled, the cluster config. */
  private void initTioWebSocketConfig() {
    this.wsServerConfig = new WsServerConfig(serverProperties.getPort());
    if (redissonTioClusterTopic != null && clusterProperties.isEnabled()) {
      this.clusterConfig = new TioClusterConfig(redissonTioClusterTopic);
      this.clusterConfig.setCluster4all(clusterProperties.isAll());
      this.clusterConfig.setCluster4bsId(true);
      this.clusterConfig.setCluster4channelId(clusterProperties.isChannel());
      this.clusterConfig.setCluster4group(clusterProperties.isGroup());
      this.clusterConfig.setCluster4ip(clusterProperties.isIp());
      this.clusterConfig.setCluster4user(clusterProperties.isUser());
    }
  }

  /** Creates the custom handler and passes it to the custom starter. */
  private void initTioWebSocketServer() throws Exception {
    // Key change versus the stock bootstrap: the handler is ours and is injected into the starter.
    CustomWsServerAioHandler myWsServerAioHandler =
        new CustomWsServerAioHandler(wsServerConfig, tioWebSocketMsgHandler);
    wsServerStarter =
        new CustomWsServerStarter(
            wsServerConfig, tioWebSocketMsgHandler, myWsServerAioHandler, serverAioListener);
  }

  /** Applies name, listeners, heartbeat, cluster and SSL settings to the starter's Tio config. */
  private void initTioWebSocketServerTioConfig() {
    ServerTioConfig config = wsServerStarter.getServerTioConfig();
    config.setName(GROUP_CONTEXT_NAME);
    if (ipStatListener != null) {
      config.setIpStatListener(ipStatListener);
      config.ipStats.addDurations(serverProperties.getIpStatDurations());
    }
    if (serverAioListener != null) {
      config.setServerAioListener(this.serverAioListener);
    }
    if (groupListener != null) {
      config.setGroupListener(groupListener);
    }
    if (serverProperties.getHeartbeatTimeout() > 0) {
      config.setHeartbeatTimeout(serverProperties.getHeartbeatTimeout());
    }
    if (clusterConfig != null) {
      config.setTioClusterConfig(clusterConfig);
    }
    if (serverSslProperties.isEnabled()) {
      try {
        config.useSsl(
            serverSslProperties.getKeyStore(),
            serverSslProperties.getTrustStore(),
            serverSslProperties.getPassword());
      } catch (Exception e) {
        // NOTE(review): a failed SSL setup is only logged, so the server may still start without
        // TLS although SSL was enabled. Confirm whether start-up should fail instead.
        logger.error("Error initializing SSL config", e);
      }
    }
    // Publish only after the config is fully set up, so getServerTioConfig() never exposes a
    // half-configured instance.
    this.serverTioConfig = config;
  }

  /** Starts the underlying Tio server through the starter. */
  private void start() throws IOException {
    wsServerStarter.start();
  }
}
/**
* Custom auto-configuration to replace the default TioWebSocketServerAutoConfiguration. This class
* wires the custom bootstrap process into the Spring application context so that
* CustomTioWebSocketServerBootstrap (and through it CustomWsServerAioHandler) is used.
*
* <p>It enables the server, cluster, Redis-cluster and SSL property classes and declares the
* bootstrap, the Tio config, the optional Redis/cluster beans, the optional class scanner and the
* start-up trigger.
*
* @author Zek
* @since 2025/7/22 on 17:11
*/
@Configuration
@EnableConfigurationProperties({
TioWebSocketServerProperties.class,
TioWebSocketServerClusterProperties.class,
TioWebSocketServerRedisClusterProperties.class,
TioWebSocketServerSslProperties.class
})
public class CustomTioWebSocketServerAutoConfiguration {
// Channel name passed to RedissonTioClusterTopic when the cluster topic bean is created.
private static final String CLUSTER_TOPIC_CHANNEL = "tio_ws_spring_boot_starter";
/**
* Creates the CustomTioWebSocketServerBootstrap bean by passing every parameter straight to its
* constructor. The author made all @Bean methods static with the stated intent of avoiding
* circular dependency issues with the configuration class itself; dependencies arrive as method
* parameters. Parameters annotated with {@code @Autowired(required = false)} are optional.
*/
@Bean
public static CustomTioWebSocketServerBootstrap wsServerBootstrap(
TioWebSocketServerProperties serverProperties,
TioWebSocketServerClusterProperties clusterProperties,
TioWebSocketServerSslProperties serverSslProperties,
@Autowired(required = false) RedissonTioClusterTopic redissonTioClusterTopic,
@Autowired(required = false) IWsMsgHandler tioWebSocketMsgHandler,
@Autowired(required = false) IpStatListener tioWebSocketIpStatListener,
@Autowired(required = false) GroupListener tioWebSocketGroupListener,
@Autowired(required = false) WsServerAioListener tioWebSocketServerAioListener,
@Autowired(required = false) TioWebSocketClassScanner tioWebSocketClassScanner) {
return new CustomTioWebSocketServerBootstrap(
serverProperties,
clusterProperties,
serverSslProperties,
redissonTioClusterTopic,
tioWebSocketMsgHandler,
tioWebSocketIpStatListener,
tioWebSocketGroupListener,
tioWebSocketServerAioListener,
tioWebSocketClassScanner);
}
/**
* Exposes the bootstrap's ServerTioConfig as a bean; the value is whatever
* {@code bootstrap.getServerTioConfig()} returns when this method is invoked.
*
* <p>NOTE(review): verify that the bootstrap already has a config at this point. If the config
* is only built during {@code contextInitialized()}, this bean would be null.
*/
@Bean
public static ServerTioConfig wsServerTioConfig(CustomTioWebSocketServerBootstrap bootstrap) {
return bootstrap.getServerTioConfig();
}
/**
* Creates the RedisInitializer. Registered only when {@code tio.websocket.cluster.enabled} is
* {@code true}; its {@code shutdown} method is declared as the bean's destroy method.
*/
@Bean(destroyMethod = "shutdown")
@ConditionalOnProperty(value = "tio.websocket.cluster.enabled", havingValue = "true")
public static RedisInitializer wsRedisInitializer(
TioWebSocketServerRedisClusterProperties redisConfig, ApplicationContext applicationContext) {
return new RedisInitializer(redisConfig, applicationContext);
}
/**
* Creates the cluster topic on the CLUSTER_TOPIC_CHANNEL channel. Registered only when a
* RedisInitializer bean exists. Returns null when the initializer has no Redisson client.
*/
@Bean
@ConditionalOnBean(RedisInitializer.class)
public static RedissonTioClusterTopic wsRedissonTioClusterTopic(
RedisInitializer redisInitializer) {
RedissonClient redissonClient = redisInitializer.getRedissonClient();
if (redissonClient == null) {
return null;
}
return new RedissonTioClusterTopic(CLUSTER_TOPIC_CHANNEL, redissonClient);
}
/**
* Creates the class scanner. Registered only when {@code tio.websocket.server.use-scanner} is
* {@code true}; its {@code destroy} method is declared as the bean's destroy method.
*/
@Bean(destroyMethod = "destroy")
@ConditionalOnProperty(value = "tio.websocket.server.use-scanner", havingValue = "true")
public static TioWebSocketClassScanner tioWebSocketClassScanner(
ApplicationContext applicationContext) {
return new TioWebSocketClassScanner(applicationContext);
}
/**
* Returns a SmartInitializingSingleton whose callback is
* {@code bootstrap.contextInitialized()}. The author uses it to start the Tio server after all
* beans are initialized.
*/
@Bean
public static SmartInitializingSingleton tioWebSocketServerInitializer(
CustomTioWebSocketServerBootstrap bootstrap) {
return bootstrap::contextInitialized;
}
}