diff --git a/cmake/Options.cmake b/cmake/Options.cmake index fe37babd8..fec0ca656 100644 --- a/cmake/Options.cmake +++ b/cmake/Options.cmake @@ -31,6 +31,7 @@ endmacro() # TODO: Default to ON if CUDA available. option(TP_USE_CUDA "Enable support for CUDA tensors" OFF) +option(TP_USE_XPU "Enable support for XPU tensors" OFF) # Optional features option(TP_BUILD_BENCHMARK "Build benchmarks" OFF) diff --git a/tensorpipe/CMakeLists.txt b/tensorpipe/CMakeLists.txt index efcffc257..d4afacf69 100644 --- a/tensorpipe/CMakeLists.txt +++ b/tensorpipe/CMakeLists.txt @@ -321,6 +321,67 @@ if(TP_USE_CUDA) endif() +## XPU + +if(TP_USE_XPU) + # TP_SRCS is the list of source files that we need to build libtensorpipe. + set(TP_XPU_SRCS) + + # TP_PUBLIC_HDRS is the list of public header files that we need to install. + set(TP_XPU_PUBLIC_HDRS) + + # TP_LINK_LIBRARIES is list of dependent libraries to be linked + set(TP_XPU_LINK_LIBRARIES) + + # TP_INCLUDE_DIRS is list of include path to be used + set(TP_XPU_INCLUDE_DIRS) + + find_package(IntelSYCL REQUIRED) + list(APPEND TP_XPU_LINK_LIBRARIES ${SYCL_LIBRARIES}) + list(APPEND TP_XPU_INCLUDE_DIRS ${SYCL_INCLUDE_DIRS}) + + list(APPEND TP_XPU_SRCS + common/xpu_buffer.cc) + list(APPEND TP_XPU_PUBLIC_HDRS + tensorpipe_xpu.h + common/xpu_buffer.h) + + ### xpu_basic + + list(APPEND TP_XPU_SRCS + channel/xpu_basic/channel_impl.cc + channel/xpu_basic/context_impl.cc + channel/xpu_basic/factory.cc + common/xpu_loop.cc) + list(APPEND TP_XPU_PUBLIC_HDRS + channel/xpu_basic/factory.h) + + + add_library(tensorpipe_xpu ${TP_STATIC_OR_SHARED} ${TP_XPU_SRCS}) + + if(BUILD_SHARED_LIBS) + set_target_properties(tensorpipe_xpu PROPERTIES POSITION_INDEPENDENT_CODE 1) + endif() + + target_link_libraries(tensorpipe_xpu PUBLIC tensorpipe) + target_link_libraries(tensorpipe_xpu PRIVATE ${TP_XPU_LINK_LIBRARIES}) + target_include_directories(tensorpipe_xpu PUBLIC ${TP_XPU_INCLUDE_DIRS}) + + install(TARGETS tensorpipe_xpu + EXPORT TensorpipeTargets + LIBRARY DESTINATION ${TP_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${TP_INSTALL_LIBDIR}) + + foreach(_header_file ${TP_XPU_PUBLIC_HDRS}) + get_filename_component(_TP_HEADER_SUBDIR "${_header_file}" DIRECTORY) + install(FILES ${_header_file} + DESTINATION ${TP_INSTALL_INCLUDEDIR}/tensorpipe/${_TP_HEADER_SUBDIR}) + endforeach() + + install(FILES ${CMAKE_CURRENT_BINARY_DIR}/config.h + DESTINATION ${TP_INSTALL_INCLUDEDIR}/tensorpipe) + +endif() ## Python bindings diff --git a/tensorpipe/benchmark/channel_registry.cc b/tensorpipe/benchmark/channel_registry.cc index af6678d72..d51a6624f 100644 --- a/tensorpipe/benchmark/channel_registry.cc +++ b/tensorpipe/benchmark/channel_registry.cc @@ -89,6 +89,18 @@ std::shared_ptr makeCudaGdrChannel() { TP_REGISTER_CREATOR(TensorpipeChannelRegistry, cuda_gdr, makeCudaGdrChannel); #endif // TENSORPIPE_HAS_CUDA_GDR_CHANNEL +// XPU BASIC + +std::shared_ptr makeXpuBasicChannel() { + return tensorpipe::channel::xpu_basic::create( + tensorpipe::channel::basic::create()); +} + +TP_REGISTER_CREATOR( + TensorpipeChannelRegistry, + xpu_basic, + makeXpuBasicChannel); + void validateChannelContext( std::shared_ptr context) { if (!context) { diff --git a/tensorpipe/channel/xpu_basic/channel_impl.cc b/tensorpipe/channel/xpu_basic/channel_impl.cc new file mode 100644 index 000000000..a97d78ec3 --- /dev/null +++ b/tensorpipe/channel/xpu_basic/channel_impl.cc @@ -0,0 +1,627 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. 
+ * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace tensorpipe { +namespace channel { +namespace xpu_basic { + +namespace { + +size_t ceilOfRatio(size_t n, size_t d) { + return (n + d - 1) / d; +} + +} // namespace + +ChannelImpl::ChannelImpl( + ConstructorToken token, + std::shared_ptr context, + std::string id, + std::shared_ptr connection, + std::shared_ptr cpuChannel, + XpuLoop& xpuLoop) + : ChannelImplBoilerplate( + token, + std::move(context), + std::move(id)), + connection_(std::move(connection)), + cpuChannel_(std::move(cpuChannel)), + xpuLoop_(xpuLoop) {} + +void ChannelImpl::initImplFromLoop() { + context_->enroll(*this); +} + +void ChannelImpl::xpuCopy( + void* dst, + const void* src, + size_t length, + int deviceIdx, + sycl::queue* stream, + std::function callback) { + sycl::queue& q = + (stream == nullptr) ? xpu::getDefaultXPUQueue(deviceIdx) : *stream; + + sycl::event e; + try { + e = q.memcpy(dst, src, length); + } catch (sycl::exception const& ex) { + auto err = TP_CREATE_ERROR(xpu::XpuError, ex.what()); + callback(err); + return; + } + xpuLoop_.addCallback(deviceIdx, e, std::move(callback)); +} + +void ChannelImpl::sendImplFromLoop( + uint64_t sequenceNumber, + Buffer buffer, + size_t length, + TSendCallback callback) { + if (length == 0) { + callback(error_); + return; + } + + const Device device = buffer.device(); + const size_t chunkLength = kSlotSize; + const size_t numChunks = ceilOfRatio(length, chunkLength); + for (size_t offset = 0; offset < length; offset += chunkLength) { + ChunkSendOpIter opIter = chunkSendOps_.emplaceBack(nextChunkBeingSent_++); + ChunkSendOperation& op = *opIter; + op.bufferSequenceNumber = sequenceNumber; + op.chunkId = offset / chunkLength; + op.numChunks = numChunks; + op.length = std::min(length - offset, chunkLength); + // Operations are processed in order, so we can afford to trigger the + // callback once the last operation is done. + if (op.chunkId == numChunks - 1) { + op.callback = std::move(callback); + } + + if (device.type == kCpuDeviceType) { + op.isCpuBuffer = true; + op.devicePtr = + static_cast(buffer.unwrap().ptr) + offset; + } else if (device.type == kXpuDeviceType) { + op.isCpuBuffer = false; + op.devicePtr = + static_cast(buffer.unwrap().ptr) + offset; + op.q = buffer.unwrap().queue; + op.deviceIdx = device.index; + } else { + TP_THROW_ASSERT() << "Unexpected device type: " << device.type; + } + + chunkSendOps_.advanceOperation(opIter); + } +} + +void ChannelImpl::advanceChunkSendOperation( + ChunkSendOpIter opIter, + ChunkSendOperation::State prevOpState) { + TP_DCHECK(context_->inLoop()); + + ChunkSendOperation& op = *opIter; + + // Needs to go after previous op invoked its callback because the last chunk + // in a series (that corresponds to one operation) must invoke its callback + // only when all chunks in the series are done. + chunkSendOps_.attemptTransition( + opIter, + /*from=*/ChunkSendOperation::UNINITIALIZED, + /*to=*/ChunkSendOperation::FINISHED, + /*cond=*/error_ && prevOpState >= ChunkSendOperation::INVOKED_CALLBACK, + /*actions=*/{&ChannelImpl::callSendCallback}); + + // Needs to go after previous op to ensure predictable and consistent ordering + // of send calls on CPU channel. + // This transition shortcuts the allocation of/copy to staging memory when the + // buffer is already on CPU. 
+ chunkSendOps_.attemptTransition( + opIter, + /*from=*/ChunkSendOperation::UNINITIALIZED, + /*to=*/ChunkSendOperation::SENDING_CPU_BUFFER, + /*cond=*/!error_ && op.isCpuBuffer && + prevOpState >= ChunkSendOperation::SENDING_CPU_BUFFER, + /*actions=*/ + {&ChannelImpl::writeReadyToSend, &ChannelImpl::sendCpuBuffer}); + + // Needs to go after previous op to ensure later operations are not holding + // staging buffers while earlier ones are still blocked waiting for them, + // because the staging buffer will only be returned to the allocator once the + // operation is destroyed, but this won't happen until earlier operations have + // completed, and if they are blocked waiting for buffers we may deadlock. + chunkSendOps_.attemptTransition( + opIter, + /*from=*/ChunkSendOperation::UNINITIALIZED, + /*to=*/ChunkSendOperation::ALLOCATING_CPU_BUFFER, + /*cond=*/!error_ && !op.isCpuBuffer && + prevOpState >= ChunkSendOperation::ALLOCATING_CPU_BUFFER, + /*actions=*/{&ChannelImpl::allocateSendCpuBuffer}); + + // See above for why this needs to go after previous op. + chunkSendOps_.attemptTransition( + opIter, + /*from=*/ChunkSendOperation::ALLOCATING_CPU_BUFFER, + /*to=*/ChunkSendOperation::FINISHED, + /*cond=*/error_ && op.doneAllocatingCpuStagingBuffer && + prevOpState >= ChunkSendOperation::INVOKED_CALLBACK, + /*actions=*/ + {&ChannelImpl::callSendCallback, &ChannelImpl::returnSendCpuBuffer}); + + // Needs to go after previous op to ensure predictable and consistent ordering + // of write calls on the control connection. + chunkSendOps_.attemptTransition( + opIter, + /*from=*/ChunkSendOperation::ALLOCATING_CPU_BUFFER, + /*to=*/ChunkSendOperation::COPYING_FROM_GPU_TO_CPU, + /*cond=*/!error_ && op.doneAllocatingCpuStagingBuffer && + prevOpState >= ChunkSendOperation::COPYING_FROM_GPU_TO_CPU, + /*actions=*/ + {&ChannelImpl::writeReadyToSend, &ChannelImpl::copyFromGpuToCpu}); + + // See above for why this needs to go after previous op. + chunkSendOps_.attemptTransition( + opIter, + /*from=*/ChunkSendOperation::COPYING_FROM_GPU_TO_CPU, + /*to=*/ChunkSendOperation::FINISHED, + /*cond=*/error_ && op.doneCopyingFromGpuToCpu && + prevOpState >= ChunkSendOperation::INVOKED_CALLBACK, + /*actions=*/ + {&ChannelImpl::callSendCallback, &ChannelImpl::returnSendCpuBuffer}); + + // See above for why this needs to go after previous op. + chunkSendOps_.attemptTransition( + opIter, + /*from=*/ChunkSendOperation::COPYING_FROM_GPU_TO_CPU, + /*to=*/ChunkSendOperation::INVOKED_CALLBACK, + /*cond=*/!error_ && op.doneCopyingFromGpuToCpu && + prevOpState >= ChunkSendOperation::INVOKED_CALLBACK, + /*actions=*/{&ChannelImpl::callSendCallback}); + + chunkSendOps_.attemptTransition( + opIter, + /*from=*/ChunkSendOperation::INVOKED_CALLBACK, + /*to=*/ChunkSendOperation::FINISHED, + /*cond=*/error_, + /*actions=*/{&ChannelImpl::returnSendCpuBuffer}); + + // Needs to go after previous op to ensure predictable and consistent ordering + // of send calls on CPU channel. 
+ chunkSendOps_.attemptTransition( + opIter, + /*from=*/ChunkSendOperation::INVOKED_CALLBACK, + /*to=*/ChunkSendOperation::SENDING_CPU_BUFFER, + /*cond=*/!error_ && prevOpState >= ChunkSendOperation::SENDING_CPU_BUFFER, + /*actions=*/{&ChannelImpl::sendCpuBuffer}); + + chunkSendOps_.attemptTransition( + opIter, + /*from=*/ChunkSendOperation::SENDING_CPU_BUFFER, + /*to=*/ChunkSendOperation::FINISHED, + /*cond=*/op.doneSendingCpuBuffer && op.isCpuBuffer, + /*actions=*/{&ChannelImpl::callSendCallback}); + + chunkSendOps_.attemptTransition( + opIter, + /*from=*/ChunkSendOperation::SENDING_CPU_BUFFER, + /*to=*/ChunkSendOperation::FINISHED, + /*cond=*/op.doneSendingCpuBuffer && !op.isCpuBuffer, + /*actions=*/{&ChannelImpl::returnSendCpuBuffer}); +} + +void ChannelImpl::allocateSendCpuBuffer(ChunkSendOpIter opIter) { + ChunkSendOperation& op = *opIter; + + TP_VLOG(5) << "Channel " << id_ + << " is allocating temporary memory for chunk #" << op.chunkId + << " of " << op.numChunks << " for buffer #" + << op.bufferSequenceNumber; + Allocator& xpuHostAllocator = context_->getXpuHostSendAllocator(*op.q); + xpuHostAllocator.alloc( + op.length, + callbackWrapper_( + [opIter](ChannelImpl& impl, std::shared_ptr tmpBuffer) { + TP_VLOG(5) << "Channel " << impl.id_ + << " is done allocating temporary memory for chunk #" + << opIter->chunkId << " of " << opIter->numChunks + << " for buffer #" << opIter->bufferSequenceNumber; + opIter->doneAllocatingCpuStagingBuffer = true; + if (!impl.error_) { + opIter->tmpBuffer = std::move(tmpBuffer); + } + impl.chunkSendOps_.advanceOperation(opIter); + })); +} + +void ChannelImpl::writeReadyToSend(ChunkSendOpIter opIter) { + ChunkSendOperation& op = *opIter; + + TP_VLOG(6) << "Channel " << id_ + << " is sending ready-to-send notification for chunk #" + << op.chunkId << " of " << op.numChunks << " for buffer #" + << op.bufferSequenceNumber; + connection_->write( + nullptr, + 0, + callbackWrapper_([bufferSequenceNumber{op.bufferSequenceNumber}, + chunkId{op.chunkId}, + numChunks{op.numChunks}](ChannelImpl& impl) { + TP_VLOG(6) << "Channel " << impl.id_ + << " is done sending ready-to-send notification for chunk #" + << chunkId << " of " << numChunks << " for buffer #" + << bufferSequenceNumber; + })); +} + +void ChannelImpl::copyFromGpuToCpu(ChunkSendOpIter opIter) { + ChunkSendOperation& op = *opIter; + + TP_VLOG(5) << "Channel " << id_ << " is copying chunk #" << op.chunkId + << " of " << op.numChunks << " for buffer #" + << op.bufferSequenceNumber << " from XPU device to CPU"; + xpuCopy( + op.tmpBuffer.get(), + op.devicePtr, + op.length, + op.deviceIdx, + op.q, + callbackWrapper_([opIter](ChannelImpl& impl) { + TP_VLOG(5) << "Channel " << impl.id_ << " is done copying chunk #" + << opIter->chunkId << " of " << opIter->numChunks + << " for buffer #" << opIter->bufferSequenceNumber + << " from XPU device to CPU"; + opIter->doneCopyingFromGpuToCpu = true; + impl.chunkSendOps_.advanceOperation(opIter); + })); +} + +void ChannelImpl::sendCpuBuffer(ChunkSendOpIter opIter) { + ChunkSendOperation& op = *opIter; + + TP_VLOG(6) << "Channel " << id_ << " is sending chunk #" << op.chunkId + << " of " << op.numChunks << " for buffer #" + << op.bufferSequenceNumber << " through CPU channel"; + + cpuChannel_->send( + CpuBuffer{.ptr = op.isCpuBuffer ? 
op.devicePtr : op.tmpBuffer.get()}, + op.length, + callbackWrapper_([opIter](ChannelImpl& impl) { + TP_VLOG(6) << "Channel " << impl.id_ << " is done sending chunk #" + << opIter->chunkId << " of " << opIter->numChunks + << " for buffer #" << opIter->bufferSequenceNumber + << " through CPU channel"; + opIter->doneSendingCpuBuffer = true; + impl.chunkSendOps_.advanceOperation(opIter); + })); +} + +void ChannelImpl::callSendCallback(ChunkSendOpIter opIter) { + ChunkSendOperation& op = *opIter; + + if (op.callback) { + op.callback(error_); + // Reset callback to release the resources it was holding. + op.callback = nullptr; + } +} + +void ChannelImpl::returnSendCpuBuffer(ChunkSendOpIter opIter) { + ChunkSendOperation& op = *opIter; + + // The pointer's deleter will return the buffer to the allocator. + op.tmpBuffer = nullptr; +} + +void ChannelImpl::recvImplFromLoop( + uint64_t sequenceNumber, + Buffer buffer, + size_t length, + TRecvCallback callback) { + if (length == 0) { + callback(error_); + return; + } + + const Device device = buffer.device(); + const size_t chunkLength = kSlotSize; + const size_t numChunks = ceilOfRatio(length, chunkLength); + for (size_t offset = 0; offset < length; offset += chunkLength) { + ChunkRecvOpIter opIter = + chunkRecvOps_.emplaceBack(nextChunkBeingReceived_++); + ChunkRecvOperation& op = *opIter; + op.bufferSequenceNumber = sequenceNumber; + op.chunkId = offset / chunkLength; + op.numChunks = numChunks; + op.length = std::min(length - offset, chunkLength); + // Operations are processed in order, so we can afford to trigger the + // callback once the last operation is done. + if (op.chunkId == numChunks - 1) { + op.callback = std::move(callback); + } + + if (device.type == kCpuDeviceType) { + op.isCpuBuffer = true; + op.devicePtr = + static_cast(buffer.unwrap().ptr) + offset; + } else if (device.type == kXpuDeviceType) { + op.isCpuBuffer = false; + op.devicePtr = + static_cast(buffer.unwrap().ptr) + offset; + op.q = buffer.unwrap().queue; + op.deviceIdx = device.index; + } else { + TP_THROW_ASSERT() << "Unexpected device type: " << device.type; + } + + chunkRecvOps_.advanceOperation(opIter); + } +} + +void ChannelImpl::advanceChunkRecvOperation( + ChunkRecvOpIter opIter, + ChunkRecvOperation::State prevOpState) { + TP_DCHECK(context_->inLoop()); + + ChunkRecvOperation& op = *opIter; + + // Needs to go after previous op invoked its callback because the last chunk + // in a series (that corresponds to one operation) must invoke its callback + // only when all chunks in the series are done. + chunkRecvOps_.attemptTransition( + opIter, + /*from=*/ChunkRecvOperation::UNINITIALIZED, + /*to=*/ChunkRecvOperation::FINISHED, + /*cond=*/error_ && + prevOpState >= + ChunkRecvOperation::COPYING_FROM_CPU_TO_GPU_AND_INVOKED_CALLBACK, + /*actions=*/{&ChannelImpl::callRecvCallback}); + + // Needs to go after previous op to ensure predictable and consistent ordering + // of read calls on control connection. + chunkRecvOps_.attemptTransition( + opIter, + /*from=*/ChunkRecvOperation::UNINITIALIZED, + /*to=*/ChunkRecvOperation::READING_READY_TO_SEND, + /*cond=*/!error_ && + prevOpState >= ChunkRecvOperation::READING_READY_TO_SEND, + /*actions=*/{&ChannelImpl::readReadyToSend}); + + // See above for why this needs to go after previous op. 
+ chunkRecvOps_.attemptTransition( + opIter, + /*from=*/ChunkRecvOperation::READING_READY_TO_SEND, + /*to=*/ChunkRecvOperation::FINISHED, + /*cond=*/error_ && op.doneReadingReadyToSend && + prevOpState >= + ChunkRecvOperation::COPYING_FROM_CPU_TO_GPU_AND_INVOKED_CALLBACK, + /*actions=*/{&ChannelImpl::callRecvCallback}); + + // Needs to go after previous op to ensure predictable and consistent ordering + // of recv calls on CPU channel. + // This operation shortcuts allocating staging memory when receiving directly + // on CPU. + chunkRecvOps_.attemptTransition( + opIter, + /*from=*/ChunkRecvOperation::READING_READY_TO_SEND, + /*to=*/ChunkRecvOperation::RECEIVING_CPU_BUFFER, + /*cond=*/!error_ && op.doneReadingReadyToSend && op.isCpuBuffer && + prevOpState >= ChunkRecvOperation::RECEIVING_CPU_BUFFER, + /*actions=*/{&ChannelImpl::receiveCpuBuffer}); + + // Needs to go after previous op to ensure later operations are not holding + // staging buffers while earlier ones are still blocked waiting for them, + // because the staging buffer will only be returned to the allocator once the + // operation is destroyed, but this won't happen until earlier operations have + // completed, and if they are blocked waiting for buffers we may deadlock. + chunkRecvOps_.attemptTransition( + opIter, + /*from=*/ChunkRecvOperation::READING_READY_TO_SEND, + /*to=*/ChunkRecvOperation::ALLOCATING_CPU_BUFFER, + /*cond=*/!error_ && op.doneReadingReadyToSend && !op.isCpuBuffer && + prevOpState >= ChunkRecvOperation::ALLOCATING_CPU_BUFFER, + /*actions=*/{&ChannelImpl::allocateRecvCpuBuffer}); + + // See above for why this needs to go after previous op. + chunkRecvOps_.attemptTransition( + opIter, + /*from=*/ChunkRecvOperation::ALLOCATING_CPU_BUFFER, + /*to=*/ChunkRecvOperation::FINISHED, + /*cond=*/error_ && op.doneAllocatingCpuStagingBuffer && + prevOpState >= + ChunkRecvOperation::COPYING_FROM_CPU_TO_GPU_AND_INVOKED_CALLBACK, + /*actions=*/ + {&ChannelImpl::callRecvCallback, &ChannelImpl::returnRecvCpuBuffer}); + + // Needs to go after previous op to ensure predictable and consistent ordering + // of recv calls on CPU channel. + chunkRecvOps_.attemptTransition( + opIter, + /*from=*/ChunkRecvOperation::ALLOCATING_CPU_BUFFER, + /*to=*/ChunkRecvOperation::RECEIVING_CPU_BUFFER, + /*cond=*/!error_ && op.doneAllocatingCpuStagingBuffer && + prevOpState >= ChunkRecvOperation::RECEIVING_CPU_BUFFER, + /*actions=*/{&ChannelImpl::receiveCpuBuffer}); + + // See above for why this needs to go after previous op. + chunkRecvOps_.attemptTransition( + opIter, + /*from=*/ChunkRecvOperation::RECEIVING_CPU_BUFFER, + /*to=*/ChunkRecvOperation::FINISHED, + /*cond=*/error_ && op.doneReceivingCpuBuffer && !op.isCpuBuffer && + prevOpState >= + ChunkRecvOperation::COPYING_FROM_CPU_TO_GPU_AND_INVOKED_CALLBACK, + /*actions=*/ + {&ChannelImpl::callRecvCallback, &ChannelImpl::returnRecvCpuBuffer}); + + // This transition shortcuts the copy to GPU when receiving on CPU memory. + chunkRecvOps_.attemptTransition( + opIter, + /*from=*/ChunkRecvOperation::RECEIVING_CPU_BUFFER, + /*to=*/ChunkRecvOperation::FINISHED, + /*cond=*/op.doneReceivingCpuBuffer && op.isCpuBuffer, + /*actions=*/{&ChannelImpl::callRecvCallback}); + + chunkRecvOps_.attemptTransition( + opIter, + /*from=*/ChunkRecvOperation::RECEIVING_CPU_BUFFER, + /*to=*/ChunkRecvOperation::COPYING_FROM_CPU_TO_GPU, + /*cond=*/!error_ && op.doneReceivingCpuBuffer && !op.isCpuBuffer, + /*actions=*/{&ChannelImpl::copyFromCpuToGpu}); + + // See above for why this needs to go after previous op. 
+ chunkRecvOps_.attemptTransition( + opIter, + /*from=*/ChunkRecvOperation::COPYING_FROM_CPU_TO_GPU, + /*to=*/ChunkRecvOperation::COPYING_FROM_CPU_TO_GPU_AND_INVOKED_CALLBACK, + /*cond=*/prevOpState >= + ChunkRecvOperation::COPYING_FROM_CPU_TO_GPU_AND_INVOKED_CALLBACK, + /*actions=*/{&ChannelImpl::callRecvCallback}); + + chunkRecvOps_.attemptTransition( + opIter, + /*from=*/ChunkRecvOperation::COPYING_FROM_CPU_TO_GPU_AND_INVOKED_CALLBACK, + /*to=*/ChunkRecvOperation::FINISHED, + /*cond=*/op.doneCopyingFromCpuToGpu, + /*actions=*/{&ChannelImpl::returnRecvCpuBuffer}); +} + +void ChannelImpl::readReadyToSend(ChunkRecvOpIter opIter) { + ChunkRecvOperation& op = *opIter; + + TP_VLOG(6) << "Channel " << id_ + << " is reading ready-to-send notification for chunk #" + << op.chunkId << " of " << op.numChunks << " for buffer #" + << op.bufferSequenceNumber; + connection_->read(callbackWrapper_( + [opIter]( + ChannelImpl& impl, const void* /* unused */, size_t /* unused */) { + TP_VLOG(6) << "Channel " << impl.id_ + << " is done reading ready-to-send notification for chunk #" + << opIter->chunkId << " of " << opIter->numChunks + << " for buffer #" << opIter->bufferSequenceNumber; + opIter->doneReadingReadyToSend = true; + impl.chunkRecvOps_.advanceOperation(opIter); + })); +} + +void ChannelImpl::allocateRecvCpuBuffer(ChunkRecvOpIter opIter) { + ChunkRecvOperation& op = *opIter; + + TP_VLOG(5) << "Channel " << id_ + << " is allocating temporary memory for chunk #" << op.chunkId + << " of " << op.numChunks << " for buffer #" + << op.bufferSequenceNumber; + Allocator& xpuHostAllocator = context_->getXpuHostRecvAllocator(*op.q); + xpuHostAllocator.alloc( + op.length, + callbackWrapper_( + [opIter]( + ChannelImpl& impl, std::shared_ptr tmpBuffer) mutable { + TP_VLOG(5) << "Channel " << impl.id_ + << " is done allocating temporary memory for chunk #" + << opIter->chunkId << " of " << opIter->numChunks + << " for buffer #" << opIter->bufferSequenceNumber; + opIter->doneAllocatingCpuStagingBuffer = true; + if (!impl.error_) { + opIter->tmpBuffer = std::move(tmpBuffer); + } + impl.chunkRecvOps_.advanceOperation(opIter); + })); +} + +void ChannelImpl::receiveCpuBuffer(ChunkRecvOpIter opIter) { + ChunkRecvOperation& op = *opIter; + + TP_VLOG(6) << "Channel " << id_ << " is sending chunk #" << op.chunkId + << " of " << op.numChunks << " for buffer #" + << op.bufferSequenceNumber << " through CPU channel"; + cpuChannel_->recv( + CpuBuffer{.ptr = op.isCpuBuffer ? 
op.devicePtr : op.tmpBuffer.get()}, + op.length, + callbackWrapper_([opIter](ChannelImpl& impl) { + TP_VLOG(6) << "Channel " << impl.id_ << " is done sending chunk #" + << opIter->chunkId << " of " << opIter->numChunks + << " for buffer #" << opIter->bufferSequenceNumber + << " through CPU channel"; + opIter->doneReceivingCpuBuffer = true; + impl.chunkRecvOps_.advanceOperation(opIter); + })); +} + +void ChannelImpl::copyFromCpuToGpu(ChunkRecvOpIter opIter) { + ChunkRecvOperation& op = *opIter; + + TP_VLOG(5) << "Channel " << id_ << " is copying chunk #" << op.chunkId + << " of " << op.numChunks << " for buffer #" + << op.bufferSequenceNumber << " from CPU to XPU device"; + xpuCopy( + op.devicePtr, + op.tmpBuffer.get(), + op.length, + op.deviceIdx, + op.q, + callbackWrapper_([opIter](ChannelImpl& impl) { + TP_VLOG(5) << "Channel " << impl.id_ << " is done copying chunk #" + << opIter->chunkId << " of " << opIter->numChunks + << " for buffer #" << opIter->bufferSequenceNumber + << " from CPU to XPU device"; + opIter->doneCopyingFromCpuToGpu = true; + impl.chunkRecvOps_.advanceOperation(opIter); + })); +} + +void ChannelImpl::callRecvCallback(ChunkRecvOpIter opIter) { + ChunkRecvOperation& op = *opIter; + + if (op.callback) { + op.callback(error_); + // Reset callback to release the resources it was holding. + op.callback = nullptr; + } +} + +void ChannelImpl::returnRecvCpuBuffer(ChunkRecvOpIter opIter) { + ChunkRecvOperation& op = *opIter; + + // The pointer's deleter will return the buffer to the allocator. + op.tmpBuffer = nullptr; +} + +void ChannelImpl::setIdImpl() { + cpuChannel_->setId(id_ + ".cpu"); +} + +void ChannelImpl::handleErrorImpl() { + chunkSendOps_.advanceAllOperations(); + chunkRecvOps_.advanceAllOperations(); + + connection_->close(); + cpuChannel_->close(); + + context_->unenroll(*this); +} + +} // namespace xpu_basic +} // namespace channel +} // namespace tensorpipe diff --git a/tensorpipe/channel/xpu_basic/channel_impl.h b/tensorpipe/channel/xpu_basic/channel_impl.h new file mode 100644 index 000000000..3100f24df --- /dev/null +++ b/tensorpipe/channel/xpu_basic/channel_impl.h @@ -0,0 +1,182 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace tensorpipe { +namespace channel { +namespace xpu_basic { + +class ContextImpl; + +struct ChunkSendOperation { + enum State { + UNINITIALIZED, + ALLOCATING_CPU_BUFFER, + COPYING_FROM_GPU_TO_CPU, + INVOKED_CALLBACK, + SENDING_CPU_BUFFER, + FINISHED + }; + + // Fields used by the state machine + uint64_t sequenceNumber{0}; + State state{UNINITIALIZED}; + + // Arguments at creation + uint64_t bufferSequenceNumber{0}; + bool isCpuBuffer{false}; + void* devicePtr{nullptr}; + size_t chunkId{0}; + size_t numChunks{0}; + size_t length{0}; + std::function callback; + + // For XPU buffers + int deviceIdx{0}; + sycl::queue* q = &xpu::getDefaultXPUQueue(deviceIdx); + + // Data collected during processing + std::shared_ptr tmpBuffer; + + // Progress flags + bool doneAllocatingCpuStagingBuffer{false}; + bool doneCopyingFromGpuToCpu{false}; + bool doneSendingCpuBuffer{false}; +}; + +struct ChunkRecvOperation { + enum State { + UNINITIALIZED, + READING_READY_TO_SEND, + ALLOCATING_CPU_BUFFER, + RECEIVING_CPU_BUFFER, + COPYING_FROM_CPU_TO_GPU, + COPYING_FROM_CPU_TO_GPU_AND_INVOKED_CALLBACK, + FINISHED + }; + + // Fields used by the state machine + uint64_t sequenceNumber{0}; + State state{UNINITIALIZED}; + + // Arguments at creation + uint64_t bufferSequenceNumber{0}; + bool isCpuBuffer{false}; + void* devicePtr{nullptr}; + size_t chunkId{0}; + size_t numChunks{0}; + size_t length{0}; + std::function callback; + + // For XPU buffers + int deviceIdx{0}; + sycl::queue* q = &xpu::getDefaultXPUQueue(deviceIdx); + + // Data collected during processing + std::shared_ptr tmpBuffer; + + // Progress flags + bool doneReadingReadyToSend{false}; + bool doneAllocatingCpuStagingBuffer{false}; + bool doneReceivingCpuBuffer{false}; + bool doneCopyingFromCpuToGpu{false}; +}; + +class ChannelImpl final + : public ChannelImplBoilerplate { + public: + ChannelImpl( + ConstructorToken token, + std::shared_ptr context, + std::string id, + std::shared_ptr connection, + std::shared_ptr cpuChannel, + XpuLoop& xpuLoop); + + protected: + // Implement the entry points called by ChannelImplBoilerplate. + void initImplFromLoop() override; + void sendImplFromLoop( + uint64_t sequenceNumber, + Buffer buffer, + size_t length, + TSendCallback callback) override; + void recvImplFromLoop( + uint64_t sequenceNumber, + Buffer buffer, + size_t length, + TRecvCallback callback) override; + void handleErrorImpl() override; + void setIdImpl() override; + + private: + const std::shared_ptr connection_; + const std::shared_ptr cpuChannel_; + XpuLoop& xpuLoop_; + + // A sequence number for the chunks. + uint64_t nextChunkBeingSent_{0}; + uint64_t nextChunkBeingReceived_{0}; + + OpsStateMachine chunkSendOps_{ + *this, + &ChannelImpl::advanceChunkSendOperation}; + using ChunkSendOpIter = decltype(chunkSendOps_)::Iter; + OpsStateMachine chunkRecvOps_{ + *this, + &ChannelImpl::advanceChunkRecvOperation}; + using ChunkRecvOpIter = decltype(chunkRecvOps_)::Iter; + + // State machines for send and recv ops. + void advanceChunkSendOperation( + ChunkSendOpIter opIter, + ChunkSendOperation::State prevOpState); + void advanceChunkRecvOperation( + ChunkRecvOpIter opIter, + ChunkRecvOperation::State prevOpState); + + // Actions (i.e., methods that begin a state transition). 
+ // For send operations: + void allocateSendCpuBuffer(ChunkSendOpIter opIter); + void copyFromGpuToCpu(ChunkSendOpIter opIter); + void callSendCallback(ChunkSendOpIter opIter); + void sendCpuBuffer(ChunkSendOpIter opIter); + void writeReadyToSend(ChunkSendOpIter opIter); + void returnSendCpuBuffer(ChunkSendOpIter opIter); + // For recv operations: + void readReadyToSend(ChunkRecvOpIter opIter); + void allocateRecvCpuBuffer(ChunkRecvOpIter opIter); + void receiveCpuBuffer(ChunkRecvOpIter opIter); + void copyFromCpuToGpu(ChunkRecvOpIter opIter); + void callRecvCallback(ChunkRecvOpIter opIter); + void returnRecvCpuBuffer(ChunkRecvOpIter opIter); + + void xpuCopy( + void* dst, + const void* src, + size_t length, + int deviceIdx, + sycl::queue*, + std::function callback); +}; + +} // namespace xpu_basic +} // namespace channel +} // namespace tensorpipe diff --git a/tensorpipe/channel/xpu_basic/constants.h b/tensorpipe/channel/xpu_basic/constants.h new file mode 100644 index 000000000..43f0d0982 --- /dev/null +++ b/tensorpipe/channel/xpu_basic/constants.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include + +namespace tensorpipe { +namespace channel { +namespace xpu_basic { + +// FIXME Avoid this anonymous namespace and use inline variables in C++-17. +namespace { + +// Define all three (redundant) values to make them explicit and avoid +// misunderstandings due to miscalculations. +static constexpr size_t kStagingAreaSize = 16 * 1024 * 1024; +static constexpr size_t kSlotSize = 1024 * 1024; +static constexpr size_t kNumSlots = 16; + +static_assert(kStagingAreaSize == kSlotSize * kNumSlots, ""); + +} // namespace + +} // namespace xpu_basic +} // namespace channel +} // namespace tensorpipe diff --git a/tensorpipe/channel/xpu_basic/context_impl.cc b/tensorpipe/channel/xpu_basic/context_impl.cc new file mode 100644 index 000000000..e1470bdef --- /dev/null +++ b/tensorpipe/channel/xpu_basic/context_impl.cc @@ -0,0 +1,174 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include + +#include +#include +#include + +#include +#include +#include +#include + +namespace tensorpipe { +namespace channel { +namespace xpu_basic { + +namespace { + +struct DeviceDescriptor { + std::string deviceType; + std::string descriptor; + NOP_STRUCTURE(DeviceDescriptor, deviceType, descriptor); +}; + +DeviceDescriptor deserializeDeviceDescriptor( + const std::string& deviceDescriptor) { + NopHolder nopHolder; + loadDescriptor(nopHolder, deviceDescriptor); + return std::move(nopHolder.getObject()); +} + +} // namespace + +std::shared_ptr ContextImpl::create( + std::shared_ptr cpuContext) { + if (cpuContext->deviceDescriptors().count(Device{kCpuDeviceType, 0}) == 0) { + TP_THROW_ASSERT() << "XPU basic channel needs a CPU channel"; + + return nullptr; + } + + if (!cpuContext->isViable()) { + return nullptr; + } + + std::unordered_map deviceDescriptors; + // NOTE: Assume there is only one CPU. 
+ TP_DCHECK_EQ( + cpuContext->deviceDescriptors().count(Device{kCpuDeviceType, 0}), 1); + const auto cpuDeviceDescriptor = + cpuContext->deviceDescriptors().begin()->second; + + NopHolder nopHolder; + DeviceDescriptor& deviceDescriptor = nopHolder.getObject(); + deviceDescriptor.descriptor = cpuDeviceDescriptor; + + deviceDescriptor.deviceType = kCpuDeviceType; + deviceDescriptors[Device{kCpuDeviceType, 0}] = saveDescriptor(nopHolder); + for (const auto& device : xpu::getXpuDevices()) { + deviceDescriptor.deviceType = kXpuDeviceType; + deviceDescriptors[device] = saveDescriptor(nopHolder); + } + + return std::make_shared( + std::move(cpuContext), std::move(deviceDescriptors)); +} + +ContextImpl::ContextImpl( + std::shared_ptr cpuContext, + std::unordered_map deviceDescriptors) + : ContextImplBoilerplate( + std::move(deviceDescriptors)), + cpuContext_(std::move(cpuContext)) {} + +std::shared_ptr ContextImpl::createChannel( + std::vector> connections, + Endpoint endpoint) { + TP_DCHECK_EQ(numConnectionsNeeded(), connections.size()); + auto conn = std::move(connections.back()); + connections.pop_back(); + auto cpuChannel = + cpuContext_->createChannel(std::move(connections), endpoint); + return createChannelInternal( + std::move(conn), std::move(cpuChannel), xpuLoop_); +} + +size_t ContextImpl::numConnectionsNeeded() const { + return 1 + cpuContext_->numConnectionsNeeded(); +} + +bool ContextImpl::canCommunicateWithRemote( + const std::string& localDeviceDescriptor, + const std::string& remoteDeviceDescriptor) const { + DeviceDescriptor nopLocalDeviceDescriptor = + deserializeDeviceDescriptor(localDeviceDescriptor); + DeviceDescriptor nopRemoteDeviceDescriptor = + deserializeDeviceDescriptor(remoteDeviceDescriptor); + + // Prevent XpuBasic from being mistakenly used for CPU to CPU transfers, as + // there are always better options. 
+ if (nopLocalDeviceDescriptor.deviceType == kCpuDeviceType && + nopRemoteDeviceDescriptor.deviceType == kCpuDeviceType) { + return false; + } + + return nopLocalDeviceDescriptor.descriptor == + nopRemoteDeviceDescriptor.descriptor; +} + +Allocator& ContextImpl::getXpuHostSendAllocator(sycl::queue& q) { + if (!xpuHostSendAllocator_.has_value()) { + xpu::XpuPinnedBuffer buffer = xpu::makeXpuPinnedBuffer(kStagingAreaSize, q); + uint8_t* ptr = buffer.get(); + xpuHostSendAllocator_.emplace(XpuHostAllocator{ + std::move(buffer), Allocator(ptr, kNumSlots, kSlotSize)}); + } + + return xpuHostSendAllocator_->allocator; +} + +Allocator& ContextImpl::getXpuHostRecvAllocator(sycl::queue& q) { + if (!xpuHostRecvAllocator_.has_value()) { + xpu::XpuPinnedBuffer buffer = xpu::makeXpuPinnedBuffer(kStagingAreaSize, q); + uint8_t* ptr = buffer.get(); + xpuHostRecvAllocator_.emplace(XpuHostAllocator{ + std::move(buffer), Allocator(ptr, kNumSlots, kSlotSize)}); + } + + return xpuHostRecvAllocator_->allocator; +} + +void ContextImpl::handleErrorImpl() { + if (cpuContext_ != nullptr) { + cpuContext_->close(); + } + xpuLoop_.close(); + + if (xpuHostSendAllocator_.has_value()) { + xpuHostSendAllocator_->allocator.close(); + } + if (xpuHostRecvAllocator_.has_value()) { + xpuHostRecvAllocator_->allocator.close(); + } +} + +void ContextImpl::joinImpl() { + if (cpuContext_ != nullptr) { + cpuContext_->join(); + } + xpuLoop_.join(); +} + +bool ContextImpl::inLoop() const { + return loop_.inLoop(); +}; + +void ContextImpl::deferToLoop(std::function fn) { + loop_.deferToLoop(std::move(fn)); +}; + +void ContextImpl::setIdImpl() { + cpuContext_->setId(id_ + ".cpu"); +} + +} // namespace xpu_basic +} // namespace channel +} // namespace tensorpipe diff --git a/tensorpipe/channel/xpu_basic/context_impl.h b/tensorpipe/channel/xpu_basic/context_impl.h new file mode 100644 index 000000000..11cfdadea --- /dev/null +++ b/tensorpipe/channel/xpu_basic/context_impl.h @@ -0,0 +1,76 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace tensorpipe { +namespace channel { +namespace xpu_basic { + +class ChannelImpl; + +class ContextImpl final + : public ContextImplBoilerplate { + public: + static std::shared_ptr create( + std::shared_ptr cpuContext); + + ContextImpl( + std::shared_ptr cpuContext, + std::unordered_map deviceDescriptors); + + std::shared_ptr createChannel( + std::vector> connections, + Endpoint endpoint); + + size_t numConnectionsNeeded() const override; + + bool canCommunicateWithRemote( + const std::string& localDeviceDescriptor, + const std::string& remoteDeviceDescriptor) const override; + + Allocator& getXpuHostSendAllocator(sycl::queue&); + Allocator& getXpuHostRecvAllocator(sycl::queue&); + + // Implement the DeferredExecutor interface. + bool inLoop() const override; + void deferToLoop(std::function fn) override; + + protected: + // Implement the entry points called by ContextImplBoilerplate. + void handleErrorImpl() override; + void joinImpl() override; + void setIdImpl() override; + + private: + OnDemandDeferredExecutor loop_; + + const std::shared_ptr cpuContext_; + // TODO: Lazy initialization of xpu loop. 
+ XpuLoop xpuLoop_; + + struct XpuHostAllocator { + xpu::XpuPinnedBuffer buffer; + Allocator allocator; + }; + optional xpuHostSendAllocator_; + optional xpuHostRecvAllocator_; +}; + +} // namespace xpu_basic +} // namespace channel +} // namespace tensorpipe diff --git a/tensorpipe/channel/xpu_basic/factory.cc b/tensorpipe/channel/xpu_basic/factory.cc new file mode 100644 index 000000000..f63de6518 --- /dev/null +++ b/tensorpipe/channel/xpu_basic/factory.cc @@ -0,0 +1,26 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include + +#include +#include +#include + +namespace tensorpipe { +namespace channel { +namespace xpu_basic { + +std::shared_ptr create(std::shared_ptr cpuContext) { + return std::make_shared>( + std::move(cpuContext)); +} + +} // namespace xpu_basic +} // namespace channel +} // namespace tensorpipe diff --git a/tensorpipe/channel/xpu_basic/factory.h b/tensorpipe/channel/xpu_basic/factory.h new file mode 100644 index 000000000..6e322c7b7 --- /dev/null +++ b/tensorpipe/channel/xpu_basic/factory.h @@ -0,0 +1,23 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include + +#include + +namespace tensorpipe { +namespace channel { +namespace xpu_basic { + +std::shared_ptr create(std::shared_ptr cpuContext); + +} // namespace xpu_basic +} // namespace channel +} // namespace tensorpipe diff --git a/tensorpipe/common/device.h b/tensorpipe/common/device.h index d7ff69bf2..e444446c1 100644 --- a/tensorpipe/common/device.h +++ b/tensorpipe/common/device.h @@ -16,6 +16,7 @@ namespace tensorpipe { const std::string kCpuDeviceType{"cpu"}; const std::string kCudaDeviceType{"cuda"}; +const std::string kXpuDeviceType{"xpu"}; struct Device { std::string type; diff --git a/tensorpipe/common/xpu.h b/tensorpipe/common/xpu.h new file mode 100644 index 000000000..1fe44dbe3 --- /dev/null +++ b/tensorpipe/common/xpu.h @@ -0,0 +1,194 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace tensorpipe { +namespace xpu { + +class XpuError final : public BaseError { + public: + explicit XpuError(const sycl::exception& e) : msg_(e.what()) {} + explicit XpuError(std::string msg) : msg_(std::move(msg)) {} + + std::string what() const override { + return msg_; + } + + private: + std::string msg_; +}; + +class XpuEvent { + public: + XpuEvent() = default; + + inline bool isCreated() const { + return (event_.get() != nullptr); + } + + void record(sycl::queue& q) { + if (!isCreated()) { + event_ = std::make_unique(q.ext_oneapi_submit_barrier()); + } else { + event_.reset(); + event_ = std::make_unique(q.ext_oneapi_submit_barrier()); + } + } + + void synchronize() { + if (isCreated()) { + event().wait_and_throw(); + } + } + + bool query() const { + if (!isCreated()) + return true; + return event().get_info() == + sycl::info::event_command_status::complete; + } + + void wait(sycl::queue& q) { + if (isCreated()) { + q.ext_oneapi_submit_barrier({event()}); + } + } + + sycl::event& event() const { + return *event_; + } + + private: + std::unique_ptr event_; + sycl::queue q; +}; + +inline std::vector getXpuDevices() { + // Enumerate available SYCL GPU devices + auto devices = sycl::device::get_devices(sycl::info::device_type::gpu); + + std::vector result; + result.reserve(devices.size()); + + for (size_t devIdx = 0; devIdx < devices.size(); ++devIdx) { + result.push_back(Device{kXpuDeviceType, static_cast(devIdx)}); + } + + return result; +} + +inline int xpuDeviceForPointer(const void* ptr) { + TP_THROW_ASSERT_IF(ptr == nullptr) << "Pointer is null"; + + // List all SYCL GPU devices + auto devices = sycl::device::get_devices(sycl::info::device_type::gpu); + + for (int i = 0; i < devices.size(); i++) { + const auto& dev = devices[i]; + sycl::context ctx(dev); + + // Check if the pointer belongs to this context + auto type = sycl::get_pointer_type(ptr, ctx); + + if (type != sycl::usm::alloc::unknown) { + // The pointer belongs to this device + return i; + } + } + + // Pointer is not a USM pointer owned by any XPU device + return -1; +} + +class XpuPinnedMemoryDeleter { + public: + explicit XpuPinnedMemoryDeleter(sycl::queue* queue) : q_(queue) {} + + void operator()(uint8_t* ptr) const { + sycl::free(ptr, *q_); + } + + private: + sycl::queue* q_; +}; + +using XpuPinnedBuffer = std::unique_ptr; + +inline XpuPinnedBuffer makeXpuPinnedBuffer(size_t length, sycl::queue& q) { + uint8_t* ptr = static_cast(sycl::malloc_host(length, q)); + TP_THROW_ASSERT_IF(ptr == nullptr) << "malloc_host failed"; + + return XpuPinnedBuffer(ptr, XpuPinnedMemoryDeleter(&q)); +} + +class XpuDeviceBuffer { + public: + XpuDeviceBuffer() = default; + + XpuDeviceBuffer(size_t length, sycl::queue& q) : q_(&q) { + uint8_t* ptr = static_cast(sycl::malloc_device(length, q)); + TP_THROW_ASSERT_IF(ptr == nullptr) << "malloc_device failed"; + + ptr_ = {ptr, Deleter{q_}}; + } + + uint8_t* ptr() const { + return ptr_.get(); + } + + void reset() { + ptr_.reset(); + } + + private: + struct Deleter { + sycl::queue* q_; + + void operator()(uint8_t* ptr) { + sycl::free(ptr, *q_); + } + }; + + std::unique_ptr ptr_; + sycl::queue* q_{nullptr}; +}; + +inline sycl::queue& getDefaultXPUQueue(int deviceIndex) { + static std::once_flag initFlag; + static std::vector> queues; + + std::call_once(initFlag, []() { + auto devs = 
sycl::device::get_devices(sycl::info::device_type::gpu); + queues.resize(devs.size()); + + for (size_t i = 0; i < devs.size(); i++) { + queues[i] = std::make_unique(devs[i]); + } + }); + return *queues[deviceIndex]; +} + +} // namespace xpu +} // namespace tensorpipe diff --git a/tensorpipe/common/xpu_buffer.cc b/tensorpipe/common/xpu_buffer.cc new file mode 100644 index 000000000..4be9c8d4f --- /dev/null +++ b/tensorpipe/common/xpu_buffer.cc @@ -0,0 +1,20 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include + +#include +#include + +namespace tensorpipe { + +Device XpuBuffer::getDevice() const { + return Device{kXpuDeviceType, xpu::xpuDeviceForPointer(ptr)}; +} + +} // namespace tensorpipe diff --git a/tensorpipe/common/xpu_buffer.h b/tensorpipe/common/xpu_buffer.h new file mode 100644 index 000000000..5b0327700 --- /dev/null +++ b/tensorpipe/common/xpu_buffer.h @@ -0,0 +1,22 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include + +namespace tensorpipe { + +struct XpuBuffer { + void* ptr{nullptr}; + sycl::queue* queue{nullptr}; + + Device getDevice() const; +}; +} // namespace tensorpipe diff --git a/tensorpipe/common/xpu_loop.cc b/tensorpipe/common/xpu_loop.cc new file mode 100644 index 000000000..0f8b50d9e --- /dev/null +++ b/tensorpipe/common/xpu_loop.cc @@ -0,0 +1,76 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +#include + +#include +#include + +namespace tensorpipe { + +XpuLoop::XpuLoop() { + thread_ = std::thread(&XpuLoop::run, this); +} + +XpuLoop::~XpuLoop() { + close(); + join(); +} + +void XpuLoop::addCallback( + int deviceIdx, + sycl::event event, + std::function fn) { + { + std::lock_guard lock(mutex_); + pending_.push_back({deviceIdx, std::move(event), std::move(fn)}); + } + cv_.notify_one(); +} + +void XpuLoop::close() { + { + std::lock_guard lock(mutex_); + done_ = true; + } + cv_.notify_all(); +} + +void XpuLoop::join() { + if (thread_.joinable()) { + thread_.join(); + } +} + +void XpuLoop::run() { + while (true) { + std::vector currentOps; + + { + std::unique_lock lock(mutex_); + cv_.wait(lock, [&] { return done_ || !pending_.empty(); }); + if (done_ && pending_.empty()) { + return; + } + currentOps.swap(pending_); + } + + for (auto& op : currentOps) { + try { + // Wait for the SYCL event to complete asynchronously + op.event.wait(); + op.fn(Error::kSuccess); + } catch (const sycl::exception& e) { + auto err = TP_CREATE_ERROR(xpu::XpuError, e.what()); + op.fn(err); + } + } + } +} + +} // namespace tensorpipe diff --git a/tensorpipe/common/xpu_loop.h b/tensorpipe/common/xpu_loop.h new file mode 100644 index 000000000..881a2425c --- /dev/null +++ b/tensorpipe/common/xpu_loop.h @@ -0,0 +1,40 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include + +namespace tensorpipe { +class XpuLoop { + struct Op { + int deviceIdx; + sycl::event event; + std::function fn; + Error error; + }; + + public: + XpuLoop(); + ~XpuLoop(); + + void addCallback( + int deviceIdx, + sycl::event event, + std::function fn); + + void close(); + void join(); + + private: + void run(); + std::thread thread_; + std::mutex mutex_; + std::condition_variable cv_; + std::vector pending_; + bool done_{false}; +}; +} // namespace tensorpipe diff --git a/tensorpipe/test/CMakeLists.txt b/tensorpipe/test/CMakeLists.txt index 53e8643fd..9fda76e3d 100644 --- a/tensorpipe/test/CMakeLists.txt +++ b/tensorpipe/test/CMakeLists.txt @@ -104,6 +104,20 @@ if(TP_USE_CUDA) list(APPEND TP_TEST_LINK_LIBRARIES tensorpipe_cuda) endif() +if(TP_USE_XPU) + find_package(IntelSYCL REQUIRED) + list(APPEND TP_TEST_LINK_LIBRARIES ${SYCL_LIBRARIES}) + list(APPEND TP_TEST_INCLUDE_DIRS ${SYCL_INCLUDE_DIRS}) + list(APPEND TP_TEST_COMPILE_DEFS TP_USE_XPU) + + list(APPEND TP_TEST_SRCS + channel/channel_test_xpu.cc + channel/xpu_basic/xpu_basic_test.cc + ) + + list(APPEND TP_TEST_LINK_LIBRARIES tensorpipe_xpu) +endif() + add_subdirectory(${PROJECT_SOURCE_DIR}/third_party/googletest ${PROJECT_BINARY_DIR}/third_party/googletest EXCLUDE_FROM_ALL) diff --git a/tensorpipe/test/channel/channel_test_xpu.cc b/tensorpipe/test/channel/channel_test_xpu.cc new file mode 100644 index 000000000..7e6ec62c6 --- /dev/null +++ b/tensorpipe/test/channel/channel_test_xpu.cc @@ -0,0 +1,171 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include +#include +#include +#include +#include + +using namespace tensorpipe; +using namespace tensorpipe::channel; + +void slowKernel(sycl::queue& q, void* ptr, size_t bytes) { + // Interpret pointer as uint8_t* and run some compute to take time. 
+ auto n = bytes; + q.submit([&](sycl::handler& h) { + uint8_t* dev = static_cast(ptr); + h.parallel_for( + sycl::range<1>{(n + 255) / 256}, [=](sycl::id<1> id) { + // a tiny busy-work loop (kernel-level). Keep simple to be portable. + size_t base = id[0] * 256; + for (size_t i = 0; i < 256; ++i) { + if (base + i < n) { + // do some dummy arithmetic to create latency + dev[base + i] = (uint8_t)((dev[base + i] + 17) ^ 13); + } + } + }); + }); +} + +class ReceiverWaitsForStartEventTest : public ClientServerChannelTestCase { + static constexpr size_t kSize = 1024; + + void server(std::shared_ptr channel) override { + // Use device 0's queue + sycl::queue& sendQueue = tensorpipe::xpu::getDefaultXPUQueue(0); + + // Allocate device memory + void* ptr = sycl::malloc_device(kSize, sendQueue); + + // Delay sendQueue with computations on buffer. + slowKernel(sendQueue, ptr, kSize); + + // Set buffer to target value (memset). DPC++ supports queue.memset. + sendQueue.memset(ptr, 0x42, kSize).wait(); + + // Perform send using channel, wrap in promise/future like CUDA test + auto sendPromise = std::make_shared>(); + auto sendFuture = sendPromise->get_future(); + + XpuBuffer buf{.ptr = ptr, .queue = &sendQueue}; + channel->send( + buf, + kSize, + [sendPromise{std::move(sendPromise)}](const tensorpipe::Error& error) { + sendPromise->set_value(error); + }); + + Error sendError = sendFuture.get(); + EXPECT_FALSE(sendError) << sendError.what(); + + sycl::free(ptr, sendQueue); + + this->peers_->done(PeerGroup::kServer); + this->peers_->join(PeerGroup::kServer); + } + + void client(std::shared_ptr channel) override { + sycl::queue& recvQueue = tensorpipe::xpu::getDefaultXPUQueue(0); + + void* ptr = sycl::malloc_device(kSize, recvQueue); + + auto recvPromise = std::make_shared>(); + auto recvFuture = recvPromise->get_future(); + + XpuBuffer buf{.ptr = ptr, .queue = &recvQueue}; + channel->recv( + buf, + kSize, + [recvPromise{std::move(recvPromise)}](const tensorpipe::Error& error) { + recvPromise->set_value(error); + }); + + Error recvError = recvFuture.get(); + EXPECT_FALSE(recvError) << recvError.what(); + + // ensure recv work (and any copies) have completed on recvQueue + recvQueue.wait(); + + std::vector data(kSize); + // copy device -> host + recvQueue.memcpy(data.data(), ptr, kSize).wait(); + + for (auto v : data) { + EXPECT_EQ(v, 0x42); + } + + sycl::free(ptr, recvQueue); + + this->peers_->done(PeerGroup::kClient); + this->peers_->join(PeerGroup::kClient); + } +}; + +CHANNEL_TEST(XpuChannelTestSuite, ReceiverWaitsForStartEvent); + +class SendOffsetAllocationTest : public ClientServerChannelTestCase { + public: + static constexpr int kDataSize = 256; + static constexpr int kOffset = 128; + + void server(std::shared_ptr channel) override { + sycl::queue& q = tensorpipe::xpu::getDefaultXPUQueue(0); + + // allocate a larger region and use an offset inside it + void* basePtr = sycl::malloc_device(kOffset + kDataSize, q); + + // Set the head region to 0xff and the data region to 0x42 + q.memset(basePtr, 0xff, kOffset).wait(); + q.memset(static_cast(basePtr) + kOffset, 0x42, kDataSize).wait(); + + // build buffer pointing at offset + XpuBuffer buf{.ptr = static_cast(basePtr) + kOffset, .queue = &q}; + + // send and wait (using helper pattern, or do the promise/future) + std::future sendFuture; + { + auto sendPromise = std::make_shared>(); + sendFuture = sendPromise->get_future(); + channel->send( + buf, + kDataSize, + [sendPromise{std::move(sendPromise)}]( + const tensorpipe::Error& error) { + 
sendPromise->set_value(error);
+          });
+    }
+
+    Error sendError = sendFuture.get();
+    EXPECT_FALSE(sendError) << sendError.what();
+
+    sycl::free(basePtr, q);
+
+    this->peers_->done(PeerGroup::kServer);
+    this->peers_->join(PeerGroup::kServer);
+  }
+
+  void client(std::shared_ptr channel) override {
+    // Use your helper to create a DataWrapper (or do manual allocate)
+    std::unique_ptr wrappedData =
+        helper_->makeDataWrapper(kDataSize);
+
+    std::future recvFuture = recvWithFuture(channel, *wrappedData);
+    Error recvError = recvFuture.get();
+    EXPECT_FALSE(recvError) << recvError.what();
+
+    // Validate contents
+    EXPECT_THAT(wrappedData->unwrap(), ::testing::Each(0x42));
+
+    this->peers_->done(PeerGroup::kClient);
+    this->peers_->join(PeerGroup::kClient);
+  }
+};
+
+CHANNEL_TEST(XpuChannelTestSuite, SendOffsetAllocation);
diff --git a/tensorpipe/test/channel/channel_test_xpu.h b/tensorpipe/test/channel/channel_test_xpu.h
new file mode 100644
index 000000000..2c5f9e597
--- /dev/null
+++ b/tensorpipe/test/channel/channel_test_xpu.h
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+
+class XpuDataWrapper : public DataWrapper {
+ public:
+  XpuDataWrapper(const XpuDataWrapper&) = delete;
+  XpuDataWrapper& operator=(const XpuDataWrapper&) = delete;
+
+  explicit XpuDataWrapper(size_t length) : length_(length) {
+    if (length_ > 0) {
+      queue_ = &tensorpipe::xpu::getDefaultXPUQueue(0);
+      ptr_ = sycl::malloc_device(length_, *queue_);
+    }
+  }
+
+  explicit XpuDataWrapper(std::vector v) : XpuDataWrapper(v.size()) {
+    if (length_ > 0) {
+      queue_->memcpy(ptr_, v.data(), length_).wait();
+    }
+  }
+
+  tensorpipe::Buffer buffer() const override {
+    return tensorpipe::XpuBuffer{
+        .ptr = ptr_,
+        .queue = queue_,
+    };
+  }
+
+  size_t bufferLength() const override {
+    return length_;
+  }
+
+  std::vector unwrap() override {
+    std::vector v(length_);
+    if (length_ > 0) {
+      queue_->memcpy(v.data(), ptr_, length_).wait();
+    }
+    return v;
+  }
+
+  ~XpuDataWrapper() override {
+    if (ptr_ != nullptr) {
+      sycl::free(ptr_, *queue_);
+    }
+  }
+
+ private:
+  uint8_t* ptr_{nullptr};
+  size_t length_{0};
+  sycl::queue* queue_{nullptr};
+};
+
+class XpuChannelTestHelper : public ChannelTestHelper {
+ public:
+  std::unique_ptr makeDataWrapper(size_t length) override {
+    return std::make_unique(length);
+  }
+
+  std::unique_ptr makeDataWrapper(
+      std::vector v) override {
+    return std::make_unique(std::move(v));
+  }
+};
+
+class XpuChannelTestSuite
+    : public ::testing::TestWithParam {};
+
+class XpuMultiGPUChannelTestSuite
+    : public ::testing::TestWithParam {};
diff --git a/tensorpipe/test/channel/xpu_basic/xpu_basic_test.cc b/tensorpipe/test/channel/xpu_basic/xpu_basic_test.cc
new file mode 100644
index 000000000..8be0787b9
--- /dev/null
+++ b/tensorpipe/test/channel/xpu_basic/xpu_basic_test.cc
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree.
+ */
+
+#include
+
+#include
+#include
+#include
+
+namespace {
+
+class XpuBasicChannelTestHelper : public XpuChannelTestHelper {
+ protected:
+  std::shared_ptr makeContextInternal(
+      std::string id) override {
+    auto cpuContext = tensorpipe::channel::basic::create();
+    auto context =
+        tensorpipe::channel::xpu_basic::create(std::move(cpuContext));
+    context->setId(std::move(id));
+    return context;
+  }
+
+ public:
+  std::shared_ptr makePeerGroup() override {
+    return std::make_shared();
+  }
+};
+
+XpuBasicChannelTestHelper helper;
+
+class XpuBasicChannelTestSuite : public ChannelTestSuite {};
+
+} // namespace
+
+class CannotCommunicateCpuToCpuTest : public ChannelTestCase {
+ public:
+  void run(ChannelTestHelper* /* unused */) override {
+    ForkedThreadPeerGroup pg;
+    pg.spawn(
+        [&]() {
+          auto cpuContext = tensorpipe::channel::basic::create();
+          auto ctx =
+              tensorpipe::channel::xpu_basic::create(std::move(cpuContext));
+          auto deviceDescriptors = ctx->deviceDescriptors();
+
+          auto it = deviceDescriptors.find(
+              tensorpipe::Device{tensorpipe::kCpuDeviceType, 0});
+          EXPECT_FALSE(it == deviceDescriptors.end());
+
+          auto descriptor = it->second;
+          EXPECT_FALSE(ctx->canCommunicateWithRemote(descriptor, descriptor));
+        },
+        [&]() {
+          // Do nothing.
+ }); + } +}; + +CHANNEL_TEST(XpuBasicChannelTestSuite, CannotCommunicateCpuToCpu); + +INSTANTIATE_TEST_CASE_P(XpuBasic, ChannelTestSuite, ::testing::Values(&helper)); + +INSTANTIATE_TEST_CASE_P( + XpuBasic, + XpuChannelTestSuite, + ::testing::Values(&helper)); + +INSTANTIATE_TEST_CASE_P( + XpuBasic, + XpuMultiGPUChannelTestSuite, + ::testing::Values(&helper));