Loading src/event/iocp.cpp +507 −463 Original line number Diff line number Diff line Loading @@ -70,10 +70,10 @@ namespace netplus { OVERLAPPED m_pol; std::mutex ConLock; int OpCode; //will be used by the worker thread to decide what operation to perform eventapi* api; con* CurCon; std::atomic<bool> Conlock; }; class poll { Loading Loading @@ -131,7 +131,8 @@ namespace netplus { ACCEPTCON: try { g_serversocket->accept(pClientContext->CurCon->csock); } catch (NetException &e) { } catch (NetException& e) { if (e.getErrorType() == NetException::Note) goto ACCEPTCON; delete pClientContext->CurCon->csock; Loading @@ -157,14 +158,11 @@ namespace netplus { wbuf.buf = new char[BLOCKSIZE]; wbuf.len = BLOCKSIZE; RECONNECT: int nBytesRecv = WSARecv(pClientContext->CurCon->csock->fd(), &wbuf, 1, &dwBytes, &dwFlags, nullptr,nullptr); &dwBytes, &dwFlags, &pClientContext->m_pol, nullptr); if (SOCKET_ERROR == nBytesRecv) { if ( WSA_IO_PENDING == WSAGetLastError() || WSAEWOULDBLOCK == WSAGetLastError() ) { goto RECONNECT; } if (WSA_IO_PENDING != WSAGetLastError()) { NetException e; e[NetException::Error] << "AcceptConnection Failed on: " << (int)pClientContext->CurCon->csock->fd() << "Error: " << WSAGetLastError(); Loading @@ -172,24 +170,11 @@ RECONNECT: delete wbuf.buf; throw e; } pClientContext->CurCon->RecvData.append(wbuf.buf,dwBytes); } else { std::cout << nBytesRecv << std::endl; pClientContext->CurCon->RecvData.append(wbuf.buf, nBytesRecv); delete wbuf.buf; try{ g_eventapi->RequestEvent(pClientContext->CurCon, 0, args); } catch (NetException &e) { std::cerr << e.what() << std::endl; } ULONG ssize = BLOCKSIZE < pClientContext->CurCon->SendData.size() ? BLOCKSIZE : (ULONG)pClientContext->CurCon->SendData.size(); wbuf.buf = pClientContext->CurCon->SendData.data(); wbuf.len = ssize; int nBytesSend = WSASend(pClientContext->CurCon->csock->fd(), &wbuf, 1, &dwBytes, dwFlags, &pClientContext->m_pol, nullptr); } } Loading Loading @@ -242,7 +227,7 @@ RECONNECT: SYSTEM_INFO sysinfo; GetSystemInfo(&sysinfo); threads = sysinfo.dwNumberOfProcessors; threads = 2; // sysinfo.dwNumberOfProcessors; } event::~event() { Loading @@ -261,6 +246,7 @@ RECONNECT: ssocket = eargs.ssocket; timeout = eargs.timeout; args = eargs.args; mtx = eargs.mtx; } Loading @@ -269,6 +255,7 @@ RECONNECT: poll* evpoll; eventapi* event; socket* ssocket; std::mutex* mtx; void* args; }; Loading @@ -283,7 +270,20 @@ RECONNECT: DWORD ret = 0; ULONG ssize = 0; auto elock = [eargs](client* eclient) { // eargs->mtx->lock(); // eclient->ConLock.lock(); // eargs->mtx->unlock(); }; auto eunlock = [eargs](client* eclient) { //eargs->mtx->lock(); //eclient->ConLock.unlock(); // eargs->mtx->unlock(); }; for (;;) { int bReturn = GetQueuedCompletionStatus( eargs->eviocp, &dwBytesTransfered, Loading @@ -297,9 +297,7 @@ RECONNECT: pClientContext = (client*)lpContext; bool expc = false; pClientContext->Conlock.compare_exchange_strong(expc,true); elock(pClientContext); if ((bReturn == 0) && (0 == dwBytesTransfered)) { eargs->evpoll->RemoveFromClientList(pClientContext); Loading Loading @@ -335,16 +333,16 @@ RECONNECT: try { eargs->event->ResponseEvent(pClientContext->CurCon, tid, args); } catch (NetException& e) { } catch (NetException& e) { std::cerr << e.what() << std::endl; } } else { } else { pClientContext->OpCode = poll::OP_WRITE; dwFlags = 0; ssize = BLOCKSIZE < pClientContext->CurCon->SendData.size() ? BLOCKSIZE : (ULONG) pClientContext->CurCon->SendData.size(); WSABUF wbuf; wbuf.buf = new char[BLOCKSIZE]; wbuf.len = BLOCKSIZE; Loading @@ -367,7 +365,8 @@ RECONNECT: try { eargs->event->RequestEvent(pClientContext->CurCon, tid, args); }catch (NetException& e) { } catch (NetException& e) { std::cerr << e.what() << std::endl; } } Loading @@ -377,7 +376,7 @@ RECONNECT: case poll::OP_WRITE: //Send the message back to the client. dwFlags = 0; if (!((con*)pClientContext->CurCon)->SendData.empty()) { ssize = BLOCKSIZE < pClientContext->CurCon->SendData.size() ? BLOCKSIZE : (ULONG)pClientContext->CurCon->SendData.size(); WSABUF wbuf; Loading @@ -395,7 +394,7 @@ RECONNECT: if (WSA_IO_PENDING != WSAGetLastError()) eargs->evpoll->RemoveFromClientList(pClientContext); else pClientContext->Conlock.store(false); eunlock(pClientContext); continue; } Loading @@ -403,21 +402,62 @@ RECONNECT: try { eargs->event->ResponseEvent(pClientContext->CurCon, tid, args); } catch (NetException& e) { } catch (NetException& e) { std::cerr << e.what() << std::endl; } if (pClientContext->CurCon->SendData.empty()) { pClientContext->OpCode = poll::OP_READ; } break; } else if (!((con*)pClientContext->CurCon)->RecvData.empty()) { eargs->event->RequestEvent(pClientContext->CurCon, tid, args); } else { pClientContext->OpCode = poll::OP_WRITE; dwFlags = 0; WSABUF wbuf; wbuf.buf = new char[BLOCKSIZE]; wbuf.len = BLOCKSIZE; //Get the data. ret = WSARecv(pClientContext->CurCon->csock->fd(), &wbuf, 1, &dwBytes, &dwFlags, &pClientContext->m_pol, nullptr); if ((SOCKET_ERROR == ret)) { if (WSA_IO_PENDING != WSAGetLastError()) { eargs->event->DisconnectEvent(pClientContext->CurCon, tid, args); eargs->evpoll->RemoveFromClientList(pClientContext); } std::cerr << "Thread " << tid << " : Error occurred while executing WSARecv: " << WSAGetLastError() << std::endl; continue; } std::cout << ret << std::endl; pClientContext->CurCon->RecvData.append(wbuf.buf, ret); std::cout.write(wbuf.buf, dwBytes) << std::endl; delete wbuf.buf; std::cout.write(pClientContext->CurCon->RecvData.data(), pClientContext->CurCon->RecvData.size()) << std::endl; try { eargs->event->RequestEvent(pClientContext->CurCon, tid, args); } catch (NetException& e) { std::cerr << e.what() << std::endl; } } default: eargs->evpoll->RemoveFromClientList(pClientContext); break; } // switch pClientContext->Conlock.store(false); eunlock(pClientContext); } // while } }; Loading Loading @@ -445,11 +485,14 @@ RECONNECT: poll evpoll(iocp, this, _ServerSocket); std::mutex pmtx; eargs.ssocket = _ServerSocket; eargs.event = this; eargs.evpoll = &evpoll; eargs.timeout = _Timeout; eargs.eviocp = iocp; eargs.mtx = &pmtx; std::vector <std::thread> threadpool; Loading Loading @@ -499,7 +542,8 @@ RECONNECT: if ((WSAEvents.lNetworkEvents & FD_ACCEPT) && (0 == WSAEvents.iErrorCode[FD_ACCEPT_BIT])) { try { evpoll.AcceptConnection(0, args); } catch (NetException& e) { } catch (NetException& e) { std::cerr << e.what() << std::endl; } } Loading Loading
src/event/iocp.cpp +507 −463 Original line number Diff line number Diff line Loading @@ -70,10 +70,10 @@ namespace netplus { OVERLAPPED m_pol; std::mutex ConLock; int OpCode; //will be used by the worker thread to decide what operation to perform eventapi* api; con* CurCon; std::atomic<bool> Conlock; }; class poll { Loading Loading @@ -131,7 +131,8 @@ namespace netplus { ACCEPTCON: try { g_serversocket->accept(pClientContext->CurCon->csock); } catch (NetException &e) { } catch (NetException& e) { if (e.getErrorType() == NetException::Note) goto ACCEPTCON; delete pClientContext->CurCon->csock; Loading @@ -157,14 +158,11 @@ namespace netplus { wbuf.buf = new char[BLOCKSIZE]; wbuf.len = BLOCKSIZE; RECONNECT: int nBytesRecv = WSARecv(pClientContext->CurCon->csock->fd(), &wbuf, 1, &dwBytes, &dwFlags, nullptr,nullptr); &dwBytes, &dwFlags, &pClientContext->m_pol, nullptr); if (SOCKET_ERROR == nBytesRecv) { if ( WSA_IO_PENDING == WSAGetLastError() || WSAEWOULDBLOCK == WSAGetLastError() ) { goto RECONNECT; } if (WSA_IO_PENDING != WSAGetLastError()) { NetException e; e[NetException::Error] << "AcceptConnection Failed on: " << (int)pClientContext->CurCon->csock->fd() << "Error: " << WSAGetLastError(); Loading @@ -172,24 +170,11 @@ RECONNECT: delete wbuf.buf; throw e; } pClientContext->CurCon->RecvData.append(wbuf.buf,dwBytes); } else { std::cout << nBytesRecv << std::endl; pClientContext->CurCon->RecvData.append(wbuf.buf, nBytesRecv); delete wbuf.buf; try{ g_eventapi->RequestEvent(pClientContext->CurCon, 0, args); } catch (NetException &e) { std::cerr << e.what() << std::endl; } ULONG ssize = BLOCKSIZE < pClientContext->CurCon->SendData.size() ? BLOCKSIZE : (ULONG)pClientContext->CurCon->SendData.size(); wbuf.buf = pClientContext->CurCon->SendData.data(); wbuf.len = ssize; int nBytesSend = WSASend(pClientContext->CurCon->csock->fd(), &wbuf, 1, &dwBytes, dwFlags, &pClientContext->m_pol, nullptr); } } Loading Loading @@ -242,7 +227,7 @@ RECONNECT: SYSTEM_INFO sysinfo; GetSystemInfo(&sysinfo); threads = sysinfo.dwNumberOfProcessors; threads = 2; // sysinfo.dwNumberOfProcessors; } event::~event() { Loading @@ -261,6 +246,7 @@ RECONNECT: ssocket = eargs.ssocket; timeout = eargs.timeout; args = eargs.args; mtx = eargs.mtx; } Loading @@ -269,6 +255,7 @@ RECONNECT: poll* evpoll; eventapi* event; socket* ssocket; std::mutex* mtx; void* args; }; Loading @@ -283,7 +270,20 @@ RECONNECT: DWORD ret = 0; ULONG ssize = 0; auto elock = [eargs](client* eclient) { // eargs->mtx->lock(); // eclient->ConLock.lock(); // eargs->mtx->unlock(); }; auto eunlock = [eargs](client* eclient) { //eargs->mtx->lock(); //eclient->ConLock.unlock(); // eargs->mtx->unlock(); }; for (;;) { int bReturn = GetQueuedCompletionStatus( eargs->eviocp, &dwBytesTransfered, Loading @@ -297,9 +297,7 @@ RECONNECT: pClientContext = (client*)lpContext; bool expc = false; pClientContext->Conlock.compare_exchange_strong(expc,true); elock(pClientContext); if ((bReturn == 0) && (0 == dwBytesTransfered)) { eargs->evpoll->RemoveFromClientList(pClientContext); Loading Loading @@ -335,16 +333,16 @@ RECONNECT: try { eargs->event->ResponseEvent(pClientContext->CurCon, tid, args); } catch (NetException& e) { } catch (NetException& e) { std::cerr << e.what() << std::endl; } } else { } else { pClientContext->OpCode = poll::OP_WRITE; dwFlags = 0; ssize = BLOCKSIZE < pClientContext->CurCon->SendData.size() ? BLOCKSIZE : (ULONG) pClientContext->CurCon->SendData.size(); WSABUF wbuf; wbuf.buf = new char[BLOCKSIZE]; wbuf.len = BLOCKSIZE; Loading @@ -367,7 +365,8 @@ RECONNECT: try { eargs->event->RequestEvent(pClientContext->CurCon, tid, args); }catch (NetException& e) { } catch (NetException& e) { std::cerr << e.what() << std::endl; } } Loading @@ -377,7 +376,7 @@ RECONNECT: case poll::OP_WRITE: //Send the message back to the client. dwFlags = 0; if (!((con*)pClientContext->CurCon)->SendData.empty()) { ssize = BLOCKSIZE < pClientContext->CurCon->SendData.size() ? BLOCKSIZE : (ULONG)pClientContext->CurCon->SendData.size(); WSABUF wbuf; Loading @@ -395,7 +394,7 @@ RECONNECT: if (WSA_IO_PENDING != WSAGetLastError()) eargs->evpoll->RemoveFromClientList(pClientContext); else pClientContext->Conlock.store(false); eunlock(pClientContext); continue; } Loading @@ -403,21 +402,62 @@ RECONNECT: try { eargs->event->ResponseEvent(pClientContext->CurCon, tid, args); } catch (NetException& e) { } catch (NetException& e) { std::cerr << e.what() << std::endl; } if (pClientContext->CurCon->SendData.empty()) { pClientContext->OpCode = poll::OP_READ; } break; } else if (!((con*)pClientContext->CurCon)->RecvData.empty()) { eargs->event->RequestEvent(pClientContext->CurCon, tid, args); } else { pClientContext->OpCode = poll::OP_WRITE; dwFlags = 0; WSABUF wbuf; wbuf.buf = new char[BLOCKSIZE]; wbuf.len = BLOCKSIZE; //Get the data. ret = WSARecv(pClientContext->CurCon->csock->fd(), &wbuf, 1, &dwBytes, &dwFlags, &pClientContext->m_pol, nullptr); if ((SOCKET_ERROR == ret)) { if (WSA_IO_PENDING != WSAGetLastError()) { eargs->event->DisconnectEvent(pClientContext->CurCon, tid, args); eargs->evpoll->RemoveFromClientList(pClientContext); } std::cerr << "Thread " << tid << " : Error occurred while executing WSARecv: " << WSAGetLastError() << std::endl; continue; } std::cout << ret << std::endl; pClientContext->CurCon->RecvData.append(wbuf.buf, ret); std::cout.write(wbuf.buf, dwBytes) << std::endl; delete wbuf.buf; std::cout.write(pClientContext->CurCon->RecvData.data(), pClientContext->CurCon->RecvData.size()) << std::endl; try { eargs->event->RequestEvent(pClientContext->CurCon, tid, args); } catch (NetException& e) { std::cerr << e.what() << std::endl; } } default: eargs->evpoll->RemoveFromClientList(pClientContext); break; } // switch pClientContext->Conlock.store(false); eunlock(pClientContext); } // while } }; Loading Loading @@ -445,11 +485,14 @@ RECONNECT: poll evpoll(iocp, this, _ServerSocket); std::mutex pmtx; eargs.ssocket = _ServerSocket; eargs.event = this; eargs.evpoll = &evpoll; eargs.timeout = _Timeout; eargs.eviocp = iocp; eargs.mtx = &pmtx; std::vector <std::thread> threadpool; Loading Loading @@ -499,7 +542,8 @@ RECONNECT: if ((WSAEvents.lNetworkEvents & FD_ACCEPT) && (0 == WSAEvents.iErrorCode[FD_ACCEPT_BIT])) { try { evpoll.AcceptConnection(0, args); } catch (NetException& e) { } catch (NetException& e) { std::cerr << e.what() << std::endl; } } Loading