资讯专栏INFORMATION COLUMN

使用函数计算对表格存储中数据做简单清洗

XUI / 593人阅读

摘要:是用于获取表中增量数据的一个数据通道,通过创建触发器,能够实现和函数计算的自动对接,让计算函数中自定义的程序逻辑自动处理表中发生的数据修改。开启数据源表的功能触发器功能需要先开启数据表的功能,才能在函数计算中处理写入表格存储中的增量数据。

摘要: 表格存储的增量数据流功能能够使用户使用API获取Table Store表中增量数据,并可以进行增量数据流的实时增量分析、数据增量同步等。通过创建Table Store触发器,能够实现Table Store Stream和函数计算的自动对接,让计算函数中自定义的程序逻辑自动处理Table Store表中发生的数据修改,充分的利用了函数计算全托管、弹性伸缩的特点。

函数计算(Function Compute) 是一个事件驱动的服务,通过函数计算,用户无需管理服务器等运行情况,只需编写代码并上传。函数计算准备计算资源,并以弹性伸缩的方式运行用户代码,而用户只需根据实际代码运行所消耗的资源进行付费。

Table Store Stream是用于获取Table Store表中增量数据的一个数据通道,通过创建Table Store触发器,能够实现Table Store Stream和函数计算的自动对接,让计算函数中自定义的程序逻辑自动处理Table Store表中发生的数据修改。

表格存储高并发的写入性能以及低廉的存储成本非常适合物联网、日志、监控数据的存储,我们可以将数据写入到表格存储中,同时在函数计算中对新增的数据做简单的清洗、转换、聚合计算等操作,并将清洗之后的数据写回到表格存储的结果表中,并对原始明细数据及结果数据提供实时访问。

下面,我们使用函数计算对表格存储中的数据做简单的清洗,并写入到结果表中。

数据定义
我们假设写入的为日志数据,包括三个基础字段:

字段名称 类型 含义
id 整型 日志id
level 整型 日志的等级,越大表明等级越高
message 字符串 日志的内容
我们需要将 level>1 的日志写入到另外一张数据表中,用作专门的查询。

实现过程:
创建实例及数据表
在表格存储的控制台创建表格存储实例(__本次以 华东2 distribute-test 为例__),并创建源表(__source_data__)及结果表(__result__),主键为均 __id (整型)__,由于表格存储是 schemafree 结构,无需预先定义其他属性列字段。

开启数据源表的Stream功能
触发器功能需要先开启数据表的Stream功能,才能在函数计算中处理写入表格存储中的增量数据。

Stream记录过期时长 为通过 StreamAPI 能够读取到的增量数据的最长时间。

由于触发器只能绑定现有的函数,故先到函数计算的控制台上在同region创建服务及函数。

创建函数计算服务
在函数计算的控制台上创建服务及处理函数,我们继续使用华东2节点。

1.在华东2节点创建服务。

2.创建函数依次选择:空白函数——不创建触发器。

函数名称为:etl_test,选择 python2.7 环境,在线编辑代码
函数入口为:etl_test.handler
代码稍后编辑,点击下一步。
3.进行服务授权

由于函数计算需要将运行中的日志写入到日志服务中,同时,需要对表格存储的表进行读写,故需要对函数计算进行授权,为方便起见,我们先添加 AliyunOTSFullAccess 与 __AliyunLogFullAccess __权限,实际生产中,建议根据权限最小原则来添加权限。

4.点击授权完成,并创建函数。

5.修改函数代码。

创建好函数之后,点击对应的函数—代码执行,编辑代码并保存,其中,INSTANCE_NAME(表格存储的实例名称)、REGION(使用的区域)需要根据情况进行修改:

使用示例代码如下:

!/usr/bin/env python -- coding: utf-8 --

import cbor
import json
import tablestore as ots

INSTANCE_NAME = "distribute-test"
REGION = "cn-shanghai"
ENDPOINT = "http://%s.%s.ots-internal.ali..."%(INSTANCE_NAME, REGION)
RESULT_TABLENAME = "result"

def _utf8(input):

return str(bytearray(input, "utf-8"))

def get_attrbute_value(record, column):

attrs = record[u"Columns"]
for x in attrs:
    if x[u"ColumnName"] == column:
        return x["Value"]

def get_pk_value(record, column):

attrs = record[u"PrimaryKey"]
for x in attrs:
    if x["ColumnName"] == column:
        return x["Value"]
由于已经授权了AliyunOTSFullAccess权限,此处获取的credentials具有访问表格存储的权限

def get_ots_client(context):

creds = context.credentials
client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token = creds.securityToken)
return client

def save_to_ots(client, record):

id = int(get_pk_value(record, "id"))
level = int(get_attrbute_value(record, "level"))
msg = get_attrbute_value(record, "message")

pk = [(_utf8("id"), id),]
attr = [(_utf8("level"), level), (_utf8("message"), _utf8(msg)),]
row = ots.Row(pk, attr)
client.put_row(RESULT_TABLENAME, row)

def handler(event, context):

records = cbor.loads(event)
#records = json.loads(event)
client = get_ots_client(context)
for record in records["Records"]:
    level = int(get_attrbute_value(record, "level"))
    if level > 1:
        save_to_ots(client, record)
    else:
        print "Level <= 1, ignore."

对表格存储 Stream 数据的格式详情请参考Stream 数据处理

绑定触发器
1.回到表格存储的实例管理页面,点击表 source_data 后的 使用触发器 按钮,进入触发器绑定界面,点击使用已有函数计算, 选择刚创建的服务及函数,勾选 表格存储发送事件通知的权限, 进行确定。

2.绑定成功之后,能够看到如下的信息:

运行验证
1.向 source_data 表中写入数据。

2.在 result 表中查询清洗后的数据

点击 result 表的数据管理页面,会查询到刚写入到 source_data 中的数据。
当然,向 soure_data 写入level <=1的数据将不会同步到 result 表中

原文链接

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/8050.html

相关文章

  • XCel 项目总结 - Electron 与 Vue 的性能优化

    摘要:而这里的单元格信息是唯一的,所以直接通过为一个空对象赋值即可。和相关的知识和技巧高亮的列单元格采用展示。在中,被选中的单元格会高亮相应的行和列,以提醒用户。 showImg(https://segmentfault.com/img/bVGkdk?w=900&h=500); XCEL 是一个 Excel 数据清洗工具,其通过可视化的方式让用户轻松地对 Excel 数据进行筛选。 XCEL...

    XUI 评论0 收藏0
  • 精读《Tableau 入门》

    摘要:可以看到,遥遥领先的城市有三个,加州是销售之王。将再拖拽到,并右键将其粒度改为月。从上图可以看到,指定了个分类,最右上角加州就是最突出的一组,整个聚类只有它一个元素,而画面偏左下角的也是一类,这些是业绩较差的一组数据。 1. 引言 引用著名瑞典统计学家 Hans Rosling 的一句话:想法来源于数字、信息,再到理解。 分析数据的最好方式是可视化,因为可视化承载的信息密度更高,甚至可...

    svtter 评论0 收藏0
  • 打通本地部署和公有云,混合云架构让“鱼”和“熊掌”兼得(一)

    摘要:对于上述问题,混合云架构无疑是企业的最佳选择。解决方案将本地环境与公有云连通组成混合云架构,实现对本地环境计算能力的快速扩展。前言当前各行各业在积极拥抱云计算,但由于一些历史原因和合规要求导致很多企业全面上云比较困难,比如企业监管制度及合规要求一些核心数据库必须保留在本地数据中心;本地数据中心作为企业固定资产不容易完全抛弃;有些大型集团企业IT架构复杂,全面迁移上云的影响难以评估等等。因此,...

    Tecode 评论0 收藏0
  • 打通本地部署和公有云,混合云架构让“鱼”和“熊掌”兼得(一)

    摘要:对于上述问题,混合云架构无疑是企业的最佳选择。解决方案将本地环境与公有云连通组成混合云架构,实现对本地环境计算能力的快速扩展。前言当前各行各业在积极拥抱云计算,但由于一些历史原因和合规要求导致很多企业全面上云比较困难,比如企业监管制度及合规要求一些核心数据库必须保留在本地数据中心;本地数据中心作为企业固定资产不容易完全抛弃;有些大型集团企业IT架构复杂,全面迁移上云的影响难以评估等等。因此,...

    Tecode 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<