Commit ba034114 authored by jan.koester's avatar jan.koester
Browse files

test

parent 335a832b
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -12,6 +12,9 @@ mediadb (20260503+2) unstable; urgency=medium
  * cluster/import: route streaming import writes through pclient_ (same
    path as normal uploads) instead of import_client_ begin_store/finalize,
    avoiding import-only timeout pattern
  * import: disable per-media begin_store/finalize streaming session and
    force one-shot replicate() path; pre-clean media gid before retries to
    recover from broken partial groups

 -- Jan Koester <jan.koester@tuxist.de>  Sat, 03 May 2026 17:00:00 +0200

+16 −24
Original line number Diff line number Diff line
@@ -3131,6 +3131,16 @@ std::unique_ptr<ImportSession> ClusterMediaBackend::begin_import() {
    // Synchronous replicate: use pclient_ for all keys, same path as uploads.
    auto replicate_fn = [this](const std::string& key, const uint8_t* d, size_t len) -> bool {
        static constexpr int MAX_RETRIES = 5;
        const bool is_media = (key.compare(0, 6, "media:") == 0);

        // If a previous failed import left this media gid in a broken state
        // (seen as "retrieve returned empty" in rebalance), clear it once
        // before retrying writes.
        if (is_media) {
            cluster_.remove(key);
            cluster_.warmup_read_clients();
        }

        for (int attempt = 0; attempt < MAX_RETRIES; ++attempt) {
            if (attempt > 0) {
                std::cerr << "[CLUSTER-IMPORT-STREAM] replicate retry " << attempt
@@ -3146,32 +3156,14 @@ std::unique_ptr<ImportSession> ClusterMediaBackend::begin_import() {
        return false;
    };

    // Reserve block for streaming media on the same pclient path as uploads.
    // Disable streaming begin_store/finalize for import media. One-shot
    // replicate() follows the stable upload path and avoids import-only
    // finalize timeouts.
    auto begin_replicate_fn = [this](const std::string& key, size_t total_size)
            -> std::unique_ptr<ReplicateSession> {
        std::unique_ptr<paritypp::store_session> s;
        for (int attempt = 0; attempt < 3 && !s; ++attempt) {
            if (attempt > 0) {
                cluster_.warmup_read_clients();
                std::this_thread::sleep_for(
                    std::chrono::milliseconds(300 * (1 << attempt)));
            }
            s = cluster_.begin_replicate(key, total_size);
        }
        if (!s) return nullptr;
        // Wrap paritypp::store_session as mediadb::ReplicateSession
        class ClusterReplicateSession : public ReplicateSession {
        public:
            explicit ClusterReplicateSession(std::unique_ptr<paritypp::store_session> s)
                : session_(std::move(s)) {}
            void feed(const uint8_t* data, size_t len) override { session_->feed(data, len); }
            void finalize() override { session_->finalize(); }
            bool ok() const override { return session_->ok(); }
            std::string error_message() const override { return session_->error_message(); }
        private:
            std::unique_ptr<paritypp::store_session> session_;
        };
        return std::make_unique<ClusterReplicateSession>(std::move(s));
        (void)key;
        (void)total_size;
        return nullptr;
    };

    auto warmup_fn = [this]() {