摘要:开发指南是为简化计算模型,降低用户使用实时计算的门槛而设计的一套符合标准语义的开发套件。随后,将为该表生成字段,用于记录并表示事件时间。
UFli
接下来,开发者可以根据如下内容,逐渐熟悉并使用 UFli
数据类型 | 说明 | 映射的Java类型 |
---|---|---|
BOOLEAN | 逻辑值,true 或 false | Boolean.class |
INTEGER | 整形,4 字节整数 | Integer.class |
INT | 整型,4 字节整数 | Integer.class |
BIGINT | 长整型,8 字节整数 | Long.class |
TINYINT | 微整型,1 字节整数 | Byte.class |
SMALLINT | 短整型,2 字节整数 | Short.class |
VARCHAR | 变长字符串 | String.class |
FLOAT | 4 字节浮点数 | Float.class |
DOUBLE | 8 字节浮点数 | Double.class |
DATE | 日期类型: | Date.class |
TIMESTAMP | 时间戳类型 | Timestamp.class |
DECIMAL | 小数类型 | BigDecimal.class |
MAP | 映射类型 | li |
ARRAY | List 列表类型 | li |
LIST | List 列表类型 | li |
UFli
在 UFli
CREATE TABLE table_name(
columnName dataType
[ columnName dataType ]*
) WITH (
type = xxxx
xxxx = xxxx
);
以 Kafka 作为数据源表,建表语句示例如下:
CREATE TABLE ut_kafka_source(
user_id VARCHAR
pay_cash FLOAT
pay_time VARCHAR
)WITH(
type = kafka11
bootstrapServers = linux01:9092linux02:9092linux03:9092
topic = flink-test01
groupId = group1
parallelism = 1
);
上述代码用于在 UFli
随后,必须在 WITH 代码块中指定数据来源,以及数据源所使用的框架的必要参数。
可配置参数如下表所示:
参数名 | 必填 | 默认值 | 含义 | 举例 |
---|---|---|---|---|
type | 是 | 无 | 数据源类型 | type = kafka11 |
bootstrapServers | 是 | 无 | Kafka Broker地址 | bootstrapServers = linux01:9092linux02:9092 |
topic | 是 | 无 | 将要消费的 Kafka 主题 | topic = fli |
groupId | 是 | 无 | 消费者组ID | groupId = group01 |
offsetReset | 否 | latest | 启动时从何处消费 | offsetReset = earliest(或:latest、{"0": 12 "1": 32}) |
parallelism | 否 | 1 | 读取数据并行度 | parallelism = 1 |
在 UFli
CREATE TABLE table_name(
columnName dataType
[ columnName dataType ]*
) WITH (
type = xxxx
xxxx = xxxx
);
以 Kafka 作为结果表,建表语句示例如下:
CREATE TABLE ut_kafka_result(
uid VARCHAR
pay_cash FLOAT
sex VARCHAR
age VARCHAR
)WITH(
type = kafka11
bootstrapServers = linux01:9092linux02:9092linux03:9092
topic = flink-test02
parallelism = 1
);
上述代码用于在 UFli
随后,必须在 WITH 代码块中指定数据输出时的相关参数,以及数据源所使用的框架的必要参数。
可配置参数如下表所示:
参数名 | 必填 | 默认值 | 含义 | 举例 |
---|---|---|---|---|
type | 是 | 无 | 数据源类型 | type = kafka11 |
bootstrapServers | 是 | 无 | Kafka Broker地址 | bootstrapServers = linux01:9092linux02:9092 |
topic | 是 | 无 | 将要消费的 Kafka 主题 | topic = fli |
parallelism | 否 | 1 | 读取数据并行度 | parallelism = 1 |
以 MySQL 作为结果表,建表语句示例如下:
CREATE TABLE ut_mysql_result(
uid VARCHAR
pay_cash FLOAT
sex VARCHAR
age VARCHAR
)WITH(
type = mysql
url = jdbc:mysql://linux01:3306/db_flink_test?useUnicode=true&characterEncoding=utf-8
userName = root
password = 123456
tableName = ut_mysql_result
parallelism = 1
);
可配置参数如下表所示:
参数名 | 必填 | 默认值 | 含义 | 举例 |
---|---|---|---|---|
type | 是 | 无 | 数据源类型 | type = mysql |
url | 是 | 无 | MySQL Databa | url = jdbc:mysql://linux01:3306/db_fli |
username | 是 | 无 | 用户名 | userName =root |
password | 是 | 无 | 密码 | password =123456 |
tablename | 是 | 无 | MySQL 中的表名 | tableName =ut_mysql_result |
parallelism | 否 | 1 | 读取数据并行度 | parallelism = 1 |
以 PostgreSQL 作为结果表,建表语句示例如下:
CREATE TABLE ut_postgre_result(
uid VARCHAR
pay_cash FLOAT
sex VARCHAR
age VARCHAR
)WITH(
type = postgre
url = jdbc:postgresql://10.9.11.122:5432/db_flink_test?useUnicode=true&characterEncoding=utf-8
userName = root
password = 123456
tableName = ut_postgre_result
parallelism = 1
);
上述代码展示了如何使用 UFli
可配置参数如下表所示:
参数名 | 必填 | 默认值 | 含义 | 举例 |
---|---|---|---|---|
type | 是 | 无 | 数据源类型 | type = postgre |
url | 是 | 无 | PostgreSQL Databa | url = jdbc:postgresql://10.9.11.122:5432/db_fli |
username | 是 | 无 | 用户名 | userName =root |
password | 是 | 无 | 密码 | password =123456 |
tablename | 是 | 无 | PostgreSQL 中的表名 | tableName =ut_postgre_result |
parallelism | 否 | 1 | 读取数据并行度 | parallelism = 1 |
在 UFli
CREATE TABLE table_name(
columnName dataType
[ columnName dataType ]*
) WITH (
type = xxxx
xxxx = xxxx
SIDE DIMENSION SIGN
);
其中,维表中最后一个字段,必须为SIDE DIMENSION SIGN,以此来标识这是一张维度表。
以 MySQL 作为维度表,建表语句示例如下:
CREATE TABLE ut_mysql_side(
uid VARCHAR
sex VARCHAR
age VARCHAR
PRIMARY KEY(uid)
SIDE DIMENSION SIGN
)WITH(
type =mysql
url =jdbc:mysql://linux01:3306/db_flink_test?useUnicode=true&characterEncoding=utf-8
userName =root
password =123456
tableName =ut_mysql_side
cache =LRU
cacheSize =2
cacheTTLMs =2001
parallelism =1
partitionedJoin=false
);
上述代码用于在 UFli
随后,必须在 WITH 代码块中指定数据输出时的相关参数,以及数据源所使用的框架的必要参数。
可配置参数如下表所示:
参数名 | 必填 | 默认值 | 含义 | 举例 |
---|---|---|---|---|
type | 是 | 无 | 数据源类型 | type = postgre |
url | 是 | 无 | MySQL Databa | url = jdbc:mysql://linux01:3306/db_fli |
username | 是 | 无 | 用户名 | userName =root |
password | 是 | 无 | 密码 | password =123456 |
tablename | 是 | 无 | MySQL 中的表名 | tableName =ut_postgre_result |
cache | 否 | NONE | 维表数据缓存方式 | cache = LRU cache = NONE |
cacheSize | 否 | 无 | 缓存行数,cache属性为LRU时生效 | cacheSize = 100 |
cacheTTLMs | 否 | 无 | 缓存过期时间,cache属性为LRU时生效,单位:毫秒 | cacheTTLMs =60000 |
partitionedJoin | 否 | 无 | 是否在 JOIN 数据之前,将维表数据按照 PRIMARY KEY指定的列进行reduceByKey操作,从而减少维表缓存数据量。 | partitionedJoin = true |
parallelism | 否 | 1 | 读取数据并行度 | parallelism = 1 |
以 MySQL 作为维度表,建表语句示例如下:
CREATE TABLE ut_postgre_side(
uid VARCHAR
sex VARCHAR
age VARCHAR
PRIMARY KEY(uid)
SIDE DIMENSION SIGN
)WITH(
type =postgre
url = jdbc:postgresql://10.9.116.184:5432/db_flink_test?useUnicode=true&characterEncoding=utf-8
userName =root
password =birthDAY+-0230
tableName =ut_postgre_side
parallelism =1
cache =LRU
cacheSize =2
cacheTTLMs =2001
partitionedJoin=false
);
上述代码用于在 UFli
随后,必须在 WITH 代码块中指定数据输出时的相关参数,以及数据源所使用的框架的必要参数。
可配置参数如下表所示:
参数名 | 必填 | 默认值 | 含义 | 举例 |
---|---|---|---|---|
type | 是 | 无 | 数据源类型 | type = postgre |
url | 是 | 无 | PostgreSQL Databa | url = jdbc:postgresql://10.9.11.122:5432/db_fli |
username | 是 | 无 | 用户名 | userName =root |
password | 是 | 无 | 密码 | password =123456 |
tablename | 是 | 无 | PostgreSQL 中的表名 | tableName =ut_postgre_result |
cache | 否 | NONE | 维表数据缓存方式 | cache = LRU cache = NONE |
cacheSize | 否 | 无 | 缓存行数,cache属性为LRU时生效 | cacheSize = 100 |
cacheTTLMs | 否 | 无 | 缓存过期时间,cache属性为LRU时生效,单位:毫秒 | cacheTTLMs =60000 |
partitionedJoin | 否 | 无 | 是否在 JOIN 数据之前,将维表数据按照 PRIMARY KEY指定的列进行reduceByKey操作,从而减少维表缓存数据量。 | partitionedJoin = true |
parallelism | 否 | 1 | 读取数据并行度 | parallelism = 1 |
在 UFli
CREATE VIEW view_name AS SELECT columnName [ columnName]* FROM table_name[view_name];
在 Fli
在 UFli
INSERT INTO
table_name
SELECT
[ (columnName[ columnName]*) ]
queryStatement;
例如如下 SQL 操作:
INSERT INTO
ut_kafka_result
SELECT
t3.uid
t3.pay_cash
t3.sex
t3.age
FROM
(SELECT
t1.*
t2.uid
t2.sex
t2.age
FROM
ut_kafka_source t1
JOIN
ut_mysql_side t2
ON
t1.user_id = t2.uid
WHERE
t2.sex = 男
AND
t1.pay_cash > 100) AS t3
对于 INSERT INTO 操作,需遵循如下约束:
UFli
例如下面的数据:
{"grade_data":[{"class1":{"小明":"98""老王":"100"}}{"class2":{"凯特琳":"88""凯南":"99"}}]}
通过如下 SQL 语句进行 行转列 操作:
CREATE TABLE FUNCTION ParseArrayUDTF WITH cn.ucloud.sql.ParseArrayUDTF;
INSERT INTO
tb_class_grade_result
SELECT
class
student
grade
FROM
tb_class_grade LATERAL TABLE(ParseArrayUDTF(grade_data)) as T(class student grade)
执行完毕后,输出效果为:
class | student | grade |
---|---|---|
class1 | 小明 | 98 |
class1 | 老王 | 100 |
class2 | 凯特琳 | 88 |
class2 | 凯南 | 99 |
其中自定义函数的具体实现可参考第 5 小节中的内容。
同时也可以使用 WHERE 和 JOIN 语法,例如如下 SQL 语句:
SELECT
t3.uid
t3.pay_cash
t3.sex
t3.age
FROM
(SELECT
t1.*
t2.uid
t2.sex
t2.age
FROM
ut_kafka_source t1
JOIN
ut_mysql_side t2
ON
t1.user_id = t2.uid
WHERE
t2.sex = 男
AND
t1.pay_cash > 100) AS t3
另外同时支持 Fli
在 Fli
该类型函数主要特点为“一进一出”,即,依次对某单列数据进行处理,并输出当前列的处理结果(依然为单列)。
编写 UFli
public class XXX extends ScalarFunction{
public OUT eval(IN columnValue){
...
}
}
以对某列数据进行大写转换为例,通过继承 ScalarFunction 类实现 UDF 函数,代码如下:
package cn.ucloud.sql;
public class TransCaseUDF extends ScalarFunction {
private static final Logger LOG = LoggerFactory.getLogger(TransCaseUDF.class);
public String eval(object data){
return new String(data.toString().toLowerCase().getBytes() StandardCharsets.UTF_8);
}
}
其中 eval 为固定方法名。
在 UFli
CREATE SCALAR FUNCTION [UDFName] WITH [UDF Class Name];
注册示例如下所示:
CREATE SCALAR FUNCTION TransCaseUDF WITH cn.ucloud.sql.TransCaseUDF;
结合上述示例,使用该函数方法如下:
INSERT INTO
tb_name
SELECT
TransCaseUDF(name) as upper_name
FROM
tb_user_info;
该类型函数主要特点为 “多进一出”,即,对某几列多行数据进行聚合,并输出聚合结果。
编写 UFli
public static class XXXAccum {...)
public class XXXUDAF extends AggregateFunction {
public XXXAccum createAccumulator(){...} //创建累加器
public T getValue(){...} //获取聚合结果
public void accumulate(){...} //累加
public void merge(){...} //合并多个分区的累加器
public void resetAccumulator(){...} //重置累加器
}
以求某列的平均数为例,实现方法如下:
package cn.ucloud.sql;
import org.apache.flink.table.functions.AggregateFunction;
import java.util.Iterator;
public class AverageUDAF extends AggregateFunction {
public static class AverageAccum {
public float sum count = 0F;
}
@Override
public AverageAccum createAccumulator() {
return new AverageAccum();
}
@Override
public Float getValue(AverageAccum accumulator) {
if(accumulator.count == 0F) return null;
else return accumulator.sum / accumulator.count;
}
public void accumulate(AverageAccum accumulator float value) {
accumulator.sum += value;
accumulator.count += 1F;
}
public void merge(AverageAccum accumulator Iterable it) {
Iterator iter = it.iterator();
while(iter.hasNext()) {
AverageAccum acc = iter.next();
accumulator.sum += acc.sum;
accumulator.count += acc.count;
}
}
public void resetAccumulator(AverageAccum accumulator) {
accumulator.sum = 0F;
accumulator.count = 0F;
}
}
在 UFli
CREATE AGGREGATE FUNCTION [UDAFName] WITH [UDAF Class Name];
结合上述示例,使用该函数方法如下:
CREATE AGGREGATE FUNCTION AverageUDAF WITH cn.ucloud.sql.AverageUDAF;
结合上述示例,使用该函数方法如下:
CREATE AGGREGATE FUNCTION AverageUDAF WITH cn.ucloud.sql.AverageUDAF;
CREATE TABLE ut_kafka_source(
sex VARCHAR
age FLOAT
)WITH(
type =kafka11
bootstrapServers =linux01:9092linux02:9092linux03:9092
topic =flink-test01
groupId = group1
parallelism =1
);
CREATE TABLE ut_mysql_age_result(
sex VARCHAR
average_age FLOAT
)WITH(
type =mysql
url = jdbc:mysql://linux01:3306/db_flink_test?useUnicode=true&characterEncoding=utf-8
userName =root
password =123456
tableName =ut_mysql_age_result
parallelism =1
);
INSERT INTO
ut_mysql_age_result
SELECT
sex
AverageUDAF(age) AS average_age
FROM
ut_kafka_source
GROUP BY
sex;
该类型函数主要特点为 “一进多出”,即,对某一行一列数据进行拆分,并输出结果到某列多行。
编写 UFli
public class ParseArrayUDTF extends TableFunction {
public void eval(T obj) {
...
}
@Override
public TypeInformation getResultType() {
return Types.ROW(A.class B.class C.class);
}
}
以拆分如下数据为例:
{"grade_data":[{"class1":{"小明":"98""老王":"100"}}{"class2":{"凯特琳":"88""凯南":"99"}}]}
将其拆分为如下表:
class | student | grade |
---|---|---|
class1 | 小明 | 98 |
class1 | 老王 | 100 |
class2 | 凯特琳 | 88 |
class2 | 凯南 | 99 |
此时,UDTF 函数实现如下:
package cn.ucloud.sql;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import java.util.List;
import java.util.Map;
public class ParseArrayUDTF extends TableFunction {
//list 数据为 [{"class1":{"小明":"98""老王":"100"}}{"class2":{"凯特琳":"88""凯南":"99"}}]
public void eval(List
在 UFli
CREATE TABLE FUNCTION [UDTFName] WITH [UDTF Class Name];
结合上述示例,使用该函数方法如下:
CREATE TABLE FUNCTION ParseArrayUDTF WITH cn.ucloud.sql.ParseArrayUDTF;
结合上述示例,使用该函数方法如下:
CREATE TABLE FUNCTION ParseArrayUDTF WITH cn.ucloud.sql.ParseArrayUDTF;
CREATE TABLE tb_class_grade(
grade_data ARRAY
)WITH(
type =kafka11
bootstrapServers =linux01:9092linux02:9092linux03:9092
zookeeperQuorum =linux01:2181/kafka
offsetReset =latest
topic =flink-sql-test02
groupId = group1
parallelism =1
);
CREATE TABLE tb_class_grade_result(
class VARCHAR
student VARCHAR
grade VARCHAR
)WITH(
type =mysql
url = jdbc:mysql://linux01:3306/db_flink_test?useUnicode=true&characterEncoding=utf-8
userName =root
password =123456
tableName =tb_class_grade_result
parallelism =1
);
INSERT INTO
tb_class_grade_result
SELECT
class
student
grade
FROM
tb_class_grade LATERAL TABLE(ParseArrayUDTF(grade_data)) as T(class student grade)
在创建表时,在属性字段的最后一行,为时间列创建Watermark,语法如下:
WATERMARK FOR columnName AS WITHOFFSET(columnName delayTime(ms))
其中 columnName 必须为 LONG ,或BIGINT,或 TimeStamp类型的事件时间。
随后,Fli
举个例子来说明 Watermark 的相关用法,示例 SQL 内容如下:
CREATE TABLE ut_kafka_source(
user_id VARCHAR
pay_cash FLOAT
pay_time LONG
WATERMARK FOR pay_time AS WITHOFFSET(pay_time 2000)
)WITH(
type =kafka11
bootstrapServers =linux01:9092linux02:9092linux03:9092
topic =flink-test01
groupId = group1
parallelism =1
);
CREATE TABLE ut_kafka_result(
user_id VARCHAR
pay_cash FLOAT
)WITH(
type =kafka11
bootstrapServers =linux01:9092linux02:9092linux03:9092
topic =flink-test02
parallelism =1
);
INSERT INTO
ut_kafka_result
SELECT
user_id
SUM(pay_cash) as pay_cash
FROM
ut_kafka_source
GROUP BY
TUMBLE (
ROWTIME
INTERVAL 2 SECOND
)
user_id
其中,TUMBLE 为生成 滚动窗口 的函数,ROWTIME 为当前表的事件时间。
提示:上述操作需要在提交 SQL 任务时添加指定参数:time.characteristic: EventTime
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/126205.html
摘要:开发注意事项基于托管集群的应用开发,和自建集群类似,但是仍然有几个地方需要注意。和默认配置托管集群默认指定以及的堆大小为,目前不支持进行更改单个中的数量设置为高可用配置应用的高可用由集群以及共同保证。配置集群的运行时状态保存在的指定目录中。UFlink开发注意事项基于UFlink托管集群的Flink应用开发,和自建集群类似,但是仍然有几个地方需要注意。JobManager和TaskManag...
摘要:基于开发指南本节主要介绍如何创建项目,并开发简单的应用,从而使该应用可以被提交到平台运行。如果不设置为,可能会导致的类冲突,产生不可预见的问题。在自动生成的文件中,使用了来更方便的控制依赖的可见性。基于Maven开发指南本节主要介绍如何创建项目,并开发简单的Flink应用,从而使该Flink应用可以被提交到UFlink平台运行。==== 自动生成代码框架 ==== 对于Java开发者,可以使...
摘要:什么是实时计算实时计算基于构建,为分布式高性能随时可用以及准确的流处理应用程序提供流处理框架,可用于流式数据处理等应用场景。版本支持当前支持的版本为,,,可以在提交任务时选择所使用的版本。什么是实时计算实时计算(UFlink)基于ApacheFlink构建,为分布式、高性能、随时可用以及准确的流处理应用程序提供流处理框架,可用于流式数据处理等应用场景。产品优势100%开源兼容基于开源社区版本...
摘要:基于开发指南如果基于进行应用开发,需要在文件中加入如下配置注解注意修改的值,确保其符合您的应用。应用开发完成后,可以直接直接运行方法,在本地进行基本的测试。基于gradle开发指南如果基于gradle进行应用开发,需要在build.gradle文件中加入如下配置:buildscript { repositories { jcenter() // this applie...
摘要:集群管理进入集群管理页面通过集群列表页面进入集群管理页面获取集群详情通过集群列表的详情按钮进入详情页面调整集群大小点击调整容量调整集群大小查看点击详情页的按钮查看查看任务历史点击详情页的按钮查看历史任务节点密码重置点击集群列表页的集群管理1. 进入集群管理页面通过UFlink集群列表页面进入集群管理页面:2. 获取集群详情通过集群列表的详情按钮进入详情页面:3. 调整集群大小点击调整容量调整...
阅读 3473·2023-04-25 20:09
阅读 3685·2022-06-28 19:00
阅读 2994·2022-06-28 19:00
阅读 2995·2022-06-28 19:00
阅读 3048·2022-06-28 19:00
阅读 2834·2022-06-28 19:00
阅读 2969·2022-06-28 19:00
阅读 2578·2022-06-28 19:00