Commit e0a08db3 authored by jan.koester's avatar jan.koester
Browse files

deb

parent a75c7ee4
Loading
Loading
Loading
Loading
+11 −0
Original line number Diff line number Diff line
mediadb (20260503+14) unstable; urgency=high

  * server/import (H1/H2/H3): do not finish import session immediately on
    stream-end while parser still has deferred operations
  * server/import: drain deferred import ops before finish_import() so global
    import flags stay active until import is actually complete
  * fixes sync/tombstone runs interleaving with active import feed when
    network stream finishes before parser/deferred stage

 -- Jan Koester <jan.koester@tuxist.de>  Sun, 03 May 2026 21:55:00 +0200

mediadb (20260503+13) unstable; urgency=high

  * cluster/sync: add single-worker scheduler for SYNC/REPAIR jobs so
+68 −16
Original line number Diff line number Diff line
@@ -72,10 +72,10 @@ void MediaHttpEvent::RequestEvent(netplus::con &curcon, const int tid, ULONG_PTR
            // No data available yet — yield CPU to avoid busy-loop
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
        if (state.bytes_written >= state.content_length || state.session->done()) {
            bool ok = state.session->ok();
            std::string errmsg = ok ? "" : state.session->error_message();
            if (errmsg.empty() && !ok) errmsg = "import failed";
        bool ok = false;
        std::string errmsg;
        bool input_finished = (state.bytes_written >= state.content_length);
        if (should_finalize_import(state, input_finished, ok, errmsg)) {
            active_imports_.erase(it);
            app_.finish_import(ok, errmsg);
            if (ok) {
@@ -162,10 +162,10 @@ void MediaHttpEvent::RequestEvent(netplus::con &curcon, const int tid, ULONG_PTR
            std::cerr << "[IMPORT-STREAM] after initial feed: bytes_written=" << state.bytes_written
                      << " clen=" << clen << " done=" << state.session->done()
                      << " ok=" << state.session->ok() << "\n";
            if (state.bytes_written >= clen || state.session->done()) {
                bool ok = state.session->ok();
                std::string errmsg = ok ? "" : state.session->error_message();
                if (errmsg.empty() && !ok) errmsg = "import failed";
            bool ok = false;
            std::string errmsg;
            bool input_finished = (state.bytes_written >= clen);
            if (should_finalize_import(state, input_finished, ok, errmsg)) {
                app_.finish_import(ok, errmsg);
                if (ok) {
                    send_response(cureq, HttpResponse::json(200,
@@ -476,6 +476,60 @@ static std::string json_ok() {
    return out;
}

static bool drain_import_deferred(ImportState &state, std::string &error) {
    constexpr int kMaxRounds = 2048;
    for (int round = 0; round < kMaxRounds; ++round) {
        if (state.session->done()) return true;
        auto ops = state.session->take_deferred();
        if (ops.empty()) return state.session->done();
        for (auto &op : ops) {
            try {
                op();
            } catch (const std::exception &e) {
                error = std::string("import deferred op failed: ") + e.what();
                return false;
            } catch (...) {
                error = "import deferred op failed (unknown)";
                return false;
            }
        }
    }
    error = "import deferred op drain limit reached";
    return false;
}

static bool should_finalize_import(ImportState &state, bool input_finished,
                                   bool &ok_out, std::string &errmsg_out) {
    ok_out = false;
    errmsg_out.clear();

    if (!state.session) {
        errmsg_out = "import session missing";
        return true;
    }

    if (input_finished && !state.session->done()) {
        std::string drain_error;
        (void)drain_import_deferred(state, drain_error);
        if (!drain_error.empty()) errmsg_out = drain_error;
    }

    if (!state.session->done() && !input_finished) {
        return false;
    }

    ok_out = state.session->ok();
    if (!ok_out) {
        if (errmsg_out.empty()) errmsg_out = state.session->error_message();
        if (errmsg_out.empty()) {
            errmsg_out = input_finished
                ? "import stream finished before parser completed"
                : "import failed";
        }
    }
    return true;
}

bool MediaHttpEvent::onH2StreamHeaders(libhttppp::HttpRequest &conn, uint32_t streamId,
                                       const std::vector<libhttppp::hpack::HeaderField> &headers) {
    // Check if this is PUT /api/import
@@ -524,10 +578,9 @@ void MediaHttpEvent::onH2DataChunk(libhttppp::HttpRequest &conn, uint32_t stream
        state.bytes_written += len;
    }

    if (endStream || state.session->done()) {
        bool ok = state.session->ok();
        std::string errmsg = ok ? "" : state.session->error_message();
        if (errmsg.empty() && !ok) errmsg = "import failed";
    bool ok = false;
    std::string errmsg;
    if (should_finalize_import(state, endStream, ok, errmsg)) {
        h2_imports_.erase(it);
        app_.finish_import(ok, errmsg);

@@ -594,10 +647,9 @@ void MediaHttpEvent::onH3DataChunk(netplus::socket *sock, uint64_t streamId,
        state.bytes_written += len;
    }

    if (fin || state.session->done()) {
        bool ok = state.session->ok();
        std::string errmsg = ok ? "" : state.session->error_message();
        if (errmsg.empty() && !ok) errmsg = "import failed";
    bool ok = false;
    std::string errmsg;
    if (should_finalize_import(state, fin, ok, errmsg)) {
        h3_imports_.erase(it);
        app_.finish_import(ok, errmsg);