diff --git a/python/mscclpp/core_py.cpp b/python/mscclpp/core_py.cpp
index 5aa4ebdc2..0d057d910 100644
--- a/python/mscclpp/core_py.cpp
+++ b/python/mscclpp/core_py.cpp
@@ -27,6 +27,9 @@ void def_future(nb::handle& m, const std::string& typestr) {
 }
 
 void register_core(nb::module_& m) {
+  // For Bootstrap::recv()
+  def_future<void>(m, "void");
+
   nb::class_<Bootstrap>(m, "Bootstrap")
       .def_prop_ro("rank", &Bootstrap::rank)
       .def_prop_ro("size", &Bootstrap::size)
diff --git a/python/test/mscclpp_group.py b/python/test/mscclpp_group.py
index 1b6138467..d4dcbf2d0 100644
--- a/python/test/mscclpp_group.py
+++ b/python/test/mscclpp_group.py
@@ -41,8 +41,8 @@ def __init__(self, mpi_group: MpiGroup, interfaceIpPortTrio=""):
             # use this instead
             self.bootstrap.initialize(interfaceIpPortTrio)
         self.communicator = Communicator(self.bootstrap)
-        self.my_rank = self.bootstrap.get_rank()
-        self.nranks = self.bootstrap.get_n_ranks()
+        self.my_rank = self.bootstrap.rank
+        self.nranks = self.bootstrap.size
 
     def barrier(self):
         self.bootstrap.barrier()
@@ -51,7 +51,7 @@ def send(self, tensor: np.ndarray, peer: int, tag: int):
         self.bootstrap.send(tensor.ctypes.data, tensor.size * tensor.itemsize, peer, tag)
 
     def recv(self, tensor: np.ndarray, peer: int, tag: int):
-        self.bootstrap.recv(tensor.ctypes.data, tensor.size * tensor.itemsize, peer, tag)
+        self.bootstrap.recv(tensor.ctypes.data, tensor.size * tensor.itemsize, peer, tag).get()
 
     def my_ib_device(self, local_rank: int) -> Transport:
         if local_rank == 0:
@@ -74,12 +74,8 @@ def my_ib_device(self, local_rank: int) -> Transport:
         assert False # only 8 IBs are supported
 
     def make_connection(self, remote_ranks: list[int], transport: Transport) -> dict[int, Connection]:
-        connections = {}
-        for rank in remote_ranks:
-            connections[rank] = self.communicator.connect_on_setup(rank, 0, transport)
-        self.communicator.setup()
-        connections = {rank: connections[rank].get() for rank in connections}
-        return connections
+        connections = {rank: self.communicator.connect(rank, 0, transport) for rank in remote_ranks}
+        return {k: v.get() for k, v in connections.items()}
 
     def register_tensor_with_connections(
         self, tensor: Type[cp.ndarray] or Type[np.ndarray], connections: dict[int, Connection]
@@ -93,9 +89,8 @@ def register_tensor_with_connections(
         all_registered_memories[self.my_rank] = local_reg_memory
         future_memories = {}
         for rank in connections:
-            self.communicator.send_memory_on_setup(local_reg_memory, rank, 0)
-            future_memories[rank] = self.communicator.recv_memory_on_setup(rank, 0)
-        self.communicator.setup()
+            self.communicator.send_memory(local_reg_memory, rank, 0)
+            future_memories[rank] = self.communicator.recv_memory(rank, 0)
         for rank in connections:
             all_registered_memories[rank] = future_memories[rank].get()
         return all_registered_memories
@@ -108,7 +103,6 @@ def make_semaphore(
         semaphores = {}
         for rank in connections:
             semaphores[rank] = semaphore_type(self.communicator, connections[rank])
-        self.communicator.setup()
         return semaphores
 
     def make_sm_channels(self, tensor: cp.ndarray, connections: dict[int, Connection]) -> dict[int, SmChannel]: