Commit 3af8cfb0 authored by jan.koester's avatar jan.koester
Browse files

deb

parent 9f20e3e8
Loading
Loading
Loading
Loading
+9 −0
Original line number Diff line number Diff line
mediadb (20260423+71) unstable; urgency=medium

  * Read client pool: replace single read_client_ with a pool of 4
    independent paritypp clients for cluster reads. Concurrent preview
    and download requests now round-robin across pool members, eliminating
    serialization on the paritypp client mutex.

 -- Jan Koester <jan.koester@tuxist.de>  Wed, 23 Apr 2026 00:00:00 +0200

mediadb (20260423+70) unstable; urgency=medium

  * Cluster data consistency: tag media listings with availability flag
+38 −26
Original line number Diff line number Diff line
@@ -99,14 +99,17 @@ void Cluster::init(const ClusterConfig& cfg) {
                pclient_->set_local_node(
                    peer_to_client_index_[static_cast<size_t>(self_node_index_)], store_);
            }
            // Dedicated read client — separate QUIC connections, never blocked
            // by replicate/store holding pclient_->mutex_
            read_client_ = std::make_unique<paritypp::client>(
            // Pool of dedicated read clients — separate QUIC connections each,
            // concurrent preview/download requests don't serialize.
            read_pool_.resize(READ_POOL_SIZE);
            for (size_t ri = 0; ri < READ_POOL_SIZE; ++ri) {
                read_pool_[ri] = std::make_unique<paritypp::client>(
                    cfg_.data_blocks, cfg_.parity_blocks, nodes, pcreds);
                if (self_node_index_ >= 0) {
                read_client_->set_local_node(
                    read_pool_[ri]->set_local_node(
                        peer_to_client_index_[static_cast<size_t>(self_node_index_)], store_);
                }
            }
            // Dedicated scrub/rebalance client — avoids blocking pclient_
            // during long scrub operations
            scrub_client_ = std::make_unique<paritypp::client>(
@@ -133,6 +136,12 @@ void Cluster::init(const ClusterConfig& cfg) {
    });
}

paritypp::client* Cluster::next_read_client() {
    if (read_pool_.empty()) return nullptr;
    size_t idx = read_pool_rr_.fetch_add(1, std::memory_order_relaxed) % read_pool_.size();
    return read_pool_[idx].get();
}

void Cluster::start() {
    if (running_) return;

@@ -150,7 +159,7 @@ void Cluster::start() {
    // child process.  Doing this in init() (pre-fork) leaves the
    // connections dead after fork, causing empty retrieve results.
    if (pclient_) pclient_->warmup();
    if (read_client_) read_client_->warmup();
    for (auto& rc : read_pool_) if (rc) rc->warmup();
    if (scrub_client_) scrub_client_->warmup();
    if (import_client_) import_client_->warmup();

@@ -308,7 +317,7 @@ void Cluster::warmup_import_client() {
}

void Cluster::warmup_read_clients() {
    if (read_client_) read_client_->warmup();
    for (auto& rc : read_pool_) if (rc) rc->warmup();
    if (pclient_) pclient_->warmup();
}

@@ -320,35 +329,36 @@ bool Cluster::fetch(const std::string& key, std::vector<uint8_t>& out) {
        return std::chrono::steady_clock::now() >= deadline;
    };

    // Try read_client_ first (dedicated read connections)
    if (read_client_) {
    // Try a read pool client (round-robin across independent connections)
    auto* rcli = next_read_client();
    if (rcli) {
        try {
            out = read_client_->retrieve(gid);
            out = rcli->retrieve(gid);
            if (!out.empty()) return true;
        } catch (const netplus::NetException& e) {
            std::cerr << "[CLUSTER] fetch read_client NetException key=" << key
            std::cerr << "[CLUSTER] fetch read_pool NetException key=" << key
                      << " gid=" << gid << ": " << e.what() << "\n";
            if (!timed_out()) {
                try {
                    read_client_->warmup();
                    out = read_client_->retrieve(gid);
                    rcli->warmup();
                    out = rcli->retrieve(gid);
                    if (!out.empty()) return true;
                } catch (const std::exception& e2) {
                    std::cerr << "[CLUSTER] fetch read_client retry failed key=" << key
                    std::cerr << "[CLUSTER] fetch read_pool retry failed key=" << key
                              << " gid=" << gid << ": " << e2.what() << "\n";
                } catch (...) {
                    std::cerr << "[CLUSTER] fetch read_client retry failed key=" << key
                    std::cerr << "[CLUSTER] fetch read_pool retry failed key=" << key
                              << " gid=" << gid << ": unknown exception\n";
                }
            }
        } catch (const std::exception& e) {
            // Data-level error (e.g. "stripe not found") — warmup won't help.
            std::cerr << "[CLUSTER] fetch read_client exception key=" << key
            std::cerr << "[CLUSTER] fetch read_pool exception key=" << key
                      << " gid=" << gid << ": " << e.what() << "\n";
        }
    }

    // Fallback to pclient_ (write client) — may succeed if read_client_
    // Fallback to pclient_ (write client) — may succeed if read pool
    // connections are stale (e.g. after fork or network glitch)
    if (pclient_ && !timed_out()) {
        try {
@@ -394,24 +404,25 @@ bool Cluster::fetch_range(const std::string& key, uint64_t offset, uint64_t leng
        return std::chrono::steady_clock::now() >= deadline;
    };

    if (read_client_) {
    auto* rcli = next_read_client();
    if (rcli) {
        try {
            out = read_client_->retrieve_range(gid, offset, length);
            out = rcli->retrieve_range(gid, offset, length);
            if (!out.empty()) return true;
        } catch (const netplus::NetException& e) {
            std::cerr << "[CLUSTER] fetch_range read_client NetException key=" << key
            std::cerr << "[CLUSTER] fetch_range read_pool NetException key=" << key
                      << " gid=" << gid << ": " << e.what() << "\n";
            if (!timed_out()) {
                try {
                    read_client_->warmup();
                    out = read_client_->retrieve_range(gid, offset, length);
                    rcli->warmup();
                    out = rcli->retrieve_range(gid, offset, length);
                    if (!out.empty()) return true;
                } catch (...) {}
            }
        } catch (const std::exception& e) {
            // Data-level error (e.g. "stripe not found") — warmup won't help,
            // skip retry to avoid blocking 30+ s per dead node.
            std::cerr << "[CLUSTER] fetch_range read_client exception key=" << key
            std::cerr << "[CLUSTER] fetch_range read_pool exception key=" << key
                      << " gid=" << gid << ": " << e.what() << "\n";
        }
    }
@@ -441,7 +452,8 @@ bool Cluster::fetch_range(const std::string& key, uint64_t offset, uint64_t leng
}

bool Cluster::fetch_from_peers(const std::string& key, std::vector<uint8_t>& out) {
    auto& cli = read_client_ ? read_client_ : pclient_;
    auto* rcli = next_read_client();
    paritypp::client* cli = rcli ? rcli : pclient_.get();
    if (!cli) return false;
    uint64_t gid = cluster_group_id(key);
    try {
+9 −4
Original line number Diff line number Diff line
@@ -105,7 +105,7 @@ public:

    // Re-establish QUIC connections on the import client.
    void warmup_import_client();
    // Re-establish QUIC connections on read_client_ and pclient_ (used before sync).
    // Re-establish QUIC connections on read pool and pclient_ (used before sync).
    void warmup_read_clients();
    bool fetch(const std::string& key, std::vector<uint8_t>& out);
    bool fetch_range(const std::string& key, uint64_t offset, uint64_t length,
@@ -205,9 +205,14 @@ private:

    std::unique_ptr<paritypp::client> pclient_;

    // Dedicated client for read operations — avoids blocking on pclient_
    // mutex during store/replicate
    std::unique_ptr<paritypp::client> read_client_;
    // Pool of dedicated read clients — each has independent QUIC connections
    // and its own mutex, so concurrent preview/download requests don't
    // serialize on a single client.
    static constexpr size_t READ_POOL_SIZE = 4;
    std::vector<std::unique_ptr<paritypp::client>> read_pool_;
    std::atomic<size_t> read_pool_rr_{0};  // round-robin index

    paritypp::client* next_read_client();

    // Dedicated client for scrub/rebalance — avoids blocking pclient_
    // (used by replicate and health probing) during long scrub operations