Commit a75c7ee4 authored by jan.koester's avatar jan.koester
Browse files

deb

parent f9c4d022
Loading
Loading
Loading
Loading
+18 −0
Original line number Diff line number Diff line
mediadb (20260503+13) unstable; urgency=high

  * cluster/sync: add single-worker scheduler for SYNC/REPAIR jobs so
    background and on-demand sync tasks are serialized instead of running
    concurrently across threads
  * backend: route sync_from_cluster() through scheduler (blocking request)
    to reduce lock contention and import-time race windows

 -- Jan Koester <jan.koester@tuxist.de>  Sun, 03 May 2026 21:30:00 +0200

mediadb (20260503+12) unstable; urgency=high

  * cluster/sync: re-check import flags during index fetch/retry failure path
    and abort quietly when import becomes active mid-fetch (prevents false
    "index fetch failed" noise and avoids sync/import contention)

 -- Jan Koester <jan.koester@tuxist.de>  Sun, 03 May 2026 21:15:00 +0200

mediadb (20260503+11) unstable; urgency=high

  * cluster/sync: add runtime setting MEDIADB_SYNC_INTERVAL_SEC (1..300,
+106 −20
Original line number Diff line number Diff line
@@ -2623,6 +2623,13 @@ void ClusterMediaBackend::start_sync() {
                  << "/5 incomplete, retrying in 2s\n";
        std::this_thread::sleep_for(std::chrono::seconds(2));
    }
    {
        std::lock_guard<std::mutex> lock(sync_mutex_);
        sync_requested_ = false;
        repair_requested_ = false;
        sync_ticket_ = 0;
        sync_done_ticket_ = 0;
    }
    sync_running_ = true;
    sync_thread_ = std::thread(&ClusterMediaBackend::sync_loop, this);
}
@@ -2633,6 +2640,22 @@ ClusterMediaBackend::~ClusterMediaBackend() {
    if (sync_thread_.joinable()) sync_thread_.join();
}

void ClusterMediaBackend::request_sync(bool wait_for_completion) {
    std::unique_lock<std::mutex> lock(sync_mutex_);
    sync_requested_ = true;
    const std::uint64_t ticket = ++sync_ticket_;
    sync_cv_.notify_all();
    if (wait_for_completion) {
        sync_cv_.wait(lock, [this, ticket] {
            return !sync_running_.load() || sync_done_ticket_ >= ticket;
        });
    }
}

/// Report whether an import is in progress, either according to this
/// backend's own importing_ flag or according to cluster_.isImportRunning().
bool ClusterMediaBackend::import_active() const {
    // Check the local flag first; the cluster is only queried when it is clear.
    if (importing_.load()) {
        return true;
    }
    return cluster_.isImportRunning();
}

// ---- stores ----
StoreRecord ClusterMediaBackend::create_store(const std::string& name) {
    std::unique_lock<std::shared_mutex> cguard(cluster_op_mutex_);
@@ -3289,6 +3312,18 @@ void ClusterMediaBackend::vacuum_all_stores() {

// ---- cluster sync ----
/// Public entry point for a cluster sync.
///
/// While the scheduler is flagged as running, the request is handed to it as
/// a blocking job so sync/repair runs are serialized behind a single worker.
/// Otherwise (initial boot, before the scheduler has been started) the sync
/// is executed inline on the calling thread.
void ClusterMediaBackend::sync_from_cluster() {
    const bool scheduler_active = sync_running_.load();

    if (!scheduler_active) {
        // No worker to hand the job to yet: run it directly.
        sync_from_cluster_now();
        return;
    }

    // Enqueue and wait until the worker has processed this request.
    request_sync(true);
}

void ClusterMediaBackend::sync_from_cluster_now() {
    if (!cluster_.isRunning()) {
        std::cerr << "[CLUSTER-SYNC] cluster not running, skipping sync\n";
        return;
@@ -3302,9 +3337,12 @@ void ClusterMediaBackend::sync_from_cluster() {
        return;
    }

    auto import_active = [this]() {
        return cluster_.isImportRunning() || importing_.load();
    };
    auto import_active = [this]() { return this->import_active(); };

    if (import_active()) {
        std::cerr << "[CLUSTER-SYNC] import active, aborting before index scan\n";
        return;
    }

    uint64_t index_gid = cluster_group_id("index");
    bool has_index = false;
@@ -3322,14 +3360,30 @@ void ClusterMediaBackend::sync_from_cluster() {
    // Fetch index from cluster (contains store/album/ACL metadata, no media data)
    std::vector<uint8_t> index_data;
    bool fetch_ok = cluster_.fetch("index", index_data);
    if (import_active()) {
        std::cerr << "[CLUSTER-SYNC] import became active during index fetch, aborting sync\n";
        return;
    }
    if (!fetch_ok || index_data.empty()) {
        if (import_active()) {
            std::cerr << "[CLUSTER-SYNC] import active, skipping index fetch retry\n";
            return;
        }
        // Retry once after warmup — connections may be stale after heavy
        // import replication traffic on peers.
        cluster_.warmup_read_clients();
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
        fetch_ok = cluster_.fetch("index", index_data);
        if (import_active()) {
            std::cerr << "[CLUSTER-SYNC] import became active during index fetch retry, aborting sync\n";
            return;
        }
    }
    if (!fetch_ok || index_data.empty()) {
        if (import_active()) {
            std::cerr << "[CLUSTER-SYNC] import active after index fetch failure, suppressing sync error\n";
            return;
        }
        if (!has_index) {
            std::cerr << "[CLUSTER-SYNC] no index exists on any peer — fresh/empty cluster\n";
            initial_sync_ok_.store(true);
@@ -3755,30 +3809,62 @@ void ClusterMediaBackend::repair_replication() {

/// Scheduler worker: runs SYNC and REPAIR jobs one at a time.
///
/// Jobs are queued either by request_sync() (on demand) or by the periodic
/// timer inside this loop. Every timer expiry queues a sync; every 6th expiry
/// additionally queues a repair. A pending sync takes priority over a pending
/// repair, and at most one job is executed per loop iteration.
///
/// While an import is active, queued jobs stay pending and the worker backs
/// off for a short, bounded interval instead of re-entering the job wait,
/// whose predicate would already be satisfied and would make the loop spin.
///
/// The loop ends once sync_running_ reads false.
void ClusterMediaBackend::sync_loop() {
    // How long to sleep between import checks while jobs are held back.
    constexpr std::chrono::milliseconds kImportBackoff{200};

    int cycle = 0;
    auto next_sync = std::chrono::steady_clock::now()
                   + std::chrono::seconds(sync_interval_seconds());

    while (sync_running_.load()) {
        bool do_sync = false;
        bool do_repair = false;
        std::uint64_t done_ticket = 0;

        {
            std::unique_lock<std::mutex> lock(sync_mutex_);

            // Sleep until the next periodic deadline, a queued job, or shutdown.
            sync_cv_.wait_until(lock, next_sync, [this] {
                return !sync_running_.load() || sync_requested_ || repair_requested_;
            });
            if (!sync_running_.load()) break;

            const auto now = std::chrono::steady_clock::now();
            if (now >= next_sync) {
                // Periodic tick: always queue a sync, and a repair on every
                // 6th tick.
                sync_requested_ = true;
                if (++cycle >= 6) {
                    cycle = 0;
                    repair_requested_ = true;
                }
                next_sync = now + std::chrono::seconds(sync_interval_seconds());
            }

            if (import_active()) {
                // Keep queued jobs pending while import is active. The flags
                // stay set, so going straight back to the wait above would
                // return immediately and busy-loop; wait out a short back-off
                // that only shutdown can cut short.
                sync_cv_.wait_for(lock, kImportBackoff,
                                  [this] { return !sync_running_.load(); });
                continue;
            }

            if (sync_requested_) {
                sync_requested_ = false;
                do_sync = true;
                // Every ticket issued up to this point is covered by this run.
                done_ticket = sync_ticket_;
            } else if (repair_requested_) {
                repair_requested_ = false;
                do_repair = true;
            }
        }

        // Run the selected job without holding sync_mutex_ so request_sync()
        // callers can keep queueing while it executes.
        try {
            if (do_sync) {
                sync_from_cluster_now();
            } else if (do_repair) {
                repair_replication();
            }
        } catch (const std::exception& e) {
            std::cerr << "[CLUSTER-SYNC] scheduled task failed: " << e.what() << "\n";
        } catch (...) {
            std::cerr << "[CLUSTER-SYNC] scheduled task failed (unknown)\n";
        }

        if (do_sync) {
            // Publish completion even when the job threw, so blocked
            // request_sync() callers are always released.
            std::lock_guard<std::mutex> lock(sync_mutex_);
            if (sync_done_ticket_ < done_ticket) sync_done_ticket_ = done_ticket;
            sync_cv_.notify_all();
        }
    }
}
+7 −0
Original line number Diff line number Diff line
@@ -344,6 +344,9 @@ public:
    BinDb& local();

private:
    void request_sync(bool wait_for_completion);
    bool import_active() const;
    void sync_from_cluster_now();
    void replicate_index(bool force = false);
    void replicate_store(const std::string& store_id);
    void replicate_tombstones();
@@ -361,6 +364,10 @@ private:
    std::thread sync_thread_;
    std::mutex sync_mutex_;
    std::condition_variable sync_cv_;
    bool sync_requested_ = false;
    bool repair_requested_ = false;
    std::uint64_t sync_ticket_ = 0;
    std::uint64_t sync_done_ticket_ = 0;
    mutable std::shared_mutex cluster_op_mutex_;  // protects write+replicate+read vs sync_from_cluster

    BlobCache& cache_;