Commit 64e4ad15 authored by jan.koester's avatar jan.koester
Browse files

some fixes

parent 9d830348
Loading
Loading
Loading
Loading
+356 −0
Original line number Diff line number Diff line
@@ -54,7 +54,268 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#define BLOCKSIZE 16384

#define xmalloc(s) HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,(s))
#define xfree(p)   HeapFree(GetProcessHeap(),0,(p))

namespace netplus {
    class poll {
    public:
        poll(HANDLE iocp,eventapi *api,socket *srvsock) {
            g_iocp = iocp;
            g_pCtxtList = nullptr;
            g_bEndServer = false;
            g_eventapi = api;
            g_serversocket = srvsock;
        }

        typedef enum _IO_OPERATION {
            ClientIoAccept,
            ClientIoRead,
            ClientIoWrite
        } IO_OPERATION, * PIO_OPERATION;

        //
        // data to be associated for every I/O operation on a socket
        //
        typedef struct _PER_IO_CONTEXT {
            WSAOVERLAPPED               Overlapped;
            char                        Buffer[BLOCKSIZE];
            WSABUF                      wsabuf;
            int                         nTotalBytes;
            int                         nSentBytes;
            IO_OPERATION                IOOperation;
            SOCKET                      SocketAccept;

            struct _PER_IO_CONTEXT* pIOContextForward;
        } PER_IO_CONTEXT, * PPER_IO_CONTEXT;

        //
        // For AcceptEx, the IOCP key is the PER_SOCKET_CONTEXT for the listening socket,
        // so we need to another field SocketAccept in PER_IO_CONTEXT. When the outstanding
        // AcceptEx completes, this field is our connection socket handle.
        //

        //
        // data to be associated with every socket added to the IOCP
        //
        typedef struct _PER_SOCKET_CONTEXT {
            con                        *CurCon;

            LPFN_ACCEPTEX               fnAcceptEx;

            //
            //linked list for all outstanding i/o on the socket
            //
            PPER_IO_CONTEXT             pIOContext;
            struct _PER_SOCKET_CONTEXT* pCtxtBack;
            struct _PER_SOCKET_CONTEXT* pCtxtForward;
        } PER_SOCKET_CONTEXT, * PPER_SOCKET_CONTEXT;

        PPER_SOCKET_CONTEXT CtxtAllocate(SOCKET sd, IO_OPERATION ClientIO) {

            PPER_SOCKET_CONTEXT lpPerSocketContext;

            lpPerSocketContext = (PPER_SOCKET_CONTEXT)xmalloc(sizeof(PER_SOCKET_CONTEXT));
            if (lpPerSocketContext) {
                lpPerSocketContext->pIOContext = (PPER_IO_CONTEXT)xmalloc(sizeof(PER_IO_CONTEXT));
                if (lpPerSocketContext->pIOContext) {
                    g_eventapi->CreateConnetion(&lpPerSocketContext->CurCon);
                    if (g_serversocket->_Type == sockettype::TCP) {
                        lpPerSocketContext->CurCon->csock = new tcp(sd);
                    }
                    lpPerSocketContext->pCtxtBack = NULL;
                    lpPerSocketContext->pCtxtForward = NULL;

                    lpPerSocketContext->pIOContext->Overlapped.Internal = 0;
                    lpPerSocketContext->pIOContext->Overlapped.InternalHigh = 0;
                    lpPerSocketContext->pIOContext->Overlapped.Offset = 0;
                    lpPerSocketContext->pIOContext->Overlapped.OffsetHigh = 0;
                    lpPerSocketContext->pIOContext->Overlapped.hEvent = NULL;
                    lpPerSocketContext->pIOContext->IOOperation = ClientIO;
                    lpPerSocketContext->pIOContext->pIOContextForward = NULL;
                    lpPerSocketContext->pIOContext->nTotalBytes = 0;
                    lpPerSocketContext->pIOContext->nSentBytes = 0;
                    lpPerSocketContext->pIOContext->wsabuf.buf = lpPerSocketContext->pIOContext->Buffer;
                    lpPerSocketContext->pIOContext->wsabuf.len = sizeof(lpPerSocketContext->pIOContext->Buffer);

                    ZeroMemory(lpPerSocketContext->pIOContext->wsabuf.buf, lpPerSocketContext->pIOContext->wsabuf.len);
                }
                else {
                    xfree(lpPerSocketContext);
                    NetException e;
                    e[NetException::Critical] << "HeapAlloc() PER_IO_CONTEXT failed: " << GetLastError();
                    throw e;
                }

            }
            else {
                NetException e;
                e[NetException::Critical] << "HeapAlloc() PER_SOCKET_CONTEXT failed: " << GetLastError();
                throw e;
            }
            return(lpPerSocketContext);
        }

        VOID CtxtListAddTo(PPER_SOCKET_CONTEXT lpPerSocketContext) {

            PPER_SOCKET_CONTEXT     pTemp;

            if (g_pCtxtList == NULL) {

                //
                // add the first node to the linked list
                //
                lpPerSocketContext->pCtxtBack = NULL;
                lpPerSocketContext->pCtxtForward = NULL;
                g_pCtxtList = lpPerSocketContext;
            }
            else {

                //
                // add node to head of list
                //
                pTemp = g_pCtxtList;

                g_pCtxtList = lpPerSocketContext;
                lpPerSocketContext->pCtxtBack = pTemp;
                lpPerSocketContext->pCtxtForward = NULL;

                pTemp->pCtxtForward = lpPerSocketContext;
            }

            return;
        }

        PPER_SOCKET_CONTEXT UpdateCompletionPort(SOCKET sd, IO_OPERATION ClientIo,
            BOOL bAddToList) {

            PPER_SOCKET_CONTEXT lpPerSocketContext;

            try {
                lpPerSocketContext = CtxtAllocate(sd, ClientIo);
            } catch (NetException& e) {
                NetException ee;
                ee[NetException::Note] << "UpdateCompletionPort failed: " << e.what();
                throw ee;
            }

            g_iocp = CreateIoCompletionPort((HANDLE)sd, g_iocp, (DWORD_PTR)lpPerSocketContext, 0);
            if (g_iocp == nullptr) {
                NetException e;
                e[NetException::Note] << "CreateIoCompletionPort() failed: " << GetLastError();
                if (lpPerSocketContext->pIOContext)
                    xfree(lpPerSocketContext->pIOContext);
                xfree(lpPerSocketContext);
                throw e;
            }

            //
            //The listening socket context (bAddToList is FALSE) is not added to the list.
            //All other socket contexts are added to the list.
            //
            if (bAddToList) CtxtListAddTo(lpPerSocketContext);

            return(lpPerSocketContext);
        };

        VOID CtxtListDeleteFrom(PPER_SOCKET_CONTEXT lpPerSocketContext) {

            PPER_SOCKET_CONTEXT pBack;
            PPER_SOCKET_CONTEXT pForward;
            PPER_IO_CONTEXT     pNextIO = nullptr;
            PPER_IO_CONTEXT     pTempIO = nullptr;

            if (lpPerSocketContext) {
                pBack = lpPerSocketContext->pCtxtBack;
                pForward = lpPerSocketContext->pCtxtForward;


                if ((pBack == nullptr) && (pForward == nullptr)) {

                    //
                    // This is the only node in the list to delete
                    //
                    g_pCtxtList = nullptr;
                }
                else if ((pBack == nullptr) && (pForward != nullptr)) {

                    //
                    // This is the start node in the list to delete
                    //
                    pForward->pCtxtBack = nullptr;
                    g_pCtxtList = pForward;
                }
                else if ((pBack != nullptr) && (pForward == nullptr)) {

                    //
                    // This is the end node in the list to delete
                    //
                    pBack->pCtxtForward = nullptr;
                }
                else if (pBack && pForward) {

                    //
                    // Neither start node nor end node in the list
                    //
                    pBack->pCtxtForward = pForward;
                    pForward->pCtxtBack = pBack;
                }

                //
                // Free all i/o context structures per socket
                //
                pTempIO = (PPER_IO_CONTEXT)(lpPerSocketContext->pIOContext);
                do {
                    pNextIO = (PPER_IO_CONTEXT)(pTempIO->pIOContextForward);
                    if (pTempIO) {

                        //
                        //The overlapped structure is safe to free when only the posted i/o has
                        //completed. Here we only need to test those posted but not yet received 
                        //by PQCS in the shutdown process.
                        //
                        if (g_bEndServer)
                            while (!HasOverlappedIoCompleted((LPOVERLAPPED)pTempIO)) Sleep(0);
                        xfree(pTempIO);
                        pTempIO = NULL;
                    }
                    pTempIO = pNextIO;
                } while (pNextIO);

                xfree(lpPerSocketContext);
                lpPerSocketContext = NULL;

            }
        };

        void CloseClient(PPER_SOCKET_CONTEXT lpPerSocketContext, bool bGraceful) {
            if (lpPerSocketContext) {
                if (!bGraceful) {

                    //
                    // force the subsequent closesocket to be abortative.
                    //
                    LINGER  lingerStruct;

                    lingerStruct.l_onoff = 1;
                    lingerStruct.l_linger = 0;
                    setsockopt(lpPerSocketContext->CurCon->csock->fd(), SOL_SOCKET, SO_LINGER,
                        (char*)&lingerStruct, sizeof(lingerStruct));
                }
                delete lpPerSocketContext->CurCon->csock;
                CtxtListDeleteFrom(lpPerSocketContext);
                lpPerSocketContext = NULL;
            }
            return;
        };

    private:
        HANDLE              g_iocp;
        PPER_SOCKET_CONTEXT g_pCtxtList;
        bool                g_bEndServer;
        socket             *g_serversocket;
        eventapi           *g_eventapi;
    };

    void eventapi::RequestEvent(con* curcon, const int tid, void* args) {
        //dummy
@@ -100,6 +361,36 @@ namespace netplus {
    event::~event() {
    }


    class EventWorkerArgs {
    public:
        EventWorkerArgs() {
        }

        EventWorkerArgs(const EventWorkerArgs& eargs) {
            event = eargs.event;
            pollfd = eargs.pollfd;
            ssocket = eargs.ssocket;
            timeout = eargs.timeout;
            args = eargs.args;
        }

        SOCKET              pollfd;
        int                 timeout;

        eventapi* event;
        socket* ssocket;
        void* args;
    };

    class EventWorker {
    public:
        EventWorker(int tid, EventWorkerArgs* args) {
        EVENTLOOP:
            goto EVENTLOOP;
        }
    };

    void event::runEventloop(void* args) {
        NetException exception;

@@ -119,6 +410,10 @@ namespace netplus {
        GUID GuidAcceptEx = WSAID_ACCEPTEX;
        WSAOVERLAPPED olOverlap;
        DWORD dwBytes;
        DWORD dwRecvNumBytes = 0;
        DWORD dwFlags = 0;

        int nRet = 0;

        int iResult = WSAIoctl(_ServerSocket->fd(), SIO_GET_EXTENSION_FUNCTION_POINTER,
            &GuidAcceptEx, sizeof(GuidAcceptEx),
@@ -135,6 +430,67 @@ namespace netplus {

        memset(&olOverlap, 0, sizeof(olOverlap));

        poll evpoll(iocp,this, _ServerSocket);

        EventWorkerArgs eargs;
        eargs.ssocket = _ServerSocket;
        eargs.event = this;
        eargs.pollfd = _pollFD;
        eargs.timeout = _Timeout;

        std::vector <std::thread> threadpool;

        for (int i = 0; i < threads; i++) {
            try {
                threadpool.push_back(std::thread([&eargs, i] {
                    EventWorker(i, &eargs);
                    }));
            }
            catch (NetException& e) {
                throw e;
            }
        }

        for (;;) {
            try {
                SOCKET sdAccept = WSAAccept(_ServerSocket->fd(), nullptr, nullptr, nullptr, 0);
                if (sdAccept == SOCKET_ERROR) {
                    NetException except;
                    except[NetException::Error] << "CloseEventHandler: can't close socket to epoll: " << WSAGetLastError();
                    throw except;
                }
                else {
                    std::cerr << "Clientsocket Accepted: " << sdAccept << std::endl;
                }

                poll::PPER_SOCKET_CONTEXT lpPerSocketContext = evpoll.UpdateCompletionPort(sdAccept, poll::ClientIoRead, true);

                nRet = WSARecv(sdAccept, &(lpPerSocketContext->pIOContext->wsabuf),
                    1, &dwRecvNumBytes, &dwFlags,
                    &(lpPerSocketContext->pIOContext->Overlapped), nullptr);
                if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                    NetException e;
                    e[NetException::Note] << "WSARecv() Failed: ", WSAGetLastError();
                    evpoll.CloseClient(lpPerSocketContext, FALSE);
                    throw e;
                }

                std::cout.write(lpPerSocketContext->pIOContext->wsabuf.buf, lpPerSocketContext->pIOContext->wsabuf.len)<<std::endl;

                std::cout << "test" << std::endl;

            } catch (NetException& e) {
                if (e.getErrorType() == NetException::Note) {
                    std::cerr << e.what() << std::endl;
                } else {
                    throw e;
                }
            }
        }

        for (auto i = threadpool.begin(); i != threadpool.end(); ++i) {
            i->join();
        }
    }
};

+2 −5
Original line number Diff line number Diff line
@@ -46,6 +46,7 @@ namespace netplus {
        public:
            socket();
            virtual                 ~socket();

            void                     setnonblocking();
            void                     setTimeout(int sec);
            
@@ -72,9 +73,6 @@ namespace netplus {
            int                      _Type;
            void                    *_Extension;
            static std::atomic<int>  _InitCount;
#ifdef Windows
            WSAData                  _WSAData;
#endif // Windows
        };
        
        class tcp : public socket{
@@ -104,7 +102,6 @@ namespace netplus {
            virtual void connect(socket *csock);

            void getAddress(std::string &addr);

        private:
            int             _Maxconnections;
            std::string     _UxPath;
+4 −3
Original line number Diff line number Diff line
@@ -39,18 +39,19 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "error.h"
#include "config.h"

//#define HIDDEN __attribute__ ((visibility ("hidden")))


#define WIN32_LEAN_AND_MEAN

#pragma comment (lib, "Ws2_32.lib")

std::atomic<int> netplus::socket::_InitCount=0;

WSAData _WSAData;

netplus::socket::socket(){
    _Socket=-1;
    _SocketPtr=nullptr;
    _Extension = nullptr;
    _SocketPtrSize = 0;
    _Type=-1;
    if (_InitCount<1) {
        if (WSAStartup(MAKEWORD(2, 2), &_WSAData) != 0) {
+1 −1

File changed.

Contains only whitespace changes.