diff --git a/include/picongpu/plugins/Checkpoint.x.cpp b/include/picongpu/plugins/Checkpoint.x.cpp index 175e52d7ee..f57fff025c 100644 --- a/include/picongpu/plugins/Checkpoint.x.cpp +++ b/include/picongpu/plugins/Checkpoint.x.cpp @@ -123,9 +123,37 @@ namespace picongpu void checkpoint(uint32_t currentStep, std::string const checkpointDirectory) override { auto cBackend = ioBackends.find(checkpointBackendName); - if(cBackend != ioBackends.end()) + if(cBackend == ioBackends.end()) { - cBackend->second->dumpCheckpoint(currentStep, checkpointDirectory, checkpointFilename); + return; + } + + int mpiThreadLevel; + MPI_CHECK(MPI_Query_thread(&mpiThreadLevel)); + + if(mpiThreadLevel < MPI_THREAD_MULTIPLE) + { + cBackend->second->dumpCheckpoint(currentStep, checkpointDirectory, checkpointFilename, std::nullopt); + } + else + { + std::atomic synchronization = 0; + + // copy the backend handle into the asynchronous task to avoid race conditions + auto restartFuture = std::async( + std::launch::async, + [this, copiedBackend = cBackend, currentStep, &checkpointDirectory, &synchronization]() + { + copiedBackend->second->doRestart( + currentStep, + checkpointDirectory, + this->checkpointFilename, + this->restartChunkSize, + {&synchronization}); + }); + cBackend->second + ->dumpCheckpoint(currentStep, checkpointDirectory, checkpointFilename, {&synchronization}); + restartFuture.wait(); } } @@ -134,7 +162,8 @@ namespace picongpu auto rBackend = ioBackends.find(restartBackendName); if(rBackend != ioBackends.end()) { - rBackend->second->doRestart(restartStep, restartDirectory, restartFilename, restartChunkSize); + rBackend->second + ->doRestart(restartStep, restartDirectory, restartFilename, restartChunkSize, std::nullopt); } } diff --git a/include/picongpu/plugins/openPMD/NDScalars.hpp b/include/picongpu/plugins/openPMD/NDScalars.hpp index 7ef07f6235..6f2bdda4b4 100644 --- a/include/picongpu/plugins/openPMD/NDScalars.hpp +++ b/include/picongpu/plugins/openPMD/NDScalars.hpp @@ -89,7 +89,7 @@ namespace picongpu localDomainOffset[d] = Environment::get().GridController().getPosition()[d]; } - ::openPMD::Series& series = *params.openPMDSeries; + ::openPMD::Series& series = *params.writeOpenPMDSeries; ::openPMD::MeshRecordComponent mrc = series.writeIterations()[currentStep].meshes[baseName + "_" + group][dataset]; @@ -125,7 +125,7 @@ namespace picongpu std::make_shared(value), std::move(std::get<1>(tuple)), std::move(std::get<2>(tuple))); - params.openPMDSeries->flush(PreferredFlushTarget::Buffer); + std::get<0>(tuple).seriesFlush(PreferredFlushTarget::Buffer); } private: @@ -162,7 +162,7 @@ namespace picongpu auto datasetName = baseName + "/" + group + "/" + dataset; - ::openPMD::Series& series = *params.openPMDSeries; + ::openPMD::Series& series = *params.readOpenPMDSeries; ::openPMD::MeshRecordComponent mrc = series.iterations[currentStep].open().meshes[baseName + "_" + group][dataset]; auto ndim = mrc.getDimensionality(); @@ -174,11 +174,12 @@ namespace picongpu DataSpace gridPos = Environment::get().GridController().getPosition(); ::openPMD::Offset start; ::openPMD::Extent count; + ::openPMD::Extent extent = mrc.getExtent(); start.reserve(ndim); count.reserve(ndim); for(int d = 0; d < ndim; ++d) { - start.push_back(gridPos.revert()[d]); + start.push_back(gridPos.revert()[d] % extent[d]); // TODO: wrapping the grid position into the dataset extent is only a workaround, a proper fix is still needed count.push_back(1); } diff --git a/include/picongpu/plugins/openPMD/WriteSpecies.hpp b/include/picongpu/plugins/openPMD/WriteSpecies.hpp index d59912a6a7..e9663f94e6 100644 ---
a/include/picongpu/plugins/openPMD/WriteSpecies.hpp +++ b/include/picongpu/plugins/openPMD/WriteSpecies.hpp @@ -26,6 +26,7 @@ # include "picongpu/particles/traits/GetShape.hpp" # include "picongpu/particles/traits/GetSpeciesFlagName.hpp" # include "picongpu/plugins/ISimulationPlugin.hpp" +# include "picongpu/plugins/common/MPIHelpers.hpp" # include "picongpu/plugins/kernel/CopySpecies.kernel" # include "picongpu/plugins/openPMD/openPMDDimension.hpp" # include "picongpu/plugins/openPMD/openPMDWriter.def" @@ -46,12 +47,14 @@ # include # include # include +# include # include # include # include # include +# include # include // std::remove_reference_t # include @@ -92,12 +95,14 @@ namespace picongpu } }; - template + template struct Strategy { using FrameType = Frame; using BufferType = Frame; + using RunParameters = T_RunParameters; + BufferType buffers; FrameType frame; @@ -107,7 +112,8 @@ namespace picongpu uint32_t const currentStep, std::string name, RunParameters, - ChunkDescription const& chunkDesc) + std::optional const& chunkDesc, + bool verbose_log = true) = 0; virtual ~Strategy() = default; @@ -120,10 +126,12 @@ namespace picongpu struct StrategyADIOS : Strategy { using FrameType = typename Strategy::FrameType; + uint64_cu m_myNumParticles = 0; FrameType malloc(std::string name, uint64_cu const myNumParticles) override { /* malloc host memory */ + m_myNumParticles = myNumParticles; log("openPMD: (begin) malloc host memory: %1%") % name; meta::ForEach> mallocMem; @@ -136,15 +144,22 @@ namespace picongpu uint32_t const currentStep, std::string name, RunParameters rp, - ChunkDescription const& chunkDesc) override + std::optional const& chunkDesc, + bool verbose_log) override { - log("openPMD: (begin) copy particle host (with hierarchy) to " - "host (without hierarchy): %1%") - % name; + if(verbose_log) + { + log("openPMD: (begin) copy particle host (with hierarchy) to " + "host (without hierarchy): %1%") + % name; + } int particlesProcessed = 0; auto const areaMapper = makeAreaMapper(*(rp.params.cellDescription)); - auto rangeMapper = makeRangeMapper(areaMapper, chunkDesc.beginSupercellIdx, chunkDesc.endSupercellIdx); + auto rangeMapper + = chunkDesc.has_value() + ? makeRangeMapper(areaMapper, chunkDesc->beginSupercellIdx, chunkDesc->endSupercellIdx) + : makeRangeMapper(areaMapper); pmacc::particles::operations::ConcatListOfFrames concatListOfFrames{}; @@ -180,7 +195,9 @@ namespace picongpu /* this costs a little bit of time but writing to external is * slower in general */ - PMACC_VERIFY((uint64_cu) particlesProcessed == chunkDesc.numberOfParticles); + PMACC_VERIFY( + (uint64_cu) particlesProcessed == chunkDesc.has_value() ? 
chunkDesc->numberOfParticles + : m_myNumParticles); } }; @@ -191,10 +208,12 @@ namespace picongpu struct StrategyHDF5 : Strategy { using FrameType = typename Strategy::FrameType; + uint64_cu m_myNumParticles = 0; FrameType malloc(std::string name, uint64_cu const myNumParticles) override { log("openPMD: (begin) malloc mapped memory: %1%") % name; + m_myNumParticles = myNumParticles; /*malloc mapped memory*/ meta::ForEach> mallocMem; @@ -203,14 +222,24 @@ namespace picongpu return this->frame; } - void prepare(uint32_t currentStep, std::string name, RunParameters rp, ChunkDescription const& chunkDesc) - override + void prepare( + uint32_t currentStep, + std::string name, + RunParameters rp, + std::optional const& chunkDesc, + bool verbose_log) override { - log("openPMD: (begin) copy particle to host: %1%") % name; + if(verbose_log) + { + log("openPMD: (begin) copy particle to host: %1%") % name; + } GridBuffer counterBuffer(DataSpace(1)); auto const areaMapper = makeAreaMapper(*(rp.params.cellDescription)); - auto rangeMapper = makeRangeMapper(areaMapper, chunkDesc.beginSupercellIdx, chunkDesc.endSupercellIdx); + auto rangeMapper + = chunkDesc.has_value() + ? makeRangeMapper(areaMapper, chunkDesc->beginSupercellIdx, chunkDesc->endSupercellIdx) + : makeRangeMapper(areaMapper); /* this sanity check costs a little bit of time but hdf5 writing is * slower */ @@ -229,7 +258,10 @@ namespace picongpu eventSystem::getTransactionEvent().waitForFinished(); log("openPMD: all events are finished: %1%") % name; - PMACC_VERIFY((uint64_t) counterBuffer.getHostBuffer().getDataBox()[0] == chunkDesc.numberOfParticles); + PMACC_VERIFY( + (uint64_t) counterBuffer.getHostBuffer().getDataBox()[0] == chunkDesc.has_value() + ? chunkDesc->numberOfParticles + : m_myNumParticles); } }; @@ -246,6 +278,9 @@ namespace picongpu using ParticleDescription = typename FrameType::ParticleDescription; using ParticleAttributeList = typename FrameType::ValueTypeSeq; + using usedFilters = pmacc::mp_list::type>; + using MyParticleFilter = typename FilterFactory::FilterType; + /* delete multiMask and localCellIdx in openPMD particle*/ using TypesToDelete = pmacc::mp_list; using ParticleCleanedAttributeList = typename RemoveFromSeq::type; @@ -256,7 +291,7 @@ namespace picongpu using NewParticleDescription = typename ReplaceValueTypeSeq::type; - void setParticleAttributes( + static void setParticleAttributes( ::openPMD::ParticleSpecies& record, uint64_t const globalNumParticles, AbstractJsonMatcher& matcher, @@ -328,271 +363,804 @@ namespace picongpu } } - HINLINE void operator()( + static void writeParticlePatches( + ThreadParams* params, + DataSpace const& particleToTotalDomainOffset, + ::openPMD::Series& series, + ::openPMD::ParticleSpecies& particleSpecies, + std::string const& basename, + std::string const& speciesGroup, + uint64_t mpiSize, + uint64_t mpiRank, + size_t myParticleOffset, + size_t myNumParticles, + size_t globalNumParticles) + { + /* write species counter table to openPMD storage */ + log("openPMD: (begin) writing particle patches for %1%") + % T_SpeciesFilter::getName(); + using index_t = uint64_t; + ::openPMD::Datatype const datatype = ::openPMD::determineDatatype(); + // not const, we'll switch out the JSON config + ::openPMD::Dataset ds(datatype, {mpiSize}); + + ::openPMD::ParticlePatches particlePatches = particleSpecies.particlePatches; + ::openPMD::PatchRecordComponent numParticles + = particlePatches["numParticles"][::openPMD::RecordComponent::SCALAR]; + ::openPMD::PatchRecordComponent numParticlesOffset + 
= particlePatches["numParticlesOffset"][::openPMD::RecordComponent::SCALAR]; + + ds.options = params->jsonMatcher->get(basename + "/particlePatches/numParticles"); + numParticles.resetDataset(ds); + ds.options = params->jsonMatcher->get(basename + "/particlePatches/numParticlesOffset"); + numParticlesOffset.resetDataset(ds); + + /* It is safe to use the mpi rank to write the data even if the rank can differ between simulation + * runs. During the restart the plugin is using patch information to find the corresponding data. + */ + numParticles.store(mpiRank, myNumParticles); + numParticlesOffset.store(mpiRank, myParticleOffset); + + ::openPMD::PatchRecord offset = particlePatches["offset"]; + ::openPMD::PatchRecord extent = particlePatches["extent"]; + auto const patchExtent = params->window.localDimensions.size; + + for(size_t d = 0; d < simDim; ++d) + { + ::openPMD::PatchRecordComponent offset_x = offset[name_lookup[d]]; + ::openPMD::PatchRecordComponent extent_x = extent[name_lookup[d]]; + ds.options = params->jsonMatcher->get(basename + "/particlePatches/offset/" + name_lookup[d]); + offset_x.resetDataset(ds); + ds.options = params->jsonMatcher->get(basename + "/particlePatches/extent/" + name_lookup[d]); + extent_x.resetDataset(ds); + + auto const totalPatchOffset + = particleToTotalDomainOffset[d] + params->localWindowToDomainOffset[d]; + offset_x.store(mpiRank, totalPatchOffset); + extent_x.store(mpiRank, patchExtent[d]); + } + + /* openPMD ED-PIC: additional attributes */ + setParticleAttributes( + particleSpecies, + globalNumParticles, + *params->jsonMatcher, + series.particlesPath() + speciesGroup); + params->m_dumpTimes.now("\tFlush species " + T_SpeciesFilter::getName()); + particleSpecies.seriesFlush(PreferredFlushTarget::Buffer); + params->m_dumpTimes.now("\tFinished flush species"); + + log("openPMD: ( end ) writing particle patches for %1%") + % T_SpeciesFilter::getName(); + } + + template + static void writeParticlesWithoutAnySubdividing( ThreadParams* params, uint32_t const currentStep, - DataSpace const particleToTotalDomainOffset) + ::openPMD::Series& series, + ::openPMD::ParticleSpecies& particleSpecies, + std::string const& basename, + std::string const& speciesGroup, + std::unique_ptr strategy, + DataConnector& dc, + typename AStrategy::RunParameters::SpeciesType& speciesTmp, + particles::filter::IUnary& particleFilter, + DataSpace const& particleToTotalDomainOffset) { - log("openPMD: (begin) write species: %1%") % T_SpeciesFilter::getName(); - params->m_dumpTimes.now( - "Begin write species " + T_SpeciesFilter::getName()); + /* count total number of particles on the device */ + log("openPMD: (begin) count particles: %1%") % T_SpeciesFilter::getName(); + uint64_cu const myNumParticles = pmacc::CountParticles::countOnDevice( + *speciesTmp, + *(params->cellDescription), + params->localWindowToDomainOffset, + params->window.localDimensions.size, + particleFilter); + + uint64_t globalNumParticles = 0; + uint64_t myParticleOffset = 0; - DataConnector& dc = Environment<>::get().DataConnector(); GridController& gc = Environment::get().GridController(); uint64_t mpiSize = gc.getGlobalSize(); uint64_t mpiRank = gc.getGlobalRank(); - /* load particle without copy particle data to host */ - auto speciesTmp = dc.get(ThisSpecies::FrameType::getName()); - std::string const speciesGroup(T_Species::getName()); + std::vector allNumParticles(mpiSize); - ::openPMD::Series& series = *params->openPMDSeries; - ::openPMD::Iteration iteration = series.writeIterations()[currentStep]; - 
std::string const basename = series.particlesPath() + speciesGroup; - - auto idProvider = dc.get("globalId"); - // enforce that the filter interface is fulfilled - particles::filter::IUnary particleFilter( - currentStep, - idProvider->getDeviceGenerator()); - using usedFilters = pmacc::mp_list::type>; - using MyParticleFilter = typename FilterFactory::FilterType; MyParticleFilter filter; filter.setWindowPosition(params->localWindowToDomainOffset, params->window.localDimensions.size); + // avoid deadlock between not finished pmacc tasks and mpi blocking + // collectives + eventSystem::getTransactionEvent().waitForFinished(); + MPI_CHECK(MPI_Allgather( + &myNumParticles, + 1, + MPI_UNSIGNED_LONG_LONG, + allNumParticles.data(), + 1, + MPI_UNSIGNED_LONG_LONG, + gc.getCommunicator().getMPIComm())); + + for(uint64_t i = 0; i < mpiSize; ++i) + { + globalNumParticles += allNumParticles[i]; + if(i < mpiRank) + myParticleOffset += allNumParticles[i]; + } + + log("openPMD: species '%1%': global particle count=%2%") + % T_SpeciesFilter::getName() % globalNumParticles; + + auto hostFrame = strategy->malloc(T_SpeciesFilter::getName(), myNumParticles); + + { + meta::ForEach< + typename NewParticleDescription::ValueTypeSeq, + openPMD::InitParticleAttribute> + initParticleAttributes; + initParticleAttributes(params, particleSpecies, basename, globalNumParticles); + } + + + using RunParameters = typename AStrategy::RunParameters; + RunParameters runParameters( + dc, + *params, + speciesTmp, + filter, + particleFilter, + particleToTotalDomainOffset, + globalNumParticles); + + if(myNumParticles > 0) + { + strategy->prepare(currentStep, T_SpeciesFilter::getName(), runParameters, std::nullopt); + } + log("openPMD: (begin) write particle records for %1%") + % T_SpeciesFilter::getName(); + + std::stringstream description; + description << "\tprepare"; + params->m_dumpTimes.now(description.str()); + + size_t writtenBytes = 0; + + meta:: + ForEach> + writeToOpenPMD; + writeToOpenPMD( + params, + hostFrame, + particleSpecies, + basename, + myNumParticles, + globalNumParticles, + myParticleOffset, + writtenBytes, + /* verbose_log = */ true); + + description = std::stringstream(); + description << ": " << writtenBytes << " bytes for " << myNumParticles << " particles from offset " + << myParticleOffset; + params->m_dumpTimes.append(description.str()); + + log("openPMD: flush particle records for %1%") % T_SpeciesFilter::getName(); + + // avoid deadlock between not finished pmacc tasks and mpi blocking collectives + eventSystem::getTransactionEvent().waitForFinished(); + params->m_dumpTimes.now("\tflush"); + particleSpecies.seriesFlush(PreferredFlushTarget::Disk); + params->m_dumpTimes.now("\tend"); + + + log("openPMD: (end) write particle records for %1%") + % T_SpeciesFilter::getName(); + + + strategy.reset(); + + log("openPMD: ( end ) writing species: %1%") % T_SpeciesFilter::getName(); + + writeParticlePatches( + params, + particleToTotalDomainOffset, + series, + particleSpecies, + basename, + speciesGroup, + mpiSize, + mpiRank, + myParticleOffset, + myNumParticles, + globalNumParticles); + } + + template + static void writeParticlesByChunks( + ThreadParams* params, + uint32_t const currentStep, + ::openPMD::Series& series, + ::openPMD::ParticleSpecies& particleSpecies, + std::string const& basename, + std::string const& speciesGroup, + std::unique_ptr strategy, + DataConnector& dc, + typename AStrategy::RunParameters::SpeciesType& speciesTmp, + particles::filter::IUnary& particleFilter, + DataSpace const& 
particleToTotalDomainOffset) + { uint64_t myNumParticles = 0; uint64_t globalNumParticles = 0; uint64_t myParticleOffset = 0; - ::openPMD::ParticleSpecies& particleSpecies = iteration.particles[speciesGroup]; + GridController& gc = Environment::get().GridController(); + uint64_t mpiSize = gc.getGlobalSize(); + uint64_t mpiRank = gc.getGlobalRank(); + + MyParticleFilter filter; + filter.setWindowPosition(params->localWindowToDomainOffset, params->window.localDimensions.size); + + constexpr size_t particleSizeInByte = AStrategy::FrameType::ParticleType::sizeInByte(); + ParticleIoChunkInfo particleIoChunkInfo + = createSupercellRangeChunks(params, speciesTmp, particleFilter, particleSizeInByte); + uint64_t const requiredDumpRounds = particleIoChunkInfo.ranges.size(); + + /* count total number of particles on the device */ + log( + "openPMD: species '%1%': particles=%2% in %3% chunks, largest data chunk size %4% byte") + % T_SpeciesFilter::getName() % particleIoChunkInfo.totalNumParticles % requiredDumpRounds + % (particleIoChunkInfo.largestChunk * particleSizeInByte); + + myNumParticles = particleIoChunkInfo.totalNumParticles; + std::vector allNumParticles(mpiSize); + + // avoid deadlock between not finished pmacc tasks and mpi blocking + // collectives + eventSystem::getTransactionEvent().waitForFinished(); + MPI_CHECK(MPI_Allgather( + &myNumParticles, + 1, + MPI_UNSIGNED_LONG_LONG, + allNumParticles.data(), + 1, + MPI_UNSIGNED_LONG_LONG, + gc.getCommunicator().getMPIComm())); + + for(uint64_t i = 0; i < mpiSize; ++i) { - // This scope is limiting the lifetime of strategy - using RunParameters_T = StrategyRunParameters< - decltype(speciesTmp), - decltype(filter), - decltype(particleFilter), - DataSpace const>; + globalNumParticles += allNumParticles[i]; + if(i < mpiRank) + myParticleOffset += allNumParticles[i]; + } + + // @todo combine this and the MPI_Gather above to a single gather for better scaling + std::vector numRounds(mpiSize); - using AStrategy = Strategy; - std::unique_ptr strategy; + MPI_CHECK(MPI_Allgather( + &requiredDumpRounds, + 1, + MPI_UNSIGNED_LONG_LONG, + numRounds.data(), + 1, + MPI_UNSIGNED_LONG_LONG, + gc.getCommunicator().getMPIComm())); - switch(params->strategy) + uint64_t globalNumDumpRounds = requiredDumpRounds; + for(uint64_t i = 0; i < mpiSize; ++i) + globalNumDumpRounds = std::max(globalNumDumpRounds, numRounds[i]); + + + log( + "openPMD: species '%1%': global particle count=%2%, number of dumping iterations=%3%") + % T_SpeciesFilter::getName() % globalNumParticles % globalNumDumpRounds; + + auto hostFrame = strategy->malloc(T_SpeciesFilter::getName(), particleIoChunkInfo.largestChunk); + + { + meta::ForEach< + typename NewParticleDescription::ValueTypeSeq, + openPMD::InitParticleAttribute> + initParticleAttributes; + initParticleAttributes(params, particleSpecies, basename, globalNumParticles); + } + + /** Offset within our global chunk where we are allowed to write particles too. + * The offset is updated each dumping iteration. 
+ */ + size_t particleOffset = 0u; + + uint64_t dumpIteration = 0u; + // To write all metadata for particles we need to perform dumping once even if we have no + // particles + do + { + ChunkDescription chunk; + if(dumpIteration < particleIoChunkInfo.ranges.size()) { - case WriteSpeciesStrategy::ADIOS: - { - using type = StrategyADIOS; - strategy = std::unique_ptr(dynamic_cast(new type)); - break; - } - case WriteSpeciesStrategy::HDF5: - { - using type = StrategyHDF5; - strategy = std::unique_ptr(dynamic_cast(new type)); - break; - } + chunk = particleIoChunkInfo.ranges[dumpIteration]; } - constexpr size_t particleSizeInByte = AStrategy::FrameType::ParticleType::sizeInByte(); - ParticleIoChunkInfo particleIoChunkInfo - = createSupercellRangeChunks(params, speciesTmp, particleFilter, particleSizeInByte); - uint64_t const requiredDumpRounds = particleIoChunkInfo.ranges.size(); + using RunParameters = typename AStrategy::RunParameters; + RunParameters runParameters( + dc, + *params, + speciesTmp, + filter, + particleFilter, + particleToTotalDomainOffset, + globalNumParticles); + + if(chunk.numberOfParticles > 0) + { + strategy->prepare(currentStep, T_SpeciesFilter::getName(), runParameters, chunk); + } + log("openPMD: (begin) write particle records for %1%, dumping round %2%") + % T_SpeciesFilter::getName() % dumpIteration; + + std::stringstream description; + description << "\tslice " << dumpIteration << " prepare"; + params->m_dumpTimes.now(description.str()); + + size_t writtenBytes = 0; + + meta::ForEach< + typename NewParticleDescription::ValueTypeSeq, + openPMD::ParticleAttribute> + writeToOpenPMD; + writeToOpenPMD( + params, + hostFrame, + particleSpecies, + basename, + chunk.numberOfParticles, + globalNumParticles, + myParticleOffset + particleOffset, + writtenBytes, + /* verbose_log = */ true); - /* count total number of particles on the device */ - log( - "openPMD: species '%1%': particles=%2% in %3% chunks, largest data chunk size %4% byte") - % T_SpeciesFilter::getName() % particleIoChunkInfo.totalNumParticles % requiredDumpRounds - % (particleIoChunkInfo.largestChunk * particleSizeInByte); + description = std::stringstream(); + description << ": " << writtenBytes << " bytes for " << chunk.numberOfParticles + << " particles from offset " << (myParticleOffset + particleOffset); + params->m_dumpTimes.append(description.str()); - myNumParticles = particleIoChunkInfo.totalNumParticles; - std::vector allNumParticles(mpiSize); + log("openPMD: flush particle records for %1%, dumping round %2%") + % T_SpeciesFilter::getName() % dumpIteration; - // avoid deadlock between not finished pmacc tasks and mpi blocking - // collectives + // avoid deadlock between not finished pmacc tasks and mpi blocking collectives eventSystem::getTransactionEvent().waitForFinished(); - MPI_CHECK(MPI_Allgather( - &myNumParticles, + params->m_dumpTimes.now( + "\tslice " + std::to_string(dumpIteration) + " flush"); + particleSpecies.seriesFlush(PreferredFlushTarget::Disk); + params->m_dumpTimes.now( + "\tslice " + std::to_string(dumpIteration) + " end"); + + + log("openPMD: (end) write particle records for %1%, dumping round %2%") + % T_SpeciesFilter::getName() % dumpIteration; + + particleOffset += chunk.numberOfParticles; + ++dumpIteration; + } while(dumpIteration < globalNumDumpRounds); + + strategy.reset(); + + log("openPMD: ( end ) writing species: %1%") % T_SpeciesFilter::getName(); + + writeParticlePatches( + params, + particleToTotalDomainOffset, + series, + particleSpecies, + basename, + speciesGroup, + 
mpiSize, + mpiRank, + myParticleOffset, + myNumParticles, + globalNumParticles); + } + + static void writeParticlePatchesBySupercells( + ThreadParams* params, + DataSpace const& particleToTotalDomainOffset, + ParticleIoChunkInfo particleIoChunkInfo, + ::openPMD::Series& series, + ::openPMD::ParticleSpecies& particleSpecies, + std::string const& basename, + std::string const& speciesGroup, + GridController& gc, + uint64_t mpiSize, + uint64_t mpiRank, + size_t myParticleOffset, + size_t myNumParticles, + size_t globalNumParticles) + { + /* write species counter table to openPMD storage */ + log("openPMD: (begin) writing particle patches for %1%") + % T_SpeciesFilter::getName(); + + size_t numPatches = particleIoChunkInfo.ranges.size(); + { // Verify that each rank contributes the same number of patches + size_t maximum, minimum; + MPI_Allreduce( + &numPatches, + &minimum, 1, - MPI_UNSIGNED_LONG_LONG, - allNumParticles.data(), + MPI_Types().value, + MPI_MIN, + gc.getCommunicator().getMPIComm()); + MPI_Allreduce( + &numPatches, + &maximum, 1, - MPI_UNSIGNED_LONG_LONG, - gc.getCommunicator().getMPIComm())); + MPI_Types().value, + MPI_MAX, + gc.getCommunicator().getMPIComm()); - for(uint64_t i = 0; i < mpiSize; ++i) + if(minimum != maximum) { - globalNumParticles += allNumParticles[i]; - if(i < mpiRank) - myParticleOffset += allNumParticles[i]; + throw std::runtime_error( + "Ranks have different number of supercells: Minimum number = " + std::to_string(minimum) + + ", maximum number: " + std::to_string(maximum) + ". Will abort."); } + } - // @todo combine this and the MPI_Gather above to a single gather for better scaling - std::vector numRounds(mpiSize); - - MPI_CHECK(MPI_Allgather( - &requiredDumpRounds, - 1, - MPI_UNSIGNED_LONG_LONG, - numRounds.data(), - 1, - MPI_UNSIGNED_LONG_LONG, - gc.getCommunicator().getMPIComm())); + auto const areaMapper = makeAreaMapper(*params->cellDescription); + auto const baseGridDim = areaMapper.getGridDim(); + auto const rangeMapper = makeRangeMapper(areaMapper); - uint64_t globalNumDumpRounds = requiredDumpRounds; - for(uint64_t i = 0; i < mpiSize; ++i) - globalNumDumpRounds = std::max(globalNumDumpRounds, numRounds[i]); + using index_t = uint64_t; + std::vector bufferNumParticlesOffset(numPatches); + std::vector bufferNumParticles(numPatches); + std::array, simDim> bufferOffset; + std::array, simDim> bufferExtent; + for(size_t d = 0; d < simDim; ++d) + { + bufferOffset[d].resize(numPatches); + bufferExtent[d].resize(numPatches); + } + ::openPMD::Datatype const datatype = ::openPMD::determineDatatype(); + // not const, we'll switch out the JSON config + ::openPMD::Dataset ds(datatype, {numPatches * mpiSize}); - log( - "openPMD: species '%1%': global particle count=%2%, number of dumping iterations=%3%") - % T_SpeciesFilter::getName() % globalNumParticles % globalNumDumpRounds; + ::openPMD::ParticlePatches particlePatches = particleSpecies.particlePatches; + ::openPMD::PatchRecordComponent numParticles + = particlePatches["numParticles"][::openPMD::RecordComponent::SCALAR]; + ::openPMD::PatchRecordComponent numParticlesOffset + = particlePatches["numParticlesOffset"][::openPMD::RecordComponent::SCALAR]; - auto hostFrame = strategy->malloc(T_SpeciesFilter::getName(), particleIoChunkInfo.largestChunk); + auto const totalPatchOffset = particleToTotalDomainOffset + params->localWindowToDomainOffset; + auto const totalPatchExtent = params->window.localDimensions.size; + std::array singlePatchExtent; + for(uint32_t d = 0; d < simDim; ++d) + { + singlePatchExtent[d] = 
(double) totalPatchExtent[d] / baseGridDim[d]; + } + size_t runningOffset = myParticleOffset; + for(size_t i = 0; i < numPatches; ++i) + { + auto const& supercell = particleIoChunkInfo.ranges.at(i); + if(supercell.endSupercellIdx != supercell.beginSupercellIdx + 1) { - meta::ForEach< - typename NewParticleDescription::ValueTypeSeq, - openPMD::InitParticleAttribute> - initParticleAttributes; - initParticleAttributes(params, particleSpecies, basename, globalNumParticles); + throw std::runtime_error( + "Grouping of supercells for load balancing output currently not implemented."); } + bufferNumParticlesOffset[i] = runningOffset; + bufferNumParticles[i] = supercell.numberOfParticles; + runningOffset += supercell.numberOfParticles; + DataSpace const superCellIdxND + = rangeMapper.getSuperCellIndex(supercell.beginSupercellIdx); + for(uint32_t d = 0; d < simDim; ++d) + { + bufferExtent[d][i] = singlePatchExtent[d]; + // No idea why the -1 is needed, but the results do not fit otherwise + // Something weird going on in supercell indexing + bufferOffset[d][i] + = totalPatchOffset[d] + index_t(singlePatchExtent[d] * (superCellIdxND[d] - 1)); + } + } - /** Offset within our global chunk where we are allowed to write particles too. - * The offset is updated each dumping iteration. - */ - size_t particleOffset = 0u; + ds.options = params->jsonMatcher->get(basename + "/particlePatches/numParticles"); + numParticles.resetDataset(ds); + ds.options = params->jsonMatcher->get(basename + "/particlePatches/numParticlesOffset"); + numParticlesOffset.resetDataset(ds); - uint64_t dumpIteration = 0u; - // To write all metadata for particles we need to perform dumping once even if we have no particles - do - { - ChunkDescription chunk; - if(dumpIteration < particleIoChunkInfo.ranges.size()) - { - chunk = particleIoChunkInfo.ranges[dumpIteration]; - } - - RunParameters_T runParameters( - dc, - *params, - speciesTmp, - filter, - particleFilter, - particleToTotalDomainOffset, - globalNumParticles); - - if(chunk.numberOfParticles > 0) - { - strategy->prepare(currentStep, T_SpeciesFilter::getName(), runParameters, chunk); - } - log("openPMD: (begin) write particle records for %1%, dumping round %2%") - % T_SpeciesFilter::getName() % dumpIteration; + /* It is safe to use the mpi rank to write the data even if the rank can differ between simulation + * runs. During the restart the plugin is using patch information to find the corresponding data. 
+ */ + numParticles.storeChunk(bufferNumParticles, {mpiRank * numPatches}, {numPatches}); + numParticlesOffset.storeChunk(bufferNumParticlesOffset, {mpiRank * numPatches}, {numPatches}); - std::stringstream description; - description << "\tslice " << dumpIteration << " prepare"; - params->m_dumpTimes.now(description.str()); - - size_t writtenBytes = 0; - - meta::ForEach< - typename NewParticleDescription::ValueTypeSeq, - openPMD::ParticleAttribute> - writeToOpenPMD; - writeToOpenPMD( - params, - hostFrame, - particleSpecies, - basename, - chunk.numberOfParticles, - globalNumParticles, - myParticleOffset + particleOffset, - writtenBytes); - - description = std::stringstream(); - description << ": " << writtenBytes << " bytes for " << chunk.numberOfParticles - << " particles from offset " << (myParticleOffset + particleOffset); - params->m_dumpTimes.append(description.str()); - - log("openPMD: flush particle records for %1%, dumping round %2%") - % T_SpeciesFilter::getName() % dumpIteration; + ::openPMD::PatchRecord offset = particlePatches["offset"]; + ::openPMD::PatchRecord extent = particlePatches["extent"]; - // avoid deadlock between not finished pmacc tasks and mpi blocking collectives - eventSystem::getTransactionEvent().waitForFinished(); - params->m_dumpTimes.now( - "\tslice " + std::to_string(dumpIteration) + " flush"); - params->openPMDSeries->flush(PreferredFlushTarget::Disk); - params->m_dumpTimes.now( - "\tslice " + std::to_string(dumpIteration) + " end"); + for(uint32_t d = 0; d < simDim; ++d) + { + ::openPMD::PatchRecordComponent offset_x = offset[name_lookup[d]]; + ::openPMD::PatchRecordComponent extent_x = extent[name_lookup[d]]; + ds.options = params->jsonMatcher->get(basename + "/particlePatches/offset/" + name_lookup[d]); + offset_x.resetDataset(ds); + ds.options = params->jsonMatcher->get(basename + "/particlePatches/extent/" + name_lookup[d]); + extent_x.resetDataset(ds); + + offset_x.storeChunk(bufferOffset[d], {mpiRank * numPatches}, {numPatches}); + // TODO: Maybe use a constant record for this + extent_x.storeChunk(bufferExtent[d], {mpiRank * numPatches}, {numPatches}); + } + /* openPMD ED-PIC: additional attributes */ + setParticleAttributes( + particleSpecies, + globalNumParticles, + *params->jsonMatcher, + series.particlesPath() + speciesGroup); + params->m_dumpTimes.now("\tFlush species " + T_SpeciesFilter::getName()); + particleSpecies.seriesFlush(PreferredFlushTarget::Buffer); + params->m_dumpTimes.now("\tFinished flush species"); - log("openPMD: (end) write particle records for %1%, dumping round %2%") - % T_SpeciesFilter::getName() % dumpIteration; + log("openPMD: ( end ) writing particle patches for %1%") + % T_SpeciesFilter::getName(); + } - particleOffset += chunk.numberOfParticles; - ++dumpIteration; - } while(dumpIteration < globalNumDumpRounds); + template + static void writeParticlesBySupercells( + ThreadParams* params, + uint32_t const currentStep, + ::openPMD::Series& series, + ::openPMD::ParticleSpecies& particleSpecies, + std::string const& basename, + std::string const& speciesGroup, + std::unique_ptr strategy, + DataConnector& dc, + typename AStrategy::RunParameters::SpeciesType& speciesTmp, + particles::filter::IUnary& particleFilter, + DataSpace const& particleToTotalDomainOffset) + { + uint64_t myNumParticles = 0; + uint64_t globalNumParticles = 0; + uint64_t myParticleOffset = 0; + + GridController& gc = Environment::get().GridController(); + uint64_t mpiSize = gc.getGlobalSize(); + uint64_t mpiRank = gc.getGlobalRank(); + + 
MyParticleFilter filter; + filter.setWindowPosition(params->localWindowToDomainOffset, params->window.localDimensions.size); + + ParticleIoChunkInfo particleIoChunkInfo + = createAChunkForEverySupercell(params, speciesTmp, particleFilter); + uint64_t const requiredDumpRounds = particleIoChunkInfo.ranges.size(); + + /* count total number of particles on the device */ + constexpr size_t particleSizeInByte = AStrategy::FrameType::ParticleType::sizeInByte(); + log( + "openPMD: species '%1%': particles=%2% in %3% supercells, largest supercell size %4% byte") + % T_SpeciesFilter::getName() % particleIoChunkInfo.totalNumParticles % requiredDumpRounds + % (particleIoChunkInfo.largestChunk * particleSizeInByte); + + myNumParticles = particleIoChunkInfo.totalNumParticles; + std::vector allNumParticles(mpiSize); + + // avoid deadlock between not finished pmacc tasks and mpi blocking + // collectives + eventSystem::getTransactionEvent().waitForFinished(); + MPI_CHECK(MPI_Allgather( + &myNumParticles, + 1, + MPI_UNSIGNED_LONG_LONG, + allNumParticles.data(), + 1, + MPI_UNSIGNED_LONG_LONG, + gc.getCommunicator().getMPIComm())); + + for(uint64_t i = 0; i < mpiSize; ++i) + { + globalNumParticles += allNumParticles[i]; + if(i < mpiRank) + myParticleOffset += allNumParticles[i]; } - log("openPMD: ( end ) writing species: %1%") % T_SpeciesFilter::getName(); + // @todo combine this and the MPI_Gather above to a single gather for better scaling + std::vector numRounds(mpiSize); - /* write species counter table to openPMD storage */ - log("openPMD: (begin) writing particle patches for %1%") - % T_SpeciesFilter::getName(); + MPI_CHECK(MPI_Allgather( + &requiredDumpRounds, + 1, + MPI_UNSIGNED_LONG_LONG, + numRounds.data(), + 1, + MPI_UNSIGNED_LONG_LONG, + gc.getCommunicator().getMPIComm())); + + uint64_t globalNumDumpRounds = requiredDumpRounds; + for(uint64_t i = 0; i < mpiSize; ++i) + globalNumDumpRounds = std::max(globalNumDumpRounds, numRounds[i]); + + + log( + "openPMD: species '%1%': global particle count=%2%, number of dumping iterations=%3%") + % T_SpeciesFilter::getName() % globalNumParticles % globalNumDumpRounds; + + auto hostFrame = strategy->malloc(T_SpeciesFilter::getName(), particleIoChunkInfo.largestChunk); + + { + meta::ForEach< + typename NewParticleDescription::ValueTypeSeq, + openPMD::InitParticleAttribute> + initParticleAttributes; + initParticleAttributes(params, particleSpecies, basename, globalNumParticles); + } + + /** Offset within our global chunk where we are allowed to write particles too. + * The offset is updated each dumping iteration. 
+ */ + size_t particleOffset = 0u; + + uint64_t dumpIteration = 0u; + // To write all metadata for particles we need to perform dumping once even if we have no + // particles + + constexpr size_t log_granularity = 100; + do { - using index_t = uint64_t; - ::openPMD::Datatype const datatype = ::openPMD::determineDatatype(); - // not const, we'll switch out the JSON config - ::openPMD::Dataset ds(datatype, {mpiSize}); - - ::openPMD::ParticlePatches particlePatches = particleSpecies.particlePatches; - ::openPMD::PatchRecordComponent numParticles - = particlePatches["numParticles"][::openPMD::RecordComponent::SCALAR]; - ::openPMD::PatchRecordComponent numParticlesOffset - = particlePatches["numParticlesOffset"][::openPMD::RecordComponent::SCALAR]; - - ds.options = params->jsonMatcher->get(basename + "/particlePatches/numParticles"); - numParticles.resetDataset(ds); - ds.options = params->jsonMatcher->get(basename + "/particlePatches/numParticlesOffset"); - numParticlesOffset.resetDataset(ds); - - /* It is safe to use the mpi rank to write the data even if the rank can differ between simulation - * runs. During the restart the plugin is using patch information to find the corresponding data. - */ - numParticles.store(mpiRank, myNumParticles); - numParticlesOffset.store(mpiRank, myParticleOffset); - - ::openPMD::PatchRecord offset = particlePatches["offset"]; - ::openPMD::PatchRecord extent = particlePatches["extent"]; - auto const patchExtent = params->window.localDimensions.size; - - for(size_t d = 0; d < simDim; ++d) + if(dumpIteration % log_granularity == 0) { - ::openPMD::PatchRecordComponent offset_x = offset[name_lookup[d]]; - ::openPMD::PatchRecordComponent extent_x = extent[name_lookup[d]]; - ds.options = params->jsonMatcher->get(basename + "/particlePatches/offset/" + name_lookup[d]); - offset_x.resetDataset(ds); - ds.options = params->jsonMatcher->get(basename + "/particlePatches/extent/" + name_lookup[d]); - extent_x.resetDataset(ds); - - auto const totalPatchOffset - = particleToTotalDomainOffset[d] + params->localWindowToDomainOffset[d]; - offset_x.store(mpiRank, totalPatchOffset); - extent_x.store(mpiRank, patchExtent[d]); + log("openPMD: (begin) write particle records for %1%, dumping round %2%") + % T_SpeciesFilter::getName() % dumpIteration; + } + ChunkDescription chunk; + if(dumpIteration < particleIoChunkInfo.ranges.size()) + { + chunk = particleIoChunkInfo.ranges[dumpIteration]; } - /* openPMD ED-PIC: additional attributes */ - setParticleAttributes( + using RunParameters = typename AStrategy::RunParameters; + RunParameters runParameters( + dc, + *params, + speciesTmp, + filter, + particleFilter, + particleToTotalDomainOffset, + globalNumParticles); + + if(chunk.numberOfParticles > 0) + { + strategy->prepare( + currentStep, + T_SpeciesFilter::getName(), + runParameters, + chunk, + /* verbose_log = */ false); + } + + std::stringstream description; + description << "\tslice " << dumpIteration << " prepare"; + params->m_dumpTimes.now(description.str()); + + size_t writtenBytes = 0; + + meta::ForEach< + typename NewParticleDescription::ValueTypeSeq, + openPMD::ParticleAttribute> + writeToOpenPMD; + writeToOpenPMD( + params, + hostFrame, particleSpecies, + basename, + chunk.numberOfParticles, globalNumParticles, - *params->jsonMatcher, - series.particlesPath() + speciesGroup); + myParticleOffset + particleOffset, + writtenBytes, + /* verbose_log = */ false); + + description = std::stringstream(); + description << ": " << writtenBytes << " bytes for " << chunk.numberOfParticles + 
<< " particles from offset " << (myParticleOffset + particleOffset); + params->m_dumpTimes.append(description.str()); + + // avoid deadlock between not finished pmacc tasks and mpi blocking collectives + eventSystem::getTransactionEvent().waitForFinished(); params->m_dumpTimes.now( - "\tFlush species " + T_SpeciesFilter::getName()); - params->openPMDSeries->flush(PreferredFlushTarget::Buffer); - params->m_dumpTimes.now("\tFinished flush species"); + "\tslice " + std::to_string(dumpIteration) + " flush"); + particleSpecies.seriesFlush(PreferredFlushTarget::Disk); + params->m_dumpTimes.now( + "\tslice " + std::to_string(dumpIteration) + " end"); + + + particleOffset += chunk.numberOfParticles; + ++dumpIteration; + } while(dumpIteration < globalNumDumpRounds); + + strategy.reset(); + + log("openPMD: ( end ) writing species: %1%") % T_SpeciesFilter::getName(); + + writeParticlePatchesBySupercells( + params, + particleToTotalDomainOffset, + particleIoChunkInfo, + series, + particleSpecies, + basename, + speciesGroup, + gc, + mpiSize, + mpiRank, + myParticleOffset, + myNumParticles, + globalNumParticles); + } + + HINLINE void operator()( + ThreadParams* params, + uint32_t const currentStep, + DataSpace const particleToTotalDomainOffset) + { + log("openPMD: (begin) write species: %1%") % T_SpeciesFilter::getName(); + params->m_dumpTimes.now( + "Begin write species " + T_SpeciesFilter::getName()); + + DataConnector& dc = Environment<>::get().DataConnector(); + + /* load particle without copy particle data to host */ + auto speciesTmp = dc.get(ThisSpecies::FrameType::getName()); + std::string const speciesGroup(T_Species::getName()); + + ::openPMD::Series& series = *params->writeOpenPMDSeries; + ::openPMD::Iteration iteration = series.writeIterations()[currentStep]; + std::string const basename = series.particlesPath() + speciesGroup; + + auto idProvider = dc.get("globalId"); + + // enforce that the filter interface is fulfilled + particles::filter::IUnary particleFilter( + currentStep, + idProvider->getDeviceGenerator()); + + ::openPMD::ParticleSpecies& particleSpecies = iteration.particles[speciesGroup]; + + using RunParameters_T = StrategyRunParameters< + decltype(speciesTmp), + MyParticleFilter, + decltype(particleFilter), + DataSpace const>; + using AStrategy = Strategy; + + std::unique_ptr strategy; + + switch(params->strategy) + { + case WriteSpeciesStrategy::ADIOS: + { + using type = StrategyADIOS; + strategy = std::unique_ptr(dynamic_cast(new type)); + break; + } + case WriteSpeciesStrategy::HDF5: + { + using type = StrategyHDF5; + strategy = std::unique_ptr(dynamic_cast(new type)); + break; + } } - log("openPMD: ( end ) writing particle patches for %1%") - % T_SpeciesFilter::getName(); + // writeParticlesByChunks( + writeParticlesBySupercells( + params, + currentStep, + series, + particleSpecies, + basename, + speciesGroup, + std::move(strategy), + dc, + speciesTmp, + particleFilter, + particleToTotalDomainOffset); } }; diff --git a/include/picongpu/plugins/openPMD/openPMDWriter.def b/include/picongpu/plugins/openPMD/openPMDWriter.def index 6cf93721ff..e4763adb00 100644 --- a/include/picongpu/plugins/openPMD/openPMDWriter.def +++ b/include/picongpu/plugins/openPMD/openPMDWriter.def @@ -79,7 +79,9 @@ namespace picongpu struct ThreadParams : PluginParameters { #if (ENABLE_OPENPMD == 1) - std::unique_ptr<::openPMD::Series> openPMDSeries; /* is null iff there is no series currently open */ + // might be needed at the same time for load balancing purposes + std::optional<::openPMD::Series> 
writeOpenPMDSeries; /* is null iff there is no series currently open */ + std::optional<::openPMD::Series> readOpenPMDSeries; /* is null iff there is no series currently open */ #endif /** current dump is a checkpoint */ bool isCheckpoint; @@ -161,7 +163,7 @@ namespace picongpu #endif }; - void closeSeries(); + void closeSeries(::openPMD::Access); /* * If file is empty, read from command line parameters. diff --git a/include/picongpu/plugins/openPMD/openPMDWriter.x.cpp b/include/picongpu/plugins/openPMD/openPMDWriter.x.cpp index 7bfbd49f66..fa9e1bf25a 100644 --- a/include/picongpu/plugins/openPMD/openPMDWriter.x.cpp +++ b/include/picongpu/plugins/openPMD/openPMDWriter.x.cpp @@ -142,6 +142,17 @@ namespace picongpu inline ::openPMD::Series& ThreadParams::openSeries(::openPMD::Access at) { + auto& openPMDSeries = *[&]() + { + if(at == ::openPMD::Access::READ_ONLY || at == ::openPMD::Access::READ_LINEAR) + { + return &readOpenPMDSeries; + } + else + { + return &writeOpenPMDSeries; + } + }(); if(!openPMDSeries) { std::string fullName = fileName + fileInfix + "." + fileExtension; @@ -149,7 +160,7 @@ namespace picongpu // avoid deadlock between not finished pmacc tasks and mpi calls in // openPMD eventSystem::getTransactionEvent().waitForFinished(); - openPMDSeries = std::make_unique<::openPMD::Series>( + openPMDSeries = std::make_optional<::openPMD::Series>( fullName, at, communicator, @@ -185,8 +196,19 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. } } - inline void ThreadParams::closeSeries() + inline void ThreadParams::closeSeries(::openPMD::Access at) { + auto& openPMDSeries = *[&]() + { + if(at == ::openPMD::Access::READ_ONLY || at == ::openPMD::Access::READ_LINEAR) + { + return &readOpenPMDSeries; + } + else + { + return &writeOpenPMDSeries; + } + }(); if(openPMDSeries) { log("openPMD: close file: %1%") % fileName; @@ -898,7 +920,7 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. rngProvider->synchronize(); auto const name = rngProvider->getName(); - ::openPMD::Iteration iteration = params->openPMDSeries->writeIterations()[currentStep]; + ::openPMD::Iteration iteration = params->writeOpenPMDSeries->writeIterations()[currentStep]; ::openPMD::Mesh mesh = iteration.meshes[name]; auto const unitDimension = std::vector(7, 0.0); @@ -906,7 +928,7 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. writeFieldAttributes(params, currentStep, unitDimension, timeOffset, mesh); ::openPMD::MeshRecordComponent mrc = mesh[::openPMD::RecordComponent::SCALAR]; - std::string datasetName = params->openPMDSeries->meshesPath() + name; + std::string datasetName = params->writeOpenPMDSeries->meshesPath() + name; auto numRNGsPerSuperCell = DataSpace::create(1); numRNGsPerSuperCell.x() = numFrameSlots; @@ -949,7 +971,7 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. mrc.storeChunkRaw(rawPtr, asStandardVector(recordOffsetDims), asStandardVector(recordLocalSizeDims)); // avoid deadlock between not finished pmacc tasks and mpi blocking collectives eventSystem::getTransactionEvent().waitForFinished(); - params->openPMDSeries->flush(PreferredFlushTarget::Disk); + params->writeOpenPMDSeries->flush(PreferredFlushTarget::Disk); } /** Implementation of loading random number generator states @@ -968,7 +990,7 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. 
auto rngProvider = dc.get(RNGProvider::getName()); auto const name = rngProvider->getName(); - ::openPMD::Iteration iteration = params->openPMDSeries->iterations[restartStep].open(); + ::openPMD::Iteration iteration = params->readOpenPMDSeries->iterations[restartStep].open(); ::openPMD::Mesh mesh = iteration.meshes[name]; ::openPMD::MeshRecordComponent mrc = mesh[::openPMD::RecordComponent::SCALAR]; @@ -1023,6 +1045,12 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. { loadRngStatesImpl(&mThreadParams, restartStep); } + catch(std::exception const& e) + { + log("openPMD: loading RNG states failed, they will be re-initialized " + "instead. Original error:\n\t%1%") + % e.what(); + } catch(...) { log( @@ -1183,7 +1211,7 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. mThreadParams.initFromConfig(*m_help, m_id, currentStep, outputDirectory); mThreadParams.isCheckpoint = false; - dumpData(currentStep); + dumpData(currentStep, std::nullopt); } void restart(uint32_t const restartStep, std::string const& restartDirectory) override @@ -1200,10 +1228,59 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. */ } + enum class SyncState : signed int + { + WriterLetsGo, + ReaderOpenFile, + WriterCreateNextIteration, + ReaderLetsGo, + WriterRelease + }; + + void notifySync( + std::optional*>& sync, + SyncState nextState, + std::string const& writerOrReader, + std::string const& message) + { + if(!sync.has_value()) + { + return; + } + auto oldVal = (**sync).exchange((signed int) nextState, std::memory_order::release); + if(oldVal > (signed int) nextState) + { + std::cerr << "openPMD plugin, SyncState: Tried replacing newer state " << oldVal + << " with older state " << (signed int) nextState + << ". Will emplace the newer state again and go on." << std::endl; + (**sync).store(oldVal, std::memory_order::relaxed); + } + (**sync).notify_all(); + log("openPMD: SYNC %1%: set state %2%") % writerOrReader % message; + }; + + void catchUpSync( + std::optional*>& sync, + SyncState targetState, + std::string const& writerOrReader, + std::string const& message) + { + if(!sync.has_value()) + { + return; + } + for(signed int oldVal = **sync; oldVal < (signed int) targetState; oldVal = **sync) + { + (**sync).wait(oldVal, std::memory_order::acquire); + } + log("openPMD: SYNC %1%: received state %2%") % writerOrReader % message; + }; + void dumpCheckpoint( uint32_t const currentStep, std::string const& checkpointDirectory, - std::string const& checkpointFilename) override + std::string const& checkpointFilename, + std::optional*> sync) override { // checkpointing is only allowed if the plugin is controlled by the // class Checkpoint @@ -1217,7 +1294,7 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. mThreadParams.window = MovingWindow::getInstance().getDomainAsWindow(currentStep); - dumpData(currentStep); + dumpData(currentStep, sync); } /** Checks if a loaded openPMD series is supported for restarting. @@ -1232,7 +1309,7 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. * * @param series OpenPMD series which is used to load restart data. */ - void checkIOFileVersionRestartCompatibility(std::unique_ptr<::openPMD::Series>& series) const + void checkIOFileVersionRestartCompatibility(std::optional<::openPMD::Series>& series) const { /* Major version 0 and not having the attribute picongpuIOVersionMajor is handled equally later on. 
* Major version 0 will never have any other minor version than 1. @@ -1294,8 +1371,11 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. uint32_t const restartStep, std::string const& restartDirectory, std::string const& constRestartFilename, - uint32_t const restartChunkSize) override + uint32_t const restartChunkSize, + std::optional*> sync) override { + catchUpSync(sync, SyncState::ReaderOpenFile, "READER", "ReaderOpenFile"); + // restart is only allowed if the plugin is controlled by the class // Checkpoint assert(!m_help->selfRegister); @@ -1306,9 +1386,13 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. mThreadParams.openSeries(::openPMD::Access::READ_ONLY); - checkIOFileVersionRestartCompatibility(mThreadParams.openPMDSeries); + // Must not attempt to create the next Iteration too soon, else we might not even get to see it + notifySync(sync, SyncState::WriterCreateNextIteration, "READER", "WriterCreateNextIteration"); + catchUpSync(sync, SyncState::ReaderLetsGo, "READER", "ReaderLetsGo"); + + checkIOFileVersionRestartCompatibility(mThreadParams.readOpenPMDSeries); - ::openPMD::Iteration iteration = mThreadParams.openPMDSeries->iterations[restartStep].open(); + ::openPMD::Iteration iteration = mThreadParams.readOpenPMDSeries->iterations[restartStep].open(); /* load number of slides to initialize MovingWindow */ log("openPMD: (begin) read attr (%1% available)") % iteration.numAttributes(); @@ -1377,7 +1461,8 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. eventSystem::getTransactionEvent().waitForFinished(); // Finalize the openPMD Series by calling its destructor - mThreadParams.closeSeries(); + mThreadParams.closeSeries(::openPMD::Access::READ_ONLY); + notifySync(sync, SyncState::WriterRelease, "READER", "WriterRelease"); } private: @@ -1398,7 +1483,7 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. * * @param currentStep current simulation step */ - void dumpData(uint32_t currentStep) + void dumpData(uint32_t currentStep, std::optional*> sync) { // local offset + extent pmacc::Selection const localDomain = Environment::get().SubGrid().getLocalDomain(); @@ -1436,7 +1521,17 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. timer.toggleStart(); initWrite(); - write(&mThreadParams, currentStep, mpiTransportParams); + write(&mThreadParams, currentStep, mpiTransportParams, sync); + + // TODO: add a better keepalive logic for in-situ-restarting, + // e.g. keep Series alive on writer AND reader and use proper streaming API + // for now, every load/balance step opens a new stream (and each reader additionally opens a new copy + // of the IO plugin to avoid race conditions + + if(sync.has_value()) + { + mThreadParams.closeSeries(::openPMD::Access::CREATE); + } endWrite(); timer.toggleEnd(); @@ -1572,7 +1667,7 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. params->m_dumpTimes.now("Begin write field " + name); } - ::openPMD::Iteration iteration = params->openPMDSeries->writeIterations()[currentStep]; + ::openPMD::Iteration iteration = params->writeOpenPMDSeries->writeIterations()[currentStep]; ::openPMD::Mesh mesh = iteration.meshes[name]; // set mesh attributes @@ -1675,8 +1770,8 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. }(); ::openPMD::MeshRecordComponent mrc = mesh[pathToRecordComponent]; std::string datasetName - = nComponents > 1 ? 
params->openPMDSeries->meshesPath() + name + "/" + name_lookup_tpl[d] - : params->openPMDSeries->meshesPath() + name; + = nComponents > 1 ? params->writeOpenPMDSeries->meshesPath() + name + "/" + name_lookup_tpl[d] + : params->writeOpenPMDSeries->meshesPath() + name; params->initDataset(mrc, openPMDType, recordGlobalSizeDims, datasetName); @@ -1688,7 +1783,7 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. { // avoid deadlock between not finished pmacc tasks and mpi blocking collectives eventSystem::getTransactionEvent().waitForFinished(); - params->openPMDSeries->flush(PreferredFlushTarget::Disk); + params->writeOpenPMDSeries->flush(PreferredFlushTarget::Disk); continue; } @@ -1744,7 +1839,7 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. // avoid deadlock between not finished pmacc tasks and mpi blocking collectives eventSystem::getTransactionEvent().waitForFinished(); params->m_dumpTimes.now("\tComponent " + std::to_string(d) + " flush"); - params->openPMDSeries->flush(PreferredFlushTarget::Disk); + params->writeOpenPMDSeries->flush(PreferredFlushTarget::Disk); params->m_dumpTimes.now("\tComponent " + std::to_string(d) + " end"); } } @@ -1789,7 +1884,11 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. } }; - void write(ThreadParams* threadParams, uint32_t const currentStep, std::string mpiTransportParams) + void write( + ThreadParams* threadParams, + uint32_t const currentStep, + std::string mpiTransportParams, + std::optional*> sync) { threadParams->m_dumpTimes.now( "Beginning iteration " + std::to_string(currentStep)); @@ -1810,10 +1909,17 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. bool dumpFields = plugins::misc::containsObject(vectorOfDataSourceNames, "fields_all"); - if(threadParams->openPMDSeries) + // Need to signal the reader to go on, streaming engines might need to be opened together + notifySync(sync, SyncState::ReaderOpenFile, "WRITER", "ReaderOpenFile"); + + if(threadParams->writeOpenPMDSeries) { log("openPMD: Series still open, reusing"); // TODO check for same configuration + + // In this branch, we must however pay attention to not create the next Iteration too soon, as the + // reader might just be starting up and might miss it + catchUpSync(sync, SyncState::WriterCreateNextIteration, "WRITER", "WriterCreateNextIteration"); } else { @@ -1824,8 +1930,8 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. /* attributes written here are pure meta data */ WriteMeta writeMetaAttributes; writeMetaAttributes( - *threadParams->openPMDSeries, - (*threadParams->openPMDSeries).writeIterations()[currentStep], + *threadParams->writeOpenPMDSeries, + (*threadParams->writeOpenPMDSeries).writeIterations()[currentStep], currentStep); bool dumpAllParticles = plugins::misc::containsObject(vectorOfDataSourceNames, "species_all"); @@ -1935,9 +2041,14 @@ make sure that environment variable OPENPMD_BP_BACKEND is not set to ADIOS1. 
eventSystem::getTransactionEvent().waitForFinished(); mThreadParams.m_dumpTimes.now( "Closing iteration " + std::to_string(currentStep)); - mThreadParams.openPMDSeries->writeIterations()[currentStep].close(); + mThreadParams.writeOpenPMDSeries->writeIterations()[currentStep].close(); mThreadParams.m_dumpTimes.now("Done."); + // The above code might have ignored this state, we must now catch up to it in order not to set the + // next state too soon + catchUpSync(sync, SyncState::WriterCreateNextIteration, "WRITER", "WriterCreateNextIteration"); + notifySync(sync, SyncState::ReaderLetsGo, "WRITER", "ReaderLetsGo"); mThreadParams.m_dumpTimes.flush(); + catchUpSync(sync, SyncState::WriterRelease, "WRITER", "WriterRelease"); return; } diff --git a/include/picongpu/plugins/openPMD/restart/LoadParticleAttributesFromOpenPMD.hpp b/include/picongpu/plugins/openPMD/restart/LoadParticleAttributesFromOpenPMD.hpp index 5c379ec286..15f1673b3a 100644 --- a/include/picongpu/plugins/openPMD/restart/LoadParticleAttributesFromOpenPMD.hpp +++ b/include/picongpu/plugins/openPMD/restart/LoadParticleAttributesFromOpenPMD.hpp @@ -107,7 +107,7 @@ namespace picongpu * (this is collective call in many methods of openPMD * backends) */ - params->openPMDSeries->flush(); + record.seriesFlush(); uint64_t globalNumElements = 1; for(auto ext : rc.getExtent()) diff --git a/include/picongpu/plugins/openPMD/restart/LoadSpecies.hpp b/include/picongpu/plugins/openPMD/restart/LoadSpecies.hpp index d7487bbda4..82610129ee 100644 --- a/include/picongpu/plugins/openPMD/restart/LoadSpecies.hpp +++ b/include/picongpu/plugins/openPMD/restart/LoadSpecies.hpp @@ -35,8 +35,10 @@ # include +# include # include # include +# include # include @@ -46,6 +48,138 @@ namespace picongpu { using namespace pmacc; + struct RedistributeFilteredParticlesKernel + { + template + HDINLINE void operator()( + T_Worker const& worker, + ValueType* dataPtr, + FilterType&& filter, + RemapType&& remap, + MemIdxType const size, + char const filterRemove) const + { + constexpr uint32_t blockDomSize = T_Worker::blockDomSize(); + auto numDataBlocks = (size + blockDomSize - 1u) / blockDomSize; + + ValueType* s_mem = ::alpaka::getDynSharedMem(worker.getAcc()); + + // grid-strided loop over the chunked data + for(int dataBlock = worker.blockDomIdx(); dataBlock < numDataBlocks; dataBlock += worker.gridDomSize()) + { + auto dataBlockOffset = dataBlock * blockDomSize; + auto forEach = pmacc::lockstep::makeForEach(worker); + // read + forEach( + [&](uint32_t const inBlockIdx) + { + auto idx = dataBlockOffset + inBlockIdx; + if(idx < size) + { + s_mem[inBlockIdx] = dataPtr[idx]; + } + }); + worker.sync(); + + // write + forEach( + [&](uint32_t const inBlockIdx) + { + auto idx = dataBlockOffset + inBlockIdx; + if(idx < size && filter[idx] != filterRemove) + { + dataPtr[remap[idx]] = s_mem[inBlockIdx]; + } + }); + + // The next Iteration of the outer for loop does not depend on the data overwritten until now. + // Filtering moves data only to lower indexes and each of the outer loop's iterations deals with a + // contiguous chunk of data. 
+ } + } + }; + + template + struct RedistributeFilteredParticles + + { + template + HINLINE void operator()( + FrameType& frame, + FilterType const& filter, + RemapType const& remap, + uint64_t const numParticlesCurrentBatch, + char const filterRemove) + { + using Identifier = T_Identifier; + using ValueType = typename pmacc::traits::Resolve::type::type; + using ComponentType = typename GetComponentsType::type; + + + constexpr uint32_t blockDomSize = decltype(lockstep::makeBlockCfg())::blockDomSize(); + constexpr size_t requiredSharedMemBytes = blockDomSize * sizeof(ValueType); + + ValueType* dataPtr = frame.getIdentifier(Identifier()).getPointer(); + + PMACC_LOCKSTEP_KERNEL(RedistributeFilteredParticlesKernel{}) + .configSMem(pmacc::math::Vector{numParticlesCurrentBatch}, requiredSharedMemBytes)( + dataPtr, + alpaka::getPtrNative(filter), + alpaka::getPtrNative(remap), + numParticlesCurrentBatch, + filterRemove); + } + }; + + struct KernelFilterParticles + { + template + DINLINE void operator()( + T_Worker const& worker, + FrameType&& loadedData, + MemIdxType size, + DataSpace patchTotalOffset, + DataSpace patchUpperCorner, + char const filterKeep, + char const filterRemove, + FilterType&& filterOut) const + { + // DataSpace<1> particleIndex = worker.blockDomIdxND(); + constexpr uint32_t blockDomSize = T_Worker::blockDomSize(); + auto numDataBlocks = (size + blockDomSize - 1u) / blockDomSize; + auto position_ = loadedData.getIdentifier(position()).getPointer(); + auto positionOffset = loadedData.getIdentifier(totalCellIdx()).getPointer(); + + // grid-strided loop over the chunked data + for(int dataBlock = worker.blockDomIdx(); dataBlock < numDataBlocks; dataBlock += worker.gridDomSize()) + { + auto dataBlockOffset = dataBlock * blockDomSize; + auto forEach = pmacc::lockstep::makeForEach(worker); + forEach( + [&](uint32_t const inBlockIdx) + { + auto idx = dataBlockOffset + inBlockIdx; + if(idx < size) + { + auto& positionVec = position_[idx]; + auto& positionOffsetVec = positionOffset[idx]; + char filterCurrent = filterKeep; + for(size_t d = 0; d < simDim; ++d) + { + auto positionInD = positionVec[d] + positionOffsetVec[d]; + if(positionInD < patchTotalOffset[d] || positionInD > patchUpperCorner[d]) + { + filterCurrent = filterRemove; + break; + } + } + filterOut[idx] = filterCurrent; + } + }); + } + } + }; + /** Load species from openPMD checkpoint storage * * @tparam T_Species type of species @@ -83,7 +217,7 @@ namespace picongpu DataConnector& dc = Environment<>::get().DataConnector(); GridController& gc = Environment::get().GridController(); - ::openPMD::Series& series = *params->openPMDSeries; + ::openPMD::Series& series = *params->readOpenPMDSeries; ::openPMD::Container<::openPMD::ParticleSpecies>& particles = series.iterations[currentStep].open().particles; ::openPMD::ParticleSpecies particleSpecies = particles[speciesName]; @@ -101,91 +235,262 @@ namespace picongpu auto numRanks = gc.getGlobalSize(); - size_t patchIdx = getPatchIdx(params, particleSpecies, numRanks); + auto [fullMatches, partialMatches] = getPatchIdx(params, particleSpecies); - std::shared_ptr fullParticlesInfoShared - = particleSpecies.particlePatches["numParticles"][::openPMD::RecordComponent::SCALAR] - .load(); + std::shared_ptr numParticlesShared + = particleSpecies.particlePatches["numParticles"].load(); + std::shared_ptr numParticlesOffsetShared + = particleSpecies.particlePatches["numParticlesOffset"].load(); particles.seriesFlush(); - uint64_t* fullParticlesInfo = fullParticlesInfoShared.get(); + 
uint64_t* patchNumParticles = numParticlesShared.get(); + uint64_t* patchNumParticlesOffset = numParticlesOffsetShared.get(); - /* Run a prefix sum over the numParticles[0] element in - * particlesInfo to retreive the offset of particles - */ - uint64_t particleOffset = 0u; - /* count total number of particles on the device */ - uint64_t totalNumParticles = 0u; - - assert(patchIdx < numRanks); - - for(size_t i = 0u; i <= patchIdx; ++i) { - if(i < patchIdx) - particleOffset += fullParticlesInfo[i]; - if(i == patchIdx) - totalNumParticles = fullParticlesInfo[i]; - } - - log("openPMD: Loading %1% particles from offset %2%") - % (long long unsigned) totalNumParticles % (long long unsigned) particleOffset; + uint64_t totalNumParticles = std::transform_reduce( + fullMatches.begin(), + fullMatches.end(), + 0, + /* reduce = */ [](uint64_t left, uint64_t right) { return left + right; }, + /* transform = */ [patchNumParticles](size_t patchIdx) + { return patchNumParticles[patchIdx]; }); - log("openPMD: malloc mapped memory: %1%") % speciesName; + log("openPMD: malloc mapped memory: %1%") % speciesName; - using FrameType = Frame; - using BufferType = Frame; + using FrameType = Frame; + using BufferType = Frame; - BufferType buffers; - FrameType mappedFrame; + BufferType buffers; + FrameType mappedFrame; - uint64_t maxChunkSize = std::min(static_cast(restartChunkSize), totalNumParticles); + uint64_t maxChunkSize = std::min(static_cast(restartChunkSize), totalNumParticles); - /*malloc mapped memory*/ - meta::ForEach> - mallocMem; - mallocMem(buffers, mappedFrame, maxChunkSize); + /*malloc mapped memory*/ + meta::ForEach> + mallocMem; + mallocMem(buffers, mappedFrame, maxChunkSize); - uint32_t const numLoadIterations - = maxChunkSize == 0u ? 0u : alpaka::core::divCeil(totalNumParticles, maxChunkSize); + for(size_t const patchIdx : fullMatches) + { + uint64_t particleOffset = patchNumParticlesOffset[patchIdx]; + uint64_t numParticles = patchNumParticles[patchIdx]; + + log("openPMD: Loading %1% particles from offset %2%") + % (long long unsigned) totalNumParticles % (long long unsigned) particleOffset; + + + uint32_t const numLoadIterations + = maxChunkSize == 0u ? 
0u : alpaka::core::divCeil(numParticles, maxChunkSize); + + for(uint64_t loadRound = 0u; loadRound < numLoadIterations; ++loadRound) + { + auto particleLoadOffset = particleOffset + loadRound * maxChunkSize; + auto numLeftParticles = numParticles - loadRound * maxChunkSize; + + auto numParticlesCurrentBatch = std::min(numLeftParticles, maxChunkSize); + + log("openPMD: (begin) load species %1% round: %2%/%3%") % speciesName + % (loadRound + 1) % numLoadIterations; + if(numParticlesCurrentBatch != 0) + { + meta::ForEach< + typename NewParticleDescription::ValueTypeSeq, + LoadParticleAttributesFromOpenPMD> + loadAttributes; + loadAttributes( + params, + mappedFrame, + particleSpecies, + particleLoadOffset, + numParticlesCurrentBatch); + + pmacc::particles::operations::splitIntoListOfFrames( + *speciesTmp, + mappedFrame, + numParticlesCurrentBatch, + cellOffsetToTotalDomain, + totalCellIdx_, + *(params->cellDescription), + picLog::INPUT_OUTPUT()); + } + log("openPMD: ( end ) load species %1% round: %2%/%3%") % speciesName + % (loadRound + 1) % numLoadIterations; + } + } + } - for(uint64_t loadRound = 0u; loadRound < numLoadIterations; ++loadRound) { - auto particleLoadOffset = particleOffset + loadRound * maxChunkSize; - auto numLeftParticles = totalNumParticles - loadRound * maxChunkSize; - - auto numParticlesCurrentBatch = std::min(numLeftParticles, maxChunkSize); - - log("openPMD: (begin) load species %1% round: %2%/%3%") % speciesName - % (loadRound + 1) % numLoadIterations; - if(numParticlesCurrentBatch != 0) + SubGrid const& subGrid = Environment::get().SubGrid(); + pmacc::Selection const localDomain = subGrid.getLocalDomain(); + pmacc::Selection const globalDomain = subGrid.getGlobalDomain(); + /* Offset to transform local particle offsets into total offsets for all particles within the + * current local domain. + * @attention A window can be the full simulation domain or the moving window. + */ + DataSpace localToTotalDomainOffset(localDomain.offset + globalDomain.offset); + + /* params->localWindowToDomainOffset is in PIConGPU for a restart zero but to stay generic we take + * the variable into account. 
+ */ + DataSpace const patchTotalOffset + = localToTotalDomainOffset + params->localWindowToDomainOffset; + DataSpace const patchExtent = params->window.localDimensions.size; + DataSpace const patchUpperCorner = patchTotalOffset + patchExtent; + + uint64_t totalNumParticles = std::transform_reduce( + partialMatches.begin(), + partialMatches.end(), + 0, + /* reduce = */ [](uint64_t left, uint64_t right) { return left + right; }, + /* transform = */ [patchNumParticles](size_t patchIdx) + { return patchNumParticles[patchIdx]; }); + + log("openPMD: malloc mapped memory for partial patches: %1%") % speciesName; + + using FrameType = Frame; + using BufferType = Frame; + + BufferType buffers; + FrameType mappedFrame; + + uint64_t maxChunkSize = std::min(static_cast(restartChunkSize), totalNumParticles); + + /*malloc mapped memory*/ + meta::ForEach> + mallocMem; + mallocMem(buffers, mappedFrame, maxChunkSize); + constexpr bool isMappedMemorySupported + = alpaka::hasMappedBufSupport<::alpaka::Platform>; + PMACC_VERIFY_MSG(isMappedMemorySupported, "Device must support mapped memory!"); + // alpaka::Buf> filter; + auto filter = alpaka::allocMappedBuf( + manager::Device::get().current(), + manager::Device::get().getPlatform(), + MemSpace(maxChunkSize).toAlpakaMemVec()); + auto remap = alpaka::allocMappedBuf( + manager::Device::get().current(), + manager::Device::get().getPlatform(), + MemSpace(maxChunkSize).toAlpakaMemVec()); + for(size_t const patchIdx : partialMatches) { - meta::ForEach< - typename NewParticleDescription::ValueTypeSeq, - LoadParticleAttributesFromOpenPMD> - loadAttributes; - loadAttributes( - params, - mappedFrame, - particleSpecies, - particleLoadOffset, - numParticlesCurrentBatch); - - - pmacc::particles::operations::splitIntoListOfFrames( - *speciesTmp, - mappedFrame, - numParticlesCurrentBatch, - cellOffsetToTotalDomain, - totalCellIdx_, - *(params->cellDescription), - picLog::INPUT_OUTPUT()); + uint64_t particleOffset = patchNumParticlesOffset[patchIdx]; + uint64_t numParticles = patchNumParticles[patchIdx]; + + log("openPMD: Loading up to %1% particles from offset %2%") + % (long long unsigned) totalNumParticles % (long long unsigned) particleOffset; + + + uint32_t const numLoadIterations + = maxChunkSize == 0u ? 
0u : alpaka::core::divCeil(numParticles, maxChunkSize); + + for(uint64_t loadRound = 0u; loadRound < numLoadIterations; ++loadRound) + { + auto particleLoadOffset = particleOffset + loadRound * maxChunkSize; + auto numLeftParticles = numParticles - loadRound * maxChunkSize; + + auto numParticlesCurrentBatch = std::min(numLeftParticles, maxChunkSize); + + log("openPMD: (begin) load species %1% round: %2%/%3%") % speciesName + % (loadRound + 1) % numLoadIterations; + if(numParticlesCurrentBatch != 0) + { + meta::ForEach< + typename NewParticleDescription::ValueTypeSeq, + LoadParticleAttributesFromOpenPMD> + loadAttributes; + loadAttributes( + params, + mappedFrame, + particleSpecies, + particleLoadOffset, + numParticlesCurrentBatch); + + // now filter + + constexpr char filterKeep{1}, filterRemove{0}; + PMACC_LOCKSTEP_KERNEL(KernelFilterParticles{}) + .config(pmacc::math::Vector{numParticlesCurrentBatch})( + mappedFrame, + numParticlesCurrentBatch, + patchTotalOffset, + patchUpperCorner, + filterKeep, + filterRemove, + alpaka::getPtrNative(filter)); + eventSystem::getTransactionEvent().waitForFinished(); + + // This part is inherently sequential, keep it on CPU + // (maybe run also this on GPU to avoid multiple data copies) + // std::cout << "REMAP: "; + MemIdxType remapCurrent = 0; + for(size_t particleIndex = 0; particleIndex < numParticlesCurrentBatch; + ++particleIndex) + { + if(filter[particleIndex] == filterKeep) + { + remap[particleIndex] = remapCurrent++; + // std::cout << '1'; + } + else + { + remap[particleIndex] = std::numeric_limits::max(); + // std::cout << '0'; + } + } + // std::cout << std::endl; + + meta::ForEach< + typename NewParticleDescription::ValueTypeSeq, + RedistributeFilteredParticles> + redistributeFilteredParticles; + redistributeFilteredParticles( + mappedFrame, + filter, + remap, + numParticlesCurrentBatch, + filterRemove); + + std::cout << "Filtered " << remapCurrent << " out of " << numParticlesCurrentBatch + << " particles" << std::endl; + + pmacc::particles::operations::splitIntoListOfFrames( + *speciesTmp, + mappedFrame, + remapCurrent, // !! not numParticlesCurrentBatch, filtered vs. unfiltered number + cellOffsetToTotalDomain, + totalCellIdx_, + *(params->cellDescription), + picLog::INPUT_OUTPUT()); + } + log("openPMD: ( end ) load species %1% round: %2%/%3%") % speciesName + % (loadRound + 1) % numLoadIterations; + } } - log("openPMD: ( end ) load species %1% round: %2%/%3%") % speciesName - % (loadRound + 1) % numLoadIterations; } log("openPMD: ( end ) load species: %1%") % speciesName; } private: + // o: offset, e: extent, u: upper corner (= o+e) + static std::pair, DataSpace> intersect( + DataSpace const& o1, + DataSpace const& e1, + DataSpace const& o2, + DataSpace const& e2) + { + // Convert extents into upper coordinates + auto u1 = o1 + e1; + auto u2 = o2 + e2; + + DataSpace intersect_o, intersect_u, intersect_e; + for(unsigned d = 0; d < simDim; ++d) + { + intersect_o[d] = std::max(o1[d], o2[d]); + intersect_u[d] = std::min(u1[d], u2[d]); + intersect_e[d] = intersect_u[d] > intersect_o[d] ? intersect_u[d] - intersect_o[d] : 0; + } + return {intersect_o, intersect_e}; + } + /** get index for particle data within the openPMD patch data * * It is not possible to assume that we can use the MPI rank to load the particle data. 
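// --- Hedged illustration, not part of this patch ------------------------------------------------
// The filter -> remap -> in-place scatter scheme used above for partially matching patches, reduced
// to plain host-side C++ on a single attribute array: KernelFilterParticles plays the role of the
// keep/remove flags, the sequential loop over `filter` is the exclusive prefix count that fills
// `remap`, and RedistributeFilteredParticles(Kernel) is the final scatter. Because remap[i] <= i
// for every kept element, the scatter never overwrites a value that is still needed and can
// therefore run in place.
#include <cstddef>
#include <limits>
#include <vector>

inline std::size_t compactInPlace(std::vector<float>& attribute, std::vector<char> const& keep)
{
    std::vector<std::size_t> remap(attribute.size());
    std::size_t kept = 0u;
    for(std::size_t i = 0u; i < attribute.size(); ++i)
        remap[i] = keep[i] ? kept++ : std::numeric_limits<std::size_t>::max(); // exclusive prefix count
    for(std::size_t i = 0u; i < attribute.size(); ++i)
        if(keep[i])
            attribute[remap[i]] = attribute[i]; // remap[i] <= i
    attribute.resize(kept);
    return kept; // corresponds to `remapCurrent` above
}
// ------------------------------------------------------------------------------------------------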
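// --- Hedged 1D example, not part of this patch --------------------------------------------------
// How getPatchIdx() below sorts the stored particle patches into full matches (contained in this
// rank's window, loaded as-is), partial matches (overlapping, loaded and then filtered as above)
// and non-matches (skipped), using the same containment / intersection tests as intersect(). All
// numbers are made up for illustration.
#include <algorithm>
#include <cstdint>
#include <iostream>

int main()
{
    int64_t const winOffset = 8, winExtent = 16; // this rank's window: cells [8, 24)
    struct Patch { int64_t offset, extent; } const patches[] = {{8, 8}, {20, 8}, {32, 8}};
    for(auto const& p : patches)
    {
        bool const contained = winOffset <= p.offset && p.offset + p.extent <= winOffset + winExtent;
        int64_t const overlap = std::max<int64_t>(
            0,
            std::min(p.offset + p.extent, winOffset + winExtent) - std::max(p.offset, winOffset));
        std::cout << '[' << p.offset << ',' << p.offset + p.extent << ") -> "
                  << (contained ? "full match" : overlap != 0 ? "partial match" : "no match") << '\n';
    }
    // prints: [8,16) -> full match, [20,28) -> partial match, [32,40) -> no match
}
// ------------------------------------------------------------------------------------------------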
@@ -197,13 +502,16 @@ namespace picongpu * * @return index of the particle patch within the openPMD data */ - HINLINE size_t - getPatchIdx(ThreadParams* params, ::openPMD::ParticleSpecies particleSpecies, size_t numRanks) + HINLINE std::pair, std::deque> getPatchIdx( + ThreadParams* params, + ::openPMD::ParticleSpecies particleSpecies) { std::string const name_lookup[] = {"x", "y", "z"}; - std::vector> offsets(numRanks); - std::vector> extents(numRanks); + size_t patches = particleSpecies.particlePatches["numParticles"].getExtent()[0]; + + std::vector> offsets(patches); + std::vector> extents(patches); // transform openPMD particle patch data into PIConGPU data objects for(uint32_t d = 0; d < simDim; ++d) @@ -213,7 +521,7 @@ namespace picongpu std::shared_ptr patchExtentsInfoShared = particleSpecies.particlePatches["extent"][name_lookup[d]].load(); particleSpecies.seriesFlush(); - for(size_t i = 0; i < numRanks; ++i) + for(size_t i = 0; i < patches; ++i) { offsets[i][d] = patchOffsetsInfoShared.get()[i]; extents[i][d] = patchExtentsInfoShared.get()[i]; @@ -235,18 +543,45 @@ namespace picongpu DataSpace const patchTotalOffset = localToTotalDomainOffset + params->localWindowToDomainOffset; DataSpace const patchExtent = params->window.localDimensions.size; + math::Vector true_; + for(unsigned d = 0; d < simDim; ++d) + { + true_[d] = true; + } // search the patch index based on the offset and extents of local domain size - for(size_t i = 0; i < numRanks; ++i) + std::deque fullMatches; + std::deque partialMatches; + size_t noMatches = 0; + for(size_t i = 0; i < patches; ++i) { - if(patchTotalOffset == offsets[i] && patchExtent == extents[i]) - return i; + // std::cout << "Comp.: " << patchTotalOffset << " - " << (patchTotalOffset + patchExtent) + // << "\tAGAINST " << offsets[i] << " - " << (offsets[i] + extents[i]) + // << "\toffsets: " << (patchTotalOffset <= offsets[i]) + // << ", extents: " << ((offsets[i] + extents[i]) <= (patchTotalOffset + patchExtent)) << + // '\n'; + if((patchTotalOffset <= offsets[i]) == true_ + && ((offsets[i] + extents[i]) <= (patchTotalOffset + patchExtent)) == true_) + { + fullMatches.emplace_back(i); + } + else if( + intersect(offsets[i], extents[i], patchTotalOffset, patchExtent).second.productOfComponents() + != 0) + { + partialMatches.emplace_back(i); + } + else + { + ++noMatches; + } } - // If no patch fits the conditions, something went wrong before - throw std::runtime_error( - "Error while restarting: no particle patch matches the required offset and extent"); - // Fake return still needed to avoid warnings - return 0; + + std::cout << "\n\n" + << fullMatches.size() << " full matches, " << partialMatches.size() << " partial matches, " + << noMatches << " unmatched." 
<< std ::endl; + + return std::make_pair(std::move(fullMatches), std::move(partialMatches)); } }; diff --git a/include/picongpu/plugins/openPMD/restart/RestartFieldLoader.hpp b/include/picongpu/plugins/openPMD/restart/RestartFieldLoader.hpp index 939d65b3e0..e31463148a 100644 --- a/include/picongpu/plugins/openPMD/restart/RestartFieldLoader.hpp +++ b/include/picongpu/plugins/openPMD/restart/RestartFieldLoader.hpp @@ -122,7 +122,7 @@ namespace picongpu useLinearIdxAsDestination = true; } - ::openPMD::Series& series = *params->openPMDSeries; + ::openPMD::Series& series = *params->readOpenPMDSeries; ::openPMD::Container<::openPMD::Mesh>& meshes = series.iterations[currentStep].open().meshes; auto destBox = field.getHostBuffer().getDataBox(); @@ -219,6 +219,13 @@ namespace picongpu /* load from openPMD */ bool const isDomainBound = traits::IsFieldDomainBound::value; + + // Skip PML fields for load balancing purposes + if(!isDomainBound) + { + return; + } + RestartFieldLoader::loadField( field->getGridBuffer(), (uint32_t) T_Field::numComponents, diff --git a/include/picongpu/plugins/openPMD/writer/ParticleAttribute.hpp b/include/picongpu/plugins/openPMD/writer/ParticleAttribute.hpp index 01b6a7aff7..311c86c063 100644 --- a/include/picongpu/plugins/openPMD/writer/ParticleAttribute.hpp +++ b/include/picongpu/plugins/openPMD/writer/ParticleAttribute.hpp @@ -34,6 +34,8 @@ # include # include +# include + namespace picongpu { namespace openPMD @@ -130,7 +132,8 @@ namespace picongpu size_t const elements, size_t const globalElements, size_t const globalOffset, - size_t& accumulateWrittenBytes) + size_t& accumulateWrittenBytes, + bool verbose_log) { using Identifier = T_Identifier; using ValueType = typename pmacc::traits::Resolve::type::type; @@ -158,10 +161,35 @@ namespace picongpu if(elements == 0) { // accumulateWrittenBytes += 0; + + // Since Span-based storeChunk needs to interact with the openPMD backend (potentially opening it), + // this cannot be skipped in parallel setups + for(uint32_t d = 0; d < components; d++) + { + ::openPMD::RecordComponent recordComponent + = components > 1 ? record[name_lookup[d]] : record[::openPMD::MeshRecordComponent::SCALAR]; + + /* + * storeChunk() on constant components (this includes empty datasets) are illegal, so skip in + * that case. + */ + if(std::ranges::any_of(recordComponent.getExtent(), [](auto const e) { return e == 0; })) + { + continue; + } + + recordComponent.storeChunk( + ::openPMD::Offset{globalOffset}, + ::openPMD::Extent{elements}); + } return; } - log("openPMD: (begin) write species attribute: %1%") % Identifier::getName(); + if(verbose_log) + { + log("openPMD: (begin) write species attribute: %1%") + % Identifier::getName(); + } accumulateWrittenBytes += components * elements * sizeof(ComponentType); @@ -186,7 +214,11 @@ namespace picongpu } } - log("openPMD: ( end ) write species attribute: %1%") % Identifier::getName(); + if(verbose_log) + { + log("openPMD: ( end ) write species attribute: %1%") + % Identifier::getName(); + } } }; diff --git a/include/picongpu/plugins/openPMD/writer/misc.hpp b/include/picongpu/plugins/openPMD/writer/misc.hpp index 4aa075d328..b7c65eaf08 100644 --- a/include/picongpu/plugins/openPMD/writer/misc.hpp +++ b/include/picongpu/plugins/openPMD/writer/misc.hpp @@ -134,6 +134,47 @@ namespace picongpu * simulation. 
* @return chunk information */ + template + inline ParticleIoChunkInfo createAChunkForEverySupercell( + T_IOParameters* params, + T_SpeciesPtr& speciesPtr, + T_ParticleFilter particleFilter) + { + auto const areaMapper = makeAreaMapper(*(params->cellDescription)); + auto rangeMapper = makeRangeMapper(areaMapper); + + // buffer accessible from device and host to store the number of particles within each supercell. + auto superCellHistogram = alpaka::allocMappedBufIfSupported( + manager::Device::get().current(), + manager::Device::get().getPlatform(), + MemSpace(rangeMapper.size()).toAlpakaMemVec()); + uint32_t* histData = alpaka::getPtrNative(superCellHistogram); + + using UsedPositionFilters = mp_list::type>; + using MyParticleFilter = typename FilterFactory::FilterType; + MyParticleFilter posFilter; + posFilter.setWindowPosition(params->localWindowToDomainOffset, params->window.localDimensions.size); + + PMACC_LOCKSTEP_KERNEL(KernelSuperCellHistogram{}) + .config(rangeMapper.getGridDim(), *speciesPtr)( + speciesPtr->getDeviceParticlesBox(), + histData, + particleFilter, + posFilter, + rangeMapper); + eventSystem::getTransactionEvent().waitForFinished(); + + ParticleIoChunkInfo particleIoChunkInfo; + + for(size_t supercellIdx = 0u; supercellIdx < rangeMapper.size(); ++supercellIdx) + { + particleIoChunkInfo.emplace(supercellIdx, supercellIdx + 1, histData[supercellIdx]); + particleIoChunkInfo.totalNumParticles += histData[supercellIdx]; + } + + return particleIoChunkInfo; + } + template inline ParticleIoChunkInfo createSupercellRangeChunks( T_IOParameters* params, diff --git a/include/picongpu/plugins/output/IIOBackend.hpp b/include/picongpu/plugins/output/IIOBackend.hpp index 6a07b673fe..3eb405ce0f 100644 --- a/include/picongpu/plugins/output/IIOBackend.hpp +++ b/include/picongpu/plugins/output/IIOBackend.hpp @@ -21,6 +21,7 @@ #include "picongpu/plugins/multi/IInstance.hpp" +#include #include #include @@ -38,7 +39,8 @@ namespace picongpu virtual void dumpCheckpoint( uint32_t currentStep, std::string const& checkpointDirectory, - std::string const& checkpointFilename) + std::string const& checkpointFilename, + std::optional*> synchronization) = 0; //! restart from a checkpoint @@ -46,7 +48,8 @@ namespace picongpu uint32_t restartStep, std::string const& restartDirectory, std::string const& restartFilename, - uint32_t restartChunkSize) + uint32_t restartChunkSize, + std::optional*> synchronization) = 0; };
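// --- Hedged, self-contained toy, not part of this patch -----------------------------------------
// The only new obligation the extended IIOBackend interface above places on a backend is to accept
// an optional pointer to a shared atomic: std::nullopt means checkpoint and restart run serially as
// before, while an engaged value means a concurrently running reader/writer wants to negotiate
// hand-over points (see the notifySync()/catchUpSync() sketch further up). The class below only
// mirrors that calling convention; it does not implement the real interface.
#include <atomic>
#include <cstdint>
#include <iostream>
#include <optional>
#include <string>

struct ToyBackend
{
    void dumpCheckpoint(
        uint32_t currentStep,
        std::string const& checkpointDirectory,
        std::string const& checkpointFilename,
        std::optional<std::atomic<int>*> synchronization)
    {
        std::cout << (synchronization.has_value() ? "streaming" : "plain") << " checkpoint "
                  << checkpointDirectory << '/' << checkpointFilename << " @ step " << currentStep << '\n';
    }
};

int main()
{
    ToyBackend backend;
    backend.dumpCheckpoint(42u, "checkpoints", "simData", std::nullopt); // classic file-based path
    std::atomic<int> sharedState{0};
    backend.dumpCheckpoint(42u, "checkpoints", "simData", {&sharedState}); // concurrent path
}
// ------------------------------------------------------------------------------------------------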