跳转至

Apache Spark PySpark

为什么要学 PySpark

Apache Spark 是大数据处理的标准框架,PySpark 是其 Python API。它能处理 PB 级别的数据,支持批处理、流处理、SQL 查询、机器学习和图计算。对于需要处理超大规模数据的数据工程师和数据科学家来说,Spark 是绕不过的技术。


核心概念

概念白话解释用途
RDD弹性分布式数据集最底层的数据抽象
DataFrame分布式数据框类似 Pandas 的高层 API
SparkSession会话Spark 应用的入口
Transformation转换操作延迟执行的数据变换
Action动作操作触发实际计算的操作
Partition分区数据的并行处理单元

安装配置

pip install pyspark

# 或使用 conda
conda install pyspark

# 验证
pyspark --version

快速上手

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .master("local[*]") \
    .getOrCreate()

# 读取数据
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df = spark.read.parquet("data.parquet")
df = spark.read.json("data.json")

# 基本操作
df.show(5)
df.printSchema()
df.select("name", "age").filter(df.age > 25).show()
df.groupBy("department").agg({"salary": "avg"}).show()

# SQL 查询
df.createOrReplaceTempView("employees")
result = spark.sql("SELECT department, AVG(salary) FROM employees GROUP BY department")
result.show()

进阶用法

Spark SQL 与窗口函数

from pyspark.sql.window import Window
from pyspark.sql import functions as F

window = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("rank", F.rank().over(window)).show()

UDF 自定义函数

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def categorize_age(age):
    if age < 30: return "young"
    elif age < 50: return "middle"
    else: return "senior"

df.withColumn("age_group", categorize_age(df.age)).show()

性能调优

# 缓存常用数据
df.cache()

# 重分区
df.repartition(200, "key_column")

# 广播小表 Join
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "id")

常见问题

Q: PySpark vs Pandas?

  • Pandas:单机,内存限制,适合 GB 级
  • PySpark:分布式,适合 TB-PB 级

Q: 本地开发如何测试?

master("local[*]") 在本地模拟集群,用小数据集开发验证。


参考资源

  • 官网:https://spark.apache.org/
  • PySpark 文档:https://spark.apache.org/docs/latest/api/python/
  • GitHub:https://github.com/apache/spark