资讯专栏INFORMATION COLUMN

PHP 程序员也能做的 Java 开发 30分钟使用 netty 轻松打造一个高性能 websock

kviccn / 3251人阅读

摘要:唯一的知识点就是的基础使用。可以简单的理解下面的代码就构建了一个服务器。握手完成之后的消息传递则在中处理。实际情况下,不可能那么多人同时说话广播,而是说话的人少,接受广播的人多。

硬广一波

SF 官方首页推荐《PHP进阶之路》(你又多久没有投资自己了?先看后买)

我们下面则将一些实际场景都添加进去,比如用户身份的验证,游客只能浏览不能发言,多房间(频道)的聊天。
该博客非常适合 Java 新手,非常适合作为学习 Java 的切入点,不需要考虑tomcat、spring、mybatis等。
唯一的知识点就是 maven 的基础使用。

完整的代码地址

https://github.com/zhoumengka...

├── WebSocketServer.java                启动服务器端口监听
├── WebSocketServerInitializer.java     初始化服务
├── WebSocketServerHandler.java         接管WebSocket数据连接
├── dto
│   └── Response.java                   返回给客户端数据对象
├── entity
│   └── Client.java                     每个连接到WebSocket服务的客户端对象
└── service
    ├── MessageService.java             完成发送消息
    └── RequestService.java             WebSocket初始化连接握手时的数据处理
功能设计概述 身份认证

客户端将用户 id 、进入的房间的 rid、用户 token json_encode,例如{id:1;rid:21;token:"43606811c7305ccc6abb2be116579bfd"}。然后在 base64 处理,通过参数request传到服务器,然后在服务器做 id 和 token 的验证(我的做法是 token 存放在redis string 5秒的过期时间)

房间表

使用一个Map channelGroupMap 来存放各个房间(频道),以客户端传握手时传过来的 base64 字符串中获取到定义的房间 ID,然后为该房间 ID 新建一个ChannelGroupChannelGroup 方便对该组内的所有客户端广播消息)

在 pom.xml 中引入netty 5

现在大家都有自己的包管理工具,不需要实现下载了然后放到本地lib库中,和 nodejs 的 npm, php 的 compser 一样。


    
        io.netty
        netty-all
        5.0.0.Alpha2
    
    
        com.jcraft
        jzlib
        1.1.2
    
    
        org.json
        json
        20141113
    
    
        commons-codec
        commons-codec
        1.10
    
创建服务器

这段代码需要理解吗?这是 netty 的套路,可以先记住 netty 的线程模型是一个 react 的一种变型,这里有两个nio线程组,一个是接受客户端的请求,一个是worker组专门处理客户端的请求。

可以简单的理解下面的代码就构建了一个nginx服务器。所以不用管。

package net.mengkang;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;


public final class WebSocketServer {

    private static final int PORT = 8083;

    public static void main(String[] args) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new WebSocketServerInitializer());

            Channel ch = b.bind(PORT).sync().channel();
            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package net.mengkang;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;


public class WebSocketServerInitializer extends ChannelInitializer {

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(65536));
        pipeline.addLast(new WebSocketServerCompressionHandler());
        pipeline.addLast(new WebSocketServerHandler());
    }
}
处理长连接

下面程序中最的处理在握手阶段handleHttpRequest,里面处理参数的判断,用户的认证,登录用户表的维护,直播房间表维护。详细的请大家对照代码来浏览。
握手完成之后的消息传递则在handleWebSocketFrame中处理。
整理的执行流程,大家可以对各个方法打断点予以调试,就会很清楚整个执行的脉络啦。

package net.mengkang;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import net.mengkang.dto.Response;
import net.mengkang.entity.Client;
import net.mengkang.service.MessageService;
import net.mengkang.service.RequestService;
import org.json.JSONObject;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

public class WebSocketServerHandler extends SimpleChannelInboundHandler {

    // websocket 服务的 uri
    private static final String WEBSOCKET_PATH = "/websocket";

    // 一个 ChannelGroup 代表一个直播频道
    private static Map channelGroupMap = new ConcurrentHashMap <>();

    // 本次请求的 code
    private static final String HTTP_REQUEST_STRING = "request";

    private Client client = null;

    private WebSocketServerHandshaker handshaker;

    @Override
    public void messageReceived(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        // Handle a bad request.
        if (!req.decoderResult().isSuccess()) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
            return;
        }

        // Allow only GET methods.
        if (req.method() != GET) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
            return;
        }

        if ("/favicon.ico".equals(req.uri()) || ("/".equals(req.uri()))) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND));
            return;
        }

        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
        Map> parameters = queryStringDecoder.parameters();

        if (parameters.size() == 0 || !parameters.containsKey(HTTP_REQUEST_STRING)) {
            System.err.printf(HTTP_REQUEST_STRING + "参数不可缺省");
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND));
            return;
        }

        client = RequestService.clientRegister(parameters.get(HTTP_REQUEST_STRING).get(0));
        if (client.getRoomId() == 0) {
            System.err.printf("房间号不可缺省");
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND));
            return;
        }

        // 房间列表中如果不存在则为该频道,则新增一个频道 ChannelGroup
        if (!channelGroupMap.containsKey(client.getRoomId())) {
            channelGroupMap.put(client.getRoomId(), new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
        }
        // 确定有房间号,才将客户端加入到频道中
        channelGroupMap.get(client.getRoomId()).add(ctx.channel());

        // Handshake
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req), null, true);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            ChannelFuture channelFuture = handshaker.handshake(ctx.channel(), req);

            // 握手成功之后,业务逻辑
            if (channelFuture.isSuccess()) {
                if (client.getId() == 0) {
                    System.out.println(ctx.channel() + " 游客");
                    return;
                }

            }
        }
    }

    private void broadcast(ChannelHandlerContext ctx, WebSocketFrame frame) {

        if (client.getId() == 0) {
            Response response = new Response(1001, "没登录不能聊天哦");
            String msg = new JSONObject(response).toString();
            ctx.channel().write(new TextWebSocketFrame(msg));
            return;
        }

        String request = ((TextWebSocketFrame) frame).text();
        System.out.println(" 收到 " + ctx.channel() + request);

        Response response = MessageService.sendMessage(client, request);
        String msg = new JSONObject(response).toString();
        if (channelGroupMap.containsKey(client.getRoomId())) {
            channelGroupMap.get(client.getRoomId()).writeAndFlush(new TextWebSocketFrame(msg));
        }

    }

    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {

        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
        }

        broadcast(ctx, frame);
    }

    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            HttpHeaderUtil.setContentLength(res, res.content().readableBytes());
        }

        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpHeaderUtil.isKeepAlive(req) || res.status().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("收到" + incoming.remoteAddress() + " 握手请求");
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        if (client != null && channelGroupMap.containsKey(client.getRoomId())) {
            channelGroupMap.get(client.getRoomId()).remove(ctx.channel());
        }
    }

    private static String getWebSocketLocation(FullHttpRequest req) {
        String location = req.headers().get(HOST) + WEBSOCKET_PATH;
        return "ws://" + location;
    }
}

服务器端就写完啦,还有一些客户端对象的构想验证什么的就不一一细说了,都很简单,都在代码里。下面是客户端。

客户端程序




并发压测

同事 https://github.com/ideal 写的压测脚本
https://github.com/zhoumengka...
并测试为N个客户端,每个客户端发送10条消息,服务器配置2核4G内存,广播给所有的客户端,我们测试1500个并发的时候,负载在后期陡升。
实际情况下,不可能那么多人同时说话广播,而是说话的人少,接受广播的人多。

实际线上之后(业务远比上面的代码负载得多的多),在不限制刷帖频率大家狂轰滥炸的情况下,1500多人在线,半小时,负载一直都处于0.5以下。

最近老铁开了直播,欢迎来捧场!

PHP 进阶之路 - 亿级 pv 网站架构的技术细节与套路

PHP 进阶之路 - 亿级 pv 网站架构实战之性能压榨

PHP 进阶之路 - 后端多元化之快速切入 Java 开发

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/67319.html

相关文章

  • 三年前旧代码的重构、总结与反思

    摘要:最近在维护一个三年前的旧代码,用的是框架。单元测试和语言并发控制实际上是个蛋疼的问题,夸张一点说,当时的并不能特别轻松地实现并发,甚至不能实现并发。语言的功能之一就是自带单元测试。用语言之前,我的习惯是不写单元测试。 最近在维护一个三年前的旧代码,用的是laravel框架。 从某些方面来讲,这个代码算是比较标准为了实现在规定的时间内完成相关功能,同时程序员水平不高、经过大量优化之后,变...

    Shihira 评论0 收藏0
  • polarphp一个新的 PHP 语言运行时环境

    摘要:项目介绍是一个全新的语言的运行时环境,基于目前最新的进行打造,支持最新的语言规范,同时提供了自己的运行时标准库。同样也在的基础上进行打造,实现了一个除开发之外的一个全新的运行环境。发布核心虚拟机的镜像。整合运行时框架。 showImg(https://segmentfault.com/img/bVbnQXK); polarphp 项目介绍 polarphp是一个全新的PHP语言的运行时...

    宋华 评论0 收藏0
  • 少啰嗦!一分钟带你读懂Java的NIO和经典IO的区别

    摘要:的选择器允许单个线程监视多个输入通道。一旦执行的线程已经超过读取代码中的某个数据片段,该线程就不会在数据中向后移动通常不会。 1、引言 很多初涉网络编程的程序员,在研究Java NIO(即异步IO)和经典IO(也就是常说的阻塞式IO)的API时,很快就会发现一个问题:我什么时候应该使用经典IO,什么时候应该使用NIO? 在本文中,将尝试用简明扼要的文字,阐明Java NIO和经典IO之...

    Meils 评论0 收藏0
  • 资源集 - 收藏集 - 掘金

    摘要:行爬取顶点全网任意小说掘金之前连续多篇文章介绍客户端爬取平台,今天我们从零开始,实现爬取顶点小说网任意一本小说的功能。文件标记所有文件我的后端书架后端掘金我的后端书架月前本书架主要针对后端开发与架构。 30行js爬取顶点全网任意小说 - 掘金之前连续多篇文章介绍客户端爬取平台(dspider),今天我们从零开始,实现爬取顶点小说网任意一本小说的功能。 如果你还不知道客户端爬取,可以先看...

    马忠志 评论0 收藏0

发表评论

0条评论

kviccn

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<