coderplay
4/14/2014 - 11:44 PM

Push mode data shuffling desgin notes

Push mode data shuffling desgin notes

MapReduce Pull mode

Push mode

Map启动时, 需要知道下游所有Reduce的地址

Reduce启动时, 需要知道上游所有Map的地址

Fault tolerance

Data skew

pseudo code

1. 建立partition与NetworkChannel的对应关系

// 由于partition是连续的整数, 所以可以用数组表示partition与NetworkChannel的对应关系
// 这种方法比HashMap<ParitionId, Channel>更高效
Channel[] channels = new Channel[partitions.length]
// 一个partition对应一个文件
String[]  files = new String[partition.length]

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup) ...

for(int i = 0; i < partitions.length; i++) {
  ChannelFuture future = bootstrap.connect(new InetSocketAddress(partition[i].targetIP,partition[i].targetPort));
  future.addListener( new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) {
      channels[i] = future.channel();
      files[i] = "../stageid/part-" + i "" 
    }
  }
}

2. 用户map()方法插入KV

write(K key, V value) {
  int partition = partitioner.getPartition(key);
  // pipeline下游的handler判断: 如果buffer已满, 则spill buffer 并send buffer
  channels[partition].write(key, value, partition)
}

3. 处理map输出的handler

//  此handler判断: 如果buffer已满, 则spill buffer 并send buffer
class SpillAndSendHandler extends ChannelOutboundHandlerAdapter {
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws IOException {
    Channel channel = ctx.channel();
    Byte buf = channel.unsafe().outboundBuffer();
    buf.write(msg);
    channel.unsafe().outboundBuffer() = another new buffer from threadpool
    if(buf is full) {
      spillToDisk(files[msg.getPartition], buf);           // buf写入磁盘, block io
      channel.writeAndFlush(buf); // 向目标partition节点发送buf
    }
  }
}

Benchmark Design

借助akka cluster的功能, 可以监测到节点up和down的事件, 方便统一调度工作节点. 首先启一个master, 这个master用来监测到节点up和down的事件。然后所有slaves节点启动, master监测所有节点都已经启动之后, 分配一些节点作为运行map tasks的节点, 另一些节点作为运行reduce tasks的节点. map 节点分配的同时, 告诉它所有reduce节点的位置. reduce节点分配的同时, 告诉它所有map节点的位置. map task一旦启动, 就进一个循环, 不断地造数据, 分partition后把数据push给对应的reduce端. reduce task启动, 用netty监听网络事件, 有数据到达, 则进入reduce 接收数据的阶段.