Commit 33e7bd95 authored by jan.koester's avatar jan.koester
Browse files

test

parent 7a951019
Loading
Loading
Loading
Loading
+71 −58
Original line number Diff line number Diff line
@@ -1165,8 +1165,10 @@ void libhttppp::HttpEvent::Http3StreamEvent(netplus::socket *sock,
    auto decoded = libhttppp::qpack::Decoder::decode(
        headersPayload.data(), headersPayload.size());

    // Create a temporary HttpRequest and parse via parseH3
    HttpRequest tempreq;
    // Create a heap-allocated HttpRequest (like H2 path) so it can
    // be moved into a background thread for streaming responses.
    auto tempreq_ptr = std::make_unique<HttpRequest>();
    HttpRequest &tempreq = *tempreq_ptr;
    tempreq.parseH3(decoded);

    // Store POST body in RecvData so HttpForm::parse() can access it
@@ -1271,69 +1273,80 @@ void libhttppp::HttpEvent::Http3StreamEvent(netplus::socket *sock,
            body.clear();
            body.shrink_to_fit();

            if (initial_complete) {
                // Entire body was sent with initial DATA frame
                tempreq.deldata(":res-valid");
                tempreq.deldata(":res-status");
                tempreq.deldata(":res-content-type");
                tempreq.deldata(":res-content-length");
                return;
            }

            // Event-driven streaming: move tempreq to a background
            // thread (like H2's _resumeH2Streams) so we don't block
            // the QUIC callback.  The thread calls ResponseEvent in
            // a loop, sending DATA frames as upstream produces data,
            // with proper sleeps between empty iterations.
            tempreq.SendData.pos = 0;
            size_t max_iter = content_length / CHUNKSIZE + 4096;
            std::thread([this, q, stream_id, h3tid,
                         tr = std::move(tempreq_ptr),
                         content_length, total_sent]() mutable {
                size_t sent = total_sent;
                size_t empty_streak = 0;
            const size_t max_empty = 600;
                static constexpr size_t max_empty = 500;
                size_t pump_counter = 0;
            for (size_t i = 0;
                 i < max_iter && total_sent < content_length;
                 ++i) {
                // Drain incoming QUIC datagrams every iteration so ACKs
                // and flow-control updates (MAX_DATA, MAX_STREAM_DATA)
                // are processed promptly while this synchronous loop runs.
                uint8_t lbuf[8];

                while (sent < content_length) {
                    if (++pump_counter % 4 == 0)
                        q->pumpIncoming();

                ResponseEvent(tempreq, h3tid, 0);
                size_t sa = tempreq.SendData.size();
                    ResponseEvent(*tr, h3tid, 0);
                    size_t sa = tr->SendData.size();

                    if (sa > 0) {
                    // Cap to remaining content-length to avoid over-send
                    size_t remaining = content_length > total_sent
                                     ? content_length - total_sent : 0;
                    if (sa > remaining)
                        sa = remaining;
                    // Build H3 DATA frame and send incrementally
                        size_t remaining = content_length - sent;
                        if (sa > remaining) sa = remaining;

                        std::vector<uint8_t> data_frame;
                        data_frame.reserve(1 + 8 + sa);
                        data_frame.push_back(0x00); // DATA frame type
                    lb = h3EncodeVarInt(sa, lenbuf);
                    data_frame.insert(data_frame.end(), lenbuf, lenbuf + lb);
                        size_t lb = h3EncodeVarInt(sa, lbuf);
                        data_frame.insert(data_frame.end(), lbuf, lbuf + lb);
                        data_frame.insert(data_frame.end(),
                                      tempreq.SendData.data(),
                                      tempreq.SendData.data() + sa);
                    tempreq.SendData.clear();
                    total_sent += sa;
                    bool last = (total_sent >= content_length);
                                          tr->SendData.data(),
                                          tr->SendData.data() + sa);
                        tr->SendData.clear();
                        sent += sa;
                        bool last = (sent >= content_length);
                        q->sendStreamData(stream_id, data_frame, last);
                        empty_streak = 0;
                    } else {
                    // No data yet — drain QUIC to unblock flow control
                        q->pumpIncoming();
                    if (tempreq.slots[0].ResponsePending.exchange(false)) {
                        if (tr->slots[0].ResponsePending.exchange(false)) {
                            empty_streak = 0;
                        } else {
                            ++empty_streak;
                        }
                    if (empty_streak >= max_empty)
                        if (empty_streak >= max_empty) {
                            std::cerr << "[H3-STREAM] STALLED sid="
                                      << stream_id << " sent="
                                      << sent << " / "
                                      << content_length << std::endl;
                            break;
                    // Sleep briefly to give upstream time to produce data.
                    // yield() is essentially a no-op on Linux and burns
                    // through max_empty in microseconds, causing premature
                    // stream termination (504) for slow upstreams.
                    std::this_thread::sleep_for(std::chrono::milliseconds(10));
                        }
                        // Sleep to let upstream produce data — mirrors
                        // H2's event-driven break-and-resume approach.
                        std::this_thread::sleep_for(
                            std::chrono::milliseconds(10));
                    }
                }

            // If we didn't finish, close the stream anyway
            if (total_sent < content_length) {
                q->sendStreamData(stream_id,nullptr, true);
                if (sent < content_length) {
                    q->sendStreamData(stream_id, {}, true);
                }
            }).detach();

            // Clean up :res-* pseudo-headers
            tempreq.deldata(":res-valid");
            tempreq.deldata(":res-status");
            tempreq.deldata(":res-content-type");
            tempreq.deldata(":res-content-length");
            return;
        }