HyperAIHyperAI

Command Palette

Search for a command to run...

Understanding Point-to-Point and Collective Operations in Multi-GPU AI Distributed Computing

AI in Multiple GPUs: Point-to-Point and Collective Operations is part of a series about distributed AI across multiple GPUs. Introduction In the previous post, we established the host-device paradigm and introduced the concept of ranks for multi-GPU workloads. Now, we’ll explore the specific communication patterns provided by PyTorch’s torch.distributed module to coordinate work and exchange data between these ranks. These operations, known as collectives, are the building blocks of distributed workloads. Although PyTorch exposes these operations, it ultimately calls a backend framework that actually implements the communication. For NVIDIA GPUs, it’s NCCL (NVIDIA Collective Communications Library), while for AMD it’s RCCL (ROCm Communication Collectives Library). NCCL implements multi-GPU and multi-node communication primitives optimized for NVIDIA GPUs and networking. It automatically detects the current topology (communication channels like PCIe, NVLink, InfiniBand) and selects the most efficient one. Disclaimer 1: Since NVIDIA GPUs are the most common, we’ll focus on the NCCL backend for this post. Disclaimer 2: For brevity, the code presented below only provides the main arguments of each method instead of all available arguments. Disclaimer 3: For simplicity, we’re not showing the memory deallocation of tensors, but operations like scatter will not automatically free the memory of the source rank (if you don’t understand what I mean, that’s fine, it’ll become clear very soon). Communication: Blocking vs. Non-Blocking To work together, GPUs must exchange data. The CPU initiates the communication by enqueuing NCCL kernels into CUDA streams (if you don’t know what CUDA Streams are, check out the first blog post of this series), but the actual data transfer happens directly between GPUs over the interconnect, bypassing the CPU’s main memory entirely. Ideally, the GPUs are connected with a high-speed interconnect like NVLink or InfiniBand (these interconnects are covered in the third post of this series). This communication may be synchronous (blocking) or asynchronous (non-blocking), which we explore below. Synchronous (Blocking) Communication In synchronous operations, the calling thread blocks until the communication completes. This ensures that the data has been fully transferred before proceeding. Asynchronous (Non-Blocking) Communication In asynchronous operations, the call returns immediately after enqueuing the communication task. The actual transfer happens in the background, allowing the CPU to continue executing other tasks. You must explicitly wait for completion using methods like request.wait() or torch.cuda.synchronize(). Point-to-Point (One-to-One) These operations are not considered collectives, but they are foundational communication primitives. They facilitate direct data transfer between two specific ranks and are fundamental for tasks where one GPU needs to send specific information to another. When PyTorch launches an NCCL kernel, it automatically inserts a dependency (i.e. forces a synchronization) between your current active stream and the NCCL stream. This means the NCCL stream won’t start until all previously enqueued work on the active stream finishes — guaranteeing the tensor being sent already holds the final values. Similarly, calling req.wait() inserts a dependency in the other direction. Any work you enqueue on the current stream after req.wait() won’t execute until the NCCL operation completes, so you can safely use the received tensors. Major “Gotchas” in NCCL While send and recv are labeled “synchronous,” their behavior in NCCL can be confusing. A synchronous call on a CUDA tensor blocks the host CPU thread only until the data transfer kernel is enqueued to the stream, not until the data transfer completes. The CPU is then free to enqueue other tasks. There is an exception: the very first call to torch.distributed.recv() in a process is truly blocking and waits for the transfer to finish, likely due to internal NCCL warm-up procedures. Subsequent calls will only block until the operation is enqueued. Consider this example where rank 1 hangs because the CPU tries to access a tensor that the GPU has not yet received: rank = torch.distributed.get_rank() if rank == 0: t = torch.tensor([1,2,3], dtype=torch.float32, device=device) torch.distributed.send(t, dst=1) else: t = torch.empty(3, dtype=torch.float32, device=device) torch.distributed.recv(t, src=0) print("This WILL print if NCCL is warmed-up") print(t) # CPU needs data from GPU, causing a block print("This will NOT print") The CPU process at rank 1 gets stuck on print(t) because it triggers a host-device synchronization to access the tensor’s data, which never arrives. If you run this code multiple times, notice that "This WILL print if NCCL is warmed-up" will not get printed in later executions, since the CPU is still stuck at print(t). Collectives Every collective operation function supports both sync and async operations through the async_op argument. It defaults to False, meaning synchronous operations. One-to-All Collectives These operations involve one rank sending data to all other ranks in the group. Broadcast rank = torch.distributed.get_rank() if rank == 0: tensor = torch.tensor([1,2,3], dtype=torch.int64, device=device) else: tensor = torch.empty(3, dtype=torch.int64, device=device) torch.distributed.broadcast(tensor, src=0) Scatter scatter_list = None if rank != 0 else [torch.tensor([i, i+1]).to(device) for i in range(0,4,2)] tensor = torch.empty(2, dtype=torch.int64).to(device) torch.distributed.scatter(tensor, scatter_list, src=0) print(f'Rank {rank} received: {tensor}') All-to-One Collectives These operations gather data from all ranks and consolidate it onto a single destination rank. Reduce rank = torch.distributed.get_rank() tensor = torch.tensor([rank+1, rank+2, rank+3], device=device) torch.distributed.reduce(tensor, dst=0, op=torch.distributed.ReduceOp.SUM) print(tensor) Gather rank = torch.distributed.get_rank() world_size = torch.distributed.get_world_size() gather_list = None if rank != 0 else [torch.zeros(3, dtype=torch.int64).to(device) for _ in range(world_size)] t = torch.tensor([0+rank, 1+rank, 2+rank], dtype=torch.int64).to(device) torch.distributed.gather(t, gather_list, dst=0) print(f'After op, Rank {rank} has: {gather_list}') All-to-All Collectives In these operations, every rank both sends and receives data from all other ranks. All Reduce rank = torch.distributed.get_rank() tensor = torch.tensor([rank+1, rank+2, rank+3], dtype=torch.float32, device=device) torch.distributed.all_reduce(tensor, op=torch.distributed.ReduceOp.SUM) print(f"Rank {rank} after all_reduce: {tensor}") All Gather rank = torch.distributed.get_rank() world_size = torch.distributed.get_world_size() input_tensor = torch.tensor([rank], dtype=torch.float32, device=device) tensor_list = [torch.empty(1, dtype=torch.float32, device=device) for _ in range(world_size)] torch.distributed.all_gather(tensor_list, input_tensor) print(f"Rank {rank} gathered: {[t.item() for t in tensor_list]}") Reduce Scatter rank = torch.distributed.get_rank() world_size = torch.distributed.get_world_size() input_list = [torch.tensor([rank + i], dtype=torch.float32, device=device) for i in range(world_size)] output = torch.empty(1, dtype=torch.float32, device=device) torch.distributed.reduce_scatter(output, input_list, op=torch.distributed.ReduceOp.SUM) print(f"Rank {rank} received reduced value: {output.item()}") Synchronization The two most frequently used operations are request.wait() and torch.cuda.synchronize(). It’s crucial to understand the difference between these two: request.wait() waits for a specific asynchronous operation to complete. torch.cuda.synchronize() waits for all CUDA operations in the current context to finish. Conclusion Congratulations on making it to the end! In this post, you learned about: Point-to-point communication between ranks Synchronous and asynchronous operations Key collective operations: broadcast, scatter, reduce, gather, all_reduce, all_gather, reduce_scatter How to manage synchronization in distributed workflows In the next blog post, we’ll dive into PCIe, NVLink, and other mechanisms that enable communication in a distributed setting!

Related Links