Loading src/event/iocp.cpp +47 −28 Original line number Diff line number Diff line Loading @@ -40,6 +40,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include <vector> #include <memory> #include <mutex> #include <shared_mutex> #include <unordered_map> #include <iostream> #include <algorithm> Loading @@ -62,7 +63,7 @@ namespace netplus { struct EventState { HANDLE iocp = nullptr; std::mutex conMutex; std::shared_mutex conMutex; std::unordered_map<SOCKET, std::shared_ptr<con>> sockToCon; std::vector<std::thread> workers; Loading @@ -84,19 +85,19 @@ namespace netplus { // Connection helpers // ------------------------------------------------------------------------- static void register_con(EventState* st, SOCKET s, std::shared_ptr<con> c) { std::lock_guard<std::mutex> lk(st->conMutex); std::unique_lock<std::shared_mutex> lk(st->conMutex); st->sockToCon[s] = c; } static std::shared_ptr<con> find_con(EventState* st, SOCKET s) { std::lock_guard<std::mutex> lk(st->conMutex); std::shared_lock<std::shared_mutex> lk(st->conMutex); auto it = st->sockToCon.find(s); if (it == st->sockToCon.end()) return nullptr; return it->second; } static void remove_con(EventState* st, SOCKET s) { std::lock_guard<std::mutex> lk(st->conMutex); std::unique_lock<std::shared_mutex> lk(st->conMutex); st->sockToCon.erase(s); } Loading Loading @@ -146,39 +147,57 @@ namespace netplus { // ------------------------------------------------------------------------- // Post send (only if not already pending) // Uses pos-based tracking to avoid O(n) erase on every chunk. // ------------------------------------------------------------------------- static bool try_post_send(event* ev, EventState* /*st*/, con& c, int tid) { static bool try_post_send(event* ev, EventState* st, con& c, int tid) { if (c.slots[0].Closing.load()) return false; if (c.slots[0].WritePending.load()) return false; if (c.SendData.empty()) return false; if (c.SendData.pos >= c.SendData.size()) return false; c.slots[0].WritePending.store(true); try { size_t sendlen = std::min<size_t>(c.SendData.size(), BLOCKSIZE); const char* ptr = c.SendData.data(); // Loop instead of recursion to avoid stack overflow on large responses while (c.SendData.pos < c.SendData.size()) { size_t remaining = c.SendData.size() - c.SendData.pos; size_t sendlen = std::min<size_t>(remaining, BLOCKSIZE); const char* ptr = c.SendData.data() + c.SendData.pos; buffer sbuf(ptr, sendlen); size_t consumed = c.slots[0].csock->sendData(sbuf, 0); if (consumed > 0) { c.SendData.erase(c.SendData.begin(), c.SendData.begin() + consumed); c.SendData.pos += consumed; // Compact only when pos is far ahead to avoid frequent O(n) shifts if (c.SendData.pos >= c.SendData.size()) { c.SendData.clear(); c.SendData.pos = 0; } else if (c.SendData.pos > BLOCKSIZE * 4) { c.SendData.erase(c.SendData.begin(), c.SendData.begin() + c.SendData.pos); c.SendData.pos = 0; } } if (c.slots[0].csock->hasPendingWrite()) { // Async IOCP write posted, completion will continue the send return true; } c.slots[0].WritePending.store(false); if (!c.SendData.empty()) { return try_post_send(ev, nullptr, c, tid); if (consumed == 0) break; } c.slots[0].WritePending.store(false); return false; } catch (NetException& e) { c.slots[0].WritePending.store(false); if (e.getErrorType() != NetException::Note) throw; // EWOULDBLOCK: data remains in SendData but write is not pending. // Post a zero-byte completion to wake the worker and retry later. if (st && c.SendData.pos < c.SendData.size()) { SOCKET cs = (SOCKET)c.slots[0].csock->fd(); PostQueuedCompletionStatus(st->iocp, 0, (ULONG_PTR)c.slots[0].csock.get(), &c.slots[0].csock->_ReadBuffer->overlapped); } return false; } } Loading Loading @@ -448,7 +467,7 @@ namespace netplus { // Post-handshake framing callback (e.g. HTTP/2 SETTINGS) if (auto *sslsock = dynamic_cast<ssl*>(owner->slots[0].csock.get())) { const auto &fcb = sslsock->getFramingCallback(); if (fcb && owner->SendData.empty()) { if (fcb && owner->SendData.pos >= owner->SendData.size()) { std::string init = fcb(sslsock->getSelectedAlpn(), 0, "", ""); if (!init.empty()) { owner->SendData.append(init.data(), init.size()); Loading Loading @@ -617,25 +636,25 @@ namespace netplus { } try { if (owner->SendData.empty() && !owner->slots[0].csock->hasPendingWrite()) { if (owner->SendData.pos >= owner->SendData.size() && !owner->slots[0].csock->hasPendingWrite()) { ev->ResponseEvent(*owner, tid, (ULONG_PTR)bytes); } if (!owner->SendData.empty()) { if (owner->SendData.pos < owner->SendData.size()) { bool posted = try_post_send(ev, st, *owner, tid); if (posted) continue; } // App requested re-trigger (e.g. upstream data not yet available) if (owner->SendData.empty() && owner->slots[0].ResponsePending.exchange(false, std::memory_order_relaxed)) { if (owner->SendData.pos >= owner->SendData.size() && owner->slots[0].ResponsePending.exchange(false, std::memory_order_relaxed)) { ev->ResponseEvent(*owner, tid, 0); if (!owner->SendData.empty()) { if (owner->SendData.pos < owner->SendData.size()) { bool posted = try_post_send(ev, st, *owner, tid); if (posted) continue; } } if (owner->SendData.empty() && !owner->slots[0].csock->hasPendingWrite() && !owner->slots[0].Closing.load()) { if (owner->SendData.pos >= owner->SendData.size() && !owner->slots[0].csock->hasPendingWrite() && !owner->slots[0].Closing.load()) { try { post_recv(st, *owner); } catch (...) {} } } catch (...) { Loading Loading @@ -708,7 +727,7 @@ namespace netplus { // Cancel all pending I/O on registered connections { std::lock_guard<std::mutex> lk(st->conMutex); std::unique_lock<std::shared_mutex> lk(st->conMutex); for (auto& kv : st->sockToCon) { CancelIoEx((HANDLE)kv.first, nullptr); } Loading Loading
src/event/iocp.cpp +47 −28 Original line number Diff line number Diff line Loading @@ -40,6 +40,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include <vector> #include <memory> #include <mutex> #include <shared_mutex> #include <unordered_map> #include <iostream> #include <algorithm> Loading @@ -62,7 +63,7 @@ namespace netplus { struct EventState { HANDLE iocp = nullptr; std::mutex conMutex; std::shared_mutex conMutex; std::unordered_map<SOCKET, std::shared_ptr<con>> sockToCon; std::vector<std::thread> workers; Loading @@ -84,19 +85,19 @@ namespace netplus { // Connection helpers // ------------------------------------------------------------------------- static void register_con(EventState* st, SOCKET s, std::shared_ptr<con> c) { std::lock_guard<std::mutex> lk(st->conMutex); std::unique_lock<std::shared_mutex> lk(st->conMutex); st->sockToCon[s] = c; } static std::shared_ptr<con> find_con(EventState* st, SOCKET s) { std::lock_guard<std::mutex> lk(st->conMutex); std::shared_lock<std::shared_mutex> lk(st->conMutex); auto it = st->sockToCon.find(s); if (it == st->sockToCon.end()) return nullptr; return it->second; } static void remove_con(EventState* st, SOCKET s) { std::lock_guard<std::mutex> lk(st->conMutex); std::unique_lock<std::shared_mutex> lk(st->conMutex); st->sockToCon.erase(s); } Loading Loading @@ -146,39 +147,57 @@ namespace netplus { // ------------------------------------------------------------------------- // Post send (only if not already pending) // Uses pos-based tracking to avoid O(n) erase on every chunk. // ------------------------------------------------------------------------- static bool try_post_send(event* ev, EventState* /*st*/, con& c, int tid) { static bool try_post_send(event* ev, EventState* st, con& c, int tid) { if (c.slots[0].Closing.load()) return false; if (c.slots[0].WritePending.load()) return false; if (c.SendData.empty()) return false; if (c.SendData.pos >= c.SendData.size()) return false; c.slots[0].WritePending.store(true); try { size_t sendlen = std::min<size_t>(c.SendData.size(), BLOCKSIZE); const char* ptr = c.SendData.data(); // Loop instead of recursion to avoid stack overflow on large responses while (c.SendData.pos < c.SendData.size()) { size_t remaining = c.SendData.size() - c.SendData.pos; size_t sendlen = std::min<size_t>(remaining, BLOCKSIZE); const char* ptr = c.SendData.data() + c.SendData.pos; buffer sbuf(ptr, sendlen); size_t consumed = c.slots[0].csock->sendData(sbuf, 0); if (consumed > 0) { c.SendData.erase(c.SendData.begin(), c.SendData.begin() + consumed); c.SendData.pos += consumed; // Compact only when pos is far ahead to avoid frequent O(n) shifts if (c.SendData.pos >= c.SendData.size()) { c.SendData.clear(); c.SendData.pos = 0; } else if (c.SendData.pos > BLOCKSIZE * 4) { c.SendData.erase(c.SendData.begin(), c.SendData.begin() + c.SendData.pos); c.SendData.pos = 0; } } if (c.slots[0].csock->hasPendingWrite()) { // Async IOCP write posted, completion will continue the send return true; } c.slots[0].WritePending.store(false); if (!c.SendData.empty()) { return try_post_send(ev, nullptr, c, tid); if (consumed == 0) break; } c.slots[0].WritePending.store(false); return false; } catch (NetException& e) { c.slots[0].WritePending.store(false); if (e.getErrorType() != NetException::Note) throw; // EWOULDBLOCK: data remains in SendData but write is not pending. // Post a zero-byte completion to wake the worker and retry later. if (st && c.SendData.pos < c.SendData.size()) { SOCKET cs = (SOCKET)c.slots[0].csock->fd(); PostQueuedCompletionStatus(st->iocp, 0, (ULONG_PTR)c.slots[0].csock.get(), &c.slots[0].csock->_ReadBuffer->overlapped); } return false; } } Loading Loading @@ -448,7 +467,7 @@ namespace netplus { // Post-handshake framing callback (e.g. HTTP/2 SETTINGS) if (auto *sslsock = dynamic_cast<ssl*>(owner->slots[0].csock.get())) { const auto &fcb = sslsock->getFramingCallback(); if (fcb && owner->SendData.empty()) { if (fcb && owner->SendData.pos >= owner->SendData.size()) { std::string init = fcb(sslsock->getSelectedAlpn(), 0, "", ""); if (!init.empty()) { owner->SendData.append(init.data(), init.size()); Loading Loading @@ -617,25 +636,25 @@ namespace netplus { } try { if (owner->SendData.empty() && !owner->slots[0].csock->hasPendingWrite()) { if (owner->SendData.pos >= owner->SendData.size() && !owner->slots[0].csock->hasPendingWrite()) { ev->ResponseEvent(*owner, tid, (ULONG_PTR)bytes); } if (!owner->SendData.empty()) { if (owner->SendData.pos < owner->SendData.size()) { bool posted = try_post_send(ev, st, *owner, tid); if (posted) continue; } // App requested re-trigger (e.g. upstream data not yet available) if (owner->SendData.empty() && owner->slots[0].ResponsePending.exchange(false, std::memory_order_relaxed)) { if (owner->SendData.pos >= owner->SendData.size() && owner->slots[0].ResponsePending.exchange(false, std::memory_order_relaxed)) { ev->ResponseEvent(*owner, tid, 0); if (!owner->SendData.empty()) { if (owner->SendData.pos < owner->SendData.size()) { bool posted = try_post_send(ev, st, *owner, tid); if (posted) continue; } } if (owner->SendData.empty() && !owner->slots[0].csock->hasPendingWrite() && !owner->slots[0].Closing.load()) { if (owner->SendData.pos >= owner->SendData.size() && !owner->slots[0].csock->hasPendingWrite() && !owner->slots[0].Closing.load()) { try { post_recv(st, *owner); } catch (...) {} } } catch (...) { Loading Loading @@ -708,7 +727,7 @@ namespace netplus { // Cancel all pending I/O on registered connections { std::lock_guard<std::mutex> lk(st->conMutex); std::unique_lock<std::shared_mutex> lk(st->conMutex); for (auto& kv : st->sockToCon) { CancelIoEx((HANDLE)kv.first, nullptr); } Loading