This is a Sonnet 3.6 translation of a Chinese article. Please be mindful of potential translation errors.
In the previous section, we used ringGraph as an example to introduce the channel connection process between machines. Now each rank in the ring knows which rank to receive data from and which rank to send data to. This section will specifically introduce the process of establishing data communication links in P2P and rdma NET scenarios.
As mentioned in the previous section, nccl establishes the data communication links through ncclTransportP2pSetup. Let’s continue with the example of a sixteen-card ring across two machines:
First, let’s introduce ncclPeer. ncclPeer maintains two connectors: for rank 10, send communicates with rank 9 and recv communicates with rank 1. For convenience in what follows, we will call rank 10 the receiver and rank 1 the sender.
In ncclConnector, connected indicates whether the connection has been established, transportResources points to the resources (mainly the buffers) used during communication, and proxyAppend will be discussed later when we cover the data communication process.
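For reference, in the NCCL version this series is based on, ncclPeer and ncclConnector look roughly like this (paraphrased; check the actual headers for the exact definitions):

```cpp
// One ncclPeer per remote rank on a channel: one connector for sending
// to that peer and one for receiving from it.
struct ncclPeer {
  struct ncclConnector send;
  struct ncclConnector recv;
};

struct ncclConnector {
  int connected;                           // Whether the link has been established
  struct ncclProxyArgs *proxyAppend;       // Used by the proxy thread (data path, later section)
  struct ncclTransportComm* transportComm; // Transport-specific function table (P2P/SHM/NET)
  void* transportResources;                // Host-side resources, e.g. the communication buffers
  struct ncclConnInfo conn;                // Communication context handed to the kernels
  struct ncclComm *comm;
};
```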
ncclConnInfo records the communication context. In this section we only need to focus on buffs, the buffers used during communication; they actually live in transportResources, and the pointers here simply point into them.
```cpp
struct ncclConnInfo {
  // Regular comm mechanism
  char *buffs[NCCL_NUM_PROTOCOLS]; // Local for recv, remote for send
  uint64_t *tail;                  // Local for recv, remote for send
  uint64_t *head;                  // Local for send, remote for recv

  int direct;                      // Direct communication
  void **ptrExchange;              // Pointer exchange for direct communication

  int *fifo;                       // Size fifo for proxy

  uint64_t step;                   // Keep where we are
  uint64_t llLastCleaning;
};
```
The ncclTransportComm pointed to by ncclConnector defines a set of communication-related function pointers, and users can implement these interfaces themselves. ncclTransport defines a send and a recv ncclTransportComm. This section introduces two ncclTransports: P2P and NET.
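Roughly, these two structs look as follows (paraphrased; the exact signatures live in the transport headers):

```cpp
// Function table for one direction (send or recv) of a transport.
struct ncclTransportComm {
  ncclResult_t (*setup)(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph,
      struct ncclPeerInfo*, struct ncclPeerInfo*, struct ncclConnect*,
      struct ncclConnector*, int channelId);
  ncclResult_t (*connect)(struct ncclConnect*, int nranks, int rank, struct ncclConnector*);
  ncclResult_t (*free)(void*);
  ncclResult_t (*proxy)(struct ncclProxyArgs*);
};

struct ncclTransport {
  const char name[4];
  // Can this transport connect myInfo to peerInfo on the given topology?
  ncclResult_t (*canConnect)(int*, struct ncclTopoSystem* topo, struct ncclTopoGraph* graph,
      struct ncclPeerInfo*, struct ncclPeerInfo*);
  struct ncclTransportComm send;
  struct ncclTransportComm recv;
};
```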
Let’s continue from the previous section with ncclTransportP2pSetup. Since the current rank is 10, nrecv is 1, peerRecv is 1, nsend is 1, and peerSend is 9. It then begins creating the connection to rank 1, initializing the ncclConnector peers[1].recv through selectTransport.
```cpp
ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclChannel* channel, int nrecv, int* peerRecv, int nsend, int* peerSend) {
  TRACE(NCCL_INIT, "nsend %d nrecv %d", nsend, nrecv);
  uint32_t nSkippedSend = 0, nSkippedRecv = 0; /* for tracing */
  struct ncclConnect connect;
  struct ncclConnector* conn;
  for (int i=0; i<nrecv; i++) {
    int peer = peerRecv[i];
    if (peer == -1 || peer >= comm->nRanks) continue;
    conn = &channel->peers[peer].recv;
    if (conn->connected) { ++nSkippedRecv; continue; }
    memset(&connect, 0, sizeof(connect));
    NCCLCHECK(selectTransport<0>(comm->topo, graph, comm->peerInfo+comm->rank, comm->peerInfo+peer, &connect, conn, channel->id));
    NCCLCHECK(bootstrapSend(comm->bootstrap, peer, &connect, sizeof(struct ncclConnect)));
  }
  ...
}
```
nccl currently has three transports: P2P communicates through card-to-card p2p, SHM communicates through shared host memory within a machine, and NET communicates through the network. nccl checks the availability of these three transports in order via canConnect and selects the first available one. Since rank 1 is not on the current machine, only NET’s recv is available, so the connector’s transportComm is set to netTransport’s recv.
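The order being checked is simply the order of the global transport table, roughly (paraphrased):

```cpp
// Transports are tried in this order; the first whose canConnect()
// succeeds is used for the connection.
struct ncclTransport ncclTransports[NTRANSPORTS] = {
  p2pTransport,
  shmTransport,
  netTransport,
};
```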
```cpp
template <int type>
static ncclResult_t selectTransport(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connect, struct ncclConnector* connector, int channelId) {
  for (int t=0; t<NTRANSPORTS; t++) {
    struct ncclTransport *transport = ncclTransports+t;
    struct ncclTransportComm* transportComm = type == 1 ? &transport->send : &transport->recv;
    int ret = 0;
    NCCLCHECK(transport->canConnect(&ret, topo, graph, myInfo, peerInfo));
    if (ret) {
      connector->transportComm = transportComm;
      NCCLCHECK(transportComm->setup(topo, graph, myInfo, peerInfo, connect, connector, channelId));
      return ncclSuccess;
    }
  }
  WARN("No transport found !");
  return ncclInternalError;
}
```
First, netRecvResources is allocated and attached to the ncclConnector; the meanings of the main fields are noted in the comments below. LOC_COUNT is 2, indicating two buffers: if gdr is supported, the LOC_DEVMEM (1) buffer is used, which is GPU memory; if gdr is not supported, the LOC_HOSTMEM (0) buffer is used, which is page-locked host memory. sendMem and recvMem record the fifo’s head and tail to coordinate producers and consumers; we’ll cover them in the next section, so they can be ignored for now. When a user issues a communication operation such as ncclSend on a block of data, nccl splits the block into multiple smaller blocks for pipelined transmission; step indicates which small block is currently being processed, which we’ll also cover in the next section.
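The struct looks roughly like this (paraphrased from net.cc; the exact fields may differ slightly between versions):

```cpp
struct netRecvResources {
  void* netListenComm;             // Listening comm created by ncclNetListen
  void* netRecvComm;               // Comm returned when the connection is accepted
  struct ncclSendMem* sendMem;     // head, used to coordinate with the sender
  struct ncclRecvMem* recvMem;     // tail + sizes fifo

  int netDev;                      // Which NIC this connection uses
  int useGdr;                      // Whether GPU Direct RDMA is used
  char* buffers[LOC_COUNT];        // LOC_HOSTMEM / LOC_DEVMEM buffers
  int buffSizes[LOC_COUNT];
  void* mhandles[LOC_COUNT];       // Memory registration handles (mr for rdma)
  void** mhandlesProto[NCCL_NUM_PROTOCOLS]; // Per-protocol pointer into mhandles
  uint64_t step;
  uint64_t llLastCleaning;
};
```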
ncclTopoGetNetDev selects a network card for the current rank’s GPU. When searching for channels, we recorded the network card corresponding to the ring in graph->inter, so we can find the corresponding network card through inter here.
```cpp
ncclResult_t ncclTopoGetNetDev(struct ncclTopoSystem* system, int rank, struct ncclTopoGraph* graph, int channelId, int* dev) {
  if (graph) {
    // Honor the net device in the graph
    int channel = channelId%graph->nChannels;
    int ngpus = system->nodes[GPU].count;
    int index = graph->intra[channel*ngpus] == rank ? 0 : 1;
    *dev = graph->inter[channel*2+index];
  } else {
    int64_t id;
    NCCLCHECK(ncclTopoGetLocalNet(system, rank, &id, channelId));
    *dev = id;
  }
  return ncclSuccess;
}
```
ncclTopoCheckGdr checks whether the selected network card and the current rank’s GPU support gdr; the specific logic was covered in section five, so we won’t repeat it here. Then page-locked memory is allocated for sendMem and recvMem, and head and tail are set. The test machine supports gdr, so protoLoc is LOC_DEVMEM (GPU memory). The buffers needed by the three protocols are then allocated and stored contiguously, with offset recording each one’s starting address, which is saved into conn. mhandles are the mrs used by rdma, and mhandlesProto points into mhandles.
```cpp
ncclResult_t netRecvSetup(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* recv, int channelId) {
  ...
  NCCLCHECK(ncclCudaHostCalloc(&resources->sendMem, 1));
  NCCLCHECK(ncclCudaHostCalloc(&resources->recvMem, 1));

  recv->conn.direct |= resources->useGdr ? NCCL_DIRECT_NIC : 0;
  recv->conn.tail = &resources->recvMem->tail;
  recv->conn.head = &resources->sendMem->head;

  int protoLoc[NCCL_NUM_PROTOCOLS];
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    protoLoc[p] = resources->useGdr ? LOC_DEVMEM : LOC_HOSTMEM;
  }

  int buffSizes[NCCL_NUM_PROTOCOLS];
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    // Only allocate buffers for simple for p2p connections
    buffSizes[p] = graph == NULL && p != NCCL_PROTO_SIMPLE ? 0 : recv->comm->buffSizes[p];
    resources->buffSizes[protoLoc[p]] += buffSizes[p];
  }

  if (resources->buffSizes[LOC_DEVMEM]) {
    NCCLCHECK(ncclCudaCalloc(resources->buffers+LOC_DEVMEM, resources->buffSizes[LOC_DEVMEM]));
  }
  if (resources->buffSizes[LOC_HOSTMEM]) {
    NCCLCHECK(ncclCudaHostCalloc(resources->buffers+LOC_HOSTMEM, resources->buffSizes[LOC_HOSTMEM]));
  }

  int offsets[LOC_COUNT];
  offsets[LOC_HOSTMEM] = offsets[LOC_DEVMEM] = 0;
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    resources->mhandlesProto[p] = resources->mhandles+protoLoc[p];
    recv->conn.buffs[p] = resources->buffers[protoLoc[p]] + offsets[protoLoc[p]];
    offsets[protoLoc[p]] += buffSizes[p];
  }

  INFO(NCCL_INIT|NCCL_NET, "Channel %02d : %d[%lx] -> %d[%lx] [receive] via NET/%s/%d%s", channelId, peerInfo->rank, peerInfo->busId, myInfo->rank, myInfo->busId, ncclNetName(), resources->netDev,
      resources->useGdr ? "/GDRDMA" : "");
  struct netConnectInfo* info = (struct netConnectInfo*) connectInfo;
  NCCLCHECK(ncclNetListen(resources->netDev, &info->netHandle, &resources->netListenComm));
  return ncclSuccess;
}
```
Since rdma link establishment needs to exchange information between sender and receiver through a socket, such as the qp number, port, mtu, and gid or lid, ncclIbListen creates a listening socket here, similar to bootstrap. The fd is written to listenComm, and the ip and port are written to the handle, which is connectInfo.
```cpp
ncclResult_t ncclIbListen(int dev, void* opaqueHandle, void** listenComm) {
  struct ncclIbListenComm* comm;
  NCCLCHECK(ncclCalloc(&comm, 1));
  struct ncclIbHandle* handle = (struct ncclIbHandle*) opaqueHandle;
  static_assert(sizeof(struct ncclIbHandle) < NCCL_NET_HANDLE_MAXSIZE, "ncclIbHandle size too large");
  comm->dev = dev;
  NCCLCHECK(GetSocketAddr(&(handle->connectAddr)));
  NCCLCHECK(createListenSocket(&comm->fd, &handle->connectAddr));
  *listenComm = comm;
  return ncclSuccess;
}

struct ncclIbListenComm {
  int dev;
  int fd;
};
```
At this point, recv setup is complete. Back in ncclTransportP2pSetup, connectInfo, i.e. the ip and port mentioned above, is sent to the peer (rank 1) through bootstrapSend. When rank 1 executes this function, it iterates over nsend; its peer is rank 10, so selectTransport ends up calling netTransport’s send setup, namely netSendSetup. Its logic is basically the same as netRecvSetup, mainly allocating the various buffers, so it won’t be repeated here. Let’s look at the logic that follows.
Then rank 1 receives the ip and port sent by rank 10 through bootstrapRecv and executes connect, namely netSendConnect.
```cpp
ncclResult_t netSendConnect(struct ncclConnect* connectInfo, int nranks, int rank, struct ncclConnector* send) {
  // Setup device pointers
  struct netSendResources* resources = (struct netSendResources*)send->transportResources;
  struct netConnectInfo* info = (struct netConnectInfo*)connectInfo;

  // Connect to remote peer
  NCCLCHECK(ncclNetConnect(resources->netDev, info->netHandle, &resources->netSendComm));
  ...
}
```
Here info is rank 10’s ip and port; ncclNetConnect, namely ncclIbConnect, is then executed, which mainly creates a qp and sends the related information to the receiver through the socket.
Let’s look at how ncclIbConnect creates the qp, first looking at the helper APIs it uses.
```cpp
ncclResult_t ncclIbCreateQp(uint8_t ib_port, struct ncclIbVerbs* verbs, int access_flags, struct ibv_qp** qp) {
  struct ibv_qp_init_attr qpInitAttr;
  memset(&qpInitAttr, 0, sizeof(struct ibv_qp_init_attr));
  qpInitAttr.send_cq = verbs->cq;
  qpInitAttr.recv_cq = verbs->cq;
  qpInitAttr.qp_type = IBV_QPT_RC;
  // We might send 2 requests per send (RDMA_WRITE+RDMA_WRITE_WITH_IMM)
  qpInitAttr.cap.max_send_wr = 2*MAX_REQUESTS;
  qpInitAttr.cap.max_recv_wr = MAX_REQUESTS;
  qpInitAttr.cap.max_send_sge = 1;
  qpInitAttr.cap.max_recv_sge = 1;
  qpInitAttr.cap.max_inline_data = 0;
  NCCLCHECK(wrap_ibv_create_qp(qp, verbs->pd, &qpInitAttr));

  struct ibv_qp_attr qpAttr;
  memset(&qpAttr, 0, sizeof(struct ibv_qp_attr));
  qpAttr.qp_state = IBV_QPS_INIT;
  qpAttr.pkey_index = 0;
  qpAttr.port_num = ib_port;
  qpAttr.qp_access_flags = access_flags;
  NCCLCHECK(wrap_ibv_modify_qp(*qp, &qpAttr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS));
  return ncclSuccess;
}
```
ncclIbCreateQp creates and initializes a qp: it sets the completion queues used by send and recv, sets qp_type to RC, sets the maximum number of wrs for send and recv and the number of sges each wr can carry, and then creates the qp. At this point the qp is in the RST state and cannot do anything. It then sets qp_state to INIT, sets the port, and sets access_flags to IBV_ACCESS_REMOTE_WRITE, indicating the qp accepts remote writes, and modifies the qp state accordingly. The qp is now in the INIT state, where it can post recv wrs, but received messages will not be processed yet.
Then let’s look at ncclIbConnect. ncclIbMalloc allocates page-aligned memory. As we’ll see later, nccl performs page alignment when registering memory, even though ibv_reg_mr does not require it; the ibv_reg_mr documentation notes:

> The registered memory buffer doesn’t have to be page-aligned.
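ncclIbMalloc itself is a small wrapper, roughly like this (paraphrased):

```cpp
// Allocate zeroed, page-aligned memory rounded up to a whole number of pages.
static ncclResult_t ncclIbMalloc(void** ptr, size_t size) {
  size_t page_size = sysconf(_SC_PAGESIZE);
  void* p;
  int size_aligned = ROUNDUP(size, page_size);
  int ret = posix_memalign(&p, page_size, size_aligned);
  if (ret != 0) return ncclSystemError;
  memset(p, 0, size);
  *ptr = p;
  return ncclSuccess;
}
```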
```cpp
ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm) {
  struct ncclIbSendComm* comm;
  NCCLCHECK(ncclIbMalloc((void**)&comm, sizeof(struct ncclIbSendComm)));

  struct ncclIbHandle* handle = (struct ncclIbHandle*) opaqueHandle;
  NCCLCHECK(connectAddress(&comm->fd, &handle->connectAddr));
  *sendComm = comm;

  // IB Setup
  ibv_context* ctx = ncclIbDevs[dev].context;
  NCCLCHECK(ncclIbInitVerbs(ctx, &comm->verbs));
  uint8_t ib_port = ncclIbDevs[dev].port;
  NCCLCHECK(ncclIbCreateQp(ib_port, &comm->verbs, IBV_ACCESS_REMOTE_WRITE, &comm->qp));

  // Send my QP Info to receiver through the socket. Hope this won't block.
  struct ibv_port_attr portAttr;
  NCCLCHECK(wrap_ibv_query_port(ctx, ib_port, &portAttr));
  struct ncclIbQpInfo qpInfo;
  qpInfo.ib_port = ib_port;
  qpInfo.qpn = comm->qp->qp_num;
  qpInfo.mtu = portAttr.active_mtu;

  // Prepare my fifo
  NCCLCHECK(wrap_ibv_reg_mr(&comm->fifoMr, comm->verbs.pd, comm->fifo, sizeof(struct ncclIbSendFifo)*MAX_REQUESTS, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_REMOTE_READ));
  qpInfo.fifoRkey = comm->fifoMr->rkey;
  qpInfo.fifoAddr = (uint64_t)comm->fifo;

  // RoCE support
  qpInfo.lid = portAttr.lid;
  if (qpInfo.lid) { // IB
    INFO(NCCL_NET, "NET/IB: Dev %d Port %d qpn %d mtu %d LID %d", dev, ib_port, qpInfo.qpn, qpInfo.mtu, qpInfo.lid);
  } else { // RoCE
    union ibv_gid gid;
    NCCLCHECK(wrap_ibv_query_gid(ctx, ib_port, ncclParamIbGidIndex(), &gid));
    qpInfo.spn = gid.global.subnet_prefix;
    qpInfo.iid = gid.global.interface_id;
    INFO(NCCL_NET, "NET/IB: Dev %d Port %d qpn %d mtu %d GID %ld (%lX/%lX)", dev, ib_port, qpInfo.qpn, qpInfo.mtu, ncclParamIbGidIndex(), qpInfo.spn, qpInfo.iid);
  }

  NCCLCHECK(socketSend(comm->fd, &qpInfo, sizeof(qpInfo)));
  return ncclSuccess;
}
```
After the qp is initialized, the sender prepares to exchange information with the receiver through the socket: it queries the port attributes and fills ib_port, mtu, and qpn into qpInfo, then determines whether ib or roce is being used. With roce the lid is 0 and communication can only use the gid, while ib can use the lid. Finally, qpInfo is sent to the receiver (rank 10) through the socket.
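The information carried in this exchange is ncclIbQpInfo, roughly (paraphrased):

```cpp
// Everything the peer needs to bring its qp up against ours.
struct ncclIbQpInfo {
  uint32_t lid;      // 0 for RoCE, non-zero for IB
  uint8_t ib_port;
  uint32_t qpn;      // Our qp number

  // For RoCE: routable GID (subnet prefix + interface id)
  uint64_t spn;
  uint64_t iid;
  enum ibv_mtu mtu;

  // Fifo info so the receiver can RDMA_WRITE into the sender's fifo
  uint32_t fifoRkey;
  uint64_t fifoAddr;
};
```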
Back in netSendConnect, the data buffers allocated during setup need to be registered, namely via ncclIbRegMr. Page alignment is performed here, and the resulting mr is written to the resource’s mhandle.
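A sketch of what ncclIbRegMr does, paraphrased from memory (the key point is that the registered region is rounded out to page boundaries before calling ibv_reg_mr):

```cpp
ncclResult_t ncclIbRegMr(void* comm, void* data, int size, int type, void** mhandle) {
  // verbs is the first member of both ncclIbSendComm and ncclIbRecvComm.
  struct ncclIbVerbs* verbs = (struct ncclIbVerbs*)comm;
  uint64_t addr = (uint64_t)data;

  // Round the region out to page (REG_ALIGN) boundaries before registering.
  uint64_t regAddr = addr & (~(REG_ALIGN-1));
  uint64_t regSize = addr + size - regAddr;
  regSize = ((regSize + REG_ALIGN-1) / REG_ALIGN) * REG_ALIGN;

  struct ibv_mr* mr;
  NCCLCHECK(wrap_ibv_reg_mr(&mr, verbs->pd, (void*)regAddr, regSize,
      IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_REMOTE_READ));
  *mhandle = (void*)mr;
  return ncclSuccess;
}
```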
Back in ncclTransportP2pSetup, rank 1 has executed connect and sent its qp information to rank 10 through the socket. Rank 10 then continues with the connect that follows, namely netRecvConnect. Note that in the rdma scenario the ncclConnect received through bootstrap here is not used.
Rank 10 executes ncclIbAccept, receives rank 1’s qp information through the socket, and obtains the corresponding network card’s context and port from the net dev. As in the process above, it creates the pd and cq through ncclIbInitVerbs and the qp through ncclIbCreateQp, then adjusts the mtu according to rank 1’s.
```cpp
ncclResult_t ncclIbAccept(void* listenComm, void** recvComm) {
  struct ncclIbListenComm* lComm = (struct ncclIbListenComm*)listenComm;
  struct ncclIbRecvComm* rComm;
  NCCLCHECK(ncclIbMalloc((void**)&rComm, sizeof(struct ncclIbRecvComm)));

  struct sockaddr_in sockaddr;
  socklen_t socklen = sizeof(struct sockaddr_in);
  SYSCHECKVAL(accept(lComm->fd, (struct sockaddr*)&sockaddr, &socklen), "accept", rComm->fd);
  struct ncclIbQpInfo remQpInfo;
  NCCLCHECK(socketReceive(rComm->fd, &remQpInfo, sizeof(remQpInfo)));

  // IB setup
  ibv_context* ctx = ncclIbDevs[lComm->dev].context;
  uint8_t ib_port = ncclIbDevs[lComm->dev].port;
  struct ibv_port_attr portAttr;
  NCCLCHECK(wrap_ibv_query_port(ctx, ib_port, &portAttr));
  union ibv_gid gid;
  NCCLCHECK(wrap_ibv_query_gid(ctx, ib_port, ncclParamIbGidIndex(), &gid));

  // QP Creation
  NCCLCHECK(ncclIbInitVerbs(ctx, &rComm->verbs));
  NCCLCHECK(ncclIbCreateQp(ib_port, &rComm->verbs, IBV_ACCESS_REMOTE_WRITE, &rComm->qp));

  // Adjust the MTU
  remQpInfo.mtu = (enum ibv_mtu)std::min(remQpInfo.mtu, portAttr.active_mtu);

  // Setup QP
  struct ibv_qp* qp = rComm->qp;
  NCCLCHECK(ncclIbRtrQp(qp, &remQpInfo));
  NCCLCHECK(ncclIbRtsQp(qp));
  ...
}
```
Then ncclIbRtrQp is executed, transitioning the qp from the INIT state to the RTR state and setting the mtu, the peer’s qpn, and the gid and port information. At this point the qp can post recv wrs and receive normally. ncclIbRtsQp then moves it to the RTS state, after which it can also send.
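ncclIbRtrQp is essentially a single ibv_modify_qp call, roughly as follows (paraphrased; ncclIbRtsQp does the analogous transition to RTS):

```cpp
ncclResult_t ncclIbRtrQp(ibv_qp* qp, struct ncclIbQpInfo* info) {
  struct ibv_qp_attr qpAttr;
  memset(&qpAttr, 0, sizeof(struct ibv_qp_attr));
  qpAttr.qp_state = IBV_QPS_RTR;
  qpAttr.path_mtu = info->mtu;
  qpAttr.dest_qp_num = info->qpn;        // Peer's qp number
  qpAttr.rq_psn = 0;
  qpAttr.max_dest_rd_atomic = 1;
  qpAttr.min_rnr_timer = 12;
  if (info->lid == 0) {                  // RoCE: route via GID
    qpAttr.ah_attr.is_global = 1;
    qpAttr.ah_attr.grh.dgid.global.subnet_prefix = info->spn;
    qpAttr.ah_attr.grh.dgid.global.interface_id = info->iid;
    qpAttr.ah_attr.grh.sgid_index = ncclParamIbGidIndex();
    qpAttr.ah_attr.grh.hop_limit = 255;
  } else {                               // IB: route via LID
    qpAttr.ah_attr.is_global = 0;
    qpAttr.ah_attr.dlid = info->lid;
  }
  qpAttr.ah_attr.sl = ncclParamIbSl();
  qpAttr.ah_attr.src_path_bits = 0;
  qpAttr.ah_attr.port_num = info->ib_port;
  NCCLCHECK(wrap_ibv_modify_qp(qp, &qpAttr,
      IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN |
      IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER));
  return ncclSuccess;
}
```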
Continuing with ncclIbAccept: the fifo here is also used to control the sending process, and will be covered when data communication is introduced later.
```cpp
ncclResult_t ncclIbAccept(void* listenComm, void** recvComm) {
  ...
  // Retain remote fifo info and prepare my RDMA ops
  rComm->remFifo.rkey = remQpInfo.fifoRkey;
  rComm->remFifo.addr = remQpInfo.fifoAddr;
  NCCLCHECK(wrap_ibv_reg_mr(&rComm->remFifo.mr, rComm->verbs.pd, &rComm->remFifo.elems, sizeof(struct ncclIbSendFifo)*MAX_REQUESTS, IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_READ));
  rComm->remFifo.sge.length = sizeof(struct ncclIbSendFifo);
  rComm->remFifo.sge.lkey = rComm->remFifo.mr->lkey;

#if USE_RDMA_SEND_INLINE
  // Determine whether the remFifo element data can be sent INLINE
  struct ibv_qp_attr attr;
  struct ibv_qp_init_attr init_attr;
  NCCLCHECK(wrap_ibv_query_qp(qp, &attr, IBV_QP_CAP, &init_attr));
  if (init_attr.cap.max_inline_data >= rComm->remFifo.sge.length) rComm->remFifo.flags = IBV_SEND_INLINE;
#endif

  // Allocate Flush dummy buffer for GPU Direct RDMA
  rComm->gpuFlush.enabled = (ncclIbGdrSupport(lComm->dev) == 0) && (ncclParamIbGdrFlushDisable() == 0) ? 1 : 0;
  if (rComm->gpuFlush.enabled) {
    NCCLCHECK(wrap_ibv_reg_mr(&rComm->gpuFlush.hostMr, rComm->verbs.pd, &rComm->gpuFlush.hostMem, sizeof(int), IBV_ACCESS_LOCAL_WRITE));
    rComm->gpuFlush.sge.addr = (uint64_t)&rComm->gpuFlush.hostMem;
    rComm->gpuFlush.sge.length = 1;
    rComm->gpuFlush.sge.lkey = rComm->gpuFlush.hostMr->lkey;
    NCCLCHECK(ncclIbCreateQp(ib_port, &rComm->verbs, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_READ, &rComm->gpuFlush.qp));
    struct ncclIbQpInfo localQpInfo = {
      .lid = portAttr.lid,
      .ib_port = ib_port,
      .qpn = rComm->gpuFlush.qp->qp_num,
      .spn = gid.global.subnet_prefix,
      .iid = gid.global.interface_id,
      .mtu = portAttr.active_mtu
    };
    NCCLCHECK(ncclIbRtrQp(rComm->gpuFlush.qp, &localQpInfo));
    NCCLCHECK(ncclIbRtsQp(rComm->gpuFlush.qp));
  }

  // Fill Handle
  struct ncclIbQpInfo qpInfo = {
    .lid = portAttr.lid,
    .ib_port = ib_port,
    .qpn = qp->qp_num,
    .spn = gid.global.subnet_prefix,
    .iid = gid.global.interface_id,
    .mtu = remQpInfo.mtu
  };
  NCCLCHECK(socketSend(rComm->fd, &qpInfo, sizeof(qpInfo)));

  *recvComm = rComm;
  return ncclSuccess;
}
```
gpuFlush also corresponds to a qp, but this qp is local, meaning its peer qp is itself. When gdr is enabled, a flush needs to be executed after each data reception. The flush is actually an rdma read operation that uses the network card to read the first int of the received data into hostMem. The official issue explains that when a receive completes through gdr and the wc has been delivered to the cpu, the received data may not yet be readable on the gpu side, at which point a read needs to be issued from the cpu side.
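For reference, the flush in the data path (covered in detail in the next section) posts an RDMA_READ on the gpuFlush qp, roughly like this (paraphrased sketch):

```cpp
// Post a 1-int RDMA_READ from the just-received GPU buffer into gpuFlush.hostMem,
// then wait for it to complete; this flushes the GDR write before the GPU reads it.
ncclResult_t ncclIbFlush(void* recvComm, void* data, int size, void* mhandle) {
  struct ncclIbRecvComm* comm = (struct ncclIbRecvComm*)recvComm;
  if (comm->gpuFlush.enabled == 0 || size == 0) return ncclSuccess;

  struct ibv_mr* mr = (struct ibv_mr*)mhandle;
  struct ibv_send_wr wr;
  memset(&wr, 0, sizeof(wr));
  wr.wr.rdma.remote_addr = (uint64_t)data;  // "Remote" address is our own GPU buffer
  wr.wr.rdma.rkey = mr->rkey;
  wr.sg_list = &comm->gpuFlush.sge;         // Destination: the small host buffer
  wr.num_sge = 1;
  wr.opcode = IBV_WR_RDMA_READ;
  wr.send_flags = IBV_SEND_SIGNALED;

  struct ibv_send_wr* bad_wr;
  NCCLCHECK(wrap_ibv_post_send(comm->gpuFlush.qp, &wr, &bad_wr));
  // ... poll the cq until the read completes ...
  return ncclSuccess;
}
```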
Finally, rank 10’s port, qpn, gid, etc. are sent back to rank 1 through the socket. At this point, ncclTransportP2pSetup has finished executing, but the rdma connection is not yet fully established, because rank 1 has not received rank 10’s information and its qp is still in the INIT state. Rank 1 only checks whether this final step of link establishment is complete when it starts sending data; if the link is not yet established, it executes ncclSendCheck, whose process is the same as above and won’t be repeated.
```cpp
ncclResult_t ncclSendCheck(struct ncclIbSendComm* comm) {
  struct ncclIbQpInfo remQpInfo;
  struct ibv_qp* qp = comm->qp;

  // Do not block on this receive, return if not ready.
  int bytes = 0;
  NCCLCHECK(socketProgress(NCCL_SOCKET_RECV, comm->fd, &remQpInfo, sizeof(remQpInfo), &bytes));
  if (bytes == 0) return ncclSuccess; // Try again later
  NCCLCHECK(socketWait(NCCL_SOCKET_RECV, comm->fd, &remQpInfo, sizeof(remQpInfo), &bytes));

  NCCLCHECK(ncclIbRtrQp(qp, &remQpInfo));
  NCCLCHECK(ncclIbRtsQp(qp));
  comm->ready = 1;

  // Block until this is done. It *should* not block indefinitely.
  NCCLCHECK(socketSend(comm->fd, &comm->ready, sizeof(int)));
  return ncclSuccess;
}
```
At this point, the rdma connection between rank 1 and rank 10 has been established. Then let’s look at the p2p connection between rank 10 and rank 9.
In the p2p scenario, the connectInfo exchanged between ranks is shown below.
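Roughly, p2pConnectInfo looks like this (paraphrased from p2p.cc):

```cpp
struct p2pConnectInfo {
  int direct;                  // Same process: pass the device pointer directly
  int read;                    // Whether P2P read is used (buffer lives on the send side)
  union {
    void* directPtr;           // Valid when direct == 1
    cudaIpcMemHandle_t devIpc; // Otherwise, a CUDA IPC handle is exchanged
  };
};
```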
Still following the previous order, rank 9 first executes the recv setup, allocating its resources; the data communication buffer is saved in ncclRecvMem’s buff field.
```cpp
struct p2pRecvResources {
  struct ncclRecvMem* devMem;
  void* ipcPtr;
};

struct ncclRecvMem {
  union {
    struct {
      uint64_t tail;
      char pad1[CACHE_LINE_SIZE-sizeof(uint64_t)];
      char pad2[CACHE_LINE_SIZE-sizeof(uint64_t)];
      int sizesFifo[NCCL_STEPS];
    };
    char pad4[MEM_ALIGN];
  };
  char buff[1]; // Actually larger than that
};
```
Next, useRead is determined: if the path type between the two ranks is less than p2pLevel (default PATH_SYS), useP2P is 1; if the path type is PATH_NVL and the GPU is Ampere architecture, useRead is 1. ncclRecvMem stores the buffer in a flexible array member; still focusing only on NCCL_PROTO_SIMPLE, if read is 1 this buffer does not need to be allocated on the recv side. Since the current scenario is single-process, direct is recorded as 1 and devMem is recorded into directPtr, and then card-to-card p2p access is enabled through cudaDeviceEnablePeerAccess.
```cpp
ncclResult_t p2pRecvSetup(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo,
    struct ncclConnect* connectInfo, struct ncclConnector* recv, int channelId) {
  struct p2pRecvResources* resources;
  NCCLCHECK(ncclCalloc(&resources, 1));
  recv->transportResources = resources;
  int useRead = p2pUseRead(topo, myInfo, peerInfo);
  int recvSize = offsetof(struct ncclRecvMem, buff);
  // For P2P Read the SIMPLE buffer is tagged on the end of the ncclSendMem structure
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) if (!(useRead && p == NCCL_PROTO_SIMPLE)) recvSize += recv->comm->buffSizes[p];
  ALIGN_SIZE(recvSize, CUDA_IPC_MIN);
  NCCLCHECK(ncclCudaCalloc((char**)&resources->devMem, recvSize));

  struct p2pConnectInfo info;
  info.read = useRead;
  if (myInfo->pidHash == peerInfo->pidHash) {
    info.direct = 1;
    info.directPtr = resources->devMem;
    if (myInfo->cudaDev == peerInfo->cudaDev) {
      TRACE(NCCL_INIT|NCCL_P2P, "%d <- %d via P2P/common device", myInfo->rank, peerInfo->rank);
    } else {
      // Enable P2P access
      cudaError_t err = cudaDeviceEnablePeerAccess(peerInfo->cudaDev, 0);
      if (err == cudaErrorPeerAccessAlreadyEnabled) {
        cudaGetLastError();
      } else if (err != cudaSuccess) {
        WARN("failed to peer with device %d(=%lx): %d %s", peerInfo->cudaDev, peerInfo->busId, err, cudaGetErrorString(err));
        return ncclInternalError;
      }
      TRACE(NCCL_INIT|NCCL_P2P, "Channel %02d : %d[%lx] <- %d[%lx] via P2P/direct pointer", channelId, myInfo->rank, myInfo->busId, peerInfo->rank, peerInfo->busId);
    }
  } else {
    ...
  }
  static_assert(sizeof(struct p2pConnectInfo) <= sizeof(struct ncclConnect), "p2p Connect Info is too big");
  memcpy(connectInfo, &info, sizeof(struct p2pConnectInfo));
  return ncclSuccess;
}
```
Next, rank 10 will execute send setup, with similar overall logic. From here we can see the purpose of useRead: if useRead is 1, the buffer is placed on the send rank; if 0, it’s placed on the recv rank.
```cpp
ncclResult_t p2pSendSetup(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo,
    struct ncclConnect* connectInfo, struct ncclConnector* send, int channelId) {
  struct p2pSendResources* resources;
  NCCLCHECK(ncclCalloc(&resources, 1));
  send->transportResources = resources;
  int useRead = p2pUseRead(topo, myInfo, peerInfo);
  int sendSize = sizeof(struct ncclSendMem);
  // For P2P Read the SIMPLE buffer is tagged on the end of the ncclSendMem structure
  if (useRead) sendSize += send->comm->buffSizes[NCCL_PROTO_SIMPLE];
  ALIGN_SIZE(sendSize, CUDA_IPC_MIN);
  NCCLCHECK(ncclCudaCalloc((char**)&resources->devMem, sendSize));

  struct p2pConnectInfo info;
  info.read = useRead;
  const char* useReadStr = info.read ? "/read" : "";
  if (myInfo->pidHash == peerInfo->pidHash) {
    info.direct = 1;
    info.directPtr = resources->devMem;
    if (myInfo->cudaDev == peerInfo->cudaDev) {
      INFO(NCCL_INIT|NCCL_P2P, "Channel %02d : %d[%d] -> %d[%d] via P2P/common device%s", channelId, myInfo->rank, myInfo->cudaDev, peerInfo->rank, peerInfo->cudaDev, useReadStr);
      return ncclInternalError;
    } else {
      // Enable P2P access
      cudaError_t err = cudaDeviceEnablePeerAccess(peerInfo->cudaDev, 0);
      if (err == cudaErrorPeerAccessAlreadyEnabled) {
        cudaGetLastError();
      } else if (err != cudaSuccess) {
        WARN("failed to peer with device %d(=%lx): %d %s", peerInfo->cudaDev, peerInfo->busId, err, cudaGetErrorString(err));
        return ncclInternalError;
      }
      INFO(NCCL_INIT|NCCL_P2P, "Channel %02d : %d[%lx] -> %d[%lx] via P2P/direct pointer%s", channelId, myInfo->rank, myInfo->busId, peerInfo->rank, peerInfo->busId, useReadStr);
    }
  } else {
    ...
  }
  static_assert(sizeof(struct p2pConnectInfo) <= sizeof(struct ncclConnect), "p2p Connect Info is too big");
  memcpy(connectInfo, &info, sizeof(struct p2pConnectInfo));
  return ncclSuccess;
}
```
Then rank 10 executes the send connect process. Here info is rank 9’s information, and remDevMem is the GPU memory just allocated by rank 9. If read is 0, conn’s direct flag needs to be set. Next, conn’s buffs are set: if read is 1, the buffer is on the current card; otherwise it points to rank 9’s GPU memory. The head and tail settings that follow are used to coordinate the sender and receiver, and will be detailed in the next section.
```cpp
static ncclResult_t p2pSendConnect(struct ncclConnect* connectInfo, int nranks, int rank, struct ncclConnector* send) {
  struct p2pSendResources* resources = (struct p2pSendResources*)send->transportResources;
  struct ncclRecvMem* remDevMem;
  struct p2pConnectInfo* info = (struct p2pConnectInfo*)connectInfo;
  if (info->direct) {
    remDevMem = (struct ncclRecvMem*)(info->directPtr);
    if (info->read == 0) send->conn.direct |= NCCL_DIRECT_GPU;
  } else {
    ...
  }

  int offset = 0;
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    if (info->read && p == NCCL_PROTO_SIMPLE) {
      /* For P2P Read the SIMPLE buffer is local (ncclSendMem) */
      send->conn.buffs[p] = resources->devMem->buff;
    } else {
      send->conn.buffs[p] = remDevMem->buff + offset;
      offset += send->comm->buffSizes[p];
    }
  }
  send->conn.tail = &remDevMem->tail;
  send->conn.head = &resources->devMem->head;
  send->conn.ptrExchange = &resources->devMem->ptrExchange;
  return ncclSuccess;
}
```
```cpp
ncclResult_t p2pRecvConnect(struct ncclConnect* connectInfo, int nranks, int rank, struct ncclConnector* recv) {
  struct p2pRecvResources* resources = (struct p2pRecvResources*)recv->transportResources;
  struct ncclSendMem* remDevMem;
  struct p2pConnectInfo* info = (struct p2pConnectInfo*)connectInfo;
  if (info->direct) {
    remDevMem = (struct ncclSendMem*)(info->directPtr);
    if (info->read == 0) {
      recv->conn.direct |= NCCL_DIRECT_GPU;
      recv->conn.ptrExchange = &remDevMem->ptrExchange;
    }
  } else {
    //TRACE_DUMP_IPC(&info->devIpc);
    cudaError_t err = cudaIpcOpenMemHandle(&resources->ipcPtr, info->devIpc, cudaIpcMemLazyEnablePeerAccess);
    remDevMem = (struct ncclSendMem*)resources->ipcPtr;
    if (err != cudaSuccess) {
      WARN("failed to open CUDA IPC handle : %d %s", err, cudaGetErrorString(err));
      return ncclUnhandledCudaError;
    }
  }

  int offset = 0;
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    if (info->read && p == NCCL_PROTO_SIMPLE) {
      /* For P2P Read the SIMPLE buffer is remote (ncclSendMem) */
      recv->conn.buffs[p] = remDevMem->buff;
    } else {
      recv->conn.buffs[p] = resources->devMem->buff + offset;
      offset += recv->comm->buffSizes[p];
    }
  }
  recv->conn.tail = &resources->devMem->tail;
  recv->conn.head = &remDevMem->head;
  return ncclSuccess;
}
```
Finally, a brief summary of the link establishment process:
1. The Receiver executes recv setup, creates the buffers etc., records the related information to connectInfo, starts a listening socket, records the ip and port to connectInfo as well, and sends connectInfo to the Sender through bootstrap.
2. The Sender executes send setup, creates the buffers etc., records the related information to connectInfo, then sends it to the Receiver. In the rdma scenario this connectInfo is not used.
3. The Sender receives the information sent by the Receiver in step 1 and establishes the connection from Sender to Receiver. In the p2p scenario this simply records the peer's buffer; in the rdma scenario the qp is initialized to the INIT state.
4. The Receiver receives the information sent by the Sender in step 2 and establishes the connection from Receiver to Sender. The p2p scenario again just records the peer's buffer; the rdma scenario brings the qp to the RTS state and sends the local qp information back to the peer.
5. In the rdma scenario, the Sender still needs to receive the peer's qp information and bring its local qp to the RTS state.