Iceberg Spark SQL 存储过程
Iceberg 0.11.0 及之后的版本中,对原生的 Spark 进行了一些扩展,主要在 SQL 命令中新增了存储过程和部分Alter Table
语法。如果要使用扩展功能,需要在 Spark 中新增以下配置项:
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
本节介绍存储过程部分。
存储过程(Procedure)是数据库领域的概念,类似于编程语言中的方法或函数,是对实现特定操作的封装,原生的 Spark SQL 中是不支持存储过程的,Iceberg 对其进行了扩展,并提供了部分存储过程的实现。Iceberg 中提供的所有存储过程都在system
namespace 下。
# 语法
call catalog_name.system.procedure_name(args...)
其中catalog_name
为 Iceberg 配置的 catalog 名称;procedure_name
为需要调用的存储过程名称;system
为固定写法;args
为存储过程参数,可以有多个。
有两种方式进行参数传递:一种是基于参数位置,一种是基于参数名称。如:
-- 基于参数位置
call catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n);
-- 基于参数名称
call catalog_name.system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1);
2
3
4
5
# 使用
Iceberg 原生支持的存储过程可以按照其功能分为三大类:
- 快照管理
- 元数据管理
- 表迁移
# 快照管理
- rollback_to_snapshot
将表回滚到特定的快照 ID。
参数:
- table:string,必须,表名。
- snapshot_id:long,必须,快照 ID。
输出:
- previous_snapshot_id:long,回滚前的快照 ID。
- current_snapshot_id:long,回滚后的快照 ID。
示例:
call catalog_name.system.rollback_to_snapshot('db.sample', 1);
- rollback_to_timestamp
将表回滚到特定时间的快照。
参数:
- table:string,必须,表名。
- timestamp:timestamp,必须,回滚的时间戳。
输出:
- previous_snapshot_id:long,回滚前的快照 ID;
- current_snapshot_id:long,回滚后的快照 ID。
示例:
call catalog_name.system.rollback_to_timestamp('db.sample', TIMESTAMP '2021-06-30 00:00:00.000');
- set_current_snapshot
设置表的当前快照 ID。与回滚不同的是,set_current_snapshot
可以在各个快照之间任意穿梭。
参数:
- table:string,必须,表名。
- snapshot_id:long,必须,快照 ID。
输出:
- previous_snapshot_id:long,回滚前的快照 ID;
- current_snapshot_id:long,回滚后的快照 ID。
示例:
call catalog_name.system.set_current_snapshot('db.sample', 1);
- cherrypick_snapshot
从快照 cherry-pick 到当前表状态。从现有快照创建新快照,而不更改或删除原始快照。只追加和动态覆盖可以 cherry-pick 的快照。
参数:
- table:string,必须,表名
- snapshot_id:long,必须,快照 ID。
输出:
- source_snapshot_id:long,cherry-pick 之前的快照 ID;
- current_snapshot_id:long,通过 cherry-pick 创建的快照 ID。
- CALL catalog_name.system.cherrypick_snapshot('my_table', 1)
# 元数据管理
- expire_snapshots
删除过期快照和相关数据文件。
参数:
- table:string,必须,表名。
- older_than:timestamp,非必须,该时间戳之前的快照将被删除,默认为 5 天前。
- retain_last:int,非必须,和 older_than 同时存在的时候,要保留的快照数(默认为 1)。
- max_concurrent_deletes:int,非必须,用于删除文件操作的线程池大小(默认不使用线程池)。
输出:
- deleted_data_files_count:long,删除的数据文件数。
- deleted_manifest_files_count:long,删除的 manifest 文件数。
- deleted_manifest_lists_count:删除的 manifest list 文件数。
示例:
call hive_prod.system.expire_snapshots('db.sample', TIMESTAMP '2021-06-30 00:00:00.000', 100);
- remove_orphan_files
删除 Iceberg 元数据文件中未被引用的“孤儿”文件。 参数:
- table:string,必须,表名。
- older_than:timestamp,非必须,删除在此时间戳之前创建的孤立文件(默认为 3 天前)。
- location:string,非必须,查找文件的目录(默认为表的位置)。
- dry_run:boolean,非必须,当为 true 时,实际上不删除文件(默认为 false)。
- max_concurrent_deletes:int,非必须,用于删除文件操作的线程池大小(默认不使用线程池)。
输出:
- orphan_file_location:string,被确定为孤立文件的每个文件的路径。
示例:
call catalog_name.system.remove_orphan_files(table => 'db.sample', location => 'tablelocation/data');
- rewrite_data_files
合并小文件,加速文件扫描。
参数:
- table:string,必须,表名。
- strategy:string,非必须,合并策略
binpack
或sort
。默认为binpack
。- sort_order:string,非必须,用以描述排序方式,多个字段之间用逗号分隔。如:
name asc nulls last, age desc nulls first
。- options:map<string, string>,非必须,用以重写文件时的其它参数。
- where:string,非必须,指定过滤条件。
输出:
- rewritten_data_files_count:被重写的文件数量。
- added_data_files_count:此命令写入的新数据文件数。
示例:
call catalog_name.system.rewrite_data_files('db.sample');
call catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'id DESC NULLS LAST,name ASC NULLS FIRST');
call catalog_name.system.rewrite_data_files(table => 'db.sample', options => map('min-input-files','2'));
call catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"');
2
3
4
- rewrite_manifests
重写 manifest 文件来优化扫描计划。
参数:
- table:string,必须,表名。
- use_caching:boolean,非必须,是否使用 Spark 缓存(默认为 true)。
输出:
- rewritten_manifests_count:int,被重写的 manifest 文件数。
- added_mainfests_count:int,新生成的 manifest 文件数。
示例:
call catalog_name.system.rewrite_manifests('db.sample', false)
- ancestors_of
获取指定快照的血缘关系。
参数:
- table:string,必须,表名。
- snapshot_id:long,非必须,指定的快照 ID。
输出:
- snapshot_id:long,祖先快照 ID。
- timestamp:long,快照生成时间。
示例:
call spark_catalog.system.ancestors_of('db.tbl', 1);
call spark_catalog.system.ancestors_of(snapshot_id => 1, table => 'db.tbl');
2
# 表迁移
- snapshots
在不影响原始表的情况下创建一个新的轻量级快照表用以测试,测试结束之后可以通过drop table
删除掉。如果在新的快照表中没有插入新数据,则依然使用的是原始表的数据文件,如果插入操作,则新的数据文件放在快照表的数据目录下,不会影响原始表。
在新表中任何只影响元数据的操作都是允许的,如 inset、delete。但是会影响到物理数据文件的操作是禁止的,如删除过期快照(expire_snapshots),因为会影响到原始表。
参数:
- source_table:string,必须,原始表表名。
- table:string,必须,新表(快照表)表名。
- location:string,非必须,新表的存储目录,默认有 catalog 管理。
- properties:map<string, string>,非必须,添加到新表中的属性。
输出:
- imported_files_count:long,添加到新表的文件数。
示例:
call catalog_name.system.snapshot('db.sample', 'db.snap');
call catalog_name.system.snapshot('db.sample', 'db.snap', '/tmp/temptable/');
2
- migrate
将一个非 Iceberg 表转为 Iceberg 表,原始表中的文件会加载到新的 Iceberg 表中,原始表中的 schema 信息、partition 信息、属性信息以及位置都会拷贝至新表。
提示
迁移成功的前提是新表兼容原始表使用的文件格式。
参数:
- table:string,必须,需要迁移的原始表。
- properties:map<string, string>,非必须,新 Iceberg 表的属性。
输出:
- migrated_files_count:long,添加到新 Iceberg 表中的数据文件数。
示例:
call catalog_name.system.migrate('db.sample');
- add_files
将 Hive 或其它基于文件的表中的数据文件添加到指定的 Iceberg 表中,可以从一个或多个分区导入文件。add_files
只会为需要导入的数据文件增加元数据信息,并不会物理地移动数据文件,而且不会考虑导入文件的 Scheme 信息是否和 Iceberg 表匹配。
注意
expire_snapshot
等可能删除物理文件的操作同样会删除新导入的文件。- 导入过程中不会验证表的 Schema 信息,如果源表和目标表的 Schema 不匹配可能会带来新的问题。
参数:
- table:string,必须,目标表,数据被导入其中。
- source_table:string,必须,源表,提供需要被导入的数据。如果是 Hive 或 Spark 中的表,可以为
db.tbname
;如果文件,可以为file_format
.path
。- partition_filter:map<string, string>,非必须,要从中导入的源表中的分区集合。
示例:
-- 将 db.src_tbl 表(非Iceberg表)中分区 year = 2022 的数据添加到 db.tbl 中
call spark_catalog.system.add_files(
table => 'db.tbl',
source_table => 'db.src_tbl',
partition_filter => map('year', '2022')
)
-- 将 path/to/table 目录中 格式为 parquet 的文件添加到 db.tbl 表中,
call spark_catalog.system.add_files(
table => 'db.tbl',
source_table => '`parquet`.`path/to/table`'
)
2
3
4
5
6
7
8
9
10
11
12