摘要:一直接访问引入的相关包使用代替给指定配置与访问本地文件一样访问文件实际测试中发现本地如能够成功读写,但是集群模式下如读写失败,原因未知。二通过访问除了直接读写的数据,还可以通过来进行读写。
一、直接访问
1.引入HDFS的相关jar包:
org.apache.beam beam-sdks-java-io-hadoop-file-system 2.1.0
2.使用HadoopFileSystemOptions代替PipelineOptions
public interface WordCountOptions extends HadoopFileSystemOptions { @Description("input file") @Default.String("hdfs://localhost:9000/tmp/words2") String getInputFile(); void setInputFile(String in); @Description("output") @Default.String("hdfs://localhost:9000/tmp/hdfsWordCount") String getOutput(); void setOutput(String out); }
3.给Options指定HDFS配置
Configuration conf=new Configuration(); conf.set("fs.default.name", "hdfs://localhost:9000"); HDFSOption options= PipelineOptionsFactory.fromArgs(args).withValidation() .as(HDFSOption.class); options.setHdfsConfiguration(ImmutableList.of(conf));
4.与访问本地文件一样访问HDFS文件
Pipeline p = Pipeline.create(options); Data = p.apply("Read from HDFS", TextIO.read().from(options.getInputFile()));
实际测试中发现本地runner(如Direct, Flink Local, Spark Local...)能够成功读写HDFS,但是集群模式下(如Flink Cluster, Spark Cluster...)读写HDFS失败,原因未知。
二、通过HBase访问除了直接读写HDFS的数据,还可以通过HBase来进行读写。
1.添加相关jar包
org.apache.beam beam-sdks-java-io-hbase ${beam.verson}
2.设置HBase连接信息
Configuration conf = new Configuration(); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); conf.setStrings("hbase.master.hostname", "localhost"); conf.setStrings("hbase.regionserver.hostname", "localhost");
3.使用上述的conf读HBase数据
pipe //指定配置和表名 .apply("Read from HBase", HBaseIO.read().withConfiguration(conf).withTableId("test_tb")) .apply(ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { //读到的数据是HBase API中定义的Result格式,需要按照HBase官方说明进行剥取 Result result = c.element(); String rowkey = Bytes.toString(result.getRow()); System.out.println("row key: "); for(Cell cell : result.listCells()) { System.out.println("qualifier:"+Bytes.toString(CellUtil.cloneQualifier(cell))); System.out.println("value:"+Bytes.toString(CellUtil.cloneValue(cell))); } c.output(rowkey); } }));
4.写入到HBase
//写入前需要将string数据封装为Hbase数据格式mutation .apply(ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext context) { byte[] qual = Bytes.toBytes("qual"); byte[] cf = Bytes.toBytes("cf"); byte[] row = Bytes.toBytes("kafka"); byte[] val = Bytes.toBytes(context.element()); final Charset UTF_8 = Charset.forName("UTF-8"); Mutation mutation = new Put(row).addColumn(cf, qual, val); context.output(mutation); } })) .apply("write to Hbase", HBaseIO.write() .withConfiguration(conf) .withTableId("test_tb"));
经测试,无论本地runner还是集群runner都能成功读写。
但是发现一个问题,使用mvn exec:java进行调试成功,而使用shade插件打包成jar运行却一直报错,说Mutation没有指定coder,beam论坛上求助后得到的回复是maven-shade-plugin版本太旧,需要更新到3.0.0以上版本,但我改了3.0的版本之后还是一样的错误。后来添加了ServicesResourceTransformer才解决。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/67699.html
摘要:最近在用做流上的异常检测,期间遇到了很多问题,但是发现网上相关的资料很少,基本只能自己啃文档和瞎尝试。其中如有错漏,欢迎指出。即从一条数据中获得时间戳,然后以的格式返回。丢弃掉中的附加信息使用这一设置时,得到的中的元素是的和组成的键值对。 最近在用Apache beam做流上的异常检测,期间遇到了很多问题,但是发现网上相关的资料很少,基本只能自己啃文档和瞎尝试。所以想把自己踩过的坑记录...
摘要:要说在中常见的函数是哪一个,当然是。是一个实现了接口的抽象类,其中是数据处理方法,强制子类必须实现。以上为学习一天的总结,有错误欢迎指正。相同的是这个方法处理的都是中的一个元素。 在阅读本文前,可先看一下官方的WordCount代码, 对Apache Beam有大概的了解。 要说在Apache Beam中常见的函数是哪一个,当然是apply()。常见的写法如下: [Final Outp...
摘要:需要注意的是和方法生成的触发器是连续的而不是一次性的。其他的还有一次性触发器将一次性触发器变为连续型触发器,触发后再次等待触发。例如与一起用可以实现每个数据到达后的分钟进行处理,经常用于全局窗口,可以用触发器来设置停止条件。 本文参考Apache Beam官方编程手册 可以结合官方的Mobile Game 代码阅读本文。 在默认情况下,Apache Beam是不分窗的,也就是采用Gl...
摘要:与用于与的转换。其中方法返回的是在中的位置下标。对于设置了多个触发器的,自动选择最后一个触发的结算结果。其他不是线程安全的,一般建议处理方法是幂等的。 Combine与GroupByKey GroupByKey是把相关key的元素聚合到一起,通常是形成一个Iterable的value,如: cat, [1,5,9] dog, [5,2] and, [1,2,6] Combine是对聚...
阅读 2830·2021-10-14 09:42
阅读 3146·2019-08-30 15:52
阅读 3157·2019-08-30 14:02
阅读 1078·2019-08-29 15:42
阅读 502·2019-08-29 13:20
阅读 1136·2019-08-29 12:24
阅读 443·2019-08-26 10:20
阅读 663·2019-08-23 18:31