Spark AppendOnlyMap
# 一、概念
AppendOnlyMap
是 Spark 中基于数组实现的一个哈希表,类似于 Java 中的 Map 结构,用以存储键值对。
<img src="/img/spark/AppendOnlyMap.png"21.Spark_AppendOnlyMap.md />
它具有以下特点:
- 基于数组实现。
- 哈希表中的 Key 只能追加,不能删除,就像它的名字一样 Append-Only。
- 哈希表中的 Value 可以被修改。
- 最多可以存储 375809638 (0.7 * 2 ^ 29) 个元素。
- 每次扩容都扩为原来的 2 倍。
# 二、AppendOnlyMap 源码
// initialCapacity 指定初始容量,默认为 64
class AppendOnlyMap[K, V](initialCapacity: Int = 64)
extends Iterable[(K, V)] with Serializable {
import AppendOnlyMap._
require(initialCapacity <= MAXIMUM_CAPACITY,
s"Can't make capacity bigger than ${MAXIMUM_CAPACITY} elements")
require(initialCapacity >= 1, "Invalid initial capacity")
// 负载因子,即 curSize 达到 LOAD_FACTOR * capacity 时,开始对 data 扩容
private val LOAD_FACTOR = 0.7
// data 数组的真实 Key 的容量,初始值为 initialCapacity 向上取最近的一个 2 的幂
// 比如 initialCapacity 为 64,则 capacity 为 64
// 如果 initialCapacity 为 72,则 capacity 为 128,因为 72 往上最近的一个 2 的幂为 128(2^7)
private var capacity = nextPowerOf2(initialCapacity)
// 计算数据存放位置的掩码
private var mask = capacity - 1
// 记录已经存入 data 的 key 的数量
private var curSize = 0
// 扩容的阈值
private var growThreshold = (LOAD_FACTOR * capacity).toInt
// 用来保存 Key 和 Value 的数组
private var data = new Array[AnyRef](2 * capacity)
// data 中是否已经存入了 null key
private var haveNullValue = false
// 存入的 null key 对应的 value
private var nullValue: V = null.asInstanceOf[V]
// 表示 data 数组是否不再使用
private var destroyed = false
// 当 destroyed 为 true 时打印的信息
private val destructionMessage = "Map state is invalid from destructive sorting!"
// 根据 key 获取 value,相当于 HashMap 的 get() 方法
def apply(key: K): V = {
assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
// 针对 null 单独处理
if (k.eq(null)) {
return nullValue
}
// 根据 hash 值计算下标
var pos = rehash(k.hashCode) & mask
var i = 1
while (true) {
val curKey = data(2 * pos)
// 刚好该位置的 key 是想要的,返回 value
if (k.eq(curKey) || k.equals(curKey)) {
return data(2 * pos + 1).asInstanceOf[V]
} else if (curKey.eq(null)) {
// key 不存在,返回 null
return null.asInstanceOf[V]
} else {
// 解决 hash 冲突
val delta = i
pos = (pos + delta) & mask
i += 1
}
}
null.asInstanceOf[V]
}
// 更新 Key 的 Value
def update(key: K, value: V): Unit = {
assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
// 针对 null Key 的处理,从这里可以看到 null 是没有存在 data 数组中,而是单独创建了两个变量来表示
if (k.eq(null)) {
if (!haveNullValue) {
incrementSize()
}
nullValue = value
haveNullValue = true
return
}
// 根据 Key 的 Hash 值计算元素放入 data 中的下标
var pos = rehash(key.hashCode) & mask
var i = 1
while (true) {
val curKey = data(2 * pos)
// 说明 data 的当前位置还没有放入元素
if (curKey.eq(null)) {
// 在当前位置放入 Key
data(2 * pos) = k
// 在下一个位置放入 Value
data(2 * pos + 1) = value.asInstanceOf[AnyRef]
// 对当前已经存入的键值对计数 +1
incrementSize()
return
} else if (k.eq(curKey) || k.equals(curKey)) {
// 当前位置已经有值,且和传进来的 Key 相同,直接更新 Value
data(2 * pos + 1) = value.asInstanceOf[AnyRef]
return
} else {
// 该位置已经有了值,但是该值不等于传进来的 Key,不断向后找一个空位置插入
// 实际上就是在解决 hash 冲突
val delta = i
pos = (pos + delta) & mask
i += 1
}
}
}
// 缓存聚合算法,使用 updateFunc 函数对 key 和 V 进行聚合,并更新 V 的值,key 为待聚合的key
// updateFunc 中的 Boolean 表示 key 是否已经在 data 数组中进行过聚合,V 表示之前的聚合值,新的聚合值在 V 的基础上产生
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
// 针对 null 值的更新,单独处理
if (k.eq(null)) {
if (!haveNullValue) {
incrementSize()
}
nullValue = updateFunc(haveNullValue, nullValue)
haveNullValue = true
return nullValue
}
var pos = rehash(k.hashCode) & mask
var i = 1
while (true) {
val curKey = data(2 * pos)
// 如果 key 的位置为 null,说明之前的 V 也是 null
if (curKey.eq(null)) {
val newValue = updateFunc(false, null.asInstanceOf[V])
data(2 * pos) = k
data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
incrementSize()
return newValue
} else if (k.eq(curKey) || k.equals(curKey)) {
// key 不为 null,聚合新旧 value
val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
return newValue
} else {
// 解决 hash 冲突,继续向后寻找 key
val delta = i
pos = (pos + delta) & mask
i += 1
}
}
null.asInstanceOf[V] // Never reached but needed to keep compiler happy
}
/** Iterator method from Iterable */
override def iterator: Iterator[(K, V)] = {
assert(!destroyed, destructionMessage)
new Iterator[(K, V)] {
var pos = -1
/** Get the next value we should return from next(), or null if we're finished iterating */
def nextValue(): (K, V) = {
if (pos == -1) { // Treat position -1 as looking at the null value
if (haveNullValue) {
return (null.asInstanceOf[K], nullValue)
}
pos += 1
}
while (pos < capacity) {
if (!data(2 * pos).eq(null)) {
return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V])
}
pos += 1
}
null
}
override def hasNext: Boolean = nextValue() != null
override def next(): (K, V) = {
val value = nextValue()
if (value == null) {
throw new NoSuchElementException("End of iterator")
}
pos += 1
value
}
}
}
override def size: Int = curSize
// 已经存入的 Key 加 1
private def incrementSize(): Unit = {
curSize += 1
if (curSize > growThreshold) {
growTable()
}
}
/**
* Re-hash a value to deal better with hash functions that don't differ in the lower bits.
*/
private def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()
// 真正的扩容方法,每次扩容为原来的 2 倍,并针对元素 re-hash
protected def growTable(): Unit = {
// 扩容为原来的 2 倍
val newCapacity = capacity * 2
require(newCapacity <= MAXIMUM_CAPACITY, s"Can't contain more than ${growThreshold} elements")
// 创建一个容量为当前数组 2 倍的新数组
val newData = new Array[AnyRef](2 * newCapacity)
// 计算新数组的掩码
val newMask = newCapacity - 1
// 将旧数组中的元素拷贝的新数组,由于旧数组中的元素是唯一的,所以这里不需要检查key是否相等
var oldPos = 0
while (oldPos < capacity) {
if (!data(2 * oldPos).eq(null)) {
val key = data(2 * oldPos)
val value = data(2 * oldPos + 1)
var newPos = rehash(key.hashCode) & newMask
var i = 1
var keepGoing = true
while (keepGoing) {
val curKey = newData(2 * newPos)
if (curKey.eq(null)) {
newData(2 * newPos) = key
newData(2 * newPos + 1) = value
keepGoing = false
} else {
val delta = i
newPos = (newPos + delta) & newMask
i += 1
}
}
}
oldPos += 1
}
// 将新数组赋值给 data 变量
data = newData
// 赋值新容量
capacity = newCapacity
// 赋值新的掩码
mask = newMask
// 赋值新的扩容阈值
growThreshold = (LOAD_FACTOR * newCapacity).toInt
}
// 向上计算最接近 n 的 2 的 幂
private def nextPowerOf2(n: Int): Int = {
val highBit = Integer.highestOneBit(n)
if (highBit == n) n else highBit << 1
}
// 在不使用额外内存和不牺牲有效性的前提下,返回 AppendOnlyMap 排序后的迭代器
def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
destroyed = true
// 将 data 数组中的元素整体向前移动补空位置,保证第一个和最后一个元素中间都是连续的,没有留空位置
var keyIndex, newIndex = 0
while (keyIndex < capacity) {
if (data(2 * keyIndex) != null) {
data(2 * newIndex) = data(2 * keyIndex)
data(2 * newIndex + 1) = data(2 * keyIndex + 1)
newIndex += 1
}
keyIndex += 1
}
assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
// 执行排序
new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)
// 生成访问 data 中元素的迭代器
new Iterator[(K, V)] {
var i = 0
var nullValueReady = haveNullValue
def hasNext: Boolean = (i < newIndex || nullValueReady)
def next(): (K, V) = {
if (nullValueReady) {
nullValueReady = false
(null.asInstanceOf[K], nullValue)
} else {
val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
i += 1
item
}
}
}
}
/**
* Return whether the next insert will cause the map to grow
*/
def atGrowThreshold: Boolean = curSize == growThreshold
}
private object AppendOnlyMap {
// 表示 data 数组的最大容量
// data 真正可以存储数据的部分仅仅为 0.7 * MAXIMUM_CAPACITY
// 1 << 29 表示 2 ^ 29,使用位运算更快
val MAXIMUM_CAPACITY = (1 << 29)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# 三、SizeTrackingAppendOnlyMap 源码
AppendOnlyMap
还有两个子类,分别是SizeTrackingAppendOnlyMap
和PartitionedAppendOnlyMap
,它们的关系是:
PartitionedAppendOnlyMap
继承自SizeTrackingAppendOnlyMap
,SizeTrackingAppendOnlyMap
继承自 AppendOnlyMap
。
SizeTrackingAppendOnlyMap
继承了AppendOnlyMap
并实现了SizeTracker
接口,这意味着它将同时拥有两者都能力,即可以在内存中对任务执行结果进行更新和聚合,也可以对自身大小进行样本采集和大小估算。
private[spark] class SizeTrackingAppendOnlyMap[K, V]
extends AppendOnlyMap[K, V] with SizeTracker
{
override def update(key: K, value: V): Unit = {
super.update(key, value)
super.afterUpdate()
}
override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
val newValue = super.changeValue(key, updateFunc)
super.afterUpdate()
newValue
}
override protected def growTable(): Unit = {
super.growTable()
resetSamples()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 四、PartitionedAppendOnlyMap 源码
private[spark] class PartitionedAppendOnlyMap[K, V]
extends SizeTrackingAppendOnlyMap[(Int, K), V] with WritablePartitionedPairCollection[K, V] {
def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
: Iterator[((Int, K), V)] = {
val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
destructiveSortedIterator(comparator)
}
def insert(partition: Int, key: K, value: V): Unit = {
update((partition, key), value)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
上次更新: 2023/11/07, 10:52:14