Loading src/quic.cpp +66 −1 Original line number Diff line number Diff line Loading @@ -2764,6 +2764,13 @@ void quic::pumpNetwork(int flags) { _stream_callback(this, pd.stream_id, pd.data, pd.fin); } } // Loss detection + PTO probes: retransmit lost/stale packets so the // peer can make progress even when the caller is not sending new data. { std::lock_guard<std::recursive_mutex> lock(quic_mtx()); checkLossAndRetransmit(); } } // ============================================================================ Loading Loading @@ -3603,9 +3610,67 @@ void quic::recordSentPacket(uint64_t pn, uint64_t stream_id, uint64_t stream_off // ============================================================================ void quic::checkLossAndRetransmit() { if (_sent_packets.empty() || _largest_acked_pn == UINT64_MAX) return; if (_sent_packets.empty()) return; auto now = std::chrono::steady_clock::now(); // PTO (Probe Timeout) — RFC 9002 §6.2 // When the largest-acked PN hasn't advanced past our oldest unacked // packets (tail loss), retransmit the oldest unacked packet to elicit // an ACK and bootstrap the normal loss-detection cycle. if (!_sent_packets.empty()) { double pto = _rtt_initialized ? (_srtt + std::max(4.0 * _rttvar, 0.001) + 0.025) : _rto; // Exponential back-off: double PTO each consecutive probe double pto_thresh = pto * (1 << std::min(_pto_count, 5)); auto& oldest = _sent_packets.begin()->second; double age = std::chrono::duration<double>(now - oldest.sent_time).count(); if (age >= pto_thresh) { // Retransmit the oldest unacked packet as a probe auto sp = oldest; // copy — will be erased below uint64_t old_pn = _sent_packets.begin()->first; _sent_packets.erase(_sent_packets.begin()); ++_pto_count; if (sp.data_length > 0 || sp.fin) { uint8_t frame_type = 0x08 | 0x02; if (sp.stream_offset > 0) frame_type |= 0x04; if (sp.fin) frame_type |= 0x01; std::vector<uint8_t> frame; frame.reserve(1 + 8 + 8 + 8 + sp.data_length); frame.push_back(frame_type); uint8_t vb[8]; size_t vl; vl = encodeVarInt(sp.stream_id, vb); frame.insert(frame.end(), vb, vb + vl); if (sp.stream_offset > 0) { vl = encodeVarInt(sp.stream_offset, vb); frame.insert(frame.end(), vb, vb + vl); } vl = encodeVarInt(sp.data_length, vb); frame.insert(frame.end(), vb, vb + vl); frame.insert(frame.end(), sp.stream_data.begin(), sp.stream_data.end()); uint64_t new_pn = _app_pn_send; std::vector<uint8_t> packet = buildShortHeaderPacket(frame); const uint8_t* hp_key = _is_server ? _app_hp_server : _app_hp_client; applyHeaderProtection(packet, hp_key); ssize_t sent = sendPacket(packet.data(), packet.size()); if (sent < 0) sent = sendPacket(packet.data(), packet.size()); if (sent > 0) { recordSentPacket(new_pn, sp.stream_id, sp.stream_offset, sp.stream_data.data(), sp.stream_data.size(), sp.fin, packet.size()); } QUIC_DBG("PTO probe: old_pn=%lu new_pn=%lu age=%.3fs pto=%.3fs", (unsigned long)old_pn, (unsigned long)new_pn, age, pto_thresh); } } } // Normal loss detection requires at least one ACK if (_largest_acked_pn == UINT64_MAX) return; // Reset PTO back-off counter once ACKs are flowing _pto_count = 0; // Time-based loss threshold (RFC 9002 §6.1.2) double loss_delay = TIME_THRESHOLD * std::max(_srtt, _rttvar); if (loss_delay < 0.001) loss_delay = 0.001; // min 1ms Loading src/socket.h +1 −0 Original line number Diff line number Diff line Loading @@ -1002,6 +1002,7 @@ namespace netplus { // Loss detection static constexpr uint64_t PACKET_THRESHOLD = 3; // RFC 9002 kPacketThreshold static constexpr double TIME_THRESHOLD = 9.0/8; // RFC 9002 kTimeThreshold int _pto_count = 0; // PTO back-off counter (RFC 9002 §6.2) // Flow control batching: avoid sending duplicate MAX_DATA/MAX_STREAM_DATA bool _pending_max_data = false; Loading Loading
src/quic.cpp +66 −1 Original line number Diff line number Diff line Loading @@ -2764,6 +2764,13 @@ void quic::pumpNetwork(int flags) { _stream_callback(this, pd.stream_id, pd.data, pd.fin); } } // Loss detection + PTO probes: retransmit lost/stale packets so the // peer can make progress even when the caller is not sending new data. { std::lock_guard<std::recursive_mutex> lock(quic_mtx()); checkLossAndRetransmit(); } } // ============================================================================ Loading Loading @@ -3603,9 +3610,67 @@ void quic::recordSentPacket(uint64_t pn, uint64_t stream_id, uint64_t stream_off // ============================================================================ void quic::checkLossAndRetransmit() { if (_sent_packets.empty() || _largest_acked_pn == UINT64_MAX) return; if (_sent_packets.empty()) return; auto now = std::chrono::steady_clock::now(); // PTO (Probe Timeout) — RFC 9002 §6.2 // When the largest-acked PN hasn't advanced past our oldest unacked // packets (tail loss), retransmit the oldest unacked packet to elicit // an ACK and bootstrap the normal loss-detection cycle. if (!_sent_packets.empty()) { double pto = _rtt_initialized ? (_srtt + std::max(4.0 * _rttvar, 0.001) + 0.025) : _rto; // Exponential back-off: double PTO each consecutive probe double pto_thresh = pto * (1 << std::min(_pto_count, 5)); auto& oldest = _sent_packets.begin()->second; double age = std::chrono::duration<double>(now - oldest.sent_time).count(); if (age >= pto_thresh) { // Retransmit the oldest unacked packet as a probe auto sp = oldest; // copy — will be erased below uint64_t old_pn = _sent_packets.begin()->first; _sent_packets.erase(_sent_packets.begin()); ++_pto_count; if (sp.data_length > 0 || sp.fin) { uint8_t frame_type = 0x08 | 0x02; if (sp.stream_offset > 0) frame_type |= 0x04; if (sp.fin) frame_type |= 0x01; std::vector<uint8_t> frame; frame.reserve(1 + 8 + 8 + 8 + sp.data_length); frame.push_back(frame_type); uint8_t vb[8]; size_t vl; vl = encodeVarInt(sp.stream_id, vb); frame.insert(frame.end(), vb, vb + vl); if (sp.stream_offset > 0) { vl = encodeVarInt(sp.stream_offset, vb); frame.insert(frame.end(), vb, vb + vl); } vl = encodeVarInt(sp.data_length, vb); frame.insert(frame.end(), vb, vb + vl); frame.insert(frame.end(), sp.stream_data.begin(), sp.stream_data.end()); uint64_t new_pn = _app_pn_send; std::vector<uint8_t> packet = buildShortHeaderPacket(frame); const uint8_t* hp_key = _is_server ? _app_hp_server : _app_hp_client; applyHeaderProtection(packet, hp_key); ssize_t sent = sendPacket(packet.data(), packet.size()); if (sent < 0) sent = sendPacket(packet.data(), packet.size()); if (sent > 0) { recordSentPacket(new_pn, sp.stream_id, sp.stream_offset, sp.stream_data.data(), sp.stream_data.size(), sp.fin, packet.size()); } QUIC_DBG("PTO probe: old_pn=%lu new_pn=%lu age=%.3fs pto=%.3fs", (unsigned long)old_pn, (unsigned long)new_pn, age, pto_thresh); } } } // Normal loss detection requires at least one ACK if (_largest_acked_pn == UINT64_MAX) return; // Reset PTO back-off counter once ACKs are flowing _pto_count = 0; // Time-based loss threshold (RFC 9002 §6.1.2) double loss_delay = TIME_THRESHOLD * std::max(_srtt, _rttvar); if (loss_delay < 0.001) loss_delay = 0.001; // min 1ms Loading
src/socket.h +1 −0 Original line number Diff line number Diff line Loading @@ -1002,6 +1002,7 @@ namespace netplus { // Loss detection static constexpr uint64_t PACKET_THRESHOLD = 3; // RFC 9002 kPacketThreshold static constexpr double TIME_THRESHOLD = 9.0/8; // RFC 9002 kTimeThreshold int _pto_count = 0; // PTO back-off counter (RFC 9002 §6.2) // Flow control batching: avoid sending duplicate MAX_DATA/MAX_STREAM_DATA bool _pending_max_data = false; Loading