Commit efdf0f83 authored by jan.koester's avatar jan.koester
Browse files

test



Co-authored-by: default avatarCopilot <copilot@github.com>
parent 2f024938
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -134,6 +134,7 @@ if(NOT CLIENT_ONLY)
        src/backend.cpp
        src/cluster.cpp
        src/preview.cpp
        src/preview_cache.cpp
    )

    target_include_directories(mediadb_cluster_perf_test PRIVATE
@@ -157,6 +158,7 @@ if(NOT CLIENT_ONLY)
    add_executable(mediadb_cache_test
        test/mediadb_cache_test.cpp
        src/preview.cpp
        src/preview_cache.cpp
    )

    target_include_directories(mediadb_cache_test PRIVATE
+5 −0
Original line number Diff line number Diff line
@@ -79,6 +79,11 @@ public:
    size_t getTotalPeers() const { return cfg_.peers.size(); }
    const std::unordered_map<size_t, size_t>& getPeerToClientIndex() const { return peer_to_client_index_; }

    // Register an additional block store on the parity server (multi-store)
    void addStore(uint8_t store_id, std::shared_ptr<paritypp::block_store> store) {
        if (server_) server_->add_store(store_id, std::move(store));
    }

    struct NodeGroupInfo {
        size_t index;
        std::string address;
+6 −22
Original line number Diff line number Diff line
@@ -116,28 +116,13 @@ int main(int argc, char* argv[]) {
                             mediadb::blob_value_size);

    // ---- Cluster-shared preview cache (L2) ----
    const int pcache_port = conf_int(cfg, "/MEDIADB/PREVIEW_CACHE/PORT", 4434);
    const int pcache_mb = conf_int(cfg, "/MEDIADB/PREVIEW_CACHE/SIZE_MB", 512);
    std::unique_ptr<mediadb::PreviewCache> preview_cache;
    if (cluster_enable == "true") {
        mediadb::ClusterConfig ccfg_copy;
        ccfg_copy.enabled      = true;
        ccfg_copy.bind_address = conf_string(cfg, "/MEDIADB/CLUSTER/BIND", "0.0.0.0");
        ccfg_copy.port         = conf_int(cfg, "/MEDIADB/CLUSTER/PORT", 4433);
        ccfg_copy.cert_file    = cert_path;
        ccfg_copy.key_file     = key_path;
        ccfg_copy.client_name  = conf_string(cfg, "/MEDIADB/CLUSTER/AUTH_NAME", "");
        ccfg_copy.client_key   = conf_string(cfg, "/MEDIADB/CLUSTER/AUTH_KEY", "");
        ccfg_copy.data_blocks  = cluster.getConfig().data_blocks;
        ccfg_copy.parity_blocks = cluster.getConfig().parity_blocks;
        ccfg_copy.peers.clear();
        for (const auto& p : cluster.getConfig().peers)
            ccfg_copy.peers.push_back(p);
        preview_cache = std::make_unique<mediadb::PreviewCache>(
            ccfg_copy, pcache_port,
            static_cast<size_t>(pcache_mb) * 1024ULL * 1024ULL);
        std::cerr << "[mediadb] Preview cache: L2 on port " << pcache_port
                  << " (" << pcache_mb << " MB/node)" << std::endl;
            cluster, static_cast<size_t>(pcache_mb) * 1024ULL * 1024ULL);
        std::cerr << "[mediadb] Preview cache: L2 store_id=1 ("
                  << pcache_mb << " MB/node)" << std::endl;
    }

    // ---- Create backend: local or cluster ----
@@ -160,7 +145,7 @@ int main(int argc, char* argv[]) {
        auto* cluster_backend = dynamic_cast<mediadb::ClusterMediaBackend*>(backend.get());
        server.post_fork_callback = [&cluster, cluster_backend, &preview_cache]() {
            cluster.start();
            // Start preview cache server + client warmup
            // Start preview cache client warmup (uses same server, store_id=1)
            if (preview_cache) preview_cache->start();
            // Wait for peers before syncing — prevents empty index on startup
            size_t online = cluster.waitForPeers(120);
@@ -173,8 +158,7 @@ int main(int argc, char* argv[]) {

    int rc = server.run();

    // Shutdown preview cache + cluster (backend destructor stops sync thread)
    if (preview_cache) preview_cache->stop();
    // Shutdown cluster (backend destructor stops sync thread)
    backend.reset();
    mediadb::g_Cluster = nullptr;
    cluster.stop();
+18 −88
Original line number Diff line number Diff line
#include "preview_cache.h"

#include <paritypp/auth.h>

#include <cstring>
#include <iostream>

#ifndef _WIN32
#include <unistd.h>
#include <ifaddrs.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#endif

namespace mediadb {

// ---------- helpers ----------

uint64_t PreviewCache::cache_group_id(const std::string& key) {
    // Reuse the same SHA-256 → uint64 scheme as cluster_group_id
    std::vector<uint8_t> input(key.begin(), key.end());
    auto hash = netplus::sha256_hash(input);
    uint64_t id = 0;
@@ -67,83 +57,35 @@ bool PreviewCache::decode_meta(const std::vector<uint8_t>& data, FragMeta& out)

// ---------- construction ----------

PreviewCache::PreviewCache(const ClusterConfig& base_cfg, int pcache_port,
                           size_t max_bytes_per_node) {
PreviewCache::PreviewCache(Cluster& cluster, size_t max_bytes_per_node) {
    store_ = std::make_shared<paritypp::memory_block_store>(max_bytes_per_node);

    auth_ = std::make_shared<paritypp::auth_store>();
    if (!base_cfg.client_name.empty())
        auth_->add_client(base_cfg.client_name, base_cfg.client_key);

    paritypp::server::config scfg;
    scfg.bind_address = base_cfg.bind_address;
    scfg.port = pcache_port;
    scfg.cert_file = base_cfg.cert_file;
    scfg.key_file = base_cfg.key_file;
    scfg.max_connections = 64;
    scfg.require_auth = !base_cfg.client_name.empty();

    server_ = std::make_unique<paritypp::server>(scfg, store_, auth_);

    // Build node list + detect self (same logic as Cluster::init)
    std::vector<std::string> local_addrs;
    {
        char hostname[256] = {};
        gethostname(hostname, sizeof(hostname));
        local_addrs.emplace_back(hostname);
    }
#ifndef _WIN32
    {
        struct ifaddrs* ifaddr = nullptr;
        if (getifaddrs(&ifaddr) == 0) {
            for (struct ifaddrs* ifa = ifaddr; ifa; ifa = ifa->ifa_next) {
                if (!ifa->ifa_addr) continue;
                char buf[INET6_ADDRSTRLEN] = {};
                if (ifa->ifa_addr->sa_family == AF_INET) {
                    inet_ntop(AF_INET,
                              &reinterpret_cast<struct sockaddr_in*>(ifa->ifa_addr)->sin_addr,
                              buf, sizeof(buf));
                    local_addrs.emplace_back(buf);
                } else if (ifa->ifa_addr->sa_family == AF_INET6) {
                    inet_ntop(AF_INET6,
                              &reinterpret_cast<struct sockaddr_in6*>(ifa->ifa_addr)->sin6_addr,
                              buf, sizeof(buf));
                    local_addrs.emplace_back(buf);
                }
            }
            freeifaddrs(ifaddr);
        }
    }
#endif
    // Register as store_id=1 on the existing cluster parity server
    cluster.addStore(STORE_ID, store_);

    const auto& cfg = cluster.getConfig();
    const auto& peer_map = cluster.getPeerToClientIndex();

    // Build node list reusing the cluster's peer addresses + port
    std::vector<paritypp::client::node_info> nodes;
    for (size_t pi = 0; pi < base_cfg.peers.size(); ++pi) {
        const auto& peer = base_cfg.peers[pi];
        bool is_self = false;
        for (const auto& la : local_addrs) {
            if (peer.address == la && peer.port == base_cfg.port) {
                is_self = true;
                break;
            }
        }
        if (is_self)
            self_node_index_ = static_cast<int>(pi);
        peer_to_client_index_[pi] = nodes.size();
        nodes.push_back({peer.address, pcache_port});
    for (size_t pi = 0; pi < cfg.peers.size(); ++pi) {
        nodes.push_back({cfg.peers[pi].address, cfg.peers[pi].port});
    }

    paritypp::client::credentials pcreds;
    if (!base_cfg.client_name.empty()) {
        pcreds.client_name = base_cfg.client_name;
        pcreds.key.assign(base_cfg.client_key.begin(), base_cfg.client_key.end());
    if (!cfg.client_name.empty()) {
        pcreds.client_name = cfg.client_name;
        pcreds.key.assign(cfg.client_key.begin(), cfg.client_key.end());
    }

    if (!nodes.empty()) {
        client_ = std::make_unique<paritypp::client>(
            base_cfg.data_blocks, base_cfg.parity_blocks, nodes, pcreds);
        if (self_node_index_ >= 0) {
            client_->set_local_node(
                peer_to_client_index_[static_cast<size_t>(self_node_index_)], store_);
            cfg.data_blocks, cfg.parity_blocks, nodes, pcreds);
        client_->set_store_id(STORE_ID);
        if (cluster.getSelfNodeIndex() >= 0) {
            auto it = peer_map.find(static_cast<size_t>(cluster.getSelfNodeIndex()));
            if (it != peer_map.end())
                client_->set_local_node(it->second, store_);
        }
    }
}
@@ -151,19 +93,7 @@ PreviewCache::PreviewCache(const ClusterConfig& base_cfg, int pcache_port,
// ---------- lifecycle ----------

void PreviewCache::start() {
    if (running_) return;
    if (client_) client_->warmup();
    running_ = true;
    server_thread_ = std::thread([this]() {
        try { server_->run(); } catch (...) {}
    });
}

void PreviewCache::stop() {
    if (!running_) return;
    running_ = false;
    if (server_) server_->stop();
    if (server_thread_.joinable()) server_thread_.join();
}

// ---------- get (L2 → L1) ----------
+8 −12
Original line number Diff line number Diff line
@@ -20,6 +20,9 @@ static constexpr size_t PREVIEW_FRAGMENT_SIZE = 10ULL * 1024 * 1024;

// Cluster-shared preview cache (L2) with fragment support.
//
// Uses store_id=1 on the existing cluster parity server (same port).
// Local block_store is a memory_block_store with byte-budget LRU eviction.
//
// Small previews (≤ PREVIEW_FRAGMENT_SIZE) are stored as a single parity group.
// Large previews are split into 10 MB fragments, each in its own parity group,
// plus a metadata group describing the layout.
@@ -27,14 +30,12 @@ static constexpr size_t PREVIEW_FRAGMENT_SIZE = 10ULL * 1024 * 1024;
// L1 (BlobCache) sits in front — this class handles L1 miss → L2 lookup → render.
class PreviewCache {
public:
    // pcache_port: UDP port for the preview-cache parity server (separate from data cluster)
    // cluster: the existing Cluster instance (to register store + create client)
    // max_bytes_per_node: byte budget for the local memory_block_store
    PreviewCache(const ClusterConfig& base_cfg, int pcache_port, size_t max_bytes_per_node);
    PreviewCache(Cluster& cluster, size_t max_bytes_per_node);

    // Start the parity server thread and warm up client connections.
    // Must be called post-fork.
    // Create client connections and warm up. Must be called post-fork.
    void start();
    void stop();

    // Try to retrieve a preview from L2.
    // Returns the assembled blob, or nullptr on miss.
@@ -60,15 +61,10 @@ private:
    static std::vector<uint8_t> encode_meta(const FragMeta& m);
    static bool decode_meta(const std::vector<uint8_t>& data, FragMeta& out);

    static constexpr uint8_t STORE_ID = 1; // preview cache store

    std::shared_ptr<paritypp::memory_block_store> store_;
    std::shared_ptr<paritypp::auth_store> auth_;
    std::unique_ptr<paritypp::server> server_;
    std::unique_ptr<paritypp::client> client_;

    std::thread server_thread_;
    std::atomic<bool> running_{false};
    int self_node_index_ = -1;
    std::unordered_map<size_t, size_t> peer_to_client_index_;
};

} // namespace mediadb