资讯专栏INFORMATION COLUMN

一起学并发编程 - 利用观察者模式监听线程状态

Juven / 2413人阅读

摘要:在前面的文章中介绍过观察者模式及并发编程的基础知识,为了让大家更好的了解观察者模式故而特意写了这篇番外概述在多线程下我们需要知道当前执行线程的状态是什么比如运行,关闭,异常等状态的通知,而且不仅仅是更新当前页面。

在前面的文章中介绍过 观察者模式 及 并发编程的基础知识,为了让大家更好的了解观察者模式故而特意写了这篇番外..

概述

在Java多线程下,我们需要知道当前执行线程的状态是什么比如运行关闭异常等状态的通知,而且不仅仅是更新当前页面。

观察者模式: 是一种使用率极高的模式,用于建立一种对象与对象之间的依赖关系,一个对象发生改变时将自动通知其他对象,其他对象将相应作出反应。在观察者模式中,发生改变的对象称为观察目标,而被通知的对象称为观察者,一个观察目标可以对应多个观察者,而且这些观察者之间可以没有任何相互联系,可以根据需要增加和删除观察者,使得系统更易于扩展。

观察者模式传送门:http://blog.battcn.com/2017/12/11/java/design-pattern/observer-pattern/#more

案例

假设开发一个多线程爬虫功能,由于数据过大需要利用多线程并行化来提升抓取的效率,并且在抓取过程中要记录执行线程的运行状态以便追溯问题原因

UML图如下

1.定义具体观察对象,实现JDK自带的Observer接口,然后在需要实现的update方法中记录下每个线程执行的状态信息

class ObserverListener implements Observer {
    /**
     * 避免多线程锁竞争
     */
    private static final Object LOCK = new Object();

    @Override
    public void update(Observable observable, Object runnableEvent) {
        synchronized (LOCK) {
            ObservableRunnable.RunnableEvent event = (ObservableRunnable.RunnableEvent) runnableEvent;
            if (event != null) {
                if (event.getCause() != null) {
                    System.out.println("The Runnable [" + event.getThread().getName() + "] process failed and state is " + event.getState().name());
                    event.getCause().printStackTrace();
                } else {
                    System.out.println("The Runnable [" + event.getThread().getName() + "] data changed and state is " + event.getState().name());
                }
            }
        }
    }
}

2.定义具体被观察的对象,该对象需要继承Observable类,以及实现Runnable接口,这里run的实现非常简单,执行每一步骤操作时都进行了通知,通知观察者消息发生变更了

为什么每次都需要 setChanged

筛选有效通知,只有有效通知可以调用setChanged。比如,我的微信朋友圈一条状态,好友A点赞,后续该状态的点赞和评论并不是每条都通知A,只有A的好友触发的操作才会通知A。

便于撤销通知操作,在主题中,我们可以设置很多次setChanged,但是在最后由于某种原因需要取消通知,我们可以使用clearChanged轻松解决问题。

主动权控制,由于setChangedprotected,而notifyObservers方法为public,这就导致存在外部随意调用notifyObservers的可能,但是外部无法调用setChanged,因此真正的控制权应该在主题这里。

class ObservableRunnable extends Observable implements Runnable {

    /**
     * 线程名称
     */
    private String name;

    ObservableRunnable(String name, ObserverListener listener) {
        this.name = name;
        // 将被观察的对象注册到观察者中
        super.addObserver(listener);
    }

    /**
     * 发送通知
     *
     * @param event 通知的内容
     */
    private void notifyChange(final RunnableEvent event) {
        // 前面说过 JDK自带的 需要每次设置一次状态,代表当前内容更改了
        super.setChanged();
        super.notifyObservers(event);
    }

    @Override
    public void run() {
        try {
            notifyChange(new RunnableEvent(RunnableState.RUNNING, Thread.currentThread(), null));
            System.out.printf("根据 [%s] 查询 
", this.name);
            Thread.sleep(1000L);
            if (this.name.equals("T3")) {
                // 故意模拟报错
                throw new RuntimeException("故意抛出错误");
            }
            notifyChange(new RunnableEvent(RunnableState.DOWN, Thread.currentThread(), null));
        } catch (Exception e) {
            notifyChange(new RunnableEvent(RunnableState.ERROR, Thread.currentThread(), e));
        }
    }

    enum RunnableState {
        /**
         * RUNNING:运行
         * ERROR:异常
         * DOWN:正常结束
         */
        RUNNING, ERROR, DOWN
    }

    static class RunnableEvent {
        private final RunnableState state;
        private final Thread thread;
        private final Throwable cause;

        RunnableEvent(RunnableState state, Thread thread, Throwable cause) {
            this.state = state;
            this.thread = thread;
            this.cause = cause;
        }

        RunnableState getState() {
            return state;
        }

        public Thread getThread() {
            return thread;
        }

        Throwable getCause() {
            return cause;
        }
    }
}

3.创建测试工程

public class ObserverClient {

    public static void main(String[] args) {
        ObserverListener listener = new ObserverListener();
        List names = Arrays.asList("T1", "T2", "T3");
        for (String name : names) {
            Thread thread = new Thread(new ObservableRunnable(name, listener));
            thread.start();
        }
    }
}

4.运行结果,通过运行日志可以发现,启动三个线程后同时执行抓取操作,但是Thread-2线程在数据处理时发生了异常,在ObserverListener处也成功收到通知的内容,然后对信息进行了输出操作。在实际过程中我们可以为异常进行补偿操作

The Runnable [Thread-1] data changed and state is RUNNING
The Runnable [Thread-0] data changed and state is RUNNING
根据 [T1] 查询 
The Runnable [Thread-2] data changed and state is RUNNING
根据 [T2] 查询 
根据 [T3] 查询 
java.lang.RuntimeException: 故意抛出错误
The Runnable [Thread-0] data changed and state is DOWN
    at com.battcn.chapter14.ObservableRunnable.run(ObserverClient.java:67)
The Runnable [Thread-1] data changed and state is DOWN
    at java.lang.Thread.run(Thread.java:745)
The Runnable [Thread-2] process failed and state is ERROR
总结

本文,简单讲述了多线程环境下如何利用观察者模式进行线程状态监听,也是对前面所讲的基础进行巩固,在学习的过程中,既要知其然也要知其所以然。这样才能更好地驾驭它,更好地去理解和使用,也能更好地帮助我们触类旁通

- 说点什么

全文代码:https://gitee.com/battcn/battcn-concurent/tree/master/Chapter1-1/battcn-thread/src/main/java/com/battcn/chapter14

个人QQ:1837307557

battcn开源群(适合新手):391619659

微信公众号:battcn(欢迎调戏)

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70827.html

相关文章

  • 一起并发编程 - 优雅关闭

    摘要:文本将介绍两种可以优雅的终止线程的方式第一种在多线程模式中有一种叫两步终止的模式可以优雅的终止线程,这种模式采用了两个步骤来终止线程,所以叫两步终止模式。 Java中原来在Thread中提供了stop()方法来终止线程,但这个方法是不安全的,所以一般不建议使用。文本将介绍两种可以优雅的终止线程的方式... 第一种 在JAVA《Java多线程模式》中有一种叫Two-Phase Term...

    曹金海 评论0 收藏0
  • 一起并发编程 - 简易线程池实现

    摘要:并且,线程池在某些情况下还能动态调整工作线程的数量,以平衡资源消耗和工作效率。同时线程池还提供了对池中工作线程进行统一的管理的相关方法。 开发中经常会遇到各种池(如:连接池,线程池),它们的作用就是为了提高性能及减少开销,在JDK1.5以后的java.util.concurrent包中内置了很多不同使用场景的线程池,为了更好的理解它们,自己手写一个线程池,加深印象。 概述 1.什么是...

    Harriet666 评论0 收藏0
  • 一起并发编程 - 等待与通知

    摘要:如果有其它线程调用了相同对象的方法,那么处于该对象的等待池中的线程就会全部进入该对象的锁池中,从新争夺锁的拥有权。 wait,notify 和 notifyAll,这些在多线程中被经常用到的保留关键字,在实际开发的时候很多时候却并没有被大家重视,而本文则是对这些关键字的使用进行描述。 存在即合理 在java中,每个对象都有两个池,锁池(monitor)和等待池(waitset),每个...

    Meathill 评论0 收藏0
  • 一起并发编程 - Volatile关键字详解

    摘要:比如用修饰的变量,就会确保变量在修改时,其它线程是可见的。。多核环境中,多个线程分别在不同的中运行,就意味着,多个线程都有可能将变量拷贝到当前运行的里。当线程读取变量时,它将能看见被线程写入的东西。 volatile是用来标记一个JAVA变量存储在主内存(main memory)中,多线程读写volatile变量会先从高速缓存中读取,但是写入的时候会立即通过内存总线刷到主存,同时内存总...

    vpants 评论0 收藏0
  • java并发编程习之Volatile

    摘要:但是的语义不足以确保递增操作的原子性,在多线程的情况下,线程不一定是安全的。检查某个状态标记,以判断是否退出循环某个方法这边和用普通的变量的区别是,在多线程的情况下,取到后,的值被改变了,判断会不正确。 多线程为什么是不安全的 这边简单的讲述一下,参考java并发编程学习之synchronize(一) 当线程A和线程B同时进入num = num + value; 线程A会把num的值...

    thekingisalwaysluc 评论0 收藏0

发表评论

0条评论

Juven

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<