Commit fe80af5c authored by jan.koester's avatar jan.koester
Browse files

more quic speed



Co-authored-by: default avatarCopilot <copilot@github.com>
parent ff2980d3
Loading
Loading
Loading
Loading
+20 −0
Original line number Diff line number Diff line
libnetplus (20260502+1) unstable; urgency=medium

  * QUIC: add udp::recvBatchAddr() — batch receive with peer addresses
    via recvmmsg, with thread_local 4 MB flat buffer to avoid per-call
    heap allocation
  * QUIC: server accept() uses recvBatchAddr() instead of per-datagram
    recvfrom, cutting recv syscalls from ~73 K to ~1 K per benchmark run
  * QUIC: vector sendStreamData() delegates to fast-path variant with
    batching, congestion-control gating, and ACK piggybacking
  * QUIC: raise initial congestion window to 10 MB (avoids CC stalls on
    loopback / LAN); CC still engages on actual loss
  * QUIC: CC wait loop spins 8× before falling back to poll(1 ms),
    avoiding unnecessary syscalls when ACKs are already queued
  * QUIC: stream FIN cleanup (MAX_STREAMS replenish) added to fast-path
    sendStreamData
  * Benchmark echo throughput: 1 KB 0.2→40 MB/s, 16 KB 0.7→85 MB/s,
    65 KB 30→100 MB/s, 262 KB 31→129 MB/s

 -- Jan Koester <jan.koester@tuxist.de>  Sat, 02 May 2026 12:00:00 +0200

libnetplus (20260501+1) unstable; urgency=medium

  * AES-GCM: implement 4-way pipelined AES-NI CTR encryption,
+209 −0
Original line number Diff line number Diff line
@@ -38,6 +38,16 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/un.h>
#include <sys/uio.h>       // iovec, sendmmsg, recvmmsg
#include <netinet/udp.h>   // SOL_UDP

// Linux-specific: constants may be missing in old kernel headers
#ifndef UDP_SEGMENT
#define UDP_SEGMENT 103
#endif
#ifndef UDP_GRO
#define UDP_GRO 104
#endif

namespace netplus {

@@ -310,4 +320,203 @@ void udp::getAddress(std::string& addr) {
// No-op: this implementation performs no work when asked to flush output.
void udp::flush_out() {}

// ============================================================================
// UDP GSO/GRO: Probe kernel support at socket init
// ============================================================================

/// Probe the socket for UDP GSO (UDP_SEGMENT) and GRO (UDP_GRO) support.
///
/// Stores |segment_size| in _gso_segment_size, clears both capability flags
/// and then tries to set each option on _Socket.  A flag is raised only when
/// the matching setsockopt() call succeeds.  When the socket descriptor is
/// negative only the bookkeeping reset is performed.
///
/// @param segment_size  GSO segment size in bytes requested for the socket.
void udp::probeGSOGRO(uint16_t segment_size) {
    _gso_segment_size = segment_size;
    _gso_enabled = false;
    _gro_enabled = false;

    if (_Socket < 0) return;

    // UDP-level socket options are read by the kernel as a full int.  Passing
    // a 2-byte uint16_t makes setsockopt() fail with EINVAL, which would leave
    // GSO reported as unsupported on every kernel.
    // A segment size of zero merely disables segmentation, so it must not be
    // reported as "GSO available".
    // NOTE(review): on success the option stays set on the socket, so every
    // later send larger than |segment_size| is segmented too — confirm that
    // this socket-wide effect is intended.
    const int gso_val = static_cast<int>(segment_size);
    if (segment_size > 0 &&
        ::setsockopt(_Socket, SOL_UDP, UDP_SEGMENT, &gso_val, sizeof(gso_val)) == 0) {
        _gso_enabled = true;
    }

    // UDP_GRO is a boolean switch; success means receives may be coalesced.
    const int gro_val = 1;
    if (::setsockopt(_Socket, SOL_UDP, UDP_GRO, &gro_val, sizeof(gro_val)) == 0) {
        _gro_enabled = true;
    }
}

// ============================================================================
// UDP Batched Send: sendmmsg + GSO when available
// ============================================================================

/// Send several datagrams with as few syscalls as possible.
///
/// When GSO is enabled and the batch is "uniform" (every datagram has the
/// size of the first one, except the last which may be shorter), the payloads
/// are coalesced into one buffer and handed to a single sendmsg() carrying a
/// UDP_SEGMENT control message.  If that is not possible, or the GSO send
/// fails, the batch is submitted through sendmmsg().
///
/// @param datagrams  (pointer, length) pairs, one per datagram.
/// @param dest       Optional destination; nullptr leaves msg_name unset.
/// @param dest_len   Size of |dest| in bytes (ignored when |dest| is null).
/// @return Number of datagrams submitted, 0 for an empty batch, or -1 when
///         the socket is invalid or sendmmsg() fails.
ssize_t udp::sendBatch(
    const std::vector<std::pair<const uint8_t*, size_t>>& datagrams,
    sockaddr_storage* dest, socklen_t dest_len)
{
    if (datagrams.empty()) return 0;
    if (_Socket < 0) return -1;

    const size_t count = datagrams.size();

    // --- GSO path: coalesce uniform-sized datagrams into one sendmsg ---
    if (_gso_enabled && count > 1) {
        const size_t seg_size  = datagrams.front().second;
        const size_t tail_size = datagrams.back().second;

        // The segment size travels in a 16-bit control message, so it must be
        // non-zero and fit into uint16_t.  The tail may be shorter than a full
        // segment but never longer (the kernel would re-split it at the wrong
        // boundary) and never empty (an empty tail would not be emitted).
        bool uniform = seg_size > 0 && seg_size <= 0xFFFFu &&
                       tail_size > 0 && tail_size <= seg_size;
        for (size_t i = 1; uniform && i + 1 < count; ++i) {
            uniform = (datagrams[i].second == seg_size);
        }

        if (uniform) {
            // Build one contiguous buffer holding all payloads back to back.
            size_t total_bytes = 0;
            for (const auto& dg : datagrams) total_bytes += dg.second;

            std::vector<uint8_t> coalesced;
            coalesced.reserve(total_bytes);
            for (const auto& dg : datagrams) {
                coalesced.insert(coalesced.end(), dg.first, dg.first + dg.second);
            }

            struct iovec iov;
            iov.iov_base = coalesced.data();
            iov.iov_len  = coalesced.size();

            // Control buffer for the UDP_SEGMENT cmsg; the union guarantees
            // cmsghdr alignment and it is zeroed so padding is well defined.
            union {
                char buf[CMSG_SPACE(sizeof(uint16_t))];
                struct cmsghdr align;
            } cmsg_buf;
            std::memset(&cmsg_buf, 0, sizeof(cmsg_buf));

            struct msghdr msg{};
            if (dest) {
                msg.msg_name    = dest;
                msg.msg_namelen = dest_len;
            }
            msg.msg_iov        = &iov;
            msg.msg_iovlen     = 1;
            msg.msg_control    = cmsg_buf.buf;
            msg.msg_controllen = sizeof(cmsg_buf.buf);

            struct cmsghdr* cm = CMSG_FIRSTHDR(&msg);
            cm->cmsg_level = SOL_UDP;
            cm->cmsg_type  = UDP_SEGMENT;
            cm->cmsg_len   = CMSG_LEN(sizeof(uint16_t));
            // memcpy instead of a type-punned store: no alignment or
            // aliasing assumptions about CMSG_DATA().
            const uint16_t gso_size = static_cast<uint16_t>(seg_size);
            std::memcpy(CMSG_DATA(cm), &gso_size, sizeof(gso_size));

            const ssize_t ret = ::sendmsg(_Socket, &msg, MSG_DONTWAIT);
            if (ret > 0) return static_cast<ssize_t>(count);
            // GSO failed (e.g. batch too large for one send) — fall through
            // to sendmmsg.
        }
    }

    // --- sendmmsg path: one mmsghdr + iovec per datagram ---
    std::vector<struct iovec> iovecs(count);
    std::vector<struct mmsghdr> msgs(count);   // value-initialised => zeroed

    for (size_t i = 0; i < count; ++i) {
        // sendmmsg never writes through iov_base; the cast only satisfies the
        // non-const field type.
        iovecs[i].iov_base = const_cast<uint8_t*>(datagrams[i].first);
        iovecs[i].iov_len  = datagrams[i].second;
        msgs[i].msg_hdr.msg_iov    = &iovecs[i];
        msgs[i].msg_hdr.msg_iovlen = 1;
        if (dest) {
            msgs[i].msg_hdr.msg_name    = dest;
            msgs[i].msg_hdr.msg_namelen = dest_len;
        }
    }

    const int sent = ::sendmmsg(_Socket, msgs.data(),
                                static_cast<unsigned int>(count), MSG_DONTWAIT);
    return sent;
}

// ============================================================================
// UDP Batched Recv: recvmmsg + GRO splitting
// ============================================================================

// Batched receive without peer addresses: forwards to recvBatchAddr() and
// throws the collected addresses away.
size_t udp::recvBatch(std::vector<std::vector<uint8_t>>& out, int max_count) {
    std::vector<sockaddr_storage> discarded_peers;
    const size_t produced = recvBatchAddr(out, discarded_peers, max_count);
    return produced;
}

/// Receive up to |max_count| datagrams (at most 64) with one recvmmsg() call
/// and record the peer address of each.
///
/// Both output vectors are cleared first.  When GRO is enabled and the kernel
/// reports a segment size for a received buffer, that buffer is split back
/// into individual datagrams; every resulting datagram gets a copy of the
/// peer address of the buffer it came from.
///
/// @param out        Filled with one byte vector per datagram.
/// @param addrs      Filled with the peer address matching each entry of |out|.
/// @param max_count  Upper bound on messages requested from the kernel;
///                   values <= 0 yield an empty result.
/// @return Number of datagrams stored in |out| (0 on error or when nothing is
///         pending).
size_t udp::recvBatchAddr(std::vector<std::vector<uint8_t>>& out,
                           std::vector<sockaddr_storage>& addrs,
                           int max_count) {
    out.clear();
    addrs.clear();
    if (_Socket < 0) return 0;

    static constexpr int    MAX_BATCH = 64;
    static constexpr size_t SLOT_SIZE = 65535;   // receive slot per message

    // A non-positive request must not reach the memset()/recvmmsg() calls
    // below, where it would be converted to a huge unsigned size.
    const int batch = std::min(max_count, MAX_BATCH);
    if (batch <= 0) return 0;

    // Thread-local scratch storage, reused across calls on the same thread so
    // the 64 x 65535-byte receive area is not reallocated per call.
    static thread_local std::vector<uint8_t> flat_buf(MAX_BATCH * SLOT_SIZE);
    static thread_local struct iovec iovecs[MAX_BATCH];
    static thread_local struct mmsghdr msgs[MAX_BATCH];
    static thread_local sockaddr_storage peer_addrs[MAX_BATCH];
    // The kernel reports the GRO segment size as an int.
    static constexpr size_t CMSG_BUF_SIZE = CMSG_SPACE(sizeof(int));
    alignas(struct cmsghdr) static thread_local char cmsg_bufs[MAX_BATCH * CMSG_BUF_SIZE];

    std::memset(msgs, 0, sizeof(struct mmsghdr) * static_cast<size_t>(batch));
    std::memset(peer_addrs, 0, sizeof(sockaddr_storage) * static_cast<size_t>(batch));

    for (int i = 0; i < batch; ++i) {
        iovecs[i].iov_base = flat_buf.data() + static_cast<size_t>(i) * SLOT_SIZE;
        iovecs[i].iov_len  = SLOT_SIZE;
        msgs[i].msg_hdr.msg_iov     = &iovecs[i];
        msgs[i].msg_hdr.msg_iovlen  = 1;
        msgs[i].msg_hdr.msg_name    = &peer_addrs[i];
        msgs[i].msg_hdr.msg_namelen = sizeof(sockaddr_storage);
        if (_gro_enabled) {
            msgs[i].msg_hdr.msg_control    = &cmsg_bufs[static_cast<size_t>(i) * CMSG_BUF_SIZE];
            msgs[i].msg_hdr.msg_controllen = CMSG_BUF_SIZE;
        }
    }

    // MSG_DONTWAIT already makes the call non-blocking.  A zero-valued
    // timeout must NOT be passed: the kernel then considers the deadline
    // expired after the first datagram and returns just that one, which
    // defeats batching.
    const int received = ::recvmmsg(_Socket, msgs, static_cast<unsigned int>(batch),
                                    MSG_DONTWAIT, nullptr);
    if (received <= 0) return 0;

    out.reserve(static_cast<size_t>(received));
    addrs.reserve(static_cast<size_t>(received));

    for (int i = 0; i < received; ++i) {
        const size_t total_len = msgs[i].msg_len;
        if (total_len == 0) continue;

        const uint8_t* base = flat_buf.data() + static_cast<size_t>(i) * SLOT_SIZE;

        // Look for a UDP_GRO control message carrying the segment size.
        int gro_seg = 0;
        if (_gro_enabled) {
            for (struct cmsghdr* cm = CMSG_FIRSTHDR(&msgs[i].msg_hdr);
                 cm != nullptr;
                 cm = CMSG_NXTHDR(&msgs[i].msg_hdr, cm)) {
                if (cm->cmsg_level == SOL_UDP && cm->cmsg_type == UDP_GRO &&
                    cm->cmsg_len >= CMSG_LEN(sizeof(int))) {
                    // Copy out as int: reading only 16 bits would pick the
                    // wrong half of the value on big-endian hosts.
                    std::memcpy(&gro_seg, CMSG_DATA(cm), sizeof(gro_seg));
                    break;
                }
            }
        }

        if (gro_seg > 0 && total_len > static_cast<size_t>(gro_seg)) {
            // Split the coalesced GRO buffer into individual datagrams; only
            // the last piece may be shorter than the segment size.
            const size_t seg = static_cast<size_t>(gro_seg);
            for (size_t off = 0; off < total_len; ) {
                const size_t seg_len = std::min(seg, total_len - off);
                out.emplace_back(base + off, base + off + seg_len);
                addrs.push_back(peer_addrs[i]);
                off += seg_len;
            }
        } else {
            out.emplace_back(base, base + total_len);
            addrs.push_back(peer_addrs[i]);
        }
    }

    return out.size();
}

} // namespace netplus
+314 −382

File changed.

Preview size limit exceeded, changes collapsed.

+64 −2
Original line number Diff line number Diff line
@@ -339,9 +339,42 @@ namespace netplus {
		void connect(const std::string& addr, int port, bool nonblock = false) override;
		void getAddress(std::string& addr) override;

		// ---- UDP GSO/GRO + batched I/O (Linux kernel offload) ----
		// Detect and enable GSO (UDP_SEGMENT) and GRO (UDP_GRO) if supported
		void probeGSOGRO(uint16_t segment_size = 1472);

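		// Send multiple datagrams with as few syscalls as possible.
		// Uses GSO if enabled and all datagrams share one size (only the
		// last one may be shorter); otherwise falls back to sendmmsg or,
		// where that is unavailable, a per-datagram send loop.
		// |datagrams|: vector of (pointer, length) pairs.
		// |dest|: optional destination address (nullptr → use connected peer).
		// |dest_len|: size in bytes of the address behind |dest|.
		// Returns number of datagrams successfully submitted; the Linux
		// implementation returns -1 on an invalid socket or send failure.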
		ssize_t sendBatch(const std::vector<std::pair<const uint8_t*, size_t>>& datagrams,
		                  sockaddr_storage* dest = nullptr, socklen_t dest_len = 0);

		// Receive multiple datagrams in one syscall (recvmmsg + GRO split).
		// |out|: filled with individual datagrams.
		// |max_count|: maximum datagrams to read (capped to 64).
		// Returns number of datagrams placed into |out|.
		size_t recvBatch(std::vector<std::vector<uint8_t>>& out, int max_count = 64);

		// Like recvBatch but also captures the peer address for each datagram.
		// |addrs|: filled with the peer sockaddr_storage for each datagram in |out|.
		size_t recvBatchAddr(std::vector<std::vector<uint8_t>>& out,
		                     std::vector<sockaddr_storage>& addrs,
		                     int max_count = 64);

		bool gsoEnabled() const { return _gso_enabled; }
		bool groEnabled() const { return _gro_enabled; }

	private:
		int         _Maxconnections;
		std::string _UxPath;

	protected:
		bool     _gso_enabled = false;
		bool     _gro_enabled = false;
		uint16_t _gso_segment_size = 1472;
	};

	class ssl : public tcp {
@@ -521,7 +554,8 @@ namespace netplus {
			std::chrono::steady_clock::time_point sent_time;
			uint64_t stream_id;
			uint64_t stream_offset;
			size_t   data_length;
			size_t   data_length;    // stream payload length (for retransmit)
			size_t   sent_size;      // wire-level packet size (for CC accounting)
			bool     fin;            // carried FIN flag
			bool     ack_eliciting;  // true for STREAM frames
			std::vector<uint8_t> stream_data; // copy of payload for retransmit
@@ -654,7 +688,7 @@ namespace netplus {
		// Loss detection and retransmission
		void checkLossAndRetransmit();
		void recordSentPacket(uint64_t pn, uint64_t stream_id, uint64_t stream_offset,
		                      const uint8_t* data, size_t len, bool fin);
		                      const uint8_t* data, size_t len, bool fin, size_t wire_size = 0);

		// Packet building
		std::vector<uint8_t> buildInitialPacket(const std::vector<uint8_t>& payload);
@@ -709,6 +743,22 @@ namespace netplus {
		ssize_t flushBatch();
		static constexpr size_t BATCH_MAX = 64;

		// AES-NI batched encrypt: encrypt multiple packets' payloads in a
		// pipelined fashion, amortising the AES-GCM setup cost.
		void batchEncryptPackets(
			const uint8_t* key, const uint8_t* iv, aes* cached,
			const std::vector<std::pair</*hdr*/const uint8_t*, size_t>>& headers,
			const std::vector<std::pair</*pt*/ const uint8_t*, size_t>>& plains,
			const std::vector<uint64_t>& pns,
			std::vector<std::vector<uint8_t>>& out_packets);

		bool batchDecryptPacket(
			const uint8_t* key, const uint8_t* iv, const uint8_t* hp_key,
			aes* cached_aes, aes* cached_hp,
			const uint8_t* data, size_t len, bool force_aes128,
			std::vector<uint8_t>& header, std::vector<uint8_t>& payload,
			uint64_t& packet_number);

		// Retry token validation
		bool validateRetryToken(const std::vector<uint8_t>& token);

@@ -958,6 +1008,18 @@ namespace netplus {
		uint64_t _pending_max_data_value = 0;
		std::map<uint64_t, uint64_t> _pending_max_stream_data;

		// ---- Congestion Control (NewReno, RFC 9002) ----
		uint64_t _cwnd = 10 * 1024 * 1024;         // 10MB initial cwnd (aggressive start)
		uint64_t _ssthresh = UINT64_MAX;           // slow start threshold
		uint64_t _bytes_in_flight = 0;             // unacknowledged bytes
		static constexpr uint64_t MIN_CWND = 2 * 1472; // minimum cwnd (2 * MTU)
		bool _in_recovery = false;                 // in congestion recovery
		uint64_t _recovery_start_pn = 0;           // PN that triggered recovery
		bool cwndAllowsSend(size_t packet_size) const;
		void onPacketSentCC(size_t packet_size);
		void onPacketAckedCC(size_t packet_size, uint64_t pn);
		void onPacketLostCC(size_t packet_size, uint64_t pn);

		// ACK tracking: need to send ACK for initial/handshake levels
		bool _need_initial_ack = false;
		bool _need_handshake_ack = false;
+51 −0
Original line number Diff line number Diff line
@@ -399,4 +399,55 @@ void udp::getAddress(std::string& addr) {
    }
}

// --- GSO/GRO stubs (not supported on Windows) ---

// Stub: remembers the requested segment size and reports both GSO and GRO
// as disabled without touching the socket.
void udp::probeGSOGRO(uint16_t segment_size) {
    _gro_enabled      = false;
    _gso_enabled      = false;
    _gso_segment_size = segment_size;
}

// Fallback batch send: submits the datagrams one at a time and stops at the
// first failure.
//
// |datagrams|: (pointer, length) pairs, one per datagram.
// |dest|/|dest_len|: optional destination address; when given, each datagram
//   is sent to exactly that address.  With a null |dest| the datagrams go
//   through sendData().
// Returns the number of datagrams sent before the first failure.
ssize_t udp::sendBatch(
    const std::vector<std::pair<const uint8_t*, size_t>>& datagrams,
    sockaddr_storage* dest, socklen_t dest_len)
{
    ssize_t total = 0;
    for (const auto& [p, l] : datagrams) {
        if (dest) {
            // Address the datagram explicitly: the caller-supplied destination
            // was previously dropped on this path.
            const int rc = ::sendto(_Socket, reinterpret_cast<const char*>(p),
                                    static_cast<int>(l), 0,
                                    reinterpret_cast<const sockaddr*>(dest),
                                    static_cast<int>(dest_len));
            if (rc < 0) break;
        } else {
            buffer buf(reinterpret_cast<const char*>(p), l);
            try {
                sendData(buf, 0);
            } catch (...) {
                // Best effort: report how many datagrams made it out.
                break;
            }
        }
        ++total;
    }
    return total;
}

// Batched receive without peer addresses: forwards to recvBatchAddr() and
// drops the addresses it collects.
size_t udp::recvBatch(std::vector<std::vector<uint8_t>>& out, int max_count) {
    std::vector<sockaddr_storage> ignored_peers;
    const size_t produced = recvBatchAddr(out, ignored_peers, max_count);
    return produced;
}

// Fallback batch receive: reads datagrams one by one with recvfrom() until
// the limit is reached or a read returns no data / fails.
//
// |out|: cleared, then filled with one byte vector per datagram.
// |addrs|: cleared, then filled with the peer address matching each entry.
// |max_count|: maximum datagrams to read, capped to 64; values <= 0 read
//   nothing.
// Returns the number of datagrams placed into |out|.
//
// NOTE(review): recvfrom() is called with flags 0, so on a blocking socket
// this loop waits for every datagram — confirm the socket is non-blocking.
size_t udp::recvBatchAddr(std::vector<std::vector<uint8_t>>& out,
                           std::vector<sockaddr_storage>& addrs,
                           int max_count) {
    out.clear();
    addrs.clear();

    constexpr int kMaxBatch = 64;
    const int limit = (max_count < kMaxBatch) ? max_count : kMaxBatch;
    if (limit <= 0) return 0;

    // One reusable receive buffer; each datagram is copied out at its real
    // size so stored vectors do not each pin 64 KB of capacity.
    std::vector<uint8_t> scratch(65535);

    for (int i = 0; i < limit; ++i) {
        sockaddr_storage peer{};
        socklen_t peer_len = sizeof(peer);
        const int n = ::recvfrom(_Socket, reinterpret_cast<char*>(scratch.data()),
                                 static_cast<int>(scratch.size()), 0,
                                 reinterpret_cast<sockaddr*>(&peer), &peer_len);
        if (n <= 0) break;

        out.emplace_back(scratch.begin(), scratch.begin() + n);
        addrs.push_back(peer);
    }
    return out.size();
}

} // namespace netplus