coderplay
10/14/2013 - 11:37 PM

Giraph notes

Master start

  1. set up the aggregator handler
  2. start a netty master server - to receive messages from workers
  3. start a netty master client - to send messages to workers
  4. create vertex/edge input splits, write the splits to ZooKeeper in parallel
  5. loop: call coordinateSuperstep() until the computation finishes
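The master loop above can be sketched as a minimal standalone model. All names here are illustrative (the real coordination logic lives in Giraph's BspServiceMaster and involves ZooKeeper barriers, checkpointing, and worker failure handling, all omitted):

```java
// Simplified model of the master's coordination loop.
// coordinateSuperstep() here just advances a counter; the real method
// waits on a barrier for all workers, gathers aggregator values, and
// decides whether the computation is finished.
public class MasterLoopSketch {
    enum SuperstepState { THIS_SUPERSTEP_DONE, ALL_SUPERSTEPS_DONE }

    private long superstep = -1;  // Giraph's INPUT_SUPERSTEP is -1
    private final long maxSupersteps;

    MasterLoopSketch(long maxSupersteps) {
        this.maxSupersteps = maxSupersteps;
    }

    // One coordination round: advance the superstep and report whether
    // the whole computation is done (here: a simple superstep limit).
    SuperstepState coordinateSuperstep() {
        superstep++;
        return (superstep >= maxSupersteps)
            ? SuperstepState.ALL_SUPERSTEPS_DONE
            : SuperstepState.THIS_SUPERSTEP_DONE;
    }

    // Step 5 of the master start sequence: loop until done.
    long run() {
        while (coordinateSuperstep() != SuperstepState.ALL_SUPERSTEPS_DONE) {
            // real Giraph: checkpoint, rebalance partitions, etc.
        }
        return superstep;
    }

    public static void main(String[] args) {
        System.out.println(new MasterLoopSketch(3).run()); // prints 3
    }
}
```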

Worker start

  1. set up the partitioner
  2. start a netty worker server
  3. start a netty worker client - to send requests to other workers or the master
  4. set up the aggregator handler
  5. setup() - run the initial superstep: INPUT_SUPERSTEP
  6. loop: run superstep computation until the computation finishes
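The worker side of the loop can be modeled the same way. This is a toy standalone sketch of the BSP compute loop (vertices compute until all vote to halt); the Vertex class and compute() body are invented for illustration, and the real worker synchronizes with the master over netty between supersteps:

```java
import java.util.List;

// Toy model of a worker's superstep loop over one partition of vertices.
public class WorkerLoopSketch {
    static class Vertex {
        long value;
        boolean halted = false;
        Vertex(long value) { this.value = value; }
        // Toy compute(): halve the value, vote to halt at zero.
        void compute() {
            value /= 2;
            if (value == 0) halted = true;
        }
    }

    // Run supersteps until every vertex has voted to halt;
    // returns the number of compute supersteps executed.
    static int run(List<Vertex> partition) {
        int superstep = 0;
        boolean anyActive = true;
        while (anyActive) {
            anyActive = false;
            for (Vertex v : partition) {
                if (!v.halted) {
                    v.compute();
                    anyActive = true;  // at least one vertex computed
                }
            }
            if (anyActive) superstep++;
            // real Giraph: flush messages, hit the superstep barrier here
        }
        return superstep;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(new Vertex(4), new Vertex(1)))); // prints 3
    }
}
```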

I/O

Giraph does not use Hadoop's InputFormat; it uses VertexInputFormat/EdgeInputFormat instead. The interface is similar to Hadoop's InputFormat, but the internal reading mechanism differs. In createInputSplits(), the master calls the getSplits() method of VertexInputFormat/EdgeInputFormat and serializes the resulting InputSplits to ZooKeeper. After starting, each worker reads some of these input splits. Every input split is a collection of vertices/edges, so a worker does not necessarily read only vertices/edges belonging to its own partitions. When it reads a vertex/edge that belongs to another partition, it sends it to the target worker via netty; see the RequestServerHandler class. There is one optimization here: if the target worker is the local worker itself, the data skips netty and is added directly to a local queue.


  /**
   * When doing the request, short circuit if it is local
   *
   * @param workerInfo Worker info
   * @param writableRequest Request to either submit or run locally
   */
  public void doRequest(WorkerInfo workerInfo,
                         WritableRequest writableRequest) {
    // If this is local, execute locally
    if (serviceWorker.getWorkerInfo().getTaskId() ==
        workerInfo.getTaskId()) {
      ((WorkerRequest) writableRequest).doRequest(serverData);
      localRequests.inc();
    } else {
      workerClient.sendWritableRequest(
          workerInfo.getTaskId(), writableRequest);
      remoteRequests.inc();
    }
  }
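The routing behavior of doRequest() can be exercised with a toy model. Everything below is a hypothetical simplification (a plain map from vertex id to owning task id stands in for WorkerInfo and the partitioner; counters stand in for the localRequests/remoteRequests metrics):

```java
import java.util.Map;

// Toy model of the local short-circuit: vertices read from an input
// split are routed by owning task id; requests targeting this worker
// bypass the network path and are counted as local.
public class RoutingSketch {
    int localRequests = 0;
    int remoteRequests = 0;
    final int myTaskId;
    final Map<Long, Integer> vertexOwner;  // vertex id -> owning task id

    RoutingSketch(int myTaskId, Map<Long, Integer> vertexOwner) {
        this.myTaskId = myTaskId;
        this.vertexOwner = vertexOwner;
    }

    // Route one vertex read from an input split.
    void route(long vertexId) {
        int targetTaskId = vertexOwner.get(vertexId);
        if (targetTaskId == myTaskId) {
            localRequests++;   // apply directly to local server data
        } else {
            remoteRequests++;  // would go through the netty worker client
        }
    }

    public static void main(String[] args) {
        RoutingSketch r = new RoutingSketch(0, Map.of(1L, 0, 2L, 1, 3L, 0));
        for (long id = 1; id <= 3; id++) {
            r.route(id);
        }
        // vertices 1 and 3 are local to task 0; vertex 2 goes to task 1
        System.out.println(r.localRequests + " local, " + r.remoteRequests + " remote");
    }
}
```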