Apache Spark 入门
一句话概述:Apache Spark 是分布式大数据计算引擎,能在集群上并行处理TB级数据,比传统MapReduce快10-100倍。
核心知识点表
| 概念 | 白话解释 |
|---|
| RDD | 弹性分布式数据集,Spark最底层的数据抽象(现在一般不直接用) |
| DataFrame | 类似pandas的表格数据结构,但可以分布在多台机器上 |
| SparkSession | Spark程序的入口,所有操作都从它开始 |
| Transformation | 转换操作(如filter、map),懒执行,不立即计算 |
| Action | 触发计算的操作(如collect、count),调用后才真正开始算 |
| Partition | 数据分区,一份大数据被切成多块,分配到不同机器上并行处理 |
| Executor | 工作节点,真正干活的进程 |
| Driver | 驱动程序,负责调度和协调 |
| Spark SQL | 用SQL语法查询分布式数据 |
| PySpark | Spark的Python API |
版本信息(2026年5月)
- Spark 4.1.1(当前稳定版)
- Spark 4.2.0(预览中)
- 亮点:Spark Declarative Pipelines、实时流处理、VARIANT数据类型
安装配置
方式一:PySpark(最简单,学习推荐)
# 创建虚拟环境
conda create -n spark python=3.12 -y
conda activate spark
# 安装PySpark
pip install pyspark # 包含了Spark引擎,不需要额外装Java(自带)
# 验证安装
pyspark # 启动PySpark交互式Shell
# 或者
python -c "import pyspark; print(pyspark.__version__)"
方式二:完整安装
# 1. 安装Java 17+(Spark 4.x要求)
sudo apt install openjdk-17-jdk -y # Ubuntu
java -version # 验证
# 2. 下载Spark
wget https://downloads.apache.org/spark/spark-4.1.1/spark-4.1.1-bin-hadoop3.tgz
tar -xzf spark-4.1.1-bin-hadoop3.tgz # 解压
sudo mv spark-4.1.1-bin-hadoop3 /opt/spark # 移到/opt
# 3. 配置环境变量
echo 'export SPARK_HOME=/opt/spark' >> ~/.bashrc
echo 'export PATH=$SPARK_HOME/bin:$PATH' >> ~/.bashrc
source ~/.bashrc
# 4. 验证
spark-shell # Scala交互式Shell
pyspark # Python交互式Shell
spark-submit --version # 查看版本
方式三:Docker
docker pull apache/spark:4.1.1 # 拉取镜像
docker run -it apache/spark:4.1.1 /opt/spark/bin/pyspark # 启动PySpark
基本使用
创建SparkSession
# spark_basic.py
from pyspark.sql import SparkSession # 导入SparkSession
# 创建Spark会话(程序入口)
spark = SparkSession.builder \
.appName("MyFirstApp") \ # 应用名称(在Spark UI上显示)
.master("local[*]") \ # 本地模式,用所有CPU核心
.getOrCreate() # 获取或创建会话
# 查看Spark版本
print(spark.version)
# Spark UI 地址:http://localhost:4040
读取和操作数据
# ===== 从CSV读取 =====
df = spark.read.csv(
"data/users.csv", # 文件路径
header=True, # 第一行是列名
inferSchema=True, # 自动推断数据类型
)
df.show(5) # 显示前5行
df.printSchema() # 显示表结构(列名+类型)
df.count() # 统计行数
# ===== 从JSON读取 =====
df_json = spark.read.json("data/events.json")
# ===== 从Parquet读取(推荐格式,压缩+列式存储) =====
df_parquet = spark.read.parquet("data/sales.parquet")
# ===== 手动创建DataFrame =====
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)] # 数据
schema = StructType([ # 定义Schema
StructField("name", StringType(), True), # 名字,字符串,可为空
StructField("age", IntegerType(), True), # 年龄,整数,可为空
])
df = spark.createDataFrame(data, schema) # 创建DataFrame
DataFrame常用操作
from pyspark.sql.functions import col, avg, count, when, lit # 导入常用函数
# 选择列
df.select("name", "age").show() # 选择name和age列
# 过滤
df.filter(col("age") > 25).show() # 筛选年龄大于25的
df.where(col("status") == "active").show() # where和filter等价
# 添加新列
df = df.withColumn(
"age_group", # 新列名
when(col("age") < 30, "young") # 条件:小于30→young
.when(col("age") < 50, "middle") # 30-49→middle
.otherwise("senior") # 其他→senior
)
# 分组聚合
df.groupBy("department") \
.agg(
count("*").alias("employee_count"), # 员工数
avg("salary").alias("avg_salary"), # 平均工资
) \
.orderBy(col("avg_salary").desc()) \ # 按平均工资降序
.show()
# 去重
df.dropDuplicates(["email"]).show() # 按email去重
# 排序
df.orderBy(col("age").desc()).show() # 按年龄降序
# 重命名列
df = df.withColumnRenamed("old_name", "new_name")
Spark SQL
# 注册临时视图
df.createOrReplaceTempView("users") # 注册为SQL临时表
# 用SQL查询
result = spark.sql("""
SELECT
department,
COUNT(*) AS cnt,
AVG(salary) AS avg_salary
FROM users
WHERE status = 'active'
GROUP BY department
HAVING COUNT(*) > 5
ORDER BY avg_salary DESC
""")
result.show()
高级用法
读写Parquet(生产推荐格式)
# 写入Parquet
df.write.parquet(
"output/users.parquet", # 输出路径
mode="overwrite", # 覆盖写入(append=追加)
compression="snappy", # 压缩算法
)
# 分区写入(按日期分区,查询时自动剪枝)
df.write.partitionBy("date") \
.parquet("output/events/", mode="overwrite")
# 读取分区数据(自动发现分区)
df = spark.read.parquet("output/events/")
df.filter(col("date") == "2026-05-13").show() # 只读2026-05-13的分区
UDF用户自定义函数
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# 定义Python函数
def classify_age(age):
"""根据年龄分类"""
if age < 18:
return "未成年"
elif age < 60:
return "成年"
return "老年"
# 注册为UDF
classify_udf = udf(classify_age, StringType()) # 返回类型是字符串
# 使用UDF
df = df.withColumn("age_category", classify_udf(col("age")))
窗口函数
from pyspark.sql.window import Window # 导入窗口
from pyspark.sql.functions import row_number, rank, lag
# 定义窗口:按部门分组,按工资降序
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())
df = df.withColumn("salary_rank", rank().over(window_spec)) # 工资排名
df = df.withColumn("row_num", row_number().over(window_spec)) # 行号
连接(Join)
# 内连接
result = df_users.join(
df_orders,
df_users["user_id"] == df_orders["user_id"], # 连接条件
"inner" # 连接类型:inner/left/right/full/cross
)
# 广播连接(小表广播到所有节点,避免Shuffle)
from pyspark.sql.functions import broadcast
result = df_big.join(
broadcast(df_small), # 小表广播
"user_id"
)
常见报错与解决
| 报错信息 | 原因 | 解决方案 |
|---|
Java not found | 没装Java或版本不对 | 安装Java 17+:sudo apt install openjdk-17-jdk |
OutOfMemoryError | 数据太大内存不够 | 增大内存:.config("spark.driver.memory", "4g") |
AnalysisException: cannot resolve | 列名写错 | df.columns查看所有列名 |
Py4JJavaError | Java底层报错 | 看完整错误栈,通常是数据类型问题 |
FileNotFoundException | 文件路径错误 | 检查路径,Spark不支持~,用绝对路径 |
Task not serializable | UDF中引用了不可序列化对象 | 用broadcast变量或简化UDF |
速查表
# ===== SparkSession =====
spark = SparkSession.builder.appName("X").master("local[*]").getOrCreate()
spark.stop() # 停止会话
# ===== 读取数据 =====
spark.read.csv("path", header=True, inferSchema=True)
spark.read.json("path")
spark.read.parquet("path")
spark.read.jdbc(url, table, properties)
# ===== 写入数据 =====
df.write.csv("path", header=True, mode="overwrite")
df.write.parquet("path", mode="overwrite")
df.write.json("path")
# ===== 常用操作 =====
df.select("col1", "col2") # 选列
df.filter(col("x") > 10) # 过滤
df.groupBy("col").agg(count("*")) # 分组聚合
df.join(df2, "key", "inner") # 连接
df.orderBy(col("x").desc()) # 排序
df.dropDuplicates(["col"]) # 去重
df.withColumn("new", expr) # 新增列
df.drop("col") # 删除列
# ===== 常用函数 =====
# col, lit, when, otherwise
# count, sum, avg, min, max
# concat, substring, lower, upper, trim
# year, month, day, date_format
# row_number, rank, dense_rank, lag, lead
同类工具对比
| 特性 | Spark | Polars | Pandas | Dask |
|---|
| 数据规模 | TB-PB级 | GB-TB级 | MB-GB级 | GB-TB级 |
| 运行方式 | 分布式集群 | 单机多线程 | 单机单线程 | 分布式 |
| 延迟 | 秒级 | 毫秒级 | 毫秒级 | 秒级 |
| 语言 | Scala/Python/R/SQL | Rust/Python | Python | Python |
| 学习曲线 | 中高 | 低 | 最低 | 中 |
| 适合场景 | 大规模数据处理 | 中型高性能处理 | 小数据探索 | pandas扩展 |
面试建议:Spark是大数据面试必考项。重点理解:1)懒执行(Transformation vs Action);2)DataFrame vs RDD;3)Shuffle的概念(数据跨节点重分布);4)broadcast join优化。能解释"Spark为什么比MapReduce快"(内存计算+DAG优化+懒执行)是加分项。