Commit c8c5f038 authored by jan.koester's avatar jan.koester
Browse files

test

parent db64f92c
Loading
Loading
Loading
Loading
+10 −0
Original line number Diff line number Diff line
libhttppp (20260404+1) unstable; urgency=medium

  * Add streaming body callbacks: onH2StreamHeaders, onH3StreamHeaders,
    onH2DataChunk, onH3DataChunk for incremental body processing
  * Add sendH2StreamResponse / sendH3StreamResponse helpers
  * H2 DATA frames routed to onH2DataChunk when streaming=true
  * H3 incremental frame parsing with onH3DataChunk support

 -- Jan Koester <jan.koester@tuxist.de>  Fri, 04 Apr 2026 00:00:00 +0200

libhttppp (20260404) unstable; urgency=medium

  * Make RequestEvent/ResponseEvent/ConnectEvent/DisconnectEvent(con&)
+1 −0
Original line number Diff line number Diff line
@@ -396,6 +396,7 @@ namespace libhttppp {
      std::vector<uint8_t> rawHpack;      // accumulates HPACK across CONTINUATION frames
      bool headersComplete = false;       // true once END_HEADERS received
      bool endStreamOnHeaders = false;    // END_STREAM was on HEADERS frame
      bool streaming = false;             // body handled by onH2DataChunk callback
    };

    struct H2State {
+148 −24
Original line number Diff line number Diff line
@@ -844,6 +844,7 @@ reprocess:
                    auto &pending = cureq.h2state().pendingIncoming[sid];
                    pending.headers = std::move(decoded);
                    pending.headersComplete = true;
                    pending.streaming = onH2StreamHeaders(cureq, sid, pending.headers);
                }
            } else {
                // END_HEADERS not set: HPACK block continues in CONTINUATION
@@ -883,6 +884,7 @@ reprocess:
                        cureq.h2state().pendingIncoming.erase(it);
                        goto done;
                    }
                    it->second.streaming = onH2StreamHeaders(cureq, sid, it->second.headers);
                }
            }
            break;
@@ -891,24 +893,39 @@ reprocess:
        case H2_FRAME_DATA: {
            auto it = cureq.h2state().pendingIncoming.find(sid);
            if (it != cureq.h2state().pendingIncoming.end()) {
                it->second.body.append(data + off, flen);
                bool endStream = (fflags & H2_FLAG_END_STREAM) != 0;

                // Replenish flow-control windows so the client
                // can keep sending (both stream and connection).
                if (flen > 0) {
                    out += h2BuildWindowUpdate(sid, flen);
                    out += h2BuildWindowUpdate(0,   flen);
                }

                if (it->second.streaming) {
                    // Streaming mode: feed data to callback instead of accumulating
                    onH2DataChunk(cureq, sid, data + off, flen, endStream, out, tid, args);
                    if (endStream) {
                        off += flen;
                        cureq.RecvData.erase(cureq.RecvData.begin(),
                                             cureq.RecvData.begin() + off);
                        off = 0;
                        cureq.h2state().pendingIncoming.erase(it);
                        goto done;
                    }
                } else {
                    it->second.body.append(data + off, flen);

                    if (flen > 0) {
                        std::cerr << "[H2-RX-DATA] sid=" << sid
                                  << " flen=" << flen
                                  << " bodyTotal=" << it->second.body.size()
                                  << " outSize=" << out.size()
                              << " END_STREAM=" << ((fflags & H2_FLAG_END_STREAM) ? 1 : 0)
                                  << " END_STREAM=" << (endStream ? 1 : 0)
                                  << std::endl;
                    }

                if (fflags & H2_FLAG_END_STREAM) {
                    // Only dispatch once headers are fully received
                    // (CONTINUATION may still be pending)
                    if (endStream) {
                        if (it->second.headersComplete) {
                            off += flen;
                            cureq.RecvData.erase(cureq.RecvData.begin(),
@@ -922,6 +939,7 @@ reprocess:
                            goto done;
                        }
                    }
                }
            } else {
                std::cerr << "[H2-RX-DATA] sid=" << sid
                          << " flen=" << flen
@@ -1062,26 +1080,77 @@ void libhttppp::HttpEvent::Http3StreamEvent(netplus::socket *sock,
        return;
    }

    // Accumulate stream data until fin is received
    // Accumulate stream data
    {
        std::lock_guard<std::mutex> lock(_h3BufferMutex);
        auto &buf = _h3StreamBuffers[stream_id];
        auto &state = _h3StreamStates[stream_id];
        if (!data.empty()) {
            buf.data.insert(buf.data.end(), data.begin(), data.end());
            state.data.insert(state.data.end(), data.begin(), data.end());
        }
    }

    // Try to parse H3 frames incrementally (outside lock for callbacks).
    // QUIC guarantees ordered delivery per stream, so no concurrent calls
    // for the same stream_id.
    H3StreamState *sp = nullptr;
    {
        std::lock_guard<std::mutex> lock(_h3BufferMutex);
        auto it = _h3StreamStates.find(stream_id);
        if (it != _h3StreamStates.end()) sp = &it->second;
    }
    if (!sp) return;
    auto &state = *sp;

    while (state.parseOffset < state.data.size()) {
        size_t remaining = state.data.size() - state.parseOffset;
        const uint8_t *p = state.data.data() + state.parseOffset;

        size_t type_bytes = 0;
        uint64_t frame_type = h3DecodeVarInt(p, remaining, type_bytes);
        if (type_bytes == 0) break;
        if (type_bytes >= remaining) break;

        size_t len_bytes = 0;
        uint64_t frame_len = h3DecodeVarInt(p + type_bytes, remaining - type_bytes, len_bytes);
        if (len_bytes == 0) break;

        size_t header_size = type_bytes + len_bytes;
        if (state.parseOffset + header_size + frame_len > state.data.size()) break;

        const uint8_t *frame_data = p + header_size;

        if (frame_type == 0x01 && !state.headersParsed) {
            // HEADERS frame
            auto decoded = qpack::Decoder::decode(frame_data, frame_len);
            state.headersParsed = true;
            state.streaming = onH3StreamHeaders(sock, stream_id, decoded);
        } else if (frame_type == 0x00 && state.streaming) {
            // DATA frame in streaming mode — feed directly
            onH3DataChunk(sock, stream_id,
                         reinterpret_cast<const char*>(frame_data), frame_len, false);
        }
        if (!fin) {
            return; // Wait for more data

        state.parseOffset += header_size + frame_len;
    }

    if (!fin) return;

    if (state.streaming) {
        // Final callback with fin=true
        onH3DataChunk(sock, stream_id, nullptr, 0, true);
        std::lock_guard<std::mutex> lock(_h3BufferMutex);
        _h3StreamStates.erase(stream_id);
        return;
    }

    // fin received — extract the complete buffered data and remove entry
    // Non-streaming path: extract complete data and process as before
    std::vector<uint8_t> completeData;
    {
        std::lock_guard<std::mutex> lock(_h3BufferMutex);
        auto it = _h3StreamBuffers.find(stream_id);
        if (it != _h3StreamBuffers.end()) {
        auto it = _h3StreamStates.find(stream_id);
        if (it != _h3StreamStates.end()) {
            completeData = std::move(it->second.data);
            _h3StreamBuffers.erase(it);
            _h3StreamStates.erase(it);
        }
    }

@@ -1360,6 +1429,61 @@ void libhttppp::HttpEvent::Http3StreamEvent(netplus::socket *sock,
    }
}

// Default streaming callback implementations — return false / no-op
bool libhttppp::HttpEvent::onH2StreamHeaders(HttpRequest &, uint32_t,
                                             const std::vector<hpack::HeaderField> &) {
    return false;
}

bool libhttppp::HttpEvent::onH3StreamHeaders(netplus::socket *, uint64_t,
                                             const std::vector<qpack::HeaderField> &) {
    return false;
}

void libhttppp::HttpEvent::onH2DataChunk(HttpRequest &, uint32_t,
                                         const char *, size_t, bool,
                                         std::string &, const int, ULONG_PTR) {
}

void libhttppp::HttpEvent::onH3DataChunk(netplus::socket *, uint64_t,
                                         const char *, size_t, bool) {
}

// Helper: send a complete H2 response (HEADERS + DATA) on a stream
void libhttppp::HttpEvent::sendH2StreamResponse(std::string &h2out, uint32_t streamId,
                                                 uint16_t status,
                                                 const std::string &contentType,
                                                 const std::string &body) {
    h2out += h2BuildResponse(streamId, status, body, contentType);
}

// Helper: send a complete H3 response (HEADERS + DATA) on a stream
void libhttppp::HttpEvent::sendH3StreamResponse(netplus::socket *sock, uint64_t streamId,
                                                 uint16_t status,
                                                 const std::string &contentType,
                                                 const std::string &body) {
    auto *q = dynamic_cast<netplus::quic*>(sock);
    if (!q) return;
    auto hdr_block = qpack::Encoder::encodeResponseHeaders(
        status, contentType.empty() ? "application/json" : contentType,
        body.size());
    std::vector<uint8_t> response;
    uint8_t lenbuf[8];
    // HEADERS frame
    response.push_back(0x01);
    size_t lb = h3EncodeVarInt(hdr_block.size(), lenbuf);
    response.insert(response.end(), lenbuf, lenbuf + lb);
    response.insert(response.end(), hdr_block.begin(), hdr_block.end());
    // DATA frame
    if (!body.empty()) {
        response.push_back(0x00);
        lb = h3EncodeVarInt(body.size(), lenbuf);
        response.insert(response.end(), lenbuf, lenbuf + lb);
        response.insert(response.end(), body.begin(), body.end());
    }
    q->sendStreamData(streamId, response, true);
}

void libhttppp::HttpEvent::RequestEvent(netplus::con &curcon,const int tid,ULONG_PTR args){
    HttpRequest &cureq=dynamic_cast<HttpRequest&>(curcon);
    try{
+32 −3
Original line number Diff line number Diff line
@@ -37,6 +37,7 @@
#include <vector>

#include "http.h"
#include "qpack.h"
#include "exception.h"

#pragma once
@@ -65,7 +66,31 @@ namespace libhttppp {
                                      const std::vector<uint8_t> &data,
                                      bool fin);

        // Streaming body callbacks for H2/H3.
        // Called when headers are complete for a body-bearing stream.
        // Return true to handle body data via onH2DataChunk/onH3DataChunk
        // instead of buffering the full body.
        virtual bool onH2StreamHeaders(HttpRequest &conn, uint32_t streamId,
                                       const std::vector<hpack::HeaderField> &headers);
        virtual bool onH3StreamHeaders(netplus::socket *sock, uint64_t streamId,
                                       const std::vector<qpack::HeaderField> &headers);

        // Called for each body data chunk when streaming is enabled.
        // endStream/fin: true on the last chunk.
        virtual void onH2DataChunk(HttpRequest &conn, uint32_t streamId,
                                   const char *data, size_t len, bool endStream,
                                   std::string &h2out, const int tid, ULONG_PTR args);
        virtual void onH3DataChunk(netplus::socket *sock, uint64_t streamId,
                                   const char *data, size_t len, bool fin);

    protected:
        // Helpers for sending a complete response on an H2/H3 stream.
        void sendH2StreamResponse(std::string &h2out, uint32_t streamId,
                                  uint16_t status, const std::string &contentType,
                                  const std::string &body);
        void sendH3StreamResponse(netplus::socket *sock, uint64_t streamId,
                                  uint16_t status, const std::string &contentType,
                                  const std::string &body);
        virtual void CreateConnection(std::shared_ptr<netplus::con> &res);

        virtual void RequestEvent(netplus::con &curcon, const int tid, ULONG_PTR args);
@@ -79,12 +104,16 @@ namespace libhttppp {
        // to enable abbreviated TLS 1.2 handshakes on client reconnection.
        netplus::TlsSessionCache _tlsSessionCache;
    private:
        // Per-stream buffer for HTTP/3: accumulates data until fin
        struct H3StreamBuffer {
        // Per-stream state for HTTP/3: accumulates data and supports
        // incremental H3 frame parsing for streaming body callbacks.
        struct H3StreamState {
            std::vector<uint8_t> data;
            bool headersParsed = false;
            bool streaming = false;
            size_t parseOffset = 0;
        };
        std::mutex                            _h3BufferMutex;
        std::map<uint64_t, H3StreamBuffer>    _h3StreamBuffers;
        std::map<uint64_t, H3StreamState>     _h3StreamStates;
        std::atomic<int>                      _h3NextTid{0};
        void _dispatchH2Stream(HttpRequest &cureq, std::string &out,
                               uint32_t sid,