资讯专栏INFORMATION COLUMN

Vert.x Blueprint 系列教程(三) | Micro-Shop 微服务应用实战

QiShare / 3387人阅读

摘要:本教程是蓝图系列的第三篇教程,对应的版本为。提供了一个服务发现模块用于发布和获取服务记录。前端此微服务的前端部分,目前已整合至组件中。监视仪表板用于监视微服务系统的状态以及日志统计数据的查看。而服务则负责发布其它服务如服务或消息源并且部署。

本文章是 Vert.x 蓝图系列 的第三篇教程。全系列:

Vert.x Blueprint 系列教程(一) | 待办事项服务开发教程

Vert.x Blueprint 系列教程(二) | 开发基于消息的应用 - Vert.x Kue 教程

Vert.x Blueprint 系列教程(三) | Micro-Shop 微服务应用实战

本系列已发布至Vert.x官网:Vert.x Blueprint Tutorials

前言

欢迎回到Vert.x 蓝图系列!当今,微服务架构 变得越来越流行,开发者们都想尝试一下微服务应用的开发和架构设计。令人激动的是,Vert.x给我们提供了一系列用于微服务开发的组件,包括 Service Discovery (服务发现)、Circuit Breaker (断路器) 以及其它的一些组件。有了Vert.x微服务组件的帮助,我们就可以快速利用Vert.x搭建我们的微服务应用。在这篇蓝图教程中,我们一起来探索一个利用Vert.x的各个组件开发的 Micro-Shop 微服务应用~

通过本教程,你将会学习到以下内容:

微服务架构

如何利用Vert.x来开发微服务应用

异步开发模式

响应式、函数式编程

事件溯源 (Event Sourcing)

通过分布式 Event Bus 进行异步RPC调用

各种各样的服务类型(例如REST、数据源、Event Bus服务等)

如何使用服务发现模块 (Vert.x Service Discovery)

如何使用断路器模块 (Vert.x Circuit Breaker)

如何利用Vert.x实现API Gateway

如何进行权限认证 (OAuth 2 + Keycloak)

如何配置及使用 SockJS - Event Bus Bridge

以及其它的一些东西。。。

本教程是 Vert.x 蓝图系列 的第三篇教程,对应的Vert.x版本为 3.3.2 。本教程中的完整代码已托管至 GitHub。

踏入微服务之门

哈~你一定对“微服务”这个词很熟悉——至少听起来很熟悉~越来越多的开发者开始拥抱微服务架构,那么微服务究竟是什么呢?一句话总结一下:

</>复制代码

  1. Microservices are small, autonomous services that work together.

我们来深入一下微服务的各种特性,来看看微服务为何如此出色:

首先,微服务的重要的一点是“微”。每个微服务都是独立的,每个多带带的微服务组件都注重某一特定的逻辑。在微服务架构中,我们将传统的单体应用拆分成许多互相独立的组件。每个组件都由其特定的“逻辑边界”,因此组件不会过于庞大。不过话又说回来了,每个组件应该有多小呢?这个问题可不好回答,它通常取决与我们的业务与负载。正如Sam Newman在其《Building
Microservices》书中所讲的那样:

</>复制代码

  1. We seem to have a very good sense of what is too big, and so it could be argued that once a piece of code no longer feels too big, it’s probably small enough.

因此,当我们觉得每个组件不是特别大的时候,组件的大小可能就刚刚好。

在微服务架构中,组件之间可以通过任意协议进行通信,比如 HTTPAMQP

每个组件是独立的,因此我们可以在不同的组件中使用不同的编程语言,不同的技术 —— 这就是所谓的 polyglot support (不错,Vert.x也是支持多语言的!)

每个组件都是独立开发、部署以及发布的,所以这减少了部署及发布的难度。

微服务架构通常与分布式系统形影不离,所以我们还需要考虑分布式系统中的方方面面,包括可用性、弹性以及可扩展性。

微服务架构通常被设计成为 面向失败的,因为在分布式系统中失败的场景非常复杂,我们需要有效地处理失败的手段。

虽然微服务有如此多的优点,但是不要忘了,微服务可不是银弹,因为它引入了分布式系统中所带来的各种问题,因此设计架构时我们都要考虑这些情况。

服务发现

在微服务架构中,每个组件都是独立的,它们都不知道其他组件的位置,但是组件之间又需要通信,因此我们必须知道各个组件的位置。然而,把位置信息写死在代码中显然不好,因此我们需要一种机制可以动态地记录每个组件的位置 —— 这就是 服务发现。有了服务发现模块,我们就可以将服务位置发布至服务发现模块中,其它服务就可以从服务发现模块中获取想要调用的服务的位置并进行调用。在调用服务的过程中,我们不需要知道对应服务的位置,所以当服务位置或环境变动时,服务调用可以不受影响,这使得我们的架构更加灵活。

Vert.x提供了一个服务发现模块用于发布和获取服务记录。在Vert.x 服务发现模块,每个服务都被抽象成一个Record(服务记录)。服务提供者可以向服务发现模块中发布服务,此时Record会根据底层ServiceDiscoveryBackend的配置存储在本地Map、分布式Map或Redis中。服务消费者可以从服务发现模块中获取服务记录,并且通过服务记录获取对应的服务实例然后进行服务调用。目前Vert.x原生支持好几种服务类型,比如 Event Bus 服务(即服务代理)、HTTP 端点消息源 以及 数据源。当然我们也可以实现自己的服务类型,可以参考相关的文档。在后面我们还会详细讲述如何使用服务发现模块,这里先简单做个了解。

异步的、响应式的Vert.x

异步与响应式风格都很适合微服务架构,而Vert.x兼具这两种风格!异步开发模式相信大家已经了然于胸了,而如果大家读过前几篇蓝图教程的话,响应式风格大家一定不会陌生。有了基于Future以及基于RxJava的异步开发模式,我们可以随心所欲地对异步过程进行组合和变换,这样代码可以非常简洁,非常优美!在本蓝图教程中,我们会见到大量基于Future和RxJava的异步方法。

Mirco Shop 微服务应用

好啦,现在大家应该对微服务架构有了一个大致的了解了,下面我们来讲一下本蓝图中的微服务应用。这是一个简单的 Micro-Shop 微服务应用 (目前只完成了基本功能),人们可以进行网上购物以及交易。。。当前版本的微服务应用包含下列组件:

账户服务:提供用户账户的操作服务,使用MySQL作为后端存储。

商品服务:提供商品的操作服务,使用MySQL作为后端存储。

库存服务:提供商品库存的操作服务,如查询库存、增加库存即减少库存。使用Redis作为后端存储。

网店服务:提供网店的操作即管理服务,使用MongoDB作为后端存储。

购物车服务:提供购物车事件的生成以及购物车操作(添加、删除商品以及结算)服务。我们通过此服务来讲述 事件溯源

订单服务:订单服务从Event Bus接收购物车服务发送的订单请求,接着处理订单并将订单发送至下层服务(本例中仅仅简单地存储至数据库中)。

Micro Shop 前端:此微服务的前端部分(SPA),目前已整合至API Gateway组件中。

监视仪表板:用于监视微服务系统的状态以及日志、统计数据的查看。

API Gateway:整个微服务的入口,它负责将收到的请求按照一定的规则分发至对应的组件的REST端点中(相当于反向代理)。它也负责权限认证与管理,负载均衡,心跳检测以及失败处理(使用Vert.x Circuit Breaker)。

Micro Shop 微服务架构

我们来看一下Micro Shop微服务应用的架构:

用户请求首先经过API Gateway,再经其处理并分发至对应的业务端点。

我们再来看一下每个基础组件内部的结构(基础组件即图中最下面的各个业务组件)。

组件结构

每个基础组件至少有两个Verticle:服务Verticle以及REST Verticle。REST Vertice提供了服务对应的REST端点,并且也负责将此端点发布至服务发现层。而服务Verticle则负责发布其它服务(如Event Bus服务或消息源)并且部署REST Verticle。

每个基础组件中都包含对应的服务接口,如商品组件中包含ProductService接口。这些服务接口都是Event Bus 服务,由@ProxyGen注解修饰。上篇蓝图教程中我们讲过,Vert.x Service Proxy可以自动为@ProxyGen注解修饰的接口生成服务代理类,因此我们可以很方便地在Event Bus上进行异步RPC调用而不用写额外的代码。很酷吧!并且有了服务发现组件以后,我们可以非常方便地将Event Bus服务发布至服务发现层,这样其它组件可以更方便地调用服务。

组件之间的通信

我们先来看一下我们的微服务应用中用到的服务类型:

HTTP端点 (e.g. REST 端点以及API Gateway) - 此服务的位置用URL描述

Event Bus服务 - 此服务的位置用Event Bus上的一个特定地址描述

事件源 - 事件源服务对应Event Bus上某个地址的事件消费者。此服务的位置用Event Bus上的一个特定地址描述

因此,我们各个组件之间可以通过HTTP以及Event Bus(本质是TCP)进行通信,例如:

API Gateway与其它组件通过HTTP进行通信。

让我们开始吧!

好啦,现在开始我们的微服务蓝图旅程吧!首先我们从GitHub上clone项目:

</>复制代码

  1. git clone https://github.com/sczyh30/vertx-blueprint-microservice.git

在本蓝图教程中,我们使用 Maven 作为构建工具。我们首先来看一下pom.xml配置文件。我们可以看到,我们的蓝图应用由许多模块构成:

</>复制代码

  1. microservice-blueprint-common
  2. account-microservice
  3. product-microservice
  4. inventory-microservice
  5. store-microservice
  6. shopping-cart-microservice
  7. order-microservice
  8. api-gateway
  9. cache-infrastructure
  10. monitor-dashboard

每个模块代表一个组件。看着配置文件,似乎有不少组件呢!不要担心,我们将会一一探究这些组件。下面我们先来看一下所有组件的基础模块 - microservice-blueprint-common

微服务基础模块

microservice-blueprint-common模块提供了一些微服务功能相关的辅助类以及辅助Verticle。我们先来看一下两个base verticles - BaseMicroserviceVerticleRestAPIVerticle

Base Microservice Verticle

BaseMicroserviceVerticle提供了与微服务相关的初始化函数以及各种各样的辅助函数。其它每一个Verticle都会继承此Verticle,因此这个基础Verticle非常重要。

首先我们来看一下其中的成员变量:

</>复制代码

  1. protected ServiceDiscovery discovery;
  2. protected CircuitBreaker circuitBreaker;
  3. protected Set registeredRecords = new ConcurrentHashSet<>();

discovery以及circuitBreaker分别代表服务发现实例以及断路器实例,而registeredRecords代表当前已发布的服务记录的集合,用于在结束Verticle时注销服务。

start函数中主要是对服务发现实例和断路器实例进行初始化,配置文件从config()中获取。它的实现非常简单:

</>复制代码

  1. @Override
  2. public void start() throws Exception {
  3. // init service discovery instance
  4. discovery = ServiceDiscovery.create(vertx, new ServiceDiscoveryOptions().setBackendConfiguration(config()));
  5. // init circuit breaker instance
  6. JsonObject cbOptions = config().getJsonObject("circuit-breaker") != null ?
  7. config().getJsonObject("circuit-breaker") : new JsonObject();
  8. circuitBreaker = CircuitBreaker.create(cbOptions.getString("name", "circuit-breaker"), vertx,
  9. new CircuitBreakerOptions()
  10. .setMaxFailures(cbOptions.getInteger("max-failures", 5))
  11. .setTimeout(cbOptions.getLong("timeout", 10000L))
  12. .setFallbackOnFailure(true)
  13. .setResetTimeout(cbOptions.getLong("reset-timeout", 30000L))
  14. );
  15. }

下面我们还提供了几个辅助函数用于发布各种各样的服务。这些函数都是异步的,并且基于Future:

</>复制代码

  1. protected Future publishHttpEndpoint(String name, String host, int port) {
  2. Record record = HttpEndpoint.createRecord(name, host, port, "/",
  3. new JsonObject().put("api.name", config().getString("api.name", ""))
  4. );
  5. return publish(record);
  6. }
  7. protected Future publishMessageSource(String name, String address) {
  8. Record record = MessageSource.createRecord(name, address);
  9. return publish(record);
  10. }
  11. protected Future publishJDBCDataSource(String name, JsonObject location) {
  12. Record record = JDBCDataSource.createRecord(name, location, new JsonObject());
  13. return publish(record);
  14. }
  15. protected Future publishEventBusService(String name, String address, Class serviceClass) {
  16. Record record = EventBusService.createRecord(name, address, serviceClass);
  17. return publish(record);
  18. }

之前我们提到过,每个服务记录Record代表一个服务,其中服务类型由记录中的type字段标识。Vert.x原生支持的各种服务接口中都包含着好几个createRecord方法因此我们可以利用这些方法来方便地创建服务记录。通常情况下我们需要给每个服务都指定一个name,这样之后我们就可以通过名称来获取服务了。我们还可以通过setMetadata方法来给服务记录添加额外的元数据。

你可能注意到在publishHttpEndpoint方法中我们就提供了含有api-name的元数据,之后我们会了解到,API Gateway在进行反向代理时会用到它。

下面我们来看一下发布服务的通用方法 —— publish方法:

</>复制代码

  1. private Future publish(Record record) {
  2. Future future = Future.future();
  3. // publish the service
  4. discovery.publish(record, ar -> {
  5. if (ar.succeeded()) {
  6. registeredRecords.add(record);
  7. logger.info("Service <" + ar.result().getName() + "> published");
  8. future.complete();
  9. } else {
  10. future.fail(ar.cause());
  11. }
  12. });
  13. return future;
  14. }

publish方法中,我们调用了服务发现实例discoverypublish方法来将服务发布至服务发现模块。它同样也是一个异步方法,当发布成功时,我们将此服务记录存储至registeredRecords中,输出日志然后通知future操作已完成。最后返回对应的future

注意,在Vert.x Service Discovery当前版本(3.3.2)的设计中,服务发布者需要在必要时手动注销服务,因此当Verticle结束时,我们需要将注册的服务都注销掉:

</>复制代码

  1. @Override
  2. public void stop(Future future) throws Exception {
  3. // In current design, the publisher is responsible for removing the service
  4. List futures = new ArrayList<>();
  5. for (Record record : registeredRecords) {
  6. Future unregistrationFuture = Future.future();
  7. futures.add(unregistrationFuture);
  8. discovery.unpublish(record.getRegistration(), unregistrationFuture.completer());
  9. }
  10. if (futures.isEmpty()) {
  11. discovery.close();
  12. future.complete();
  13. } else {
  14. CompositeFuture.all(futures)
  15. .setHandler(ar -> {
  16. discovery.close();
  17. if (ar.failed()) {
  18. future.fail(ar.cause());
  19. } else {
  20. future.complete();
  21. }
  22. });
  23. }
  24. }

stop方法中,我们遍历registeredRecords集合并且尝试注销每一个服务,并将异步结果future添加至futures列表中。之后我们调用CompositeFuture.all(futures)来依次获取每个异步结果的状态。all方法返回一个组合的Future,当列表中的所有Future都成功赋值时方为成功状态,反之只要有一个异步结果失败,它就为失败状态。因此,我们给它绑定一个Handler,当所有服务都被注销时,服务发现模块就可以安全地关闭了,否则结束函数会失败。

REST API Verticle

RestAPIVerticle抽象类继承了BaseMicroserviceVerticle抽象类。从名字上就可以看出,它提供了诸多的用于REST API开发的辅助方法。我们在其中封装了诸如创建服务端、开启Cookie和Session支持,开启心跳检测支持(通过HTTP),各种各样的路由处理封装以及用于权限验证的路由处理器。在之后的章节中我们将会见到这些方法。

好啦,现在我们已经了解了整个蓝图应用中的两个基础Verticle,下面是时候探索各个模块了!在探索逻辑组件之前,我们先来看一下其中最重要的组件之一 —— API Gateway。

API Gateway

我们把API Gateway的内容多带带归为一篇教程,请见:Vert.x 蓝图 - Micro Shop 微服务实战 (API Gateway篇)。

Event Bus 服务 - 账户、网店及商品服务 在Event Bus上进行异步RPC

在之前的 Vert.x Kue 蓝图教程 中我们已经介绍过Vert.x中的异步RPC(也叫服务代理)了,这里我们再来回顾一下,并且说一说如何利用服务发现模块更方便地进行异步RPC。

传统的RPC有一个缺点:消费者需要阻塞等待生产者的回应。这是一种阻塞模型,和Vert.x推崇的异步开发模式不相符。并且,传统的RPC不是真正面向失败设计的。还好,Vert.x提供了一种高效的、响应式的RPC —— 异步RPC。我们不需要等待生产者的回应,而只需要传递一个Handler>参数给异步方法。这样当收到生产者结果时,对应的Handler就会被调用,非常方便,这与Vert.x的异步开发模式相符。并且,AsyncResult也是面向失败设计的。

Vert.x Service Proxy(服务代理组件)可以自动处理含有@ProxyGen注解的服务接口,生成相应的服务代理类。生成的服务代理类可以帮我们将数据封装好后发送至Event Bus、从Event Bus接收数据,以及对数据进行编码和解码,因此我们可以省掉不少代码。我们需要做的就是遵循@ProxyGen注解的一些限定。

比如,这里有一个Event Bus服务接口:

</>复制代码

  1. @ProxyGen
  2. public interface MyService {
  3. @Fluent
  4. MyService retrieveData(String id, Handler> resultHandler);
  5. }

我们可以通过Vert.x Service Proxy组件生成对应的代理类。然后我们就可以通过ProxyHelper类的registerService方法将此服务注册至Event Bus上:

</>复制代码

  1. MyService myService = MyService.createService(vertx, config);
  2. ProxyHelper.registerService(MyService.class, vertx, myService, SERVICE_ADDRESS);

有了服务发现组件之后,将服务发布至服务发现层就非常容易了。比如在我们的蓝图应用中我们使用封装好的方法:

</>复制代码

  1. publishEventBusService(SERVICE_NAME, SERVICE_ADDRESS, MyService.class)

OK,现在服务已经成功地发布至服务发现模块。现在我们就可以通过EventBusService接口的getProxy方法来从服务发现层获取发布的Event Bus服务,并且像调用普通异步方法那样进行异步RPC:

</>复制代码

  1. EventBusService.getProxy(discovery, new JsonObject().put("name", SERVICE_NAME), ar -> {
  2. if (ar.succeeded()) {
  3. MyService myService = ar.result();
  4. myService.retrieveData(...);
  5. }
  6. });
几个服务模块的通用特性

在我们的Micro Shop微服务应用中,账户、网店及商品服务有几个通用的特性及约定。现在我们来解释一下。

在这三个模块中,每个模块都包含:

一个Event Bus服务接口。此服务接口定义了对实体存储的各种操作

服务接口的实现

REST API Verticle,用于创建服务端并将其发布至服务发现模块

Main Verticle,用于部署其它的verticles以及将Event Bus服务和消息源发布至服务发现层

其中,用户账户服务以及商品服务都使用 MySQL 作为后端存储,而网店服务则以 MongoDB 作为后端存储。这里我们只挑两个典型的服务介绍如何通过Vert.x操作不同的数据库:product-microservicestore-microserviceaccount-microservice的实现与product-microservice非常相似,大家可以查阅 GitHub 上的代码。

基于MySQL的商品服务

商品微服务模块提供了商品的操作功能,包括添加、查询(搜索)、删除与更新商品等。其中最重要的是ProductService服务接口以及其实现了。我们先来看一下此服务接口的定义:

</>复制代码

  1. @VertxGen
  2. @ProxyGen
  3. public interface ProductService {
  4. /**
  5. * The name of the event bus service.
  6. */
  7. String SERVICE_NAME = "product-eb-service";
  8. /**
  9. * The address on which the service is published.
  10. */
  11. String SERVICE_ADDRESS = "service.product";
  12. /**
  13. * Initialize the persistence.
  14. */
  15. @Fluent
  16. ProductService initializePersistence(Handler> resultHandler);
  17. /**
  18. * Add a product to the persistence.
  19. */
  20. @Fluent
  21. ProductService addProduct(Product product, Handler> resultHandler);
  22. /**
  23. * Retrieve the product with certain `productId`.
  24. */
  25. @Fluent
  26. ProductService retrieveProduct(String productId, Handler> resultHandler);
  27. /**
  28. * Retrieve the product price with certain `productId`.
  29. */
  30. @Fluent
  31. ProductService retrieveProductPrice(String productId, Handler> resultHandler);
  32. /**
  33. * Retrieve all products.
  34. */
  35. @Fluent
  36. ProductService retrieveAllProducts(Handler>> resultHandler);
  37. /**
  38. * Retrieve products by page.
  39. */
  40. @Fluent
  41. ProductService retrieveProductsByPage(int page, Handler>> resultHandler);
  42. /**
  43. * Delete a product from the persistence
  44. */
  45. @Fluent
  46. ProductService deleteProduct(String productId, Handler> resultHandler);
  47. /**
  48. * Delete all products from the persistence
  49. */
  50. @Fluent
  51. ProductService deleteAllProducts(Handler> resultHandler);
  52. }

正如我们之前所提到的那样,这个服务接口是一个Event Bus服务,所以我们需要给它加上@ProxyGen注解。这些方法都是异步的,因此每个方法都需要接受一个Handler>参数。当异步操作完成时,对应的Handler会被调用。注意到我们还给此接口加了@VertxGen注解。上篇蓝图教程中我们提到过,这是为了开启多语言支持(polyglot language support)。Vert.x Codegen注解处理器会自动处理含有@VertxGen注解的类,并生成支持的其它语言的代码,如Ruby、JS等。。。这是非常适合微服务架构的,因为不同的组件可以用不同的语言进行开发!

每个方法的含义都在注释中给出了,这里就不解释了。

商品服务接口的实现位于ProductServiceImpl类中。商品信息存储在MySQL中,因此我们可以通过 Vert.x-JDBC 对数据库进行操作。我们在 第一篇蓝图教程 中已经详细讲述过Vert.x JDBC的使用细节了,因此这里我们就不过多地讨论细节了。这里我们只关注如何减少代码量。因为通常简单数据库操作的过程都是千篇一律的,因此做个封装是很有必要的。

首先来回顾一下每次数据库操作的过程:

JDBCClient中获取数据库连接SQLConnection,这是一个异步过程

执行SQL语句,绑定回调Handler

最后不要忘记关闭数据库连接以释放资源

对于正常的CRUD操作来说,它们的实现都很相似,因此我们封装了一个JdbcRepositoryWrapper类来实现这些通用逻辑。它位于io.vertx.blueprint.microservice.common.service包中:

我们提供了以下的封装方法:

executeNoResult: 执行含参数的SQL语句 (通过updateWithParams方法)。执行结果是不需要的,因此只需要接受一个 Handler> 类型的参数。此方法通常用于insert之类的操作。

retrieveOne: 执行含参数的SQL语句,用于获取某一特定实体(通过 queryWithParams方法)。此方法是基于Future的,它返回一个Future>类型的异步结果。如果结果集为空,那么返回一个空的Optional monad。如果结果集不为空,则返回第一个结果并用Optional进行包装。

retrieveMany: 获取多个实体,返回Future>作为异步结果。

retrieveByPage: 与retrieveMany 方法相似,但包含分页逻辑。

retrieveAll: similar to retrieveMany method but does not require query parameters as it simply executes statement such as SELECT * FROM xx_table.

removeOne and removeAll: remove entity from the database.

当然这与Spring JPA相比的不足之处在于SQL语句得自己写,自己封装也不是很方便。。。考虑到Vert.x JDBC底层也只是使用了Worker线程池包装了原生的JDBC(不是真正的异步),我们也可以结合Spring Data的相关组件来简化开发。另外,Vert.x JDBC使用C3P0作为默认的数据库连接池,C3P0的性能我想大家应该都懂。。。因此换成性能更优的HikariCP是很有必要的。

回到JdbcRepositoryWrapper中来。这层封装可以大大地减少代码量。比如,我们的ProductServiceImpl实现类就可以继承JdbcRepositoryWrapper类,然后利用这些封装好的方法。看个例子 —— retrieveProduct方法的实现:

</>复制代码

  1. @Override
  2. public ProductService retrieveProduct(String productId, Handler> resultHandler) {
  3. this.retrieveOne(productId, FETCH_STATEMENT)
  4. .map(option -> option.map(Product::new).orElse(null))
  5. .setHandler(resultHandler);
  6. return this;
  7. }

我们唯一需要做的只是将结果变换成需要的类型。是不是很方便呢?

当然这不是唯一方法。在下面的章节中,我们将会讲到一种更Reactive,更Functional的方法 —— 利用Rx版本的Vert.x JDBC。另外,用vertx-sync也是一种不错的选择(类似于async/await)。

好啦!看完服务实现,下面轮到REST API了。我们来看看RestProductAPIVerticle的实现:

</>复制代码

  1. public class RestProductAPIVerticle extends RestAPIVerticle {
  2. public static final String SERVICE_NAME = "product-rest-api";
  3. private static final String API_ADD = "/add";
  4. private static final String API_RETRIEVE = "/:productId";
  5. private static final String API_RETRIEVE_BY_PAGE = "/products";
  6. private static final String API_RETRIEVE_PRICE = "/:productId/price";
  7. private static final String API_RETRIEVE_ALL = "/products";
  8. private static final String API_DELETE = "/:productId";
  9. private static final String API_DELETE_ALL = "/all";
  10. private final ProductService service;
  11. public RestProductAPIVerticle(ProductService service) {
  12. this.service = service;
  13. }
  14. @Override
  15. public void start(Future future) throws Exception {
  16. super.start();
  17. final Router router = Router.router(vertx);
  18. // body handler
  19. router.route().handler(BodyHandler.create());
  20. // API route handler
  21. router.post(API_ADD).handler(this::apiAdd);
  22. router.get(API_RETRIEVE).handler(this::apiRetrieve);
  23. router.get(API_RETRIEVE_BY_PAGE).handler(this::apiRetrieveByPage);
  24. router.get(API_RETRIEVE_PRICE).handler(this::apiRetrievePrice);
  25. router.get(API_RETRIEVE_ALL).handler(this::apiRetrieveAll);
  26. router.patch(API_UPDATE).handler(this::apiUpdate);
  27. router.delete(API_DELETE).handler(this::apiDelete);
  28. router.delete(API_DELETE_ALL).handler(context -> requireLogin(context, this::apiDeleteAll));
  29. enableHeartbeatCheck(router, config());
  30. // get HTTP host and port from configuration, or use default value
  31. String host = config().getString("product.http.address", "0.0.0.0");
  32. int port = config().getInteger("product.http.port", 8082);
  33. // create HTTP server and publish REST service
  34. createHttpServer(router, host, port)
  35. .compose(serverCreated -> publishHttpEndpoint(SERVICE_NAME, host, port))
  36. .setHandler(future.completer());
  37. }
  38. private void apiAdd(RoutingContext context) {
  39. try {
  40. Product product = new Product(new JsonObject(context.getBodyAsString()));
  41. service.addProduct(product, resultHandler(context, r -> {
  42. String result = new JsonObject().put("message", "product_added")
  43. .put("productId", product.getProductId())
  44. .encodePrettily();
  45. context.response().setStatusCode(201)
  46. .putHeader("content-type", "application/json")
  47. .end(result);
  48. }));
  49. } catch (DecodeException e) {
  50. badRequest(context, e);
  51. }
  52. }
  53. private void apiRetrieve(RoutingContext context) {
  54. String productId = context.request().getParam("productId");
  55. service.retrieveProduct(productId, resultHandlerNonEmpty(context));
  56. }
  57. private void apiRetrievePrice(RoutingContext context) {
  58. String productId = context.request().getParam("productId");
  59. service.retrieveProductPrice(productId, resultHandlerNonEmpty(context));
  60. }
  61. private void apiRetrieveByPage(RoutingContext context) {
  62. try {
  63. String p = context.request().getParam("p");
  64. int page = p == null ? 1 : Integer.parseInt(p);
  65. service.retrieveProductsByPage(page, resultHandler(context, Json::encodePrettily));
  66. } catch (Exception ex) {
  67. badRequest(context, ex);
  68. }
  69. }
  70. private void apiRetrieveAll(RoutingContext context) {
  71. service.retrieveAllProducts(resultHandler(context, Json::encodePrettily));
  72. }
  73. private void apiDelete(RoutingContext context) {
  74. String productId = context.request().getParam("productId");
  75. service.deleteProduct(productId, deleteResultHandler(context));
  76. }
  77. private void apiDeleteAll(RoutingContext context, JsonObject principle) {
  78. service.deleteAllProducts(deleteResultHandler(context));
  79. }
  80. }

此Verticle继承了RestAPIVerticle,因此我们可以利用其中诸多的辅助方法。首先来看一下启动过程,即start方法。首先我们先调用super.start()来初始化服务发现组件,然后创建Router,绑定BodyHandler以便操作请求正文,然后创建各个API路由并绑定相应的处理函数。接着我们调用enableHeartbeatCheck方法开启简单的心跳检测支持。最后我们通过封装好的createHttpServer创建HTTP服务端,并通过publishHttpEndpoint方法将HTTP端点发布至服务发现模块。

其中createHttpServer方法非常简单,我们只是把vertx.createHttpServer方法变成了基于Future的:

</>复制代码

  1. protected Future createHttpServer(Router router, String host, int port) {
  2. Future httpServerFuture = Future.future();
  3. vertx.createHttpServer()
  4. .requestHandler(router::accept)
  5. .listen(port, host, httpServerFuture.completer());
  6. return httpServerFuture.map(r -> null);
  7. }

至于各个路由处理逻辑如何实现,可以参考 Vert.x Blueprint - Todo Backend Tutorial 获取相信信息。

最后我们打开此微服务模块中的Main Verticle - ProductVerticle类。正如我们之前所提到的,它负责发布服务以及部署REST Verticle。我们来看一下其start方法:

</>复制代码

  1. @Override
  2. public void start(Future future) throws Exception {
  3. super.start();
  4. // create the service instance
  5. ProductService productService = new ProductServiceImpl(vertx, config()); // (1)
  6. // register the service proxy on event bus
  7. ProxyHelper.registerService(ProductService.class, vertx, productService, SERVICE_ADDRESS); // (2)
  8. // publish the service in the discovery infrastructure
  9. initProductDatabase(productService) // (3)
  10. .compose(databaseOkay -> publishEventBusService(ProductService.SERVICE_NAME, SERVICE_ADDRESS, ProductService.class)) // (4)
  11. .compose(servicePublished -> deployRestService(productService)) // (5)
  12. .setHandler(future.completer()); // (6)
  13. }

首先我们创建一个ProductService服务实例(1),然后通过registerService方法将服务注册至Event Bus(2)。接着我们初始化数据库表(3),将商品服务发布至服务发现层(4)然后部署REST Verticle(5)。这是一系列的异步方法的组合操作,很溜吧!最后我们将future.completer()绑定至组合后的Future上,这样当所有异步操作都OK的时候,Future会自动完成。

当然,不要忘记在配置里指定api.name。之前我们在 API Gateway章节 提到过,API Gateway的反向代理部分就是通过对应REST服务的 api.name 来进行请求分发的。默认情况下api.nameproduct:

</>复制代码

  1. {
  2. "api.name": "product"
  3. }
基于MongoDB的网店服务

网店服务用于网店的操作,如开店、关闭、更新数据。正常情况下,开店都需要人工申请,不过在本蓝图教程中,我们把这一步简化掉了。网店服务模块的结构和商品服务模块非常相似,所以我们就不细说了。我们这里仅仅瞅一瞅如何使用Vert.x Mongo Client。

使用Vert.x Mongo Client非常简单,首先我们需要创建一个MongoClient实例,过程类似于JDBCClient

</>复制代码

  1. private final MongoClient client;
  2. public StoreCRUDServiceImpl(Vertx vertx, JsonObject config) {
  3. this.client = MongoClient.createNonShared(vertx, config);
  4. }

然后我们就可以通过它来操作Mongo了。比如我们想执行存储(save)操作,我们可以这样写:

</>复制代码

  1. @Override
  2. public void saveStore(Store store, Handler> resultHandler) {
  3. client.save(COLLECTION, new JsonObject().put("_id", store.getSellerId())
  4. .put("name", store.getName())
  5. .put("description", store.getDescription())
  6. .put("openTime", store.getOpenTime()),
  7. ar -> {
  8. if (ar.succeeded()) {
  9. resultHandler.handle(Future.succeededFuture());
  10. } else {
  11. resultHandler.handle(Future.failedFuture(ar.cause()));
  12. }
  13. }
  14. );
  15. }

这些操作都是异步的,因此你一定非常熟悉这种模式!当然如果不喜欢基于回调的异步模式的话,你也可以选择Rx版本的API~

更多关于Vert.x Mongo Client的使用细节,请参考官方文档。

基于Redis的商品库存服务

商品库存服务负责操作商品的库存数量,比如添加库存、减少库存以及获取当前库存数量。库存使用Redis来存储。

与之前的Event Bus服务不同,我们这里的商品库存服务是基于Future的,而不是基于回调的。由于服务代理模块不支持处理基于Future的服务接口,因此这里我们就不用异步RPC了,只发布一个REST API端点,所有的调用都通过REST进行。

首先来看一下InventoryService服务接口:

</>复制代码

  1. public interface InventoryService {
  2. /**
  3. * Create a new inventory service instance.
  4. *
  5. * @param vertx Vertx instance
  6. * @param config configuration object
  7. * @return a new inventory service instance
  8. */
  9. static InventoryService createService(Vertx vertx, JsonObject config) {
  10. return new InventoryServiceImpl(vertx, config);
  11. }
  12. /**
  13. * Increase the inventory amount of a certain product.
  14. *
  15. * @param productId the id of the product
  16. * @param increase increase amount
  17. * @return the asynchronous result of current amount
  18. */
  19. Future increase(String productId, int increase);
  20. /**
  21. * Decrease the inventory amount of a certain product.
  22. *
  23. * @param productId the id of the product
  24. * @param decrease decrease amount
  25. * @return the asynchronous result of current amount
  26. */
  27. Future decrease(String productId, int decrease);
  28. /**
  29. * Retrieve the inventory amount of a certain product.
  30. *
  31. * @param productId the id of the product
  32. * @return the asynchronous result of current amount
  33. */
  34. Future retrieveInventoryForProduct(String productId);
  35. }

接口定义非常简单,含义都在注释中给出了。接着我们再看一下服务的实现类InventoryServiceImpl类。在Redis中,所有的库存数量都被存储在inventory:v1命名空间中,并以商品号productId作为标识。比如商品A123456会被存储至inventory:v1:A123456键值对中。

Vert.x Redis提供了incrbydecrby命令,可以很方便地实现库存增加和减少功能,代码类似。这里我们只看库存增加功能:

</>复制代码

  1. @Override
  2. public Future increase(String productId, int increase) {
  3. Future future = Future.future();
  4. client.incrby(PREFIX + productId, increase, future.completer());
  5. return future.map(Long::intValue);
  6. }

由于库存数量不会非常大,Integer就足够了,因此我们需要通过Long::intValue方法引用来将Long结果变换成Integer类型的。

retrieveInventoryForProduct方法的实现也非常短小精悍:

</>复制代码

  1. @Override
  2. public Future retrieveInventoryForProduct(String productId) {
  3. Future future = Future.future();
  4. client.get(PREFIX + productId, future.completer());
  5. return future.map(r -> r == null ? "0" : r)
  6. .map(Integer::valueOf);
  7. }

我们通过get命令来获取值。由于结果是String类型的,因此我们需要自行将其转换为Integer类型。如果结果为空,我们就认为商品没有库存,返回0

至于REST Verticle(在此模块中也为Main Verticle),其实现模式与前面的大同小异,这里就不展开说了。不要忘记在config.json中指定api.name:

</>复制代码

  1. {
  2. "api.name": "inventory",
  3. "redis.host": "redis",
  4. "inventory.http.address": "inventory-microservice",
  5. "inventory.http.port": 8086
  6. }
事件溯源 - 购物车服务

好了,现在我们与基础服务模块告一段落了。下面我们来到了另一个重要的服务模块 —— 购物车微服务。此模块负责购物车的获取、购物车事件的添加以及结算功能。与传统的实现不同,这里我们要介绍一种不同的开发模式 —— 事件溯源(Event Sourcing)。

解道Event Sourcing

在传统的数据存储模式中,我们通常直接将数据本身的状态存储至数据库中。这在一般场景中是没有问题的,但有些时候,我们不仅想获取到数据,还想获取数据操作的过程(即此数据是经过怎样的操作生成的),这时候我们就可以利用事件溯源(Event Sourcing)来解决这个问题。

事件溯源保证了数据状态的变换都以一系列的事件的形式存储在数据库中。所以,我们不仅可以获取每个变换的事件,而且可以通过过去的事件来组合出过去任意时刻的数据状态!这真是极好的~注意,有一点很重要,我们不能更改已经保存的事件以及它们的序列 —— 也就是说,事件存储是只能添加而不能删除的,并且需要不可变。是不是感觉和数据库事务日志的原理差不多呢?

在微服务架构中,事件溯源模式可以带来以下的好处:

我们可以从过去的事件序列中组建出任意时刻的数据状态

每个过去的事件都得以保存,因此这使得补偿事务成为可能

我们可以从事件存储中获取事件流,并且以异步、响应式风格对其进行变换和处理

事件存储同样可以当作为数据日志

事件存储的选择也需要好好考虑。Apache Kafka非常适合这种场景,在此版本的Micro Shop微服务中,为了简化其实现,我们简单地使用了MySQL作为事件存储。下个版本我们将把Kafka整合进来。

</>复制代码

  1. 注:在实际生产环境中,购物车通常被存储于Session或缓存内。本章节仅为介绍事件溯源而使用事件存储模式。

购物车事件

我们来看一下代表购物车事件的CartEvent数据对象:

</>复制代码

  1. @DataObject(generateConverter = true)
  2. public class CartEvent {
  3. private Long id;
  4. private CartEventType cartEventType;
  5. private String userId;
  6. private String productId;
  7. private Integer amount;
  8. private long createdAt;
  9. public CartEvent() {
  10. this.createdAt = System.currentTimeMillis();
  11. }
  12. public CartEvent(JsonObject json) {
  13. CartEventConverter.fromJson(json, this);
  14. }
  15. public CartEvent(CartEventType cartEventType, String userId, String productId, Integer amount) {
  16. this.cartEventType = cartEventType;
  17. this.userId = userId;
  18. this.productId = productId;
  19. this.amount = amount;
  20. this.createdAt = System.currentTimeMillis();
  21. }
  22. public static CartEvent createCheckoutEvent(String userId) {
  23. return new CartEvent(CartEventType.CHECKOUT, userId, "all", 0);
  24. }
  25. public static CartEvent createClearEvent(String userId) {
  26. return new CartEvent(CartEventType.CLEAR_CART, userId, "all", 0);
  27. }
  28. public JsonObject toJson() {
  29. JsonObject json = new JsonObject();
  30. CartEventConverter.toJson(this, json);
  31. return json;
  32. }
  33. public static boolean isTerminal(CartEventType eventType) {
  34. return eventType == CartEventType.CLEAR_CART || eventType == CartEventType.CHECKOUT;
  35. }
  36. }

一个购物车事件存储着事件的类型、发生的时间、操作用户、对应的商品ID以及商品数量变动。在我们的蓝图应用中,购物车事件一共有四种,它们用CartEventType枚举类表示:

</>复制代码

  1. public enum CartEventType {
  2. ADD_ITEM, // 添加商品至购物车
  3. REMOVE_ITEM, // 从购物车中删除商品
  4. CHECKOUT, // 结算并清空
  5. CLEAR_CART // 清空
  6. }

其中CHECKOUTCLEAR_CART事件是对整个购物车实体进行操作,对应的购物车事件参数类似,因此我们写了两个静态方法来创建这两种事件。

另外我们还注意到一个静态方法isTerminal,它用于检测当前购物车事件是否为一个“终结”事件。所谓的“终结”,指的是到此就对整个购物车进行操作(结算或清空)。在从购物车事件流构建出对应的购物车状态的时候,此方法非常有用。

购物车实体

看完了购物车事件,我们再来看一下购物车。购物车实体用ShoppingCart数据对象表示,它包含着一个商品列表表示当前购物车中的商品即数量:

</>复制代码

  1. private List productItems = new ArrayList<>();

其中ProductTuple数据对象包含着商品号、商品卖家ID、单价以及当前购物车中次商品的数目amount

为了方便,我们还在ShoppingCart类中放了一个amountMap用于暂时存储商品数量:

</>复制代码

  1. private Map amountMap = new HashMap<>();

由于它只是暂时存储,我们不希望在对应的JSON数据中看到它,所以把它的getter和setter方法都注解上@GenIgnore

在事件溯源模式中,我们要从一系列的购物车事件构建对应的购物车状态,因此我们需要一个incorporate方法将每个购物车事件“合并”至购物车内以变更对应的商品数目:

</>复制代码

  1. public ShoppingCart incorporate(CartEvent cartEvent) {
  2. // 此事件必须为添加或删除事件
  3. boolean ifValid = Stream.of(CartEventType.ADD_ITEM, CartEventType.REMOVE_ITEM)
  4. .anyMatch(cartEventType ->
  5. cartEvent.getCartEventType().equals(cartEventType));
  6. if (ifValid) {
  7. amountMap.put(cartEvent.getProductId(),
  8. amountMap.getOrDefault(cartEvent.getProductId(), 0) +
  9. (cartEvent.getAmount() * (cartEvent.getCartEventType()
  10. .equals(CartEventType.ADD_ITEM) ? 1 : -1)));
  11. }
  12. return this;
  13. }

实现倒是比较简单,我们首先来检查要合并的事件是不是添加商品或移除商品事件,如果是的话,我们就根据事件类型以及对应的数量变更来改变当前购物车中该商品的数量(amountMap)。

使用Rx版本的Vert.x JDBC

我们现在已经了解购物车微服务中的实体类了,下面该看看购物车事件存储服务了。

之前用callback-based API写Vert.x JDBC操作总感觉心累,还好Vert.x支持与RxJava进行整合,并且几乎每个Vert.x组件都有对应的Rx版本!是不是瞬间感觉整个人都变得Reactive了呢~(⊙o⊙) 这里我们就来使用Rx版本的Vert.x JDBC来写我们的购物车事件存储服务。也就是说,里面所有的异步方法都将是基于Observable的,很有FRP风格!

我们首先定义了一个简单的CRUD接口SimpleCrudDataSource

</>复制代码

  1. public interface SimpleCrudDataSource {
  2. Observable save(T entity);
  3. Observable retrieveOne(ID id);
  4. Observable delete(ID id);
  5. }

接着定义了一个CartEventDataSource接口,定义了购物车事件获取的相关方法:

</>复制代码

  1. public interface CartEventDataSource extends SimpleCrudDataSource {
  2. Observable streamByUser(String userId);
  3. }

可以看到这个接口只有一个方法 —— streamByUser方法会返回某一用户对应的购物车事件流,这样后面我们就可以对其进行流式变换操作了!

下面我们来看一下服务的实现类CartEventDataSourceImpl。首先是save方法,它将一个事件存储至事件数据库中:

</>复制代码

  1. @Override
  2. public Observable save(CartEvent cartEvent) {
  3. JsonArray params = new JsonArray().add(cartEvent.getCartEventType().name())
  4. .add(cartEvent.getUserId())
  5. .add(cartEvent.getProductId())
  6. .add(cartEvent.getAmount())
  7. .add(cartEvent.getCreatedAt() > 0 ? cartEvent.getCreatedAt() : System.currentTimeMillis());
  8. return client.getConnectionObservable()
  9. .flatMap(conn -> conn.updateWithParamsObservable(SAVE_STATEMENT, params))
  10. .map(r -> null);
  11. }

看看我们的代码,在对比对比普通的callback-based的Vert.x JDBC,是不是更加简洁,更加Reactive呢?我们可以非常简单地通过getConnectionObservable方法获取数据库连接,然后组合updateWithParamsObservable方法执行对应的含参SQL语句。只需要两行有木有!而如果用callback-based的风格的话,你只能这么写:

</>复制代码

  1. client.getConnection(ar -> {
  2. if (ar.succeeded) {
  3. SQLConnection connection = ar.result();
  4. connection.updateWithParams(SAVE_STATEMENT, params, ar2 -> {
  5. // ...
  6. })
  7. } else {
  8. resultHandler.handle(Future.failedFuture(ar.cause()));
  9. }
  10. })

因此,使用RxJava是非常愉快的一件事!当然vertx-sync也是一个不错的选择。

当然,不要忘记返回的Observablecold 的,因此只有在它被subscribe的时候,数据才会被发射。

不过话说回来了,Vert.x JDBC底层本质还是阻塞型的调用,要实现真正的异步数据库操作,我们可以利用 Vert.x MySQL / PostgreSQL Client 这个组件,底层使用Scala写的异步数据库操作库,不过目前还不是很稳定,大家可以自己尝尝鲜。

下面我们再来看一下retrieveOne方法,它从数据存储中获取特定ID的事件:

</>复制代码

  1. @Override
  2. public Observable retrieveOne(Long id) {
  3. return client.getConnectionObservable()
  4. .flatMap(conn -> conn.queryWithParamsObservable(RETRIEVE_STATEMENT, new JsonArray().add(id)))
  5. .map(ResultSet::getRows)
  6. .filter(list -> !list.isEmpty())
  7. .map(res -> res.get(0))
  8. .map(this::wrapCartEvent);
  9. }

非常简洁明了,就像之前我们的基于Future的范式相似,因此这里就不再详细解释了~

下面我们来看一下里面最重要的方法 —— streamByUser方法:

</>复制代码

  1. @Override
  2. public Observable streamByUser(String userId) {
  3. JsonArray params = new JsonArray().add(userId).add(userId);
  4. return client.getConnectionObservable()
  5. .flatMap(conn -> conn.queryWithParamsObservable(STREAM_STATEMENT, params))
  6. .map(ResultSet::getRows)
  7. .flatMapIterable(item -> item) // list merge into observable
  8. .map(this::wrapCartEvent);
  9. }

其核心在于它的SQL语句STREAM_STATEMENT

</>复制代码

  1. SELECT * FROM cart_event c
  2. WHERE c.user_id = ? AND c.created_at > coalesce(
  3. (SELECT created_at FROM cart_event
  4. WHERE user_id = ? AND (`type` = "CHECKOUT" OR `type` = "CLEAR_CART")
  5. ORDER BY cart_event.created_at DESC
  6. LIMIT 1),
  7. 0)
  8. ORDER BY c.created_at ASC;

此SQL语句执行时会获取与当前购物车相关的所有购物车事件。注意到我们有许多用户,每个用户可能会有许多购物车事件,它们属于不同时间的购物车,那么如何来获取相关的事件呢?方法是 —— 首先我们获取最近一次“终结”事件发生对应的时间,那么当前购物车相关的购物车事件就是在此终结事件发生后所有的购物车事件。

明白了这一点,我们再回到streamByUser方法中来。既然此方法是从数据库中获取一个事件列表,那么为什么此方法返回Observable而不是Observable>呢?我们来看看其中的奥秘 —— flatMapIterable算子,它将一个序列变换为一串数据流。所以,这里的Observable与Vert.x中的Future以及Java 8中的CompletableFuture就有些不同了。CompletableFuture更像是RxJava中的Single,它仅仅发送一个值或一个错误信息,而Observable本身则就像是一个数据流,数据源源不断地从发布者流向订阅者。之前retrieveOnesave方法中返回的Observable的使用更像是一个Single,但是在streamByUser方法中,Observable是真真正正的事件数据流。我们将会在购物车服务ShoppingCartService中处理事件流。

哇!现在你一定又被Rx这种函数响应式风格所吸引了~在下面的部分中,我们将探索购物车服务及其实现,基于Future,同样非常Reactive!

根据购物车事件序列构建对应的购物车状态

我们首先来看一下ShoppingCartService —— 购物车服务接口,它也是一个Event Bus服务:

</>复制代码

  1. @VertxGen
  2. @ProxyGen
  3. public interface ShoppingCartService {
  4. /**
  5. * The name of the event bus service.
  6. */
  7. String SERVICE_NAME = "shopping-cart-eb-service";
  8. /**
  9. * The address on which the service is published.
  10. */
  11. String SERVICE_ADDRESS = "service.shopping.cart";
  12. @Fluent
  13. ShoppingCartService addCartEvent(CartEvent event, Handler> resultHandler);
  14. @Fluent
  15. ShoppingCartService getShoppingCart(String userId, Handler> resultHandler);
  16. }

这里我们定义了两个方法:addCartEvent用于将购物车事件存储至事件存储中;getShoppingCart方法用于获取某个用户当前购物车的状态。

下面我们来看一下其实现类 —— ShoppingCartServiceImpl。首先是addCartEvent方法,它非常简单:

</>复制代码

  1. @Override
  2. public ShoppingCartService addCartEvent(CartEvent event, Handler> resultHandler) {
  3. Future future = Future.future();
  4. repository.save(event).toSingle().subscribe(future::complete, future::fail);
  5. future.setHandler(resultHand
  6. return this;
  7. }

正如之前我们所提到的,这里save方法返回的Observable其实更像个Single,因此我们将其通过toSingle方法变换为Single,然后通过subscribe(future::complete, future::fail)将其转化为Future以便于给其绑定一个Handler>类型的处理函数。

getShoppingCart方法的逻辑位于aggregateCartEvents方法中,此方法非常重要,并且是基于Future的。我们先来看一下代码:

</>复制代码

  1. private Future aggregateCartEvents(String userId) {
  2. Future future = Future.future();
  3. // aggregate cart events into raw shopping cart
  4. repository.streamByUser(userId) // (1)
  5. .takeWhile(cartEvent -> !CartEvent.isTerminal(cartEvent.getCartEventType())) // (2)
  6. .reduce(new ShoppingCart(), ShoppingCart::incorporate) // (3)
  7. .toSingle()
  8. .subscribe(future::complete, future::fail); // (4)
  9. return future.compose(cart ->
  10. getProductService() // (5)
  11. .compose(service -> prepareProduct(service, cart)) // (6) prepare product data
  12. .compose(productList -> generateCurrentCartFromStream(cart, productList)) // (7) prepare product items
  13. );
  14. }

我们来详细地解释一下。首先我们先创建个Future,然后先通过repository.streamByUser(userId)方法获取事件流(1),然后我们使用takeWhile算子来获取所有的ADD_ITEMREMOVE_ITEM类型的事件(2)。takeWhile算子在判定条件变为假时停止发射新的数据,因此当事件流遇到一个终结事件时,新的事件就不再往外发送了,之前的事件将会继续被传递。

下面就是产生购物车状态的过程了!我们通过reduce算子将事件流来“聚合”成购物车实体(3)。这个过程可以总结为以下几步:首先我们先创建一个空的购物车,然后依次将各个购物车事件“合并”至购物车实体中。最后聚合而成的购物车实体应该包含一个完整的amountMap

现在此Observable已经包含了我们想要的初始状态的购物车了。我们将其转化为Single然后通过subscribe(future::complete, future::fail)转化为Future(4)。

现在我们需要更多的信息以组件一个完整的购物车,所以我们首先组合getProductService异步方法来从服务发现层获取商品服务(5),然后通过prepareProduct方法来获取需要的商品数据(6),最后通过generateCurrentCartFromStream异步方法组合出完整的购物车实体(7)。这里面包含了好几个组合过程,我们来一一解释。

首先来看getProductService异步方法。它用于从服务发现层获取商品服务,然后返回其异步结果:

</>复制代码

  1. private Future getProductService() {
  2. Future future = Future.future();
  3. EventBusService.getProxy(discovery,
  4. new JsonObject().put("name", ProductService.SERVICE_NAME),
  5. future.completer());
  6. return future;
  7. }

现在我们获取到商品服务了,那么下一步自然是获取需要的商品数据了。这个过程通过prepareProduct异步方法实现:

</>复制代码

  1. private Future> prepareProduct(ProductService service, ShoppingCart cart) {
  2. List> futures = cart.getAmountMap().keySet() // (1)
  3. .stream()
  4. .map(productId -> {
  5. Future future = Future.future();
  6. service.retrieveProduct(productId, future.completer());
  7. return future; // (2)
  8. })
  9. .collect(Collectors.toList()); // (3)
  10. return Functional.sequenceFuture(futures); // (4)
  11. }

在此实现中,首先我们从amountMap中获取购物车中所有商品的ID(1),然后我们根据每个ID异步调用商品服务的retrieveProduct方法并且以Future包装(2),然后将此流转化为List>类型的列表(3)。我们这里想获得的是所有商品的异步结果,即Future>,那么如何转换呢?这里我写了一个辅助函数sequenceFuture来实现这样的变换,它位于io.vertx.blueprint.microservice.common.functional包下的Functional类中:

</>复制代码

  1. public static Future> sequenceFuture(List> futures) {
  2. return CompositeFutureImpl.all(futures.toArray(new Future[futures.size()])) // (1)
  3. .map(v -> futures.stream()
  4. .map(Future::result) // (2)
  5. .collect(Collectors.toList()) // (3)
  6. );
  7. }

此方法对于想将一个Future序列变换成单个Future的情况非常有用。这里我们首先调用CompositeFutureImpl类的all方法(1),它返回一个组合的Future,当且仅当序列中所有的Future都成功完成时,它为成功状态,否则为失败状态。下面我们就对此组合Future做变换:获取每个Future对应的结果(因为all方法已经强制获取所有结果),然后归结成列表(3)。

回到之前的组合中来!现在我们得到了我们需要的商品信息列表List,接下来就根据这些信息来构建完整的购物车实体了!我们来看一下generateCurrentCartFromStream方法的实现:

</>复制代码

  1. private Future generateCurrentCartFromStream(ShoppingCart rawCart, List productList) {
  2. Future future = Future.future();
  3. // check if any of the product is invalid
  4. if (productList.stream().anyMatch(e -> e == null)) { // (1)
  5. future.fail("Error when retrieve products: empty");
  6. return future;
  7. }
  8. // construct the product items
  9. List currentItems = rawCart.getAmountMap().entrySet() // (2)
  10. .stream()
  11. .map(item -> new ProductTuple(getProductFromStream(productList, item.getKey()), // (3)
  12. item.getValue())) // (4) amount value
  13. .filter(item -> item.getAmount() > 0) // (5) amount must be greater than zero
  14. .collect(Collectors.toList());
  15. ShoppingCart cart = rawCart.setProductItems(currentItems); // (6)
  16. return Future.succeededFuture(cart); // (7)
  17. }

看起来非常混乱的样子。。。不要担心,我们慢慢来~注意这个方法本身不是异步的,但我们需要表示此方法成功或失败两种状态(即AsyncResult),所以此方法仍然返回Future。首先我们创建一个Future,然后通过anyMatch方法检查商品列表是否合法(1)。若不合法,返回一个失败的Future;若合法,我们对每个商品依次构建出对应的ProductTuple。在(3)中,我们通过这个构造函数来构建ProductTuple:

</>复制代码

  1. public ProductTuple(Product product, Integer amount) {
  2. this.productId = product.getProductId();
  3. this.sellerId = product.getSellerId();
  4. this.price = product.getPrice();
  5. this.amount = amount;
  6. }

其中第一个参数是对应的商品实体。为了从列表中获取对应的商品实体,我们写了一个getProductFromStream方法:

</>复制代码

  1. private Product getProductFromStream(List productList, String productId) {
  2. return productList.stream()
  3. .filter(product -> product.getProductId().equals(productId))
  4. .findFirst()
  5. .get();
  6. }

当每个商品的ProductTuple都构建完毕的时候,我们就可以将列表赋值给对应的购物车实体了(6),并且返回购物车实体结果(7)。现在我们终于整合出一个完整的购物车了!

结算 - 根据购物车产生订单

现在我们已经选好了自己喜爱的商品,把购物车填的慢慢当当了,下面是时候进行结算了!我们这里同样定义了一个结算服务接口CheckoutService,它只包含一个特定的方法:checkout

</>复制代码

  1. @VertxGen
  2. @ProxyGen
  3. public interface CheckoutService {
  4. /**
  5. * The name of the event bus service.
  6. */
  7. String SERVICE_NAME = "shopping-checkout-eb-service";
  8. /**
  9. * The address on which the service is published.
  10. */
  11. String SERVICE_ADDRESS = "service.shopping.cart.checkout";
  12. /**
  13. * Order event source address.
  14. */
  15. String ORDER_EVENT_ADDRESS = "events.service.shopping.to.order";
  16. /**
  17. * Create a shopping checkout service instance
  18. */
  19. static CheckoutService createService(Vertx vertx, ServiceDiscovery discovery) {
  20. return new CheckoutServiceImpl(vertx, discovery);
  21. }
  22. void checkout(String userId, Handler> handler);
  23. }

接口非常简单,下面我们来看其实现 —— CheckoutServiceImpl类。尽管接口只包含一个checkout方法,但我们都知道结算过程可不简单。。。它包含库存检测、付款(这里暂时省掉了)以及生成订单的逻辑。我们先来看看checkout方法的源码:

</>复制代码

  1. @Override
  2. public void checkout(String userId, Handler> resultHandler) {
  3. if (userId == null) { // (1)
  4. resultHandler.handle(Future.failedFuture(new IllegalStateException("Invalid user")));
  5. return;
  6. }
  7. Future cartFuture = getCurrentCart(userId); // (2)
  8. Future orderFuture = cartFuture.compose(cart ->
  9. checkAvailableInventory(cart).compose(checkResult -> { // (3)
  10. if (checkResult.getBoolean("res")) { // (3)
  11. double totalPrice = calculateTotalPrice(cart); // (4)
  12. // 创建订单实体
  13. Order order = new Order().setBuyerId(userId) // (5)
  14. .setPayId("TEST")
  15. .setProducts(cart.getProductItems())
  16. .setTotalPrice(totalPrice);
  17. // 设置订单流水号,然后向订单组件发送订单并等待回应
  18. return retrieveCounter("order") // (6)
  19. .compose(id -> sendOrderAwaitResult(order.setOrderId(id))) // (7)
  20. .compose(result -

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/65105.html

相关文章

  • Vert.x Blueprint 系列教程(二) | 开发基于消息的应用 - Vert.x Kue

    摘要:本文章是蓝图系列的第二篇教程。这就是请求回应模式。好多属性我们一个一个地解释一个序列,作为的地址任务的编号任务的类型任务携带的数据,以类型表示任务优先级,以枚举类型表示。默认优先级为正常任务的延迟时间,默认是任务状态,以枚举类型表示。 本文章是 Vert.x 蓝图系列 的第二篇教程。全系列: Vert.x Blueprint 系列教程(一) | 待办事项服务开发教程 Vert.x B...

    elina 评论0 收藏0
  • Vert.x Blueprint 系列教程(一) | 待办事项服务开发教程

    摘要:本文章是蓝图系列的第一篇教程。是事件驱动的,同时也是非阻塞的。是一组负责分发和处理事件的线程。注意,我们绝对不能去阻塞线程,否则事件的处理过程会被阻塞,我们的应用就失去了响应能力。每个负责处理请求并且写入回应结果。 本文章是 Vert.x 蓝图系列 的第一篇教程。全系列: Vert.x Blueprint 系列教程(一) | 待办事项服务开发教程 Vert.x Blueprint 系...

    frank_fun 评论0 收藏0
  • Vert.x Blueprint 系列教程(二) | Vert.x Kue 教程(Web部分)

    摘要:上部分蓝图教程中我们一起探索了如何用开发一个基于消息的应用。对部分来说,如果看过我们之前的蓝图待办事项服务开发教程的话,你应该对这一部分非常熟悉了,因此这里我们就不详细解释了。有关使用实现的教程可参考蓝图待办事项服务开发教程。 上部分蓝图教程中我们一起探索了如何用Vert.x开发一个基于消息的应用。在这部分教程中,我们将粗略地探索一下kue-http模块的实现。 Vert.x Kue ...

    Kerr1Gan 评论0 收藏0
  • Vert.x入坑须知(4)

    摘要:主要是避免引入太多的复杂性,并且出于灵活部署的需要。以应用为例,由于实际上是在上执行,若它被阻塞,即导致后续请求全部无法得到处理。因此,最合适的做法就是对于简单业务,采用异步库。本系列其他文章入坑须知入坑须知入坑须知 最开始觉得这个系列也就最多3篇了不起了(因为事不过三嘛),没曾想居然迎来了第四篇! Kotlin 由于最近决定投身到区块链的学习当中的缘故,出于更好的理解它的基本概念,自...

    summerpxy 评论0 收藏0
  • 【小项目】全栈开发培训手册 | 后端(1) vert.x框架理解

    摘要:二来,给大家新开坑的项目一个参考。因此,本系列以主要以官方文档为基础,将尽可能多的特性融入本项目,并标注官网原文出处,有兴趣的小伙伴可点击深入了解。可以通过一些特殊协议例如将消息作为统一消息服务导出。下载完成后自行修改和。 开坑前言 我给这个专栏的名气取名叫做小项目,听名字就知道,这个专题最终的目的是带领大家完成一个项目。为什么要开这么大一个坑呢,一来,虽然网上讲IT知识点的书籍铺天盖...

    hightopo 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<