diff --git a/CMakeLists.txt b/CMakeLists.txt index bd888f84b5..97f1bdc543 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -385,6 +385,8 @@ else() endif() unset(openPMD_REQUIRED_ADIOS2_COMPONENTS) +find_package(AWSSDK REQUIRED COMPONENTS s3) + # external library: pybind11 (optional) include(${openPMD_SOURCE_DIR}/cmake/dependencies/pybind11.cmake) @@ -434,7 +436,12 @@ set(CORE_SOURCE src/snapshots/IteratorTraits.cpp src/snapshots/RandomAccessIterator.cpp src/snapshots/Snapshots.cpp - src/snapshots/StatefulIterator.cpp) + src/snapshots/StatefulIterator.cpp + src/toolkit/ExternalBlockStorage.cpp + src/toolkit/AwsBuilder.cpp + src/toolkit/Aws.cpp + src/toolkit/StdioBuilder.cpp + src/toolkit/Stdio.cpp) set(IO_SOURCE src/IO/AbstractIOHandler.cpp src/IO/AbstractIOHandlerImpl.cpp @@ -562,6 +569,8 @@ if(openPMD_HAVE_ADIOS2) endif() endif() +target_link_libraries(openPMD PUBLIC ${AWSSDK_LIBRARIES}) + # Runtime parameter and API status checks ("asserts") if(openPMD_USE_VERIFY) target_compile_definitions(openPMD PRIVATE openPMD_USE_VERIFY=1) @@ -704,6 +713,7 @@ set(openPMD_TEST_NAMES set(openPMD_CLI_TOOL_NAMES ls convert-toml-json + merge-json ) set(openPMD_PYTHON_CLI_TOOL_NAMES pipe diff --git a/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp b/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp index 7c02f38ddd..b3cefec55a 100644 --- a/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp +++ b/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp @@ -29,9 +29,11 @@ #include "openPMD/auxiliary/JSON_internal.hpp" #include "openPMD/backend/Variant_internal.hpp" #include "openPMD/config.hpp" +#include "openPMD/toolkit/ExternalBlockStorage.hpp" #include #include +#include #if openPMD_HAVE_MPI #include #endif @@ -153,8 +155,72 @@ void from_json(const nlohmann::json &j, std::complex &p) } } // namespace std +namespace openPMD::internal +{ +auto jsonDatatypeToString(Datatype dt) -> std::string; + +struct JsonDatatypeHandling +{ + template + static auto encodeDatatype(nlohmann::json &j) -> bool + { + auto const &needed_datatype = + jsonDatatypeToString(determineDatatype()); + if (auto it = j.find("datatype"); it != j.end()) + { + return it.value().get() == needed_datatype; + } + else + { + j["datatype"] = needed_datatype; + return true; + } + } + + template + static auto checkDatatype(nlohmann::json const &j) -> bool + { + auto const &needed_datatype = + jsonDatatypeToString(determineDatatype()); + if (auto it = j.find("datatype"); it != j.end()) + { + return it.value().get() == needed_datatype; + } + else + { + return false; + } + } + + template + static auto decodeDatatype(nlohmann::json const &j, Args &&...args) -> bool + { + if (auto it = j.find("datatype"); it != j.end()) + { + switchDatasetType( + stringToDatatype(it.value().get()), + std::forward(args)...); + return true; + } + else + { + return false; + } + } +}; +} // namespace openPMD::internal + namespace openPMD { +namespace dataset_mode_types +{ + struct Dataset_t + {}; + struct Template_t + {}; + using External_t = std::shared_ptr; +} // namespace dataset_mode_types + class JSONIOHandlerImpl : public AbstractIOHandlerImpl { using json = nlohmann::json; @@ -243,42 +309,6 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl std::future flush(); -private: -#if openPMD_HAVE_MPI - std::optional m_communicator; -#endif - - using FILEHANDLE = std::fstream; - - // map each Writable to its associated file - // contains only the filename, without the OS path - std::unordered_map m_files; - - std::unordered_map> m_jsonVals; - - // files that have logically, but not 
physically been written to - std::unordered_set m_dirty; - - /* - * Is set by constructor. - */ - FileFormat m_fileFormat{}; - - /* - * Under which key do we find the backend configuration? - * -> "json" for the JSON backend, "toml" for the TOML backend. - */ - std::string backendConfigKey() const; - - /* - * First return value: The location of the JSON value (either "json" or - * "toml") Second return value: The value that was maybe found at this place - */ - std::pair> - getBackendConfig(openPMD::json::TracingJSON &) const; - - std::string m_originalExtension; - /* * Was the config value explicitly user-chosen, or are we still working with * defaults? @@ -293,17 +323,36 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl // Dataset IO mode // ///////////////////// - enum class DatasetMode + struct DatasetMode + : std::variant< + dataset_mode_types::Dataset_t, + dataset_mode_types::Template_t, + dataset_mode_types::External_t> { - Dataset, - Template + using Dataset_t = dataset_mode_types::Dataset_t; + using Template_t = dataset_mode_types::Template_t; + using External_t = dataset_mode_types::External_t; + constexpr static Dataset_t Dataset{}; + constexpr static Template_t Template{}; + + using variant_t = std::variant< + dataset_mode_types::Dataset_t, + dataset_mode_types::Template_t, + External_t>; + using variant_t ::operator=; + + // casts needed because of + // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=90943 + inline auto as_base() const -> variant_t const & + { + return *this; + } + inline auto as_base() -> variant_t & + { + return *this; + } }; - // IOMode m_mode{}; - // SpecificationVia m_IOModeSpecificationVia = - // SpecificationVia::DefaultValue; bool m_printedSkippedWriteWarningAlready - // = false; - struct DatasetMode_s { // Initialized in init() @@ -318,8 +367,6 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl m_mode, m_specificationVia, m_skipWarnings}; } }; - DatasetMode_s m_datasetMode; - DatasetMode_s retrieveDatasetMode(openPMD::json::TracingJSON &config) const; /////////////////////// // Attribute IO mode // @@ -338,8 +385,57 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl AttributeMode m_mode{}; SpecificationVia m_specificationVia = SpecificationVia::DefaultValue; }; - AttributeMode_s m_attributeMode; +private: +#if openPMD_HAVE_MPI + std::optional m_communicator; +#endif + + using FILEHANDLE = std::fstream; + + // map each Writable to its associated file + // contains only the filename, without the OS path + std::unordered_map m_files; + + std::unordered_map> m_jsonVals; + + // files that have logically, but not physically been written to + std::unordered_set m_dirty; + + /* + * Is set by constructor. + */ + FileFormat m_fileFormat{}; + + /* + * Under which key do we find the backend configuration? + * -> "json" for the JSON backend, "toml" for the TOML backend. + */ + std::string backendConfigKey() const; + + /* + * First return value: The location of the JSON value (either "json" or + * "toml") Second return value: The value that was maybe found at this place + */ + std::pair> + getBackendConfig(openPMD::json::TracingJSON &) const; + static std::pair> + getBackendConfig( + openPMD::json::TracingJSON &, std::string const &configLocation); + + std::string m_originalExtension; + + /* + * In read mode, we can only open the external block storage backend upon + * opening the JSON file, because it contains meta information relevant + * for configuring the backend. 
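+     *
+     * Sketch of the shape such a deferred configuration can take, as parsed
+     * by parse_external_mode() in the implementation file (all values below
+     * are purely illustrative):
+     *   {"provider": "stdio", "directory": "./external_blocks"}
+     * or
+     *   {"provider": "aws", "bucket": "...", "access_key_id": "...",
+     *    "secret_access_key": "...", "endpoint": "...", "region": "..."}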
+ */ + std::optional + m_deferredExternalBlockstorageConfig; + DatasetMode_s m_datasetMode; + DatasetMode_s retrieveDatasetMode(openPMD::json::TracingJSON &config); + + AttributeMode_s m_attributeMode; AttributeMode_s retrieveAttributeMode(openPMD::json::TracingJSON &config) const; @@ -389,7 +485,8 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl // essentially: m_i = \prod_{j=0}^{i-1} extent_j static Extent getMultiplicators(Extent const &extent); - static std::pair getExtent(nlohmann::json &j); + static std::pair + getExtent(nlohmann::json &j, DatasetMode const &baseMode); // remove single '/' in the beginning and end of a string static std::string removeSlashes(std::string); diff --git a/include/openPMD/Series.hpp b/include/openPMD/Series.hpp index 75c91675d0..91035fa3e8 100644 --- a/include/openPMD/Series.hpp +++ b/include/openPMD/Series.hpp @@ -35,6 +35,7 @@ #include "openPMD/config.hpp" #include "openPMD/snapshots/Snapshots.hpp" #include "openPMD/version.hpp" +#include #if openPMD_HAVE_MPI #include @@ -239,6 +240,8 @@ namespace internal std::optional> m_deferred_initialization = std::nullopt; + std::optional m_manageAwsAPI = std::nullopt; + void close(); #if openPMD_HAVE_MPI diff --git a/include/openPMD/cli/convert-toml-json.hpp b/include/openPMD/cli/convert-toml-json.hpp new file mode 100644 index 0000000000..051fa01bfb --- /dev/null +++ b/include/openPMD/cli/convert-toml-json.hpp @@ -0,0 +1,182 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace from_format_to_format +{ +namespace json = openPMD::json; +struct ID +{ + template + static auto call(nlohmann::json const &&val) + // template <> + // auto call(nlohmann::json const &val) -> + // nlohmann::json const& + { + if constexpr (originallySpecifiedAs == json::SupportedLanguages::JSON) + { + return val; + } + else + { + return json::jsonToToml(val); + } + } +}; + +struct switch_ +{ + template + struct other_type; + template + static auto call(nlohmann::json const &&val) + { + return ID::call::value>( + std::move(val)); + } +}; +template <> +struct switch_::other_type +{ + static constexpr json::SupportedLanguages value = + json::SupportedLanguages::TOML; +}; +template <> +struct switch_::other_type +{ + static constexpr json::SupportedLanguages value = + json::SupportedLanguages::JSON; +}; +} // namespace from_format_to_format + +template +class convert_json_toml +{ + static void print(toml::value &val) + { + namespace json = openPMD::json; + std::cout << json::format_toml(val); + } + static void print(nlohmann::json const &val) + { + std::cout << val << '\n'; + } + static void + with_parsed_cmdline_args(openPMD::json::ParsedConfig parsed_config) + { + namespace json = openPMD::json; + auto [config, originallySpecifiedAs] = std::move(parsed_config); + switch (originallySpecifiedAs) + { + using SL = json::SupportedLanguages; + case SL::JSON: { + auto for_print = + FromFormatToFormat::template call(std::move(config)); + print(for_print); + } + break; + case SL::TOML: { + auto for_print = + FromFormatToFormat::template call(std::move(config)); + print(for_print); + } + break; + } + } + + struct ByLine : std::string + { + friend auto operator>>(std::istream &i, ByLine &l) -> std::istream & + { + decltype(auto) res = std::getline(i, l); + if (res) + { + l.insert(0, 1, '@'); + } + return res; + } + }; + using ByLineIterator = std::istream_iterator; + + template + static auto merge(It begin, It end) -> openPMD::json::ParsedConfig + { + namespace json = 
openPMD::json; + if (begin == end) + { + throw std::runtime_error( + "merge: need at least one JSON/TOML file."); + } + auto config = json::parseOptions( + *begin, + /* considerFiles = */ true, + /* convertLowercase = */ false); + for (++begin; begin != end; ++begin) + { + auto [next, _] = json::parseOptions( + *begin, + /* considerFiles = */ true, + /* convertLowercase = */ false); + json::merge_internal(config.config, next, /* do_prune = */ false); + } + return config; + } + +public: + enum class UseStdinAs : std::uint8_t + { + InlineJson, + ListOfJson + }; + + static void run_application( + int argc, + char const **argv, + UseStdinAs stdinconfig, + void (*print_help_message)(char const *)) + { + std::string jsonOrToml; + switch (argc) + { + case 0: + case 1: + switch (stdinconfig) + { + case UseStdinAs::InlineJson: { + // Just read the whole stream into memory + // Not very elegant, but we'll hold the entire JSON/TOML dataset + // in memory at some point anyway, so it doesn't really matter + std::stringbuf readEverything; + std::cin >> &readEverything; + jsonOrToml = readEverything.str(); + break; + } + case UseStdinAs::ListOfJson: { + auto parsed_config = + merge(ByLineIterator(std::cin), ByLineIterator{}); + with_parsed_cmdline_args(std::move(parsed_config)); + break; + } + } + break; + default: + if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-h") == 0) + { + print_help_message(argv[1]); + exit(0); + } + auto parsed_config = merge(argv + 1, argv + argc); + with_parsed_cmdline_args(std::move(parsed_config)); + break; + } + } +}; diff --git a/include/openPMD/toolkit/Aws.hpp b/include/openPMD/toolkit/Aws.hpp new file mode 100644 index 0000000000..5c66eee9c0 --- /dev/null +++ b/include/openPMD/toolkit/Aws.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include "openPMD/toolkit/ExternalBlockStorage.hpp" + +#include + +namespace openPMD::internal +{ +struct ExternalBlockStorageAws : ExternalBlockStorageBackend +{ +private: + Aws::S3::S3Client m_client; + std::string m_bucketName; + std::optional m_endpoint; + +public: + ExternalBlockStorageAws( + Aws::S3::S3Client, + std::string bucketName, + std::optional endpoint); + auto put(std::string const &identifier, void const *data, size_t len) + -> std::string override; + void get(std::string const &external_ref, void *data, size_t len) override; + [[nodiscard]] auto externalStorageLocation() const + -> nlohmann::json override; + ~ExternalBlockStorageAws() override; +}; +} // namespace openPMD::internal diff --git a/include/openPMD/toolkit/AwsBuilder.hpp b/include/openPMD/toolkit/AwsBuilder.hpp new file mode 100644 index 0000000000..4a8ad691b9 --- /dev/null +++ b/include/openPMD/toolkit/AwsBuilder.hpp @@ -0,0 +1,47 @@ +#pragma once + +#include +#include +#include +#include + +namespace openPMD +{ +class ExternalBlockStorage; +} + +namespace openPMD::internal +{ +struct AwsBuilder +{ + AwsBuilder( + std::string bucketName, std::string accessKeyId, std::string secretKey); + + enum class Scheme : uint8_t + { + HTTP, + HTTPS + }; + std::string m_bucketName; + std::string m_accessKeyId; + std::string m_secretKey; + std::optional m_sessionToken; + std::initializer_list m_credentials; + std::optional m_endpointOverride; + std::optional m_region; + std::optional m_scheme; + std::optional m_verifySSL; + + auto setBucketName(std::string bucketName) -> AwsBuilder &; + auto setCredentials(std::string accessKeyId, std::string secretKey) + -> AwsBuilder &; + auto setSessionToken(std::string sessionToken) -> AwsBuilder &; + auto setEndpointOverride(std::string 
endpoint) -> AwsBuilder &; + auto setRegion(std::string regionName) -> AwsBuilder &; + auto setScheme(Scheme s) -> AwsBuilder &; + auto setVerifySSL(bool verify) -> AwsBuilder &; + + operator ::openPMD::ExternalBlockStorage(); + auto build() -> ::openPMD::ExternalBlockStorage; +}; +} // namespace openPMD::internal diff --git a/include/openPMD/toolkit/ExternalBlockStorage.hpp b/include/openPMD/toolkit/ExternalBlockStorage.hpp new file mode 100644 index 0000000000..25b776e620 --- /dev/null +++ b/include/openPMD/toolkit/ExternalBlockStorage.hpp @@ -0,0 +1,115 @@ +#pragma once + +#include "openPMD/Dataset.hpp" +#include "openPMD/toolkit/AwsBuilder.hpp" +#include "openPMD/toolkit/StdioBuilder.hpp" + +#include + +#include +#include +#include +#include +#include + +namespace openPMD +{ +class ExternalBlockStorage; +} + +namespace openPMD::internal +{ +struct ExternalBlockStorageBackend +{ + virtual auto + put(std::string const &identifier, void const *data, size_t len) + -> std::string = 0; + virtual void + get(std::string const &external_ref, void *data, size_t len) = 0; + [[nodiscard]] virtual auto externalStorageLocation() const + -> nlohmann::json = 0; + + virtual ~ExternalBlockStorageBackend(); +}; +} // namespace openPMD::internal + +namespace openPMD +{ +// used nowhere, just shows the signatures +// TODO: replace this with a concept upon switching to C++20 +struct DatatypeHandling_Interface +{ + /* + * Returns false if the same JSON location was previously encoded as + * another datatype. + */ + template + static auto encodeDatatype(nlohmann::json &) -> bool; + + /* + * Returns false if the encoded datatype does not match T_required + * or if no datatype has been encoded. + */ + template + static auto checkDatatype(nlohmann::json const &j) -> bool; + + /* + * Returns false if no encoded datatype could be found + */ + template + static auto decodeDatatype(nlohmann::json const &j, Args &&...args) -> bool; +}; + +class ExternalBlockStorage +{ +private: + std::unique_ptr m_worker; + ExternalBlockStorage( + std::unique_ptr); + + friend struct internal::StdioBuilder; + friend struct internal::AwsBuilder; + +public: + explicit ExternalBlockStorage(); + + static auto makeStdioSession(std::string directory) + -> internal::StdioBuilder; + static auto makeAwsSession( + std::string bucketName, std::string accessKeyId, std::string secretKey) + -> internal::AwsBuilder; + + // returns created JSON key + template + auto store( + Extent const &globalExtent, + Offset const &blockOffset, + Extent const &blockExtent, + nlohmann::json &fullJsonDataset, + nlohmann::json::json_pointer const &path, + std::optional infix, // e.g. 
for distinguishing MPI ranks + T const *data) -> std::string; + + template + void read( + std::string const &identifier, + nlohmann::json const &fullJsonDataset, + nlohmann::json::json_pointer const &path, + T *data); + + template + void read( + Offset const &blockOffset, + Extent const &blockExtent, + nlohmann::json const &fullJsonDataset, + nlohmann::json::json_pointer const &path, + T *data); + + [[nodiscard]] auto externalStorageLocation() const -> nlohmann::json; + + static void sanitizeString(std::string &s); +}; + +// Implementations + +} // namespace openPMD diff --git a/include/openPMD/toolkit/Stdio.hpp b/include/openPMD/toolkit/Stdio.hpp new file mode 100644 index 0000000000..9428151d2e --- /dev/null +++ b/include/openPMD/toolkit/Stdio.hpp @@ -0,0 +1,22 @@ +#pragma once + +#include "openPMD/toolkit/ExternalBlockStorage.hpp" + +namespace openPMD::internal +{ +struct ExternalBlockStorageStdio : ExternalBlockStorageBackend +{ +private: + std::string m_directory; + std::string m_openMode; + +public: + ExternalBlockStorageStdio(std::string directory, std::string openMode); + auto put(std::string const &identifier, void const *data, size_t len) + -> std::string override; + void get(std::string const &external_ref, void *data, size_t len) override; + [[nodiscard]] auto externalStorageLocation() const + -> nlohmann::json override; + ~ExternalBlockStorageStdio() override; +}; +} // namespace openPMD::internal diff --git a/include/openPMD/toolkit/StdioBuilder.hpp b/include/openPMD/toolkit/StdioBuilder.hpp new file mode 100644 index 0000000000..7d93048167 --- /dev/null +++ b/include/openPMD/toolkit/StdioBuilder.hpp @@ -0,0 +1,24 @@ +#pragma once + +#include +#include + +namespace openPMD +{ +class ExternalBlockStorage; +} + +namespace openPMD::internal +{ +struct StdioBuilder +{ + std::string m_directory; + std::optional m_openMode = std::nullopt; + + auto setDirectory(std::string directory) -> StdioBuilder &; + auto setOpenMode(std::string openMode) -> StdioBuilder &; + + operator ::openPMD::ExternalBlockStorage(); + auto build() -> ::openPMD::ExternalBlockStorage; +}; +} // namespace openPMD::internal diff --git a/src/IO/JSON/JSONIOHandlerImpl.cpp b/src/IO/JSON/JSONIOHandlerImpl.cpp index 88e221bea5..8b6a0184a9 100644 --- a/src/IO/JSON/JSONIOHandlerImpl.cpp +++ b/src/IO/JSON/JSONIOHandlerImpl.cpp @@ -24,6 +24,7 @@ #include "openPMD/Error.hpp" #include "openPMD/IO/AbstractIOHandler.hpp" #include "openPMD/IO/AbstractIOHandlerImpl.hpp" +#include "openPMD/IO/Access.hpp" #include "openPMD/ThrowError.hpp" #include "openPMD/auxiliary/Filesystem.hpp" #include "openPMD/auxiliary/JSONMatcher.hpp" @@ -31,10 +32,16 @@ #include "openPMD/auxiliary/Memory.hpp" #include "openPMD/auxiliary/StringManip.hpp" #include "openPMD/auxiliary/TypeTraits.hpp" +#include "openPMD/auxiliary/Variant.hpp" #include "openPMD/backend/Attribute.hpp" #include "openPMD/backend/Writable.hpp" +#include "openPMD/toolkit/ExternalBlockStorage.hpp" +#if openPMD_USE_FILESYSTEM_HEADER +#include +#endif #include +#include #include #include @@ -42,6 +49,7 @@ #include #include #include +#include namespace openPMD { @@ -140,11 +148,30 @@ namespace return *accum_ptr; } - void warnUnusedJson(openPMD::json::TracingJSON const &jsonConfig) + auto prepend_to_json(nlohmann::json j) -> nlohmann::json + { + return j; + } + + template + auto prepend_to_json(nlohmann::json j, Arg &&arg, Args &&...args) + -> nlohmann::json + { + return nlohmann::json{ + {std::forward(arg), + prepend_to_json(std::move(j), std::forward(args)...)}}; + } + + template + 
void warnUnusedJson( + openPMD::json::TracingJSON const &jsonConfig, + Args &&...extra_json_hierarchy) { auto shadow = jsonConfig.invertShadow(); if (shadow.size() > 0) { + shadow = prepend_to_json( + std::move(shadow), std::forward(extra_json_hierarchy)...); switch (jsonConfig.originallySpecifiedAs) { case openPMD::json::SupportedLanguages::JSON: @@ -162,7 +189,10 @@ namespace } } } +} // namespace +namespace internal +{ // Does the same as datatypeToString(), but this makes sure that we don't // accidentally change the JSON schema by modifying datatypeToString() std::string jsonDatatypeToString(Datatype dt) @@ -251,17 +281,224 @@ namespace } return "Unreachable!"; } +} // namespace internal + +namespace +{ + void parse_internal_mode( + nlohmann::json const &mode_j, + std::string const &configLocation, + JSONIOHandlerImpl::DatasetMode_s &res) + { + using DatasetMode = JSONIOHandlerImpl::DatasetMode; + using SpecificationVia = JSONIOHandlerImpl::SpecificationVia; + + DatasetMode &ioMode = res.m_mode; + SpecificationVia &specificationVia = res.m_specificationVia; + bool &skipWarnings = res.m_skipWarnings; + + auto modeOption = openPMD::json::asLowerCaseStringDynamic(mode_j); + if (!modeOption.has_value()) + { + throw error::BackendConfigSchema( + {configLocation, "mode"}, + "Invalid value of non-string type (accepted values are " + "'dataset' and 'template'."); + } + auto mode = modeOption.value(); + if (mode == "dataset") + { + ioMode = DatasetMode::Dataset; + specificationVia = SpecificationVia::Manually; + } + else if (mode == "template") + { + ioMode = DatasetMode::Template; + specificationVia = SpecificationVia::Manually; + } + else if (mode == "template_no_warn") + { + ioMode = DatasetMode::Template; + specificationVia = SpecificationVia::Manually; + skipWarnings = true; + } + else + { + throw error::BackendConfigSchema( + {configLocation, "dataset", "mode"}, + "Invalid value: '" + mode + + "' (accepted values are 'dataset' and 'template'."); + } + } + + template + auto optionalOrElse(std::optional o, OrElse &&orElse) -> T + { + if (o.has_value()) + { + return *std::move(o); + } + else + { + return std::forward(orElse)(); + } + } + + void parse_external_mode( + json::TracingJSON mode, + // In read mode, the metadata section stored under 'external_storage' + // These are default values, overridable with the first argument + std::optional previousCfg, + std::string const &configLocation, + JSONIOHandlerImpl::DatasetMode_s &res) + { + using SpecificationVia = JSONIOHandlerImpl::SpecificationVia; + using ExternalBlockStorage = openPMD::ExternalBlockStorage; + + auto get_key = + [&](char const *key) -> std::optional { + if (mode.json().contains(key)) + { + return {&mode.json({key})}; + } + else if (previousCfg.has_value() && (*previousCfg)->contains(key)) + { + return {&(**previousCfg).at(key)}; + } + else + { + return std::nullopt; + } + }; + + auto get_mandatory = [&](char const *key, + bool lowercase) -> std::string { + auto const &val = + *optionalOrElse(get_key(key), [&]() -> nlohmann::json const * { + throw error::BackendConfigSchema( + {configLocation, "dataset", "mode", key}, + "Mandatory key."); + }); + return optionalOrElse( + lowercase ? 
openPMD::json::asLowerCaseStringDynamic(val) + : openPMD::json::asStringDynamic(val), + [&]() -> std::string { + throw error::BackendConfigSchema( + {configLocation, "dataset", "mode", key}, + "Must be of string type."); + }); + }; + auto if_contains_optional = + [&](char const *key, bool lowercase, auto &&then) { + auto const maybeVal = get_key(key); + if (!maybeVal.has_value()) + { + return; + } + auto const &val = **maybeVal; + static_cast(then)(optionalOrElse( + lowercase ? openPMD::json::asLowerCaseStringDynamic(val) + : openPMD::json::asStringDynamic(val), + [&]() -> std::string { + throw error::BackendConfigSchema( + {configLocation, "dataset", "mode", key}, + "Must be of string type."); + })); + }; + auto if_contains_optional_bool = [&](char const *key, auto &&then) { + auto const maybeVal = get_key(key); + if (!maybeVal.has_value()) + { + return; + } + auto const &val = **maybeVal; + if (!val.is_boolean()) + { + throw error::BackendConfigSchema( + {configLocation, "dataset", "mode", key}, + "Must be of boolean type."); + } + static_cast(then)(val.get()); + }; + auto modeString = get_mandatory("provider", true); + + if (modeString == "stdio") + { + auto builder = ExternalBlockStorage::makeStdioSession( + get_mandatory("directory", false)); + + if_contains_optional("open_mode", false, [&](std::string openMode) { + builder.setOpenMode(std::move(openMode)); + }); + + res.m_mode = + std::make_shared(builder.build()); + } + else if (modeString == "aws") + { + openPMD::internal::AwsBuilder builder( + // TODO: bucket_name: introduce expansion pattern for openPMD + // file name + get_mandatory("bucket", false), + get_mandatory("access_key_id", false), + get_mandatory("secret_access_key", false)); + + if_contains_optional( + "session_token", false, [&](std::string sessionToken) { + builder.setSessionToken(std::move(sessionToken)); + }); + if_contains_optional( + "endpoint", false, [&](std::string endpointOverride) { + builder.setEndpointOverride(std::move(endpointOverride)); + }); + if_contains_optional("region", false, [&](std::string region) { + builder.setRegion(std::move(region)); + }); + if_contains_optional_bool("verify_ssl", [&](bool verifySSL) { + builder.setVerifySSL(verifySSL); + }); + if_contains_optional( + "scheme", true, [&](std::string const &scheme) { + if (scheme == "http") + { + builder.setScheme( + openPMD::internal::AwsBuilder::Scheme::HTTP); + } + else if (scheme == "https") + { + builder.setScheme( + openPMD::internal::AwsBuilder::Scheme::HTTPS); + } + else + { + throw error::BackendConfigSchema( + {configLocation, "dataset", "mode", "scheme"}, + "Must be either 'http' or 'https'."); + } + }); + + res.m_mode = + std::make_shared(builder.build()); + } + else + { + throw error::BackendConfigSchema( + {configLocation, "dataset", "mode", "provider"}, + "Must be either 'stdio' or 'aws'."); + } + + res.m_specificationVia = SpecificationVia::Manually; + } } // namespace -auto JSONIOHandlerImpl::retrieveDatasetMode( - openPMD::json::TracingJSON &config) const -> DatasetMode_s +auto JSONIOHandlerImpl::retrieveDatasetMode(openPMD::json::TracingJSON &config) + -> DatasetMode_s { // start with / copy from current config auto res = m_datasetMode; - DatasetMode &ioMode = res.m_mode; - SpecificationVia &specificationVia = res.m_specificationVia; - bool &skipWarnings = res.m_skipWarnings; - if (auto [configLocation, maybeConfig] = getBackendConfig(config); + + if (auto [configLocation, maybeConfig] = + getBackendConfig(config, backendConfigKey()); maybeConfig.has_value()) { auto 
jsonConfig = maybeConfig.value(); @@ -270,38 +507,27 @@ auto JSONIOHandlerImpl::retrieveDatasetMode( auto datasetConfig = jsonConfig["dataset"]; if (datasetConfig.json().contains("mode")) { - auto modeOption = openPMD::json::asLowerCaseStringDynamic( - datasetConfig["mode"].json()); - if (!modeOption.has_value()) - { - throw error::BackendConfigSchema( - {configLocation, "mode"}, - "Invalid value of non-string type (accepted values are " - "'dataset' and 'template'."); - } - auto mode = modeOption.value(); - if (mode == "dataset") - { - ioMode = DatasetMode::Dataset; - specificationVia = SpecificationVia::Manually; - } - else if (mode == "template") + auto mode = datasetConfig["mode"]; + if (mode.json().is_object()) { - ioMode = DatasetMode::Template; - specificationVia = SpecificationVia::Manually; - } - else if (mode == "template_no_warn") - { - ioMode = DatasetMode::Template; - specificationVia = SpecificationVia::Manually; - skipWarnings = true; + if (access::writeOnly(m_handler->m_backendAccess)) + { + parse_external_mode( + std::move(mode), std::nullopt, configLocation, res); + } + else + { + // sic! initialize the deferred json config as a new + // tracing object + m_deferredExternalBlockstorageConfig = + std::make_optional( + mode.json(), mode.originallySpecifiedAs); + config.declareFullyRead(); + } } else { - throw error::BackendConfigSchema( - {configLocation, "dataset", "mode"}, - "Invalid value: '" + mode + - "' (accepted values are 'dataset' and 'template'."); + parse_internal_mode(mode.json(), configLocation, res); } } } @@ -373,7 +599,13 @@ std::string JSONIOHandlerImpl::backendConfigKey() const std::pair> JSONIOHandlerImpl::getBackendConfig(openPMD::json::TracingJSON &config) const { - std::string configLocation = backendConfigKey(); + return getBackendConfig(config, backendConfigKey()); +} + +std::pair> +JSONIOHandlerImpl::getBackendConfig( + openPMD::json::TracingJSON &config, std::string const &configLocation) +{ if (config.json().contains(configLocation)) { return std::make_pair( @@ -467,6 +699,14 @@ void JSONIOHandlerImpl::createFile( access::write(m_handler->m_backendAccess), "[JSON] Creating a file in read-only mode is not possible."); + if (m_deferredExternalBlockstorageConfig.has_value()) + { + throw error::Internal( + "Creation of external block storage backend was deferred until " + "opening the first file, but a file is created before any was " + "opened."); + } + /* * Need to resolve this later than init() since the openPMD version might be * specified after the creation of the IOHandler. @@ -633,49 +873,53 @@ void JSONIOHandlerImpl::createDataset( } setAndGetFilePosition(writable, name); auto &dset = jsonVal[name]; - dset["datatype"] = jsonDatatypeToString(parameter.dtype); + dset["datatype"] = internal::jsonDatatypeToString(parameter.dtype); + + std::visit( + auxiliary::overloaded{ + [&](DatasetMode::Dataset_t const &) { + auto extent = parameter.extent; + switch (parameter.dtype) + { + case Datatype::CFLOAT: + case Datatype::CDOUBLE: + case Datatype::CLONG_DOUBLE: { + extent.push_back(2); + break; + } + default: + break; + } + if (parameter.extent.size() != 1 || + parameter.extent[0] != Dataset::UNDEFINED_EXTENT) + { + // TOML does not support nulls, so initialize with zero + dset["data"] = initializeNDArray( + extent, + m_fileFormat == FileFormat::Json + ? 
std::optional{} + : parameter.dtype); + } + }, + [&](DatasetMode::Template_t const &) { + if (parameter.extent != Extent{0} && + parameter.extent[0] != Dataset::UNDEFINED_EXTENT) + { + dset["extent"] = parameter.extent; + } + else + { + // no-op + // If extent is empty or no datatype is defined, don't + // bother writing it. The datatype is written above + // anyway. + } + }, + [&](DatasetMode::External_t const &) { + dset["extent"] = parameter.extent; + }}, + localMode.as_base()); - switch (localMode) - { - case DatasetMode::Dataset: { - auto extent = parameter.extent; - switch (parameter.dtype) - { - case Datatype::CFLOAT: - case Datatype::CDOUBLE: - case Datatype::CLONG_DOUBLE: { - extent.push_back(2); - break; - } - default: - break; - } - if (parameter.extent.size() != 1 || - parameter.extent[0] != Dataset::UNDEFINED_EXTENT) - { - // TOML does not support nulls, so initialize with zero - dset["data"] = initializeNDArray( - extent, - m_fileFormat == FileFormat::Json ? std::optional{} - : parameter.dtype); - } - break; - } - case DatasetMode::Template: - if (parameter.extent != Extent{0} && - parameter.extent[0] != Dataset::UNDEFINED_EXTENT) - { - dset["extent"] = parameter.extent; - } - else - { - // no-op - // If extent is empty or no datatype is defined, don't bother - // writing it. - // The datatype is written above anyway. - } - break; - } writable->written = true; m_dirty.emplace(file); } @@ -725,7 +969,8 @@ void JSONIOHandlerImpl::extendDataset( try { Extent datasetExtent; - std::tie(datasetExtent, localIOMode) = getExtent(j); + std::tie(datasetExtent, localIOMode) = + getExtent(j, m_datasetMode.m_mode); VERIFY_ALWAYS( datasetExtent.size() == parameters.extent.size(), "[JSON] Cannot change dimensionality of a dataset") @@ -743,38 +988,40 @@ void JSONIOHandlerImpl::extendDataset( "[JSON] The specified location contains no valid dataset"); } - switch (localIOMode) - { - case DatasetMode::Dataset: { - auto extent = parameters.extent; - auto datatype = stringToDatatype(j["datatype"].get()); - switch (datatype) - { - case Datatype::CFLOAT: - case Datatype::CDOUBLE: - case Datatype::CLONG_DOUBLE: { - extent.push_back(2); - break; - } - default: - // nothing to do - break; - } - // TOML does not support nulls, so initialize with zero - nlohmann::json newData = initializeNDArray( - extent, - m_fileFormat == FileFormat::Json ? std::optional{} - : datatype); - nlohmann::json &oldData = j["data"]; - mergeInto(newData, oldData); - j["data"] = newData; - } - break; - case DatasetMode::Template: { - j["extent"] = parameters.extent; - } - break; - } + std::visit( + auxiliary::overloaded{ + [&](DatasetMode::Dataset_t const &) { + auto extent = parameters.extent; + auto datatype = + stringToDatatype(j["datatype"].get()); + switch (datatype) + { + case Datatype::CFLOAT: + case Datatype::CDOUBLE: + case Datatype::CLONG_DOUBLE: { + extent.push_back(2); + break; + } + default: + // nothing to do + break; + } + // TOML does not support nulls, so initialize with zero + nlohmann::json newData = initializeNDArray( + extent, + m_fileFormat == FileFormat::Json ? 
std::optional{} + : datatype); + nlohmann::json &oldData = j["data"]; + mergeInto(newData, oldData); + j["data"] = newData; + }, + [&](DatasetMode::Template_t const &) { + j["extent"] = parameters.extent; + }, + [&](DatasetMode::External_t const &) { + j["extent"] = parameters.extent; + }}, + localIOMode.as_base()); writable->written = true; } @@ -882,9 +1129,44 @@ void JSONIOHandlerImpl::availableChunks( { refreshFileFromParent(writable); auto filePosition = setAndGetFilePosition(writable); - auto &j = obtainJsonContents(writable)["data"]; - *parameters.chunks = chunksInJSON(j); - chunk_assignment::mergeChunks(*parameters.chunks); + auto &j = obtainJsonContents(writable); + + auto [extent, datasetmode] = getExtent(j, m_datasetMode.m_mode); + + std::visit( + auxiliary::overloaded{ + [&](DatasetMode::Dataset_t const &) { + *parameters.chunks = chunksInJSON(j.at("data")); + chunk_assignment::mergeChunks(*parameters.chunks); + }, + [&](DatasetMode::Template_t const &) { + /* no-op, no chunks to be loaded */ + }, + [&](DatasetMode::External_t &) { + auto external_blocks = j.at("external_blocks"); + auto &res = *parameters.chunks; + res.reserve(external_blocks.size()); + for (auto it = external_blocks.begin(); + it != external_blocks.end(); + ++it) + { + auto const &block = it.value(); + try + { + auto const &o = block.at("offset").get(); + auto const &e = block.at("extent").get(); + res.emplace_back(o, e); + } + catch (nlohmann::json::exception const &e) + { + std::cerr << "[JSONIOHandlerImpl::availableChunks] " + "Could not parse block '" + << it.key() << "'. Original error was:\n" + << e.what(); + } + } + }}, + datasetmode.as_base()); } void JSONIOHandlerImpl::openFile( @@ -903,6 +1185,16 @@ void JSONIOHandlerImpl::openFile( auto file = std::get<0>(getPossiblyExisting(name)); + // Need to access data in order to resolve external block storage + // configuration. EBS for read modes is configured at two places: + // + // 1. In the JSON config (stored at m_deferredExternalBlockstorageConfig) + // 2. In the previous JSON file that we are now opening + // + // Since the configuration may exclusively take place in either of the two + // options, files need to be opened now in any case. 
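+    //
+    // For option 2, obtainJsonContents() looks for the top-level
+    // "external_storage" key that putJsonContents() writes on the producing
+    // side, e.g. (values illustrative):
+    //   "external_storage": {"provider": "aws", "bucket": "...", "endpoint": "..."}
+    // and hands it to parse_external_mode() as a set of default values.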
+ obtainJsonContents(file); + associateWithFile(writable, file); writable->written = true; @@ -970,7 +1262,7 @@ void JSONIOHandlerImpl::openDataset( *parameters.dtype = Datatype(stringToDatatype(datasetJson["datatype"].get())); - *parameters.extent = getExtent(datasetJson).first; + *parameters.extent = getExtent(datasetJson, m_datasetMode.m_mode).first; writable->written = true; } @@ -1139,6 +1431,46 @@ void JSONIOHandlerImpl::deleteAttribute( j.erase(parameters.name); } +namespace +{ + template + auto + write_rank_to_stream_with_sufficient_padding(Stream &s, int rank, int size) + -> Stream & + { + auto num_digits = [](unsigned n) -> unsigned { + constexpr auto max = std::numeric_limits::max(); + unsigned base_10 = 1; + unsigned res = 1; + while (base_10 < max) + { + base_10 *= 10; + if (n / base_10 == 0) + { + return res; + } + ++res; + } + return res; + }; + s << std::setw(num_digits(size - 1)) << std::setfill('0') << rank; + return s; + } + + struct StoreExternally + { + template + static void call( + ExternalBlockStorage &blockStorage, void const *ptr, Args &&...args) + { + blockStorage.store( + std::forward(args)..., static_cast(ptr)); + } + + static constexpr char const *errorMsg = "StoreExternally"; + }; +} // namespace + void JSONIOHandlerImpl::writeDataset( Writable *writable, Parameter ¶meters) { @@ -1148,25 +1480,53 @@ void JSONIOHandlerImpl::writeDataset( auto pos = setAndGetFilePosition(writable); auto file = refreshFileFromParent(writable); - auto &j = obtainJsonContents(writable); - - switch (verifyDataset(parameters, j)) - { - case DatasetMode::Dataset: - break; - case DatasetMode::Template: - if (!m_datasetMode.m_skipWarnings) - { - std::cerr - << "[JSON/TOML backend: Warning] Trying to write data to a " - "template dataset. Will skip." - << '\n'; - m_datasetMode.m_skipWarnings = true; - } - return; - } - - switchType(parameters.dtype, j, parameters); + auto filePosition = setAndGetFilePosition(writable, false); + auto &jsonRoot = *obtainJsonContents(file); + auto &j = jsonRoot[filePosition->id]; + + std::visit( + auxiliary::overloaded{ + [&](DatasetMode::Dataset_t const &) { + switchType(parameters.dtype, j, parameters); + }, + [&](DatasetMode::Template_t const &) { + if (!m_datasetMode.m_skipWarnings) + { + std::cerr << "[JSON/TOML backend: Warning] Trying to write " + "data to a " + "template dataset. Will skip." 
+ << '\n'; + m_datasetMode.m_skipWarnings = true; + } + }, + [&](DatasetMode::External_t const &external) { + std::optional rankInfix; +#if openPMD_HAVE_MPI + if (m_communicator.has_value()) + { + auto &comm = *m_communicator; + // TODO maybe cache the result for this computation + int rank, size; + MPI_Comm_rank(comm, &rank); + MPI_Comm_size(comm, &size); + std::stringstream s; + s << "r"; + write_rank_to_stream_with_sufficient_padding(s, rank, size); + rankInfix = s.str(); + } +#endif + switchDatasetType( + parameters.dtype, + *external, + parameters.data.get(), + j.at("extent").get(), + parameters.offset, + parameters.extent, + jsonRoot, + filePosition->id, + std::move(rankInfix)); + }}, + verifyDataset(parameters, j).as_base()); writable->written = true; } @@ -1204,7 +1564,7 @@ void JSONIOHandlerImpl::writeAttribute( { case AttributeMode::Long: (*jsonVal)[filePosition->id]["attributes"][name] = { - {"datatype", jsonDatatypeToString(parameter.dtype)}, + {"datatype", internal::jsonDatatypeToString(parameter.dtype)}, {"value", value}}; break; case AttributeMode::Short: @@ -1235,40 +1595,66 @@ namespace static constexpr char const *errorMsg = "[JSON Backend] Fill with zeroes."; }; + + struct RetrieveExternally + { + template + static void + call(ExternalBlockStorage &blockStorage, void *ptr, Args &&...args) + { + blockStorage.read( + std::forward(args)..., static_cast(ptr)); + } + + static constexpr char const *errorMsg = "RetrieveExternally"; + }; } // namespace void JSONIOHandlerImpl::readDataset( Writable *writable, Parameter ¶meters) { - refreshFileFromParent(writable); - setAndGetFilePosition(writable); - auto &j = obtainJsonContents(writable); + auto file = refreshFileFromParent(writable); + auto filePosition = setAndGetFilePosition(writable); + auto &jsonRoot = *obtainJsonContents(file); + auto &j = jsonRoot[filePosition->id]; DatasetMode localMode = verifyDataset(parameters, j); - switch (localMode) - { - case DatasetMode::Template: - std::cerr << "[Warning] Cannot read chunks in Template mode of JSON " - "backend. Will fill with zeroes instead." - << '\n'; - switchNonVectorType( - parameters.dtype, parameters.data.get(), parameters.extent); - return; - case DatasetMode::Dataset: - try - { - switchType(parameters.dtype, j["data"], parameters); - } - catch (json::basic_json::type_error &) - { - throw error::ReadError( - error::AffectedObject::Dataset, - error::Reason::UnexpectedContent, - "JSON", - "The given path does not contain a valid dataset."); - } - break; - } + std::visit( + auxiliary::overloaded{ + [&](DatasetMode::Dataset_t const &) { + try + { + switchType( + parameters.dtype, j["data"], parameters); + } + catch (json::basic_json::type_error &) + { + throw error::ReadError( + error::AffectedObject::Dataset, + error::Reason::UnexpectedContent, + "JSON", + "The given path does not contain a valid dataset."); + } + }, + [&](DatasetMode::Template_t const &) { + std::cerr + << "[Warning] Cannot read chunks in Template mode of JSON " + "backend. Will fill with zeroes instead." 
+ << '\n'; + switchNonVectorType( + parameters.dtype, parameters.data.get(), parameters.extent); + }, + [&](DatasetMode::External_t &external) { + switchDatasetType( + parameters.dtype, + *external, + parameters.data.get(), + parameters.offset, + parameters.extent, + jsonRoot, + filePosition->id); + }}, + localMode.as_base()); } namespace @@ -1790,7 +2176,8 @@ Extent JSONIOHandlerImpl::getMultiplicators(Extent const &extent) return res; } -auto JSONIOHandlerImpl::getExtent(nlohmann::json &j) +auto JSONIOHandlerImpl::getExtent( + nlohmann::json &j, DatasetMode const &baseMode) -> std::pair { Extent res; @@ -1819,7 +2206,10 @@ auto JSONIOHandlerImpl::getExtent(nlohmann::json &j) } else if (j.contains("extent")) { - ioMode = DatasetMode::Template; + ioMode = + std::holds_alternative(baseMode.as_base()) + ? baseMode + : DatasetMode{DatasetMode::Template}; res = j["extent"].get(); } else @@ -1964,6 +2354,9 @@ JSONIOHandlerImpl::obtainJsonContents(File const &file) auto res = serialImplementation(); #endif + bool initialize_external_block_storage = + m_deferredExternalBlockstorageConfig.has_value(); + if (res->contains(JSONDefaults::openpmd_internal)) { auto const &openpmd_internal = res->at(JSONDefaults::openpmd_internal); @@ -1994,6 +2387,10 @@ JSONIOHandlerImpl::obtainJsonContents(File const &file) { m_datasetMode.m_mode = DatasetMode::Template; } + else if (modeOption.value() == "external") + { + initialize_external_block_storage = true; + } else { std::cerr << "[JSON/TOML backend] Warning: Invalid value '" @@ -2037,6 +2434,31 @@ JSONIOHandlerImpl::obtainJsonContents(File const &file) } } } + + if (initialize_external_block_storage) + { + auto previousConfig = [&]() -> std::optional { + if (res->contains("external_storage")) + { + return std::make_optional( + &res->at("external_storage")); + } + else + { + return std::nullopt; + } + }(); + auto manual_config = m_deferredExternalBlockstorageConfig.has_value() + ? 
std::move(*m_deferredExternalBlockstorageConfig) + : openPMD::json::TracingJSON(); + parse_external_mode( + manual_config, previousConfig, backendConfigKey(), m_datasetMode); + warnUnusedJson(manual_config, "dataset", "mode"); + m_attributeMode.m_specificationVia = SpecificationVia::Manually; + + m_deferredExternalBlockstorageConfig.reset(); + } + m_jsonVals.emplace(file, res); return res; } @@ -2062,18 +2484,25 @@ auto JSONIOHandlerImpl::putJsonContents( return it; } - switch (m_datasetMode.m_mode) - { - case DatasetMode::Dataset: - (*it->second)["platform_byte_widths"] = platformSpecifics(); - (*it->second)[JSONDefaults::openpmd_internal] - [JSONDefaults::DatasetMode] = "dataset"; - break; - case DatasetMode::Template: - (*it->second)[JSONDefaults::openpmd_internal] - [JSONDefaults::DatasetMode] = "template"; - break; - } + std::visit( + auxiliary::overloaded{ + [&](DatasetMode::Dataset_t const &) { + (*it->second)["platform_byte_widths"] = platformSpecifics(); + (*it->second)[JSONDefaults::openpmd_internal] + [JSONDefaults::DatasetMode] = "dataset"; + }, + [&](DatasetMode::Template_t const &) { + (*it->second)[JSONDefaults::openpmd_internal] + [JSONDefaults::DatasetMode] = "template"; + }, + [&](DatasetMode::External_t const &external) { + (*it->second)["platform_byte_widths"] = platformSpecifics(); + (*it->second)["external_storage"] = + external->externalStorageLocation(); + (*it->second)[JSONDefaults::openpmd_internal] + [JSONDefaults::DatasetMode] = "external"; + }}, + m_datasetMode.m_mode.as_base()); switch (m_attributeMode.m_mode) { @@ -2112,53 +2541,37 @@ auto JSONIOHandlerImpl::putJsonContents( }; #if openPMD_HAVE_MPI - auto num_digits = [](unsigned n) -> unsigned { - constexpr auto max = std::numeric_limits::max(); - unsigned base_10 = 1; - unsigned res = 1; - while (base_10 < max) - { - base_10 *= 10; - if (n / base_10 == 0) - { - return res; - } - ++res; + auto parallelImplementation = [this, &filename, &writeSingleFile]( + MPI_Comm comm) { + auto path = fullPath(*filename); + auto dirpath = path + ".parallel"; + if (!auxiliary::create_directories(dirpath)) + { + throw std::runtime_error( + "Failed creating directory '" + dirpath + + "' for parallel JSON output"); } - return res; - }; - - auto parallelImplementation = - [this, &filename, &writeSingleFile, &num_digits](MPI_Comm comm) { - auto path = fullPath(*filename); - auto dirpath = path + ".parallel"; - if (!auxiliary::create_directories(dirpath)) - { - throw std::runtime_error( - "Failed creating directory '" + dirpath + - "' for parallel JSON output"); - } - int rank = 0, size = 0; - MPI_Comm_rank(comm, &rank); - MPI_Comm_size(comm, &size); - std::stringstream subfilePath; - // writeSingleFile will prepend the base dir - subfilePath << *filename << ".parallel/mpi_rank_" - << std::setw(num_digits(size - 1)) << std::setfill('0') - << rank << [&]() { - switch (m_fileFormat) - { - case FileFormat::Json: - return ".json"; - case FileFormat::Toml: - return ".toml"; - } - throw std::runtime_error("Unreachable!"); - }(); - writeSingleFile(subfilePath.str()); - if (rank == 0) - { - constexpr char const *readme_msg = R"( + int rank = 0, size = 0; + MPI_Comm_rank(comm, &rank); + MPI_Comm_size(comm, &size); + std::stringstream subfilePath; + // writeSingleFile will prepend the base dir + subfilePath << *filename << ".parallel/mpi_rank_"; + write_rank_to_stream_with_sufficient_padding(subfilePath, rank, size) + << [&]() { + switch (m_fileFormat) + { + case FileFormat::Json: + return ".json"; + case FileFormat::Toml: + return 
".toml"; + } + throw std::runtime_error("Unreachable!"); + }(); + writeSingleFile(subfilePath.str()); + if (rank == 0) + { + constexpr char const *readme_msg = R"( This folder has been created by a parallel instance of the JSON backend in openPMD. There is one JSON file for each parallel writer MPI rank. The parallel JSON backend performs no metadata or data aggregation at all. @@ -2168,26 +2581,90 @@ There is no support in the openPMD-api for reading this folder as a single dataset. For reading purposes, either pick a single .json file and read that, or merge the .json files somehow (no tooling provided for this (yet)). )"; - std::fstream readme_file; - readme_file.open( - dirpath + "/README.txt", - std::ios_base::out | std::ios_base::trunc); - readme_file << readme_msg + 1; - readme_file.close(); - if (!readme_file.good() && - !filename.fileState->printedReadmeWarningAlready) - { - std::cerr - << "[Warning] Something went wrong in trying to create " - "README file at '" - << dirpath - << "/README.txt'. Will ignore and continue. The README " - "message would have been:\n----------\n" - << readme_msg + 1 << "----------" << std::endl; - filename.fileState->printedReadmeWarningAlready = true; - } + std::fstream readme_file; + readme_file.open( + dirpath + "/README.txt", + std::ios_base::out | std::ios_base::trunc); + readme_file << &readme_msg[1]; + readme_file.close(); + if (!readme_file.good() && + !filename.fileState->printedReadmeWarningAlready) + { + std::cerr + << "[Warning] Something went wrong in trying to create " + "README file at '" + << dirpath + << "/README.txt'. Will ignore and continue. The README " + "message would have been:\n----------\n" + << readme_msg + 1 << "----------" << std::endl; + filename.fileState->printedReadmeWarningAlready = true; } - }; + + constexpr char const *merge_script = R"END( +#!/usr/bin/env bash + +set -euo pipefail + +parallel_dir="$(dirname "$BASH_SOURCE")" +parallel_dir="$(cd "$parallel_dir" && pwd)" +serial_dir="${parallel_dir%.json.parallel}" +if [[ "$serial_dir" = "$parallel_dir" ]]; then + serial_dir="$parallel_dir/merged.json" +else + serial_dir="$serial_dir.json" +fi +echo "Will merge files to '$serial_dir'." >&2 +if [[ -e "$serial_dir" ]]; then + echo "Target file already exists, aborting." >&2 + exit 1 +fi +if ! which openpmd-merge-json >/dev/null 2>&1; then + echo "Did not find 'openpmd-merge-json' on PATH, aborting." >&2 + exit 1 +fi +for file in "$parallel_dir"/mpi_rank_*.json; do + echo "$file" +done | + openpmd-merge-json >"$serial_dir" +)END"; + std::string const merge_script_path = dirpath + "/merge.sh"; + std::fstream merge_file; + merge_file.open( + merge_script_path, std::ios_base::out | std::ios_base::trunc); + merge_file << &merge_script[1]; + merge_file.close(); + + if (!merge_file.good() && + !filename.fileState->printedReadmeWarningAlready) + { + std::cerr + << "[Warning] Something went wrong in trying to create " + "merge script at '" + << merge_script_path << "'. Will ignore and continue." + << std::endl; + filename.fileState->printedReadmeWarningAlready = true; + } + +#if openPMD_USE_FILESYSTEM_HEADER + try + { + std::filesystem::permissions( + merge_script_path, + std::filesystem::perms::owner_exec | + std::filesystem::perms::owner_exec | + std::filesystem::perms::owner_exec, + std::filesystem::perm_options::add); + } + catch (std::filesystem::filesystem_error const &e) + { + std::cerr << "Failed setting executable permissions on '" + << merge_script_path + << "', will ignore. 
Original error was:\n" + << e.what() << std::endl; + } +#endif + } + }; std::shared_ptr res; if (m_communicator.has_value()) @@ -2318,7 +2795,7 @@ auto JSONIOHandlerImpl::verifyDataset( try { Extent datasetExtent; - std::tie(datasetExtent, res) = getExtent(j); + std::tie(datasetExtent, res) = getExtent(j, m_datasetMode.m_mode); VERIFY_ALWAYS( datasetExtent.size() == parameters.extent.size(), "[JSON] Read/Write request does not fit the dataset's dimension"); @@ -2366,7 +2843,7 @@ nlohmann::json JSONIOHandlerImpl::platformSpecifics() Datatype::BOOL}; for (auto &datatype : datatypes) { - res[jsonDatatypeToString(datatype)] = toBytes(datatype); + res[internal::jsonDatatypeToString(datatype)] = toBytes(datatype); } return res; } diff --git a/src/Series.cpp b/src/Series.cpp index a1411a2dbb..a5b8a0672d 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -47,6 +47,8 @@ #include "openPMD/snapshots/StatefulIterator.hpp" #include "openPMD/version.hpp" +#include + #include #include #include @@ -1043,38 +1045,25 @@ void Series::init( } } -template -auto Series::initIOHandler( - std::string const &filepath, - std::string const &options, - Access at, - bool resolve_generic_extension, - MPI_Communicator &&...comm) - -> std::tuple, TracingJSON> +namespace { - auto &series = get(); - - json::TracingJSON optionsJson = json::parseOptions( - options, - std::forward(comm)..., - /* considerFiles = */ true); - auto input = parseInput(filepath); - if (resolve_generic_extension && input->format == Format::GENERIC && - !access::create(at)) + template + void do_resolve_generic_extension_read( + ParsedInput_t &input, std::string const &filepath, Access at) { auto isPartOfSeries = - input->iterationEncoding == IterationEncoding::fileBased + input.iterationEncoding == IterationEncoding::fileBased ? matcher( - input->filenamePrefix, - input->filenamePadding, - input->filenamePostfix, + input.filenamePrefix, + input.filenamePadding, + input.filenamePostfix, std::nullopt) - : matcher(input->name, -1, "", std::nullopt); + : matcher(input.name, -1, "", std::nullopt); std::optional extension; std::set additional_extensions; autoDetectPadding( isPartOfSeries, - input->path, + input.path, [&extension, &additional_extensions](std::string const &, Match const &match) { auto const &ext = match.extension.value(); @@ -1107,8 +1096,8 @@ auto Series::initIOHandler( std::nullopt, error.str()); } - input->filenameExtension = *extension; - input->format = determineFormat(*extension); + input.filenameExtension = *extension; + input.format = determineFormat(*extension); } else if (access::read(at)) { @@ -1120,30 +1109,68 @@ auto Series::initIOHandler( } } + template + void do_resolve_generic_extension_write(ParsedInput_t &input) + { + { + if (input.format == /* still */ Format::GENERIC) + { + throw error::WrongAPIUsage( + "Unable to automatically determine filename extension. 
" + "Please " + "specify in some way."); + } + else if (input.format == Format::ADIOS2_BP) + { + // Since ADIOS2 has multiple extensions depending on the engine, + // we need to pass this job on to the backend + input.filenameExtension = ".%E"; + } + else + { + input.filenameExtension = suffix(input.format); + } + } + } +} // namespace + +template +auto Series::initIOHandler( + std::string const &filepath, + std::string const &options, + Access at, + bool resolve_generic_extension, + MPI_Communicator &&...comm) + -> std::tuple, TracingJSON> +{ + auto &series = get(); + + json::TracingJSON optionsJson = json::parseOptions( + options, + std::forward(comm)..., + /* considerFiles = */ true); + auto input = parseInput(filepath); + + if (resolve_generic_extension && input->format == Format::GENERIC && + !access::create(at)) + { + do_resolve_generic_extension_read(*input, filepath, at); + } + // default options series.m_parseLazily = at == Access::READ_LINEAR; // now check for user-specified options parseJsonOptions(optionsJson, *input); + if (series.m_manageAwsAPI.has_value()) + { + Aws::InitAPI(*series.m_manageAwsAPI); + } + if (resolve_generic_extension && !input->filenameExtension.has_value()) { - if (input->format == /* still */ Format::GENERIC) - { - throw error::WrongAPIUsage( - "Unable to automatically determine filename extension. Please " - "specify in some way."); - } - else if (input->format == Format::ADIOS2_BP) - { - // Since ADIOS2 has multiple extensions depending on the engine, - // we need to pass this job on to the backend - input->filenameExtension = ".%E"; - } - else - { - input->filenameExtension = suffix(input->format); - } + do_resolve_generic_extension_write(*input); } return std::make_tuple(std::move(input), std::move(optionsJson)); } @@ -3146,6 +3173,14 @@ void Series::parseJsonOptions(TracingJSON &options, ParsedInput &input) { series.m_rankTable.m_rankTableSource = std::move(rankTableSource); } + { + bool doManageAwsAPI = false; + getJsonOption(options, "init_aws_api", doManageAwsAPI); + if (doManageAwsAPI) + { + series.m_manageAwsAPI = std::make_optional(); + } + } // backend key { std::map const backendDescriptors{ @@ -3232,6 +3267,10 @@ namespace internal // we must not throw in a destructor try { + if (m_manageAwsAPI.has_value()) + { + Aws::ShutdownAPI(*m_manageAwsAPI); + } close(); } catch (std::exception const &ex) diff --git a/src/cli/convert-toml-json.cpp b/src/cli/convert-toml-json.cpp index 60017fd8e4..4429e9c7bd 100644 --- a/src/cli/convert-toml-json.cpp +++ b/src/cli/convert-toml-json.cpp @@ -1,55 +1,8 @@ -#include -#include -#include +#include "openPMD/cli/convert-toml-json.hpp" -#include -#include -#include - -namespace json = openPMD::json; - -void parsed_main(std::string jsonOrToml) -{ - auto [config, originallySpecifiedAs] = json::parseOptions( - jsonOrToml, /* considerFiles = */ true, /* convertLowercase = */ false); - { - // NOLINTNEXTLINE(bugprone-unused-local-non-trivial-variable) - [[maybe_unused]] auto _ = std::move(jsonOrToml); - } - switch (originallySpecifiedAs) - { - using SL = json::SupportedLanguages; - case SL::JSON: { - auto asToml = json::jsonToToml(config); - std::cout << json::format_toml(asToml); - } - break; - case SL::TOML: - std::cout << config << '\n'; - break; - } -} - -int main(int argc, char const **argv) +void print_help_message(char const *program_name) { - std::string jsonOrToml; - switch (argc) - { - case 0: - case 1: - // Just read the whole stream into memory - // Not very elegant, but we'll hold the entire JSON/TOML 
dataset - // in memory at some point anyway, so it doesn't really matter - { - std::stringbuf readEverything; - std::cin >> &readEverything; - jsonOrToml = readEverything.str(); - } - break; - case 2: - if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-h") == 0) - { - std::cout << "Usage: " << std::string(argv[0]) << R"( [json_or_toml] + std::cout << "Usage: " << std::string(program_name) << R"( [json_or_toml] 'json_or_toml' can be a JSON or TOML dataset specified inline or a reference to a file prepended by an '@'. Inline datasets will be interpreted as JSON if they start with an '{', as TOML @@ -60,14 +13,11 @@ Inline dataset specifications can be replaced by input read from stdin. If the input is JSON, then it will be converted to TOML and written to stdout, equivalently from TOML to JSON. )"; - exit(0); - } - jsonOrToml = argv[1]; - break; - default: - throw std::runtime_error( - std::string("Usage: ") + argv[0] + - " [file location or inline JSON/TOML]"); - } - parsed_main(std::move(jsonOrToml)); +} + +int main(int argc, char const **argv) +{ + using convert = convert_json_toml; + convert::run_application( + argc, argv, convert::UseStdinAs::InlineJson, print_help_message); } diff --git a/src/cli/merge-json.cpp b/src/cli/merge-json.cpp new file mode 100644 index 0000000000..79312a2388 --- /dev/null +++ b/src/cli/merge-json.cpp @@ -0,0 +1,27 @@ +#include "openPMD/cli/convert-toml-json.hpp" + +void print_help_message(char const *program_name) +{ + std::cout << "Merge multiple JSON/TOML files into one.\nUsage: " + << std::string(program_name) << R"( [json_or_toml]+ +'json_or_toml' can be a JSON or TOML dataset specified inline or a reference +to a file prepended by an '@'. +Inline datasets will be interpreted as JSON if they start with an '{', as TOML +otherwise. Datasets from a file will be interpreted as JSON or TOML depending +on the file ending '.json' or '.toml' respectively. + +In order to support large numbers of files to be merged, the paths to those +files can also be specified line-by-line per stdin, replacing the limitations +of command line arguments. + +If the JSON/TOML files are mixed, then the output type (JSON or TOML) will be +determined by the type of the first file. 
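+
+Example (file and directory names below are purely illustrative):
+
+    openpmd-merge-json @mpi_rank_0.json @mpi_rank_1.json > merged.json
+
+or, listing one path per line on stdin:
+
+    ls data.json.parallel/mpi_rank_*.json | openpmd-merge-json > data.json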
+)"; +} + +int main(int argc, char const **argv) +{ + using convert = convert_json_toml; + convert::run_application( + argc, argv, convert::UseStdinAs::ListOfJson, print_help_message); +} diff --git a/src/toolkit/Aws.cpp b/src/toolkit/Aws.cpp new file mode 100644 index 0000000000..c9a7b71fad --- /dev/null +++ b/src/toolkit/Aws.cpp @@ -0,0 +1,133 @@ +#include "openPMD/toolkit/Aws.hpp" + +#include +#include +#include + +#include +#include + +namespace +{ +struct membuf : std::streambuf +{ + membuf(char const *base, std::size_t size) + { + auto p = const_cast(base); + this->setg(p, p, p + size); + } +}; + +struct imemstream : std::iostream +{ + imemstream(char const *base, std::size_t size) + : std::iostream(&m_buf), m_buf(base, size) + {} + +private: + membuf m_buf; +}; +} // namespace + +namespace openPMD::internal +{ +ExternalBlockStorageAws::ExternalBlockStorageAws( + Aws::S3::S3Client client, + std::string bucketName, + std::optional endpoint) + : m_client{std::move(client)} + , m_bucketName(std::move(bucketName)) + , m_endpoint(std::move(endpoint)) +{ + Aws::S3::Model::CreateBucketRequest create_request; + create_request.SetBucket(m_bucketName); + auto create_outcome = m_client.CreateBucket(create_request); + if (!create_outcome.IsSuccess()) + { + std::cerr << "[ExternalBlockStorageAws::ExternalBlockStorageAws] " + "Warning: Failed to create bucket (may already exist): " + << create_outcome.GetError().GetMessage() << std::endl; + } + else + { + std::cout << "Bucket created: " << m_bucketName << std::endl; + } +} +ExternalBlockStorageAws::~ExternalBlockStorageAws() = default; + +auto ExternalBlockStorageAws::put( + std::string const &identifier, void const *data, size_t len) -> std::string +{ + auto sanitized = !identifier.empty() && identifier.at(0) == '/' + ? identifier.substr(1) + : identifier; + + Aws::S3::Model::PutObjectRequest put_request; + put_request.SetBucket(m_bucketName); + put_request.SetKey(sanitized); + + auto input_data = Aws::MakeShared( + "PutObjectInputStream", reinterpret_cast(data), len); + put_request.SetBody(input_data); + put_request.SetContentLength(static_cast(len)); + + auto put_outcome = m_client.PutObject(put_request); + + if (put_outcome.IsSuccess()) + { + std::cout << "File uploaded successfully to S3!" 
<< std::endl; + } + else + { + std::cerr << "Upload failed: " << put_outcome.GetError().GetMessage() + << std::endl; + } + return sanitized; +} + +void ExternalBlockStorageAws::get( + std::string const &external_ref, void *data, size_t len) +{ + if (len == 0) + { + return; + } + + Aws::S3::Model::GetObjectRequest get_request; + get_request.SetBucket(m_bucketName); + get_request.SetKey(external_ref); + + auto get_outcome = m_client.GetObject(get_request); + if (!get_outcome.IsSuccess()) + { + throw std::runtime_error( + std::string("ExternalBlockStorageAws::get failed: ") + + get_outcome.GetError().GetMessage()); + } + + auto &body = get_outcome.GetResult().GetBody(); + body.read( + reinterpret_cast(data), static_cast(len)); + std::streamsize read_bytes = body.gcount(); + if (read_bytes != static_cast(len)) + { + throw std::runtime_error( + "ExternalBlockStorageAws: failed to read expected number of bytes " + "from S3 object"); + } +} + +[[nodiscard]] auto ExternalBlockStorageAws::externalStorageLocation() const + -> nlohmann::json +{ + nlohmann::json j; + j["provider"] = "aws"; + if (m_endpoint.has_value()) + { + j["endpoint"] = *m_endpoint; + } + j["bucket"] = m_bucketName; + return j; +} + +} // namespace openPMD::internal diff --git a/src/toolkit/AwsBuilder.cpp b/src/toolkit/AwsBuilder.cpp new file mode 100644 index 0000000000..cc3cdc87ef --- /dev/null +++ b/src/toolkit/AwsBuilder.cpp @@ -0,0 +1,130 @@ +#include "openPMD/toolkit/AwsBuilder.hpp" + +#include "openPMD/toolkit/Aws.hpp" +#include "openPMD/toolkit/ExternalBlockStorage.hpp" + +#include +#include +#include + +namespace openPMD::internal +{ +AwsBuilder::AwsBuilder( + std::string bucketName, std::string accessKeyId, std::string secretKey) + : m_bucketName(std::move(bucketName)) + , m_accessKeyId(std::move(accessKeyId)) + , m_secretKey(std::move(secretKey)) +{} + +auto AwsBuilder::setBucketName(std::string bucketName) -> AwsBuilder & +{ + m_bucketName = std::move(bucketName); + return *this; +} + +auto internal::AwsBuilder::setCredentials( + std::string accessKeyId, std::string secretKey) -> AwsBuilder & +{ + m_accessKeyId = std::move(accessKeyId); + m_secretKey = std::move(secretKey); + return *this; +} + +auto AwsBuilder::setEndpointOverride(std::string endpoint) -> AwsBuilder & +{ + m_endpointOverride = std::move(endpoint); + return *this; +} + +auto AwsBuilder::setRegion(std::string regionName) -> AwsBuilder & +{ + m_region = std::move(regionName); + return *this; +} + +auto AwsBuilder::setScheme(Scheme s) -> AwsBuilder & +{ + m_scheme = s; + return *this; +} + +auto AwsBuilder::setVerifySSL(bool verify) -> AwsBuilder & +{ + m_verifySSL = verify; + return *this; +} + +auto internal::AwsBuilder::setSessionToken(std::string sessionToken) + -> AwsBuilder & +{ + m_sessionToken = std::move(sessionToken); + return *this; +} + +AwsBuilder::operator ExternalBlockStorage() +{ + Aws::Client::ClientConfiguration config; + + if (m_endpointOverride.has_value()) + { + config.endpointOverride = *m_endpointOverride; + } + if (m_region.has_value()) + { + config.region = *m_region; + } + else + { + config.region = "us-east-1"; + } + if (m_scheme.has_value()) + { + switch (*m_scheme) + { + case Scheme::HTTP: + config.scheme = Aws::Http::Scheme::HTTP; + break; + case Scheme::HTTPS: + config.scheme = Aws::Http::Scheme::HTTPS; + break; + break; + } + } + + config.connectTimeoutMs = 5000; + config.requestTimeoutMs = 15000; + + if (m_verifySSL.has_value()) + { + config.verifySSL = *m_verifySSL; + } + + auto aws_credentials = [&]() -> 
Aws::Auth::AWSCredentials { + if (m_sessionToken.has_value()) + { + return {m_accessKeyId, m_secretKey, *m_sessionToken}; + } + else + { + return {m_accessKeyId, m_secretKey}; + } + }(); + + Aws::S3::S3Client s3_client( + aws_credentials, + config, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, + false); + + return ExternalBlockStorage{std::make_unique( + std::move(s3_client), + std::move(m_bucketName), + std::move(m_endpointOverride))}; +} + +auto AwsBuilder::build() -> ExternalBlockStorage +{ + return *this; +} + +} // namespace openPMD::internal diff --git a/src/toolkit/ExternalBlockStorage.cpp b/src/toolkit/ExternalBlockStorage.cpp new file mode 100644 index 0000000000..4a4d37fa96 --- /dev/null +++ b/src/toolkit/ExternalBlockStorage.cpp @@ -0,0 +1,258 @@ +#include "openPMD/toolkit/ExternalBlockStorage.hpp" + +#include "openPMD/DatatypeMacros.hpp" +#include "openPMD/IO/JSON/JSONIOHandlerImpl.hpp" +#include "openPMD/auxiliary/StringManip.hpp" + +#include + +#include +#include +#include + +namespace openPMD::internal +{ +ExternalBlockStorageBackend::~ExternalBlockStorageBackend() = default; +} + +namespace openPMD +{ + +namespace +{ + auto flat_extent(Extent const &e) -> size_t + { + return std::accumulate( + e.begin(), e.end(), 1, [](size_t left, size_t right) { + return left * right; + }); + } + + template + void read_impl( + internal::ExternalBlockStorageBackend *backend, + nlohmann::json const &external_block, + T *data, + size_t len) + { + auto const &external_ref = + external_block.at("external_ref").get(); + backend->get(external_ref, data, sizeof(T) * len); + } +} // namespace + +ExternalBlockStorage::ExternalBlockStorage() = default; +ExternalBlockStorage::ExternalBlockStorage( + std::unique_ptr worker) + : m_worker(std::move(worker)) +{} + +auto ExternalBlockStorage::makeStdioSession(std::string directory) + -> internal::StdioBuilder +{ + return internal::StdioBuilder{std::move(directory)}; +} + +auto ExternalBlockStorage::makeAwsSession( + std::string bucketName, std::string accessKeyId, std::string secretKey) + -> internal::AwsBuilder +{ + return internal::AwsBuilder( + std::move(bucketName), std::move(accessKeyId), std::move(secretKey)); +} + +template +auto ExternalBlockStorage::store( + Extent const &globalExtent, + Offset const &blockOffset, + Extent const &blockExtent, + nlohmann::json &fullJsonDataset, + nlohmann::json::json_pointer const &path, + std::optional infix, + T const *data) -> std::string +{ + auto &dataset = fullJsonDataset[path]; + + using running_index_t = uint64_t; + running_index_t running_index = [&]() -> running_index_t { + if (auto it = dataset.find("_running_index"); it != dataset.end()) + { + auto res = it->get(); + ++res; + *it = res; + return res; + } + else + { + dataset["_running_index"] = 0; + return 0; + } + }(); + + constexpr size_t padding = 6; + std::string index_as_str = [running_index, &infix]() { + auto res = std::to_string(running_index); + auto size = res.size(); + if (size >= padding) + { + return res; + } + std::stringstream padded; + if (infix.has_value()) + { + padded << *infix << "--"; + } + for (size_t i = 0; i < padding - size; ++i) + { + padded << '0'; + } + padded << res; + return padded.str(); + }(); + + if (dataset.contains(index_as_str)) + { + throw std::runtime_error( + "Inconsistent state: Index " + index_as_str + " already in use."); + } + + auto check_metadata = [&dataset](char const *key, auto const &value) { + using value_t = + std::remove_reference_t>; + if (auto it = dataset.find(key); it != 
dataset.end()) + { + auto const &stored_value = it->get(); + if (stored_value != value) + { + throw std::runtime_error( + "Inconsistent chunk storage in key " + std::string(key) + + "."); + } + } + else + { + dataset[key] = value; + } + }; + if (!DatatypeHandling::template encodeDatatype(dataset)) + { + throw std::runtime_error("Inconsistent chunk storage in datatype."); + } + check_metadata("byte_width", sizeof(T)); + check_metadata("extent", globalExtent); + + auto &block = dataset["external_blocks"][index_as_str]; + block["offset"] = blockOffset; + block["extent"] = blockExtent; + std::stringstream filesystem_identifier; + filesystem_identifier << path.to_string(); + filesystem_identifier << "--" << index_as_str; + auto escaped_filesystem_identifier = m_worker->put( + filesystem_identifier.str(), + data, + sizeof(T) * flat_extent(blockExtent)); + block["external_ref"] = escaped_filesystem_identifier; + return index_as_str; +} + +template +void ExternalBlockStorage::read( + [[maybe_unused]] std::string const &identifier, + [[maybe_unused]] nlohmann::json const &fullJsonDataset, + [[maybe_unused]] nlohmann::json::json_pointer const &path, + [[maybe_unused]] T *data) +{} + +template +void ExternalBlockStorage::read( + Offset const &blockOffset, + Extent const &blockExtent, + nlohmann::json const &fullJsonDataset, + nlohmann::json::json_pointer const &path, + T *data) +{ + auto &dataset = fullJsonDataset[path]; + if (!DatatypeHandling::template checkDatatype(dataset)) + { + throw std::runtime_error("Inconsistent chunk storage in datatype."); + } + auto external_blocks = dataset.at("external_blocks"); + bool found_a_precise_match = false; + for (auto it = external_blocks.begin(); it != external_blocks.end(); ++it) + { + auto const &block = it.value(); + try + { + auto const &o = block.at("offset").get(); + auto const &e = block.at("extent").get(); + // Look only for exact matches for now + if (o != blockOffset || e != blockExtent) + { + continue; + } + found_a_precise_match = true; + read_impl(m_worker.get(), block, data, flat_extent(blockExtent)); + break; + } + catch (nlohmann::json::exception const &e) + { + std::cerr << "[ExternalBlockStorage::read] Could not parse block '" + << it.key() << "'. Original error was:\n" + << e.what(); + } + } + if (!found_a_precise_match) + { + throw std::runtime_error( + "[ExternalBlockStorage::read] Unable to find a precise match for " + "offset " + + auxiliary::vec_as_string(blockOffset) + " and extent " + + auxiliary::vec_as_string(blockExtent)); + } +} + +[[nodiscard]] auto ExternalBlockStorage::externalStorageLocation() const + -> nlohmann::json +{ + return m_worker->externalStorageLocation(); +} + +void ExternalBlockStorage::sanitizeString(std::string &s) +{ + for (char &c : s) + { + if (c == '/' || c == '\\' || c == ':' || c == '*' || c == '?' 
|| + c == '"' || c == '<' || c == '>' || c == '|' || c == '\n' || + c == '\r' || c == '\t' || c == '\0' || c == ' ') + { + c = '_'; + } + } +} + +#define OPENPMD_INSTANTIATE_DATATYPEHANDLING(datatypehandling, type) \ + template auto ExternalBlockStorage::store( \ + Extent const &globalExtent, \ + Offset const &blockOffset, \ + Extent const &blockExtent, \ + nlohmann::json &fullJsonDataset, \ + nlohmann::json::json_pointer const &path, \ + std::optional infix, \ + type const *data) -> std::string; \ + template void ExternalBlockStorage::read( \ + std::string const &identifier, \ + nlohmann::json const &fullJsonDataset, \ + nlohmann::json::json_pointer const &path, \ + type *data); \ + template void ExternalBlockStorage::read( \ + Offset const &blockOffset, \ + Extent const &blockExtent, \ + nlohmann::json const &fullJsonDataset, \ + nlohmann::json::json_pointer const &path, \ + type *data); +#define OPENPMD_INSTANTIATE(type) \ + OPENPMD_INSTANTIATE_DATATYPEHANDLING(internal::JsonDatatypeHandling, type) +OPENPMD_FOREACH_DATASET_DATATYPE(OPENPMD_INSTANTIATE) +#undef OPENPMD_INSTANTIATE + +} // namespace openPMD diff --git a/src/toolkit/Stdio.cpp b/src/toolkit/Stdio.cpp new file mode 100644 index 0000000000..ddf7da7178 --- /dev/null +++ b/src/toolkit/Stdio.cpp @@ -0,0 +1,146 @@ +#include "openPMD/toolkit/Stdio.hpp" + +#include "openPMD/auxiliary/Filesystem.hpp" + +#include +#include + +namespace +{ +auto concat_filepath(std::string const &s1, std::string const &s2) + -> std::string +{ + if (s1.empty()) + { + return s2; + } + if (s2.empty()) + { + return s1; + } + bool ends_with_slash = + *s1.crbegin() == openPMD::auxiliary::directory_separator; + bool starts_with_slash = + *s2.cbegin() == openPMD::auxiliary::directory_separator; + + if (ends_with_slash ^ starts_with_slash) + { + return s1 + s2; + } + else if (ends_with_slash && starts_with_slash) + { + return s1 + (s2.c_str() + 1); + } + else + { + return s1 + openPMD::auxiliary::directory_separator + s2; + } +} +} // namespace + +namespace openPMD::internal +{ +ExternalBlockStorageStdio::ExternalBlockStorageStdio( + std::string directory, std::string openMode) + : m_directory(std::move(directory)), m_openMode(std::move(openMode)) +{ + if (m_directory.empty()) + { + throw std::invalid_argument( + "ExternalBlockStorageStdio: directory cannot be empty"); + } + + if (!auxiliary::create_directories(m_directory)) + { + throw std::runtime_error( + "ExternalBlockStorageStdio: failed to create or access " + "directory: " + + m_directory); + } +} + +ExternalBlockStorageStdio::~ExternalBlockStorageStdio() = default; + +auto ExternalBlockStorageStdio::put( + std::string const &identifier, void const *data, size_t len) -> std::string +{ + auto sanitized = identifier + ".dat"; + ExternalBlockStorage::sanitizeString(sanitized); + std::string filepath = concat_filepath(m_directory, sanitized); + + if (len == 0) + { + return filepath; + } + + FILE *file = std::fopen(filepath.c_str(), m_openMode.c_str()); + if (!file) + { + throw std::runtime_error( + "ExternalBlockStorageStdio: failed to open file for writing: " + + filepath); + } + + size_t written = std::fwrite(data, 1, len, file); + if (written != len) + { + throw std::runtime_error( + "ExternalBlockStorageStdio: failed to write full data to file: " + + filepath); + } + + if (std::fclose(file) != 0) + { + throw std::runtime_error( + "ExternalBlockStorageStdio: failed to close file after writing: " + + filepath); + } + + return sanitized; +} + +void ExternalBlockStorageStdio::get( + std::string const 
&external_ref, void *data, size_t len) +{ + if (len == 0) + { + return; + } + + std::string filepath = concat_filepath(m_directory, external_ref); + + FILE *file = std::fopen(filepath.c_str(), "rb"); + if (!file) + { + throw std::runtime_error( + "ExternalBlockStorageStdio: failed to open file for reading: " + + filepath); + } + + size_t read = std::fread(data, 1, len, file); + if (read != len) + { + std::fclose(file); + throw std::runtime_error( + "ExternalBlockStorageStdio: failed to read full data from file: " + + filepath); + } + + if (std::fclose(file) != 0) + { + throw std::runtime_error( + "ExternalBlockStorageStdio: failed to close file after reading: " + + filepath); + } +} + +[[nodiscard]] auto ExternalBlockStorageStdio::externalStorageLocation() const + -> nlohmann::json +{ + nlohmann::json j; + j["provider"] = "stdio"; + j["directory"] = m_directory; + j["open_mode"] = m_openMode; + return j; +} +} // namespace openPMD::internal diff --git a/src/toolkit/StdioBuilder.cpp b/src/toolkit/StdioBuilder.cpp new file mode 100644 index 0000000000..8fa5f6bb6f --- /dev/null +++ b/src/toolkit/StdioBuilder.cpp @@ -0,0 +1,31 @@ +#include "openPMD/toolkit/StdioBuilder.hpp" + +#include "openPMD/toolkit/ExternalBlockStorage.hpp" +#include "openPMD/toolkit/Stdio.hpp" + +#include + +namespace openPMD::internal +{ +auto StdioBuilder::setDirectory(std::string directory) -> StdioBuilder & +{ + m_directory = std::move(directory); + return *this; +} +auto StdioBuilder::setOpenMode(std::string openMode) -> StdioBuilder & +{ + m_openMode = std::move(openMode); + return *this; +} + +StdioBuilder::operator ExternalBlockStorage() +{ + return ExternalBlockStorage{std::make_unique( + std::move(m_directory), std::move(m_openMode).value_or("wb"))}; +} + +auto StdioBuilder::build() -> ExternalBlockStorage +{ + return *this; +} +} // namespace openPMD::internal diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index 432dd864f3..b042f07288 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -5243,6 +5243,27 @@ TEST_CASE("bp4_steps", "[serial][adios2]") void serial_iterator(std::string const &file) { + auto const write_config = R"( +init_aws_api = true +rank_table = "posix_hostname" + +[json.attribute] +mode = "short" + +[json.dataset.mode] +provider = "aws" +access_key_id = "test" +secret_access_key = "test" +endpoint = "http://localhost:4566" +bucket = "simdata" + )"; + auto const read_config = R"( +init_aws_api = true + +[json.dataset.mode] +access_key_id = "test" +secret_access_key = "test" + )"; constexpr Extent::value_type extent = 1000; { Series writeSeries( @@ -5250,7 +5271,7 @@ void serial_iterator(std::string const &file) Access::CREATE_LINEAR #ifndef _WIN32 , - R"({"rank_table": "posix_hostname"})" + write_config #endif ); auto iterations = writeSeries.snapshots(); @@ -5265,7 +5286,7 @@ void serial_iterator(std::string const &file) } } - Series readSeries(file, Access::READ_ONLY); + Series readSeries(file, Access::READ_ONLY, read_config); size_t last_iteration_index = 0; size_t numberOfIterations = 0; @@ -5301,19 +5322,23 @@ void serial_iterator(std::string const &file) TEST_CASE("serial_iterator", "[serial][adios2]") { - for (auto const &t : testedFileExtensions()) - { -#ifdef _WIN32 - serial_iterator("../samples/serial_iterator_filebased_%T." + t); - serial_iterator("../samples/serial_iterator_groupbased." + t); -#else - // Add some regex characters into the file names to see that we can deal - // with that. 
Don't do that on Windows because Windows does not like
-    // those characters within file paths.
-        serial_iterator("../samples/serial_iterator_filebased_+?_%T." + t);
-        serial_iterator("../samples/serial_iterator_groupbased_+?." + t);
-#endif
-    }
+    serial_iterator("../samples/serial_iterator.json");
+    // for (auto const &t : testedFileExtensions())
+    // {
+    // #ifdef _WIN32
+    //         serial_iterator("../samples/serial_iterator_filebased_%T." + t);
+    //         serial_iterator("../samples/serial_iterator_groupbased." + t);
+    // #else
+    //         // Add some regex characters into the file names to see that
+    //         // we can deal with that. Don't do that on Windows because
+    //         // Windows does not like those characters within file paths.
+    //         serial_iterator("../samples/serial_iterator_filebased_+?_%T." + t);
+    //         serial_iterator("../samples/serial_iterator_groupbased_+?." + t);
+    // #endif
+    // }
 }
 
 void variableBasedSingleIteration(std::string const &file)
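
A few illustrative sketches of how the new `src/toolkit/` pieces fit together (these are not part of the diff above). The two builders, `AwsBuilder` and `StdioBuilder`, follow the same pattern: a fluent setter chain that converts into an `ExternalBlockStorage` owning the concrete backend. A minimal usage sketch, assuming the header paths included by the new sources; the bucket name, credentials, and endpoint mirror the test configuration above, while the directory for the stdio variant is a made-up placeholder:

```cpp
// Illustrative only: endpoint, bucket, and credentials are the placeholder
// values from SerialIOTest.cpp; the stdio directory is invented.
#include "openPMD/toolkit/AwsBuilder.hpp"
#include "openPMD/toolkit/ExternalBlockStorage.hpp"
#include "openPMD/toolkit/StdioBuilder.hpp"

#include <aws/core/Aws.h>

int main()
{
    // Series does this itself when "init_aws_api = true" is configured;
    // a standalone user of the toolkit has to init/shutdown the SDK manually.
    Aws::SDKOptions sdkOptions;
    Aws::InitAPI(sdkOptions);
    {
        // S3-backed external block storage against a local S3-compatible
        // endpoint.
        openPMD::ExternalBlockStorage s3 =
            openPMD::ExternalBlockStorage::makeAwsSession(
                /* bucketName  = */ "simdata",
                /* accessKeyId = */ "test",
                /* secretKey   = */ "test")
                .setEndpointOverride("http://localhost:4566")
                .setRegion("us-east-1")
                .setVerifySSL(false)
                .build();

        // Filesystem-backed variant: one file per stored chunk.
        openPMD::ExternalBlockStorage posix =
            openPMD::ExternalBlockStorage::makeStdioSession("../samples/blocks")
                .setOpenMode("wb")
                .build();
    }
    // The S3 client must be destroyed before shutting the SDK down,
    // hence the extra scope above.
    Aws::ShutdownAPI(sdkOptions);
}
```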
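
For completeness, a sketch of `store()`/`read()` driven directly rather than through the JSON IO handler. This assumes the template parameter order `<DatatypeHandling, T>` implied by the explicit instantiations at the end of `ExternalBlockStorage.cpp`, that `JSONIOHandlerImpl.hpp` is available to provide `internal::JsonDatatypeHandling`, and that `infix` is an `std::optional<std::string>`; the dataset path and sizes are made up:

```cpp
// Sketch under the stated assumptions; not taken verbatim from the PR.
#include "openPMD/IO/JSON/JSONIOHandlerImpl.hpp" // internal::JsonDatatypeHandling
#include "openPMD/toolkit/ExternalBlockStorage.hpp"

#include <nlohmann/json.hpp>

#include <optional>
#include <vector>

int main()
{
    using openPMD::internal::JsonDatatypeHandling;

    auto storage =
        openPMD::ExternalBlockStorage::makeStdioSession("./blocks").build();

    nlohmann::json file;
    nlohmann::json::json_pointer path("/data/0/meshes/rho");

    // Write one chunk that covers the whole (1D) dataset.
    std::vector<double> chunk(100, 3.14);
    storage.store<JsonDatatypeHandling, double>(
        /* globalExtent = */ {100},
        /* blockOffset  = */ {0},
        /* blockExtent  = */ {100},
        file,
        path,
        /* infix = */ std::nullopt,
        chunk.data());

    // read() only accepts selections that exactly match a stored block.
    std::vector<double> roundTrip(100);
    storage.read<JsonDatatypeHandling, double>(
        {0}, {100}, file, path, roundTrip.data());
}
```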
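
After such a `store()` call, the dataset node in the JSON file carries the bookkeeping written by `ExternalBlockStorage::store()`: a running index (zero-padded to six digits), the datatype and byte width, the global extent, and one entry per chunk under `external_blocks` with its offset, extent, and the backend reference returned by `put()`. A hypothetical layout, with the field names taken from `store()` above but the concrete values (datatype string, `external_ref` file name) invented for illustration:

```cpp
// Hypothetical dataset node after storing a single chunk; values are
// illustrative only.
#include <nlohmann/json.hpp>

#include <cassert>

int main()
{
    auto dataset = nlohmann::json::parse(R"({
        "datatype": "DOUBLE",
        "byte_width": 8,
        "extent": [100],
        "_running_index": 0,
        "external_blocks": {
            "000000": {
                "offset": [0],
                "extent": [100],
                "external_ref": "_data_0_meshes_rho--000000.dat"
            }
        }
    })");
    // read() scans "external_blocks" for an entry whose offset/extent match
    // the requested selection exactly and then fetches its "external_ref".
    assert(dataset.at("external_blocks").size() == 1);
}
```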