Loading src/event/iocp.cpp +29 −38 Original line number Diff line number Diff line Loading @@ -55,9 +55,6 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define BLOCKSIZE 16384 #define xmalloc(s) HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,(s)) #define xfree(p) HeapFree(GetProcessHeap(),0,(p)) namespace netplus { class poll { public: Loading @@ -77,7 +74,6 @@ namespace netplus { class client { public: client(eventapi* eapi) { m_pol = nullptr; nTotalBytes = 0; nSentBytes = 0; api = eapi; Loading @@ -85,6 +81,11 @@ namespace netplus { m_pwbuf.buf = new char[BLOCKSIZE]; m_pwbuf.len = BLOCKSIZE; OpCode = 0; m_pol.Internal = 0; m_pol.InternalHigh = 0; m_pol.Offset = 0; m_pol.OffsetHigh = 0; m_pol.hEvent = nullptr; }; ~client() { Loading @@ -92,7 +93,7 @@ namespace netplus { delete[] m_pwbuf.buf; }; OVERLAPPED* m_pol; OVERLAPPED m_pol; WSABUF m_pwbuf; int nTotalBytes; Loading @@ -113,9 +114,9 @@ namespace netplus { bool AssociateWithIOCP(client* pClientContext){ //Associate the socket with IOCP HANDLE hTemp = CreateIoCompletionPort((HANDLE)pClientContext->CurCon->csock->fd(), g_iocp, (ULONG_PTR)pClientContext, 0); HANDLE htemp = (poll::client*)CreateIoCompletionPort((HANDLE)(pClientContext)->CurCon->csock->fd(), g_iocp, (ULONG_PTR)pClientContext, 0); if (hTemp == nullptr){ if (htemp == nullptr){ std::cerr << "AssociateWithIOCP: " << GetLastError() << std::endl; RemoveFromClientList(pClientContext); return false; Loading @@ -142,9 +143,15 @@ namespace netplus { pClientContext->OpCode=OP_READ; if (g_serversocket->_Type == TCP) pClientContext->CurCon->csock= new tcp(); pClientContext->CurCon->csock= new tcp( WSAAccept(g_serversocket->fd(),nullptr,nullptr,nullptr,0) ); g_serversocket->accept(pClientContext->CurCon->csock); if (pClientContext->CurCon->csock->fd() == SOCKET_ERROR) { NetException e; e[NetException::Error] << "WSAAccept() failed: " << WSAGetLastError(); throw e; } pClientContext->CurCon->csock->setnonblocking(); Loading @@ -162,9 +169,9 @@ namespace netplus { DWORD dwBytes = 0; RECONNECT: int nBytesRecv = WSARecv(pClientContext->CurCon->csock->fd(),&pClientContext->m_pwbuf, 1, &dwBytes, &dwFlags,pClientContext->m_pol, nullptr); &dwBytes, &dwFlags,&pClientContext->m_pol, nullptr); if ((SOCKET_ERROR == nBytesRecv)){ if (SOCKET_ERROR == nBytesRecv){ if (WSAEWOULDBLOCK == WSAGetLastError()) { goto RECONNECT; } Loading @@ -175,18 +182,6 @@ RECONNECT: } pClientContext->CurCon->RecvData.append(pClientContext->m_pwbuf.buf,dwBytes); std::cout.write(pClientContext->CurCon->RecvData.data(), dwBytes) << std::endl; g_eventapi->RequestEvent(pClientContext->CurCon, tid, args); if (!pClientContext->CurCon->SendData.empty()) { size_t ssize = BLOCKSIZE < pClientContext->CurCon->SendData.size() ? BLOCKSIZE : pClientContext->CurCon->SendData.size(); pClientContext->m_pwbuf.buf = pClientContext->CurCon->SendData.data(); pClientContext->m_pwbuf.len = (ULONG)ssize; WSASend(pClientContext->CurCon->csock->fd(), &pClientContext->m_pwbuf, 1, &dwBytes, dwFlags, pClientContext->m_pol,nullptr); pClientContext->CurCon->SendData.resize(dwBytes); } } } Loading Loading @@ -284,7 +279,7 @@ RECONNECT: int bReturn = GetQueuedCompletionStatus( args->eviocp, &dwBytesTransfered, (PULONG_PTR)lpContext, (PULONG_PTR)&lpContext, &pOverlapped, WSA_INFINITE); Loading @@ -299,19 +294,18 @@ RECONNECT: continue; } OVERLAPPED* p_ol = pClientContext->m_pol; std::cerr << pClientContext->OpCode << std::endl; switch (pClientContext->OpCode) { case poll::OP_READ: pClientContext->nSentBytes+=dwBytesTransfered; std::cerr << "oh no2" << std::endl; //Write operation was finished, see if all the data was sent. //Else post another write. if (pClientContext->nSentBytes < pClientContext->nTotalBytes) { if (!pClientContext->CurCon->SendData.empty()){ pClientContext->OpCode=poll::OP_READ; std::cerr << "oh no3" << std::endl; dwFlags = 0; ssize = BLOCKSIZE < pClientContext->CurCon->SendData.size() ? BLOCKSIZE : pClientContext->CurCon->SendData.size(); Loading @@ -321,10 +315,9 @@ RECONNECT: //Overlapped send nBytesSent = WSASend(pClientContext->CurCon->csock->fd(), &pClientContext->m_pwbuf, 1, &dwBytes, dwFlags, p_ol, nullptr); &dwBytes, dwFlags, &pClientContext->m_pol, nullptr); if ((SOCKET_ERROR == nBytesSent) && (WSA_IO_PENDING != WSAGetLastError())) { if ((SOCKET_ERROR == nBytesSent) && (WSA_IO_PENDING != WSAGetLastError())){ //Let's not work with this client args->evpoll->RemoveFromClientList(pClientContext); } Loading @@ -345,7 +338,7 @@ RECONNECT: //Get the data. nBytesRecv = WSARecv(pClientContext->CurCon->csock->fd(), &pClientContext->m_pwbuf, 1, &dwBytes, &dwFlags, p_ol, nullptr); &dwBytes, &dwFlags, &pClientContext->m_pol, nullptr); if ((SOCKET_ERROR == nBytesRecv) && (WSA_IO_PENDING != WSAGetLastError())) { std::cerr << "Thread " << tid << " : Error occurred while executing WSARecv()." << std::endl; Loading Loading @@ -375,7 +368,7 @@ RECONNECT: //Overlapped send nBytesSent = WSASend(pClientContext->CurCon->csock->fd(), &pClientContext->m_pwbuf, 1, &dwBytes, dwFlags, p_ol, NULL); &dwBytes, dwFlags, &pClientContext->m_pol, NULL); if ((SOCKET_ERROR == nBytesSent) && (WSA_IO_PENDING != WSAGetLastError())) { Loading @@ -393,7 +386,6 @@ RECONNECT: pClientContext->OpCode = poll::OP_READ; break; } break; default: Loading Loading @@ -486,7 +478,6 @@ RECONNECT: } } } } for (auto i = threadpool.begin(); i != threadpool.end(); ++i) { Loading Loading
src/event/iocp.cpp +29 −38 Original line number Diff line number Diff line Loading @@ -55,9 +55,6 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define BLOCKSIZE 16384 #define xmalloc(s) HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,(s)) #define xfree(p) HeapFree(GetProcessHeap(),0,(p)) namespace netplus { class poll { public: Loading @@ -77,7 +74,6 @@ namespace netplus { class client { public: client(eventapi* eapi) { m_pol = nullptr; nTotalBytes = 0; nSentBytes = 0; api = eapi; Loading @@ -85,6 +81,11 @@ namespace netplus { m_pwbuf.buf = new char[BLOCKSIZE]; m_pwbuf.len = BLOCKSIZE; OpCode = 0; m_pol.Internal = 0; m_pol.InternalHigh = 0; m_pol.Offset = 0; m_pol.OffsetHigh = 0; m_pol.hEvent = nullptr; }; ~client() { Loading @@ -92,7 +93,7 @@ namespace netplus { delete[] m_pwbuf.buf; }; OVERLAPPED* m_pol; OVERLAPPED m_pol; WSABUF m_pwbuf; int nTotalBytes; Loading @@ -113,9 +114,9 @@ namespace netplus { bool AssociateWithIOCP(client* pClientContext){ //Associate the socket with IOCP HANDLE hTemp = CreateIoCompletionPort((HANDLE)pClientContext->CurCon->csock->fd(), g_iocp, (ULONG_PTR)pClientContext, 0); HANDLE htemp = (poll::client*)CreateIoCompletionPort((HANDLE)(pClientContext)->CurCon->csock->fd(), g_iocp, (ULONG_PTR)pClientContext, 0); if (hTemp == nullptr){ if (htemp == nullptr){ std::cerr << "AssociateWithIOCP: " << GetLastError() << std::endl; RemoveFromClientList(pClientContext); return false; Loading @@ -142,9 +143,15 @@ namespace netplus { pClientContext->OpCode=OP_READ; if (g_serversocket->_Type == TCP) pClientContext->CurCon->csock= new tcp(); pClientContext->CurCon->csock= new tcp( WSAAccept(g_serversocket->fd(),nullptr,nullptr,nullptr,0) ); g_serversocket->accept(pClientContext->CurCon->csock); if (pClientContext->CurCon->csock->fd() == SOCKET_ERROR) { NetException e; e[NetException::Error] << "WSAAccept() failed: " << WSAGetLastError(); throw e; } pClientContext->CurCon->csock->setnonblocking(); Loading @@ -162,9 +169,9 @@ namespace netplus { DWORD dwBytes = 0; RECONNECT: int nBytesRecv = WSARecv(pClientContext->CurCon->csock->fd(),&pClientContext->m_pwbuf, 1, &dwBytes, &dwFlags,pClientContext->m_pol, nullptr); &dwBytes, &dwFlags,&pClientContext->m_pol, nullptr); if ((SOCKET_ERROR == nBytesRecv)){ if (SOCKET_ERROR == nBytesRecv){ if (WSAEWOULDBLOCK == WSAGetLastError()) { goto RECONNECT; } Loading @@ -175,18 +182,6 @@ RECONNECT: } pClientContext->CurCon->RecvData.append(pClientContext->m_pwbuf.buf,dwBytes); std::cout.write(pClientContext->CurCon->RecvData.data(), dwBytes) << std::endl; g_eventapi->RequestEvent(pClientContext->CurCon, tid, args); if (!pClientContext->CurCon->SendData.empty()) { size_t ssize = BLOCKSIZE < pClientContext->CurCon->SendData.size() ? BLOCKSIZE : pClientContext->CurCon->SendData.size(); pClientContext->m_pwbuf.buf = pClientContext->CurCon->SendData.data(); pClientContext->m_pwbuf.len = (ULONG)ssize; WSASend(pClientContext->CurCon->csock->fd(), &pClientContext->m_pwbuf, 1, &dwBytes, dwFlags, pClientContext->m_pol,nullptr); pClientContext->CurCon->SendData.resize(dwBytes); } } } Loading Loading @@ -284,7 +279,7 @@ RECONNECT: int bReturn = GetQueuedCompletionStatus( args->eviocp, &dwBytesTransfered, (PULONG_PTR)lpContext, (PULONG_PTR)&lpContext, &pOverlapped, WSA_INFINITE); Loading @@ -299,19 +294,18 @@ RECONNECT: continue; } OVERLAPPED* p_ol = pClientContext->m_pol; std::cerr << pClientContext->OpCode << std::endl; switch (pClientContext->OpCode) { case poll::OP_READ: pClientContext->nSentBytes+=dwBytesTransfered; std::cerr << "oh no2" << std::endl; //Write operation was finished, see if all the data was sent. //Else post another write. if (pClientContext->nSentBytes < pClientContext->nTotalBytes) { if (!pClientContext->CurCon->SendData.empty()){ pClientContext->OpCode=poll::OP_READ; std::cerr << "oh no3" << std::endl; dwFlags = 0; ssize = BLOCKSIZE < pClientContext->CurCon->SendData.size() ? BLOCKSIZE : pClientContext->CurCon->SendData.size(); Loading @@ -321,10 +315,9 @@ RECONNECT: //Overlapped send nBytesSent = WSASend(pClientContext->CurCon->csock->fd(), &pClientContext->m_pwbuf, 1, &dwBytes, dwFlags, p_ol, nullptr); &dwBytes, dwFlags, &pClientContext->m_pol, nullptr); if ((SOCKET_ERROR == nBytesSent) && (WSA_IO_PENDING != WSAGetLastError())) { if ((SOCKET_ERROR == nBytesSent) && (WSA_IO_PENDING != WSAGetLastError())){ //Let's not work with this client args->evpoll->RemoveFromClientList(pClientContext); } Loading @@ -345,7 +338,7 @@ RECONNECT: //Get the data. nBytesRecv = WSARecv(pClientContext->CurCon->csock->fd(), &pClientContext->m_pwbuf, 1, &dwBytes, &dwFlags, p_ol, nullptr); &dwBytes, &dwFlags, &pClientContext->m_pol, nullptr); if ((SOCKET_ERROR == nBytesRecv) && (WSA_IO_PENDING != WSAGetLastError())) { std::cerr << "Thread " << tid << " : Error occurred while executing WSARecv()." << std::endl; Loading Loading @@ -375,7 +368,7 @@ RECONNECT: //Overlapped send nBytesSent = WSASend(pClientContext->CurCon->csock->fd(), &pClientContext->m_pwbuf, 1, &dwBytes, dwFlags, p_ol, NULL); &dwBytes, dwFlags, &pClientContext->m_pol, NULL); if ((SOCKET_ERROR == nBytesSent) && (WSA_IO_PENDING != WSAGetLastError())) { Loading @@ -393,7 +386,6 @@ RECONNECT: pClientContext->OpCode = poll::OP_READ; break; } break; default: Loading Loading @@ -486,7 +478,6 @@ RECONNECT: } } } } for (auto i = threadpool.begin(); i != threadpool.end(); ++i) { Loading