Commit cb20a305 authored by jan.koester's avatar jan.koester
Browse files

deb

parent 5d3fce08
Loading
Loading
Loading
Loading
+7 −0
Original line number Diff line number Diff line
mediadb (20260503+1) unstable; urgency=medium

  * cluster: recreate import_client_ on retry to reset paritypp node error
    counters when cluster recovers (reset_import_client vs warmup-only)

 -- Jan Koester <jan.koester@tuxist.de>  Sat, 03 May 2026 17:00:00 +0200

mediadb (20260502+1) unstable; urgency=medium

  * Rebuild against libnetplus 20260502+1 (QUIC performance improvements)
+2 −2
Original line number Diff line number Diff line
@@ -3130,7 +3130,7 @@ std::unique_ptr<ImportSession> ClusterMediaBackend::begin_import() {
                if (use_pclient)
                    cluster_.warmup_read_clients();
                else
                    cluster_.warmup_import_client();
                    cluster_.reset_import_client();
            }
            bool ok = use_pclient ? cluster_.replicate(key, d, len)
                                  : cluster_.replicate_import(key, d, len);
@@ -3161,7 +3161,7 @@ std::unique_ptr<ImportSession> ClusterMediaBackend::begin_import() {
    };

    auto warmup_fn = [this]() {
        cluster_.warmup_import_client();
        cluster_.reset_import_client();
    };

    auto session = local_.begin_import(std::move(replicate_fn), std::move(begin_replicate_fn),
+16 −0
Original line number Diff line number Diff line
@@ -126,6 +126,9 @@ void Cluster::init(const ClusterConfig& cfg) {
                import_client_->set_local_node(
                    peer_to_client_index_[static_cast<size_t>(self_node_index_)], store_);
            }
            // Cache for reset_import_client()
            client_nodes_ = nodes;
            client_creds_ = pcreds;
            // NOTE: warmup() is deferred to start() so QUIC connections
            // are established in the child process (after fork).
        }
@@ -312,6 +315,19 @@ std::unique_ptr<paritypp::store_session> Cluster::reserve_block(const std::strin
    return nullptr;
}

void Cluster::reset_import_client() {
    if (client_nodes_.empty()) return;
    std::lock_guard<std::mutex> lk(import_client_mutex_);
    std::cerr << "[CLUSTER] reset_import_client: recreating import_client_ to clear error state\n";
    import_client_ = std::make_unique<paritypp::client>(
        cfg_.data_blocks, cfg_.parity_blocks, client_nodes_, client_creds_);
    if (self_node_index_ >= 0) {
        import_client_->set_local_node(
            peer_to_client_index_[static_cast<size_t>(self_node_index_)], store_);
    }
    import_client_->warmup();
}

void Cluster::warmup_import_client() {
    if (import_client_) import_client_->warmup();
}
+10 −0
Original line number Diff line number Diff line
@@ -110,6 +110,8 @@ public:

    // Re-establish QUIC connections on the import client.
    void warmup_import_client();
    // Recreate import_client_ to reset internal paritypp error counters.
    void reset_import_client();
    // Re-establish QUIC connections on read pool and pclient_ (used before sync).
    void warmup_read_clients();
    bool fetch(const std::string& key, std::vector<uint8_t>& out);
@@ -226,10 +228,18 @@ private:
    // Dedicated client for import streaming — avoids blocking pclient_
    // during long import operations so normal writes remain responsive
    std::unique_ptr<paritypp::client> import_client_;
    std::mutex import_client_mutex_;  // protects reset_import_client()

    int self_node_index_ = -1;
    std::unordered_map<size_t, size_t> peer_to_client_index_;  // cfg peer idx -> pclient node idx

    // Cached parameters for recreating import_client_ after persistent failures
    std::vector<paritypp::client::node_info> client_nodes_;
    paritypp::client::credentials client_creds_;

    // Recreate import_client_ to reset internal paritypp error counters.
    void reset_import_client();

    void server_loop();
    void health_loop();
};