1 /* ioset.h - srvx event loop
2 * Copyright 2002-2004, 2006 srvx Development Team
4 * This file is part of srvx.
6 * srvx is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with srvx; if not, write to the Free Software Foundation,
18 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
21 #include "ioset-impl.h"
30 #ifdef HAVE_SYS_SOCKET_H
31 #include <sys/socket.h>
34 #ifdef WITH_IOSET_WIN32
37 # define errno WSAGetLastError()
39 # define EINPROGRESS WSAEINPROGRESS
41 # define EHOSTUNREACH WSAEHOSTUNREACH
43 # define ECONNREFUSED WSAECONNREFUSED
45 # define EAGAIN WSAEWOULDBLOCK
46 # define strerror wsa_strerror
/* Map a Winsock error code to a short, human-readable description.
 *
 * wsa_err is compared against the WSAE* codes listed in the table
 * below.  The returned pointer refers to a string literal (never NULL);
 * codes that have no table entry yield "unknown error".
 */
static const char *
wsa_strerror(int wsa_err)
{
    static const struct {
        int code;
        const char *text;
    } messages[] = {
        { WSAEINTR,           "Operation interrupted" },
        { WSAEBADF,           "Bad file descriptor" },
        { WSAEACCES,          "Permission denied" },
        { WSAEFAULT,          "Invalid address" },
        { WSAEINVAL,          "Invalid parameter" },
        { WSAEMFILE,          "Too many open files" },
        { WSAEWOULDBLOCK,     "Try again later" },
        { WSAEINPROGRESS,     "Operation in progress" },
        { WSAEALREADY,        "Operation already in progress" },
        { WSAENOTSOCK,        "Not a socket" },
        { WSAEDESTADDRREQ,    "Destination address required" },
        { WSAEMSGSIZE,        "Invalid message size" },
        { WSAEPROTOTYPE,      "Invalid protocol type for socket" },
        { WSAENOPROTOOPT,     "Invalid protocol option" },
        { WSAEPROTONOSUPPORT, "Protocol not supported" },
        { WSAEOPNOTSUPP,      "Operation not supported" },
        { WSAEADDRINUSE,      "Address already in use" },
        { WSAEADDRNOTAVAIL,   "Address not available" },
        { WSAENETDOWN,        "Network down" },
        { WSAENETUNREACH,     "Network unreachable" },
        { WSAENETRESET,       "Network reset" },
        { WSAECONNABORTED,    "Connection aborted" },
        { WSAECONNRESET,      "Connection reset by peer" },
        { WSAECONNREFUSED,    "Connection refused" },
    };
    unsigned int ii;

    for (ii = 0; ii < sizeof(messages) / sizeof(messages[0]); ++ii) {
        if (messages[ii].code == wsa_err)
            return messages[ii].text;
    }
    return "unknown error";
}
81 #endif /* WITH_IOSET_WIN32 */
83 #define IS_EOL(CH) ((CH) == '\n')
85 extern int uplink_connect(void);
89 static struct io_engine
*engine
;
90 static struct io_fd
*active_fd
;
93 ioq_init(struct ioq
*ioq
, int size
) {
94 ioq
->buf
= malloc(size
);
95 ioq
->get
= ioq
->put
= 0;
100 ioq_put_avail(const struct ioq
*ioq
) {
101 /* Subtract 1 from ioq->get to be sure we don't fill the buffer
102 * and make it look empty even when there's data in it. */
103 if (ioq
->put
< ioq
->get
)
104 return ioq
->get
- ioq
->put
- 1;
105 else if (ioq
->get
== 0)
106 return ioq
->size
- ioq
->put
- 1;
108 return ioq
->size
- ioq
->put
;
112 ioq_get_avail(const struct ioq
*ioq
) {
113 return ((ioq
->put
< ioq
->get
) ? ioq
->size
: ioq
->put
) - ioq
->get
;
117 ioq_used(const struct ioq
*ioq
) {
118 return ((ioq
->put
< ioq
->get
) ? ioq
->size
: 0) + ioq
->put
- ioq
->get
;
122 ioq_grow(struct ioq
*ioq
) {
123 int new_size
= ioq
->size
<< 1;
124 char *new_buf
= malloc(new_size
);
125 int get_avail
= ioq_get_avail(ioq
);
126 memcpy(new_buf
, ioq
->buf
+ ioq
->get
, get_avail
);
127 if (ioq
->put
< ioq
->get
)
128 memcpy(new_buf
+ get_avail
, ioq
->buf
, ioq
->put
);
130 ioq
->put
= ioq_used(ioq
);
133 ioq
->size
= new_size
;
134 return new_size
- ioq
->put
;
137 extern struct io_engine io_engine_kevent
;
138 extern struct io_engine io_engine_epoll
;
139 extern struct io_engine io_engine_win32
;
140 extern struct io_engine io_engine_select
;
145 if (engine
) /* someone beat us to it */
148 #if WITH_IOSET_KEVENT
149 if (!engine
&& io_engine_kevent
.init())
150 engine
= &io_engine_kevent
;
154 if (!engine
&& io_engine_epoll
.init())
155 engine
= &io_engine_epoll
;
159 if (!engine
&& io_engine_win32
.init())
160 engine
= &io_engine_win32
;
164 /* we found one that works */
165 } else if (io_engine_select
.init())
166 engine
= &io_engine_select
;
168 log_module(MAIN_LOG
, LOG_FATAL
, "No usable I/O engine found.");
169 log_module(MAIN_LOG
, LOG_DEBUG
, "Using %s I/O engine.", engine
->name
);
173 ioset_cleanup(void) {
183 log_module(MAIN_LOG
, LOG_ERROR
, "Somebody called ioset_add(%d) on a negative fd!", fd
);
188 res
= calloc(1, sizeof(*res
));
192 ioq_init(&res
->send
, 1024);
193 ioq_init(&res
->recv
, 1024);
195 flags
= fcntl(fd
, F_GETFL
);
196 fcntl(fd
, F_SETFL
, flags
|O_NONBLOCK
);
197 flags
= fcntl(fd
, F_GETFD
);
198 fcntl(fd
, F_SETFD
, flags
|FD_CLOEXEC
);
200 /* I hope you're using the Win32 backend or something else that
201 * automatically marks the file descriptor non-blocking...
209 struct io_fd
*ioset_listen(struct sockaddr
*local
, unsigned int sa_size
, void *data
, void (*accept_cb
)(struct io_fd
*listener
, struct io_fd
*new_connect
))
216 fd
= socket(local
? local
->sa_family
: PF_INET
, SOCK_STREAM
, 0);
218 log_module(MAIN_LOG
, LOG_ERROR
, "Unable to create listening socket: %s", strerror(errno
));
222 if (local
&& sa_size
) {
224 res
= setsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, (const char*)&opt
, sizeof(opt
));
226 log_module(MAIN_LOG
, LOG_WARNING
, "Unable to mark listener address as re-usable: %s", strerror(errno
));
228 res
= bind(fd
, local
, sa_size
);
230 log_module(MAIN_LOG
, LOG_ERROR
, "Unable to bind listening socket %d: %s", fd
, strerror(errno
));
238 log_module(MAIN_LOG
, LOG_ERROR
, "Unable to listen on socket %d: %s", fd
, strerror(errno
));
243 io_fd
= ioset_add(fd
);
248 io_fd
->state
= IO_LISTENING
;
250 io_fd
->accept_cb
= accept_cb
;
251 engine
->update(io_fd
);
256 ioset_connect(struct sockaddr
*local
, unsigned int sa_size
, const char *peer
, unsigned int port
, int blocking
, void *data
, void (*connect_cb
)(struct io_fd
*fd
, int error
)) {
257 struct addrinfo hints
;
260 struct io_fd
*old_active
;
265 memset(&hints
, 0, sizeof(hints
));
266 hints
.ai_family
= local
? local
->sa_family
: 0;
267 hints
.ai_socktype
= SOCK_STREAM
;
268 snprintf(portnum
, sizeof(portnum
), "%u", port
);
269 res
= getaddrinfo(peer
, portnum
, &hints
, &ai
);
271 log_module(MAIN_LOG
, LOG_ERROR
, "getaddrinfo(%s, %s) failed: %s.", peer
, portnum
, gai_strerror(res
));
276 if ((fd
= socket(local
->sa_family
, SOCK_STREAM
, 0)) < 0) {
277 log_module(MAIN_LOG
, LOG_ERROR
, "socket() for %s returned errno %d (%s)", peer
, errno
, strerror(errno
));
281 if (bind(fd
, local
, sa_size
) < 0) {
282 log_module(MAIN_LOG
, LOG_ERROR
, "bind() of socket for %s (fd %d) returned errno %d (%s). Will let operating system choose.", peer
, fd
, errno
, strerror(errno
));
285 if ((fd
= socket(PF_INET
, SOCK_STREAM
, 0)) < 0) {
286 log_module(MAIN_LOG
, LOG_ERROR
, "socket() for %s returned errno %d (%s).", peer
, errno
, strerror(errno
));
293 res
= connect(fd
, ai
->ai_addr
, ai
->ai_addrlen
);
294 io_fd
= ioset_add(fd
);
296 io_fd
= ioset_add(fd
);
297 res
= connect(fd
, ai
->ai_addr
, ai
->ai_addrlen
);
304 io_fd
->state
= IO_CONNECTING
;
306 io_fd
->connect_cb
= connect_cb
;
309 case EINPROGRESS
: /* only if !blocking */
310 engine
->update(io_fd
);
313 log_module(MAIN_LOG
, LOG_ERROR
, "connect(%s:%d) (fd %d) returned errno %d (%s).", peer
, port
, io_fd
->fd
, errno
, strerror(errno
));
314 /* then fall through */
317 ioset_close(io_fd
, 1);
321 io_fd
->state
= IO_CONNECTED
;
322 old_active
= active_fd
;
324 connect_cb(io_fd
, ((res
< 0) ? errno
: 0));
326 engine
->update(io_fd
);
327 if (old_active
!= io_fd
)
328 active_fd
= old_active
;
332 void ioset_update(struct io_fd
*fd
) {
337 ioset_try_write(struct io_fd
*fd
) {
341 req
= ioq_get_avail(&fd
->send
);
342 res
= send(fd
->fd
, fd
->send
.buf
+fd
->send
.get
, req
, 0);
344 if (errno
!= EAGAIN
) {
345 log_module(MAIN_LOG
, LOG_ERROR
, "send() on fd %d error %d: %s", fd
->fd
, errno
, strerror(errno
));
349 if (fd
->send
.get
== fd
->send
.size
)
356 ioset_close(struct io_fd
*fdp
, int os_close
) {
359 if (active_fd
== fdp
)
362 fdp
->destroy_cb(fdp
);
363 #if defined(HAVE_WSAEVENTSELECT)
364 /* This is one huge kludge. Sorry! */
365 if (fdp
->send
.get
!= fdp
->send
.put
&& (os_close
& 2)) {
366 engine
->remove(fdp
, 0);
367 ioset_try_write(fdp
);
368 /* it may need to send the beginning of the buffer now.. */
369 if (fdp
->send
.get
!= fdp
->send
.put
)
370 ioset_try_write(fdp
);
375 closesocket(fdp
->fd
);
377 if (fdp
->send
.get
!= fdp
->send
.put
&& (os_close
& 2)) {
380 flags
= fcntl(fdp
->fd
, F_GETFL
);
381 fcntl(fdp
->fd
, F_SETFL
, flags
&~O_NONBLOCK
);
382 ioset_try_write(fdp
);
383 /* it may need to send the beginning of the buffer now.. */
384 if (fdp
->send
.get
!= fdp
->send
.put
)
385 ioset_try_write(fdp
);
391 engine
->remove(fdp
, os_close
& 1);
397 ioset_accept(struct io_fd
*listener
)
399 struct io_fd
*old_active
;
400 struct io_fd
*new_fd
;
403 fd
= accept(listener
->fd
, NULL
, 0);
405 log_module(MAIN_LOG
, LOG_ERROR
, "Unable to accept new connection on listener %d: %s", listener
->fd
, strerror(errno
));
409 new_fd
= ioset_add(fd
);
410 new_fd
->state
= IO_CONNECTED
;
411 old_active
= active_fd
;
413 listener
->accept_cb(listener
, new_fd
);
414 assert(active_fd
== NULL
|| active_fd
== new_fd
);
415 if (active_fd
== new_fd
) {
416 if (new_fd
->send
.get
!= new_fd
->send
.put
)
417 ioset_try_write(new_fd
);
419 engine
->update(new_fd
);
421 active_fd
= old_active
;
425 ioset_find_line_length(struct io_fd
*fd
) {
426 unsigned int pos
, max
, len
;
428 max
= (fd
->recv
.put
< fd
->recv
.get
) ? fd
->recv
.size
: fd
->recv
.put
;
429 for (pos
= fd
->recv
.get
; pos
< max
; ++pos
, ++len
)
430 if (IS_EOL(fd
->recv
.buf
[pos
]))
431 return fd
->line_len
= len
+ 1;
432 if (fd
->recv
.put
< fd
->recv
.get
)
433 for (pos
= 0; pos
< fd
->recv
.put
; ++pos
, ++len
)
434 if (IS_EOL(fd
->recv
.buf
[pos
]))
435 return fd
->line_len
= len
+ 1;
436 return fd
->line_len
= 0;
440 ioset_buffered_read(struct io_fd
*fd
) {
443 if (!(put_avail
= ioq_put_avail(&fd
->recv
)))
444 put_avail
= ioq_grow(&fd
->recv
);
445 nbr
= recv(fd
->fd
, fd
->recv
.buf
+ fd
->recv
.put
, put_avail
, 0);
447 if (errno
!= EAGAIN
) {
448 log_module(MAIN_LOG
, LOG_ERROR
, "Unexpected recv() error %d on fd %d: %s", errno
, fd
->fd
, strerror(errno
));
449 /* Just flag it as EOF and call readable_cb() to notify the fd's owner. */
450 fd
->state
= IO_CLOSED
;
455 } else if (nbr
== 0) {
456 fd
->state
= IO_CLOSED
;
461 if (fd
->line_len
== 0) {
463 for (pos
= fd
->recv
.put
; pos
< fd
->recv
.put
+ nbr
; ++pos
) {
464 if (IS_EOL(fd
->recv
.buf
[pos
])) {
465 if (fd
->recv
.put
< fd
->recv
.get
)
466 fd
->line_len
= fd
->recv
.size
+ pos
+ 1 - fd
->recv
.get
;
468 fd
->line_len
= pos
+ 1 - fd
->recv
.get
;
474 if (fd
->recv
.put
== fd
->recv
.size
)
476 while (fd
->line_len
> 0) {
477 struct io_fd
*old_active
;
480 old_active
= active_fd
;
484 ioset_find_line_length(fd
);
487 if (old_active
!= fd
)
488 active_fd
= old_active
;
496 ioset_line_read(struct io_fd
*fd
, char *dest
, int max
) {
501 line_len
= fd
->line_len
;
502 if ((fd
->state
== IO_CLOSED
) && (!ioq_get_avail(&fd
->recv
) || (line_len
< 0)))
508 avail
= ioq_get_avail(&fd
->recv
);
510 memcpy(dest
, fd
->recv
.buf
+ fd
->recv
.get
, avail
);
511 assert(fd
->recv
.get
+ avail
== fd
->recv
.size
);
517 memcpy(dest
+ done
, fd
->recv
.buf
+ fd
->recv
.get
, max
- done
);
518 fd
->recv
.get
+= max
- done
;
519 if (fd
->recv
.get
== fd
->recv
.size
)
522 ioset_find_line_length(fd
);
527 ioset_events(struct io_fd
*fd
, int readable
, int writable
)
529 if (!fd
|| (!readable
&& !writable
))
536 if (active_fd
&& readable
)
540 assert(active_fd
== NULL
|| active_fd
== fd
);
541 if (active_fd
&& readable
) {
545 if (getsockopt(fd
->fd
, SOL_SOCKET
, SO_ERROR
, &rc
, &arglen
) < 0)
547 fd
->state
= IO_CLOSED
;
549 fd
->connect_cb(fd
, rc
);
550 } else if (active_fd
&& writable
) {
551 fd
->state
= IO_CONNECTED
;
553 fd
->connect_cb(fd
, 0);
558 /* and fall through */
560 assert(active_fd
== NULL
|| active_fd
== fd
);
561 if (active_fd
&& readable
) {
563 ioset_buffered_read(fd
);
568 assert(active_fd
== NULL
|| active_fd
== fd
);
569 if (active_fd
&& writable
)
577 extern struct io_fd
*socket_io_fd
;
578 struct timeval timeout
;
581 while (!quit_services
) {
582 while (!socket_io_fd
)
585 /* How long to sleep? (fill in select_timeout) */
586 wakey
= timeq_next();
590 timeout
.tv_sec
= wakey
- now
;
593 if (engine
->loop(&timeout
))
596 /* Call any timeq events we need to call. */
599 saxdb_write_all(NULL
);
603 extern char *services_config
;
604 conf_read(services_config
);
611 ioset_write(struct io_fd
*fd
, const char *buf
, unsigned int nbw
) {
613 while (ioq_used(&fd
->send
) + nbw
>= fd
->send
.size
)
615 avail
= ioq_put_avail(&fd
->send
);
617 memcpy(fd
->send
.buf
+ fd
->send
.put
, buf
, avail
);
622 memcpy(fd
->send
.buf
+ fd
->send
.put
, buf
, nbw
);
624 if (fd
->send
.put
== fd
->send
.size
)
630 ioset_printf(struct io_fd
*fd
, const char *fmt
, ...) {
636 res
= vsnprintf(tmpbuf
, sizeof(tmpbuf
), fmt
, ap
);
638 if (res
> 0 && (size_t)res
<= sizeof(tmpbuf
))
639 ioset_write(fd
, tmpbuf
, res
);
644 ioset_set_time(unsigned long new_now
) {
645 clock_skew
= new_now
- time(NULL
);