X-Git-Url: https://jfr.im/git/irc/evilnet/x3.git/blobdiff_plain/ff3b058ac51e9caf5cf1fd310b8a401a97a85582..59b38cc02dd57c5a072cf206794548fbb37e0571:/src/ioset.c diff --git a/src/ioset.c b/src/ioset.c index 4b6225e..da5b9f6 100644 --- a/src/ioset.c +++ b/src/ioset.c @@ -1,11 +1,11 @@ /* ioset.h - srvx event loop - * Copyright 2002-2004, 2006 srvx Development Team + * Copyright 2002-2004 srvx Development Team * - * This file is part of srvx. + * This file is part of x3. * - * srvx is free software; you can redistribute it and/or modify + * x3 is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or + * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, @@ -18,7 +18,7 @@ * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. */ -#include "ioset-impl.h" +#include "ioset.h" #include "log.h" #include "timeq.h" #include "saxdb.h" @@ -27,67 +27,27 @@ #ifdef HAVE_FCNTL_H #include #endif +#ifdef HAVE_SYS_SELECT_H +#include +#endif #ifdef HAVE_SYS_SOCKET_H #include #endif -#ifdef WITH_IOSET_WIN32 - -# undef errno -# define errno WSAGetLastError() -# undef EINPROGRESS -# define EINPROGRESS WSAEINPROGRESS -# undef EHOSTUNREACH -# define EHOSTUNREACH WSAEHOSTUNREACH -# undef ECONNREFUSED -# define ECONNREFUSED WSAECONNREFUSED -# undef EAGAIN -# define EAGAIN WSAEWOULDBLOCK -# define strerror wsa_strerror - -static const char * -wsa_strerror(int wsa_err) -{ - switch (wsa_err) - { - case WSAEINTR: return "Operation interrupted"; - case WSAEBADF: return "Bad file descriptor"; - case WSAEACCES: return "Permission denied"; - case WSAEFAULT: return "Invalid address"; - case WSAEINVAL: return "Invalid parameter"; - case WSAEMFILE: return "Too many open files"; - case WSAEWOULDBLOCK: return "Try again later"; - case WSAEINPROGRESS: return "Operation in progress"; - case WSAEALREADY: return "Operation already in progress"; - case WSAENOTSOCK: return "Not a socket"; - case WSAEDESTADDRREQ: return "Destination address required"; - case WSAEMSGSIZE: return "Invalid message size"; - case WSAEPROTOTYPE: return "Invalid protocol type for socket"; - case WSAENOPROTOOPT: return "Invalid protocol option"; - case WSAEPROTONOSUPPORT: return "Protocol not supported"; - case WSAEOPNOTSUPP: return "Operation not supported"; - case WSAEADDRINUSE: return "Address already in use"; - case WSAEADDRNOTAVAIL: return "Address not available"; - case WSAENETDOWN: return "Network down"; - case WSAENETUNREACH: return "Network unreachable"; - case WSAENETRESET: return "Network reset"; - case WSAECONNABORTED: return "Connection aborted"; - case WSAECONNRESET: return "Connection reset by peer"; - case WSAECONNREFUSED: return "Connection refused"; - } - return "unknown error"; -} - -#endif /* WITH_IOSET_WIN32 */ +#ifndef IOSET_DEBUG +#define IOSET_DEBUG 0 +#endif #define IS_EOL(CH) ((CH) == '\n') extern int uplink_connect(void); -int clock_skew; +static int clock_skew; int do_write_dbs; int do_reopen; -static struct io_engine *engine; -static struct io_fd *active_fd; + +static struct io_fd **fds; +static unsigned int fds_size; +static fd_set read_fds, write_fds; static void ioq_init(struct ioq *ioq, int size) { @@ -134,44 +94,9 @@ ioq_grow(struct ioq *ioq) { return new_size - ioq->put; } -extern struct io_engine io_engine_kqueue; -extern struct io_engine io_engine_epoll; -extern struct io_engine io_engine_win32; -extern struct io_engine io_engine_select; - -void -ioset_init(void) -{ - if (engine) /* someone beat us to it */ - return; - -#if WITH_IOSET_KQUEUE - if (!engine && io_engine_kqueue.init()) - engine = &io_engine_kqueue; -#endif - -#if WITH_IOSET_EPOLL - if (!engine && io_engine_epoll.init()) - engine = &io_engine_epoll; -#endif - -#if WITH_IOSET_WIN32 - if (!engine && io_engine_win32.init()) - engine = &io_engine_win32; -#endif - - if (engine) { - /* we found one that works */ - } else if (io_engine_select.init()) - engine = &io_engine_select; - else - log_module(MAIN_LOG, LOG_FATAL, "No usable I/O engine found."); - log_module(MAIN_LOG, LOG_DEBUG, "Using %s I/O engine.", engine->name); -} - void ioset_cleanup(void) { - engine->cleanup(); + free(fds); } struct io_fd * @@ -183,84 +108,29 @@ ioset_add(int fd) { log_module(MAIN_LOG, LOG_ERROR, "Somebody called ioset_add(%d) on a negative fd!", fd); return 0; } - if (!engine) - ioset_init(); res = calloc(1, sizeof(*res)); if (!res) return 0; res->fd = fd; ioq_init(&res->send, 1024); ioq_init(&res->recv, 1024); -#if defined(F_GETFL) + if ((unsigned)fd >= fds_size) { + unsigned int old_size = fds_size; + fds_size = fd + 8; + fds = realloc(fds, fds_size*sizeof(*fds)); + memset(fds+old_size, 0, (fds_size-old_size)*sizeof(*fds)); + } + fds[fd] = res; flags = fcntl(fd, F_GETFL); fcntl(fd, F_SETFL, flags|O_NONBLOCK); - flags = fcntl(fd, F_GETFD); - fcntl(fd, F_SETFD, flags|FD_CLOEXEC); -#else - /* I hope you're using the Win32 backend or something else that - * automatically marks the file descriptor non-blocking... - */ - (void)flags; -#endif - engine->add(res); return res; } -struct io_fd *ioset_listen(struct sockaddr *local, unsigned int sa_size, void *data, void (*accept_cb)(struct io_fd *listener, struct io_fd *new_connect)) -{ - struct io_fd *io_fd; - unsigned int opt; - int res; - int fd; - - fd = socket(local ? local->sa_family : PF_INET, SOCK_STREAM, 0); - if (fd < 0) { - log_module(MAIN_LOG, LOG_ERROR, "Unable to create listening socket: %s", strerror(errno)); - return NULL; - } - - if (local && sa_size) { - res = bind(fd, local, sa_size); - if (res < 0) { - log_module(MAIN_LOG, LOG_ERROR, "Unable to bind listening socket %d: %s", fd, strerror(errno)); - close(fd); - return NULL; - } - - opt = 1; - res = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt)); - if (res < 0) { - log_module(MAIN_LOG, LOG_WARNING, "Unable to mark listener address as re-usable: %s", strerror(errno)); - } - } - - res = listen(fd, 1); - if (res < 0) { - log_module(MAIN_LOG, LOG_ERROR, "Unable to listen on socket %d: %s", fd, strerror(errno)); - close(fd); - return NULL; - } - - io_fd = ioset_add(fd); - if (!io_fd) { - close(fd); - return NULL; - } - io_fd->state = IO_LISTENING; - io_fd->data = data; - io_fd->accept_cb = accept_cb; - engine->update(io_fd); - return io_fd; -} - struct io_fd * ioset_connect(struct sockaddr *local, unsigned int sa_size, const char *peer, unsigned int port, int blocking, void *data, void (*connect_cb)(struct io_fd *fd, int error)) { - struct addrinfo hints; - struct addrinfo *ai; + int fd, res; struct io_fd *io_fd; - struct io_fd *old_active; - int res; - int fd; + struct addrinfo hints, *ai; char portnum[10]; memset(&hints, 0, sizeof(hints)); @@ -301,84 +171,56 @@ ioset_connect(struct sockaddr *local, unsigned int sa_size, const char *peer, un close(fd); return NULL; } - io_fd->state = IO_CONNECTING; io_fd->data = data; io_fd->connect_cb = connect_cb; if (res < 0) { switch (errno) { case EINPROGRESS: /* only if !blocking */ - engine->update(io_fd); return io_fd; default: log_module(MAIN_LOG, LOG_ERROR, "connect(%s:%d) (fd %d) returned errno %d (%s).", peer, port, io_fd->fd, errno, strerror(errno)); /* then fall through */ case EHOSTUNREACH: case ECONNREFUSED: - ioset_close(io_fd, 1); + ioset_close(io_fd->fd, 1); return NULL; } } - io_fd->state = IO_CONNECTED; - old_active = active_fd; if (connect_cb) connect_cb(io_fd, ((res < 0) ? errno : 0)); - if (active_fd) - engine->update(io_fd); - if (old_active != io_fd) - active_fd = old_active; return io_fd; } -void ioset_update(struct io_fd *fd) { - engine->update(fd); -} - static void ioset_try_write(struct io_fd *fd) { int res; - unsigned int req; - - req = ioq_get_avail(&fd->send); - res = send(fd->fd, fd->send.buf+fd->send.get, req, 0); + unsigned int req = ioq_get_avail(&fd->send); + res = write(fd->fd, fd->send.buf+fd->send.get, req); if (res < 0) { - if (errno != EAGAIN) { - log_module(MAIN_LOG, LOG_ERROR, "send() on fd %d error %d: %s", fd->fd, errno, strerror(errno)); + switch (errno) { + case EAGAIN: + break; + default: + log_module(MAIN_LOG, LOG_ERROR, "write() on fd %d error %d: %s", fd->fd, errno, strerror(errno)); } } else { fd->send.get += res; if (fd->send.get == fd->send.size) fd->send.get = 0; - engine->update(fd); } } void -ioset_close(struct io_fd *fdp, int os_close) { - if (!fdp) +ioset_close(int fd, int os_close) { + struct io_fd *fdp; + if (!(fdp = fds[fd])) return; - if (active_fd == fdp) - active_fd = NULL; + fds[fd] = NULL; if (fdp->destroy_cb) fdp->destroy_cb(fdp); -#if defined(HAVE_WSAEVENTSELECT) - /* This is one huge kludge. Sorry! */ - if (fdp->send.get != fdp->send.put && (os_close & 2)) { - engine->remove(fdp, 0); - ioset_try_write(fdp); - /* it may need to send the beginning of the buffer now.. */ - if (fdp->send.get != fdp->send.put) - ioset_try_write(fdp); - } - free(fdp->send.buf); - free(fdp->recv.buf); - if (os_close & 1) - closesocket(fdp->fd); -#else - if (fdp->send.get != fdp->send.put && (os_close & 2)) { - int flags; - - flags = fcntl(fdp->fd, F_GETFL); - fcntl(fdp->fd, F_SETFL, flags&~O_NONBLOCK); + if (fdp->send.get != fdp->send.put) { + int flags = fcntl(fd, F_GETFL); + fcntl(fd, F_SETFL, flags&~O_NONBLOCK); ioset_try_write(fdp); /* it may need to send the beginning of the buffer now.. */ if (fdp->send.get != fdp->send.put) @@ -386,39 +228,11 @@ ioset_close(struct io_fd *fdp, int os_close) { } free(fdp->send.buf); free(fdp->recv.buf); - if (os_close & 1) - close(fdp->fd); - engine->remove(fdp, os_close & 1); -#endif + if (os_close) + close(fd); free(fdp); -} - -static void -ioset_accept(struct io_fd *listener) -{ - struct io_fd *old_active; - struct io_fd *new_fd; - int fd; - - fd = accept(listener->fd, NULL, 0); - if (fd < 0) { - log_module(MAIN_LOG, LOG_ERROR, "Unable to accept new connection on listener %d: %s", listener->fd, strerror(errno)); - return; - } - - new_fd = ioset_add(fd); - new_fd->state = IO_CONNECTED; - old_active = active_fd; - active_fd = new_fd; - listener->accept_cb(listener, new_fd); - assert(active_fd == NULL || active_fd == new_fd); - if (active_fd == new_fd) { - if (new_fd->send.get != new_fd->send.put) - ioset_try_write(new_fd); - else - engine->update(new_fd); - } - active_fd = old_active; + FD_CLR(fd, &read_fds); + FD_CLR(fd, &write_fds); } static int @@ -439,24 +253,25 @@ ioset_find_line_length(struct io_fd *fd) { static void ioset_buffered_read(struct io_fd *fd) { int put_avail, nbr, fdnum; - + if (!(put_avail = ioq_put_avail(&fd->recv))) put_avail = ioq_grow(&fd->recv); - nbr = recv(fd->fd, fd->recv.buf + fd->recv.put, put_avail, 0); + nbr = read(fd->fd, fd->recv.buf + fd->recv.put, put_avail); if (nbr < 0) { - if (errno != EAGAIN) { - log_module(MAIN_LOG, LOG_ERROR, "Unexpected recv() error %d on fd %d: %s", errno, fd->fd, strerror(errno)); + switch (errno) { + case EAGAIN: + break; + default: + log_module(MAIN_LOG, LOG_ERROR, "Unexpected read() error %d on fd %d: %s", errno, fd->fd, strerror(errno)); /* Just flag it as EOF and call readable_cb() to notify the fd's owner. */ - fd->state = IO_CLOSED; + fd->eof = 1; + fd->wants_reads = 0; fd->readable_cb(fd); - if (active_fd == fd) - engine->update(fd); } } else if (nbr == 0) { - fd->state = IO_CLOSED; + fd->eof = 1; + fd->wants_reads = 0; fd->readable_cb(fd); - if (active_fd == fd) - engine->update(fd); } else { if (fd->line_len == 0) { unsigned int pos; @@ -474,42 +289,29 @@ ioset_buffered_read(struct io_fd *fd) { if (fd->recv.put == fd->recv.size) fd->recv.put = 0; fdnum = fd->fd; - while (fd->line_len > 0) { - struct io_fd *old_active; - int died = 0; - - old_active = active_fd; - active_fd = fd; + while (fd->wants_reads && (fd->line_len > 0)) { fd->readable_cb(fd); - if (active_fd) - ioset_find_line_length(fd); - else - died = 1; - if (old_active != fd) - active_fd = old_active; - if (died) - break; + if (!fds[fdnum]) + break; /* make sure they didn't close on us */ + ioset_find_line_length(fd); } } } int ioset_line_read(struct io_fd *fd, char *dest, int max) { - int line_len; - int avail; - int done; - - line_len = fd->line_len; - if ((fd->state == IO_CLOSED) && (!ioq_get_avail(&fd->recv) || (line_len < 0))) + int avail, done; + if (fd->eof && (!ioq_get_avail(&fd->recv) || (fd->line_len < 0))) return 0; - if (line_len < 0) + if (fd->line_len < 0) return -1; - if (line_len < max) - max = line_len; + if (fd->line_len < max) + max = fd->line_len; avail = ioq_get_avail(&fd->recv); if (max > avail) { memcpy(dest, fd->recv.buf + fd->recv.get, avail); - assert(fd->recv.get + avail == fd->recv.size); + fd->recv.get += avail; + assert(fd->recv.get == fd->recv.size); fd->recv.get = 0; done = avail; } else { @@ -519,65 +321,46 @@ ioset_line_read(struct io_fd *fd, char *dest, int max) { fd->recv.get += max - done; if (fd->recv.get == fd->recv.size) fd->recv.get = 0; - dest[max - 1] = 0; + dest[max] = 0; ioset_find_line_length(fd); - return line_len; + return max; } -void -ioset_events(struct io_fd *fd, int readable, int writable) -{ - if (!fd || (!readable && !writable)) - return; - active_fd = fd; - switch (fd->state) { - case IO_CLOSED: - break; - case IO_LISTENING: - if (active_fd && readable) - ioset_accept(fd); - break; - case IO_CONNECTING: - assert(active_fd == NULL || active_fd == fd); - if (active_fd && readable) { - socklen_t arglen; - int rc; - arglen = sizeof(rc); - if (getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &rc, &arglen) < 0) - rc = errno; - fd->state = IO_CLOSED; - if (fd->connect_cb) - fd->connect_cb(fd, rc); - } else if (active_fd && writable) { - fd->state = IO_CONNECTED; - if (fd->connect_cb) - fd->connect_cb(fd, 0); - } - if (active_fd != fd) - break; - engine->update(fd); - /* and fall through */ - case IO_CONNECTED: - assert(active_fd == NULL || active_fd == fd); - if (active_fd && readable) { - if (fd->line_reads) - ioset_buffered_read(fd); - else - fd->readable_cb(fd); - } - - assert(active_fd == NULL || active_fd == fd); - if (active_fd && writable) - ioset_try_write(fd); - break; +#if 1 +#define debug_fdsets(MSG, NFDS, READ_FDS, WRITE_FDS, EXCEPT_FDS, SELECT_TIMEOUT) (void)0 +#else +static void +debug_fdsets(const char *msg, int nfds, fd_set *read_fds, fd_set *write_fds, fd_set *except_fds, struct timeval *select_timeout) { + static const char *flag_text[8] = { "---", "r", "w", "rw", "e", "er", "ew", "erw" }; + char buf[MAXLEN]; + int pos, ii, flags; + struct timeval now; + + for (pos=ii=0; iitv_sec, select_timeout->tv_usec); + } else { + log_module(MAIN_LOG, LOG_DEBUG, "%s, at "FMT_TIME_T".%06ld:%s (no timeout)", msg, now.tv_sec, now.tv_usec, buf); } } +#endif void ioset_run(void) { extern struct io_fd *socket_io_fd; - struct timeval timeout; + struct timeval select_timeout; + unsigned int nn; + int select_result, max_fd; time_t wakey; + struct io_fd *fd; while (!quit_services) { while (!socket_io_fd) @@ -586,13 +369,66 @@ ioset_run(void) { /* How long to sleep? (fill in select_timeout) */ wakey = timeq_next(); if ((wakey - now) < 0) - timeout.tv_sec = 0; + select_timeout.tv_sec = 0; else - timeout.tv_sec = wakey - now; - timeout.tv_usec = 0; + select_timeout.tv_sec = wakey - now; + select_timeout.tv_usec = 0; + + /* Set up read_fds and write_fds fdsets. */ + FD_ZERO(&read_fds); + FD_ZERO(&write_fds); + max_fd = 0; + for (nn=0; nnwants_reads) + FD_SET(nn, &read_fds); + if ((fd->send.get != fd->send.put) || !fd->connected) + FD_SET(nn, &write_fds); + } - if (engine->loop(&timeout)) + /* Check for activity, update time. */ + debug_fdsets("Entering select", max_fd+1, &read_fds, &write_fds, NULL, &select_timeout); + select_result = select(max_fd + 1, &read_fds, &write_fds, NULL, &select_timeout); + debug_fdsets("After select", max_fd+1, &read_fds, &write_fds, NULL, &select_timeout); + now = time(NULL) + clock_skew; + if (select_result < 0) { + if (errno != EINTR) { + log_module(MAIN_LOG, LOG_ERROR, "select() error %d: %s", errno, strerror(errno)); + close_socket(); + } continue; + } + + /* Call back anybody that has connect or read activity and wants to know. */ + for (nn=0; nnline_reads) + ioset_buffered_read(fd); + else + fd->readable_cb(fd); + } + if (FD_ISSET(nn, &write_fds) && !fd->connected) { + socklen_t arglen; + int rc; + + arglen = sizeof(rc); + if (getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &rc, &arglen) < 0) + rc = errno; + fd->connected = 1; + if (fd->connect_cb) + fd->connect_cb(fd, rc); + } + /* Note: check whether write FD is still set, since the + * connect_cb() might close the FD, making us dereference + * a free()'d pointer for the fd. + */ + if (FD_ISSET(nn, &write_fds) && (fd->send.get != fd->send.put)) + ioset_try_write(fd); + } /* Call any timeq events we need to call. */ timeq_run(); @@ -624,21 +460,6 @@ ioset_write(struct io_fd *fd, const char *buf, unsigned int nbw) { fd->send.put += nbw; if (fd->send.put == fd->send.size) fd->send.put = 0; - engine->update(fd); -} - -int -ioset_printf(struct io_fd *fd, const char *fmt, ...) { - char tmpbuf[MAXLEN]; - va_list ap; - int res; - - va_start(ap, fmt); - res = vsnprintf(tmpbuf, sizeof(tmpbuf), fmt, ap); - va_end(ap); - if (res > 0 && (size_t)res <= sizeof(tmpbuf)) - ioset_write(fd, tmpbuf, res); - return res; } void