Skip to main content

2 - Establishment of Bootstrap Network Connection

·1555 words
NCCL Source Code Study - This article is part of a series.
Part 2: This Article
This is a Sonnet 3.6 translation of a Chinese article. Please be mindful of potential translation errors.

Last time we covered how the rank 0 machine generates the ncclUniqueId and initializes its bootstrap network and communication network. This section looks at how the bootstrap connections between all nodes are established.

The rank 0 node executes ncclGetUniqueId to generate the ncclUniqueId and broadcasts it to all nodes via MPI; every node then executes ncclCommInitRank. There, the other nodes also initialize their bootstrap network and communication network, and then execute ncclCommInitRankSync:

/*
 * Synchronously build the communicator for rank `myrank` of `nranks`.
 *
 * Steps, in order: select CUDA device `cudaDev`, allocate the communicator
 * with commAlloc, run initTransportsRank with the shared `commId`, then run
 * devCommSetup.
 *
 * Returns ncclSuccess once all three steps have succeeded. If any of the
 * NCCLCHECKGOTO steps fails, control reaches `cleanup`, where the bootstrap
 * state (if one exists) is aborted, *newcomm is set to NULL, and `res` is
 * returned.
 */
ncclResult_t ncclCommInitRankSync(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank, int cudaDev) {
  ncclResult_t res;
 
  CUDACHECK(cudaSetDevice(cudaDev));
  NCCLCHECKGOTO(commAlloc(newcomm, nranks, myrank), res, cleanup);
  // This is where the bootstrap connection (bootstrapInit) gets established.
  NCCLCHECKGOTO(initTransportsRank(*newcomm, &commId), res, cleanup);
  NCCLCHECKGOTO(devCommSetup(*newcomm), res, cleanup);
 
  INFO(NCCL_INIT,"comm %p rank %d nranks %d cudaDev %d busId %x - Init COMPLETE", *newcomm, myrank, nranks, (*newcomm)->cudaDev, (*newcomm)->busId);
 
  return ncclSuccess;
cleanup:
  // Only tear down the bootstrap state if it was actually created.
  if ((*newcomm) && (*newcomm)->bootstrap) bootstrapAbort((*newcomm)->bootstrap);
  // NOTE(review): the communicator allocated by commAlloc is not released in
  // this excerpt before the pointer is dropped — confirm where it is freed.
  *newcomm = NULL;
  return res;
}

ncclComm_t is a pointer to ncclComm. ncclComm is a catch-all structure containing all the context information used for communication; its fields will be introduced as they are used. newcomm is then allocated and initialized by commAlloc (for example, which GPU it corresponds to and what that GPU's PCIe bus ID is), after which initTransportsRank is executed:

static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* commId) {
  // We use 3 AllGathers
  // 1. { peerInfo, comm }
  // 2. ConnectTransport[nranks], ConnectValue[nranks]
  // 3. { nThreads, nrings, compCap, prev[MAXCHANNELS], next[MAXCHANNELS] }
 
  int rank = comm->rank;
  int nranks = comm->nRanks;
  uint64_t commHash = getHash(commId->internal, NCCL_UNIQUE_ID_BYTES);
  TRACE(NCCL_INIT, "comm %p, commHash %lx, rank %d nranks %d - BEGIN", comm, commHash, rank, nranks);
  NCCLCHECK(bootstrapInit(commId, rank, nranks, &comm->bootstrap));
 
  // AllGather1 - begin
  struct {
    struct ncclPeerInfo peerInfo;
    struct ncclComm* comm;
  } *allGather1Data;
 
  NCCLCHECK(ncclCalloc(&allGather1Data, nranks));
  allGather1Data[rank].comm = comm;
  struct ncclPeerInfo* myInfo = &allGather1Data[rank].peerInfo;
  NCCLCHECK(fillInfo(comm, myInfo, commHash));
  NCCLCHECK(bootstrapAllGather(comm->bootstrap, allGather1Data, sizeof(*allGather1Data)));
 
  NCCLCHECK(ncclCalloc(&comm->peerInfo, nranks+1)); // Extra rank to represent CollNet root
  for (int i = 0; i < nranks; i++) {
    memcpy(comm->peerInfo+i, &allGather1Data[i].peerInfo, sizeof(struct ncclPeerInfo));
    if ((i != rank) && (comm->peerInfo[i].hostHash == myInfo->hostHash) && (comm->peerInfo[i].busId == myInfo->busId)) {
      WARN("Duplicate GPU detected : rank %d and rank %d both on CUDA device %x", rank, i, myInfo->busId);
      return ncclInvalidUsage;
    }
  }

Let’s look at bootstrapInit

/*
 * bootstrapInit, first half (the rest of the body is elided in this listing).
 *
 * Allocates the per-communicator bootstrap state and publishes it through
 * *commState, opens two listening endpoints, and sends this rank's extInfo
 * (rank, nranks and the two listen handles) to the address carried in `id`.
 */
ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commState) {
  // The unique id is reinterpreted as a network handle; it is the address
  // this rank connects to further down.
  ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id;
  bool idFromEnv = getenv("NCCL_COMM_ID") != NULL;
  struct extState* state;
  NCCLCHECK(ncclCalloc(&state, 1));
  state->rank = rank;
  state->nranks = nranks;
  *commState = state;
 
  TRACE(NCCL_INIT, "rank %d nranks %d", rank, nranks);
 
  struct extInfo info = { 0 };
  info.rank = rank;
  info.nranks = nranks;
  void *tmpSendComm, *tmpRecvComm;
  // Pass the remote address to listen via info
  if (idFromEnv) {
    memcpy(&info.extHandleListen, netHandle, sizeof(ncclNetHandle_t));
    memcpy(&info.extHandleListenRoot, netHandle, sizeof(ncclNetHandle_t));
  }
  // listen will return the local address via info (specify interface type 'findSubnetIf')
  state->dev = idFromEnv ? findSubnetIf : 0;
  void* extBstrapListenCommRoot;
  // Two listeners: the first is kept in `state`, the second is a local that
  // only lives for the duration of bootstrapInit. Each call writes the
  // listener's address into the corresponding handle inside `info`.
  NCCLCHECK(bootstrapNetListen(state->dev, &info.extHandleListen, &state->extBstrapListenComm));
  NCCLCHECK(bootstrapNetListen(state->dev, &info.extHandleListenRoot, &extBstrapListenCommRoot));
 
  // stagger connection times to avoid an overload of the root at very high rank counts
  if (nranks > 128) {
    // Each rank sleeps for `rank` milliseconds, split into whole seconds and
    // the nanosecond remainder.
    long msec = rank;
    struct timespec tv;
    tv.tv_sec = msec / 1000;
    tv.tv_nsec = 1000000 * (msec % 1000);
    TRACE(NCCL_INIT, "rank %d delaying connection to root by %ld msec", rank, msec);
    (void) nanosleep(&tv, NULL);
  }
 
  // send info on my listening socket to root
  // A temporary connection is opened, used for this one message, and closed.
  NCCLCHECK(bootstrapNetConnect(state->dev, netHandle, &tmpSendComm));
  NCCLCHECK(bootstrapNetSend(tmpSendComm, &info, sizeof(info)));
  NCCLCHECK(bootstrapNetCloseSend(tmpSendComm));
 
  // get info on my "next" rank in the bootstrap ring from root
}

First look at commState, which is the bootstrap field of ncclComm; its type is extState:

/* Per-communicator bootstrap state, allocated and filled in by bootstrapInit. */
struct extState {
  void* extBstrapListenComm;       // this rank's listening comm; the previous rank in the ring connects to it
  void* extBstrapRingRecvComm;     // connection accepted from the previous rank in the ring
  void* extBstrapRingSendComm;     // connection opened to the next rank in the ring
  ncclNetHandle_t* peerBstrapHandles; // nranks entries: the listen handle of every rank, filled by all-gather
  struct unexConn* unexpectedConnections; // not used in the code shown here — presumably queues early connections; TODO confirm
  int rank;                        // this process's rank
  int nranks;                      // total number of ranks
  int dev;                         // device index passed to bootstrapNetListen/bootstrapNetConnect
};

Here extBstrapRingSendComm is the socket connection from the current node to the next node, extBstrapRingRecvComm is the socket connection from the prev node to the current node, extBstrapListenComm is the current node's listening socket, peerBstrapHandles holds the ip and port of every rank (each corresponding to that rank's extBstrapListenComm), and dev, which defaults to 0, indicates which ip address to use.

Then two bootstrap listen comms are created through bootstrapNetListen, with their addresses written to extHandleListen and extHandleListenRoot. As mentioned earlier, a bootstrap comm really just stores an fd. Two comms are created because extHandleListen is the address used for the actual bootstrap connections between ranks, while extHandleListenRoot is the address used for communication between the rank 0 node and all other ranks.

static ncclResult_t bootstrapNetListen(int dev, ncclNetHandle_t* netHandle, void** listenComm)

The bootstrapNetListen function was introduced in the last section. It gets the ip of the current machine's dev-th device, listens on it to obtain the listening fd, writes the ip and port to netHandle, and writes the resulting bootstrap comm to listenComm.

Then rank, nranks, extHandleListen and extHandleListenRoot are written to extInfo:

/* Check-in message each rank sends to the bootstrap root. */
struct extInfo {
  int rank;                            // sender's rank; the root uses it to index its handle tables
  int nranks;                          // total rank count as seen by the sender; the root checks that all senders agree
  ncclNetHandle_t extHandleListenRoot; // address the root connects back to when replying to this rank
  ncclNetHandle_t extHandleListen;     // address the root forwards to this rank's predecessor in the ring
};

netHandle is the ncclUniqueId, i.e. rank 0's ip and port. A bootstrap send comm is then created through bootstrapNetConnect. In contrast to bootstrapNetListen, bootstrapNetConnect establishes a socket connection to netHandle and writes the socket to sendComm. dev is not used here.

static ncclResult_t bootstrapNetConnect(int dev, ncclNetHandle_t* netHandle, void** sendComm)

Then send extInfo through bootstrapNetSend, which is sent to rank0

/*
 * Send one length-prefixed message on a bootstrap connection.
 *
 * sendComm: a struct bootstrapNetComm* passed as void*.
 * data/size: payload buffer and its length in bytes.
 *
 * The raw bytes of `size` (an int) are sent first, followed by `size` bytes
 * of `data`. Returns ncclSuccess if both sends succeed; a failing send is
 * handled by NCCLCHECK.
 */
static ncclResult_t bootstrapNetSend(void* sendComm, void* data, int size) {
  struct bootstrapNetComm* comm = (struct bootstrapNetComm*)sendComm;
  // Length header first, so the peer can learn how many bytes follow.
  NCCLCHECK(socketSend(comm->fd, &size, sizeof(int)));
  NCCLCHECK(socketSend(comm->fd, data, size));
  return ncclSuccess;
}

Where socketSend executes the send interface to send data

Then close fd through bootstrapNetCloseSend

What will rank0 do after receiving the data? Recall that rank0’s node executes ncclGetUniqueId to generate ncclUniqueId, and at the end of executing bootstrapCreateRoot, it will start a thread to execute bootstrapRoot

/*
 * Thread body of the bootstrap root.
 *
 * Phase 1: accept one connection per rank on `listenComm` and read that
 *          rank's extInfo, recording its two listen handles by rank index.
 *          The expected rank count is taken from the first message received.
 * Phase 2: for every rank r, connect to r's root-facing handle and send it
 *          the ring handle of rank (r+1) % nranks.
 *
 * On every exit path the listen comm is closed and both handle tables are
 * freed. Always returns NULL; `res` is only used to carry the failing status
 * to the `out` label and is not reported to the caller.
 */
static void *bootstrapRoot(void* listenComm) {
  struct extInfo info;
  ncclNetHandle_t *rankHandles = NULL;
  ncclNetHandle_t *rankHandlesRoot = NULL; // for initial rank <-> root information exchange
  ncclNetHandle_t zero = { 0 }; // for sanity checking
  void* tmpComm;
  ncclResult_t res;
  setFilesLimit();
 
  TRACE(NCCL_INIT, "BEGIN");
  /* Receive addresses from all ranks */
  int nranks = 0, c = 0;
  do {
    NCCLCHECKGOTO(bootstrapNetAccept(listenComm, &tmpComm), res, out);
    // NOTE(review): if this receive fails, tmpComm is not closed before
    // jumping to `out` — confirm whether that leaks the accepted connection.
    NCCLCHECKGOTO(bootstrapNetRecv(tmpComm, &info, sizeof(info)), res, out);
    NCCLCHECKGOTO(bootstrapNetCloseRecv(tmpComm), res, out);
 
    // The first check-in fixes the rank count and sizes both tables.
    if (c == 0) {
      nranks = info.nranks;
      NCCLCHECKGOTO(ncclCalloc(&rankHandles, nranks), res, out);
      NCCLCHECKGOTO(ncclCalloc(&rankHandlesRoot, nranks), res, out);
    }
 
    if (nranks != info.nranks) {
      WARN("Bootstrap Root : mismatch in rank count from procs %d : %d", nranks, info.nranks);
      goto out;
    }
 
    // A slot that is no longer all-zero means this rank already checked in.
    // NOTE(review): info.rank comes off the wire and is used as an index
    // without a 0 <= rank < nranks check in this excerpt — confirm.
    if (memcmp(&zero, &rankHandlesRoot[info.rank], sizeof(ncclNetHandle_t)) != 0) {
      WARN("Bootstrap Root : rank %d of %d ranks has already checked in", info.rank, nranks);
      goto out;
    }
 
    // Save the connection handle for that rank
    memcpy(rankHandlesRoot+info.rank, info.extHandleListenRoot, sizeof(ncclNetHandle_t));
    memcpy(rankHandles+info.rank, info.extHandleListen, sizeof(ncclNetHandle_t));
 
    ++c;
    TRACE(NCCL_INIT, "Received connect from rank %d total %d/%d",  info.rank, c, nranks);
  } while (c < nranks);
  TRACE(NCCL_INIT, "COLLECTED ALL %d HANDLES", nranks);
 
  // Send the connect handle for the next rank in the AllGather ring
  for (int r=0; r<nranks; ++r) {
    // The modulo wraps the last rank around to rank 0, closing the ring.
    int next = (r+1) % nranks;
    void *tmpSendComm;
    NCCLCHECKGOTO(bootstrapNetConnect(0, rankHandlesRoot+r, &tmpSendComm), res, out);
    NCCLCHECKGOTO(bootstrapNetSend(tmpSendComm, rankHandles+next, sizeof(ncclNetHandle_t)), res, out);
    NCCLCHECKGOTO(bootstrapNetCloseSend(tmpSendComm), res, out);
  }
  TRACE(NCCL_INIT, "SENT OUT ALL %d HANDLES", nranks);
 
out:
  // Shared exit path for success and failure.
  bootstrapNetCloseListen(listenComm);
  if (rankHandles) free(rankHandles);
  if (rankHandlesRoot) free(rankHandlesRoot);
 
  TRACE(NCCL_INIT, "DONE");
  return NULL;
}

listenComm is the listening fd created by rank0 in the previous blog post. bootstrapNetAccept gets a new connection from listenComm and creates recvcomm using the new connection’s fd

static ncclResult_t bootstrapNetAccept(void* listenComm, void** recvComm)

Then read tmpComm’s data through bootstrapNetRecv, which is the extInfo sent by other ranks, and save other ranks’ extHandleListen and extHandleListenRoot. At this point rank0 has obtained the ip and port of all other ranks. After getting all ranks’ info, start building the ring by sending node (r+1) % nranks’s extHandleListen to node r, meaning sending node r’s next node’s nethandle to node r. Here we can see that each node created two listen comms, where rank0 communicates using extHandleListenRoot, and other nodes communicate with each other through extHandleListen

Then go back to continue looking at bootstrapInit

/*
 * bootstrapInit, second half (the beginning of the body is elided in this
 * listing; `state`, `info`, `tmpRecvComm` and `extBstrapListenCommRoot` are
 * declared there).
 *
 * Receives the handle of the next rank from the root, joins the ring by
 * connecting to that rank and accepting the previous rank, then all-gathers
 * every rank's listen handle into state->peerBstrapHandles.
 * Returns ncclSuccess if every step succeeds.
 */
ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commState) {
  // get info on my "next" rank in the bootstrap ring from root
  ncclNetHandle_t extHandleNext;
  NCCLCHECK(bootstrapNetAccept(extBstrapListenCommRoot, &tmpRecvComm));
  NCCLCHECK(bootstrapNetRecv(tmpRecvComm, &extHandleNext, sizeof(extHandleNext)));
  NCCLCHECK(bootstrapNetCloseRecv(tmpRecvComm));
  // The root-facing listener is closed here, right after this single reply.
  NCCLCHECK(bootstrapNetCloseListen(extBstrapListenCommRoot));
 
  // Connect to the next rank in the ring first, then accept the previous one.
  NCCLCHECK(bootstrapNetConnect(state->dev, &extHandleNext, &state->extBstrapRingSendComm));
  // Accept the connect request from the previous rank in the AllGather ring
  NCCLCHECK(bootstrapNetAccept(state->extBstrapListenComm, &state->extBstrapRingRecvComm));
 
  // AllGather all listen handlers
  NCCLCHECK(ncclCalloc(&state->peerBstrapHandles, nranks));
  // Seed this rank's own slot; the all-gather below fills in the others.
  memcpy(state->peerBstrapHandles+rank, info.extHandleListen, sizeof(ncclNetHandle_t));
  NCCLCHECK(bootstrapAllGather(state, state->peerBstrapHandles, sizeof(ncclNetHandle_t)));
 
  TRACE(NCCL_INIT, "rank %d nranks %d - DONE", rank, nranks);
 
  return ncclSuccess;
}

Then every rank accepts a new connection on extHandleListenRoot to create tmpRecvComm and receives the ip and port of its next rank from it. It then connects to next, storing the resulting bootstrap comm in state->extBstrapRingSendComm, and accepts the connection from prev, storing that bootstrap comm in state->extBstrapRingRecvComm. The bootstrap network connection is now fully established, as shown below:

image

Finally, the ip and port of every rank are gathered. Each rank first puts its own netHandle into its corresponding slot in peerBstrapHandles, as shown below:

image

Then execute bootstrapAllGather

/*
 * Ring all-gather over the bootstrap connections.
 *
 * commState: a struct extState* passed as void*; its ring send/recv comms
 *            are used for the exchange.
 * allData:   buffer treated as nranks consecutive slices of `size` bytes.
 *            The caller must have filled slice `rank` before the call; the
 *            loop writes the other nranks-1 slices.
 * size:      bytes per slice.
 *
 * Runs nranks-1 steps. Returns ncclSuccess if every send and receive
 * succeeds.
 */
ncclResult_t bootstrapAllGather(void* commState, void* allData, int size) {
  struct extState* state = (struct extState*)commState;
  char* data = (char*)allData;
  int rank = state->rank;
  int nranks = state->nranks;
 
  TRACE(NCCL_INIT, "rank %d nranks %d size %d", rank, nranks, size);
 
  /* Simple ring based AllGather
   * At each step i receive data from (rank-i-1) from left
   * and send previous step's data from (rank-i) to right
   */
  for (int i=0; i<nranks-1; i++) {
    // Adding nranks before the modulo keeps the slice index non-negative.
    size_t rslice = (rank - i - 1 + nranks) % nranks;
    size_t sslice = (rank - i + nranks) % nranks;
 
    // Send slice to the right
    // At step 0 this is the rank's own slice; afterwards it is the slice
    // received in the previous step.
    // NOTE(review): every rank sends before it receives — presumably this
    // relies on the transport buffering `size` bytes; confirm for large sizes.
    NCCLCHECK(bootstrapNetSend(state->extBstrapRingSendComm, data+sslice*size, size));
    // Recv slice from the left
    NCCLCHECK(bootstrapNetRecv(state->extBstrapRingRecvComm, data+rslice*size, size));
  }
 
  TRACE(NCCL_INIT, "rank %d nranks %d size %d - DONE", rank, nranks, size);
  return ncclSuccess;
}

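At each step, a rank sends one slice to the next rank in the ring and receives one slice from the previous rank: its own data in the first step, and the slice it received in the previous step afterwards, as shown below: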

Step 1:

image

Step 2:

image

At this point each rank has the global ip ports of all ranks

Finally to summarize, this section mainly created the bootstrap ring network connection and saved it to ncclComm.

NCCL Source Code Study - This article is part of a series.
Part 2: This Article