#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include "cuda_runtime.h"
#include "nccl.h"
#include "mpi.h"

#define MPICHECK(cmd) do { \
int e = cmd; \
if( e != MPI_SUCCESS ) { \
printf("Failed: MPI error %s:%d '%d'\n", \
__FILE__,__LINE__, e); \
exit(EXIT_FAILURE); \
} \
} while(0)
/* Evaluates a CUDA runtime call once. If the returned status is anything
 * other than cudaSuccess, prints the source location together with the
 * CUDA error string and terminates the process with EXIT_FAILURE. */
#define CUDACHECK(cmd) do {                                 \
  cudaError_t cudaStatus_ = cmd;                            \
  if (cudaSuccess != cudaStatus_) {                         \
    printf("Failed: Cuda error %s:%d '%s'\n",               \
           __FILE__, __LINE__,                              \
           cudaGetErrorString(cudaStatus_));                \
    exit(EXIT_FAILURE);                                     \
  }                                                         \
} while(0)
/* Evaluates an NCCL call once. If the result is anything other than
 * ncclSuccess, prints the source location together with the NCCL error
 * string and terminates the process with EXIT_FAILURE. */
#define NCCLCHECK(cmd) do {                                 \
  ncclResult_t ncclStatus_ = cmd;                           \
  if (ncclSuccess != ncclStatus_) {                         \
    printf("Failed, NCCL error %s:%d '%s'\n",               \
           __FILE__, __LINE__,                              \
           ncclGetErrorString(ncclStatus_));                \
    exit(EXIT_FAILURE);                                     \
  }                                                         \
} while(0)
staticuint64_tgetHostHash(constchar*string){// Based on DJB2a, result = result * 33 ^ char
uint64_tresult=5381;for(intc=0;string[c]!='\0';c++){result=((result<<5)+result)^string[c];}returnresult;}staticvoidgetHostName(char*hostname,intmaxlen){gethostname(hostname,maxlen);for(inti=0;i<maxlen;i++){if(hostname[i]=='.'){hostname[i]='\0';return;}}}intmain(intargc,char*argv[]){intsize=32*1024*1024;intmyRank,nRanks,localRank=0;//initializing MPI
MPICHECK(MPI_Init(&argc,&argv));MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD,&myRank));MPICHECK(MPI_Comm_size(MPI_COMM_WORLD,&nRanks));//calculating localRank which is used in selecting a GPU
uint64_thostHashs[nRanks];charhostname[1024];getHostName(hostname,1024);hostHashs[myRank]=getHostHash(hostname);MPICHECK(MPI_Allgather(MPI_IN_PLACE,0,MPI_DATATYPE_NULL,hostHashs,sizeof(uint64_t),MPI_BYTE,MPI_COMM_WORLD));for(intp=0;p<nRanks;p++){if(p==myRank)break;if(hostHashs[p]==hostHashs[myRank])localRank++;}//each process is using two GPUs
intnDev=2;float**sendbuff=(float**)malloc(nDev*sizeof(float*));float**recvbuff=(float**)malloc(nDev*sizeof(float*));cudaStream_t*s=(cudaStream_t*)malloc(sizeof(cudaStream_t)*nDev);//picking GPUs based on localRank
for(inti=0;i<nDev;++i){CUDACHECK(cudaSetDevice(localRank*nDev+i));CUDACHECK(cudaMalloc(sendbuff+i,size*sizeof(float)));CUDACHECK(cudaMalloc(recvbuff+i,size*sizeof(float)));CUDACHECK(cudaMemset(sendbuff[i],1,size*sizeof(float)));CUDACHECK(cudaMemset(recvbuff[i],0,size*sizeof(float)));CUDACHECK(cudaStreamCreate(s+i));}ncclUniqueIdid;ncclComm_tcomms[nDev];//generating NCCL unique ID at one process and broadcasting it to all
if(myRank==0)ncclGetUniqueId(&id);MPICHECK(MPI_Bcast((void*)&id,sizeof(id),MPI_BYTE,0,MPI_COMM_WORLD));//initializing NCCL, group API is required around ncclCommInitRank as it is
//called across multiple GPUs in each thread/process
NCCLCHECK(ncclGroupStart());for(inti=0;i<nDev;i++){CUDACHECK(cudaSetDevice(localRank*nDev+i));NCCLCHECK(ncclCommInitRank(comms+i,nRanks*nDev,id,myRank*nDev+i));}NCCLCHECK(ncclGroupEnd());//calling NCCL communication API. Group API is required when using
//multiple devices per thread/process
NCCLCHECK(ncclGroupStart());for(inti=0;i<nDev;i++)NCCLCHECK(ncclAllReduce((constvoid*)sendbuff[i],(void*)recvbuff[i],size,ncclFloat,ncclSum,comms[i],s[i]));NCCLCHECK(ncclGroupEnd());//synchronizing on CUDA stream to complete NCCL communication
for(inti=0;i<nDev;i++)CUDACHECK(cudaStreamSynchronize(s[i]));//freeing device memory
for(inti=0;i<nDev;i++){CUDACHECK(cudaFree(sendbuff[i]));CUDACHECK(cudaFree(recvbuff[i]));}//finalizing NCCL
for(inti=0;i<nDev;i++){ncclCommDestroy(comms[i]);}//finalizing MPI
MPICHECK(MPI_Finalize());printf("[MPI Rank %d] Success \n",myRank);return0;}
Rank 0 calls ncclGetUniqueId to obtain an ID and then broadcasts it to the other ranks via MPI. Let's look at how the UniqueId is generated.
First, execute initEnv to set environment variables
Then initNet is executed to initialize the networks NCCL needs. There are two: the bootstrap network and the data communication network. The bootstrap network is mainly used to exchange small pieces of information during initialization, such as the IP addresses and ports of each machine. Since the data volume is small and the exchange mostly happens once during initialization, bootstrap uses TCP. The communication network carries the actual data transmission, so it prioritizes RDMA (with GDR preferred if supported).
bootstrapNetInit initializes the bootstrap network. It uses findInterfaces to traverse all network interfaces on the machine, matches them against prefixList to decide which ones to use, and saves the information of the usable interfaces: each ifa_name is saved to the global bootstrapNetIfNames, and each IP address to the global bootstrapNetIfAddrs. By default, every interface except docker and lo can be used. For example, if the test machine has three network cards - xgbe0, xgbe1, and xgbe2 - the three interface names and their corresponding IP addresses are saved. NCCL also provides the environment variable NCCL_SOCKET_IFNAME to choose the interface explicitly; for example, export NCCL_SOCKET_IFNAME=xgbe0 selects xgbe0. This selection is implemented through the same prefixList matching.
The ncclNet_t struct contains a series of function pointers, such as initialization, send, receive, etc. Communication methods like socket and IB implement their own ncclNet_t, such as ncclNetSocket and ncclNetIb. The process of initializing the communication network is to check which communication mode is available and then assign it to the global ncclNet
ncclResult_tncclIbInit(ncclDebugLogger_tlogFunction){staticintshownIbHcaEnv=0;if(wrap_ibv_symbols()!=ncclSuccess){returnncclInternalError;}if(ncclParamIbDisable())returnncclInternalError;if(ncclNIbDevs==-1){pthread_mutex_lock(&ncclIbLock);wrap_ibv_fork_init();if(ncclNIbDevs==-1){ncclNIbDevs=0;if(findInterfaces(ncclIbIfName,&ncclIbIfAddr,MAX_IF_NAME_SIZE,1)!=1){WARN("NET/IB : No IP interface found.");returnncclInternalError;}// Detect IB cards
intnIbDevs;structibv_device**devices;// Check if user defined which IB device:port to use
char*userIbEnv=getenv("NCCL_IB_HCA");if(userIbEnv!=NULL&&shownIbHcaEnv++==0)INFO(NCCL_NET|NCCL_ENV,"NCCL_IB_HCA set to %s",userIbEnv);structnetIfuserIfs[MAX_IB_DEVS];boolsearchNot=userIbEnv&&userIbEnv[0]=='^';if(searchNot)userIbEnv++;boolsearchExact=userIbEnv&&userIbEnv[0]=='=';if(searchExact)userIbEnv++;intnUserIfs=parseStringList(userIbEnv,userIfs,MAX_IB_DEVS);if(ncclSuccess!=wrap_ibv_get_device_list(&devices,&nIbDevs))returnncclInternalError;for(intd=0;d<nIbDevs&&ncclNIbDevs<MAX_IB_DEVS;d++){structibv_context*context;if(ncclSuccess!=wrap_ibv_open_device(&context,devices[d])||context==NULL){WARN("NET/IB : Unable to open device %s",devices[d]->name);continue;}intnPorts=0;structibv_device_attrdevAttr;memset(&devAttr,0,sizeof(devAttr));if(ncclSuccess!=wrap_ibv_query_device(context,&devAttr)){WARN("NET/IB : Unable to query device %s",devices[d]->name);if(ncclSuccess!=wrap_ibv_close_device(context)){returnncclInternalError;}continue;}for(intport=1;port<=devAttr.phys_port_cnt;port++){structibv_port_attrportAttr;if(ncclSuccess!=wrap_ibv_query_port(context,port,&portAttr)){WARN("NET/IB : Unable to query port %d",port);continue;}if(portAttr.state!=IBV_PORT_ACTIVE)continue;if(portAttr.link_layer!=IBV_LINK_LAYER_INFINIBAND&&portAttr.link_layer!=IBV_LINK_LAYER_ETHERNET)continue;// check against user specified HCAs/ports
if(!(matchIfList(devices[d]->name,port,userIfs,nUserIfs,searchExact)^searchNot)){continue;}TRACE(NCCL_INIT|NCCL_NET,"NET/IB: [%d] %s:%d/%s ",d,devices[d]->name,port,portAttr.link_layer==IBV_LINK_LAYER_INFINIBAND?"IB":"RoCE");ncclIbDevs[ncclNIbDevs].device=d;ncclIbDevs[ncclNIbDevs].guid=devAttr.sys_image_guid;ncclIbDevs[ncclNIbDevs].port=port;ncclIbDevs[ncclNIbDevs].link=portAttr.link_layer;ncclIbDevs[ncclNIbDevs].speed=ncclIbSpeed(portAttr.active_speed)*ncclIbWidth(portAttr.active_width);ncclIbDevs[ncclNIbDevs].context=context;strncpy(ncclIbDevs[ncclNIbDevs].devName,devices[d]->name,MAXNAMESIZE);NCCLCHECK(ncclIbGetPciPath(ncclIbDevs[ncclNIbDevs].devName,&ncclIbDevs[ncclNIbDevs].pciPath,&ncclIbDevs[ncclNIbDevs].realPort));ncclIbDevs[ncclNIbDevs].maxQp=devAttr.max_qp;ncclNIbDevs++;nPorts++;pthread_create(&ncclIbAsyncThread,NULL,ncclIbAsyncThreadMain,context);}if(nPorts==0&&ncclSuccess!=wrap_ibv_close_device(context)){returnncclInternalError;}}if(nIbDevs&&(ncclSuccess!=wrap_ibv_free_device_list(devices))){returnncclInternalError;};}if(ncclNIbDevs==0){INFO(NCCL_INIT|NCCL_NET,"NET/IB : No device found.");}else{charline[1024];line[0]='\0';for(intd=0;d<ncclNIbDevs;d++){snprintf(line+strlen(line),1023-strlen(line)," [%d]%s:%d/%s",d,ncclIbDevs[d].devName,ncclIbDevs[d].port,ncclIbDevs[d].link==IBV_LINK_LAYER_INFINIBAND?"IB":"RoCE");}line[1023]='\0';charaddrline[1024];INFO(NCCL_INIT|NCCL_NET,"NET/IB : Using%s ; OOB %s:%s",line,ncclIbIfName,socketToString(&ncclIbIfAddr.sa,addrline));}pthread_mutex_unlock(&ncclIbLock);}returnncclSuccess;}
First, ncclIbInit calls wrap_ibv_symbols, which loads the dynamic library libibverbs.so and then resolves the verbs functions it needs from that library.
Then use wrap_ibv_fork_init to prevent RDMA network card read/write errors caused by fork
Later, the IB network will also use socket for out-of-band network transmission, so here it also uses findInterfaces to get an available network card and save it to ncclIbIfAddr
Then it fetches all RDMA devices into devices through ibv_get_device_list and traverses them. Since each HCA may have multiple physical ports, it also traverses every physical port of each device, queries the port, and saves the relevant information to the global ncclIbDevs: the device index and port number, whether the link is IB or RoCE, the device's PCI path, maxQp, the device name, and so on. Note that there is an environment variable analogous to the bootstrap network's NCCL_SOCKET_IFNAME, called NCCL_IB_HCA, which specifies which IB HCAs to use.
At this point, the entire initialization process is complete. In summary, it obtains and saves all available IB network cards and regular ethernet cards on the current machine
ncclNetHandle_t is also a character array, then execute bootstrapNetListen
cpp12 lines hidden
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
staticncclResult_tbootstrapNetListen(intdev,ncclNetHandle_t*netHandle,void**listenComm){unionsocketAddress*connectAddr=(unionsocketAddress*)netHandle;static_assert(sizeof(unionsocketAddress)<NCCL_NET_HANDLE_MAXSIZE,"union socketAddress size is too large");// if dev >= 0, listen based on dev
if(dev>=0){NCCLCHECK(bootstrapNetGetSocketAddr(dev,connectAddr));}elseif(dev==findSubnetIf){...}// Otherwise, handle stores a local address
structbootstrapNetComm*comm;NCCLCHECK(bootstrapNetNewComm(&comm));NCCLCHECK(createListenSocket(&comm->fd,connectAddr));*listenComm=comm;returnncclSuccess;}
Let’s look at these three functions in order. First, get an available IP address through bootstrapNetGetSocketAddr
Here dev is 0, bootstrapNetIfs is the number of available network cards found during bootstrap network initialization. This gets the IP address of the first available one
Then a bootstrapNetComm is created through bootstrapNetNewComm. A bootstrapNetComm is really just a wrapper around a socket file descriptor (fd), and bootstrapNetNewComm simply allocates a new one.
// Connection state for the bootstrap network: only the socket file descriptor.
struct bootstrapNetComm {
  int fd;
};
Then start socket server through createListenSocket
staticncclResult_tcreateListenSocket(int*fd,unionsocketAddress*localAddr){/* IPv4/IPv6 support */intfamily=localAddr->sa.sa_family;intsalen=(family==AF_INET)?sizeof(sockaddr_in):sizeof(sockaddr_in6);/* Create socket and bind it to a port */intsockfd=socket(family,SOCK_STREAM,0);if(sockfd==-1){WARN("Net : Socket creation failed : %s",strerror(errno));returnncclSystemError;}if(socketToPort(&localAddr->sa)){// Port is forced by env. Make sure we get the port.
intopt=1;#if defined(SO_REUSEPORT)
SYSCHECK(setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,&opt,sizeof(opt)),"setsockopt");#else
SYSCHECK(setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt)),"setsockopt");#endif
}// localAddr port should be 0 (Any port)
SYSCHECK(bind(sockfd,&localAddr->sa,salen),"bind");/* Get the assigned Port */socklen_tsize=salen;SYSCHECK(getsockname(sockfd,&localAddr->sa,&size),"getsockname");#ifdef ENABLE_TRACE
charline[1024];TRACE(NCCL_INIT|NCCL_NET,"Listening on socket %s",socketToString(&localAddr->sa,line));#endif
/* Put the socket in listen mode
* NB: The backlog will be silently truncated to the value in /proc/sys/net/core/somaxconn
*/SYSCHECK(listen(sockfd,16384),"listen");*fd=sockfd;returnncclSuccess;}
This creates a listening fd on the IP given by localAddr. The port is initially 0, so bind picks a random available port, and getsockname(sockfd, &localAddr->sa, &size) then writes the IP and the assigned port back into localAddr. Here localAddr points at the memory of the UniqueId itself.
At this point, UniqueId is generated, which is actually the current machine’s IP and port
NCCL Source Code Study - This article is part of a series.