加载中…
个人资料
  • 博客等级:
  • 博客积分:
  • 博客访问:
  • 关注人气:
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

MaxCompute中使用spark的方法

(2020-03-23 20:34:07)
标签:

spark

maxcompute

odps

分类: 大数据处理
1、阿里云spark环境搭建流程见:
文中的git工程暂无权限下载

2、示例工程见:
https://github.com/aliyun/MaxCompute-Spark

3、spark读取maxcompute表的示例见:

具体代码见:

4、访问spark UI界面
如果想要访问spark ui,需要配置spark.hadoop.odps.task.major.version = cupid_v2
具体查看位置为: logview -> Detail -> Main Content -> Summary -> HistoryServer下面显示的链接,需要等程序运行完之后几分钟才能打开链接,不能立即打开这个链接的。


5、pom文件配置(将<替换为了[,将>替换为了]
[?xml version="1.0" encoding="UTF-8"?]
[!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
--]
[project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"]
    [modelVersion]4.0.0[/modelVersion]

    [properties]
        [spark.version]2.3.0[/spark.version]
        [cupid.sdk.version]3.3.8-public[/cupid.sdk.version]
        [scala.version]2.11.8[/scala.version]
        [scala.binary.version]2.11[/scala.binary.version]
    [/properties]

    [groupId]com.aliyun.odps[/groupId]
    [artifactId]spark-examples[/artifactId]
    [version]1.0[/version]
    [packaging]jar[/packaging]

    [dependencies]
        [dependency]
            [groupId]org.apache.spark[/groupId]
            [artifactId]spark-core_${scala.binary.version}[/artifactId]
            [version]${spark.version}[/version]
            [scope]provided[/scope]
            [exclusions]
                [exclusion]
                    [groupId]org.scala-lang[/groupId]
                    [artifactId]scala-library[/artifactId]
                [/exclusion]
                [exclusion]
                    [groupId]org.scala-lang[/groupId]
                    [artifactId]scalap[/artifactId]
                [/exclusion]
            [/exclusions]
        [/dependency]

        [dependency]
            [groupId]org.apache.spark[/groupId]
            [artifactId]spark-sql_${scala.binary.version}[/artifactId]
            [version]${spark.version}[/version]
            [scope]provided[/scope]
        [/dependency]

        [dependency]
            [groupId]com.aliyun.odps[/groupId]
            [artifactId]cupid-sdk[/artifactId]
            [version]${cupid.sdk.version}[/version]
            [scope]provided[/scope]
        [/dependency]

        [dependency]
            [groupId]org.scala-lang[/groupId]
            [artifactId]scala-library[/artifactId]
            [version]${scala.version}[/version]
        [/dependency]

        [dependency]
            [groupId]org.scala-lang[/groupId]
            [artifactId]scala-actors[/artifactId]
            [version]${scala.version}[/version]
        [/dependency]

        [!-- 支持 MaxCompute数据源 (Spark 2.x环境)--]
        [dependency]
            [groupId]com.aliyun.emr[/groupId]
            [artifactId]emr-maxcompute_2.11[/artifactId]
            [version]2.0.0[/version]
        [/dependency]

    [/dependencies]

    [build]
        [plugins]
            [plugin]
                [groupId]org.apache.maven.plugins[/groupId]
                [artifactId]maven-shade-plugin[/artifactId]
                [version]2.4.3[/version]
                [executions]
                    [execution]
                        [phase]package[/phase]
                        [goals]
                            [goal]shade[/goal]
                        [/goals]
                        [configuration]
                            [minimizeJar]false[/minimizeJar]
                            [shadedArtifactAttached]true[/shadedArtifactAttached]
                            [artifactSet]
                                [includes]
                                    [!-- Include here the dependencies you
                                        want to be packed in your fat jar --]
                                    [include]*:*[/include]
                                [/includes]
                            [/artifactSet]
                            [filters]
                                [filter]
                                    [artifact]*:*[/artifact]
                                    [excludes]
                                        [exclude]META-INFlog4j.properties[/exclude]
                                    [/excludes]
                                [/filter]
                            [/filters]
                            [transformers]
                                [transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"]
                                    [resource]reference.conf[/resource]
                                [/transformer]
                                [transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"]
                                    [resource]META-INF/services/org.apache.spark.sql.sources.DataSourceRegister[/resource]
                                [/transformer]
                            [/transformers]
                        [/configuration]
                    [/execution]
                [/executions]
            [/plugin]
            [plugin]
                [groupId]net.alchim31.maven[/groupId]
                [artifactId]scala-maven-plugin[/artifactId]
                [version]3.3.2[/version]
                [executions]
                    [execution]
                        [id]scala-compile-first[/id]
                        [phase]process-resources[/phase]
                        [goals]
                            [goal]compile[/goal]
                        [/goals]
                    [/execution]
                    [execution]
                        [id]scala-test-compile-first[/id]
                        [phase]process-test-resources[/phase]
                        [goals]
                            [goal]testCompile[/goal]
                        [/goals]
                    [/execution]
                [/executions]
            [/plugin]
        [/plugins]
    [/build]

[/project]

6、示例代码:
 NormalText Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.xx.spark.sample

import com.aliyun.odps.TableSchema
import com.aliyun.odps.data.Record
import org.apache.spark.aliyun.odps.OdpsOps
import org.apache.spark.{SparkConf, SparkContext}

object Statistics {

  def main(args: Array[String]): Unit {
    val accessKeyId ""
    val accessKeySecret ""
    val urls Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com")
    val conf new SparkConf().setAppName("Statistics Test")
    val sc new SparkContext(conf)
    val odpsOps OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))

    val project ""
    val table "result_table_upper_education"
    val tableOut "result_table_num"
    val numPartitions 2
    val inputData odpsOps.readTable(project, table, read, numPartitions) // education, num

    val numOut inputData.map(x => x._2)

    odpsOps.saveToTable(project, tableOut, numOut, write)
  }

  def read(record: Record, schema: TableSchema): Tuple2[String, Long] {
    (record.getString(0), record.getBigint(1))
  }

  def write(s: Long, emptyReord: Record, schema: TableSchema): Unit {
    val emptyReord
    r.set(0, s) // 数据存入第一个字段中
  }

}

0

阅读 收藏 喜欢 打印举报/Report
  

新浪BLOG意见反馈留言板 欢迎批评指正

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

新浪公司 版权所有