Commit 335a832b authored by jan.koester's avatar jan.koester
Browse files

test

parent e265c831
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -6,6 +6,12 @@ mediadb (20260503+2) unstable; urgency=medium
    is persistently broken but cluster is otherwise healthy
  * import: make fallback replication stricter and fully synchronous:
    pclient_ fallback now retries with exponential backoff before failing
  * import: media replication timeout no longer aborts whole import;
    affected media is marked cluster_available=false (degraded) so
    browser import no longer fails hard
  * cluster/import: route streaming import writes through pclient_ (same
    path as normal uploads) instead of import_client_ begin_store/finalize,
    avoiding import-only timeout pattern

 -- Jan Koester <jan.koester@tuxist.de>  Sat, 03 May 2026 17:00:00 +0200

+43 −47
Original line number Diff line number Diff line
@@ -1393,6 +1393,15 @@ private:
        done_.store(true, std::memory_order_release);
    }

    void mark_media_cluster_unavailable(const std::string& mid, const std::string& reason) {
        auto it = db_.media_.find(mid);
        if (it != db_.media_.end()) {
            it->second.cluster_available = false;
        }
        std::cerr << "[IMPORT-SESSION] media degraded (cluster unavailable) id=" << mid
                  << " reason=" << reason << "\n";
    }

    void advance_after_media() {
        // Commit the media record now that all data is on disk.
        if (cur_media_new_ && pending_media_) {
@@ -1424,8 +1433,13 @@ private:
                                  << ": " << stream->error_message()
                                  << " — warmup + one-shot fallback\n";
                        if (warmup_fn_) warmup_fn_();
                        if (!on_replicate_(fallback_key, fallback_data->data(), fallback_data->size()))
                            fail("replicate failed for " + fallback_key + " (after finalize failure)");
                        if (!on_replicate_(fallback_key, fallback_data->data(), fallback_data->size())) {
                            // Keep import successful: data is stored locally, but mark
                            // this media unavailable in cluster until next repair/import.
                            mark_media_cluster_unavailable(
                                mid, "replicate failed for " + fallback_key +
                                     " (after finalize failure)");
                        }
                    });
                }
                needs_drain_ = true;
@@ -1434,11 +1448,14 @@ private:
                auto data = std::make_shared<std::vector<uint8_t>>(
                    std::move(media_data_buf_));
                std::string key = "media:" + cur_media_id_;
                std::string mid = cur_media_id_;
                {
                    std::lock_guard<std::mutex> dlk(deferred_mutex_);
                    pending_ops_.push_back([this, key, data]() {
                        if (!on_replicate_(key, data->data(), data->size()))
                            fail("replicate failed for " + key);
                    pending_ops_.push_back([this, key, mid, data]() {
                        if (!on_replicate_(key, data->data(), data->size())) {
                            mark_media_cluster_unavailable(mid,
                                "replicate failed for " + key);
                        }
                    });
                }
                needs_drain_ = true;
@@ -3057,12 +3074,12 @@ bool ClusterMediaBackend::import_db_from_buffer(const std::uint8_t* data, std::s

    std::unique_lock<std::shared_mutex> cguard(cluster_op_mutex_);

    // Warmup import client once before starting
    cluster_.warmup_import_client();
    // Warmup cluster clients once before starting.
    // Use the same pclient path as normal uploads for stability.
    cluster_.warmup_read_clients();

    // Synchronous replicate callback: each entry is written to cluster
    // immediately as it is parsed. Media data uses import_client_,
    // store metadata and index use pclient_.
    // immediately as it is parsed, using the same pclient path as uploads.
    int repl_count = 0;
    int repl_fail = 0;
    auto replicate_fn = [&](const std::string& key, const uint8_t* d, size_t l) {
@@ -3071,19 +3088,14 @@ bool ClusterMediaBackend::import_db_from_buffer(const std::uint8_t* data, std::s
            std::cerr << "[CLUSTER-IMPORT] replicate #" << repl_count
                      << " key=" << key << " size=" << l << "\n";

        const bool is_media = (key.compare(0, 6, "media:") == 0);
        constexpr int MAX_RETRIES = 3;
        for (int attempt = 0; attempt < MAX_RETRIES; ++attempt) {
            if (attempt > 0) {
                if (is_media)
                    cluster_.warmup_import_client();
                else
                cluster_.warmup_read_clients();
                std::this_thread::sleep_for(
                    std::chrono::milliseconds(300 * (1 << attempt)));
            }
            bool ok = is_media ? cluster_.replicate_import(key, d, l)
                               : cluster_.replicate(key, d, l);
            bool ok = cluster_.replicate(key, d, l);
            if (ok) return;
        }
        std::cerr << "[CLUSTER-IMPORT] FAILED key=" << key << "\n";
@@ -3116,52 +3128,36 @@ std::unique_ptr<ImportSession> ClusterMediaBackend::begin_import() {

    importing_.store(true);

    // Synchronous replicate: use pclient_ for metadata (store/index),
    // import_client_ for media data. If import_client_ keeps failing,
    // fall back to pclient_ with its own retry loop before giving up.
    // Synchronous replicate: use pclient_ for all keys, same path as uploads.
    auto replicate_fn = [this](const std::string& key, const uint8_t* d, size_t len) -> bool {
        static constexpr int MAX_RETRIES = 3;
        static constexpr int FALLBACK_RETRIES = 5;
        const bool use_pclient = (key == "index" || key.compare(0, 6, "store:") == 0);
        static constexpr int MAX_RETRIES = 5;
        for (int attempt = 0; attempt < MAX_RETRIES; ++attempt) {
            if (attempt > 0) {
                std::cerr << "[CLUSTER-IMPORT-STREAM] replicate retry " << attempt
                          << " key=" << key << " size=" << len << "\n";
                std::this_thread::sleep_for(
                    std::chrono::milliseconds(500 * (1 << (attempt - 1))));
                if (use_pclient)
                cluster_.warmup_read_clients();
                else
                    cluster_.reset_import_client();
            }
            bool ok = use_pclient ? cluster_.replicate(key, d, len)
                                  : cluster_.replicate_import(key, d, len);
            bool ok = cluster_.replicate(key, d, len);
            if (ok) return true;
        }
        // import_client_ persistently broken — fall back to pclient_ so we
        // don't drop the file when the cluster is otherwise healthy.
        if (!use_pclient) {
            std::cerr << "[CLUSTER-IMPORT-STREAM] import_client_ exhausted, "
                         "falling back to pclient_ for key=" << key << "\n";
            for (int attempt = 0; attempt < FALLBACK_RETRIES; ++attempt) {
                if (attempt > 0) {
                    std::cerr << "[CLUSTER-IMPORT-STREAM] pclient fallback retry " << attempt
                              << " key=" << key << " size=" << len << "\n";
                    std::this_thread::sleep_for(
                        std::chrono::milliseconds(1000 * (1 << (attempt - 1))));
                }
                cluster_.warmup_read_clients();
                if (cluster_.replicate(key, d, len)) return true;
            }
        }
        std::cerr << "[CLUSTER-IMPORT-STREAM] replicate FAILED key=" << key << "\n";
        return false;
    };

    // Reserve block for streaming media: uses import_client_ with retry
    // Reserve block for streaming media on the same pclient path as uploads.
    auto begin_replicate_fn = [this](const std::string& key, size_t total_size)
            -> std::unique_ptr<ReplicateSession> {
        auto s = cluster_.reserve_block(key, total_size);
        std::unique_ptr<paritypp::store_session> s;
        for (int attempt = 0; attempt < 3 && !s; ++attempt) {
            if (attempt > 0) {
                cluster_.warmup_read_clients();
                std::this_thread::sleep_for(
                    std::chrono::milliseconds(300 * (1 << attempt)));
            }
            s = cluster_.begin_replicate(key, total_size);
        }
        if (!s) return nullptr;
        // Wrap paritypp::store_session as mediadb::ReplicateSession
        class ClusterReplicateSession : public ReplicateSession {
@@ -3179,7 +3175,7 @@ std::unique_ptr<ImportSession> ClusterMediaBackend::begin_import() {
    };

    auto warmup_fn = [this]() {
        cluster_.reset_import_client();
        cluster_.warmup_read_clients();
    };

    auto session = local_.begin_import(std::move(replicate_fn), std::move(begin_replicate_fn),