资讯专栏INFORMATION COLUMN

Apache Beam学习笔记——几种常见的处理类Transform

Chiclaim / 3474人阅读

摘要:要说在中常见的函数是哪一个,当然是。是一个实现了接口的抽象类,其中是数据处理方法,强制子类必须实现。以上为学习一天的总结,有错误欢迎指正。相同的是这个方法处理的都是中的一个元素。

在阅读本文前,可先看一下官方的WordCount代码, 对Apache Beam有大概的了解。

要说在Apache Beam中常见的函数是哪一个,当然是apply()。常见的写法如下:

[Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
.apply([Second Transform])
.apply([Third Transform])

而在最简单的wordcount代码中,就出现了许多种不同的传入参数类型,除了输入输出的部分,还包括
1)使用ParDo.of():

.apply("ExtractWords-joe",
        ParDo.of(new DoFn() {
            @ProcessElement
            public void processElement(ProcessContext context) {
                System.out.println(context.element()+"~");
                for (String word : context.element().split(" ")) {
                    if (!word.isEmpty()) {
                        //输出到Output PCollection
                        context.output(word);
                    }
                }
            }
        })
)

2)使用MapElements.via():

.apply("FomatResults",
        MapElements.via(new SimpleFunction,String>() {
            @Override
            public String apply(KV input) {
                return input.getKey()+":"+input.getValue();
            }
        }))

3)以及使用PTransform子类:

.apply(new CountWords())
  public static class CountWords extends PTransform,
      PCollection>> {
    @Override
    public PCollection> expand(PCollection lines) {

      // Convert lines of text into individual words.
      PCollection words = lines.apply(
          ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection> wordCounts =
          words.apply(Count.perElement());

      return wordCounts;
    }
  }

这么多种传入方式到底有什么联系?通过查看源码可以看出apply函数的定义如下:

  public  OutputT apply(
      String name, PTransform root) {
    return begin().apply(name, root);
  }

传入的参数为PTransform类对象,也就是这几种传入参数其实都是PTransform类的变形
PTransform是一个实现了Serializable接口的抽象类,其中public abstract OutputT expand(InputT input); 是数据处理方法,强制子类必须实现。
因此第(3)种方式很容易理解,就是通过继承PTransform并实现了expand方法定义了CountWords类,给apply方法传递了一个CountWords对象。

在第(2)种方式中,MapElements是PTransform的子类,实现了expand方法,其实现方式是调用@Nullable private final SimpleFunction fn;成员中定义的数据处理方法,MapElements.via()则是一个为初始化fn的静态方法,定义如下:

  public static  MapElements via(
      final SimpleFunction fn) {
    return new MapElements<>(fn, null, fn.getClass());
  }

传入了一个SimpleFunction对象,SimpleFunction是一个必须实现public OutputT apply(InputT input) 方法的抽象类,用户在该apply方法中实现数据处理。
所以这种方式的实现方式如下:
定义SimpleFunction的子类并实现其中的apply方法,将该子类的对象传递给MapElements.via()。

第(1)种方式中,ParDo.of()方法传入一个DoFn对象, 返回一个SingleOutput对象:

  public static  SingleOutput of(DoFn fn) {
    validate(fn);
    return new SingleOutput(
        fn, Collections.>emptyList(), displayDataForFn(fn));
  }

SingleOutput与MapElements类似,也是PTransform的子类,实现了expand方法,使用private final DoFn fn;成员中的方法进行数据处理。
而DoFn是一个抽象类,用户必须实现其注解方法(存疑) public void processElement(ProcessContext c)。
所以这种方式的实现方式如下:
定义DoFn的子类并实现其中的processElement方法,将该子类的对象传递给ParDo.of()。
需要注意的是processElement方法与前2种方式不同,输入和输出数据都是在传入参数ProcessContext c中,而不是通过return进行传递。

以上为学习Apache Beam一天的总结,有错误欢迎指正。

**

Day2补充,3种方式的区别和联系:

**
1)MapElement.via(SimpleFunction)和PTransform
MapElements是PTransform的一个子类:
public class MapElements
extends PTransform, PCollection>
从泛型参数来看,PTransform处理的是PCollection,而MapElement处理的是PCollection中的一个元素,对比SimpleFunction的apply方法和PTransform的expand方法的实现方式得到验证。

2)MapElement.via(SimpleFunction)和ParDo.of(DoFn)
区别之前已经说过,DoFn的processElement方法的输入和输出都是从参数传入,而SimpleFunction的apply方法从参数传入输入,从return传出输出。
相同的是这2个方法处理的都是PCollection中的一个元素。
查看MapElement的expand方法源码:

@Override
public PCollection expand(PCollection input) {
  checkNotNull(fn, "Must specify a function on MapElements using .via()");
  return input.apply(
      "Map",
      ParDo.of(
          new DoFn() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              c.output(fn.apply(c.element()));
            }
    //部分代码忽略
          }));
}

可以看出其实也是实现了DoFn的子类,在DoFn的processElement方法中调用SimpleFunction对象的apply方法进行处理。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70467.html

相关文章

  • Apache beam其他学习记录

    摘要:与用于与的转换。其中方法返回的是在中的位置下标。对于设置了多个触发器的,自动选择最后一个触发的结算结果。其他不是线程安全的,一般建议处理方法是幂等的。 Combine与GroupByKey GroupByKey是把相关key的元素聚合到一起,通常是形成一个Iterable的value,如: cat, [1,5,9] dog, [5,2] and, [1,2,6] Combine是对聚...

    jasperyang 评论0 收藏0
  • Apache Beam分窗与触发器

    摘要:需要注意的是和方法生成的触发器是连续的而不是一次性的。其他的还有一次性触发器将一次性触发器变为连续型触发器,触发后再次等待触发。例如与一起用可以实现每个数据到达后的分钟进行处理,经常用于全局窗口,可以用触发器来设置停止条件。 本文参考Apache Beam官方编程手册 可以结合官方的Mobile Game 代码阅读本文。 在默认情况下,Apache Beam是不分窗的,也就是采用Gl...

    NickZhou 评论0 收藏0
  • ApacheCN 学习资源汇总 2018.11

    摘要:首页地址关于我们我们不是的官方组织机构团体,只是技术栈以及的爱好者基础编程思想和大数据中文文档中文文档中文文档中文文档中文文档中文文档中文文档中文文档中文文档中文文档区块链中文文档数学笔记线性代数笔记数据科学中文文档中文文档中文文档课本计算 首页地址:http://www.apachecn.org关于我们:http://www.apachecn.org/about 我们不是 Apach...

    Ethan815 评论0 收藏0
  • ApacheCN 学习资源汇总 2018.11

    摘要:首页地址关于我们我们不是的官方组织机构团体,只是技术栈以及的爱好者基础编程思想和大数据中文文档中文文档中文文档中文文档中文文档中文文档中文文档中文文档中文文档中文文档区块链中文文档数学笔记线性代数笔记数据科学中文文档中文文档中文文档课本计算 首页地址:http://www.apachecn.org关于我们:http://www.apachecn.org/about 我们不是 Apach...

    Rocture 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<