#include "rcache.hpp"

// NOTE(review): the three system include targets were missing from this file
// (angle-bracket contents stripped). They are restored from the external APIs
// used below: XXH64 (xxhash), ZSTD_* (zstd) and std::from_chars (<charconv>).
// Confirm names and order against the upstream file.
#include <xxhash.h>
#include <zstd.h>

#include <charconv>

#include "buffer.hpp"
#include "common.hpp"

using namespace rlib;

// NOTE(review): throughout this file the template arguments of std::span,
// std::vector, std::make_unique, std::unique_ptr and std::bit_cast had been
// stripped as well. They are reconstructed from how the values are used;
// verify each against the declarations in rcache.hpp.

/// Builds the IO flags used to open a cache file.
/// @param readonly selects IO::READ when true, IO::WRITE otherwise.
/// @return the selected access flag combined with IO::NO_INTERUPT and IO::NO_OVERGROW.
static constexpr auto rcache_file_flags(bool readonly) -> IO::Flags {
    return (readonly ? IO::READ : IO::WRITE) | IO::NO_INTERUPT | IO::NO_OVERGROW;
}

/// Computes the path of the `index`-th file of a cache.
/// @param base  path of the first cache file (taken by value and modified locally).
/// @param index zero-based file index.
/// @return `base` unchanged for index 0, otherwise `base` with its extension
///         replaced by ".{index:05d}.bundle".
static auto rcache_file_path(fs::path base, std::size_t index) -> fs::path {
    if (!index) return base;
    // replace_extension yields a reference to `base` (assuming `fs` aliases
    // std::filesystem), so the explicit move avoids copying the path on return.
    return std::move(base.replace_extension(fmt::format(".{:05d}.bundle", index)));
}

/// Opens the cache described by `options`.
///
/// For a writable cache, flush_size is raised to at least 1 MiB and max_size is
/// adjusted to max(2 * flush_size, max_size) - flush_size. The cache is then
/// loaded from a folder when `options.path` is an existing directory, otherwise
/// from a file chain in read-only or read-write mode depending on `readonly`.
RCache::RCache(Options const& options) : options_(options) {
    if (!options_.readonly) {
        options_.flush_size = std::max(1 * MiB, options_.flush_size);
        options_.max_size = std::max(options_.flush_size * 2, options_.max_size) - options_.flush_size;
    }
    if (fs::exists(options.path) && fs::is_directory(options.path)) {
        this->load_folder_internal();
    } else if (options_.readonly) {
        this->load_file_internal_read_only();
    } else {
        this->load_file_internal_read_write();
    }
}

/// Flushes any pending data; flush_internal() does nothing when the cache is not writable.
RCache::~RCache() { this->flush_internal(); }

/// Adds an already-compressed chunk to the cache.
/// @param chunk chunk descriptor; `compressed_size` must equal `data.size()`.
/// @param data  compressed payload.
/// @return false when the cache is not writable or the chunk id is already
///         present, true once the chunk has been added.
auto RCache::add(RChunk const& chunk, std::span<char const> data) -> bool {
    if (!can_write()) {
        return false;
    }
    rlib_assert(chunk.compressed_size == data.size());
    std::lock_guard lock(this->mutex_);
    if (lookup_.contains(chunk.chunkId)) {
        return false;
    }
    rlib_assert(chunk.compressed_size <= RChunk::LIMIT);
    rlib_assert(chunk.uncompressed_size <= RChunk::LIMIT);
    // NOTE(review): this bounds ZSTD_compressBound of the *compressed* size,
    // whereas add_uncompressed() bounds the uncompressed size - confirm intent.
    rlib_assert(ZSTD_compressBound(chunk.compressed_size) <= RChunk::LIMIT);
    this->add_internal(chunk, data);
    return true;
}

/// Hashes, compresses and stores raw data as a chunk.
/// @param src       uncompressed bytes; size and compress bound must fit RChunk::LIMIT.
/// @param level     zstd compression level passed to ZSTD_compress.
/// @param hash_type hash used to derive the chunk id via RChunk::hash.
/// @return the existing lookup entry when the id is already cached, otherwise a
///         descriptor with chunkId, uncompressed_size and compressed_size set.
/// Asserts that the cache is writable.
auto RCache::add_uncompressed(std::span<char const> src, int level, HashType hash_type) -> RChunk::Src {
    rlib_assert(can_write());
    rlib_assert(src.size() <= RChunk::LIMIT);
    rlib_assert(ZSTD_compressBound(src.size()) <= RChunk::LIMIT);
    auto id = RChunk::hash(src, hash_type);
    std::lock_guard lock(this->mutex_);
    if (auto c = this->find_internal(id)) {
        rlib_assert(c->uncompressed_size == src.size());
        return *c;
    }
    // Per-thread scratch buffer reused across calls for the compressed output.
    thread_local Buffer buffer = {};
    rlib_assert(buffer.resize_destroy(ZSTD_compressBound(src.size())));
    auto size = rlib_assert_zstd(ZSTD_compress(buffer.data(), buffer.size(), src.data(), src.size(), level));
    rlib_assert(size <= RChunk::LIMIT);
    auto chunk = RChunk::Src{};
    chunk.chunkId = id;
    chunk.uncompressed_size = src.size();
    chunk.compressed_size = (std::uint32_t)size;
    this->add_internal(chunk, buffer.subspan(0, size));
    return chunk;
}

/// Stores a chunk list as a single chunk made of packed entries.
/// @param chunks chunk list to convert to RChunk::Dst::Packed and store.
/// @return the id of the stored chunk, reinterpreted as a FileID.
// NOTE(review): the span element type is reconstructed - confirm against rcache.hpp.
auto RCache::add_chunks(std::span<RChunk::Dst const> chunks) -> FileID {
    auto packed = std::vector<RChunk::Dst::Packed>(chunks.begin(), chunks.end());
    auto result = this->add_uncompressed({(char const*)packed.data(), packed.size() * sizeof(RChunk::Dst::Packed)},
                                         ZSTD_minCLevel());
    return (FileID)result.chunkId;
}

/// Reports whether `chunkId` is present in the lookup table (taken under a shared lock).
auto RCache::contains(ChunkID chunkId) const noexcept -> bool {
    std::shared_lock lock(this->mutex_);
    return lookup_.contains(chunkId);
}

/// Serves every requested chunk that the cache holds and returns the rest.
/// @param chunks  requested chunks (taken by value and reordered in place).
/// @param on_data callback invoked with each served chunk and its decompressed bytes.
/// @return the requested chunks that were not found or whose uncompressed size
///         does not match the cached entry.
auto RCache::get(std::vector<RChunk::Dst> chunks, RChunk::Dst::data_cb on_data) const -> std::vector<RChunk::Dst> {
    std::shared_lock lock(this->mutex_);
    // Partition in place: [begin, f) stays unresolved, [f, e) collects the hits.
    auto f = chunks.end();
    auto const e = chunks.end();
    for (auto i = chunks.begin(); i != f;) {
        if (auto c = this->find_internal(i->chunkId); c && c->uncompressed_size == i->uncompressed_size) {
            auto dst = RChunk::Dst{*c, i->hash_type, i->uncompressed_offset};
            // Swap the hit to the tail; `i` is not advanced because it now holds
            // an element that has not been examined yet.
            *i = *(--f);
            *f = dst;
        } else {
            ++i;
        }
    }
    sort_by<&RChunk::Src::bundleId, &RChunk::Dst::compressed_offset, &RChunk::Dst::uncompressed_offset>(f, e);
    // Consecutive entries with the same chunk id reuse one decompression result.
    auto last_id = ChunkID::None;
    auto last_data = std::span<char const>{};
    for (auto i = f; i != e; ++i) {
        if (last_id == ChunkID::None || i->chunkId != last_id) {
            last_data = zstd_decompress(this->get_internal(*i), i->uncompressed_size);
            last_id = i->chunkId;
        }
        on_data(*i, last_data);
    }
    chunks.resize(f - chunks.begin());
    return std::move(chunks);
}

/// Decompresses a cached chunk directly into `dst`.
/// @param chunk chunk to fetch; its uncompressed_size must match the cached entry.
/// @param dst   destination buffer handed to ZSTD_decompress.
/// @return true when the chunk was found and decompressed, false when it is not cached.
auto RCache::get_into(RChunk const& chunk, std::span<char> dst) const -> bool {
    std::shared_lock lock(this->mutex_);
    if (auto c = this->find_internal(chunk.chunkId); c) {
        rlib_assert(c->uncompressed_size == chunk.uncompressed_size);
        auto src = get_internal(*c);
        auto result = rlib_assert_zstd(ZSTD_decompress(dst.data(), dst.size(), src.data(), src.size()));
        rlib_assert(result == chunk.uncompressed_size);
        return true;
    }
    return false;
}
auto RCache::get_chunks(FileID fileId) const -> std::vector { std::shared_lock lock(this->mutex_); if (auto c = this->find_internal((ChunkID)fileId); c) { auto const count = c->uncompressed_size / sizeof(RChunk::Dst::Packed); rlib_assert(c->uncompressed_size % sizeof(RChunk::Dst::Packed) == 0); auto src = get_internal(*c); auto dst = std::vector(count); auto result = rlib_assert_zstd(ZSTD_decompress(dst.data(), c->uncompressed_size, src.data(), src.size())); rlib_assert(result == c->uncompressed_size); auto converted = std::vector(dst.begin(), dst.end()); for (auto offset = std::uint64_t{0}; auto& c : converted) { c.uncompressed_offset = offset; offset += c.uncompressed_size; } return converted; } return {}; } auto RCache::find_internal(ChunkID chunkId) const noexcept -> RChunk::Src const* { if (chunkId == ChunkID::None) { return nullptr; } auto i = lookup_.find(chunkId); if (i == lookup_.end()) { return nullptr; } return &i->second; } auto RCache::get_internal(RChunk::Src const& chunk) const -> std::span { if (files_.empty()) { thread_local struct Lazy { BundleID bundleId = {}; std::unique_ptr io = {}; } lazy = {}; rlib_assert(chunk.bundleId != BundleID::None); if (lazy.bundleId != chunk.bundleId) { auto path = fmt::format("{}/{}.bundle", options_.path, chunk.bundleId); lazy.bundleId = BundleID::None; lazy.io = std::make_unique(path, IO::READ); lazy.bundleId = chunk.bundleId; } return lazy.io->copy(chunk.compressed_offset, chunk.compressed_size); } auto const index = (std::size_t)chunk.bundleId; rlib_assert(index < files_.size()); auto const& file = files_.at(index); if (can_write() && &file == &files_.back() && chunk.compressed_offset >= writer_.toc_offset) { return writer_.buffer.subspan(chunk.compressed_offset - writer_.toc_offset, chunk.compressed_size); } else { return file->copy(chunk.compressed_offset, chunk.compressed_size); } } auto RCache::add_internal(RChunk const& chunk, std::span data) -> void { // Space we will be adding this write auto const 
extra_data = sizeof(RChunk) + data.size(); // only move to next bundle when we wrote at least one chunk and we run out of space if (writer_.chunks.size() && writer_.end_offset + extra_data > options_.max_size) { this->flush_internal(); // flush anything that we have atm auto const index = files_.size(); auto const path = rcache_file_path(options_.path, index); auto file = std::make_unique(path, rcache_file_flags(false)); file->resize(0, 0); files_.push_back(std::move(file)); writer_.toc_offset = 0; writer_.end_offset = sizeof(RBUN::Footer); writer_.chunks.clear(); writer_.buffer.clear(); this->flush_internal(); } writer_.chunks.push_back(chunk); lookup_[chunk.chunkId] = {chunk, (BundleID)(files_.size() - 1), writer_.buffer.size() + writer_.toc_offset}; rlib_assert(writer_.buffer.append(data)); auto const buffer_size = writer_.buffer.size(); auto const current_toc_size = files_.back()->size() - writer_.toc_offset; if (buffer_size > current_toc_size && buffer_size - current_toc_size > options_.flush_size) { this->flush_internal(); } writer_.end_offset += extra_data; } auto RCache::flush_internal() -> bool { // Dont reflush when there is nothing to flush. 
if (!can_write() || (writer_.buffer.empty() && writer_.toc_offset != 0)) { return false; } auto toc_size = sizeof(RChunk) * writer_.chunks.size(); RBUN::Footer footer = { .checksum = std::bit_cast>(XXH64((char const*)writer_.chunks.data(), toc_size, 0)), .entry_count = (std::uint32_t)writer_.chunks.size(), .version = RBUN::Footer::VERSION, .magic = {'R', 'B', 'U', 'N'}, }; auto new_toc_offset = writer_.toc_offset + writer_.buffer.size(); rlib_assert(writer_.buffer.append({(char const*)writer_.chunks.data(), toc_size})); rlib_assert(writer_.buffer.append({(char const*)&footer, sizeof(footer)})); rlib_assert(files_.back()->write(writer_.toc_offset, writer_.buffer)); writer_.buffer.clear(); writer_.toc_offset = new_toc_offset; return true; } auto RCache::load_file_internal_read_only() -> void { fs::path path = options_.path; do { auto const index = files_.size(); auto file = std::make_unique(path, rcache_file_flags(true)); auto bundle = RBUN::read(*file); files_.push_back(std::move(file)); for (auto& chunk : bundle.lookup) { chunk.second.bundleId = (BundleID)index; } lookup_.merge(std::move(bundle.lookup)); path = rcache_file_path(options_.path, index + 1); } while (fs::exists(path)); } auto RCache::load_file_internal_read_write() -> void { fs::path path = options_.path; do { auto const index = files_.size(); auto next_path = rcache_file_path(options_.path, index + 1); // file will be opened in RW when: // - it is a new file // - it is a existing file with no files after it and: // - newonly flag is not set // - size would not grow over limit if (!fs::exists(path) || (!options_.newonly && !fs::exists(next_path) && fs::file_size(path) < options_.max_size)) { auto file = std::make_unique(path, rcache_file_flags(false)); auto size = file->size(); auto bundle = size ? 
RBUN::read(*file) : RBUN{}; files_.push_back(std::move(file)); for (auto& chunk : bundle.lookup) { chunk.second.bundleId = (BundleID)index; } lookup_.merge(std::move(bundle.lookup)); writer_.toc_offset = bundle.toc_offset; writer_.end_offset = size; writer_.chunks = std::move(bundle.chunks); writer_.buffer.clear(); can_write_ = true; this->flush_internal(); break; } auto file = std::make_unique(path, rcache_file_flags(true)); auto bundle = RBUN::read(*file); files_.push_back(std::move(file)); for (auto& chunk : bundle.lookup) { chunk.second.bundleId = (BundleID)index; } lookup_.merge(std::move(bundle.lookup)); path = std::move(next_path); } while (true); } auto RCache::load_folder_internal() -> void { options_.readonly = true; for (auto const& entry : fs::directory_iterator(options_.path)) { auto const& path = entry.path(); auto filename = path.filename().generic_string(); if (!filename.ends_with(".bundle") || filename.find(".bundle") != 16) { continue; } auto bundleId = std::uint64_t{}; auto [ptr, ec] = std::from_chars(filename.data(), filename.data() + 16, bundleId, 16); rlib_assert(ptr == filename.data() + 16); rlib_assert(ec == std::errc{}); auto file = IO::File(path, IO::READ); auto bundle = RBUN::read(file); rlib_assert(bundle.bundleId == (BundleID)bundleId); for (auto& [key, value] : bundle.lookup) { value.bundleId = bundle.bundleId; } lookup_.merge(std::move(bundle.lookup)); } }