加载中…
个人资料
裴大帅2020
裴大帅2020
  • 博客等级:
  • 博客积分:0
  • 博客访问:688,206
  • 关注人气:63
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
相关博文
推荐博文
谁看过这篇博文
加载中…
正文 字体大小:

pyodps关键功能

(2020-07-30 01:31:47)
分类: 大数据处理

一、重点问题:
1、dataworks为惰性求值,调试阶段如果取不到值需要执行execute()。举例:
cnt = sku_category_df.count().
print(cnt)
得到的为:Count[int64] collection:  ref_0
调试阶段正确的操作方式为:
cnt = sku_category_df.count().execute()
print(cnt)
其他会导致立即执行的操作有:execute / persist / head / tail / to_pandas

2、dataframe列操作时,如果使用map后sequence的类型发生了变化,需要显示指定map后的类型。举例:
warehouse_basic_df.build_date.map(lambda x: len(x), 'int64')
类型从string变到了int64,需要显示指出,否则会报错:
PyOdpsFunc: TypeError: bad argument type for built-in operation

3、map操作中的lambda表达式不能调用函数。举例:
def test(self, x):
        return (datetime.strptime(time.strftime('%Y-%m-%d'), '%Y-%m-%d') - datetime.strptime(x, '%Y-%m-%d')).days if x != '' else -1

warehouse_basic_df['build_diff_date'] = warehouse_basic_df.build_date.map(lambda x: self.test(x), 'int64')
会报错:
 if not slot.startswith('__'))
AttributeError: _rtype
需要修改为:
warehouse_basic_df['build_diff_date'] = warehouse_basic_df.build_date.map(lambda x: (datetime.strptime(time.strftime('%Y-%m-%d'), '%Y-%m-%d') - datetime.strptime(x, '%Y-%m-%d')).days if x != '' else -1, 'int64')

4、apply操作需要指定types参数
假如没有指定,默认是string类型,可以通过schema进行确认。
如果类型出错,那么聚合操作的mean()等方法都用不了,mean只针对数字类型。
apply使用举例:
b2c_history_sale_df['price'] = b2c_history_sale_df.apply(lambda x: x.actual_receive_total_amount / (x.sale_num * 100), axis=1, reduce=True, names=['price'], types='decimal')
另外astype不是inplace操作

5、数据处理推荐对一行数据使用自定义函数apply方法
举例如下:
from odps.df import output

out_field_name = ['warehouse_name', 'sku_code', xx]
out_field_type = ['string', 'string', xx]
@output(out_field_name, out_field_type)
def handle_row(row):
    warehouse_name = row.rdc_warehouse_name_x
    sku_code = row.sku_code_x
    order_year = row.order_year_x
    order_week = row.order_week_x
    sale_num = row.sale_num_x
    xxx

    yield warehouse_name, sku_code, xxx

predict_sale_feature_df.apply(handle_row, axis=1)

6、join注意事项
(1)如果根据一个字段进行join,则必须通过on指定特定列
(2)如果是根据多个字段进行join,则不需要制定join,系统会自动根据两个dataframe中相同字段名进行join



二、具体使用:
1、检查表格是否存在
o.exist_table()

2、列出项目空间下的所有表
o.list_tables()

3、获取表
o.get_table(, project=<>)
project用于获取跨project的表

4、创建表
o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)

5、同步表更新
table.reload()

6、行记录
record表示表的一行记录,支持形式:
record[0]
record[0:4]
record['num']

7、查看少量数据
使用head方法

8、获取表数据
with t.open_reader(partition='pt=test') as reader:
count = reader.count
for record in reader:
# 处理一条记录
9、向表写数据
with t.open_writer(partition='pt=test', create_partition=True) as writer:
records = [[111, 'aaa', True],
   [222, 'bbb', False]]  # 使用list结构
writer.write(records) # 这里records可以是可迭代对象

每次调用write_table,maxcompute都会在服务端生成一个文件。这一操作需要较大的时间开销,同时过多的文件会降低后续的查询效率。
使用多进程并行写入数据:
每个进程写数据时共享同一个session_id,但是有不同的block_id,每个block对应服务端的一个文件,最后主进程执行commit,完成数据上传。
import random
from multiprocessing import Pool
from odps.tunnel import TableTunnel

def write_records(session_id, block_id):
    # 使用指定的 id 创建 session
    local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
    # 创建 writer 时指定 block_id
    with local_session.open_record_writer(block_id) as writer:
        for i in range(5):
            # 生成数据并写入对应 block
            record = table.new_record([random.randint(1, 100), random.random()])
            writer.write(record)

if __name__ == '__main__':
    N_WORKERS = 3

    table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
    tunnel = TableTunnel(o)
    upload_session = tunnel.create_upload_session(table.name)

    # 每个进程使用同一个 session_id
    session_id = upload_session.id

    pool = Pool(processes=N_WORKERS)
    futures = []
    block_ids = []
    for i in range(N_WORKERS):
        futures.append(pool.apply_async(write_records, (session_id, i)))
        block_ids.append(i)
    [f.get() for f in futures]

    # 最后执行 commit,并指定所有 block
    upload_session.commit(block_ids)
10、删除表
o.delete_table('', if_exists=True) # 只有表存在时删除
t.drop() # table对象存在的时候可以直接执行drop函数

11、创建DataFrame
t.to_df()方法

12、表分区
(1)判断是否为分区表
table.schema.partitions
(2)遍历表全部分区
for partition in table.partitions
(3)判断分区是否存在
table.exist_partition('pt=test')
(4)获取分区
table.get_partition('pt=test')
(5)创建分区
t.create_partition('pt=test', if_not_exists=True)
(6)删除分区
t.delete_partition('pt=test', if_exists=True)
partition.drop()

12、执行SQL
(1)同步方式执行,会阻塞直到sql执行完成
o.execute_sql('')
(2)异步方式执行
instance = o.run_sql('')
print(instance.get_logview_address()) # 获取logview地址
instance.wait_for_sucess() # 阻塞直到完成

13、设置运行参数
全局配置设置sql.settings后,每次运行时都会添加相关的运行时参数
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('')

14、读取SQL执行结果
with o.execute_sql('').open_reader() as reader:
for record in reader:
15、XFlow和模型
(1)XFlow概述:
它是ODPS对算法包的封装,使用pyODPS可以执行XFlow
(2)对于下面的PAI命令:
PAI -name AlgoName -project algo_public -Dparam1=param_value1 -Dparam2=param_value2 ...
可以使用如下方法调用:
异步调用:
inst = o.run_xflow('AlgoName', 'algo_public', parameters={'param1': 'param_value1', 'param2': 'param_value2', ...})
同步调用:
inst = o.execute_xflow('AlgoName', 'algo_public', parameters={'param1': 'param_value1', 'param2': 'param_value2', ...})
(3)离线模型
离线模型是XFlow分类/回归算法输出的模型,用于可以使用pyODPS ML或直接使用odps.run_xflow创建一个离线模型,例如:
o.run_xflow('LogisticRegression', 'algo_public', dict(modelName='logistic_regression_model_name',
regularizedLevel='1', maxIter='100', regularizedType='l1', epsilon='0.000001', labelColName='y',
featureColNames='pdays,emp_var_rate', goodValue='1', inputTableName='bank_data'))
可以通过模型名获取模型并读取模型PMML
model = o.get_offline_model('logistic_regression_model_name')
pmml = model.get_model()

16、DataFrame快速入门
(1)创建一个DataFrame对象
users = DataFrame(o.get_table('pyodps_ml_100k_users'))
(2)查看DataFrame有哪些字段
users.dtypes
(3)取前N条数据,快速预览数据
users.head(10)
(4)筛选一部分字段进行预览
users[['user_id','age']].head(5)
(5)排除个别字段进行预览
users.exclude('zip_code', 'age').head(5)
(6)查看年龄在20到25岁之间的人有多少个
users[users.age.between(20,25)].count()
(7)用户按职业划分,从高到低,人数最多的前10职业是哪些
users.groupby('occupation').agg(count=users['occupation'].count())
df.sort(df['count'], ascending=False)[:10]
(8)把年龄分成从0到80岁,分成8个年龄段
labels = ['0-9', '10-19', '20-29', '30-39', '40-49', '50-59', '60-69', '70-79']
cut_lens = lens[lens, lens.age.cut(range(0, 81, 10), right=False, labels=labels).rename('年龄分组')
(9)查看各个年龄分组下,用户的评分总数和评分均值分别是多少
cut_lens.groupby('年龄分组').agg(cut_lens.rating.count().rename('评分总数'), cut_lens.rating.mean().rename('评分均值'))

17、DataFrame基本概念
(1)三个对象
Collection(DataFrame):表结构(二维结构)
Sequence: 列(一维结构)
Scalar: 标量
(2)获取列
iris['sepallength'].head(5)
或:iris.sepallength.head(5)
(3)修改一列的类型
iris.sepallength.astype('int')
(4)列名重命名
iris.sepallength.rename('sepal_width').head(5)
(5)简单的列变换
对一个Sequence进行四则运算,返回一个新的Sequence。对数值列,Sequence支持四则运算,而对字符串则支持字符串相加等操作
(iris.sepallength + 7).head(5)
(6)对数据进行条件赋值
iris[iris.sepallength>5, 'sepalwidth'] = iris.sepalwidth * 2
(7)多个查询条件过滤数据
iris[(iris.sepallength < 5} & (iris['petallength'] > 1.5)].head(5)
(8)限制条数
iris[:3]
(9)保存执行结果维ODPS表
对Collection,我们可以调用persist方法,参数为表名,返回一个新的DataFrame对象
iris2 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris2', partitions=['name'])

18、DataFrame列运算
对于一个Sequence来说,对它加上一个常量、或者执行sin函数的这类操作时,是作用于每个元素上的。
(1)null相关
iris.sepallength.isnull().head(5)
(2)逻辑判断(ifelse, switch)
(iris.sepallength > 5).ifelse('gt5', 'lte5').rename('cmp5').head(5) # 当条件成立时,返回第0个参数,否则返回第1个参数
switch用于多条件判断的情况
switch(iris.sepallength == 4.9, 'eq4.9', iris.sepallength == 5.0, 'eq5.0', default='noeq').rename('equalness').head(5)
(3)提供了between这个函数进行是否在某个区间的判断
(iris.sepallength.between(3,5)).head(5)
(4)String相关操作:提供了一系列针对String类型的Sequence或者Scalar的操作
fields = [
iris.name.upper().rename('upper_name'),
iris.name.extract('Iris(.*)', group=1)
]
(5)时间相关操作
对于datetime类型Sequence或者Scalar,可以调用时间相关的内置函数
df = lens[[lens.unix_timestamp.astype('datetime').rename('dt')]]
df[df.dt,
   df.dt.year.rename('year'),
   df.dt.month.rename('month'),
   df.dt.day.rename('day'),
   df.dt.hour.rename('hour')].head(5)
(6)集合类型相关操作
支持的集合类型有List和Dict。
两种集合均有explode方法,用于展开集合中的内容。对于List,explode默认返回一列,当传入参数pos时,将返回两列;
对于Dict,explode会返回两列,分别表示keys以及values.
(7)isin/notin判断Sequence元素是否在某个集合中。
iris.sepallength.isin([4.9, 5.1]).rename('sepallength').head(5)
(8)cut提供离散化的操作,可以将Sequence的数据拆成几个区段。
iris.sepallength.cut(range(6), labels=['0-1', '1-2', '2-3', '3-4', '4-5']).rename('sepallength_cut').head(5)
(9)使用自定义函数
使用map函数对它的每个元素调用自定义函数。
iris.sepallength.map(lambda x: x + 1).head(5)
如果map前后,Sequence的类型发生了变化,则需要显式指定map后的类型
iris.sepallength.map(lambda x: 't'+str(x), 'string').head(5)

19、DataFrame聚合操作
(1)有用的聚合操作
quantile(p):p分位数,仅在整数值下可取得准确值
cat: 按sep做字符串连接操作
(2)分组聚合
groupby执行分组操作,分组后的一个主要操作就是通过调用agg方法来执行聚合操作
iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False).head(5)
(3)编写自定义聚合
buffer():返回一个mutable的object(比如 list、dict),buffer大小不应随数据而递增。
__call__(buffer, *val):将值聚合到中间 buffer。
merge(buffer, pbuffer):将 pbuffer 聚合到 buffer 中。
getvalue(buffer):返回最终值
计算一个平均值的例子:
class Agg(object):

    def buffer(self):
        return [0.0, 0]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]

20、排序
排序操作只能作用于Collection
iris.sort('sepalwidth', ascending=False).head(5)

21、去重
iris[['name']].distinct()

22、数据缩放
(1)通过最大/最小值对数据进行缩放
df.min_max_scale(columns=['fid'])
df.min_max_scale(columns=['fid'], feature_range=(-1, 1)) # 指定输出值范围
df.min_max_scale(columns=['fid'], group=['name']) # 使用group指定一个或多个分组列,在分组列中分别取值进行缩放
df.std_scale(columns=['fid']) # 依照标准正态分布对数据进行缩放

23、空值处理
(1)使用dropna可删除subset中包含空值的行
df.dropna(subset=['f1', 'f2', 'f3', 'f4'])
(2)如果行中包含非空值则不删除
df.dropna(how='all', subset=['f1', 'f2', 'f3', 'f4'])
(3)使用fillna可使用常数或已有的列填充未知值
df.fillna(100, subset=['f1', 'f2', 'f3', 'f4'])
(4)向前/向后填充
bfill: 向前填充
ffill: 向后填充
df.fillna(method='bfill', subset=['f1', 'f2', 'f3', 'f4'])

24、对所有行/列调用自定义函数
(1)对一行数据使用自定义函数
apply的自定义函数接收一个参数,为上一步Collection的一行数据,用户可以通过属性、或者偏移取得一个字段的数据
iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3)
reduce为True时,表示返回结果为Sequence,否则返回结果为Collection。
在apply的自定义函数中,reduce为False时,也可以使用yield关键字来返回多行结果
def handle(row):
yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count()
(2)对所有列调用自定义聚合
调用apply方法,当我们不指定axis,或者axis为0的时候,我们可以通过传入一个自定义聚合类来对所有sequence进行聚合操作
iris.exclude('name').apply(Agg)

25、MapReduce API
from odps.df import output

@output(['word', 'cnt'], ['string', 'int'])
def mapper(row):
    for word in row[0].split():
        yield word.lower(), 1

@output(['word', 'cnt'], ['string', 'int'])
def reducer(keys):
    # 这里使用 list 而不是 cnt = 0,否则 h 内的 cnt 会被认为是局部变量,其中的赋值无法输出
    cnt = [0]
    def h(row, done):  # done表示这个key已经迭代结束
        cnt[0] += row.cnt
        if done:
            yield keys.word, cnt[0]
    return h

words_df.map_reduce(mapper, reducer, group='word')

26、布隆过滤器
给定某个collection,和它的某个列计算的sequence1,我们能对另外一个sequence2进行布隆过滤。
这样的好处是能快速对collection进行过滤一些无用的数据。这在大规模join的时候,一般数据量远大于另一边数据,而大部分并不会join上的场景。
比如,我们在join用户的浏览数据和交易数据时,用户的浏览大部分不会带来交易,我们可以利用交易数据先对浏览数据进行布隆过滤,然后再join能很好提升性能。
少量不能过滤并不会影响正确性,但能较大提升join的性能。
df1 = DataFrame(pd.DataFrame({'a': ['name1', 'name2', 'name3', 'name1'], 'b': [1, 2, 3, 4]}))
df2 = DataFrame(pd.DataFrame({'a': ['name1']}))
df1.bloom_filter('a', df2.a)

27、透视表
(1)最简单的透视表必须提供一个 rows 参数,表示按一个或者多个字段做取平均值的操作
df['A', 'D', 'E'].pivot_table(rows='A')
(2)我们可以指定 values 来显示指定要计算的列
df.pivot_table(rows=['A', 'B'], values='D')
(3)计算值列时,默认会计算平均值,用户可以指定一个或者多个聚合函数
df.pivot_table(rows=['A', 'B'], values=['D'], aggfunc=['mean', 'count', 'sum'])

28、数据合并
(1)join操作
movies.join(ratings, on='movie_id').head(3)
movies.join(ratings2, on=[('movie_id', 'movie_id2')]).head(3)
(2)union操作
mov1.union(mov2)

29、窗口函数
grouped = iris.groupby('name')
grouped.mutate(grouped.sepallength.cumsum(), grouped.sort('sepallength').row_number()).head(10)


30、绘图功能
如果要使用绘图功能,需要pandas合matplotlib的安装,可在jupyter中运行
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
%matplotlib inline
iris.sepalwidth.plot()

31、调试指南
由于PyODPS DataFrame本身会对整个操作执行优化,为了能直观地反应整个过程, 我们可以使用可视化的方式显示整个表达式的计算过程。
值得注意的是,可视化需要依赖 graphviz 软件 和 graphviz Python 包
我们可以直接调用 compile 方法来查看ODPS SQL后端编译到SQL的结果

32、在DataFrame中如何使用max_pt
from odps.df import func
df = o.get_table('your_table').to_df()
df[df.ds == func.max_pt('your_project.your_table')]  # ds 是分区字段

0

阅读 评论 收藏 转载 喜欢 打印举报/Report
前一篇:协方差矩阵
  • 评论加载中,请稍候...
发评论

    发评论

    以上网友发言只代表其个人观点,不代表新浪网的观点或立场。

    < 前一篇协方差矩阵
      

    新浪BLOG意见反馈留言板 电话:4000520066 提示音后按1键(按当地市话标准计费) 欢迎批评指正

    新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 会员注册 | 产品答疑

    新浪公司 版权所有