Skip to content

Commit

Permalink
Merge branch 'main' into chhwang/put-get-interface
Browse files Browse the repository at this point in the history
  • Loading branch information
chhwang authored Oct 7, 2023
2 parents d870266 + b3d0fdb commit 5cd97ad
Show file tree
Hide file tree
Showing 30 changed files with 388 additions and 90 deletions.
10 changes: 10 additions & 0 deletions .github/ISSUE_TEMPLATE/bug_report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
name: Bug report
about: Create a report to help us fix
title: "[Bug]"
labels: ''
assignees: ''

---


10 changes: 10 additions & 0 deletions .github/ISSUE_TEMPLATE/feature_request.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
name: Feature request
about: Suggest an idea for this project
title: "[Feature]"
labels: ''
assignees: ''

---


10 changes: 10 additions & 0 deletions .github/ISSUE_TEMPLATE/perf_improvement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
name: Performance improvement
about: Discuss on performance issues
title: "[Perf]"
labels: ''
assignees: ''

---


59 changes: 59 additions & 0 deletions docker/base-cuda11.8.dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
FROM nvidia/cuda:11.8.0-devel-ubuntu20.04

LABEL maintainer="MSCCL++"
LABEL org.opencontainers.image.source https://github.com/microsoft/mscclpp

ENV DEBIAN_FRONTEND=noninteractive

RUN rm -rf /opt/nvidia

RUN apt-get clean && \
apt-get update && \
apt-get install -y --no-install-recommends \
build-essential \
ca-certificates \
curl \
git \
libcap2 \
libnuma-dev \
openssh-client \
openssh-server \
python3-dev \
python3-pip \
python3-setuptools \
python3-wheel \
sudo \
wget \
&& \
apt-get autoremove && \
apt-get clean && \
rm -rf /var/lib/apt/lists/* /tmp/*

# Install OFED
ENV OFED_VERSION=5.2-2.2.3.0
RUN cd /tmp && \
wget -q https://content.mellanox.com/ofed/MLNX_OFED-${OFED_VERSION}/MLNX_OFED_LINUX-${OFED_VERSION}-ubuntu20.04-x86_64.tgz && \
tar xzf MLNX_OFED_LINUX-${OFED_VERSION}-ubuntu20.04-x86_64.tgz && \
MLNX_OFED_LINUX-${OFED_VERSION}-ubuntu20.04-x86_64/mlnxofedinstall --user-space-only --without-fw-update --force --all && \
rm -rf /tmp/MLNX_OFED_LINUX-${OFED_VERSION}*

# Install OpenMPI
ENV OPENMPI_VERSION=4.1.5
RUN cd /tmp && \
export ompi_v_parsed="$(echo ${OPENMPI_VERSION} | sed -E 's/^([0-9]+)\.([0-9]+)\..*/\1.\2/')" && \
wget -q https://download.open-mpi.org/release/open-mpi/v${ompi_v_parsed}/openmpi-${OPENMPI_VERSION}.tar.gz && \
tar xzf openmpi-${OPENMPI_VERSION}.tar.gz && \
cd openmpi-${OPENMPI_VERSION} && \
./configure --prefix=/usr/local/mpi && \
make -j && \
make install && \
cd .. && \
rm -rf /tmp/openmpi-${OPENMPI_VERSION}*

ENV PATH="/usr/local/mpi/bin:${PATH}" \
LD_LIBRARY_PATH="/usr/local/mpi/lib:/usr/local/cuda-11.8/lib64:${LD_LIBRARY_PATH}"

RUN echo PATH="${PATH}" > /etc/environment && \
echo LD_LIBRARY_PATH="${LD_LIBRARY_PATH}" >> /etc/environment

ENTRYPOINT []
32 changes: 32 additions & 0 deletions docker/release-cuda11.8.dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
FROM ghcr.io/microsoft/mscclpp/mscclpp:base-cuda11.8

LABEL maintainer="MSCCL++"
LABEL org.opencontainers.image.source https://github.com/microsoft/mscclpp

ENV MSCCLPP_HOME="/usr/local/mscclpp" \
MSCCLPP_SRC_DIR="/tmp/mscclpp" \
CMAKE_VERSION="3.26.4"

# Download cmake 3.26.4
ENV CMAKE_HOME="/tmp/cmake-${CMAKE_VERSION}-linux-x86_64" \
CMAKE_URL="https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-linux-x86_64.tar.gz"
RUN curl -L ${CMAKE_URL} -o ${CMAKE_HOME}.tar.gz && \
tar xzf ${CMAKE_HOME}.tar.gz -C /tmp

# Install MSCCL++
ADD . ${MSCCLPP_SRC_DIR}
WORKDIR ${MSCCLPP_SRC_DIR}
RUN rm -rf build && \
mkdir build && \
cd build && \
${CMAKE_HOME}/bin/cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=${MSCCLPP_HOME} .. && \
make -j mscclpp && \
make install/fast && \
strip ${MSCCLPP_HOME}/lib/libmscclpp.so.[0-9]*.[0-9]*.[0-9]*

ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:${MSCCLPP_HOME}/lib"
RUN echo LD_LIBRARY_PATH="${LD_LIBRARY_PATH}" >> /etc/environment

# Cleanup
WORKDIR /
RUN rm -rf ${CMAKE_HOME}* ${MSCCLPP_SRC_DIR}
7 changes: 4 additions & 3 deletions include/mscclpp/concurrency.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ struct DeviceSyncer {
/// Synchronize all threads inside a kernel. Guarantee that all previous work of all threads in cooperating blocks is
/// finished.
/// @param blockNum The number of blocks that will synchronize.
__forceinline__ __device__ void sync(int blockNum) {
/// @param maxSpinCount The maximum number of spin counts before asserting. Never assert if negative.
__forceinline__ __device__ void sync(int blockNum, int64_t maxSpinCount = 100000000) {
int maxOldCnt = blockNum - 1;
__syncthreads();
if (blockNum == 1) return;
Expand All @@ -33,12 +34,12 @@ struct DeviceSyncer {
if (atomicAdd(&count_, 1) == maxOldCnt) {
flag_ = 1;
}
POLL_MAYBE_JAILBREAK(!flag_, 1000000000);
POLL_MAYBE_JAILBREAK(!flag_, maxSpinCount);
} else {
if (atomicSub(&count_, 1) == 1) {
flag_ = 0;
}
POLL_MAYBE_JAILBREAK(flag_, 1000000000);
POLL_MAYBE_JAILBREAK(flag_, maxSpinCount);
}
isAdd_ = tmpIsAdd;
}
Expand Down
10 changes: 6 additions & 4 deletions include/mscclpp/fifo_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ struct FifoDeviceHandle {
/// Push a trigger to the FIFO.
///
/// @param trigger The trigger to push.
/// @param maxSpinCount The maximum number of spin counts before asserting. Never assert if negative.
/// @return The new head of the FIFO.
__forceinline__ __device__ uint64_t push(ProxyTrigger trigger) {
__forceinline__ __device__ uint64_t push(ProxyTrigger trigger, int64_t maxSpinCount = 1000000) {
uint64_t curFifoHead = atomicAdd((unsigned long long int*)this->head, 1);
// make the last bit intentionally non-zero so that we can safely poll. Don't worry, we will change it back in host
// side
Expand All @@ -49,7 +50,7 @@ struct FifoDeviceHandle {
// condition is not met.
if (curFifoHead >= size + *(this->tailReplica)) {
OR_POLL_MAYBE_JAILBREAK(curFifoHead >= size + *((volatile uint64_t*)this->tailReplica),
*(volatile uint64_t*)&this->triggers[curFifoHead % size] != 0, 1000000);
*(volatile uint64_t*)&this->triggers[curFifoHead % size] != 0, maxSpinCount);
}

ProxyTrigger* triggerPtr = (ProxyTrigger*)&(this->triggers[curFifoHead % size]);
Expand All @@ -60,11 +61,12 @@ struct FifoDeviceHandle {
/// Wait until there is a place in the FIFO to push a trigger.
///
/// @param curFifoHead The current head of the FIFO.
__forceinline__ __device__ void sync(uint64_t curFifoHead) {
/// @param maxSpinCount The maximum number of spin counts before asserting. Never assert if negative.
__forceinline__ __device__ void sync(uint64_t curFifoHead, int64_t maxSpinCount = 1000000) {
// Same as push but in this case checking the fist condition is probably faster since for tail to be pushed we need
// to wait for cudaMemcpy to be done.
OR_POLL_MAYBE_JAILBREAK(*(volatile uint64_t*)&(this->triggers[curFifoHead % size]) != 0,
*(volatile uint64_t*)(this->tailReplica) <= curFifoHead, 1000000);
*(volatile uint64_t*)(this->tailReplica) <= curFifoHead, maxSpinCount);
}
#endif // __CUDACC__

Expand Down
5 changes: 3 additions & 2 deletions include/mscclpp/packet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ union LLPacket {

/// Read 8 bytes of data from the packet.
/// @param flag The flag to read.
/// @param maxSpinCount The maximum number of spin counts before asserting. Never assert if negative.
/// @return The 8-byte data read.
__forceinline__ __device__ uint2 read(uint32_t flag) const {
__forceinline__ __device__ uint2 read(uint32_t flag, int64_t maxSpinCount = 100000000) const {
uint2 data;
POLL_MAYBE_JAILBREAK(readOnce(flag, data), 100000000);
POLL_MAYBE_JAILBREAK(readOnce(flag, data), maxSpinCount);
return data;
}

Expand Down
10 changes: 5 additions & 5 deletions include/mscclpp/poll.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

#ifdef __CUDACC__

extern __device__ void __assert_fail(const char *__assertion, const char *__file, unsigned int __line,
const char *__function) __THROW;
extern "C" __device__ void __assert_fail(const char *__assertion, const char *__file, unsigned int __line,
const char *__function) __THROW;

// If a spin is stuck, escape from it and set status to 1.
#define POLL_MAYBE_JAILBREAK_ESCAPE(__cond, __max_spin_cnt, __status) \
do { \
uint64_t __spin_cnt = 0; \
int64_t __spin_cnt = 0; \
__status = 0; \
while (__cond) { \
if (__spin_cnt++ == __max_spin_cnt) { \
Expand All @@ -25,7 +25,7 @@ extern __device__ void __assert_fail(const char *__assertion, const char *__file
// If a spin is stuck, print a warning and keep spinning.
#define POLL_MAYBE_JAILBREAK(__cond, __max_spin_cnt) \
do { \
uint64_t __spin_cnt = 0; \
int64_t __spin_cnt = 0; \
while (__cond) { \
if (__spin_cnt++ == __max_spin_cnt) { \
__assert_fail(#__cond, __FILE__, __LINE__, __PRETTY_FUNCTION__); \
Expand All @@ -37,7 +37,7 @@ extern __device__ void __assert_fail(const char *__assertion, const char *__file
// this is specially useful when __cond1 is faster to check
#define OR_POLL_MAYBE_JAILBREAK(__cond1, __cond2, __max_spin_cnt) \
do { \
uint64_t __spin_cnt = 0; \
int64_t __spin_cnt = 0; \
while (true) { \
if (!(__cond1)) { \
break; \
Expand Down
3 changes: 3 additions & 0 deletions include/mscclpp/proxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class Proxy {
void start();
void stop();

/// This is a concurrent fifo which is multiple threads from the device
/// can produce for and the sole proxy thread consumes it.
/// @return the fifo
Fifo& fifo();

private:
Expand Down
10 changes: 4 additions & 6 deletions include/mscclpp/proxy_channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class ProxyService : public BaseProxyService {
private:
std::vector<std::shared_ptr<Host2DeviceSemaphore>> semaphores_;
std::vector<RegisteredMemory> memories_;
Proxy proxy_;
std::shared_ptr<Proxy> proxy_;
int deviceNumaNode;

void bindThread();
Expand All @@ -75,16 +75,14 @@ struct ProxyChannel {
private:
SemaphoreId semaphoreId_;

Host2DeviceSemaphore::DeviceHandle semaphore_;
std::shared_ptr<Host2DeviceSemaphore> semaphore_;

// this is a concurrent fifo which is multiple threads from the device
// can produce for and the sole proxy thread consumes it.
FifoDeviceHandle fifo_;
std::shared_ptr<Proxy> proxy_;

public:
ProxyChannel() = default;

ProxyChannel(SemaphoreId semaphoreId, Host2DeviceSemaphore::DeviceHandle semaphore, FifoDeviceHandle fifo);
ProxyChannel(SemaphoreId semaphoreId, std::shared_ptr<Host2DeviceSemaphore> semaphore, std::shared_ptr<Proxy> proxy);

ProxyChannel(const ProxyChannel& other) = default;

Expand Down
14 changes: 12 additions & 2 deletions include/mscclpp/proxy_channel_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,13 @@ struct ProxyChannelDeviceHandle {
fifo_.sync(curFifoHead);
}

/// Check if the proxy channel has been signaled.
/// @return true if the proxy channel has been signaled.
__forceinline__ __device__ bool poll() { return semaphore_.poll(); }

/// Wait for the proxy channel to be signaled.
__forceinline__ __device__ void wait() { semaphore_.wait(); }
/// @param maxSpinCount The maximum number of spin counts before asserting. Never assert if negative.
__forceinline__ __device__ void wait(int64_t maxSpinCount = 10000000) { semaphore_.wait(maxSpinCount); }

#endif // __CUDACC__
};
Expand Down Expand Up @@ -217,8 +222,13 @@ struct SimpleProxyChannelDeviceHandle {
/// Push a @ref TriggerSync to the FIFO.
__forceinline__ __device__ void flush() { proxyChan_.flush(); }

/// Check if the proxy channel has been signaled.
/// @return true if the proxy channel has been signaled.
__forceinline__ __device__ bool poll() { return proxyChan_.poll(); }

/// Wait for the proxy channel to be signaled.
__forceinline__ __device__ void wait() { proxyChan_.wait(); }
/// @param maxSpinCount The maximum number of spin counts before asserting. Never assert if negative.
__forceinline__ __device__ void wait(int64_t maxSpinCount = 10000000) { proxyChan_.wait(maxSpinCount); }
#endif // __CUDACC__
};

Expand Down
7 changes: 6 additions & 1 deletion include/mscclpp/semaphore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,13 @@ class Host2HostSemaphore : public BaseSemaphore<std::default_delete, std::defaul
/// Signal the remote host.
void signal();

/// Check if the remote host has signaled.
/// @return true if the remote host has signaled.
bool poll();

/// Wait for the remote host to signal.
void wait();
/// @param maxSpinCount The maximum number of spin counts before throwing an exception. Never throws if negative.
void wait(int64_t maxSpinCount = 10000000);

private:
std::shared_ptr<Connection> connection_;
Expand Down
24 changes: 20 additions & 4 deletions include/mscclpp/semaphore_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,18 @@ namespace mscclpp {
/// Device-side handle for @ref Host2DeviceSemaphore.
struct Host2DeviceSemaphoreDeviceHandle {
#ifdef __CUDACC__
/// Poll if the host has signaled.
/// @return true if the host has signaled.
__forceinline__ __device__ bool poll() {
bool signaled = (*(volatile uint64_t*)(inboundSemaphoreId) > (*expectedInboundSemaphoreId));
if (signaled) (*expectedInboundSemaphoreId) += 1;
return signaled;
}

/// Wait for the host to signal.
__forceinline__ __device__ void wait() {
__forceinline__ __device__ void wait(int64_t maxSpinCount = 10000000) {
(*expectedInboundSemaphoreId) += 1;
POLL_MAYBE_JAILBREAK(*(volatile uint64_t*)(inboundSemaphoreId) < (*expectedInboundSemaphoreId), 100000000);
POLL_MAYBE_JAILBREAK(*(volatile uint64_t*)(inboundSemaphoreId) < (*expectedInboundSemaphoreId), maxSpinCount);
}
#endif // __CUDACC__

Expand All @@ -25,10 +33,18 @@ struct Host2DeviceSemaphoreDeviceHandle {
/// Device-side handle for @ref SmDevice2DeviceSemaphore.
struct SmDevice2DeviceSemaphoreDeviceHandle {
#ifdef __CUDACC__
/// Poll if the remote device has signaled.
/// @return true if the remote device has signaled.
__forceinline__ __device__ bool poll() {
bool signaled = ((*inboundSemaphoreId) > (*expectedInboundSemaphoreId));
if (signaled) (*expectedInboundSemaphoreId) += 1;
return signaled;
}

/// Wait for the remote device to signal.
__forceinline__ __device__ void wait() {
__forceinline__ __device__ void wait(int64_t maxSpinCount = 10000000) {
(*expectedInboundSemaphoreId) += 1;
POLL_MAYBE_JAILBREAK(*inboundSemaphoreId < (*expectedInboundSemaphoreId), 100000000);
POLL_MAYBE_JAILBREAK((*inboundSemaphoreId) < (*expectedInboundSemaphoreId), maxSpinCount);
}

/// Signal the remote device.
Expand Down
7 changes: 6 additions & 1 deletion include/mscclpp/sm_channel_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,13 @@ struct SmChannelDeviceHandle {
/// Read the counter of the local semaphore.
__forceinline__ __device__ uint64_t semaphoreGetLocal() const { return semaphore_.semaphoreGetLocal(); }

/// Check if the remote semaphore has signaled.
/// @return true if the remote semaphore has signaled.
__forceinline__ __device__ bool poll() { return semaphore_.poll(); }

/// Wait for the remote semaphore to send a signal.
__forceinline__ __device__ void wait() { semaphore_.wait(); }
/// @param maxSpinCount The maximum number of spins before asserting. Never assert if negative.
__forceinline__ __device__ void wait(int64_t maxSpinCount = 10000000) { semaphore_.wait(maxSpinCount); }
#endif // __CUDACC__
};

Expand Down
4 changes: 2 additions & 2 deletions python/mscclpp/proxy_channel_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ void register_proxy_channel(nb::module_& m) {
.def("proxy_channel", &ProxyService::proxyChannel, nb::arg("id"));

nb::class_<ProxyChannel>(m, "ProxyChannel")
.def(nb::init<SemaphoreId, Host2DeviceSemaphore::DeviceHandle, FifoDeviceHandle>(), nb::arg("semaphoreId"),
nb::arg("semaphore"), nb::arg("fifo"))
.def(nb::init<SemaphoreId, std::shared_ptr<Host2DeviceSemaphore>, std::shared_ptr<Proxy>>(),
nb::arg("semaphoreId"), nb::arg("semaphore"), nb::arg("proxy"))
.def("device_handle", &ProxyChannel::deviceHandle);

nb::class_<ProxyChannel::DeviceHandle>(m, "ProxyChannelDeviceHandle")
Expand Down
3 changes: 2 additions & 1 deletion python/mscclpp/semaphore_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ void register_semaphore(nb::module_& m) {
.def(nb::init<Communicator&, std::shared_ptr<Connection>>(), nb::arg("communicator"), nb::arg("connection"))
.def("connection", &Host2HostSemaphore::connection)
.def("signal", &Host2HostSemaphore::signal)
.def("wait", &Host2HostSemaphore::wait);
.def("poll", &Host2HostSemaphore::poll)
.def("wait", &Host2HostSemaphore::wait, nb::arg("max_spin_count") = 10000000);

nb::class_<SmDevice2DeviceSemaphore> smDevice2DeviceSemaphore(m, "SmDevice2DeviceSemaphore");
smDevice2DeviceSemaphore
Expand Down
Loading

0 comments on commit 5cd97ad

Please sign in to comment.