资讯专栏INFORMATION COLUMN

使用akka作异步任务处理

shiweifu / 1650人阅读

摘要:创建订单时同步操作有查询库存,扣款,刷新库存可异步的操作有通知风控系统,给买家发送扣款邮件和短信,通知卖家,创建一些定时任务。

同步转异步是一种常见的优化手段,最近一次在做调优时便大量使用了这种方式。通常在一个业务场景中会包含多个操作,有些操作的结果需要让用户立马知道,但有些操作则不需要。这些用户不需要等待结果的操作,我们在编程的时候便可以异步处理。这么做最直接的效果就是缩短接口响应速度,提升用户体验。

我此次优化的是下单场景。创建订单时同步操作有: 查询库存,扣款,刷新库存; 可异步的操作有: 通知风控系统,给买家发送扣款邮件和短信,通知卖家,创建一些定时任务。

最初我用的方案是Spring提供的@Async机制。这是一种很轻量的做法,只需要在可异步调用的方法上加上@Async注解即可。但是这种做法也存在两个问题: 1. 不支持类内部方法之间的调用。使用这种方式,我必须要把一些需要异步调用的方法转移到一个新类里,这点让人不爽。2. 当系统crash的时候,缓存的任务就丢了。因此,这个方案并不特别理想。

两年之前用akka做过一个社交应用的后端服务,而且消息模型天生异步,所以自然想到了用akka。但是用akka的话也有一些地方需要注意。第一,Actor是单线程顺序执行,如果任务比较多最好使用actor router。actor router管理多个actor,可以做到一定限度的并行执行。第二,使用有持久化actor,确保任务不会丢失。我会以发push提醒为例描述一下这个方案的实现细节。多数场景中发push提醒都可进行异步调用。

下单逻辑都放在OrderService中,下单成功给卖家发送push提醒时,Orderservice会给NotificationActor发送一个消息。

NotificationActor有两个职责:1. 保存接收到的任务;2. 把消息转发给NotificationWorker,当Worker执行成功之后把消息删除。在最新版本的akka中可以使用At-Least-Once Delivery实现这两个功能。

NotificationWorkerRouter仅仅处理发送逻辑。WorkerActor以Router方式进行部署,以实现并行处理,提高处理效率。

下边看一下具体实现细节:

public class NotificationActor extends UntypedPersistentActorWithAtLeastOnceDelivery {
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    private ActorRef notificationWorkers = null;
    private final String uniqueId = UUID.randomUUID().toString();

    @Autowired
    public NotificationActor(final ActorSystemManager actorSystemManager) {
        this.notificationWorkers = actorSystemManager.notificationWorkers;
    }

    @Override public String persistenceId() {
        return "journal:notification-actor:" + uniqueId;
    }

    @Override public void onReceiveRecover(final Object msg) throws Throwable {
        if (msg instanceof NotificationMessage) {
            deliverAckMessage((NotificationMessage) msg);
        }
    }

    @Override public void onReceiveCommand(final Object msg) throws Throwable {
        if (msg instanceof NotificationMessage) {
            persist(msg, m -> { deliverAckMessage((NotificationMessage) m); });
        } else if (msg instanceof Confirm) {
            Confirm confirm = (Confirm) msg;
            confirmMessage(new MsgConfirmed(confirm.deliveryId));
        } else if (msg instanceof UnconfirmedWarning) {
            UnconfirmedWarning warning = (UnconfirmedWarning) msg;
            warning.getUnconfirmedDeliveries().forEach(d -> {
                log.error("[NOTIFICATION-ACTOR] Unconfirmed Messages: {}", d.message());

                confirmMessage(new MsgConfirmed(d.deliveryId()));
            });
        } else {
            unhandled(msg);
        }
    }

    private void deliverAckMessage(NotificationMessage event) {
        deliver(notificationWorkers.path(), (Function) deliveryId -> new AckMessage(deliveryId, event));
    }

    private void confirmMessage(final MsgConfirmed evt) {
        confirmDelivery(evt.deliveryId);
        deleteMessages(evt.deliveryId);
    }

    public interface NotificationMessage extends Event {}

    public static final @Data class PushMessage implements NotificationMessage {
        private final Long source;
        private final Long target;
        private final String trigger;
        private final ImmutableMap payload;
    }
}

public class NotificationWorkerActor extends UntypedActor {
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    private final @NonNull NotificationService notificationService;

    @Autowired
    public NotificationWorkerActor(final NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    @Override public void onReceive(final Object event) throws Throwable {
        if (event instanceof AckMessage) {
            final AckMessage ackMessage = (AckMessage) event;
            NotificationMessage msg = (NotificationMessage) ackMessage.msg;
            log.info("[NOTIFICATION] receive message: {}", msg);

            if (msg instanceof PushMessage) {
                final PushMessage m = (PushMessage) msg;
                log.info("[NOTIFICATION] send push notification from: {} to: {}", m.getSource(), m.getTarget());
                notificationService.notify(m.getSource(), m.getTarget(), m.getTrigger(), m.getPayload());
            }
            sender().tell(new Confirm(ackMessage.deliveryId), self());
        } else {
            unhandled(event);
        }
    }
}

public class OrderService {
    public void createOrder() {
        actorSystemManager.notificationActor.tell(
          new PushMessage(), ActorRef.noSender()
        );
    }
}

最早实施这个方案的时候遇到一个问题,说一下这个问题如何产生的。我们一共有三台服务器,三台服务器都会部署同样的代码,以NotificationActor为例,它会分别部署在三个机器上。actor journal我们使用mysql存储。akka persistent actor内部有一个sequence number用来对接收到的消息进行计数,这个数字是递增的。同时这个数字也会在journal中记录。最初我的persistenceId方法是这样实现的:

@Override public String persistenceId() {
    return "journal:notification-actor";
}

那么,假如server1上的NotificationActor接收了一个消息,那么它的sequence number会变成1,mysql中将会存储的sequence number为1的消息。这时server2上也接收到了一个消息,因为它的最初sequence number是0,所以它也会把现在接收到的消息的sequence number设置为1。但是显然这条消息是不能持久化的,因为它和数据库记录的sequence number冲突了。根本原因是三台服务器上的NotificationActor的persistenceId是一样的。

上边代码中给出了一种方案,把persistenceId变成random的,每次actor启动的时候都会得到不同的persistenceId,这样就解决了上述问题。还有一种方案是引入akka cluster,使用akka singleton。这种方案会在下一篇文章中详细说明。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/66470.html

相关文章

  • 使用Akka Cluster Singleton实现集群单例

    摘要:接下来会选择一个最老的实例并在上面创建单例。可以确保整个集群中至多有一个的实例,言下之意,存在没有实例的时刻。访问需要借助于,会把所有的消息给当前被代理的实例。 上篇文章主要讲了如何使用Akka作异步任务处理。最后还抛出一个问题。 具体问题的描述就不在这篇文章赘述了,我们仅简单回顾一下第一种解决方案:覆写persistenceId()时,加一个UUID,这样三台服务器上的Actor就不...

    xiangzhihong 评论0 收藏0
  • 使用Akka Cluster Singleton实现集群单例

    摘要:接下来会选择一个最老的实例并在上面创建单例。可以确保整个集群中至多有一个的实例,言下之意,存在没有实例的时刻。访问需要借助于,会把所有的消息给当前被代理的实例。 上篇文章主要讲了如何使用Akka作异步任务处理。最后还抛出一个问题。 具体问题的描述就不在这篇文章赘述了,我们仅简单回顾一下第一种解决方案:覆写persistenceId()时,加一个UUID,这样三台服务器上的Actor就不...

    TZLLOG 评论0 收藏0
  • JVM并发编程模型览

    摘要:本文介绍和点评上的等并发编程模型。异步更适合并发编程。同步使线程阻塞,导致等待。基本模型这是最简单的模型,创建线程来执行一个任务,完毕后销毁线程。响应式编程是一种面向数据流和变化传播的编程模式。起源于电信领域的的编程模型。 本文介绍和点评JVM上的Thread, Thread Pool, Future, Rx, async-await, Fiber, Actor等并发编程模型。本人经验...

    cppowboy 评论0 收藏0
  • JVM并发编程模型览

    摘要:本文介绍和点评上的等并发编程模型。异步更适合并发编程。同步使线程阻塞,导致等待。基本模型这是最简单的模型,创建线程来执行一个任务,完毕后销毁线程。响应式编程是一种面向数据流和变化传播的编程模式。起源于电信领域的的编程模型。 本文介绍和点评JVM上的Thread, Thread Pool, Future, Rx, async-await, Fiber, Actor等并发编程模型。本人经验...

    wudengzan 评论0 收藏0
  • 关于分布式计算的一些概念

    摘要:关于三者的一些概括总结离线分析框架,适合离线的复杂的大数据处理内存计算框架,适合在线离线快速的大数据处理流式计算框架,适合在线的实时的大数据处理我是一个以架构师为年之内目标的小小白。 整理自《架构解密从分布式到微服务》第七章——聊聊分布式计算.做了相应补充和修改。 [TOC] 前言 不管是网络、内存、还是存储的分布式,它们最终目的都是为了实现计算的分布式:数据在各个计算机节点上流动,同...

    Ververica 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<