X-Git-Url: https://jfr.im/git/solanum.git/blobdiff_plain/1160f6c9bf634c17f48e7777939fb876fa1538db..23e722ea21c49c02b7022a908b29f91042e60671:/wsockd/wsockd.c?ds=sidebyside diff --git a/wsockd/wsockd.c b/wsockd/wsockd.c index db1d3ddc..29bbdffb 100644 --- a/wsockd/wsockd.c +++ b/wsockd/wsockd.c @@ -95,9 +95,58 @@ typedef struct _conn char client_key[37]; /* maximum 36 bytes + nul */ } conn_t; +#define WEBSOCKET_OPCODE_TEXT_FRAME 1 + +#define WEBSOCKET_MASK_LENGTH 4 + +#define WEBSOCKET_MAX_UNEXTENDED_PAYLOAD_DATA_LENGTH 125 + +typedef struct { + uint8_t opcode_rsv_fin; // opcode: 4, rsv1: 1, rsv2: 1, rsv3: 1, fin: 1 + uint8_t payload_length_mask; // payload_length: 7, mask: 1 +} ws_frame_hdr_t; + +#define WEBSOCKET_FRAME_HDR_INIT ((ws_frame_hdr_t) { 0, 0 }) + +typedef struct { + ws_frame_hdr_t header; + uint8_t payload_data[WEBSOCKET_MAX_UNEXTENDED_PAYLOAD_DATA_LENGTH]; +} ws_frame_payload_t; + +typedef struct { + ws_frame_hdr_t header; +} ws_frame_t; + +typedef struct { + ws_frame_hdr_t header; + uint16_t payload_length_extended; +} ws_frame_ext_t; + +#define WEBSOCKET_FRAME_EXT_INIT ((ws_frame_ext_t) { WEBSOCKET_FRAME_HDR_INIT, 0 }) + +typedef struct { + ws_frame_hdr_t header; + uint64_t payload_length_extended; +} ws_frame_ext2_t; + +static inline void +ws_frame_set_opcode(ws_frame_hdr_t *header, int opcode) +{ + header->opcode_rsv_fin &= ~0xF; + header->opcode_rsv_fin |= opcode & 0xF; +} + +static inline void +ws_frame_set_fin(ws_frame_hdr_t *header, int fin) +{ + header->opcode_rsv_fin &= ~(0x1 << 7); + header->opcode_rsv_fin |= (fin << 7) & (0x1 << 7); +} + static void close_conn(conn_t * conn, int wait_plain, const char *fmt, ...); static void conn_mod_read_cb(rb_fde_t *fd, void *data); static void conn_plain_read_cb(rb_fde_t *fd, void *data); +static void conn_plain_process_recvq(conn_t *conn); #define FLAG_CORK 0x01 #define FLAG_DEAD 0x02 @@ -106,7 +155,6 @@ static void conn_plain_read_cb(rb_fde_t *fd, void *data); #define IsCork(x) ((x)->flags & FLAG_CORK) #define IsDead(x) ((x)->flags & FLAG_DEAD) -#define IsWS(x) ((x)->flags & FLAG_WSOCK) #define IsKeyed(x) ((x)->flags & FLAG_KEYED) #define SetCork(x) ((x)->flags |= FLAG_CORK) @@ -115,15 +163,10 @@ static void conn_plain_read_cb(rb_fde_t *fd, void *data); #define SetKeyed(x) ((x)->flags |= FLAG_KEYED) #define ClearCork(x) ((x)->flags &= ~FLAG_CORK) -#define ClearDead(x) ((x)->flags &= ~FLAG_DEAD) -#define ClearWS(x) ((x)->flags &= ~FLAG_WSOCK) -#define ClearKeyed(x) ((x)->flags &= ~FLAG_KEYED) #define NO_WAIT 0x0 #define WAIT_PLAIN 0x1 -#define HASH_WALK_SAFE(i, max, ptr, next, table) for(i = 0; i < max; i++) { RB_DLINK_FOREACH_SAFE(ptr, next, table[i].head) -#define HASH_WALK_END } #define CONN_HASH_SIZE 2000 #define connid_hash(x) (&connid_hash_table[(x % CONN_HASH_SIZE)]) @@ -185,21 +228,6 @@ maxconn(void) return MAXCONNECTIONS; } -static conn_t * -conn_find_by_id(uint32_t id) -{ - rb_dlink_node *ptr; - conn_t *conn; - - RB_DLINK_FOREACH(ptr, (connid_hash(id))->head) - { - conn = ptr->data; - if(conn->id == id && !IsDead(conn)) - return conn; - } - return NULL; -} - static void conn_add_id_hash(conn_t * conn, uint32_t id) { @@ -234,6 +262,30 @@ clean_dead_conns(void *unused) dead_list.tail = dead_list.head = NULL; } +static void +conn_plain_write_sendq(rb_fde_t *fd, void *data) +{ + conn_t *conn = data; + int retlen; + + if(IsDead(conn)) + return; + + while((retlen = rb_linebuf_flush(fd, &conn->plainbuf_out)) > 0) + conn->plain_out += retlen; + + if(retlen == 0 || (retlen < 0 && !rb_ignore_errno(errno))) + { + close_conn(data, NO_WAIT, NULL); + return; + } + + if(rb_linebuf_alloclen(&conn->plainbuf_out) > 0) + rb_setselect(conn->plain_fd, RB_SELECT_WRITE, conn_plain_write_sendq, conn); + else + rb_setselect(conn->plain_fd, RB_SELECT_WRITE, NULL, NULL); +} + static void conn_mod_write_sendq(rb_fde_t *fd, void *data) { @@ -277,11 +329,47 @@ conn_mod_write(conn_t * conn, void *data, size_t len) } static void -conn_plain_write(conn_t * conn, void *data, size_t len) +conn_mod_write_short_frame(conn_t * conn, void *data, int len) { - if(IsDead(conn)) /* again no point in queueing to dead men */ + ws_frame_hdr_t hdr = WEBSOCKET_FRAME_HDR_INIT; + + ws_frame_set_opcode(&hdr, WEBSOCKET_OPCODE_TEXT_FRAME); + ws_frame_set_fin(&hdr, 1); + hdr.payload_length_mask = (len + 2) & 0x7f; + + conn_mod_write(conn, &hdr, sizeof(hdr)); + conn_mod_write(conn, data, len); + conn_mod_write(conn, "\r\n", 2); +} + +static void +conn_mod_write_long_frame(conn_t * conn, void *data, int len) +{ + ws_frame_ext_t hdr = WEBSOCKET_FRAME_EXT_INIT; + + ws_frame_set_opcode(&hdr.header, WEBSOCKET_OPCODE_TEXT_FRAME); + ws_frame_set_fin(&hdr.header, 1); + hdr.header.payload_length_mask = 126; + hdr.payload_length_extended = htons(len + 2); + + conn_mod_write(conn, &hdr, sizeof(hdr)); + conn_mod_write(conn, data, len); + conn_mod_write(conn, "\r\n", 2); +} + +static void +conn_mod_write_frame(conn_t *conn, void *data, int len) +{ + if(IsDead(conn)) /* no point in queueing to a dead man */ + return; + + if (len < 123) + { + conn_mod_write_short_frame(conn, data, len); return; - rb_linebuf_put(&conn->plainbuf_out, data, len); + } + + conn_mod_write_long_frame(conn, data, len); } static void @@ -337,6 +425,9 @@ close_conn(conn_t * conn, int wait_plain, const char *fmt, ...) if(IsDead(conn)) return; + if (IsKeyed(conn)) + conn_plain_process_recvq(conn); + rb_rawbuf_flush(conn->modbuf_out, conn->mod_fd); rb_linebuf_flush(conn->plain_fd, &conn->plainbuf_out); rb_close(conn->mod_fd); @@ -395,20 +486,142 @@ cleanup_bad_message(mod_ctl_t * ctl, mod_ctl_buf_t * ctlb) rb_close(ctlb->F[i]); } +static void +ws_frame_unmask(char *msg, int length, uint8_t maskval[WEBSOCKET_MASK_LENGTH]) +{ + int i; + + for (i = 0; i < length; i++) + msg[i] = msg[i] ^ maskval[i % 4]; +} + +static void +conn_mod_process_frame(conn_t *conn, ws_frame_hdr_t *hdr, int masked) +{ + char msg[WEBSOCKET_MAX_UNEXTENDED_PAYLOAD_DATA_LENGTH]; + uint8_t maskval[WEBSOCKET_MASK_LENGTH]; + int dolen; + + /* if we're masked, we get to collect the masking key for this frame */ + if (masked) + { + dolen = rb_rawbuf_get(conn->modbuf_in, maskval, sizeof(maskval)); + if (!dolen) + { + close_conn(conn, WAIT_PLAIN, "websocket error: fault unpacking unmask key"); + return; + } + } + + dolen = rb_rawbuf_get(conn->modbuf_in, msg, hdr->payload_length_mask); + if (!dolen) + { + close_conn(conn, WAIT_PLAIN, "websocket error: fault unpacking message"); + return; + } + + if (masked) + ws_frame_unmask(msg, dolen, maskval); + + rb_linebuf_parse(&conn->plainbuf_out, msg, dolen, 1); +} + +static void +conn_mod_process_large(conn_t *conn, ws_frame_hdr_t *hdr, int masked) +{ + char msg[READBUF_SIZE]; + uint16_t msglen; + uint8_t maskval[WEBSOCKET_MASK_LENGTH]; + int dolen; + + memset(msg, 0, sizeof msg); + + dolen = rb_rawbuf_get(conn->modbuf_in, &msglen, sizeof(msglen)); + if (!dolen) + { + close_conn(conn, WAIT_PLAIN, "websocket error: fault unpacking message size"); + return; + } + + msglen = ntohs(msglen); + + if (masked) + { + dolen = rb_rawbuf_get(conn->modbuf_in, maskval, sizeof(maskval)); + if (!dolen) + { + close_conn(conn, WAIT_PLAIN, "websocket error: fault unpacking unmask key"); + return; + } + } + + dolen = rb_rawbuf_get(conn->modbuf_in, msg, msglen); + if (!dolen) + { + close_conn(conn, WAIT_PLAIN, "websocket error: fault unpacking message"); + return; + } + + if (masked) + ws_frame_unmask(msg, dolen, maskval); + + rb_linebuf_parse(&conn->plainbuf_out, msg, dolen, 1); +} + +static void +conn_mod_process_huge(conn_t *conn, ws_frame_hdr_t *hdr, int masked) +{ + /* XXX implement me */ +} + +static void +conn_mod_process(conn_t *conn) +{ + ws_frame_hdr_t hdr; + + while (1) + { + int masked; + int dolen = rb_rawbuf_get(conn->modbuf_in, &hdr, sizeof(hdr)); + if (dolen != sizeof(hdr)) + break; + + masked = (hdr.payload_length_mask >> 7) == 1; + + hdr.payload_length_mask &= 0x7f; + switch (hdr.payload_length_mask) + { + case 126: + conn_mod_process_large(conn, &hdr, masked); + break; + case 127: + conn_mod_process_huge(conn, &hdr, masked); + break; + default: + conn_mod_process_frame(conn, &hdr, masked); + break; + } + } + + conn_plain_write_sendq(conn->plain_fd, conn); +} + static void conn_mod_handshake_process(conn_t *conn) { char inbuf[READBUF_SIZE]; + memset(inbuf, 0, sizeof inbuf); + while (1) { char *p = NULL; - size_t dolen = rb_rawbuf_get(conn->modbuf_in, inbuf, sizeof inbuf); + int dolen = rb_rawbuf_get(conn->modbuf_in, inbuf, sizeof inbuf); if (!dolen) break; - if ((p = strcasestr(inbuf, "Sec-WebSocket-Key:")) != NULL) + if ((p = rb_strcasestr(inbuf, "Sec-WebSocket-Key:")) != NULL) { char *start, *end; @@ -458,9 +671,12 @@ conn_mod_handshake_process(conn_t *conn) } static void -conn_mod_handshake_cb(rb_fde_t *fd, void *data) +conn_mod_read_cb(rb_fde_t *fd, void *data) { char inbuf[READBUF_SIZE]; + + memset(inbuf, 0, sizeof inbuf); + conn_t *conn = data; int length = 0; if (conn == NULL) @@ -476,10 +692,13 @@ conn_mod_handshake_cb(rb_fde_t *fd, void *data) length = rb_read(fd, inbuf, sizeof(inbuf)); - if (length < 0) + if (length < 0) { if (rb_ignore_errno(errno)) - rb_setselect(fd, RB_SELECT_READ, conn_mod_handshake_cb, conn); + { + rb_setselect(fd, RB_SELECT_READ, conn_mod_read_cb, conn); + conn_plain_write_sendq(conn->plain_fd, conn); + } else close_conn(conn, NO_WAIT, "Connection closed"); @@ -492,24 +711,104 @@ conn_mod_handshake_cb(rb_fde_t *fd, void *data) } rb_rawbuf_append(conn->modbuf_in, inbuf, length); - conn_mod_handshake_process(conn); + if (!IsKeyed(conn)) + conn_mod_handshake_process(conn); + else + conn_mod_process(conn); - if (length < sizeof(inbuf)) + if ((size_t) length < sizeof(inbuf)) { - rb_setselect(fd, RB_SELECT_READ, conn_mod_handshake_cb, conn); + rb_setselect(fd, RB_SELECT_READ, conn_mod_read_cb, conn); return; } } } +static bool +plain_check_cork(conn_t * conn) +{ + if(rb_rawbuf_length(conn->modbuf_out) >= 4096) + { + /* if we have over 4k pending outbound, don't read until + * we've cleared the queue */ + SetCork(conn); + rb_setselect(conn->plain_fd, RB_SELECT_READ, NULL, NULL); + /* try to write */ + if (IsKeyed(conn)) + conn_mod_write_sendq(conn->mod_fd, conn); + return true; + } + + return false; +} + static void -conn_mod_read_cb(rb_fde_t *fd, void *data) +conn_plain_process_recvq(conn_t *conn) { + char inbuf[READBUF_SIZE]; + + memset(inbuf, 0, sizeof inbuf); + + while (1) + { + int dolen = rb_linebuf_get(&conn->plainbuf_in, inbuf, sizeof inbuf, LINEBUF_COMPLETE, LINEBUF_PARSED); + if (!dolen) + break; + + conn_mod_write_frame(conn, inbuf, dolen); + } + + if (IsKeyed(conn)) + conn_mod_write_sendq(conn->mod_fd, conn); } static void conn_plain_read_cb(rb_fde_t *fd, void *data) { + char inbuf[READBUF_SIZE]; + + memset(inbuf, 0, sizeof inbuf); + + conn_t *conn = data; + int length = 0; + if(conn == NULL) + return; + + if(IsDead(conn)) + return; + + if(plain_check_cork(conn)) + return; + + while(1) + { + if(IsDead(conn)) + return; + + length = rb_read(conn->plain_fd, inbuf, sizeof(inbuf)); + + if(length == 0 || (length < 0 && !rb_ignore_errno(errno))) + { + close_conn(conn, NO_WAIT, NULL); + return; + } + + if(length < 0) + { + rb_setselect(conn->plain_fd, RB_SELECT_READ, conn_plain_read_cb, conn); + if (IsKeyed(conn)) + conn_plain_process_recvq(conn); + return; + } + conn->plain_in += length; + + (void) rb_linebuf_parse(&conn->plainbuf_in, inbuf, length, 0); + + if(IsDead(conn)) + return; + if(plain_check_cork(conn)) + return; + } } static void @@ -559,7 +858,8 @@ wsock_process(mod_ctl_t * ctl, mod_ctl_buf_t * ctlb) if(rb_get_type(conn->plain_fd) == RB_FD_UNKNOWN) rb_set_type(conn->plain_fd, RB_FD_SOCKET); - conn_mod_handshake_cb(conn->mod_fd, conn); + conn_mod_read_cb(conn->mod_fd, conn); + conn_plain_read_cb(conn->plain_fd, conn); } static void