Conan
Conan
发布于 2025-07-25 / 3 阅读
0
0

tio-websocket-spring-boot-starter 支持ws底层协议PingPong

背景:在 tio-websocket-spring-boot-starter:3.6.0.v20200315-RELEASE 中,Tio 源码对 WebSocket 协议层的心跳帧(Ping/Pong)直接返回 null,不做任何处理。因此,像鸿蒙这类仅支持协议层心跳的 WebSocket 客户端,大约每隔一分钟就会断连一次。

所以我们需要重写自己的 WsServerAioHandler
但是:查看源码发现,该类并不是通过注解注入、交由容器管理的,而是直接 new 出来的

所以我们需要重写该启动类,并屏蔽掉自带的启动配置;
需要新增:CustomWsServerStarter(启动类)、CustomWsServerAioHandler(消息处理)、CustomTioWebSocketServerBootstrap、CustomTioWebSocketServerAutoConfiguration,如下;
关键改动是:在 CustomWsServerAioHandler 的 handler 方法中,对 opcode == Opcode.PING 的情况进行处理(回复 Pong)即可

/**
 * Variant of tio's {@code WsServerStarter} that receives its {@link WsServerAioHandler} from the
 * caller rather than constructing one itself, so a customised handler can be plugged in.
 *
 * <p>Construction only assembles the {@link ServerTioConfig} and the {@link TioServer}; the
 * server is started by {@link #start()}.
 *
 * @author Zek
 * @since 2025/7/22 on 17:11
 */
public class CustomWsServerStarter {
  private final WsServerConfig wsServerConfig;
  private final IWsMsgHandler wsMsgHandler;
  private final WsServerAioHandler wsServerAioHandler;
  private final WsServerAioListener wsServerAioListener;
  private final ServerTioConfig serverTioConfig;
  private final TioServer tioServer;

  /**
   * Creates a starter with a fresh {@link WsTioUuid} and the executors provided by {@link
   * Threads}.
   *
   * @param wsServerConfig bind address/port configuration used by {@link #start()}
   * @param wsMsgHandler business-level message handler
   * @param wsServerAioHandler packet handler handed to the {@link ServerTioConfig}
   * @param wsServerAioListener connection listener; a default one is created when {@code null}
   * @throws IOException declared for parity with the full constructor
   */
  public CustomWsServerStarter(
      WsServerConfig wsServerConfig,
      IWsMsgHandler wsMsgHandler,
      WsServerAioHandler wsServerAioHandler,
      WsServerAioListener wsServerAioListener)
      throws IOException {
    // Passing null executors lets the full constructor fall back to the Threads defaults.
    this(
        wsServerConfig,
        wsMsgHandler,
        wsServerAioHandler,
        wsServerAioListener,
        new WsTioUuid(),
        null,
        null);
  }

  /**
   * Creates a starter with explicit id generator and executors.
   *
   * @param wsServerConfig bind address/port configuration used by {@link #start()}
   * @param wsMsgHandler business-level message handler
   * @param wsServerAioHandler packet handler handed to the {@link ServerTioConfig}
   * @param wsServerAioListener connection listener; a default one is created when {@code null}
   * @param tioUuid id generator installed on the {@link ServerTioConfig}
   * @param tioExecutor tio executor; {@link Threads#getTioExecutor()} is used when {@code null}
   * @param groupExecutor group executor; {@link Threads#getGroupExecutor()} is used when {@code
   *     null}
   * @throws IOException declared by the original signature
   */
  public CustomWsServerStarter(
      WsServerConfig wsServerConfig,
      IWsMsgHandler wsMsgHandler,
      WsServerAioHandler wsServerAioHandler,
      WsServerAioListener wsServerAioListener,
      TioUuid tioUuid,
      SynThreadPoolExecutor tioExecutor,
      ThreadPoolExecutor groupExecutor)
      throws IOException {
    SynThreadPoolExecutor effectiveTioExecutor =
        tioExecutor != null ? tioExecutor : Threads.getTioExecutor();
    ThreadPoolExecutor effectiveGroupExecutor =
        groupExecutor != null ? groupExecutor : Threads.getGroupExecutor();

    this.wsServerConfig = wsServerConfig;
    this.wsMsgHandler = wsMsgHandler;
    // The handler comes from the caller; this class never instantiates one.
    this.wsServerAioHandler = wsServerAioHandler;
    if (wsServerAioListener == null) {
      this.wsServerAioListener = new WsServerAioListener();
    } else {
      this.wsServerAioListener = wsServerAioListener;
    }

    ServerTioConfig config =
        new ServerTioConfig(
            "Tio Websocket Server",
            this.wsServerAioHandler,
            this.wsServerAioListener,
            effectiveTioExecutor,
            effectiveGroupExecutor);
    // Heartbeat timeout starts at 0; callers can change it on the config returned by
    // getServerTioConfig().
    config.setHeartbeatTimeout(0);
    config.setTioUuid(tioUuid);
    config.setReadBufferSize(30 * 1024);

    this.serverTioConfig = config;
    this.tioServer = new TioServer(config);
  }

  /** Returns the {@link TioServer} built by the constructor. */
  public TioServer getTioServer() {
    return tioServer;
  }

  /** Returns the {@link ServerTioConfig} built by the constructor. */
  public ServerTioConfig getServerTioConfig() {
    return serverTioConfig;
  }

  /**
   * Starts the server on the IP and port taken from the {@link WsServerConfig}.
   *
   * @throws IOException if {@link TioServer#start} fails
   */
  public void start() throws IOException {
    String bindIp = wsServerConfig.getBindIp();
    tioServer.start(bindIp, wsServerConfig.getBindPort());
  }
}

/**
 * Custom packet handler that replies to protocol-level WebSocket PING frames with a PONG frame.
 *
 * <p>It exists for clients such as HarmonyOS that send their heartbeat as WebSocket PING frames;
 * the stock tio handler returns without handling those frames.
 *
 * <p>Only {@link #handler(Packet, ChannelContext)} is overridden; decoding, encoding and the
 * config accessors are inherited unchanged from {@link WsServerAioHandler}.
 *
 * @author Zek
 * @since 2025/7/22 on 9:34
 */
@Slf4j
public class CustomWsServerAioHandler extends WsServerAioHandler {

  private final WsServerConfig wsServerConfig;
  private final IWsMsgHandler wsMsgHandler;

  /**
   * @param wsServerConfig server configuration; its charset is used to decode/encode text frames
   * @param wsMsgHandler business handler receiving handshake, text, binary and close events
   */
  public CustomWsServerAioHandler(WsServerConfig wsServerConfig, IWsMsgHandler wsMsgHandler) {
    super(wsServerConfig, wsMsgHandler);
    this.wsServerConfig = wsServerConfig;
    this.wsMsgHandler = wsMsgHandler;
  }

  /**
   * Processes one decoded packet: completes the handshake, answers PING with PONG, ignores PONG,
   * and forwards every other complete frame to the business handler.
   *
   * @param packet the decoded packet; must be a {@link WsRequest}
   * @param channelContext the connection the packet arrived on
   * @throws Exception anything thrown by the business handler
   */
  @Override
  public void handler(Packet packet, ChannelContext channelContext) throws Exception {
    WsRequest wsRequest = (WsRequest) packet;

    if (wsRequest.isHandShake()) {
      completeHandshake(channelContext);
      return;
    }

    // Frames that are not marked as the end of a message are not dispatched.
    if (!wsRequest.isWsEof()) {
      return;
    }

    Opcode opcode = wsRequest.getWsOpcode();
    if (opcode == Opcode.PING) {
      replyPong(wsRequest, channelContext);
      return;
    }
    if (opcode == Opcode.PONG) {
      // Nothing to do for a received PONG.
      return;
    }

    WsResponse wsResponse = dispatch(wsRequest, wsRequest.getBody(), opcode, channelContext);
    if (wsResponse != null) {
      Tio.send(channelContext, wsResponse);
    }
  }

  /** Runs the business-level handshake and, if accepted, sends the handshake response. */
  private void completeHandshake(ChannelContext channelContext) throws Exception {
    WsSessionContext wsSessionContext = (WsSessionContext) channelContext.get();
    HttpRequest request = wsSessionContext.getHandshakeRequest();
    HttpResponse httpResponse = wsSessionContext.getHandshakeResponse();

    HttpResponse accepted = wsMsgHandler.handshake(request, httpResponse, channelContext);
    if (accepted == null) {
      // The business layer rejected the handshake.
      Tio.remove(channelContext, "业务层不同意握手");
      return;
    }
    wsSessionContext.setHandshakeResponse(accepted);

    WsResponse wsResponse = new WsResponse();
    wsResponse.setHandShake(true);
    Tio.send(channelContext, wsResponse);
    wsSessionContext.setHandshaked(true);

    wsMsgHandler.onAfterHandshaked(request, httpResponse, channelContext);
  }

  /** Sends a PONG frame that carries the same body as the received PING. */
  private void replyPong(WsRequest ping, ChannelContext channelContext) {
    WsResponse pongResponse = WsResponse.fromBytes(ping.getBody());
    pongResponse.setWsOpcode(Opcode.PONG);
    log.info(
        "------------》success-return{}:【{}:{}-{}】",
        GlobalUtil.bracketsFun(channelContext.userid),
        "Pong",
        "",
        "");
    Tio.send(channelContext, pongResponse);
  }

  /**
   * Routes a complete data/close frame to the matching business callback.
   *
   * @return the response to send back, or {@code null} when nothing should be sent
   */
  private WsResponse dispatch(
      WsRequest websocketPacket, byte[] bytes, Opcode opcode, ChannelContext channelContext)
      throws Exception {
    if (opcode == Opcode.TEXT) {
      if (isEmpty(bytes)) {
        Tio.remove(channelContext, "错误的websocket包,body为空");
        return null;
      }
      String text = new String(bytes, wsServerConfig.getCharset());
      Object retObj = wsMsgHandler.onText(websocketPacket, text, channelContext);
      return toResponse(retObj, "onText", channelContext);
    }
    if (opcode == Opcode.BINARY) {
      if (isEmpty(bytes)) {
        Tio.remove(channelContext, "错误的websocket包,body为空");
        return null;
      }
      Object retObj = wsMsgHandler.onBytes(websocketPacket, bytes, channelContext);
      return toResponse(retObj, "onBytes", channelContext);
    }
    if (opcode == Opcode.PING || opcode == Opcode.PONG) {
      // handler() deals with these before dispatching; kept so they are never treated as an
      // invalid opcode below.
      log.debug("收到{}", opcode);
      return null;
    }
    if (opcode == Opcode.CLOSE) {
      Object retObj = wsMsgHandler.onClose(websocketPacket, bytes, channelContext);
      return toResponse(retObj, "onClose", channelContext);
    }
    Tio.remove(channelContext, "错误的websocket包,错误的Opcode");
    return null;
  }

  private static boolean isEmpty(byte[] bytes) {
    return bytes == null || bytes.length == 0;
  }

  /**
   * Converts the value returned by a business callback into a {@link WsResponse}.
   *
   * @param obj callback result: {@code String}, {@code byte[]}, {@link ByteBuffer}, {@link
   *     WsResponse} or {@code null}
   * @param methodName name of the callback, used only in the error log
   * @return the response to send, or {@code null} for a {@code null} or unsupported result
   */
  private WsResponse toResponse(Object obj, String methodName, ChannelContext channelContext) {
    if (obj == null) {
      return null;
    }
    if (obj instanceof String) {
      return WsResponse.fromText((String) obj, wsServerConfig.getCharset());
    }
    if (obj instanceof byte[]) {
      return WsResponse.fromBytes((byte[]) obj);
    }
    if (obj instanceof WsResponse) {
      return (WsResponse) obj;
    }
    if (obj instanceof ByteBuffer) {
      return WsResponse.fromBytes(toBytes((ByteBuffer) obj));
    }
    // The offending method lives on the business handler, so report that class, and list every
    // type that is actually accepted above.
    log.error(
        "{} {}.{}()方法,只允许返回String、byte[]、ByteBuffer、WsResponse或null,但是程序返回了{}",
        channelContext,
        wsMsgHandler.getClass().getName(),
        methodName,
        obj.getClass().getName());
    return null;
  }

  /**
   * Extracts the bytes to send from a buffer returned by a business callback.
   *
   * <p>Buffers with an accessible backing array are sent as that whole array, exactly as before.
   * Direct and read-only buffers have no accessible array ({@code array()} would throw), so
   * their remaining bytes are copied from a duplicate, leaving the caller's position untouched.
   */
  private static byte[] toBytes(ByteBuffer buffer) {
    if (buffer.hasArray()) {
      return buffer.array();
    }
    ByteBuffer view = buffer.duplicate();
    byte[] copy = new byte[view.remaining()];
    view.get(copy);
    return copy;
  }
}

/**
 * Custom counterpart of {@code TioWebSocketServerBootstrap} that builds the server through {@link
 * CustomWsServerStarter}, so that a {@link CustomWsServerAioHandler} is used as packet handler.
 *
 * <p>The {@link ServerTioConfig} is assembled on first use (either {@link
 * #getServerTioConfig()} or {@link #contextInitialized()}); the listening server itself is only
 * started by {@link #contextInitialized()}.
 *
 * @author Zek
 * @since 2025/7/22 on 17:11
 */
public class CustomTioWebSocketServerBootstrap {

  private static final Logger logger =
      LoggerFactory.getLogger(CustomTioWebSocketServerBootstrap.class);
  private static final String GROUP_CONTEXT_NAME = "tio-websocket-spring-boot-starter";

  private boolean initialized = false;

  private final TioWebSocketServerProperties serverProperties;
  private final TioWebSocketServerClusterProperties clusterProperties;
  private final TioWebSocketServerSslProperties serverSslProperties;
  private final RedissonTioClusterTopic redissonTioClusterTopic;
  private WsServerConfig wsServerConfig;
  private TioClusterConfig clusterConfig;
  private CustomWsServerStarter wsServerStarter;
  private ServerTioConfig serverTioConfig;
  private IWsMsgHandler tioWebSocketMsgHandler;
  private IpStatListener ipStatListener;
  private GroupListener groupListener;
  private WsServerAioListener serverAioListener;

  /**
   * Stores the collaborators and, when a class scanner is supplied, uses it to look up any of
   * the handler/listener instances that were not passed in.
   *
   * @param serverProperties server settings (port, heartbeat timeout, IP-stat durations)
   * @param clusterProperties cluster switches
   * @param serverSslProperties SSL settings
   * @param redissonTioClusterTopic cluster topic; may be {@code null}
   * @param tioWebSocketMsgHandler business message handler; required unless the scanner finds one
   * @param ipStatListener optional IP statistics listener
   * @param groupListener optional group listener
   * @param serverAioListener optional connection listener
   * @param tioWebSocketClassScanner optional scanner; destroyed once the lookups are done
   * @throws TioServerMsgHandlerNotFoundException if no {@link IWsMsgHandler} is available
   */
  public CustomTioWebSocketServerBootstrap(
      TioWebSocketServerProperties serverProperties,
      TioWebSocketServerClusterProperties clusterProperties,
      TioWebSocketServerSslProperties serverSslProperties,
      RedissonTioClusterTopic redissonTioClusterTopic,
      IWsMsgHandler tioWebSocketMsgHandler,
      IpStatListener ipStatListener,
      GroupListener groupListener,
      WsServerAioListener serverAioListener,
      TioWebSocketClassScanner tioWebSocketClassScanner) {
    this.serverProperties = serverProperties;
    this.clusterProperties = clusterProperties;
    this.serverSslProperties = serverSslProperties;
    this.redissonTioClusterTopic = redissonTioClusterTopic;

    // Explicitly supplied instances always win. Previously serverAioListener was dropped when
    // no scanner was present; it is now kept in both cases.
    this.tioWebSocketMsgHandler = tioWebSocketMsgHandler;
    this.ipStatListener = ipStatListener;
    this.groupListener = groupListener;
    this.serverAioListener = serverAioListener;

    // With a scanner, look up whatever was not supplied.
    if (tioWebSocketClassScanner != null) {
      if (this.tioWebSocketMsgHandler == null) {
        tioWebSocketClassScanner.scanInstance(
            IWsMsgHandler.class,
            instance -> this.tioWebSocketMsgHandler = (IWsMsgHandler) instance);
      }
      if (this.ipStatListener == null) {
        tioWebSocketClassScanner.scanInstance(
            IpStatListener.class, instance -> this.ipStatListener = (IpStatListener) instance);
      }
      if (this.groupListener == null) {
        tioWebSocketClassScanner.scanInstance(
            GroupListener.class, instance -> this.groupListener = (GroupListener) instance);
      }
      if (this.serverAioListener == null) {
        tioWebSocketClassScanner.scanInstance(
            WsServerAioListener.class,
            instance -> this.serverAioListener = (WsServerAioListener) instance);
      }
    }
    afterSetProperties(tioWebSocketClassScanner);
  }

  /** Validates the mandatory handler, warns about missing optional listeners, frees the scanner. */
  private void afterSetProperties(TioWebSocketClassScanner scanner) {
    if (this.tioWebSocketMsgHandler == null) {
      throw new TioServerMsgHandlerNotFoundException(
          "No class implements IWsMsgHandler found. Please create a bean that implements it.");
    }
    if (this.ipStatListener == null) {
      logger.warn(
          "No bean of type IpStatListener found. You can create one if you need connection statistics.");
    }
    if (this.groupListener == null) {
      logger.warn(
          "No bean of type GroupListener found. You can create one if you need to listen to group events.");
    }
    if (scanner != null) {
      scanner.destroy();
    }
  }

  /**
   * Returns the server's {@link ServerTioConfig}, assembling it first if that has not happened
   * yet.
   *
   * <p>Previously this returned {@code null} until {@link #contextInitialized()} had run, which
   * made any caller asking earlier (for example a bean factory method) receive {@code null}.
   * Building the config here does not start the server.
   *
   * @return the configured {@link ServerTioConfig}, never {@code null}
   * @throws IllegalStateException if the configuration cannot be assembled
   */
  public ServerTioConfig getServerTioConfig() {
    if (serverTioConfig == null) {
      try {
        configureIfNeeded();
      } catch (Exception e) {
        throw new IllegalStateException("Failed to configure custom Tio WebSocket server", e);
      }
    }
    return serverTioConfig;
  }

  /**
   * Assembles the configuration (if still needed) and starts the server. Subsequent calls after
   * a successful start only log and return.
   *
   * @throws RuntimeException wrapping any failure during configuration or start-up
   */
  public void contextInitialized() {
    if (initialized) {
      logger.info("Tio WebSocket Server has already been initialized.");
      return;
    }
    logger.info("Initializing Custom Tio WebSocket Server...");
    try {
      configureIfNeeded();
      start();
      initialized = true;
      logger.info("Custom Tio WebSocket Server started successfully.");
    } catch (Throwable e) {
      logger.error("Failed to bootstrap custom Tio WebSocket server:", e);
      throw new RuntimeException(e);
    }
  }

  /** Runs the three configuration steps once; a no-op when the config already exists. */
  private void configureIfNeeded() throws Exception {
    if (serverTioConfig != null) {
      return;
    }
    initTioWebSocketConfig();
    initTioWebSocketServer();
    initTioWebSocketServerTioConfig();
  }

  /** Creates the {@link WsServerConfig} and, when clustering is enabled, the cluster config. */
  private void initTioWebSocketConfig() {
    this.wsServerConfig = new WsServerConfig(serverProperties.getPort());
    if (redissonTioClusterTopic != null && clusterProperties.isEnabled()) {
      this.clusterConfig = new TioClusterConfig(redissonTioClusterTopic);
      this.clusterConfig.setCluster4all(clusterProperties.isAll());
      this.clusterConfig.setCluster4bsId(true);
      this.clusterConfig.setCluster4channelId(clusterProperties.isChannel());
      this.clusterConfig.setCluster4group(clusterProperties.isGroup());
      this.clusterConfig.setCluster4ip(clusterProperties.isIp());
      this.clusterConfig.setCluster4user(clusterProperties.isUser());
    }
  }

  /** Creates the custom packet handler and hands it to the custom starter. */
  private void initTioWebSocketServer() throws Exception {
    // Key difference from the stock bootstrap: the handler is our own subclass.
    CustomWsServerAioHandler customHandler =
        new CustomWsServerAioHandler(wsServerConfig, tioWebSocketMsgHandler);
    wsServerStarter =
        new CustomWsServerStarter(
            wsServerConfig, tioWebSocketMsgHandler, customHandler, serverAioListener);
  }

  /** Applies name, listeners, heartbeat, cluster and SSL settings to the starter's config. */
  private void initTioWebSocketServerTioConfig() {
    ServerTioConfig config = wsServerStarter.getServerTioConfig();
    config.setName(GROUP_CONTEXT_NAME);
    if (ipStatListener != null) {
      config.setIpStatListener(ipStatListener);
      config.ipStats.addDurations(serverProperties.getIpStatDurations());
    }
    if (serverAioListener != null) {
      config.setServerAioListener(this.serverAioListener);
    }
    if (groupListener != null) {
      config.setGroupListener(groupListener);
    }
    // A non-positive value leaves the starter's initial heartbeat timeout in place.
    if (serverProperties.getHeartbeatTimeout() > 0) {
      config.setHeartbeatTimeout(serverProperties.getHeartbeatTimeout());
    }
    if (clusterConfig != null) {
      config.setTioClusterConfig(clusterConfig);
    }
    if (serverSslProperties.isEnabled()) {
      try {
        config.useSsl(
            serverSslProperties.getKeyStore(),
            serverSslProperties.getTrustStore(),
            serverSslProperties.getPassword());
      } catch (Exception e) {
        // NOTE(review): an SSL setup failure is only logged, so start-up continues without the
        // requested SSL configuration — confirm this best-effort behaviour is intended.
        logger.error("Error initializing SSL config", e);
      }
    }
    // Publish the field only once the config is fully set up, so configureIfNeeded() never
    // treats a half-configured instance as done.
    serverTioConfig = config;
  }

  private void start() throws IOException {
    wsServerStarter.start();
  }
}
/**
 * Spring configuration intended to stand in for the stock {@code
 * TioWebSocketServerAutoConfiguration}. It registers {@link CustomTioWebSocketServerBootstrap},
 * whose start-up path installs {@code CustomWsServerAioHandler} as the packet handler.
 *
 * <p>Every {@code @Bean} method here is static and receives its collaborators as method
 * parameters.
 *
 * @author Zek
 * @since 2025/7/22 on 17:11
 */
@Configuration
@EnableConfigurationProperties({
  TioWebSocketServerProperties.class,
  TioWebSocketServerClusterProperties.class,
  TioWebSocketServerRedisClusterProperties.class,
  TioWebSocketServerSslProperties.class
})
public class CustomTioWebSocketServerAutoConfiguration {

  private static final String CLUSTER_TOPIC_CHANNEL = "tio_ws_spring_boot_starter";

  /**
   * Builds the custom bootstrap from the three property objects plus whichever optional
   * collaborators exist in the context; absent ones are passed as {@code null}.
   */
  @Bean
  public static CustomTioWebSocketServerBootstrap wsServerBootstrap(
      TioWebSocketServerProperties serverProperties,
      TioWebSocketServerClusterProperties clusterProperties,
      TioWebSocketServerSslProperties serverSslProperties,
      @Autowired(required = false) RedissonTioClusterTopic redissonTioClusterTopic,
      @Autowired(required = false) IWsMsgHandler tioWebSocketMsgHandler,
      @Autowired(required = false) IpStatListener tioWebSocketIpStatListener,
      @Autowired(required = false) GroupListener tioWebSocketGroupListener,
      @Autowired(required = false) WsServerAioListener tioWebSocketServerAioListener,
      @Autowired(required = false) TioWebSocketClassScanner tioWebSocketClassScanner) {
    CustomTioWebSocketServerBootstrap bootstrap =
        new CustomTioWebSocketServerBootstrap(
            serverProperties,
            clusterProperties,
            serverSslProperties,
            redissonTioClusterTopic,
            tioWebSocketMsgHandler,
            tioWebSocketIpStatListener,
            tioWebSocketGroupListener,
            tioWebSocketServerAioListener,
            tioWebSocketClassScanner);
    return bootstrap;
  }

  /** Exposes the bootstrap's {@link ServerTioConfig} as a bean. */
  @Bean
  public static ServerTioConfig wsServerTioConfig(CustomTioWebSocketServerBootstrap bootstrap) {
    ServerTioConfig tioConfig = bootstrap.getServerTioConfig();
    return tioConfig;
  }

  /**
   * Registers the Redis initializer when {@code tio.websocket.cluster.enabled} is {@code true};
   * its {@code shutdown} method is the bean's destroy callback.
   */
  @Bean(destroyMethod = "shutdown")
  @ConditionalOnProperty(value = "tio.websocket.cluster.enabled", havingValue = "true")
  public static RedisInitializer wsRedisInitializer(
      TioWebSocketServerRedisClusterProperties redisConfig, ApplicationContext applicationContext) {
    RedisInitializer initializer = new RedisInitializer(redisConfig, applicationContext);
    return initializer;
  }

  /**
   * Creates the cluster topic on top of the initializer's Redisson client, or yields {@code
   * null} when the initializer has no client.
   */
  @Bean
  @ConditionalOnBean(RedisInitializer.class)
  public static RedissonTioClusterTopic wsRedissonTioClusterTopic(
      RedisInitializer redisInitializer) {
    RedissonClient client = redisInitializer.getRedissonClient();
    return client == null ? null : new RedissonTioClusterTopic(CLUSTER_TOPIC_CHANNEL, client);
  }

  /**
   * Registers the class scanner when {@code tio.websocket.server.use-scanner} is {@code true};
   * its {@code destroy} method is the bean's destroy callback.
   */
  @Bean(destroyMethod = "destroy")
  @ConditionalOnProperty(value = "tio.websocket.server.use-scanner", havingValue = "true")
  public static TioWebSocketClassScanner tioWebSocketClassScanner(
      ApplicationContext applicationContext) {
    TioWebSocketClassScanner scanner = new TioWebSocketClassScanner(applicationContext);
    return scanner;
  }

  /**
   * Returns a {@link SmartInitializingSingleton} whose callback invokes {@link
   * CustomTioWebSocketServerBootstrap#contextInitialized()}, i.e. starts the Tio server.
   */
  @Bean
  public static SmartInitializingSingleton tioWebSocketServerInitializer(
      CustomTioWebSocketServerBootstrap bootstrap) {
    SmartInitializingSingleton startServer = bootstrap::contextInitialized;
    return startServer;
  }
}


评论