Spark Quick Start
Prerequisites
- Spark 3.1.1 deployed
- Python 3.6+ installed (yum install python3)
- pyspark installed (pip3 install pyspark); a quick sanity check is sketched below
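To confirm the environment is ready, a minimal sketch (assuming pyspark was installed with pip3 as above):

# Print the installed PySpark version; ideally it matches the deployed
# Spark version (3.1.1 here).
import pyspark
print(pyspark.__version__)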
1. Creating a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
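Note that getOrCreate() returns the already-active session if one exists, so re-running the builder (for example in a notebook) will not create a second session:

# getOrCreate() reuses the active SparkSession instead of building a new one
same_session = SparkSession.builder.getOrCreate()
assert same_session is spark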
2. Creating a DataFrame
The people.json file is located under the examples directory of the Spark distribution.
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
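With the sample people.json shipped with Spark (Michael with no age, Andy aged 30, Justin aged 19), the output looks like:

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+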
3. DataFrame Operations
# Print the schema in a tree format
df.printSchema()
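For the sample data, the schema tree looks like:

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)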
# Select only the "name" column
df.select("name").show()
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# Select people older than 21
df.filter(df['age'] > 21).show()
# Count people by age
df.groupBy('age').count().show()
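On the sample data this yields one row per distinct age (row order may vary):

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+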
4. Running SQL Queries Programmatically
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
5. Global Temporary View
Temporary views in Spark SQL are session-scoped: they disappear if the session that created them terminates. If you want a temporary view that is shared across all sessions and stays alive until the Spark application terminates, create a global temporary view instead. Global temporary views are tied to the system-preserved database global_temp, and you must use the qualified name to refer to them, e.g. SELECT * FROM global_temp.view1.
df.createGlobalTempView("people")
spark.sql("select * from global_temp.people").show()
# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
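By contrast, the session-scoped view people registered in section 4 is not visible from a new session; querying it there raises an AnalysisException:

# A session-scoped temp view is NOT cross-session; uncommenting the next
# line would fail with an AnalysisException.
# spark.newSession().sql("SELECT * FROM people").show()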
6. Interoperating with RDDs
6.1 Inferring the Schema Using Reflection
!cat examples/src/main/resources/people.txt
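The ! prefix is IPython/Jupyter shell syntax for running a system command. With the sample file shipped with Spark, it prints:

Michael, 29
Andy, 30
Justin, 19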
from pyspark.sql import Row
sc = spark.sparkContext
# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
# The results of SQL queries are DataFrame objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin
6.2 Programmatically Specifying the Schema
When a dictionary of kwargs cannot be defined ahead of time (for example, when the structure of records is encoded in a string, or when a text dataset will be parsed and its fields projected differently for different users), a DataFrame can be created programmatically in three steps:
1. Create an RDD of tuples or lists from the original RDD;
2. Create the schema, represented by a StructType, matching the structure of the tuples or lists in the RDD created in step 1;
3. Apply the schema to the RDD via the createDataFrame method provided by SparkSession.
For example:
# Import data types
from pyspark.sql.types import StringType, StructType, StructField
sc = spark.sparkContext
# Load a text file and split each line on commas.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))
# The schema is encoded in a string.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)
# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")
results.show()
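For the sample data, the query returns the three names:

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+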