Apache Parquet 是大数据/数据湖事实标准的列式存储格式。核心三板斧:列式 IO + 高压缩比 + 谓词/投影下推,让海量 OLAP 查询秒级返回。但不适合 OLTP 与流式单行写入。
核心特性
列式存储(最大优势)
| 特性 | 行式(CSV/JSON) | 列式(Parquet) |
|---|---|---|
| 读取模式 | 必须读整行 | 仅读目标列(投影下推) |
| 压缩比 | 同行混合数据类型,比例低 | 同列数据类型一致,5–10× |
| 适用 | OLTP 事务、单行增删 | OLAP 分析、批量 ETL、海量归档 |
压缩与编码
- 编码:字典、RLE 游程、Delta、位打包,针对列特性优化
- 压缩:Snappy / LZ4 / GZip / ZSTD 任选
通常存储空间仅为 CSV 的 1/5 ~ 1/10。
下推优化
- 投影下推:只读需要的列
- 谓词下推:根据每列的 min/max/null/distinct 统计信息,跳过不满足条件的行组
嵌套结构
基于 Google Dremel 算法,原生支持数组、Struct、Map 多层嵌套,无需扁平化。
Schema 自描述与演进
- 文件末尾自带 Schema 元数据,无需外部依赖
- 兼容式演进:新增/删除列、修改可选性,旧读取器可读新文件
全生态兼容
跨语言(Java/Python/C++/Go/R)、跨引擎(Spark/Flink/Hive/Trino/Pandas/DuckDB/Arrow)、跨存储(HDFS/S3/OSS)。
物理结构
Parquet 文件
├─ 行组 Row Group
│ ├─ 列块 Column Chunk
│ │ ├─ 数据页 Data Page
│ │ ├─ 字典页 Dictionary Page
│ │ └─ 索引页 Index Page
├─ Footer 元数据
└─ 魔数 PAR1| 层级 | 作用 | 大小建议 |
|---|---|---|
| 行组(Row Group) | 并行读取的基本单位 | 128 MB ~ 1 GB |
| 列块(Column Chunk) | 同一列的连续存储,列读取单位 | — |
| 页(Page) | 编码与压缩的最小单位 | 默认 8 KB |
| Footer | Schema、行组/列块位置与统计 | — |
| PAR1 | 文件首尾魔数,定位 Footer | — |
Python:pandas + pyarrow
pip install pandas pyarrowimport pandas as pd
import numpy as np
df = pd.DataFrame({
"id": np.arange(10000),
"name": [f"user_{i}" for i in range(10000)],
"city": np.random.choice(["BJ", "SH", "GZ", "SZ"], 10000),
"salary": np.random.normal(15000, 3000, 10000).round(2),
})
# 写入(指定压缩、行组、字典编码)
df.to_parquet(
"user.parquet",
engine="pyarrow",
compression="snappy",
row_group_size=5000,
use_dictionary=True,
)
# 投影下推:只读指定列
pd.read_parquet("user.parquet", columns=["id", "name"])
# 谓词下推:读取时过滤
pd.read_parquet(
"user.parquet",
filters=[("salary", ">", 18000), ("city", "==", "BJ")],
)查看元数据:
import pyarrow.parquet as pq
pf = pq.ParquetFile("user.parquet")
print(pf.schema)
print(pf.num_row_groups, pf.metadata.num_rows)
print(pf.metadata.row_group(0))PySpark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ParquetDemo").getOrCreate()
# 写入 + 分区 + 压缩
df.write \
.option("compression", "snappy") \
.partitionBy("city") \
.mode("overwrite") \
.parquet("hdfs://path/parquet_data_partitioned")
# 自动触发投影 + 谓词 + 分区裁剪
spark.read.parquet("hdfs://path/parquet_data_partitioned") \
.select("id", "name", "salary") \
.filter("city == 'BJ' AND salary > 18000")Hive / Trino
CREATE TABLE user_info (
id BIGINT, name STRING, age INT, salary DECIMAL(10,2)
)
PARTITIONED BY (city STRING, dt STRING)
STORED AS PARQUET
TBLPROPERTIES (
"parquet.compression" = "SNAPPY",
"parquet.dictionary.enabled" = "true"
);
SELECT id, name, age FROM user_info
WHERE city='BJ' AND dt='2026-03-31' AND age > 30; -- 自动 3 重下推DuckDB(本地秒级查询)
SELECT id, name, age FROM 'user.parquet' WHERE age > 30;
SELECT * FROM 'parquet_data/*.parquet';
COPY (SELECT * FROM user_info) TO 'out.parquet' (FORMAT PARQUET, COMPRESSION ZSTD);最佳实践
| 主题 | 建议 |
|---|---|
| 压缩选型 | 热数据 Snappy/LZ4;冷数据 ZSTD(综合首选) |
| 文件大小 | ≥128 MB,避免 KB 级小文件爆元数据 |
| 行组大小 | 与 HDFS 块对齐,128 MB ~ 1 GB |
| 分区 | 选低基数高频过滤字段(日期、地区),避免过度分区 |
| 分桶 | 高频 JOIN/去重字段(如 user_id) |
| Schema | 用最小适用类型;字符串列开字典编码;演进只做兼容变更 |
| 查询 | 严禁 SELECT *;过滤优先用分区与统计列 |
适用与不适用
| 场景 | 推荐 |
|---|---|
| OLAP / 数仓 / 数据湖 | Parquet ✓ |
| BI、批 ETL、特征存储 | Parquet ✓ |
| 海量结构化归档 | Parquet ✓ |
| OLTP 单行增删 | MySQL/PG ✗ |
| MB 级小数据 | CSV/JSON ✗ |
| 高频流式单行写入 | Iceberg/Delta/Hudi(底层用 Parquet) |