Commit 1773207: Trying to fix python tests

chhwang committed Oct 8, 2023 (1 parent: fcc87ad)

Showing 2 changed files with 10 additions and 13 deletions.

python/mscclpp/core_py.cpp (3 additions, 0 deletions)

@@ -27,6 +27,9 @@ void def_future(nb::handle& m, const std::string& typestr) {
 }
 
 void register_core(nb::module_& m) {
+  // For Bootstrap::recv()
+  def_future<void>(m, "void");
+
   nb::class_<Bootstrap>(m, "Bootstrap")
       .def_prop_ro("rank", &Bootstrap::rank)
       .def_prop_ro("size", &Bootstrap::size)

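The added def_future<void> instantiation registers a Python wrapper class for the void-typed future that Bootstrap::recv() returns, which is what lets Python callers invoke .get() on the result. A minimal Python-side sketch of what this enables, assuming an already-initialized bootstrap object plus illustrative peer and tag values (none of which are part of this commit):

    import numpy as np

    # Hypothetical usage of the newly bound void future: recv() returns a
    # future-like handle, and get() blocks until the receive has completed.
    buf = np.zeros(8, dtype=np.int32)
    future = bootstrap.recv(buf.ctypes.data, buf.size * buf.itemsize, peer, tag)
    future.get()  # block until `buf` holds the peer's data
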
python/test/mscclpp_group.py (7 additions, 13 deletions)

@@ -41,8 +41,8 @@ def __init__(self, mpi_group: MpiGroup, interfaceIpPortTrio=""):
             # use this instead
             self.bootstrap.initialize(interfaceIpPortTrio)
         self.communicator = Communicator(self.bootstrap)
-        self.my_rank = self.bootstrap.get_rank()
-        self.nranks = self.bootstrap.get_n_ranks()
+        self.my_rank = self.bootstrap.rank
+        self.nranks = self.bootstrap.size
 
     def barrier(self):
         self.bootstrap.barrier()

@@ -51,7 +51,7 @@ def send(self, tensor: np.ndarray, peer: int, tag: int):
         self.bootstrap.send(tensor.ctypes.data, tensor.size * tensor.itemsize, peer, tag)
 
     def recv(self, tensor: np.ndarray, peer: int, tag: int):
-        self.bootstrap.recv(tensor.ctypes.data, tensor.size * tensor.itemsize, peer, tag)
+        self.bootstrap.recv(tensor.ctypes.data, tensor.size * tensor.itemsize, peer, tag).get()
 
     def my_ib_device(self, local_rank: int) -> Transport:
         if local_rank == 0:

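Since Bootstrap.recv() now hands back a future, the recv wrapper calls .get() immediately to keep its old blocking behavior. A hedged sketch of a two-rank exchange through these wrappers (the group object and rank layout are illustrative, not part of this commit):

    import numpy as np

    # Hypothetical two-rank ping: rank 0 sends, rank 1 blocks until the data
    # has arrived, because recv() resolves its future internally via .get().
    data = np.arange(4, dtype=np.int32)
    if group.my_rank == 0:
        group.send(data, peer=1, tag=0)
    else:
        out = np.zeros_like(data)
        group.recv(out, peer=0, tag=0)
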
@@ -74,12 +74,8 @@ def my_ib_device(self, local_rank: int) -> Transport:
         assert False  # only 8 IBs are supported
 
     def make_connection(self, remote_ranks: list[int], transport: Transport) -> dict[int, Connection]:
-        connections = {}
-        for rank in remote_ranks:
-            connections[rank] = self.communicator.connect_on_setup(rank, 0, transport)
-        self.communicator.setup()
-        connections = {rank: connections[rank].get() for rank in connections}
-        return connections
+        connections = {rank: self.communicator.connect(rank, 0, transport) for rank in remote_ranks}
+        return {k: v.get() for k, v in connections.items()}
 
     def register_tensor_with_connections(
         self, tensor: Type[cp.ndarray] or Type[np.ndarray], connections: dict[int, Connection]

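The rewritten make_connection relies on Communicator.connect() returning one future per peer: every connect request is issued before any future is resolved, so the handshakes can overlap instead of running serially behind a separate setup() phase. A sketch of calling it, under the assumption of two remote ranks reachable over this rank's IB device:

    # Hypothetical usage: connect to ranks 1 and 2 over the local IB device.
    transport = group.my_ib_device(local_rank=0)
    connections = group.make_connection([1, 2], transport)
    # connections maps each remote rank to a resolved Connection object.
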
@@ -93,9 +89,8 @@ def register_tensor_with_connections(
         all_registered_memories[self.my_rank] = local_reg_memory
         future_memories = {}
         for rank in connections:
-            self.communicator.send_memory_on_setup(local_reg_memory, rank, 0)
-            future_memories[rank] = self.communicator.recv_memory_on_setup(rank, 0)
-        self.communicator.setup()
+            self.communicator.send_memory(local_reg_memory, rank, 0)
+            future_memories[rank] = self.communicator.recv_memory(rank, 0)
         for rank in connections:
             all_registered_memories[rank] = future_memories[rank].get()
         return all_registered_memories

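register_tensor_with_connections follows the same issue-then-resolve pattern: all send_memory/recv_memory calls are posted in the first loop, and the returned futures are only resolved in the second. A hedged sketch of using it, assuming a CuPy buffer and the connections dict produced by make_connection above:

    import cupy as cp

    # Hypothetical usage: register a GPU buffer and swap handles with peers.
    tensor = cp.zeros(1 << 20, dtype=cp.float16)
    memories = group.register_tensor_with_connections(tensor, connections)
    # memories[group.my_rank] is the local registration; the other entries
    # are the peers' memory handles, each resolved from its future.
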
@@ -108,7 +103,6 @@ def make_semaphore(
         semaphores = {}
         for rank in connections:
             semaphores[rank] = semaphore_type(self.communicator, connections[rank])
-        self.communicator.setup()
         return semaphores
 
     def make_sm_channels(self, tensor: cp.ndarray, connections: dict[int, Connection]) -> dict[int, SmChannel]:

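With the deferred setup() phase removed here as well, a semaphore is usable as soon as its constructor returns. A sketch under the assumption that Host2HostSemaphore is one valid semaphore_type (the semaphore classes themselves are not shown in this commit):

    # Hypothetical usage: one semaphore per connected peer, ready immediately.
    semaphores = group.make_semaphore(connections, Host2HostSemaphore)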
