Spark: 9. Using Spark SQL

Spark SQL Queries

Command-Line Queries

Start the Spark shell.

Create the following JSON file and upload it to HDFS (e.g. hdfs dfs -put people.json /). Note the format: Spark's JSON reader expects one complete JSON object per line (JSON Lines), not a single pretty-printed document:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

Then, in the Spark shell (spark-shell already provides a SparkSession named spark):

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> val spark = SparkSession.builder().getOrCreate()
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@128d397c

scala> val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:29

scala> import spark.implicits._
import spark.implicits._
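
With spark.implicits._ in scope, the tuple RDD created above can also be converted straight into a DataFrame via toDF. A minimal sketch (the column names are my own choice, not part of the original session):

// Convert the (String, Int) tuple RDD into a DataFrame.
// toDF comes from spark.implicits._; the column names are arbitrary.
val rddDF = rdd.toDF("letter", "count")
rddDF.show()   // three rows: (a,1), (b,1), (a,1)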

scala> val df = spark.read.json("hdfs://hadoop001:9000/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]                

scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
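
It is also worth checking the schema Spark inferred from the JSON: age is read as a nullable long (shown as bigint in the DataFrame summary), which is why Michael's row shows null:

scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)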

scala> df.filter($"age" > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
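
The same filter can also be written against a typed Dataset. A minimal sketch, assuming a hand-written Person case class (age is Option[Long] because Michael's age is missing):

// Hypothetical typed variant of the filter above; the encoder
// for Person comes from spark.implicits._ imported earlier.
case class Person(name: String, age: Option[Long])

val ds = df.as[Person]                    // DataFrame -> Dataset[Person]
ds.filter(_.age.exists(_ > 21)).show()    // same single Andy row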


scala> df.createOrReplaceTempView("persons")

scala> spark.sql("select * from persons")
res3: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> spark.sql("select * from persons").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> spark.sql("select * from persons where age > 21").show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
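
Note that createOrReplaceTempView registers a view scoped to this SparkSession; it disappears when the session ends. For a view shared across sessions in the same application, Spark 2.1+ also offers global temporary views under the reserved global_temp database. A minimal sketch with a hypothetical view name:

df.createGlobalTempView("persons_g")
// Global temp views must be qualified with the global_temp database.
spark.sql("SELECT * FROM global_temp.persons_g WHERE age > 21").show()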


Creating a Spark SQL Program in IDEA

Packaging and running the program from IDEA works just like a Spark Core program; the difference is that the Maven POM needs the spark-sql dependency added (the spark-streaming entry below is not required for this example):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.coderfei</groupId>
    <artifactId>CoreWordCount</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.1</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <includes>
                                <include>**/*.scala</include>
                            </includes>
                        </configuration>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

The code is as follows:

package main

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DataFrameSyllabus {

  def main(args: Array[String]): Unit = {
    // Create the SparkConf and set the application name.
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("test")
      .set("spark.port.maxRetries", "1000")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._
    val df = spark.read.json("hdfs://hadoop001:9000/people.json")

    // Displays the content of the DataFrame to stdout
    df.show()
    df.filter($"age" > 21).show()
    df.createOrReplaceTempView("persons")
    spark.sql("SELECT * FROM persons where age > 21").show()
    spark.stop()

  }
}
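
After mvn package, the jar can be submitted just like a Spark Core job. A minimal sketch: the class name follows the code above and the jar path follows the POM coordinates, and no --master flag is needed here because the code already calls setMaster("local[*]"):

spark-submit --class main.DataFrameSyllabus target/CoreWordCount-1.0-SNAPSHOT.jar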