Commit fa78f0a7 authored by jan.koester's avatar jan.koester
Browse files

syn import

parent db184413
Loading
Loading
Loading
Loading
+60 −105
Original line number Diff line number Diff line
@@ -2961,7 +2961,6 @@ bool ClusterMediaBackend::import_db_from_buffer(const std::uint8_t* data, std::s
    }

    importing_.store(true);
    // RAII guard: always clear importing_ on exit, even on exceptions
    struct ImportGuard {
        std::atomic<bool>& flag;
        ~ImportGuard() { flag.store(false); }
@@ -2969,37 +2968,45 @@ bool ClusterMediaBackend::import_db_from_buffer(const std::uint8_t* data, std::s

    std::unique_lock<std::shared_mutex> cguard(cluster_op_mutex_);

    // Stream-Import: each entry is written to the cluster immediately
    // as it is parsed, keeping RAM usage minimal.
    // Warmup import client once before starting
    cluster_.warmup_import_client();

    // Synchronous replicate callback: each entry is written to cluster
    // immediately as it is parsed. Media data uses import_client_,
    // store metadata and index use pclient_.
    int repl_count = 0;
    int repl_fail = 0;
    auto stream_fn = [&](const std::string& key, const uint8_t* d, size_t l) {
    auto replicate_fn = [&](const std::string& key, const uint8_t* d, size_t l) {
        ++repl_count;
        if (repl_count % 50 == 1)
            std::cerr << "[CLUSTER-IMPORT] replicate #" << repl_count
                      << " key=" << key << " size=" << l << "\n";

        const bool is_media = (key.compare(0, 6, "media:") == 0);
        constexpr int MAX_RETRIES = 3;
        for (int attempt = 1; attempt <= MAX_RETRIES; ++attempt) {
            if (attempt > 1) {
                // Warmup on retry only — first attempt uses existing
                // connections; warming up concurrently with health_loop
                // causes use-after-free on pclient_ internals.
        for (int attempt = 0; attempt < MAX_RETRIES; ++attempt) {
            if (attempt > 0) {
                if (is_media)
                    cluster_.warmup_import_client();
                else
                    cluster_.warmup_read_clients();
                std::this_thread::sleep_for(std::chrono::seconds(attempt - 1));
                std::this_thread::sleep_for(
                    std::chrono::milliseconds(300 * (1 << attempt)));
            }
            if (cluster_.replicate(key, d, l)) return;
            std::cerr << "[CLUSTER-IMPORT] retry " << attempt << " key=" << key << "\n";
            bool ok = is_media ? cluster_.replicate_import(key, d, l)
                               : cluster_.replicate(key, d, l);
            if (ok) return;
        }
        std::cerr << "[CLUSTER-IMPORT] FAILED key=" << key << " after retries\n";
        std::cerr << "[CLUSTER-IMPORT] FAILED key=" << key << "\n";
        ++repl_fail;
    };

    bool parse_ok = local_.import_db_from_buffer(data, len, stream_fn);
    std::cerr << "[CLUSTER-IMPORT] parse done, ok=" << parse_ok
    bool parse_ok = local_.import_db_from_buffer(data, len, replicate_fn);
    std::cerr << "[CLUSTER-IMPORT] done, ok=" << parse_ok
              << " replicated=" << repl_count << " failed=" << repl_fail << "\n";
    if (!parse_ok || repl_fail > 0) return false;

    // Clear tombstones for any stores that were just imported.
    // Clear tombstones for imported stores
    {
        auto sids = local_.store_ids();
        bool tombstone_changed = false;
@@ -3009,22 +3016,10 @@ bool ClusterMediaBackend::import_db_from_buffer(const std::uint8_t* data, std::s
        }
        if (tombstone_changed) {
            replicate_tombstones();
            std::cerr << "[CLUSTER-IMPORT] cleared tombstones for imported stores\n";
            std::cerr << "[CLUSTER-IMPORT] cleared tombstones\n";
        }
    }

    std::cerr << "[CLUSTER-IMPORT] complete, success\n";
    // importing_ is cleared by ImportGuard destructor

    // Rebalance to fix under-replicated groups from partial node failures
    std::cerr << "[CLUSTER-IMPORT] triggering post-import rebalance\n";
    try {
        cluster_.warmup_read_clients();
        cluster_.rebalance();
    } catch (const std::exception& e) {
        std::cerr << "[CLUSTER-IMPORT] rebalance failed: " << e.what() << "\n";
    }

    return true;
}

@@ -3036,12 +3031,11 @@ std::unique_ptr<ImportSession> ClusterMediaBackend::begin_import() {
    }

    importing_.store(true);

    // Synchronous replicate: use pclient_ for metadata (store/index),
    // import_client_ for media data. Retry with warmup on failure.
    auto replicate_fn = [this](const std::string& key, const uint8_t* d, size_t len) -> bool {
        static constexpr int MAX_RETRIES = 3;
        // Use pclient_ for store metadata and index (small, critical data
        // that MUST persist). pclient_ connections are kept alive by the
        // health loop. import_client_ connections go stale during long
        // streaming imports and can silently lose data.
        const bool use_pclient = (key == "index" || key.compare(0, 6, "store:") == 0);
        for (int attempt = 0; attempt < MAX_RETRIES; ++attempt) {
            if (attempt > 0) {
@@ -3049,29 +3043,25 @@ std::unique_ptr<ImportSession> ClusterMediaBackend::begin_import() {
                          << " key=" << key << " size=" << len << "\n";
                std::this_thread::sleep_for(
                    std::chrono::milliseconds(500 * (1 << (attempt - 1))));
                // Warmup on retry only — first attempt uses existing
                // connections; warming up concurrently with health_loop
                // causes use-after-free on pclient_ internals.
                if (use_pclient) {
                if (use_pclient)
                    cluster_.warmup_read_clients();
                } else {
                else
                    cluster_.warmup_import_client();
            }
            }
            std::cerr << "[CLUSTER-IMPORT-STREAM] replicate key=" << key << " size=" << len
                      << " via " << (use_pclient ? "pclient" : "import_client") << "\n";
            bool ok = use_pclient ? cluster_.replicate(key, d, len)
                                  : cluster_.replicate_import(key, d, len);
            if (ok) return true;
            std::cerr << "[CLUSTER-IMPORT-STREAM] replicate attempt " << attempt
                      << " failed for key=" << key << "\n";
        }
        std::cerr << "[CLUSTER-IMPORT-STREAM] replicate FAILED after "
                  << MAX_RETRIES << " attempts key=" << key << "\n";
        std::cerr << "[CLUSTER-IMPORT-STREAM] replicate FAILED key=" << key << "\n";
        return false;
    };

    // Adapter: wrap paritypp::store_session as mediadb::ReplicateSession
    // Reserve block for streaming media: uses import_client_ with retry
    auto begin_replicate_fn = [this](const std::string& key, size_t total_size)
            -> std::unique_ptr<ReplicateSession> {
        auto s = cluster_.reserve_block(key, total_size);
        if (!s) return nullptr;
        // Wrap paritypp::store_session as mediadb::ReplicateSession
        class ClusterReplicateSession : public ReplicateSession {
        public:
            explicit ClusterReplicateSession(std::unique_ptr<paritypp::store_session> s)
@@ -3083,19 +3073,21 @@ std::unique_ptr<ImportSession> ClusterMediaBackend::begin_import() {
        private:
            std::unique_ptr<paritypp::store_session> session_;
        };
        return std::make_unique<ClusterReplicateSession>(std::move(s));
    };

    auto begin_replicate_fn = [this](const std::string& key, size_t total_size)
            -> std::unique_ptr<ReplicateSession> {
        std::cerr << "[CLUSTER-IMPORT-STREAM] begin_replicate_import key=" << key
                  << " size=" << total_size << "\n";
        // Ensure connections are fresh before starting a streaming session
    auto warmup_fn = [this]() {
        cluster_.warmup_import_client();
        auto s = cluster_.begin_replicate_import(key, total_size);
        if (!s) return nullptr;
        return std::make_unique<ClusterReplicateSession>(std::move(s));
    };

    // Wrap session to clear tombstones, reload from cluster, and clear importing_ flag when done
    auto session = local_.begin_import(std::move(replicate_fn), std::move(begin_replicate_fn),
                                       std::move(warmup_fn), nullptr, nullptr);
    if (!session) {
        importing_.store(false);
        return nullptr;
    }

    // Wrap to handle cleanup on completion
    class ImportGuardSession : public ImportSession {
    public:
        ImportGuardSession(std::unique_ptr<ImportSession> inner,
@@ -3104,43 +3096,17 @@ std::unique_ptr<ImportSession> ClusterMediaBackend::begin_import() {
            : inner_(std::move(inner)), flag_(flag), backend_(backend) {}
        ~ImportGuardSession() override {
            if (ok()) {
                // 1. Clear tombstones for imported stores so they don't get deleted
                auto sids = backend_.local_.store_ids();
                bool tombstone_changed = false;
                for (const auto& sid : sids) {
                    if (backend_.tombstones_.erase(sid) > 0)
                        tombstone_changed = true;
                }
                if (tombstone_changed) {
                if (tombstone_changed)
                    backend_.replicate_tombstones();
                    std::cerr << "[CLUSTER-IMPORT-STREAM] cleared tombstones for imported stores\n";
                }

                // 2. Release importing_ flag BEFORE sync so sync_from_cluster doesn't skip
                flag_.store(false);

                // 3. Warmup read/write clients before sync — they may be stale
                //    after a long import that monopolised import_client_.
                std::cerr << "[CLUSTER-IMPORT-STREAM] warming up read/write clients\n";
                backend_.cluster_.warmup_import_client();
                backend_.cluster_.warmup_read_clients();

                // 4. Reload from cluster into RAM — the cluster is the source of truth
                std::cerr << "[CLUSTER-IMPORT-STREAM] triggering sync_from_cluster to reload state\n";
                try {
                    backend_.sync_from_cluster();
                } catch (const std::exception& e) {
                    std::cerr << "[CLUSTER-IMPORT-STREAM] sync_from_cluster failed: " << e.what() << "\n";
                }

                // 5. Rebalance to fix under-replicated groups from partial
                //    node failures during import (e.g. missing parity blocks).
                std::cerr << "[CLUSTER-IMPORT-STREAM] triggering post-import rebalance\n";
                try {
                    backend_.cluster_.rebalance();
                } catch (const std::exception& e) {
                    std::cerr << "[CLUSTER-IMPORT-STREAM] rebalance failed: " << e.what() << "\n";
                }
                // Sync from cluster to pick up the replicated data on this node
                try { backend_.sync_from_cluster(); } catch (...) {}
            } else {
                flag_.store(false);
            }
@@ -3156,17 +3122,6 @@ std::unique_ptr<ImportSession> ClusterMediaBackend::begin_import() {
        ClusterMediaBackend& backend_;
    };

    auto warmup_fn = [this]() {
        std::cerr << "[CLUSTER-IMPORT-STREAM] warmup_import_client\n";
        cluster_.warmup_import_client();
    };

    auto session = local_.begin_import(std::move(replicate_fn), std::move(begin_replicate_fn),
                               std::move(warmup_fn), nullptr, nullptr);
    if (!session) {
        importing_.store(false);
        return nullptr;
    }
    return std::make_unique<ImportGuardSession>(std::move(session), importing_, *this);
}

+22 −0
Original line number Diff line number Diff line
@@ -281,6 +281,28 @@ std::unique_ptr<paritypp::store_session> Cluster::begin_replicate_import(const s
    }
}

std::unique_ptr<paritypp::store_session> Cluster::reserve_block(const std::string& key,
                                                                 size_t total_size,
                                                                 int max_retries) {
    if (!import_client_) return nullptr;
    uint64_t gid = cluster_group_id(key);
    for (int attempt = 0; attempt < max_retries; ++attempt) {
        if (attempt > 0) {
            import_client_->warmup();
            std::this_thread::sleep_for(
                std::chrono::milliseconds(200 * (1 << (attempt - 1))));
        }
        try {
            auto session = import_client_->begin_store(gid, total_size);
            if (session) return session;
        } catch (const std::exception& e) {
            DBG_LOG("[CLUSTER] reserve_block attempt " << attempt
                      << " failed key=" << key << ": " << e.what() << "\n");
        }
    }
    return nullptr;
}

void Cluster::warmup_import_client() {
    if (import_client_) import_client_->warmup();
}
+8 −0
Original line number Diff line number Diff line
@@ -94,6 +94,14 @@ public:
    // Streaming store session on the dedicated import client — does not block
    // pclient_ so normal writes (add_media, delete, metadata) remain responsive.
    std::unique_ptr<paritypp::store_session> begin_replicate_import(const std::string& key, size_t total_size);

    // Reserve a block for streaming writes on the import client.
    // Warms up the connection, then returns a store_session.
    // On failure retries up to max_retries times.
    std::unique_ptr<paritypp::store_session> reserve_block(const std::string& key,
                                                            size_t total_size,
                                                            int max_retries = 3);

    // Re-establish QUIC connections on the import client.
    void warmup_import_client();
    // Re-establish QUIC connections on read_client_ and pclient_ (used before sync).