马大波 1 рік тому
батько
коміт
81abe4238e

+ 4 - 1
src/main/java/com/xiaobao/gateway/protocol/proxy/forward/ForwardProxyClientInHandler.java

@@ -2,7 +2,10 @@ package com.xiaobao.gateway.protocol.proxy.forward;
 
 import com.xiaobao.gateway.protocol.dto.MediaServerDTO;
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.*;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 
 public class ForwardProxyClientInHandler extends ChannelInboundHandlerAdapter {
     private final Channel inboundChannel;

+ 10 - 5
src/main/java/com/xiaobao/gateway/protocol/proxy/forward/ForwardProxyServer.java

@@ -15,6 +15,9 @@ import io.netty.util.internal.logging.InternalLogger;
 import io.netty.util.internal.logging.InternalLoggerFactory;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 /**
  * 正向代理(代理客户端)
@@ -32,10 +35,11 @@ public class ForwardProxyServer {
      */
     private static final String REMOTE_HOST = "127.0.0.1";
 
-    public static void main(String[] args) {
+    public static void main(String[] args) throws InterruptedException {
         List<MediaServerDTO> serverList = MediaServerDTO.getMediaServerList();
+        ExecutorService executorService = Executors.newFixedThreadPool(serverList.size());
         for (MediaServerDTO server : serverList) {
-            new Thread(() -> {
+            executorService.submit(() -> {
                 int proxyPort = server.getProxyPort();
                 EventLoopGroup bossGroup = new NioEventLoopGroup(1);
                 EventLoopGroup workerGroup = new NioEventLoopGroup();
@@ -54,14 +58,15 @@ public class ForwardProxyServer {
                         })
                         .childOption(ChannelOption.AUTO_READ, false)
                         .bind(proxyPort).sync().channel().closeFuture().sync();
-                    log.info("启动服务端口" + proxyPort + "成功");
                 } catch (Exception e) {
-                    log.error("启动服务端口" + proxyPort + "出错", e);
+                    throw new RuntimeException(e);
                 } finally {
                     bossGroup.shutdownGracefully();
                     workerGroup.shutdownGracefully();
                 }
-            }).start();
+            });
         }
+        executorService.shutdown();
+        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
     }
 }

+ 2 - 0
src/main/java/com/xiaobao/gateway/protocol/proxy/forward/ForwardProxyServerInHandler.java

@@ -5,6 +5,7 @@ import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.*;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 
 public class ForwardProxyServerInHandler extends ChannelInboundHandlerAdapter {
     private final String remoteHost;
@@ -29,6 +30,7 @@ public class ForwardProxyServerInHandler extends ChannelInboundHandlerAdapter {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline pipeline = ch.pipeline();
+                    pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 2, 0));
                     pipeline.addLast(new ForwardProxyClientOutHandler((short) mediaServer.getProxyPort()));
                     pipeline.addLast(new ForwardProxyClientInHandler(inboundChannel, mediaServer));
                 }

+ 2 - 0
src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyServer.java

@@ -8,6 +8,7 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
 
@@ -34,6 +35,7 @@ public class ReverseProxyServer {
                     public void initChannel(SocketChannel ch) {
                         ChannelPipeline pipeline = ch.pipeline();
                         pipeline.addLast(new LoggingHandler(LogLevel.INFO));
+                        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 2, 0));
                         pipeline.addLast(new ReverseProxyServerInHandler());
                     }
                 })