char client_key[37]; /* maximum 36 bytes + nul */
} conn_t;
+#define WEBSOCKET_OPCODE_TEXT_FRAME 1
+
+#define WEBSOCKET_MASK_LENGTH 4
+
+#define WEBSOCKET_MAX_UNEXTENDED_PAYLOAD_DATA_LENGTH 125
+
+typedef struct {
+ uint8_t opcode_rsv_fin; // opcode: 4, rsv1: 1, rsv2: 1, rsv3: 1, fin: 1
+ uint8_t payload_length_mask; // payload_length: 7, mask: 1
+} ws_frame_hdr_t;
+
+#define WEBSOCKET_FRAME_HDR_INIT ((ws_frame_hdr_t) { 0, 0 })
+
+typedef struct {
+ ws_frame_hdr_t header;
+ uint8_t payload_data[WEBSOCKET_MAX_UNEXTENDED_PAYLOAD_DATA_LENGTH];
+} ws_frame_payload_t;
+
+typedef struct {
+ ws_frame_hdr_t header;
+} ws_frame_t;
+
+typedef struct {
+ ws_frame_hdr_t header;
+ uint16_t payload_length_extended;
+} ws_frame_ext_t;
+
+#define WEBSOCKET_FRAME_EXT_INIT ((ws_frame_ext_t) { WEBSOCKET_FRAME_HDR_INIT, 0 })
+
+typedef struct {
+ ws_frame_hdr_t header;
+ uint64_t payload_length_extended;
+} ws_frame_ext2_t;
+
+static inline void
+ws_frame_set_opcode(ws_frame_hdr_t *header, int opcode)
+{
+ header->opcode_rsv_fin &= ~0xF;
+ header->opcode_rsv_fin |= opcode & 0xF;
+}
+
+static inline void
+ws_frame_set_fin(ws_frame_hdr_t *header, int fin)
+{
+ header->opcode_rsv_fin &= ~(0x1 << 7);
+ header->opcode_rsv_fin |= (fin << 7) & (0x1 << 7);
+}
+
static void close_conn(conn_t * conn, int wait_plain, const char *fmt, ...);
static void conn_mod_read_cb(rb_fde_t *fd, void *data);
static void conn_plain_read_cb(rb_fde_t *fd, void *data);
+static void conn_plain_process_recvq(conn_t *conn);
#define FLAG_CORK 0x01
#define FLAG_DEAD 0x02
#define IsCork(x) ((x)->flags & FLAG_CORK)
#define IsDead(x) ((x)->flags & FLAG_DEAD)
-#define IsWS(x) ((x)->flags & FLAG_WSOCK)
#define IsKeyed(x) ((x)->flags & FLAG_KEYED)
#define SetCork(x) ((x)->flags |= FLAG_CORK)
#define SetKeyed(x) ((x)->flags |= FLAG_KEYED)
#define ClearCork(x) ((x)->flags &= ~FLAG_CORK)
-#define ClearDead(x) ((x)->flags &= ~FLAG_DEAD)
-#define ClearWS(x) ((x)->flags &= ~FLAG_WSOCK)
-#define ClearKeyed(x) ((x)->flags &= ~FLAG_KEYED)
#define NO_WAIT 0x0
#define WAIT_PLAIN 0x1
-#define HASH_WALK_SAFE(i, max, ptr, next, table) for(i = 0; i < max; i++) { RB_DLINK_FOREACH_SAFE(ptr, next, table[i].head)
-#define HASH_WALK_END }
#define CONN_HASH_SIZE 2000
#define connid_hash(x) (&connid_hash_table[(x % CONN_HASH_SIZE)])
return MAXCONNECTIONS;
}
-static conn_t *
-conn_find_by_id(uint32_t id)
-{
- rb_dlink_node *ptr;
- conn_t *conn;
-
- RB_DLINK_FOREACH(ptr, (connid_hash(id))->head)
- {
- conn = ptr->data;
- if(conn->id == id && !IsDead(conn))
- return conn;
- }
- return NULL;
-}
-
static void
conn_add_id_hash(conn_t * conn, uint32_t id)
{
dead_list.tail = dead_list.head = NULL;
}
+static void
+conn_plain_write_sendq(rb_fde_t *fd, void *data)
+{
+ conn_t *conn = data;
+ int retlen;
+
+ if(IsDead(conn))
+ return;
+
+ while((retlen = rb_linebuf_flush(fd, &conn->plainbuf_out)) > 0)
+ conn->plain_out += retlen;
+
+ if(retlen == 0 || (retlen < 0 && !rb_ignore_errno(errno)))
+ {
+ close_conn(data, NO_WAIT, NULL);
+ return;
+ }
+
+ if(rb_linebuf_alloclen(&conn->plainbuf_out) > 0)
+ rb_setselect(conn->plain_fd, RB_SELECT_WRITE, conn_plain_write_sendq, conn);
+ else
+ rb_setselect(conn->plain_fd, RB_SELECT_WRITE, NULL, NULL);
+}
+
static void
conn_mod_write_sendq(rb_fde_t *fd, void *data)
{
}
static void
-conn_plain_write(conn_t * conn, void *data, size_t len)
+conn_mod_write_short_frame(conn_t * conn, void *data, int len)
+{
+ ws_frame_hdr_t hdr = WEBSOCKET_FRAME_HDR_INIT;
+
+ ws_frame_set_opcode(&hdr, WEBSOCKET_OPCODE_TEXT_FRAME);
+ ws_frame_set_fin(&hdr, 1);
+ hdr.payload_length_mask = (len + 2) & 0x7f;
+
+ conn_mod_write(conn, &hdr, sizeof(hdr));
+ conn_mod_write(conn, data, len);
+ conn_mod_write(conn, "\r\n", 2);
+}
+
+static void
+conn_mod_write_long_frame(conn_t * conn, void *data, int len)
+{
+ ws_frame_ext_t hdr = WEBSOCKET_FRAME_EXT_INIT;
+
+ ws_frame_set_opcode(&hdr.header, WEBSOCKET_OPCODE_TEXT_FRAME);
+ ws_frame_set_fin(&hdr.header, 1);
+ hdr.header.payload_length_mask = 126;
+ hdr.payload_length_extended = htons(len + 2);
+
+ conn_mod_write(conn, &hdr, sizeof(hdr));
+ conn_mod_write(conn, data, len);
+ conn_mod_write(conn, "\r\n", 2);
+}
+
+static void
+conn_mod_write_frame(conn_t *conn, void *data, int len)
{
- if(IsDead(conn)) /* again no point in queueing to dead men */
+ if(IsDead(conn)) /* no point in queueing to a dead man */
+ return;
+
+ if (len < 123)
+ {
+ conn_mod_write_short_frame(conn, data, len);
return;
- rb_linebuf_put(&conn->plainbuf_out, data, len);
+ }
+
+ conn_mod_write_long_frame(conn, data, len);
}
static void
if(IsDead(conn))
return;
+ if (IsKeyed(conn))
+ conn_plain_process_recvq(conn);
+
rb_rawbuf_flush(conn->modbuf_out, conn->mod_fd);
rb_linebuf_flush(conn->plain_fd, &conn->plainbuf_out);
rb_close(conn->mod_fd);
rb_close(ctlb->F[i]);
}
+static void
+ws_frame_unmask(char *msg, int length, uint8_t maskval[WEBSOCKET_MASK_LENGTH])
+{
+ int i;
+
+ for (i = 0; i < length; i++)
+ msg[i] = msg[i] ^ maskval[i % 4];
+}
+
+static void
+conn_mod_process_frame(conn_t *conn, ws_frame_hdr_t *hdr, int masked)
+{
+ char msg[WEBSOCKET_MAX_UNEXTENDED_PAYLOAD_DATA_LENGTH];
+ uint8_t maskval[WEBSOCKET_MASK_LENGTH];
+ int dolen;
+
+ /* if we're masked, we get to collect the masking key for this frame */
+ if (masked)
+ {
+ dolen = rb_rawbuf_get(conn->modbuf_in, maskval, sizeof(maskval));
+ if (!dolen)
+ {
+ close_conn(conn, WAIT_PLAIN, "websocket error: fault unpacking unmask key");
+ return;
+ }
+ }
+
+ dolen = rb_rawbuf_get(conn->modbuf_in, msg, hdr->payload_length_mask);
+ if (!dolen)
+ {
+ close_conn(conn, WAIT_PLAIN, "websocket error: fault unpacking message");
+ return;
+ }
+
+ if (masked)
+ ws_frame_unmask(msg, dolen, maskval);
+
+ rb_linebuf_parse(&conn->plainbuf_out, msg, dolen, 1);
+}
+
+static void
+conn_mod_process_large(conn_t *conn, ws_frame_hdr_t *hdr, int masked)
+{
+ char msg[READBUF_SIZE];
+ uint16_t msglen;
+ uint8_t maskval[WEBSOCKET_MASK_LENGTH];
+ int dolen;
+
+ memset(msg, 0, sizeof msg);
+
+ dolen = rb_rawbuf_get(conn->modbuf_in, &msglen, sizeof(msglen));
+ if (!dolen)
+ {
+ close_conn(conn, WAIT_PLAIN, "websocket error: fault unpacking message size");
+ return;
+ }
+
+ msglen = ntohs(msglen);
+
+ if (masked)
+ {
+ dolen = rb_rawbuf_get(conn->modbuf_in, maskval, sizeof(maskval));
+ if (!dolen)
+ {
+ close_conn(conn, WAIT_PLAIN, "websocket error: fault unpacking unmask key");
+ return;
+ }
+ }
+
+ dolen = rb_rawbuf_get(conn->modbuf_in, msg, msglen);
+ if (!dolen)
+ {
+ close_conn(conn, WAIT_PLAIN, "websocket error: fault unpacking message");
+ return;
+ }
+
+ if (masked)
+ ws_frame_unmask(msg, dolen, maskval);
+
+ rb_linebuf_parse(&conn->plainbuf_out, msg, dolen, 1);
+}
+
+static void
+conn_mod_process_huge(conn_t *conn, ws_frame_hdr_t *hdr, int masked)
+{
+ /* XXX implement me */
+}
+
+static void
+conn_mod_process(conn_t *conn)
+{
+ ws_frame_hdr_t hdr;
+
+ while (1)
+ {
+ int masked;
+ int dolen = rb_rawbuf_get(conn->modbuf_in, &hdr, sizeof(hdr));
+ if (dolen != sizeof(hdr))
+ break;
+
+ masked = (hdr.payload_length_mask >> 7) == 1;
+
+ hdr.payload_length_mask &= 0x7f;
+ switch (hdr.payload_length_mask)
+ {
+ case 126:
+ conn_mod_process_large(conn, &hdr, masked);
+ break;
+ case 127:
+ conn_mod_process_huge(conn, &hdr, masked);
+ break;
+ default:
+ conn_mod_process_frame(conn, &hdr, masked);
+ break;
+ }
+ }
+
+ conn_plain_write_sendq(conn->plain_fd, conn);
+}
+
static void
conn_mod_handshake_process(conn_t *conn)
{
char inbuf[READBUF_SIZE];
+ memset(inbuf, 0, sizeof inbuf);
+
while (1)
{
char *p = NULL;
- size_t dolen = rb_rawbuf_get(conn->modbuf_in, inbuf, sizeof inbuf);
+ int dolen = rb_rawbuf_get(conn->modbuf_in, inbuf, sizeof inbuf);
if (!dolen)
break;
- if ((p = strcasestr(inbuf, "Sec-WebSocket-Key:")) != NULL)
+ if ((p = rb_strcasestr(inbuf, "Sec-WebSocket-Key:")) != NULL)
{
char *start, *end;
}
static void
-conn_mod_handshake_cb(rb_fde_t *fd, void *data)
+conn_mod_read_cb(rb_fde_t *fd, void *data)
{
char inbuf[READBUF_SIZE];
+
+ memset(inbuf, 0, sizeof inbuf);
+
conn_t *conn = data;
int length = 0;
if (conn == NULL)
length = rb_read(fd, inbuf, sizeof(inbuf));
- if (length < 0)
+ if (length < 0)
{
if (rb_ignore_errno(errno))
- rb_setselect(fd, RB_SELECT_READ, conn_mod_handshake_cb, conn);
+ {
+ rb_setselect(fd, RB_SELECT_READ, conn_mod_read_cb, conn);
+ conn_plain_write_sendq(conn->plain_fd, conn);
+ }
else
close_conn(conn, NO_WAIT, "Connection closed");
}
rb_rawbuf_append(conn->modbuf_in, inbuf, length);
- conn_mod_handshake_process(conn);
+ if (!IsKeyed(conn))
+ conn_mod_handshake_process(conn);
+ else
+ conn_mod_process(conn);
- if (length < sizeof(inbuf))
+ if ((size_t) length < sizeof(inbuf))
{
- rb_setselect(fd, RB_SELECT_READ, conn_mod_handshake_cb, conn);
+ rb_setselect(fd, RB_SELECT_READ, conn_mod_read_cb, conn);
return;
}
}
}
+static bool
+plain_check_cork(conn_t * conn)
+{
+ if(rb_rawbuf_length(conn->modbuf_out) >= 4096)
+ {
+ /* if we have over 4k pending outbound, don't read until
+ * we've cleared the queue */
+ SetCork(conn);
+ rb_setselect(conn->plain_fd, RB_SELECT_READ, NULL, NULL);
+ /* try to write */
+ if (IsKeyed(conn))
+ conn_mod_write_sendq(conn->mod_fd, conn);
+ return true;
+ }
+
+ return false;
+}
+
static void
-conn_mod_read_cb(rb_fde_t *fd, void *data)
+conn_plain_process_recvq(conn_t *conn)
{
+ char inbuf[READBUF_SIZE];
+
+ memset(inbuf, 0, sizeof inbuf);
+
+ while (1)
+ {
+ int dolen = rb_linebuf_get(&conn->plainbuf_in, inbuf, sizeof inbuf, LINEBUF_COMPLETE, LINEBUF_PARSED);
+ if (!dolen)
+ break;
+
+ conn_mod_write_frame(conn, inbuf, dolen);
+ }
+
+ if (IsKeyed(conn))
+ conn_mod_write_sendq(conn->mod_fd, conn);
}
static void
conn_plain_read_cb(rb_fde_t *fd, void *data)
{
+ char inbuf[READBUF_SIZE];
+
+ memset(inbuf, 0, sizeof inbuf);
+
+ conn_t *conn = data;
+ int length = 0;
+ if(conn == NULL)
+ return;
+
+ if(IsDead(conn))
+ return;
+
+ if(plain_check_cork(conn))
+ return;
+
+ while(1)
+ {
+ if(IsDead(conn))
+ return;
+
+ length = rb_read(conn->plain_fd, inbuf, sizeof(inbuf));
+
+ if(length == 0 || (length < 0 && !rb_ignore_errno(errno)))
+ {
+ close_conn(conn, NO_WAIT, NULL);
+ return;
+ }
+
+ if(length < 0)
+ {
+ rb_setselect(conn->plain_fd, RB_SELECT_READ, conn_plain_read_cb, conn);
+ if (IsKeyed(conn))
+ conn_plain_process_recvq(conn);
+ return;
+ }
+ conn->plain_in += length;
+
+ (void) rb_linebuf_parse(&conn->plainbuf_in, inbuf, length, 0);
+
+ if(IsDead(conn))
+ return;
+ if(plain_check_cork(conn))
+ return;
+ }
}
static void
if(rb_get_type(conn->plain_fd) == RB_FD_UNKNOWN)
rb_set_type(conn->plain_fd, RB_FD_SOCKET);
- conn_mod_handshake_cb(conn->mod_fd, conn);
+ conn_mod_read_cb(conn->mod_fd, conn);
+ conn_plain_read_cb(conn->plain_fd, conn);
}
static void
main(int argc, char **argv)
{
const char *s_ctlfd, *s_pipe, *s_pid;
- int ctlfd, pipefd, x, maxfd;
+ int ctlfd, pipefd, maxfd;
maxfd = maxconn();
s_ctlfd = getenv("CTL_FD");
ctlfd = atoi(s_ctlfd);
pipefd = atoi(s_pipe);
ppid = atoi(s_pid);
- x = 0;
+
#ifndef _WIN32
+ int x = 0;
for(x = 0; x < maxfd; x++)
{
if(x != ctlfd && x != pipefd && x > 2)