Apache Spark PySpark¶
为什么要学 PySpark¶
Apache Spark 是大数据处理的标准框架,PySpark 是其 Python API。它能处理 PB 级别的数据,支持批处理、流处理、SQL 查询、机器学习和图计算。对于需要处理超大规模数据的数据工程师和数据科学家来说,Spark 是绕不过的技术。
核心概念¶
| 概念 | 白话解释 | 用途 |
|---|---|---|
| RDD | 弹性分布式数据集 | 最底层的数据抽象 |
| DataFrame | 分布式数据框 | 类似 Pandas 的高层 API |
| SparkSession | 会话 | Spark 应用的入口 |
| Transformation | 转换操作 | 延迟执行的数据变换 |
| Action | 动作操作 | 触发实际计算的操作 |
| Partition | 分区 | 数据的并行处理单元 |
安装配置¶
快速上手¶
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MyApp") \
.master("local[*]") \
.getOrCreate()
# 读取数据
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df = spark.read.parquet("data.parquet")
df = spark.read.json("data.json")
# 基本操作
df.show(5)
df.printSchema()
df.select("name", "age").filter(df.age > 25).show()
df.groupBy("department").agg({"salary": "avg"}).show()
# SQL 查询
df.createOrReplaceTempView("employees")
result = spark.sql("SELECT department, AVG(salary) FROM employees GROUP BY department")
result.show()
进阶用法¶
Spark SQL 与窗口函数¶
from pyspark.sql.window import Window
from pyspark.sql import functions as F
window = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("rank", F.rank().over(window)).show()
UDF 自定义函数¶
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
@udf(returnType=StringType())
def categorize_age(age):
if age < 30: return "young"
elif age < 50: return "middle"
else: return "senior"
df.withColumn("age_group", categorize_age(df.age)).show()
性能调优¶
# 缓存常用数据
df.cache()
# 重分区
df.repartition(200, "key_column")
# 广播小表 Join
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "id")
常见问题¶
Q: PySpark vs Pandas?¶
- Pandas:单机,内存限制,适合 GB 级
- PySpark:分布式,适合 TB-PB 级
Q: 本地开发如何测试?¶
master("local[*]") 在本地模拟集群,用小数据集开发验证。
参考资源¶
- 官网:https://spark.apache.org/
- PySpark 文档:https://spark.apache.org/docs/latest/api/python/
- GitHub:https://github.com/apache/spark