资讯专栏INFORMATION COLUMN

PyODPS 中使用 Python UDF

evin2016 / 1600人阅读

摘要:中使用首先,我们需要写一个文件,假设我们就是把某一列按格式放的一列转成格式。这里我们指定了函数名叫,主类使我们上传的文件里的类。现在我们就可以在中调用这个了。这样我们就完成了在中使用的整个过程。

摘要: PyODPS 中使用 Python UDF 包含两方面,一个是直接使用,也就是在 MaxCompute SQL 中使用;一个是间接的方式,也就是 PyODPS DataFrame,这种方式你不需要直接写 Python UDF,而是写普通的 Python 函数或者类。

点此查看原文:http://click.aliyun.com/m/41092/

PyODPS 中使用 Python UDF 包含两方面,一个是直接使用,也就是在 MaxCompute SQL 中使用;一个是间接的方式,也就是 PyODPS DataFrame,这种方式你不需要直接写 Python UDF,而是写普通的 Python 函数或者类。下面我们分开说明。

作为准备工作,我们需要 ODPS 入口,可以通过直接初始化,或者使用 room 机制 加载。

from odps import ODPS

o = ODPS("your-access-id", "your-access-key", "your-project")

MaxCompute SQL 中使用 Python UDF

首先,我们需要写一个 Python 文件,假设我们就是把某一列按 csv 格式放的一列转成 json 格式。

import json

from odps.udf import annotate

@annotate("string->string")
class Transform(object):
    def evaluate(self, x):
        columns = list("abc")
        d = dict(zip(columns, x.split(",")))
        return json.dumps(d)

假设这个文件叫 my.py,接下来我们就需要创建 py 资源。

r = o.create_resource("csv_to_json.py", "py", fileobj=open("my.py"))

fileobj 参数也可以是 str 类型,就是表示文件的内容

接着我们就可以创建 Python UDF 了。

o.create_function("csv_to_json", class_type="csv_to_json.Transform", resources=[r])

这里我们指定了函数名叫 csv_to_json,主类使我们上传的 csv_to_json.py 文件里的 Transform 类。

现在我们就可以在 MaxCompute SQL 中调用这个 UDF 了。

o.execute_sql("select csv_to_json(raw) from pyodps_test_udf")

这样我们就完成了在 PyODPS 中使用 MaxCompute SQL + Python UDF 的整个过程。

PyODPS DataFrame

对于 PyODPS DataFrame 来说,用户只需要写普通的 Python 函数或者类,在函数或者类里,甚至可以读取全局变量,这样给开发带来了极大的方便。

和上面的例子目标相同,我们定义一个 transform 函数即可。然后我们对于 DataFrame 的一列调用 map 方法来应用这个函数。

passed_columns = list("abc")  # 可以从数据库中读取或者写死

def transform(x):
    import json
    d = dict(zip(passed_columns, x.split(",")))
    return json.dumps(d)

df.raw.map(transform)
In [30]: df
     raw
0  1,2,3
1  4,5,6
2  7,8,9

In [31]: df.raw.map(transform)
                              raw
0  {"a": "1", "c": "3", "b": "2"}
1  {"a": "4", "c": "6", "b": "5"}
2  {"a": "7", "c": "9", "b": "8"}

实际上,PyODPS DataFrame 在用 MaxCompute 执行的时候,也会创建 Python UDF 来实现这个功能,但用户不需要去创建文件、资源和函数这些过程,一切都是 Python 原生函数和类,整个过程相当顺畅。

另外可以看到,在上面的 my.py 里,我们也是定义了一个 columns 参数的,而如果这个参数是通过变量传进去的话,在 Python UDF 里非常麻烦,可能常常需要用一些 tricky 的方法,比如写到某个文件资源,然后在 UDF 里读取之类的。而对于 DataFrame 来说,完全没有这个问题,我们可以自由读取全局变量。

不过要注意的是,这个全局变量是被序列化到各个机器上的,所以你修改它不会全局生效。

好了,还有什么问题可以随时和我们取得联系。

文档:http://pyodps.readthedocs.io/...
代码:https://github.com/aliyun/ali... ,欢迎提 issue 和 merge request

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/41326.html

相关文章

  • PyODPS 使用 Python UDF

    摘要:中使用首先,我们需要写一个文件,假设我们就是把某一列按格式放的一列转成格式。这里我们指定了函数名叫,主类使我们上传的文件里的类。现在我们就可以在中调用这个了。这样我们就完成了在中使用的整个过程。 摘要: PyODPS 中使用 Python UDF 包含两方面,一个是直接使用,也就是在 MaxCompute SQL 中使用;一个是间接的方式,也就是 PyODPS DataFrame,这种...

    jcc 评论0 收藏0
  • MaxCompute Studio使用心得系列6——一个工具完成整个Python UDF开发

    摘要:摘要北京云栖大会上阿里云发布了最新的功能,万众期待的功能终于支持啦,我怎么能不一试为快,今天就分享如何通过进行开发。注册函数在脚本中编辑试用好了,一个简单完整的通过开发实践分享完成。 摘要: 2017/12/20 北京云栖大会上阿里云MaxCompute发布了最新的功能Python UDF,万众期待的功能终于支持啦,我怎么能不一试为快,今天就分享如何通过Studio进行Python u...

    张迁 评论0 收藏0
  • 在 MaxCompute UDF 运行 Scipy

    摘要:编写完成后,将代码保存为,并在中执行此后创建函数。执行创建后,便可以在中执行查询暂不支持,因而需禁用其他如果包依赖了其他包,需要一并上传并同时加入到依赖中。 摘要: 新版 MaxCompute Isolation Session 支持 Python UDF。也就是说,Python UDF 中已经可以跑二进制包。刚才以 Scipy 为例踩了一下坑,把相关的过程分享出来。 新版 MaxCo...

    kbyyd24 评论0 收藏0
  • PyODPS开发的最佳实践

    摘要:摘要支持用来对对象进行操作,它提供了来用类似的接口进行大规模数据分析以及预处理,并且可以用模块来执行机器学习算法。现在为了让大家能更好地使用,我们总结开发过程中的最佳实践,来让大家更高效地开发程序。 摘要: PyODPS支持用 Python 来对 MaxCompute 对象进行操作,它提供了 DataFrame API 来用类似 pandas 的接口进行大规模数据分析以及预处理,并且可...

    hellowoody 评论0 收藏0

发表评论

0条评论

evin2016

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<