MapReduce学习笔记 —— Map的中间结果

2549 查看

《Hadoop技术内幕——深入解析MapReduce架构设计与实现原理》(董西城著)一书中,第8章《Task运行过程分析》中第3小结详细介绍了Map Task的内部实现,过程如图所示:

在Spill阶段,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。其步骤如下:

  1. 对缓冲区kvbuffer中区间[bufstart, bufend)内的数据进行排序。会先partition,然后基于key排序。
  2. 按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out中。
  3. 将分区数据的元信息写道内存索引数据结构SpillRecorder中,每个元信息包含在临时文件中的偏移量、压缩前后的数据大小。

这些临时文件会在Combine阶段进行合并,最终生成一个文件,并保存到output/file.out中,同时生成响应的索引文件oiutput/file.out.index

那么问题来了,这些中间结果是否可以被第三方获取?是否可以进行加密?加密是否可逆?

在该书第11章《Hadoop安全机制》中有相关介绍。在未添加安全机制之前,任何用户可以通过URL来获取人意一个Map Task的中间输出结果。为了解决该问题,Hadoop在Reduce Task与TaskTracker之间的通信机制上添加了双向认证机制,以保证有且仅有同作业的Reduce Task才能够读取Map Task的中间结果。该双向认证是以它们之间的共享的作业令牌为基础。

过程如下:
TaskTracker对Reduce Task认证: Reduce Task从TaskTracker上获取数据之前,先要将HMAC-SHA1(URL, JobToken)发送给TaskTracker,TaskTracker利用自己保存的作业令牌计算HMAC-SHA1,然后比较该值与Reduce Task发过来的是否一致。
Reduce Task对TaskTracker认证:TaskTracker对Reduce Task认证成功后,需要使用Reduce Task发送过来的HMAC-SHA1值与作业令牌计算一个新的HMAC-SHA1,经Reduce Task验证后,双方才可以证实传送数据。

========================================

MapOutputBuffer中有一个变量叫做mapOutputFile。在sortAndSpill函数中(被flush调用),会通过这个变量拿到文件地址,并写出中间结果,在该方法中,调用了下文中提到的writer.append(key, value)来写出数据。看起来没有加密的过程。

在执行shuffle.run()时,会对map的数据进行提取并合并。就会调用merger.close(),
实际会调用到MergeManagerlmpl的close方法,代码如下:

  @Override
  public RawKeyValueIterator close() throws Throwable {
    // Wait for on-going merges to complete
    if (memToMemMerger != null) { 
      memToMemMerger.close();
    }
    inMemoryMerger.close();
    onDiskMerger.close();

    List<InMemoryMapOutput<K, V>> memory = 
      new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
    inMemoryMergedMapOutputs.clear();
    memory.addAll(inMemoryMapOutputs);
    inMemoryMapOutputs.clear();
    List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
    onDiskMapOutputs.clear();
    return finalMerge(jobConf, rfs, memory, disk);
  }

那么我们看到了memToMemMerger\inMemoryMerger\onDiskMerger三种不同的Merger,定义如下:

private IntermediateMemoryToMemoryMerger memToMemMerger;
private final MergeThread<InMemoryMapOutput<K,V>, K,V> inMemoryMerger;
private final OnDiskMerger onDiskMerger;

其中IntermediateMemoryToMemoryMerger继承自 MergeThread<InMemoryMapOutput<K, V>, K, V>,然而MergeThread的close方法和run方法如下:

public synchronized void close() throws InterruptedException {
  closed = true;
  waitForMerge();
  interrupt();
}


public void run() {
  while (true) {
    List<T> inputs = null;
    try {
      // Wait for notification to start the merge...
      synchronized (pendingToBeMerged) {
      while(pendingToBeMerged.size() <= 0) {
        pendingToBeMerged.wait();
      }
      // Pickup the inputs to merge.
      inputs = pendingToBeMerged.removeFirst();
    }

    // Merge
    merge(inputs);
    } catch (InterruptedException ie) {
      numPending.set(0);
      return;
    } catch(Throwable t) {
      numPending.set(0);
      reporter.reportException(t);
      return;
    } finally {
      synchronized (this) {
      numPending.decrementAndGet();
      notifyAll();
    }
  }
}

而imMemoryMerger则是由createInMemoryMerger函数创建,其实是一个InMemoryMerger的实例。

这三者都会在merge方法中创建一个Writer变量,并调用Merger.writeFile(iter, writer, reporter, jobConf)。随后调用writer.close()来完成调用。close函数实现如下:

public void close() throws IOException {

  // When IFile writer is created by BackupStore, we do not have
  // Key and Value classes set. So, check before closing the
  // serializers
  if (keyClass != null) {
    keySerializer.close();
    valueSerializer.close();
  }

  // Write EOF_MARKER for key/value length
  WritableUtils.writeVInt(out, EOF_MARKER);
  WritableUtils.writeVInt(out, EOF_MARKER);
  decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);

  //Flush the stream
  out.flush();

  if (compressOutput) {
    // Flush
    compressedOut.finish();
    compressedOut.resetState();
  }

  // Close the underlying stream iff we own it...
  if (ownOutputStream) {
    out.close();
  }
  else {
    // Write the checksum
    checksumOut.finish();
  }

  compressedBytesWritten = rawOut.getPos() - start;

  if (compressOutput) {
    // Return back the compressor
    CodecPool.returnCompressor(compressor);
    compressor = null;
  }

  out = null;
  if(writtenRecordsCounter != null) {
    writtenRecordsCounter.increment(numRecordsWritten);
  }
}

我们会发现其中关键的就是out。out的创建如下:

    if (codec != null) {
    this.compressor = CodecPool.getCompressor(codec);
    if (this.compressor != null) {
      this.compressor.reset();
      this.compressedOut = codec.createOutputStream(checksumOut, compressor);
      this.out = new FSDataOutputStream(this.compressedOut,  null);
      this.compressOutput = true;
    } else {
      LOG.warn("Could not obtain compressor from CodecPool");
      this.out = new FSDataOutputStream(checksumOut,null);
    }
  } else {
    this.out = new FSDataOutputStream(checksumOut,null);
  }

这一部分解释了党我们传入了压缩格式的时候,中间结果如何进行压缩。

几个结论:

  • 输出应该是机遇Job Configuration里面的设定,压缩成具体的格式。可以参看:StackOverflow
  • 直接使用Map的中间结果应该也是不可行的,除非自己修改源代码。可以参看:StackOverflow。但是可以尝试实现IFile做一些常识。