Browse Source

fix: 调整代理

马大波 1 năm trước cách đây
mục cha
commit
f18d55b08d
18 tập tin đã thay đổi với 387 bổ sung347 xóa
  1. 41 0
      src/main/java/com/xiaobao/gateway/protocol/dto/MediaServerDTO.java
  2. 0 38
      src/main/java/com/xiaobao/gateway/protocol/proxy/codec/ProxyInDecoder.java
  3. 75 0
      src/main/java/com/xiaobao/gateway/protocol/proxy/forward/ForwardProxyClientInHandler.java
  4. 0 24
      src/main/java/com/xiaobao/gateway/protocol/proxy/forward/ForwardProxyClientInitializer.java
  5. 3 3
      src/main/java/com/xiaobao/gateway/protocol/proxy/forward/ForwardProxyClientOutHandler.java
  6. 0 42
      src/main/java/com/xiaobao/gateway/protocol/proxy/forward/ForwardProxyClientToTargetHandler.java
  7. 43 19
      src/main/java/com/xiaobao/gateway/protocol/proxy/forward/ForwardProxyServer.java
  8. 14 10
      src/main/java/com/xiaobao/gateway/protocol/proxy/forward/ForwardProxyServerInHandler.java
  9. 0 24
      src/main/java/com/xiaobao/gateway/protocol/proxy/forward/ForwardProxyServerInitializer.java
  10. 0 78
      src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyClientHandler.java
  11. 60 0
      src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyClientInHandler.java
  12. 0 20
      src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyClientInitializer.java
  13. 0 45
      src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyClientToTargetHandler.java
  14. 50 0
      src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyHolder.java
  15. 15 11
      src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyServer.java
  16. 86 0
      src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyServerInHandler.java
  17. 0 28
      src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyServerInitializer.java
  18. 0 5
      src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyServerOutEncoder.java

+ 41 - 0
src/main/java/com/xiaobao/gateway/protocol/dto/MediaServerDTO.java

@@ -0,0 +1,41 @@
+package com.xiaobao.gateway.protocol.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class MediaServerDTO {
+    /**
+     * 服务器地址
+     */
+    private String remoteHost;
+    /**
+     * 服务器端口
+     */
+    private int remotePort;
+    /**
+     * 代理端口(正向代理对外提供端口)
+     * 最大支持2字节端口 2^15 - 1 = 32767
+     */
+    private int proxyPort;
+
+    /**
+     * 获取媒体服务器列表
+     *
+     * @return 服务器列表
+     */
+    public static List<MediaServerDTO> getMediaServerList() {
+        List<MediaServerDTO> serverList = new ArrayList<>();
+        serverList.add(new MediaServerDTO("192.168.66.73", 1935, 8000)); // 本地服务器
+        serverList.add(new MediaServerDTO("39.101.185.102", 1935, 8001)); // 公网服务器
+        return serverList;
+    }
+}

+ 0 - 38
src/main/java/com/xiaobao/gateway/protocol/proxy/codec/ProxyInDecoder.java

@@ -1,38 +0,0 @@
-package com.xiaobao.gateway.protocol.proxy.codec;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-import io.netty.util.internal.logging.InternalLogger;
-import io.netty.util.internal.logging.InternalLoggerFactory;
-
-import java.util.List;
-
-public class ProxyInDecoder extends ByteToMessageDecoder {
-
-    private static final InternalLogger log = InternalLoggerFactory.getInstance(ProxyInDecoder.class);
-
-    @Override
-    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
-        if (in.readableBytes() < 6) {
-            return;
-        }
-        // 标记读索引位置
-        in.markReaderIndex();
-        // 读取消息长度(4 字节)
-        int len = in.readInt();
-        // 读取消息类型(2 字节)
-        short identifier = in.readShort();
-        log.info("自定义标识: {}", identifier);
-        // 检查是否有足够的字节来读取消息体
-        if (in.readableBytes() < len) {
-            // 重置读索引
-            in.resetReaderIndex();
-            return;
-        }
-        // 读取消息体
-        ByteBuf body = ctx.alloc().buffer(len);
-        in.readBytes(body);
-        out.add(body);
-    }
-}

+ 75 - 0
src/main/java/com/xiaobao/gateway/protocol/proxy/forward/ForwardProxyClientInHandler.java

@@ -0,0 +1,75 @@
+package com.xiaobao.gateway.protocol.proxy.forward;
+
+import com.xiaobao.gateway.protocol.dto.MediaServerDTO;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.*;
+
+public class ForwardProxyClientInHandler extends ChannelInboundHandlerAdapter {
+    private final Channel inboundChannel;
+    private final MediaServerDTO mediaServer;
+
+    public ForwardProxyClientInHandler(Channel inboundChannel, MediaServerDTO mediaServer) {
+        this.inboundChannel = inboundChannel;
+        this.mediaServer = mediaServer;
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) {
+        if (!inboundChannel.isActive()) {
+            ForwardProxyServerInHandler.closeOnFlush(ctx.channel());
+        } else {
+            ctx.read();
+        }
+    }
+
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
+        if (msg instanceof ByteBuf) {
+            ByteBuf in = (ByteBuf) msg;
+            if (in.readableBytes() < 6) {
+                return;
+            }
+            // 标记读索引位置
+            in.markReaderIndex();
+            // 读取消息长度(4 字节)
+            int len = in.readInt();
+            // 读取消息类型(2 字节)
+            short identifier = in.readShort();
+            if (mediaServer == null || mediaServer.getProxyPort() != identifier) {
+                // 没有匹配到直接丢弃
+                return;
+            }
+            // 检查是否有足够的字节来读取消息体
+            if (in.readableBytes() < len) {
+                // 重置读索引
+                in.resetReaderIndex();
+                return;
+            }
+            // 读取消息体
+            ByteBuf body = ctx.alloc().buffer(len);
+            in.readBytes(body);
+            if (inboundChannel.isActive()) {
+                inboundChannel.writeAndFlush(body).addListener((ChannelFutureListener) future -> {
+                    if (future.isSuccess()) {
+                        ctx.channel().read();
+                    } else {
+                        future.channel().close();
+                    }
+                });
+            }
+        } else {
+            ctx.fireChannelRead(msg);
+        }
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) {
+        ForwardProxyServerInHandler.closeOnFlush(inboundChannel);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        cause.printStackTrace();
+        ForwardProxyServerInHandler.closeOnFlush(ctx.channel());
+    }
+}

+ 0 - 24
src/main/java/com/xiaobao/gateway/protocol/proxy/forward/ForwardProxyClientInitializer.java

@@ -1,24 +0,0 @@
-package com.xiaobao.gateway.protocol.proxy.forward;
-
-import com.xiaobao.gateway.protocol.proxy.codec.ProxyInDecoder;
-import com.xiaobao.gateway.protocol.proxy.codec.ProxyOutEncoder;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.socket.SocketChannel;
-
-public class ForwardProxyClientInitializer extends ChannelInitializer<SocketChannel> {
-    private final Channel inboundChannel;
-
-    public ForwardProxyClientInitializer(Channel inboundChannel) {
-        this.inboundChannel = inboundChannel;
-    }
-
-    @Override
-    protected void initChannel(SocketChannel ch) throws Exception {
-        ChannelPipeline pipeline = ch.pipeline();
-        pipeline.addLast(new ProxyOutEncoder((short) 1234));
-        pipeline.addLast(new ProxyInDecoder());
-        pipeline.addLast(new ForwardProxyClientToTargetHandler(inboundChannel));
-    }
-}

+ 3 - 3
src/main/java/com/xiaobao/gateway/protocol/proxy/codec/ProxyOutEncoder.java → src/main/java/com/xiaobao/gateway/protocol/proxy/forward/ForwardProxyClientOutHandler.java

@@ -1,4 +1,4 @@
-package com.xiaobao.gateway.protocol.proxy.codec;
+package com.xiaobao.gateway.protocol.proxy.forward;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
@@ -9,13 +9,13 @@ import lombok.Setter;
 
 @Getter
 @Setter
-public class ProxyOutEncoder extends ChannelOutboundHandlerAdapter {
+public class ForwardProxyClientOutHandler extends ChannelOutboundHandlerAdapter {
     /**
      * 自定义标识
      */
     private short identifier;
 
-    public ProxyOutEncoder(short identifier) {
+    public ForwardProxyClientOutHandler(short identifier) {
         this.identifier = identifier;
     }
 

+ 0 - 42
src/main/java/com/xiaobao/gateway/protocol/proxy/forward/ForwardProxyClientToTargetHandler.java

@@ -1,42 +0,0 @@
-package com.xiaobao.gateway.protocol.proxy.forward;
-
-import io.netty.channel.*;
-
-public class ForwardProxyClientToTargetHandler extends ChannelInboundHandlerAdapter {
-    private final Channel inboundChannel;
-
-    public ForwardProxyClientToTargetHandler(Channel inboundChannel) {
-        this.inboundChannel = inboundChannel;
-    }
-
-    @Override
-    public void channelActive(ChannelHandlerContext ctx) {
-        if (!inboundChannel.isActive()) {
-            ForwardProxyClientHandler.closeOnFlush(ctx.channel());
-        } else {
-            ctx.read();
-        }
-    }
-
-    @Override
-    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
-        inboundChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
-            if (future.isSuccess()) {
-                ctx.channel().read();
-            } else {
-                future.channel().close();
-            }
-        });
-    }
-
-    @Override
-    public void channelInactive(ChannelHandlerContext ctx) {
-        ForwardProxyClientHandler.closeOnFlush(inboundChannel);
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-        cause.printStackTrace();
-        ForwardProxyClientHandler.closeOnFlush(ctx.channel());
-    }
-}

+ 43 - 19
src/main/java/com/xiaobao/gateway/protocol/proxy/forward/ForwardProxyServer.java

@@ -1,15 +1,21 @@
 package com.xiaobao.gateway.protocol.proxy.forward;
 
+import com.xiaobao.gateway.protocol.dto.MediaServerDTO;
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
 import io.netty.util.internal.logging.InternalLogger;
 import io.netty.util.internal.logging.InternalLoggerFactory;
 
+import java.util.List;
+
 /**
  * 正向代理(代理客户端)
  */
@@ -17,27 +23,45 @@ public class ForwardProxyServer {
 
     private static final InternalLogger log = InternalLoggerFactory.getInstance(ForwardProxyServer.class);
 
-    private static final int LOCAL_PORT = 1935;
+    /**
+     * 反向代理网关端口
+     */
+    private static final int REMOTE_PORT = 9000;
+    /**
+     * 反向代理网关地址
+     */
     private static final String REMOTE_HOST = "127.0.0.1";
-    private static final int REMOTE_PORT = 1936;
-
-    public static void main(String[] args) throws Exception {
-        log.info("Forward Proxying *:{} to {}:{}", LOCAL_PORT, REMOTE_HOST, REMOTE_PORT);
 
-        // Configure the bootstrap.
-        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
-        EventLoopGroup workerGroup = new NioEventLoopGroup();
-        try {
-            ServerBootstrap b = new ServerBootstrap();
-            b.group(bossGroup, workerGroup)
-                .channel(NioServerSocketChannel.class)
-                .handler(new LoggingHandler(LogLevel.INFO))
-                .childHandler(new ForwardProxyServerInitializer(REMOTE_HOST, REMOTE_PORT))
-                .childOption(ChannelOption.AUTO_READ, false)
-                .bind(LOCAL_PORT).sync().channel().closeFuture().sync();
-        } finally {
-            bossGroup.shutdownGracefully();
-            workerGroup.shutdownGracefully();
+    public static void main(String[] args) {
+        List<MediaServerDTO> serverList = MediaServerDTO.getMediaServerList();
+        for (MediaServerDTO server : serverList) {
+            new Thread(() -> {
+                int proxyPort = server.getProxyPort();
+                EventLoopGroup bossGroup = new NioEventLoopGroup(1);
+                EventLoopGroup workerGroup = new NioEventLoopGroup();
+                try {
+                    ServerBootstrap b = new ServerBootstrap();
+                    b.group(bossGroup, workerGroup)
+                        .channel(NioServerSocketChannel.class)
+                        .handler(new LoggingHandler(LogLevel.INFO))
+                        .childHandler(new ChannelInitializer<SocketChannel>() {
+                            @Override
+                            public void initChannel(SocketChannel ch) {
+                                ChannelPipeline pipeline = ch.pipeline();
+                                pipeline.addLast(new LoggingHandler(LogLevel.INFO));
+                                pipeline.addLast(new ForwardProxyServerInHandler(REMOTE_HOST, REMOTE_PORT, server));
+                            }
+                        })
+                        .childOption(ChannelOption.AUTO_READ, false)
+                        .bind(proxyPort).sync().channel().closeFuture().sync();
+                    log.info("启动服务端口" + proxyPort + "成功");
+                } catch (Exception e) {
+                    log.error("启动服务端口" + proxyPort + "出错", e);
+                } finally {
+                    bossGroup.shutdownGracefully();
+                    workerGroup.shutdownGracefully();
+                }
+            }).start();
         }
     }
 }

+ 14 - 10
src/main/java/com/xiaobao/gateway/protocol/proxy/forward/ForwardProxyClientHandler.java → src/main/java/com/xiaobao/gateway/protocol/proxy/forward/ForwardProxyServerInHandler.java

@@ -1,40 +1,45 @@
 package com.xiaobao.gateway.protocol.proxy.forward;
 
+import com.xiaobao.gateway.protocol.dto.MediaServerDTO;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.*;
+import io.netty.channel.socket.SocketChannel;
 
-public class ForwardProxyClientHandler extends ChannelInboundHandlerAdapter {
+public class ForwardProxyServerInHandler extends ChannelInboundHandlerAdapter {
     private final String remoteHost;
     private final int remotePort;
-
-    // As we use inboundChannel.eventLoop() when building the Bootstrap this does not need to be volatile as
-    // the outboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel.
+    private final MediaServerDTO mediaServer;
     private Channel outboundChannel;
 
-    public ForwardProxyClientHandler(String remoteHost, int remotePort) {
+    public ForwardProxyServerInHandler(String remoteHost, int remotePort, MediaServerDTO mediaServer) {
         this.remoteHost = remoteHost;
         this.remotePort = remotePort;
+        this.mediaServer = mediaServer;
     }
 
     @Override
     public void channelActive(ChannelHandlerContext ctx) {
         final Channel inboundChannel = ctx.channel();
 
-        // Start the connection attempt.
         Bootstrap b = new Bootstrap();
         b.group(inboundChannel.eventLoop())
             .channel(ctx.channel().getClass())
-            .handler(new ForwardProxyClientInitializer(inboundChannel))
+            .handler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                protected void initChannel(SocketChannel ch) throws Exception {
+                    ChannelPipeline pipeline = ch.pipeline();
+                    pipeline.addLast(new ForwardProxyClientOutHandler((short) mediaServer.getProxyPort()));
+                    pipeline.addLast(new ForwardProxyClientInHandler(inboundChannel, mediaServer));
+                }
+            })
             .option(ChannelOption.AUTO_READ, false);
         ChannelFuture f = b.connect(remoteHost, remotePort);
         outboundChannel = f.channel();
         f.addListener((ChannelFutureListener) future -> {
             if (future.isSuccess()) {
-                // connection complete start to read first data
                 inboundChannel.read();
             } else {
-                // Close the connection if the connection attempt has failed.
                 inboundChannel.close();
             }
         });
@@ -45,7 +50,6 @@ public class ForwardProxyClientHandler extends ChannelInboundHandlerAdapter {
         if (outboundChannel.isActive()) {
             outboundChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
                 if (future.isSuccess()) {
-                    // was able to flush out data, start to read the next chunk
                     ctx.channel().read();
                 } else {
                     future.channel().close();

+ 0 - 24
src/main/java/com/xiaobao/gateway/protocol/proxy/forward/ForwardProxyServerInitializer.java

@@ -1,24 +0,0 @@
-package com.xiaobao.gateway.protocol.proxy.forward;
-
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.logging.LoggingHandler;
-
-public class ForwardProxyServerInitializer extends ChannelInitializer<SocketChannel> {
-    private final String remoteHost;
-    private final int remotePort;
-
-    public ForwardProxyServerInitializer(String remoteHost, int remotePort) {
-        this.remoteHost = remoteHost;
-        this.remotePort = remotePort;
-    }
-
-    @Override
-    public void initChannel(SocketChannel ch) {
-        ChannelPipeline pipeline = ch.pipeline();
-        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
-        pipeline.addLast(new ForwardProxyClientHandler(remoteHost, remotePort));
-    }
-}

+ 0 - 78
src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyClientHandler.java

@@ -1,78 +0,0 @@
-package com.xiaobao.gateway.protocol.proxy.reverse;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.*;
-
-public class ReverseProxyClientHandler extends ChannelInboundHandlerAdapter {
-    private final String remoteHost;
-    private final int remotePort;
-
-    // As we use inboundChannel.eventLoop() when building the Bootstrap this does not need to be volatile as
-    // the outboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel.
-    private Channel outboundChannel;
-
-    public ReverseProxyClientHandler(String remoteHost, int remotePort) {
-        this.remoteHost = remoteHost;
-        this.remotePort = remotePort;
-    }
-
-    @Override
-    public void channelActive(ChannelHandlerContext ctx) {
-        final Channel inboundChannel = ctx.channel();
-
-        // Start the connection attempt.
-        Bootstrap b = new Bootstrap();
-        b.group(inboundChannel.eventLoop())
-            .channel(ctx.channel().getClass())
-            .handler(new ReverseProxyClientInitializer(inboundChannel))
-            .option(ChannelOption.AUTO_READ, false);
-        ChannelFuture f = b.connect(remoteHost, remotePort);
-        outboundChannel = f.channel();
-        f.addListener((ChannelFutureListener) future -> {
-            if (future.isSuccess()) {
-                // connection complete start to read first data
-                inboundChannel.read();
-            } else {
-                // Close the connection if the connection attempt has failed.
-                inboundChannel.close();
-            }
-        });
-    }
-
-    @Override
-    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
-        if (outboundChannel.isActive()) {
-            outboundChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
-                if (future.isSuccess()) {
-                    // was able to flush out data, start to read the next chunk
-                    ctx.channel().read();
-                } else {
-                    future.channel().close();
-                }
-            });
-        }
-    }
-
-    @Override
-    public void channelInactive(ChannelHandlerContext ctx) {
-        if (outboundChannel != null) {
-            closeOnFlush(outboundChannel);
-        }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-        cause.printStackTrace();
-        closeOnFlush(ctx.channel());
-    }
-
-    /**
-     * Closes the specified channel after all queued write requests are flushed.
-     */
-    static void closeOnFlush(Channel ch) {
-        if (ch.isActive()) {
-            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
-        }
-    }
-}

+ 60 - 0
src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyClientInHandler.java

@@ -0,0 +1,60 @@
+package com.xiaobao.gateway.protocol.proxy.reverse;
+
+import com.xiaobao.gateway.protocol.dto.MediaServerDTO;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+public class ReverseProxyClientInHandler extends ChannelInboundHandlerAdapter {
+    private final Channel inboundChannel;
+    private final MediaServerDTO mediaServer;
+
+    public ReverseProxyClientInHandler(Channel inboundChannel, MediaServerDTO mediaServer) {
+        this.inboundChannel = inboundChannel;
+        this.mediaServer = mediaServer;
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) {
+        if (!inboundChannel.isActive()) {
+            ReverseProxyServerInHandler.closeOnFlush(ctx.channel());
+        } else {
+            ctx.read();
+        }
+    }
+
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
+        Object packet = msg;
+        if (msg instanceof ByteBuf) {
+            int identifier = mediaServer.getProxyPort();
+            ByteBuf out = ctx.alloc().ioBuffer();
+            ByteBuf buf = (ByteBuf) msg;
+            int len = buf.readableBytes();
+            out.writeInt(len);
+            out.writeShort(identifier);
+            out.writeBytes(buf);
+            packet = out;
+        }
+        inboundChannel.writeAndFlush(packet).addListener((ChannelFutureListener) future -> {
+            if (future.isSuccess()) {
+                ctx.channel().read();
+            } else {
+                future.channel().close();
+            }
+        });
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) {
+        ReverseProxyServerInHandler.closeOnFlush(inboundChannel);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        cause.printStackTrace();
+        ReverseProxyServerInHandler.closeOnFlush(ctx.channel());
+    }
+}

+ 0 - 20
src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyClientInitializer.java

@@ -1,20 +0,0 @@
-package com.xiaobao.gateway.protocol.proxy.reverse;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.socket.SocketChannel;
-
-public class ReverseProxyClientInitializer extends ChannelInitializer<SocketChannel> {
-    private final Channel inboundChannel;
-
-    public ReverseProxyClientInitializer(Channel inboundChannel) {
-        this.inboundChannel = inboundChannel;
-    }
-
-    @Override
-    protected void initChannel(SocketChannel ch) throws Exception {
-        ChannelPipeline pipeline = ch.pipeline();
-        pipeline.addLast(new ReverseProxyClientToTargetHandler(inboundChannel));
-    }
-}

+ 0 - 45
src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyClientToTargetHandler.java

@@ -1,45 +0,0 @@
-package com.xiaobao.gateway.protocol.proxy.reverse;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-
-public class ReverseProxyClientToTargetHandler extends ChannelInboundHandlerAdapter {
-    private final Channel inboundChannel;
-
-    public ReverseProxyClientToTargetHandler(Channel inboundChannel) {
-        this.inboundChannel = inboundChannel;
-    }
-
-    @Override
-    public void channelActive(ChannelHandlerContext ctx) {
-        if (!inboundChannel.isActive()) {
-            ReverseProxyClientHandler.closeOnFlush(ctx.channel());
-        } else {
-            ctx.read();
-        }
-    }
-
-    @Override
-    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
-        inboundChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
-            if (future.isSuccess()) {
-                ctx.channel().read();
-            } else {
-                future.channel().close();
-            }
-        });
-    }
-
-    @Override
-    public void channelInactive(ChannelHandlerContext ctx) {
-        ReverseProxyClientHandler.closeOnFlush(inboundChannel);
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-        cause.printStackTrace();
-        ReverseProxyClientHandler.closeOnFlush(ctx.channel());
-    }
-}

+ 50 - 0
src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyHolder.java

@@ -0,0 +1,50 @@
+package com.xiaobao.gateway.protocol.proxy.reverse;
+
+import com.xiaobao.gateway.protocol.dto.MediaServerDTO;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.*;
+import io.netty.channel.socket.SocketChannel;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Getter
+@NoArgsConstructor
+public class ReverseProxyHolder {
+    private final Map<Integer, Channel> connections = new ConcurrentHashMap<>();
+
+    public ReverseProxyHolder(ChannelHandlerContext ctx) {
+        Channel inboundChannel = ctx.channel();
+        List<MediaServerDTO> serverList = MediaServerDTO.getMediaServerList();
+        for (MediaServerDTO server : serverList) {
+            Bootstrap b = new Bootstrap();
+            b.group(inboundChannel.eventLoop())
+                .channel(inboundChannel.getClass())
+                .handler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    protected void initChannel(SocketChannel ch) throws Exception {
+                        ChannelPipeline pipeline = ch.pipeline();
+                        pipeline.addLast(new ReverseProxyClientInHandler(inboundChannel, server));
+                    }
+                })
+                .option(ChannelOption.AUTO_READ, false);
+            ChannelFuture f = b.connect(server.getRemoteHost(), server.getRemotePort());
+            Channel outboundChannel = f.channel();
+            connections.put(server.getProxyPort(), outboundChannel);
+            f.addListener((ChannelFutureListener) future -> {
+                if (future.isSuccess()) {
+                    inboundChannel.read();
+                } else {
+                    inboundChannel.close();
+                }
+            });
+        }
+    }
+
+    public Channel getChannel(int identifier) {
+        return connections.get(identifier);
+    }
+}

+ 15 - 11
src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyServer.java

@@ -1,30 +1,27 @@
 package com.xiaobao.gateway.protocol.proxy.reverse;
 
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
-import io.netty.util.internal.logging.InternalLogger;
-import io.netty.util.internal.logging.InternalLoggerFactory;
 
 /**
  * 反向代理(代理服务端)
  */
 public class ReverseProxyServer {
 
-    private static final InternalLogger log = InternalLoggerFactory.getInstance(ReverseProxyServer.class);
-
-    private static final int LOCAL_PORT = 1936;
-    private static final String REMOTE_HOST = "192.168.66.73";
-    private static final int REMOTE_PORT = 1935;
+    /**
+     * 本地端口
+     */
+    private static final int LOCAL_PORT = 9000;
 
     public static void main(String[] args) throws Exception {
-        log.info("Reverse Proxying *:{} to {}:{}", LOCAL_PORT, REMOTE_HOST, REMOTE_PORT);
-
-        // Configure the bootstrap.
         EventLoopGroup bossGroup = new NioEventLoopGroup(1);
         EventLoopGroup workerGroup = new NioEventLoopGroup();
         try {
@@ -32,7 +29,14 @@ public class ReverseProxyServer {
             b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
                 .handler(new LoggingHandler(LogLevel.INFO))
-                .childHandler(new ReverseProxyServerInitializer(REMOTE_HOST, REMOTE_PORT))
+                .childHandler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    public void initChannel(SocketChannel ch) {
+                        ChannelPipeline pipeline = ch.pipeline();
+                        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
+                        pipeline.addLast(new ReverseProxyServerInHandler());
+                    }
+                })
                 .childOption(ChannelOption.AUTO_READ, false)
                 .bind(LOCAL_PORT).sync().channel().closeFuture().sync();
         } finally {

+ 86 - 0
src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyServerInHandler.java

@@ -0,0 +1,86 @@
+package com.xiaobao.gateway.protocol.proxy.reverse;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+import java.util.Map;
+
+public class ReverseProxyServerInHandler extends ChannelInboundHandlerAdapter {
+    private ReverseProxyHolder reverseProxyHolder;
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) {
+        this.reverseProxyHolder = new ReverseProxyHolder(ctx);
+    }
+
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
+        if (msg instanceof ByteBuf && reverseProxyHolder != null) {
+            ByteBuf in = (ByteBuf) msg;
+            if (in.readableBytes() < 6) {
+                return;
+            }
+            // 标记读索引位置
+            in.markReaderIndex();
+            // 读取消息长度(4 字节)
+            int len = in.readInt();
+            // 读取消息类型(2 字节)
+            short identifier = in.readShort();
+            // 获取路由通道
+            Channel outboundChannel = reverseProxyHolder.getChannel(identifier);
+            if (outboundChannel == null) {
+                ctx.fireChannelRead(msg);
+            } else {
+                // 检查是否有足够的字节来读取消息体
+                if (in.readableBytes() < len) {
+                    // 重置读索引
+                    in.resetReaderIndex();
+                    return;
+                }
+                // 读取消息体
+                ByteBuf body = ctx.alloc().buffer(len);
+                in.readBytes(body);
+                if (outboundChannel.isActive()) {
+                    outboundChannel.writeAndFlush(body).addListener((ChannelFutureListener) future -> {
+                        if (future.isSuccess()) {
+                            ctx.channel().read();
+                        } else {
+                            future.channel().close();
+                        }
+                    });
+                }
+            }
+        } else {
+            ctx.fireChannelRead(msg);
+        }
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) {
+        if (reverseProxyHolder != null) {
+            Map<Integer, Channel> connections = reverseProxyHolder.getConnections();
+            for (Channel outboundChannel : connections.values()) {
+                closeOnFlush(outboundChannel);
+            }
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        cause.printStackTrace();
+        closeOnFlush(ctx.channel());
+    }
+
+    /**
+     * Closes the specified channel after all queued write requests are flushed.
+     */
+    static void closeOnFlush(Channel ch) {
+        if (ch.isActive()) {
+            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+        }
+    }
+}

+ 0 - 28
src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyServerInitializer.java

@@ -1,28 +0,0 @@
-package com.xiaobao.gateway.protocol.proxy.reverse;
-
-import com.xiaobao.gateway.protocol.proxy.codec.ProxyInDecoder;
-import com.xiaobao.gateway.protocol.proxy.codec.ProxyOutEncoder;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.logging.LoggingHandler;
-
-public class ReverseProxyServerInitializer extends ChannelInitializer<SocketChannel> {
-    private final String remoteHost;
-    private final int remotePort;
-
-    public ReverseProxyServerInitializer(String remoteHost, int remotePort) {
-        this.remoteHost = remoteHost;
-        this.remotePort = remotePort;
-    }
-
-    @Override
-    public void initChannel(SocketChannel ch) {
-        ChannelPipeline pipeline = ch.pipeline();
-        pipeline.addLast(new ProxyOutEncoder((short) 5678));
-        pipeline.addLast(new ProxyInDecoder());
-        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
-        pipeline.addLast(new ReverseProxyClientHandler(remoteHost, remotePort));
-    }
-}

+ 0 - 5
src/main/java/com/xiaobao/gateway/protocol/proxy/reverse/ReverseProxyServerOutEncoder.java

@@ -1,5 +0,0 @@
-package com.xiaobao.gateway.protocol.proxy.reverse;
-
-public class ReverseProxyServerOutEncoder
-{
-}