MapOutputBuffer has a field named mapOutputFile. In the sortAndSpill method (invoked from flush), this field is used to obtain the path of the spill file, and the intermediate results are written there; inside that method, the writer.append(key, value) call discussed below writes out the data. There is no sign of any encryption step.
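For reference, here is a condensed sketch of that path in MapTask.MapOutputBuffer.sortAndSpill, based on the Hadoop 2.x source; the sort itself, error handling, and the spill-index bookkeeping are omitted, so treat it as an illustration rather than the exact code:

    // Condensed from MapTask.MapOutputBuffer.sortAndSpill() (Hadoop 2.x)
    final Path filename =
        mapOutputFile.getSpillFileForWrite(numSpills, size); // size = estimated spill size
    FSDataOutputStream out = rfs.create(filename);           // raw local FS stream
    for (int i = 0; i < partitions; ++i) {
      // One IFile segment per partition, compressed iff a codec is configured
      Writer<K, V> writer = new Writer<K, V>(job, out, keyClass, valClass,
                                             codec, spilledRecordsCounter);
      // ... iterate over the sorted records of partition i ...
      writer.append(key, value); // plain (possibly compressed) bytes, no encryption
      writer.close();
    }
    out.close();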
When shuffle.run() executes, the map outputs are fetched and merged. This eventually calls merger.close(), which resolves to the close method of MergeManagerImpl, shown below:
@Override
public RawKeyValueIterator close() throws Throwable {
  // Wait for on-going merges to complete
  if (memToMemMerger != null) {
    memToMemMerger.close();
  }
  inMemoryMerger.close();
  onDiskMerger.close();

  List<InMemoryMapOutput<K, V>> memory =
      new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
  inMemoryMergedMapOutputs.clear();
  memory.addAll(inMemoryMapOutputs);
  inMemoryMapOutputs.clear();
  List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
  onDiskMapOutputs.clear();
  return finalMerge(jobConf, rfs, memory, disk);
}
So we see three different mergers, memToMemMerger, inMemoryMerger, and onDiskMerger, defined as follows:
private IntermediateMemoryToMemoryMerger memToMemMerger;
private final MergeThread<InMemoryMapOutput<K,V>, K,V> inMemoryMerger;
private final OnDiskMerger onDiskMerger;
IntermediateMemoryToMemoryMerger extends MergeThread<InMemoryMapOutput<K, V>, K, V>, and the close and run methods of MergeThread look like this:
public synchronized void close() throws InterruptedException {
  closed = true;
  waitForMerge();
  interrupt();
}

public void run() {
  while (true) {
    List<T> inputs = null;
    try {
      // Wait for notification to start the merge...
      synchronized (pendingToBeMerged) {
        while (pendingToBeMerged.size() <= 0) {
          pendingToBeMerged.wait();
        }
        // Pickup the inputs to merge.
        inputs = pendingToBeMerged.removeFirst();
      }

      // Merge
      merge(inputs);
    } catch (InterruptedException ie) {
      numPending.set(0);
      return;
    } catch (Throwable t) {
      numPending.set(0);
      reporter.reportException(t);
      return;
    } finally {
      synchronized (this) {
        numPending.decrementAndGet();
        notifyAll();
      }
    }
  }
}
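run() is the consumer half of a simple producer/consumer queue. The producer half is MergeThread.startMerge, which enqueues a batch of inputs and wakes the thread, while close() relies on the matching waitForMerge(). Condensed from the same class (log output trimmed):

    public void startMerge(Set<T> inputs) {
      if (!closed) {
        numPending.incrementAndGet();
        // Take at most mergeFactor inputs for this merge pass
        List<T> toMergeInputs = new ArrayList<T>();
        Iterator<T> iter = inputs.iterator();
        for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
          toMergeInputs.add(iter.next());
          iter.remove();
        }
        synchronized (pendingToBeMerged) {
          pendingToBeMerged.addLast(toMergeInputs);
          pendingToBeMerged.notifyAll(); // wakes the wait() inside run()
        }
      }
    }

    public synchronized void waitForMerge() throws InterruptedException {
      while (numPending.get() > 0) {
        wait(); // released by notifyAll() in run()'s finally block
      }
    }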
The inMemoryMerger, in turn, is created by the createInMemoryMerger method and is in fact an instance of InMemoryMerger. All three mergers do the same thing inside their merge methods: they create a Writer, hand it to Merger.writeFile(iter, writer, reporter, jobConf), and then call writer.close() to finish (a condensed sketch of this pattern follows the listing below). The close method is implemented as follows:
public void close() throws IOException {
  // When IFile writer is created by BackupStore, we do not have
  // Key and Value classes set. So, check before closing the
  // serializers
  if (keyClass != null) {
    keySerializer.close();
    valueSerializer.close();
  }

  // Write EOF_MARKER for key/value length
  WritableUtils.writeVInt(out, EOF_MARKER);
  WritableUtils.writeVInt(out, EOF_MARKER);
  decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);

  // Flush the stream
  out.flush();

  if (compressOutput) {
    // Flush
    compressedOut.finish();
    compressedOut.resetState();
  }

  // Close the underlying stream iff we own it...
  if (ownOutputStream) {
    out.close();
  } else {
    // Write the checksum
    checksumOut.finish();
  }

  compressedBytesWritten = rawOut.getPos() - start;

  if (compressOutput) {
    // Return back the compressor
    CodecPool.returnCompressor(compressor);
    compressor = null;
  }

  out = null;

  if (writtenRecordsCounter != null) {
    writtenRecordsCounter.increment(numRecordsWritten);
  }
}
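To make the shared pattern concrete, here is a condensed sketch of MergeManagerImpl.OnDiskMerger.merge from the Hadoop 2.x source; size accounting and error handling are omitted, so treat the exact parameters as illustrative:

    // Condensed from MergeManagerImpl.OnDiskMerger.merge() (Hadoop 2.x)
    public void merge(List<CompressAwarePath> inputs) throws IOException {
      // Target file on local disk for the merged output
      Path outputPath = localDirAllocator.getLocalPathForWrite(
          inputs.get(0).toString(), approxOutputSize, jobConf);

      Writer<K, V> writer = new Writer<K, V>(jobConf, rfs, outputPath,
          (Class<K>) jobConf.getMapOutputKeyClass(),
          (Class<V>) jobConf.getMapOutputValueClass(),
          codec, null);

      // Merge-sort over the on-disk segments
      RawKeyValueIterator iter = Merger.merge(jobConf, rfs,
          (Class<K>) jobConf.getMapOutputKeyClass(),
          (Class<V>) jobConf.getMapOutputValueClass(),
          codec, inputs.toArray(new Path[inputs.size()]),
          true, ioSortFactor, tmpDir, comparator, reporter,
          spilledRecordsCounter, null, mergedMapOutputsCounter, null);

      Merger.writeFile(iter, writer, reporter, jobConf); // drain merged stream
      writer.close();                                    // the close() analysed above
    }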
Returning to close(): the key object there is out, which is created as follows:
if (codec != null) {
  this.compressor = CodecPool.getCompressor(codec);
  if (this.compressor != null) {
    this.compressor.reset();
    this.compressedOut = codec.createOutputStream(checksumOut, compressor);
    this.out = new FSDataOutputStream(this.compressedOut, null);
    this.compressOutput = true;
  } else {
    LOG.warn("Could not obtain compressor from CodecPool");
    this.out = new FSDataOutputStream(checksumOut, null);
  }
} else {
  this.out = new FSDataOutputStream(checksumOut, null);
}
This explains how the intermediate results get compressed when a compression format is supplied: the codec wraps checksumOut, and out writes through it.
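For example, to turn on compression of the map-side intermediate output, it should suffice to set the standard MapReduce properties on the job configuration (the property names are the Hadoop 2.x ones; Snappy is just one codec choice):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    Configuration conf = new Configuration();
    // Compress the map intermediate output (spills and merged shuffle files)
    conf.setBoolean("mapreduce.map.output.compress", true);
    conf.set("mapreduce.map.output.compress.codec",
             "org.apache.hadoop.io.compress.SnappyCodec");
    Job job = Job.getInstance(conf, "my-job");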
A few conclusions:
- The output is compressed into a concrete format based on the settings in the Job Configuration. See: StackOverflow
- Consuming the map's intermediate results directly is likewise not feasible without modifying the source code. See: StackOverflow. But one can try implementing the IFile format and run some experiments, as sketched below.
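As a starting point for such an experiment, a minimal sketch using the Hadoop 2.x IFile.Reader. Note that IFile lives in org.apache.hadoop.mapred and is marked @InterfaceAudience.Private, so the constructor signature may differ across versions; the file path is an assumption, and for simplicity this assumes the file holds a single IFile segment (one partition), uncompressed:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.mapred.IFile;

    public class SpillDump {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path spill = new Path(args[0]);          // e.g. a spill0.out left by a map task
        FileSystem fs = FileSystem.getLocal(conf);
        FSDataInputStream in = fs.open(spill);
        long length = fs.getFileStatus(spill).getLen();

        // codec == null assumes an uncompressed spill; pass the job's codec otherwise
        IFile.Reader<Object, Object> reader =
            new IFile.Reader<Object, Object>(conf, in, length, null, null);

        DataInputBuffer key = new DataInputBuffer();
        DataInputBuffer value = new DataInputBuffer();
        while (reader.nextRawKey(key)) {         // returns false at EOF_MARKER
          reader.nextRawValue(value);
          // key.getData()/value.getData() hold the raw serialized bytes;
          // deserialize them with the job's key/value Serialization.
        }
        reader.close();
      }
    }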