Loading src/event/iocp.cpp +24 −19 Original line number Diff line number Diff line Loading @@ -73,6 +73,7 @@ namespace netplus { int OpCode; //will be used by the worker thread to decide what operation to perform eventapi *api; con *CurCon; std::atomic<bool> Conlock; }; class poll { Loading @@ -87,18 +88,15 @@ namespace netplus { g_iocp = iocp; g_eventapi = api; g_serversocket = ssock; InitializeCriticalSection(&g_csClientList); }; void AddToClientList(client* pClientContext){ EnterCriticalSection(&g_csClientList); const std::lock_guard<std::mutex> lock(g_polllock); g_ClientContext.push_back(pClientContext); LeaveCriticalSection(&g_csClientList); } bool AssociateWithIOCP(client* pClientContext){ const std::lock_guard<std::mutex> lock(g_polllock); //Associate the socket with IOCP HANDLE htemp = CreateIoCompletionPort((HANDLE)(pClientContext)->CurCon->csock->fd(), g_iocp, (ULONG_PTR)pClientContext, 0); Loading @@ -112,7 +110,7 @@ namespace netplus { } void RemoveFromClientList(client* pClientContext){ EnterCriticalSection(&g_csClientList); const std::lock_guard<std::mutex> lock(g_polllock); std::vector <client*>::iterator IterClientContext; for (IterClientContext = g_ClientContext.begin(); IterClientContext != g_ClientContext.end(); IterClientContext++){ if (pClientContext == *IterClientContext){ Loading @@ -121,11 +119,9 @@ namespace netplus { break; } } LeaveCriticalSection(&g_csClientList); } void AcceptConnection(int tid,ULONG_PTR args){ EnterCriticalSection(&g_csClientList); client* pClientContext = new client(g_eventapi); pClientContext->OpCode=OP_READ; Loading Loading @@ -195,16 +191,15 @@ RECONNECT: int nBytesSend = WSASend(pClientContext->CurCon->csock->fd(), &wbuf, 1, &dwBytes, dwFlags, &pClientContext->m_pol, nullptr); } LeaveCriticalSection(&g_csClientList); } private: CRITICAL_SECTION g_csClientList; HANDLE g_iocp; socket *g_serversocket; eventapi *g_eventapi; std::vector<client*> g_ClientContext; std::mutex g_polllock; friend class client; friend class EventWorker; }; Loading Loading @@ -287,7 +282,7 @@ RECONNECT: DWORD dwBytes = 0, dwFlags = 0; DWORD ret = 0; ULONG ssize = 0; EnterCriticalSection(&eargs->evpoll->g_csClientList); for (;;){ int bReturn = GetQueuedCompletionStatus( eargs->eviocp, Loading @@ -297,11 +292,15 @@ RECONNECT: WSA_INFINITE); if (lpContext == 0){ break; continue; } pClientContext = (client*)lpContext; bool expc = false; pClientContext->Conlock.compare_exchange_strong(expc,true); if ((bReturn==0) && (0 == dwBytesTransfered)) { eargs->evpoll->RemoveFromClientList(pClientContext); continue; Loading Loading @@ -329,6 +328,7 @@ RECONNECT: if ((SOCKET_ERROR == ret) && (WSA_IO_PENDING != WSAGetLastError())){ //Let's not work with this client eargs->evpoll->RemoveFromClientList(pClientContext); continue; } pClientContext->CurCon->SendData.resize(dwBytes); Loading Loading @@ -388,12 +388,15 @@ RECONNECT: ret = WSASend(pClientContext->CurCon->csock->fd(), &wbuf, 1, &dwBytes, dwFlags, &pClientContext->m_pol, NULL); if ((SOCKET_ERROR == ret) && (WSA_IO_PENDING != WSAGetLastError())) { if (ret == SOCKET_ERROR){ std::cerr << "Thread " << tid << " <<: Error occurred while executing WSASend()." << std::endl; //Let's not work with this client if (WSA_IO_PENDING != WSAGetLastError()) eargs->evpoll->RemoveFromClientList(pClientContext); else pClientContext->Conlock.store(false); continue; } pClientContext->CurCon->SendData.resize(dwBytes); Loading @@ -412,8 +415,10 @@ RECONNECT: eargs->evpoll->RemoveFromClientList(pClientContext); break; } // switch pClientContext->Conlock.store(false); } // while LeaveCriticalSection(&eargs->evpoll->g_csClientList); } }; Loading Loading
src/event/iocp.cpp +24 −19 Original line number Diff line number Diff line Loading @@ -73,6 +73,7 @@ namespace netplus { int OpCode; //will be used by the worker thread to decide what operation to perform eventapi *api; con *CurCon; std::atomic<bool> Conlock; }; class poll { Loading @@ -87,18 +88,15 @@ namespace netplus { g_iocp = iocp; g_eventapi = api; g_serversocket = ssock; InitializeCriticalSection(&g_csClientList); }; void AddToClientList(client* pClientContext){ EnterCriticalSection(&g_csClientList); const std::lock_guard<std::mutex> lock(g_polllock); g_ClientContext.push_back(pClientContext); LeaveCriticalSection(&g_csClientList); } bool AssociateWithIOCP(client* pClientContext){ const std::lock_guard<std::mutex> lock(g_polllock); //Associate the socket with IOCP HANDLE htemp = CreateIoCompletionPort((HANDLE)(pClientContext)->CurCon->csock->fd(), g_iocp, (ULONG_PTR)pClientContext, 0); Loading @@ -112,7 +110,7 @@ namespace netplus { } void RemoveFromClientList(client* pClientContext){ EnterCriticalSection(&g_csClientList); const std::lock_guard<std::mutex> lock(g_polllock); std::vector <client*>::iterator IterClientContext; for (IterClientContext = g_ClientContext.begin(); IterClientContext != g_ClientContext.end(); IterClientContext++){ if (pClientContext == *IterClientContext){ Loading @@ -121,11 +119,9 @@ namespace netplus { break; } } LeaveCriticalSection(&g_csClientList); } void AcceptConnection(int tid,ULONG_PTR args){ EnterCriticalSection(&g_csClientList); client* pClientContext = new client(g_eventapi); pClientContext->OpCode=OP_READ; Loading Loading @@ -195,16 +191,15 @@ RECONNECT: int nBytesSend = WSASend(pClientContext->CurCon->csock->fd(), &wbuf, 1, &dwBytes, dwFlags, &pClientContext->m_pol, nullptr); } LeaveCriticalSection(&g_csClientList); } private: CRITICAL_SECTION g_csClientList; HANDLE g_iocp; socket *g_serversocket; eventapi *g_eventapi; std::vector<client*> g_ClientContext; std::mutex g_polllock; friend class client; friend class EventWorker; }; Loading Loading @@ -287,7 +282,7 @@ RECONNECT: DWORD dwBytes = 0, dwFlags = 0; DWORD ret = 0; ULONG ssize = 0; EnterCriticalSection(&eargs->evpoll->g_csClientList); for (;;){ int bReturn = GetQueuedCompletionStatus( eargs->eviocp, Loading @@ -297,11 +292,15 @@ RECONNECT: WSA_INFINITE); if (lpContext == 0){ break; continue; } pClientContext = (client*)lpContext; bool expc = false; pClientContext->Conlock.compare_exchange_strong(expc,true); if ((bReturn==0) && (0 == dwBytesTransfered)) { eargs->evpoll->RemoveFromClientList(pClientContext); continue; Loading Loading @@ -329,6 +328,7 @@ RECONNECT: if ((SOCKET_ERROR == ret) && (WSA_IO_PENDING != WSAGetLastError())){ //Let's not work with this client eargs->evpoll->RemoveFromClientList(pClientContext); continue; } pClientContext->CurCon->SendData.resize(dwBytes); Loading Loading @@ -388,12 +388,15 @@ RECONNECT: ret = WSASend(pClientContext->CurCon->csock->fd(), &wbuf, 1, &dwBytes, dwFlags, &pClientContext->m_pol, NULL); if ((SOCKET_ERROR == ret) && (WSA_IO_PENDING != WSAGetLastError())) { if (ret == SOCKET_ERROR){ std::cerr << "Thread " << tid << " <<: Error occurred while executing WSASend()." << std::endl; //Let's not work with this client if (WSA_IO_PENDING != WSAGetLastError()) eargs->evpoll->RemoveFromClientList(pClientContext); else pClientContext->Conlock.store(false); continue; } pClientContext->CurCon->SendData.resize(dwBytes); Loading @@ -412,8 +415,10 @@ RECONNECT: eargs->evpoll->RemoveFromClientList(pClientContext); break; } // switch pClientContext->Conlock.store(false); } // while LeaveCriticalSection(&eargs->evpoll->g_csClientList); } }; Loading