资讯专栏INFORMATION COLUMN

消息推送异常重发需要注意的点(上篇)

terro / 3550人阅读

摘要:无证连接进行异常记录并关闭连接。离线消息检测到上线立即推送这是消息推送需要实现的基本功能之一了,详见代码。主要功能协助进行初始化,心跳包检测,断线自动重连消息推送的第二种方式在下篇中再编写

消息重发中需要注意的问题

由于最近工作中接触了比较多关闭消息推送以及异常重发机制的问题,终于得空总结一下经验

目前接触的消息推送分为两种

主动推送:一般为websocket建立长连接实现,此处网上多有各种实现方式。下面贴出本人结合实际应用场景使用的长连接方式。

websocket服务端代码

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;

import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

@ServerEndpoint(value = "/websocket/{id}")
@Component
@Slf4j
public class WebSocket {
    // 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static int onlineCount = 0;

    // concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
    private static ConcurrentHashMap webSocketSet = new ConcurrentHashMap<>();

    // 保存允许建立连接的id
    private static List idList = Lists.newArrayList();
    private String id = "";

    /**
     * 这里使用AutoWired注入的bean会出现无法持续保存而出现null的情况。
     * 具体原因暂时没有深究,如果有需要时,可以再init初始化方法中手动将临时的beanTmp类存入static常量中即可正常使用该bean类。
     * @Autowired
     * private RedisCacheUtil redisTmp;
     * private static RedisCacheUtil redis;
     *
     */
    // 与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    
    public void closeConn(String appId) {
        // 关闭连接
        try {
            WebSocket socket = webSocketSet.get(appId);
            if (null != socket) {
                if (socket.session.isOpen()) {
                    socket.session.close();
                }
            }
        } catch (IOException e) {
            System.out.println("IO异常");
            e.printStackTrace();
        }
        idList.remove(appId);
    }
    
    /**
     * 连接/注册时去重
     */
    public void conn(String appId) {
        // 去重
        if (!idList.contains(appId)) {
            idList.add(appId);
        }
    }
    
    /**
     * 获取注册在websocket进行连接的id
     */
    public static List getIdList() {
        return idList;
    }
    
    /**
     * 初始化方法
     * @author caoting
     * @date 2019年2月13日
     */
    @PostConstruct
    public void init() {
        try {
            /**
             * TODO 这里的设计是在项目启动时从DB或者缓存中获取注册了允许建立连接的id
             * 然后将获取到的id存入内存--idList
             * // 从数据库获取idList
             * List ids = wsIdsServiceTmp.selectList(null);
             */
            // TODO 初始化时将刚注入的对象进行静态保存
            // redis = redisTmp;
            
        } catch (Exception e) {
            // TODO 项目启动错误信息
        }
    }
    
    /**
     * 连接启动时查询是否有滞留的新邮件提醒
     * @param id
     * 
     * @author caoting
     * @throws IOException 
     * @date 2019年2月28日
     */
    private void selectOfflineMail(String id) throws IOException {
        // 查询缓存中是否存在离线邮件消息
        Jedis jedis = redis.getConnection();
        try {
            List mails = jedis.lrange(Constant.MAIL_OFFLINE+id, 0, -1);
            if (CommomUtil.isNotEmpty(mails)) {
                for (String mailuuid : mails) {
                    String mail = jedis.get(mailuuid);
                    if (StringUtils.isNotEmpty(mail))
                        sendToUser(Constant.MESSAGE_MAIL + mail, id);
                    Thread.sleep(1000);
                }
                // 发送完成从缓存中移除
                jedis.del(Constant.MAIL_OFFLINE+id);
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            jedis.close();
        }
    }
    
    /**
     * 连接建立成功调用的方法
     * @param id 
     */
    @OnOpen
    public void onOpen(@PathParam(value = "id") String id, Session session) {
        try {
            // 注:ws-admin是管理员内部使用通道  不受监控  谨慎使用
            if (!id.contains(Constant.WS_ADMIN)) {
                this.session = session;
                this.id = id;//接收到发送消息的人员编号
                // 验证id是否在允许
                if (idList.contains(id)) {
                    // 判断是否已存在相同id
                        WebSocket socket = webSocketSet.get(id);
                        if (socket == null) {
                            webSocketSet.put(id, this);     //加入set中
                            addOnlineCount(); // 在线数加1
                            
                            this.sendMessage("Hello:::" + id);
                            System.out.println("用户"+id+"加入!当前在线人数为" + getOnlineCount());
                            
                            // 检查是否存在离线推送消息
                            selectOfflineMail(id);
                        } else {
                            this.sendMessage(Constant.MESSAGE_ERROR+"连接id重复--连接即将关闭");
                            this.session.close();
                        }
                } else {
                    // 查询数据库中是否存在数据
                    WsIds wsIds = wsIdsService.selectByAppId(id);
                    if ( null != wsIds ) {
                        idList.add(id);
                        webSocketSet.put(id, this);     //加入set中
    
                        addOnlineCount(); // 在线数加1
                        this.sendMessage("Hello:::" + id);
                        log.debug("用户"+id+"加入!当前在线人数为" + getOnlineCount());

                        // 检查是否存在离线推送消息
                        selectOfflineMail(id);
                        
                    } else {
                        // 关闭
                        this.sendMessage(Constant.MESSAGE_ERROR+"暂无连接权限,连接即将关闭,请确认连接申请是否过期!");
                        this.session.close();
                        log.warn("有异常应用尝试与服务器进行长连接  使用id为:"+id);
                    }
                }
            } else {
                this.session = session;
                this.id = id;//接收到发送消息的人员编号
                
                webSocketSet.put(id, this);     //加入set中
                addOnlineCount(); // 在线数加1
                
                this.sendMessage("Hello:::" + id);
                log.debug("用户"+id+"加入!当前在线人数为" + getOnlineCount());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this.id); // 从set中删除
        subOnlineCount(); // 在线数减1
        log.debug("有一连接关闭!当前在线人数为" + getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message
     *            客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.debug("来自客户端的消息:" + message);
        // TODO 收到客户端消息后的操作
    }

    /**
     * 发生错误时调用 
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.debug("发生错误");
        error.printStackTrace();
    }

    public void sendMessage(String message) throws IOException {
        this.session.getAsyncRemote().sendText(message);
    }

     /**
     * 发送信息给指定ID用户,如果用户不在线则返回不在线信息给自己
     * @param message
     * @param sendUserId
     * @throws IOException
     */
    public Boolean sendToUser(String message, String sendUserId) throws IOException {
        Boolean flag = true;
        WebSocket socket = webSocketSet.get(sendUserId);
        if (socket != null) {
            try {
                if (socket.session.isOpen()) {
                    socket.sendMessage(message);
                } else {
                    flag = false;
                }
            } catch (Exception e) {
                flag = false;
                e.printStackTrace();
            }
        } else {
            flag = false;
            log.warn("【" + sendUserId + "】 该用户不在线");
        }
        return flag;
    }

    /**
     * 群发自定义消息
     */
    public void sendToAll(String message) throws IOException {
        for (String key : webSocketSet.keySet()) {
            try {
                WebSocket socket = webSocketSet.get(key);
                if (socket.session.isOpen()) {
                    socket.sendMessage(message);
                }
            } catch (IOException e) {
                continue;
            }
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocket.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        if (WebSocket.onlineCount > 0)
            WebSocket.onlineCount--;
    }
}
这里使用的是较为原始的websocket连接方式,事实上springboot已经融合了websocket,工作关系没有空暂未研究。记录一下有空了再去写写demo。这个socket服务端主要实现了:1. 连接控制,建立连接时验证id的合法性。无证连接进行异常记录并关闭连接。2. 离线消息检测到上线立即推送 这是消息推送需要实现的基本功能之一了,详见代码。3. 统计在线人数 依旧是基本功能

下面是websocket服务端配置类WebSocketServerConfig

import lombok.extern.slf4j.Slf4j;
import org.apache.catalina.session.StandardSessionFacade;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;
import javax.websocket.server.ServerEndpointConfig.Configurator;

@Configuration
@Slf4j
public class WebSocketServerConfig extends Configurator {

    @Override
    public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
        /* 如果没有监听器,那么这里获取到的HttpSession是null */
        StandardSessionFacade ssf = (StandardSessionFacade) request.getHttpSession();
        if (ssf != null) {
            HttpSession httpSession = (HttpSession) request.getHttpSession();
            // 关键操作
            sec.getUserProperties().put("sessionId", httpSession.getId());
            log.debug("获取到的SessionID:" + httpSession.getId());
        }
    }

    /**
     * 如果使用独立的servlet容器,而不是直接使用springboot的内置容器
     * 就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理。
     * 即:生产环境中在独立的tomcat运行时请注释掉这个bean
     * 
     * @return
     * 
     * @author caoting
     * @date 2019年2月20日
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
这里其实有个坑,就是上述代码中的bean类  serverEndpointExporter,开发环境如果不是配置独立的tomcat运行的话是需要注入的,但是生产环境下在独立的tomcat容器运行时是需要注释掉的,否则会报错。

很重要的session监听器 RequestListener

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.servlet.ServletRequestEvent;
import javax.servlet.ServletRequestListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;

/**
 * 监听器类:主要任务是用ServletRequest将我们的HttpSession携带过去
 * 此注解千万千万不要忘记,它的主要作用就是将这个监听器纳入到Spring容器中进行管理,相当于注册监听
 */
@Component
@Slf4j
public class RequestListener implements ServletRequestListener {
    @Override
    public void requestInitialized(ServletRequestEvent sre) {
        // 将所有request请求都携带上httpSession
        HttpSession httpSession = ((HttpServletRequest) sre.getServletRequest()).getSession();
        log.debug("将所有request请求都携带上httpSession " + httpSession.getId());
    }

    public RequestListener() {
    }

    @Override
    public void requestDestroyed(ServletRequestEvent arg0) {
    }
}
以上就是一个websocket服务端需要的所有配置和类

websocket客户端代码

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.curator.shaded.com.google.common.collect.Maps;
import redis.clients.jedis.Jedis;

import javax.websocket.*;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * @author caoting
 * @date 2018年9月27日
 */
@Slf4j
@ClientEndpoint
public class MailWebSocketClient {

    private static RedisCacheUtil redis;
    protected void setRedis(RedisCacheUtil redisTmp) {
        redis = redisTmp;
    }

    /**
     * @author caoting
     * @date 2019年3月11日
     */
    public static void doSomething() {
        // TODO 由于这个类没有写初始化方法,但是有些初始化操作必须完成,
        // 因此在socket配置类中调用此方法可以完成一些需要初始化注入的操作
    }

    private Session session;

    @OnOpen
    public void open(Session session) {
        log.info("连接开启...");
        this.session = session;
    }

    @OnMessage
    public void onMessage(String message) {
        log.info("来自服务端的消息: " + message);
        // TODO 对消息进行过滤判断处理
        // 不做过多操作影响性能 直接交给异步任务处理--这个办法还是比较low的现在springboot有更好的解决办法@Async 有空再记录下多线程异步处理任务调度的相关代码。
        ExecutorService executor = Executors.newSingleThreadExecutor();
        FutureTask future = new FutureTask(new Callable() {// 使用Callable接口作为构造参数
            public Boolean call() {
                return pushMsg(message);
            }
            });
        executor.execute(future);
        Boolean res = CommomUtil.timeOutTask(future, executor, 600);
        if (res != null && res)
            log.info("操作成功");
        else
            log.info("操作失败");        
    }

    /**
     * @author caoting
     * @date 2019年3月11日
     */
    private Boolean pushMailMsg(String message) {
        Boolean flag = true;
        // 推送消息
        ReceiverRes resObj = new ReceiverRes();
        try {
            resObj = restTemplate.httpPostMediaTypeJson(url, ReceiverRes.class, message);
        } catch (Exception e) {
            // 这里异常一般是http接口服务宕机了,所以放进缓存在对方上线时进行重新推送
            resObj.setCode(500);
            log.error(e.getMessage(), e);
        } 
                
        // ====推送完成后的后续异常检查与数据重发工作  这里是一个redis任务调度  处理失败任务的典型案例 看不懂就删掉    
        Integer code = resObj.getCode();
        if (code == 500) {
            // 发送失败存进redis缓存  按照约定好的状态码进行判断
            jedis.lpush(Constant.PUSH_ERROR, mailMapJson);
        } else {
            // 发送成功以后查询以前出错的数据进行重新推送。--这种办法只适合消息很频繁的,毕竟不频繁的等下次发消息又不知道是何时了,因此需要采用别的方法
            while (true) {
                // 查询以往的异常发送数据  重新发送
                String jsonMap = jedis.rpoplpush(Constant.PUSH_ERROR, Constant.PUSH_ERROR_TMP);
                if (StringUtils.isEmpty(jsonMap)) {
                    break;
                }
                        
                try {
                    errObj = restTemplate.httpPostMediaTypeJson(receiverUrl, ReceiverRes.class, message);
                } catch (Exception e) {
                    errObj.setCode(500);
                    log.error(e.getMessage(), e);
                }
                        
                if (errObj.getCode() == 500) {
                    // 再次失败  弹回原队列
                    jedis.rpoplpush(Constant.PUSH_ERROR_TMP, Constant.PUSH_ERROR);
                } else {
                    jedis.rpop(Constant.PUSH_ERROR_TMP);
                }
            }
        }
        return flag;
    }

    @OnClose
    public void onClose() {
        log.info("长连接关闭...");
    }

    @OnError
    public void onError(Session session, Throwable t) {
        t.printStackTrace();
    }

    public void send(String message) {
        this.session.getAsyncRemote().sendText(message);
    }

    public void close() throws IOException {
        if (this.session.isOpen()) {
            this.session.close();
        }
    }
}
上面是websocket客户端的代码。其中主要有:1、http推送失败重发机制 2、redis任务调度经典案例

websocket客户端配置类WebSocketConfig

import com.hnpolice.business.service.ApplicationService;
import com.hnpolice.sso.common.ex.BaseException;
import com.hnpolice.sso.common.utils.RedisCacheUtil;
import com.hnpolice.sync.RestTemplateFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer;
import java.net.URI;

@Slf4j
@Component
public class WebSocketConfig implements ApplicationRunner {
    
    @Autowired
    private RedisCacheUtil redisTmp;
    
    private static Boolean isOk;
    private MailWebSocketClient client;
    private static WebSocketContainer conmtainer = ContainerProvider.getWebSocketContainer();
    
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 跟随项目启动的方法可以在这里做一些初始化工作
        // websocket客户端初始化
        wsClientInit();
    }
    
    public void wsClientInit() {
        try {
            client = new MailWebSocketClient();
            client.setRedis(redisTmp);
            MailWebSocketClient.dosomething();
            conmtainer.connectToServer(client, new URI(##socket服务连接地址##));
            
            isOk = true;
        } catch (Exception e) {
            isOk = false;
            log.error(e);
        }
    
        // 断线重连
        while (true) {
            if (isOk != null && isOk) {
                try {
                    client.send("ping:"+appId);
                } catch (Exception e) {
                    isOk = false;
                }
            }
            else {
                // 系统连接失败进行重试
                log.warn("系统连接失败,正在重连...");
                try {
                    client.send("ping:"+appId);
                    log.warn("系统重连成功!");
                    isOk = true;
                } catch (Exception e) {
                    try {
                        client = new MailWebSocketClient();
                        conmtainer.connectToServer(client, new URI(mailUrl));
                        
                        isOk = true;
                    } catch (Exception e1) {
                        isOk = false;
                    }
                    
                    if (isOk != null && isOk) {
                        log.warn("系统重连成功!");
                    }
                }
            }
            try {
                Thread.sleep(30000);
            } catch (InterruptedException e) {
                log.error(BaseException.collectExceptionStackMsg(e));
                e.printStackTrace();
            }
        }
    }
}
这是websocket客户端的配置类,实现ApplicationRunner 接口是为了在项目启动时完成一些初始化工作,并非必须。主要功能:1、协助websocketCient进行初始化,2、心跳包检测,断线自动重连

消息推送的第二种方式在下篇中再编写

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/74103.html

相关文章

  • Android通知栏介绍与适配总结(上篇

    摘要:修改记录版本的通知栏消息功能上并未发生变化,右上角的缩减为了。增加了,允许可穿戴设备远程控制通知栏消息。锁屏状态下,可以控制通知栏消息的隐私程度。但是谷歌规定,自定义布局展示的通知栏消息最大高度是。具体适配不正常的机型有。 此文已由作者黎星授权网易云社区发布。 欢迎访问网易云社区,了解更多网易技术产品运营经验。 由于历史原因,Android在发布之初对通知栏Notification的设...

    fai1017 评论0 收藏0
  • (微服务)分布式事务-最大努力交付 && 消息最终一致性方案

    摘要:在对事实性要求没有那么高的情况下,可以用基于最大努力交付消息队列以及消息存储来解决最终一致性。可靠消息服务和消息组件,协调上下游消息的传递,并确保上下游数据的一致性。下游应用通知可靠消息服务该消息已经成功消费。 本文对比 二阶段事务、最大努力交付以及消息最终一致性,并给出部分解决方案,最终一致性方案参考阿里RockMQ事务消息:http://blog.csdn.net/chunlong...

    Scorpion 评论0 收藏0
  • Spring Cloud分布式事务终极解决方案探讨

    摘要:一小小推广讲座本话题已收入视频讲座分布式事务解决方案大家不妨围观下开源项目我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。 一 小小推广 讲座 本话题已收入视频讲座《Spring Cloud分布式事务解决方案》大家不妨围观下 开源项目 我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考Github CoolMQ源码,项目支持网站: http:/...

    jsdt 评论0 收藏0
  • JavaScript是如何工作的:Web推送通知的机制

    摘要:在端,尽管开发人员对其功能的需求很高,但出于某些原因,推送通知被引入的时间比较晚。发送推送通知在服务器上实现调用,该调用触发到用户设备的推送消息。推送服务推送服务是接收请求验证请求并将推送消息发送到对应的浏览器。 这是专门探索 JavaScript 及其所构建的组件的系列文章的第9篇。 想阅读更多优质文章请猛戳GitHub博客,一年百来篇优质文章等着你! 如果你错过了前面的章节,可以在...

    KitorinZero 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<