Commit 78dd7541 authored by jan.koester's avatar jan.koester
Browse files

much work

parent ee580804
Loading
Loading
Loading
Loading
+325 −49
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <winsock2.h>
#include <ws2ipdef.h>
#include <mswsock.h>
#include <strsafe.h>

#include <errno.h>

@@ -60,9 +61,13 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
namespace netplus {
    class poll {
    public:

        WSAEVENT g_hCleanupEvent[1];

        poll(HANDLE iocp,eventapi *api,socket *srvsock) {
            g_iocp = iocp;
            g_pCtxtList = nullptr;
            g_pCtxtListenSocket = nullptr;
            g_bEndServer = false;
            g_eventapi = api;
            g_serversocket = srvsock;
@@ -288,6 +293,101 @@ namespace netplus {
            }
        };

        SOCKET CreateSocket(void) {
            int nRet = 0;
            int nZero = 0;
            SOCKET sdSocket = INVALID_SOCKET;

            sdSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED);
            if (sdSocket == INVALID_SOCKET) {
                NetException e;
                e[NetException::Warning] << "WSASocket(sdSocket) failed: " << WSAGetLastError();
                throw e;
            }

            //
            // Disable send buffering on the socket.  Setting SO_SNDBUF
            // to 0 causes winsock to stop buffering sends and perform
            // sends directly from our buffers, thereby save one memory copy.
            //
            // However, this does prevent the socket from ever filling the
            // send pipeline. This can lead to packets being sent that are
            // not full (i.e. the overhead of the IP and TCP headers is 
            // great compared to the amount of data being carried).
            //
            // Disabling the send buffer has less serious repercussions 
            // than disabling the receive buffer.
            //
            nZero = 0;
            nRet = setsockopt(sdSocket, SOL_SOCKET, SO_SNDBUF, (char*)&nZero, sizeof(nZero));
            if (nRet == SOCKET_ERROR) {
                NetException e;
                e[NetException::Warning] << "setsockopt(SNDBUF) failed: " << WSAGetLastError();
                throw e;
            }

            return(sdSocket);
        }

        bool CreateAcceptSocket(BOOL fUpdateIOCP) {

            int nRet = 0;
            DWORD dwRecvNumBytes = 0;
            DWORD bytes = 0;

            //
            // GUID to Microsoft specific extensions
            //
            GUID acceptex_guid = WSAID_ACCEPTEX;

            //
            //The context for listening socket uses the SockAccept member to store the
            //socket for client connection. 
            //
            if (fUpdateIOCP) {
                g_pCtxtListenSocket = UpdateCompletionPort(g_serversocket->fd(), ClientIoAccept, false);
                if (g_pCtxtListenSocket == nullptr) {
                    return false;
                }

                // Load the AcceptEx extension function from the provider for this socket
                nRet = WSAIoctl(
                    g_serversocket->fd(),
                    SIO_GET_EXTENSION_FUNCTION_POINTER,
                    &acceptex_guid,
                    sizeof(acceptex_guid),
                    &g_pCtxtListenSocket->fnAcceptEx,
                    sizeof(g_pCtxtListenSocket->fnAcceptEx),
                    &bytes,
                    NULL,
                    NULL
                );
                if (nRet == SOCKET_ERROR){
                    return (FALSE);
                }
            }

            g_pCtxtListenSocket->pIOContext->SocketAccept = CreateSocket();
            if (g_pCtxtListenSocket->pIOContext->SocketAccept == INVALID_SOCKET) {
                return false;
            }

            //
            // pay close attention to these parameters and buffer lengths
            //
            nRet = g_pCtxtListenSocket->fnAcceptEx(g_serversocket->fd(), g_pCtxtListenSocket->pIOContext->SocketAccept,
                (LPVOID)(g_pCtxtListenSocket->pIOContext->Buffer),
                BLOCKSIZE - (2 * (sizeof(SOCKADDR_STORAGE) + 16)),
                sizeof(SOCKADDR_STORAGE) + 16, sizeof(SOCKADDR_STORAGE) + 16,
                &dwRecvNumBytes,
                (LPOVERLAPPED) & (g_pCtxtListenSocket->pIOContext->Overlapped));
            if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                return false;
            }

            return true;
        }

        void CloseClient(PPER_SOCKET_CONTEXT lpPerSocketContext, bool bGraceful) {
            if (lpPerSocketContext) {
                if (!bGraceful) {
@@ -312,6 +412,7 @@ namespace netplus {
    private:
        HANDLE              g_iocp;
        PPER_SOCKET_CONTEXT g_pCtxtList;
        PPER_SOCKET_CONTEXT g_pCtxtListenSocket;
        bool                g_bEndServer;
        socket             *g_serversocket;
        eventapi           *g_eventapi;
@@ -369,15 +470,17 @@ namespace netplus {

        EventWorkerArgs(const EventWorkerArgs& eargs) {
            event = eargs.event;
            pollfd = eargs.pollfd;
            eviocp = eargs.eviocp;
            evpoll = eargs.evpoll;
            ssocket = eargs.ssocket;
            timeout = eargs.timeout;
            args = eargs.args;
        }

        SOCKET              pollfd;
        int                 timeout;
        
        int        timeout;
        HANDLE     eviocp;
        poll      *evpoll;
        eventapi  *event;
        socket    *ssocket;
        void      *args;
@@ -386,7 +489,220 @@ namespace netplus {
    class EventWorker {
    public:
        EventWorker(int tid, EventWorkerArgs* args) {
            int nRet = 0;
            DWORD dwFlags = 0;
            DWORD dwIoSize = 0;
            DWORD dwRecvNumBytes = 0;
            DWORD dwSendNumBytes = 0;

            WSABUF buffRecv;
            WSABUF buffSend;

            LPWSAOVERLAPPED lpOverlapped = nullptr;
            poll::PPER_SOCKET_CONTEXT lpPerSocketContext = nullptr;
            poll::PPER_SOCKET_CONTEXT lpAcceptSocketContext = nullptr;
            poll::PPER_IO_CONTEXT lpIOContext = nullptr;
            bool bSuccess = false;
            HRESULT hRet;

        EVENTLOOP:
            try {
                bSuccess = GetQueuedCompletionStatus(
                    args->eviocp,
                    &dwIoSize,
                    (PDWORD_PTR)&lpPerSocketContext,
                    (LPOVERLAPPED*)&lpOverlapped,
                    INFINITE
                );

                switch (lpIOContext->IOOperation) {
                case poll::ClientIoAccept:

                    //
                    // When the AcceptEx function returns, the socket sAcceptSocket is 
                    // in the default state for a connected socket. The socket sAcceptSocket 
                    // does not inherit the properties of the socket associated with 
                    // sListenSocket parameter until SO_UPDATE_ACCEPT_CONTEXT is set on 
                    // the socket. Use the setsockopt function to set the SO_UPDATE_ACCEPT_CONTEXT 
                    // option, specifying sAcceptSocket as the socket handle and sListenSocket 
                    // as the option value. 
                    //
                    nRet = setsockopt(
                        lpPerSocketContext->pIOContext->SocketAccept,
                        SOL_SOCKET,
                        SO_UPDATE_ACCEPT_CONTEXT,
                        (char*)&args->ssocket,
                        sizeof(args->ssocket)
                    );

                    if (nRet == SOCKET_ERROR) {
                        NetException e;
                        e[NetException::Note] << "setsockopt(SO_UPDATE_ACCEPT_CONTEXT) failed to update accept socket";
                        WSASetEvent(args->evpoll->g_hCleanupEvent[0]);
                        throw e;
                    }

                    lpAcceptSocketContext = args->evpoll->UpdateCompletionPort(
                        lpPerSocketContext->pIOContext->SocketAccept,
                        poll::ClientIoAccept, true);

                    if (lpAcceptSocketContext == nullptr) {
                        NetException e;
                        e[NetException::Note] << "failed to update accept socket to IOCP\n";
                        WSASetEvent(args->evpoll->g_hCleanupEvent[0]);
                        throw e;
                    }

                    if (dwIoSize) {
                        lpAcceptSocketContext->pIOContext->IOOperation = poll::ClientIoWrite;
                        lpAcceptSocketContext->pIOContext->nTotalBytes = dwIoSize;
                        lpAcceptSocketContext->pIOContext->nSentBytes = 0;
                        lpAcceptSocketContext->pIOContext->wsabuf.len = dwIoSize;
                        hRet = StringCbCopyN(lpAcceptSocketContext->pIOContext->Buffer,
                            BLOCKSIZE,
                            lpPerSocketContext->pIOContext->Buffer,
                            sizeof(lpPerSocketContext->pIOContext->Buffer)
                        );
                        lpAcceptSocketContext->pIOContext->wsabuf.buf = lpAcceptSocketContext->pIOContext->Buffer;

                        nRet = WSASend(
                            lpPerSocketContext->pIOContext->SocketAccept,
                            &lpAcceptSocketContext->pIOContext->wsabuf, 1,
                            &dwSendNumBytes,
                            0,
                            &(lpAcceptSocketContext->pIOContext->Overlapped), NULL);

                        if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                            NetException e;
                            e[NetException::Note] << "WSASend() failed: " << WSAGetLastError();
                            args->evpoll->CloseClient(lpAcceptSocketContext, FALSE);
                            throw e;
                        }
                    }
                    else {

                        //
                        // AcceptEx completes but doesn't read any data so we need to post
                        // an outstanding overlapped read.
                        //
                        lpAcceptSocketContext->pIOContext->IOOperation = poll::ClientIoRead;
                        dwRecvNumBytes = 0;
                        dwFlags = 0;
                        buffRecv.buf = lpAcceptSocketContext->pIOContext->Buffer,
                            buffRecv.len = BLOCKSIZE;
                        nRet = WSARecv(
                            lpAcceptSocketContext->CurCon->csock->fd(),
                            &buffRecv, 1,
                            &dwRecvNumBytes,
                            &dwFlags,
                            &lpAcceptSocketContext->pIOContext->Overlapped, NULL);
                        if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                            NetException e;
                            e[NetException::Note] << "WSARecv() failed: " << WSAGetLastError();
                            args->evpoll->CloseClient(lpAcceptSocketContext, FALSE);
                            delete lpAcceptSocketContext->CurCon->csock;
                            lpAcceptSocketContext->CurCon->csock = nullptr;
                        }
                    }

                    //
                    //Time to post another outstanding AcceptEx
                    //
                    if (!args->evpoll->CreateAcceptSocket(FALSE)) {
                        NetException e;
                        e[NetException::Critical] << "Please shut down and reboot the server!";
                        WSASetEvent(args->evpoll->g_hCleanupEvent[0]);
                        throw e;
                    }
                    break;


                case poll::ClientIoRead:

                    //
                    // a read operation has completed, post a write operation to echo the
                    // data back to the client using the same data buffer.
                    //
                    lpIOContext->IOOperation = poll::ClientIoWrite;
                    lpIOContext->nTotalBytes = dwIoSize;
                    lpIOContext->nSentBytes = 0;
                    lpIOContext->wsabuf.len = dwIoSize;
                    dwFlags = 0;
                    nRet = WSASend(
                        lpPerSocketContext->CurCon->csock->fd(),
                        &lpIOContext->wsabuf, 1, &dwSendNumBytes,
                        dwFlags,
                        &(lpIOContext->Overlapped), NULL);
                    if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                        NetException e;
                        e[NetException::Note] << "WSASend() failed: " << WSAGetLastError();
                        args->evpoll->CloseClient(lpPerSocketContext, FALSE);
                        delete lpAcceptSocketContext->CurCon->csock;
                        lpAcceptSocketContext->CurCon->csock = nullptr;
                    }
                    break;

                case poll::ClientIoWrite:

                    //
                    // a write operation has completed, determine if all the data intended to be
                    // sent actually was sent.
                    //
                    lpIOContext->IOOperation = poll::ClientIoWrite;
                    lpIOContext->nSentBytes += dwIoSize;
                    dwFlags = 0;
                    if (lpIOContext->nSentBytes < lpIOContext->nTotalBytes) {

                        //
                        // the previous write operation didn't send all the data,
                        // post another send to complete the operation
                        //
                        buffSend.buf = lpIOContext->Buffer + lpIOContext->nSentBytes;
                        buffSend.len = lpIOContext->nTotalBytes - lpIOContext->nSentBytes;
                        nRet = WSASend(
                            lpPerSocketContext->CurCon->csock->fd(),
                            &buffSend, 1, &dwSendNumBytes,
                            dwFlags,
                            &(lpIOContext->Overlapped), NULL);
                        if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                            NetException e;
                            e[NetException::Note] << "WSASend() failed: " << WSAGetLastError();
                            args->evpoll->CloseClient(lpPerSocketContext, FALSE);
                            delete lpAcceptSocketContext->CurCon->csock;
                            lpAcceptSocketContext->CurCon->csock = nullptr;
                        }
                    }
                    else {

                        //
                        // previous write operation completed for this socket, post another recv
                        //
                        lpIOContext->IOOperation = poll::ClientIoRead;
                        dwRecvNumBytes = 0;
                        dwFlags = 0;
                        buffRecv.buf = lpIOContext->Buffer,
                            buffRecv.len = BLOCKSIZE;
                        nRet = WSARecv(
                            lpPerSocketContext->CurCon->csock->fd(),
                            &buffRecv, 1, &dwRecvNumBytes,
                            &dwFlags,
                            &lpIOContext->Overlapped, NULL);
                        if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                            NetException e;
                            e[NetException::Note] << "WSARecv() failed: " << WSAGetLastError();
                            args->evpoll->CloseClient(lpPerSocketContext, FALSE);
                            delete lpAcceptSocketContext->CurCon->csock;
                            lpAcceptSocketContext->CurCon->csock = nullptr;
                        }
                        break;
                    }
                }
            } catch (NetException &e) {
                if (e.getErrorType() == NetException::Note || NetException::Warning)
                    std::cerr << e.what() << std::endl;
                else
                    throw e;
            }
            goto EVENTLOOP;
        }
    };
@@ -410,10 +726,6 @@ namespace netplus {
        GUID GuidAcceptEx = WSAID_ACCEPTEX;
        WSAOVERLAPPED olOverlap;
        DWORD dwBytes;
        DWORD dwRecvNumBytes = 0;
        DWORD dwFlags = 0;

        int nRet = 0;

        int iResult = WSAIoctl(_ServerSocket->fd(), SIO_GET_EXTENSION_FUNCTION_POINTER,
            &GuidAcceptEx, sizeof(GuidAcceptEx),
@@ -435,8 +747,9 @@ namespace netplus {
        EventWorkerArgs eargs;
        eargs.ssocket = _ServerSocket;
        eargs.event = this;
        eargs.pollfd = _pollFD;
        eargs.evpoll = &evpoll;
        eargs.timeout = _Timeout;
        eargs.eviocp = iocp;

        std::vector <std::thread> threadpool;

@@ -451,43 +764,6 @@ namespace netplus {
            }
        }

        for (;;) {
            try {
                SOCKET sdAccept = WSAAccept(_ServerSocket->fd(), nullptr, nullptr, nullptr, 0);
                if (sdAccept == SOCKET_ERROR) {
                    NetException except;
                    except[NetException::Note] << "runEventloop:" << WSAGetLastError();
                    throw except;
                }
                else {
                    std::cerr << "Clientsocket Accepted: " << sdAccept << std::endl;
                }

                poll::PPER_SOCKET_CONTEXT lpPerSocketContext = evpoll.UpdateCompletionPort(sdAccept, poll::ClientIoRead, true);

                nRet = WSARecv(sdAccept, &(lpPerSocketContext->pIOContext->wsabuf),
                    1, &dwRecvNumBytes, &dwFlags,
                    &(lpPerSocketContext->pIOContext->Overlapped), nullptr);
                if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                    NetException e;
                    e[NetException::Note] << "WSARecv() Failed: ", WSAGetLastError();
                    evpoll.CloseClient(lpPerSocketContext, FALSE);
                    throw e;
                }

                std::cout.write(lpPerSocketContext->pIOContext->wsabuf.buf, lpPerSocketContext->pIOContext->wsabuf.len)<<std::endl;

                std::cout << "test" << std::endl;

            } catch (NetException& e) {
                if (e.getErrorType() == NetException::Note) {
                    std::cerr << e.what() << std::endl;
                } else {
                    throw e;
                }
            }
        }

        for (auto i = threadpool.begin(); i != threadpool.end(); ++i) {
            i->join();
        }