]> jfr.im git - irc/DALnet/libd.git/commitdiff
Tie in queueing with reads...
authorepiphani <redacted>
Tue, 21 Apr 2009 16:50:33 +0000 (12:50 -0400)
committerepiphani <redacted>
Tue, 21 Apr 2009 16:50:33 +0000 (12:50 -0400)
include/sockeng.h
src/client.c
src/ebuf.c
src/sockeng.c

index 203a0a9871bf3d97a52f881e9eb5d0ec0e11871c..1b6b30e1dd3f403bb0c6261ecf65de908a58ab3c 100644 (file)
 #include <errno.h>
 #include <unistd.h>
 
-#define WRITEV_IOV 32 /* FIXME future configured value */
+#include "ebuf.h"
 
+#define WRITEV_IOV 32 /* FIXME future configured value */
 #define DEBUG
-
 #define MAX_FDS 1024           /* maximum supported file descriptors */
-
 #define BUFSIZE 8192
 
 /* typedefs make life easier */
@@ -93,6 +92,8 @@ struct _client {
        ipvx            addr;                   /* address of the client */
        unsigned int    bufsize;                /* current size of the buffer */
        char            buffer[BUFSIZE];        /* the buffer! */
+       eBuf            recvQ;                  /* inbound queue */
+       eBuf            sendQ;                  /* outbound queue */
        int             sockerr;                /* any socket error is cached here */
        unsigned short  port;                   /* the remote port */
        int             type;                   /* the type of this connection (tcp/udp/raw) */
index 3aa455d0b32c73c933ab2a101bbe0dcf5e05f9b4..0deab9a14186f0d1cf31f289388bad59cb5758c9 100644 (file)
@@ -53,6 +53,16 @@ void client_do_rw(SockEng *s, Client *c, int rr, int rw)
                if(pack_off) {
                        /* will enter into the parse queue later */
                        c->parser(c, pack_off);
+               } else {
+                       /* no contained message, queue and check against older stuff */
+                       if(ebuf_put(&c->recvQ, &readbuf, len))
+                               return;
+                       if(eBufLength(&c->recvQ) > len) {
+                               len = ebuf_get(&c->recvQ, &readbuf, BUFSIZE);
+                               pack_off = c->packeter(c, &readbuf, len);
+                               if(pack_off)
+                                       c->parser(c, pack_off);
+                       }
                }
        }
        if(rw) {
index a160415a5584f56955075d54289f7d93ad834df2..584460500cf28a98ec4a0ae313eb707972ce5b13 100644 (file)
@@ -3,6 +3,7 @@
  */
 
 #include "sockeng.h"
+#include "ebuf.h"
 
 /* Maximum message length */
 #define MAX_MSGLEN 512
 #define EBUF_LARGE_BUFFER      MAX_MSGLEN
 #define EBUF_SMALL_BUFFER      (MAX_MSGLEN / 2)
 
-/* Macros */
-#define eBufLength(s)          ((s)->length)
-#define eBufClear(s)           ebuf_delete((s), (s)->length)
-
-/* forward declaration */
-typedef struct _eBuffer eBuffer;
-typedef struct _eBufUser eBufUser;
-typedef struct _eBuf eBuf;
-typedef struct _eBufBlock eBufBlock;
-typedef struct _eBufUserBlock eBufUserBlock;
-
-struct _eBuf
-{
-       int             length;
-       eBufUser        *head, *tail;
-};
-
-struct _eBuffer 
-{
-       eBuffer         *next;
-       int             shared;
-       int             bufsize;
-       int             refcount;
-       char            *end;
-};
-
-struct _eBufBlock
-{
-       int             num;
-       eBuffer         *bufs;
-       eBufBlock       *next;
-};
-
-struct _eBufUser
-{
-       char            *start;
-       eBuffer         *buf;
-       eBufUser        *next;
-};
-
-struct _eBufUserBlock
-{
-       int             num;
-       eBufUser        *users;
-       eBufUserBlock   *next;
-};
-
-
 typedef struct _eBufConfig {
        eBuffer         *largeebuf_pool, *smallebuf_pool;
        eBufUser        *user_pool;
@@ -274,7 +227,7 @@ int ebuf_init()
  * we allocate an ebuffer with the data, and return it to the calling
  * function via ptr
  */
-int ebuf_begin_share(const char *buffer, int len, void **ptr)
+eBuffer *ebuf_begin_share(const char *buffer, int len)
 {
        eBuffer *s;
 
@@ -282,23 +235,22 @@ int ebuf_begin_share(const char *buffer, int len, void **ptr)
                len = MAX_MSGLEN;
 
        s = ebuf_alloc(len);
-       if(!s || len > s->bufsize)
-               return ebuf_alloc_error();
+       if(!s || len > s->bufsize) {
+               ebuf_alloc_error();
+               return NULL;
+       }
 
        memcpy(s->end, buffer, len);
        s->end += len;
        s->refcount = 0;
        s->shared = 1;
 
-       *ptr = (void *) s;
-       return 0;
+       return s;
 }
 
 /* identify that this buffer has been associated to all references now */
-int ebuf_end_share(void *ptr)
+int ebuf_end_share(eBuffer *s)
 {
-       eBuffer *s = ptr;
-
        s->shared = 0;
        if(s->refcount == 0)
                ebuf_free(s);
@@ -307,10 +259,9 @@ int ebuf_end_share(void *ptr)
 }
 
 /* associate a given shared buffer with a queue */
-int ebuf_put_share(eBuf *sb, void *ptr)
+int ebuf_put_share(eBuf *sb, eBuffer *s)
 {
        eBufUser *user;
-       eBuffer  *s = ptr;
 
        if(!s)
                return -1;
@@ -331,7 +282,7 @@ int ebuf_put_share(eBuf *sb, void *ptr)
 }
 
 /* create and place a buffer in the correct user location */
-int ebuf_put(eBuf *sb, const char *buffer, int len)
+int ebuf_put(eBuf *sb, char *buffer, int len)
 {
        eBufUser        **user, *u;
        int             chunk;
@@ -381,6 +332,7 @@ int ebuf_put(eBuf *sb, const char *buffer, int len)
        return 0;
 }
 
+/*
 int ebuf_putiov(eBuf *sb, struct iovec *v, int count)
 {
        int i = 0, ret;
@@ -393,6 +345,7 @@ int ebuf_putiov(eBuf *sb, struct iovec *v, int count)
        }
        return 0;
 }
+*/
 
 int ebuf_delete(eBuf *sb, int len)
 {
@@ -486,31 +439,31 @@ int ebuf_flush(eBuf *sb)
 }
 */
 
+/* get an arbitrary length from the queue and put it in buffer */
 int ebuf_get(eBuf *sb, char *buffer, int len)
 {
        eBufUser        *user;
-       int             copied;
+       int             copied = 0;
        char            *ptr, *max;
        
-       if(ebuf_flush(sb) == 0)
-               return 0;
-       
-       copied = 0;
        for(user = sb->head; user && len; user = user->next) {
                max = user->start + len; 
                if(max > user->buf->end)
                        max = user->buf->end;
                
-               for(ptr = user->start; ptr < max && !IsEol(*ptr); )
+               ptr = user->start;
+               while(ptr < max)
                        *buffer++ = *ptr++;
                        
                copied += ptr - user->start;
                len -= ptr - user->start;
                
-               if(ptr < max) {
-                       *buffer = 0;
+               if(!len) {
+                       *buffer = 0;    /* null terminate.. */
+                       /*
                        ebuf_delete(sb, copied);
                        ebuf_flush(sb);
+                       */
                        return copied;
                }
        }
index 4339e027cf248caa0b3c01cd105498fdceaccc75..6192078c7f1f59f73cebc50f476dbcd44554bc70 100644 (file)
@@ -35,6 +35,7 @@ SockEng *init_sockeng()
        new->set_errorhandler = set_errorhandler;
 
        engine_init(new);
+       ebuf_init();
 
        return new;
 }