1 /* ioset.h - srvx event loop
2 * Copyright 2002-2004, 2006 srvx Development Team
4 * This file is part of srvx.
6 * srvx is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with srvx; if not, write to the Free Software Foundation,
18 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
21 #include "ioset-impl.h"
30 #ifdef HAVE_SYS_SOCKET_H
31 #include <sys/socket.h>
34 #ifdef WITH_IOSET_WIN32
37 # define errno WSAGetLastError()
39 # define EINPROGRESS WSAEINPROGRESS
41 # define EHOSTUNREACH WSAEHOSTUNREACH
43 # define ECONNREFUSED WSAECONNREFUSED
45 # define EAGAIN WSAEWOULDBLOCK
46 # define strerror wsa_strerror
49 wsa_strerror(int wsa_err
)
53 case WSAEINTR
: return "Operation interrupted";
54 case WSAEBADF
: return "Bad file descriptor";
55 case WSAEACCES
: return "Permission denied";
56 case WSAEFAULT
: return "Invalid address";
57 case WSAEINVAL
: return "Invalid parameter";
58 case WSAEMFILE
: return "Too many open files";
59 case WSAEWOULDBLOCK
: return "Try again later";
60 case WSAEINPROGRESS
: return "Operation in progress";
61 case WSAEALREADY
: return "Operation already in progress";
62 case WSAENOTSOCK
: return "Not a socket";
63 case WSAEDESTADDRREQ
: return "Destination address required";
64 case WSAEMSGSIZE
: return "Invalid message size";
65 case WSAEPROTOTYPE
: return "Invalid protocol type for socket";
66 case WSAENOPROTOOPT
: return "Invalid protocol option";
67 case WSAEPROTONOSUPPORT
: return "Protocol not supported";
68 case WSAEOPNOTSUPP
: return "Operation not supported";
69 case WSAEADDRINUSE
: return "Address already in use";
70 case WSAEADDRNOTAVAIL
: return "Address not available";
71 case WSAENETDOWN
: return "Network down";
72 case WSAENETUNREACH
: return "Network unreachable";
73 case WSAENETRESET
: return "Network reset";
74 case WSAECONNABORTED
: return "Connection aborted";
75 case WSAECONNRESET
: return "Connection reset by peer";
76 case WSAECONNREFUSED
: return "Connection refused";
78 return "unknown error";
81 #endif /* WITH_IOSET_WIN32 */
83 #define IS_EOL(CH) ((CH) == '\n')
85 extern int uplink_connect(void);
89 static struct io_engine
*engine
;
90 static struct io_fd
*active_fd
;
93 ioq_init(struct ioq
*ioq
, int size
) {
94 ioq
->buf
= malloc(size
);
95 ioq
->get
= ioq
->put
= 0;
100 ioq_put_avail(const struct ioq
*ioq
) {
101 /* Subtract 1 from ioq->get to be sure we don't fill the buffer
102 * and make it look empty even when there's data in it. */
103 if (ioq
->put
< ioq
->get
)
104 return ioq
->get
- ioq
->put
- 1;
105 else if (ioq
->get
== 0)
106 return ioq
->size
- ioq
->put
- 1;
108 return ioq
->size
- ioq
->put
;
112 ioq_get_avail(const struct ioq
*ioq
) {
113 return ((ioq
->put
< ioq
->get
) ? ioq
->size
: ioq
->put
) - ioq
->get
;
117 ioq_used(const struct ioq
*ioq
) {
118 return ((ioq
->put
< ioq
->get
) ? ioq
->size
: 0) + ioq
->put
- ioq
->get
;
122 ioq_grow(struct ioq
*ioq
) {
123 int new_size
= ioq
->size
<< 1;
124 char *new_buf
= malloc(new_size
);
125 int get_avail
= ioq_get_avail(ioq
);
126 memcpy(new_buf
, ioq
->buf
+ ioq
->get
, get_avail
);
127 if (ioq
->put
< ioq
->get
)
128 memcpy(new_buf
+ get_avail
, ioq
->buf
, ioq
->put
);
130 ioq
->put
= ioq_used(ioq
);
133 ioq
->size
= new_size
;
134 return new_size
- ioq
->put
;
137 extern struct io_engine io_engine_kevent
;
138 extern struct io_engine io_engine_epoll
;
139 extern struct io_engine io_engine_win32
;
140 extern struct io_engine io_engine_select
;
145 if (engine
) /* someone beat us to it */
148 #if WITH_IOSET_KEVENT
149 if (!engine
&& io_engine_kevent
.init())
150 engine
= &io_engine_kevent
;
154 if (!engine
&& io_engine_epoll
.init())
155 engine
= &io_engine_epoll
;
159 if (!engine
&& io_engine_win32
.init())
160 engine
= &io_engine_win32
;
164 /* we found one that works */
165 } else if (io_engine_select
.init())
166 engine
= &io_engine_select
;
168 log_module(MAIN_LOG
, LOG_FATAL
, "No usable I/O engine found.");
169 log_module(MAIN_LOG
, LOG_DEBUG
, "Using %s I/O engine.", engine
->name
);
173 ioset_cleanup(void) {
183 log_module(MAIN_LOG
, LOG_ERROR
, "Somebody called ioset_add(%d) on a negative fd!", fd
);
188 res
= calloc(1, sizeof(*res
));
192 ioq_init(&res
->send
, 1024);
193 ioq_init(&res
->recv
, 1024);
195 flags
= fcntl(fd
, F_GETFL
);
196 fcntl(fd
, F_SETFL
, flags
|O_NONBLOCK
);
197 flags
= fcntl(fd
, F_GETFD
);
198 fcntl(fd
, F_SETFD
, flags
|FD_CLOEXEC
);
200 /* I hope you're using the Win32 backend or something else that
201 * automatically marks the file descriptor non-blocking...
209 struct io_fd
*ioset_listen(struct sockaddr
*local
, unsigned int sa_size
, void *data
, void (*accept_cb
)(struct io_fd
*listener
, struct io_fd
*new_connect
))
216 fd
= socket(local
? local
->sa_family
: PF_INET
, SOCK_STREAM
, 0);
218 log_module(MAIN_LOG
, LOG_ERROR
, "Unable to create listening socket: %s", strerror(errno
));
222 if (local
&& sa_size
) {
223 res
= bind(fd
, local
, sa_size
);
225 log_module(MAIN_LOG
, LOG_ERROR
, "Unable to bind listening socket %d: %s", fd
, strerror(errno
));
231 res
= setsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, (const char*)&opt
, sizeof(opt
));
233 log_module(MAIN_LOG
, LOG_WARNING
, "Unable to mark listener address as re-usable: %s", strerror(errno
));
239 log_module(MAIN_LOG
, LOG_ERROR
, "Unable to listen on socket %d: %s", fd
, strerror(errno
));
244 io_fd
= ioset_add(fd
);
249 io_fd
->state
= IO_LISTENING
;
251 io_fd
->accept_cb
= accept_cb
;
252 engine
->update(io_fd
);
257 ioset_connect(struct sockaddr
*local
, unsigned int sa_size
, const char *peer
, unsigned int port
, int blocking
, void *data
, void (*connect_cb
)(struct io_fd
*fd
, int error
)) {
258 struct addrinfo hints
;
261 struct io_fd
*old_active
;
266 memset(&hints
, 0, sizeof(hints
));
267 hints
.ai_family
= local
? local
->sa_family
: 0;
268 hints
.ai_socktype
= SOCK_STREAM
;
269 snprintf(portnum
, sizeof(portnum
), "%u", port
);
270 res
= getaddrinfo(peer
, portnum
, &hints
, &ai
);
272 log_module(MAIN_LOG
, LOG_ERROR
, "getaddrinfo(%s, %s) failed: %s.", peer
, portnum
, gai_strerror(res
));
277 if ((fd
= socket(local
->sa_family
, SOCK_STREAM
, 0)) < 0) {
278 log_module(MAIN_LOG
, LOG_ERROR
, "socket() for %s returned errno %d (%s)", peer
, errno
, strerror(errno
));
282 if (bind(fd
, local
, sa_size
) < 0) {
283 log_module(MAIN_LOG
, LOG_ERROR
, "bind() of socket for %s (fd %d) returned errno %d (%s). Will let operating system choose.", peer
, fd
, errno
, strerror(errno
));
286 if ((fd
= socket(PF_INET
, SOCK_STREAM
, 0)) < 0) {
287 log_module(MAIN_LOG
, LOG_ERROR
, "socket() for %s returned errno %d (%s).", peer
, errno
, strerror(errno
));
294 res
= connect(fd
, ai
->ai_addr
, ai
->ai_addrlen
);
295 io_fd
= ioset_add(fd
);
297 io_fd
= ioset_add(fd
);
298 res
= connect(fd
, ai
->ai_addr
, ai
->ai_addrlen
);
305 io_fd
->state
= IO_CONNECTING
;
307 io_fd
->connect_cb
= connect_cb
;
310 case EINPROGRESS
: /* only if !blocking */
311 engine
->update(io_fd
);
314 log_module(MAIN_LOG
, LOG_ERROR
, "connect(%s:%d) (fd %d) returned errno %d (%s).", peer
, port
, io_fd
->fd
, errno
, strerror(errno
));
315 /* then fall through */
318 ioset_close(io_fd
, 1);
322 io_fd
->state
= IO_CONNECTED
;
323 old_active
= active_fd
;
325 connect_cb(io_fd
, ((res
< 0) ? errno
: 0));
327 engine
->update(io_fd
);
328 if (old_active
!= io_fd
)
329 active_fd
= old_active
;
333 void ioset_update(struct io_fd
*fd
) {
338 ioset_try_write(struct io_fd
*fd
) {
342 req
= ioq_get_avail(&fd
->send
);
343 res
= send(fd
->fd
, fd
->send
.buf
+fd
->send
.get
, req
, 0);
345 if (errno
!= EAGAIN
) {
346 log_module(MAIN_LOG
, LOG_ERROR
, "send() on fd %d error %d: %s", fd
->fd
, errno
, strerror(errno
));
350 if (fd
->send
.get
== fd
->send
.size
)
357 ioset_close(struct io_fd
*fdp
, int os_close
) {
360 if (active_fd
== fdp
)
363 fdp
->destroy_cb(fdp
);
364 #if defined(HAVE_WSAEVENTSELECT)
365 /* This is one huge kludge. Sorry! */
366 if (fdp
->send
.get
!= fdp
->send
.put
&& (os_close
& 2)) {
367 engine
->remove(fdp
, 0);
368 ioset_try_write(fdp
);
369 /* it may need to send the beginning of the buffer now.. */
370 if (fdp
->send
.get
!= fdp
->send
.put
)
371 ioset_try_write(fdp
);
376 closesocket(fdp
->fd
);
378 if (fdp
->send
.get
!= fdp
->send
.put
&& (os_close
& 2)) {
381 flags
= fcntl(fdp
->fd
, F_GETFL
);
382 fcntl(fdp
->fd
, F_SETFL
, flags
&~O_NONBLOCK
);
383 ioset_try_write(fdp
);
384 /* it may need to send the beginning of the buffer now.. */
385 if (fdp
->send
.get
!= fdp
->send
.put
)
386 ioset_try_write(fdp
);
392 engine
->remove(fdp
, os_close
& 1);
398 ioset_accept(struct io_fd
*listener
)
400 struct io_fd
*old_active
;
401 struct io_fd
*new_fd
;
404 fd
= accept(listener
->fd
, NULL
, 0);
406 log_module(MAIN_LOG
, LOG_ERROR
, "Unable to accept new connection on listener %d: %s", listener
->fd
, strerror(errno
));
410 new_fd
= ioset_add(fd
);
411 new_fd
->state
= IO_CONNECTED
;
412 old_active
= active_fd
;
414 listener
->accept_cb(listener
, new_fd
);
415 assert(active_fd
== NULL
|| active_fd
== new_fd
);
416 if (active_fd
== new_fd
) {
417 if (new_fd
->send
.get
!= new_fd
->send
.put
)
418 ioset_try_write(new_fd
);
420 engine
->update(new_fd
);
422 active_fd
= old_active
;
426 ioset_find_line_length(struct io_fd
*fd
) {
427 unsigned int pos
, max
, len
;
429 max
= (fd
->recv
.put
< fd
->recv
.get
) ? fd
->recv
.size
: fd
->recv
.put
;
430 for (pos
= fd
->recv
.get
; pos
< max
; ++pos
, ++len
)
431 if (IS_EOL(fd
->recv
.buf
[pos
]))
432 return fd
->line_len
= len
+ 1;
433 if (fd
->recv
.put
< fd
->recv
.get
)
434 for (pos
= 0; pos
< fd
->recv
.put
; ++pos
, ++len
)
435 if (IS_EOL(fd
->recv
.buf
[pos
]))
436 return fd
->line_len
= len
+ 1;
437 return fd
->line_len
= 0;
441 ioset_buffered_read(struct io_fd
*fd
) {
442 int put_avail
, nbr
, fdnum
;
444 if (!(put_avail
= ioq_put_avail(&fd
->recv
)))
445 put_avail
= ioq_grow(&fd
->recv
);
446 nbr
= recv(fd
->fd
, fd
->recv
.buf
+ fd
->recv
.put
, put_avail
, 0);
448 if (errno
!= EAGAIN
) {
449 log_module(MAIN_LOG
, LOG_ERROR
, "Unexpected recv() error %d on fd %d: %s", errno
, fd
->fd
, strerror(errno
));
450 /* Just flag it as EOF and call readable_cb() to notify the fd's owner. */
451 fd
->state
= IO_CLOSED
;
456 } else if (nbr
== 0) {
457 fd
->state
= IO_CLOSED
;
462 if (fd
->line_len
== 0) {
464 for (pos
= fd
->recv
.put
; pos
< fd
->recv
.put
+ nbr
; ++pos
) {
465 if (IS_EOL(fd
->recv
.buf
[pos
])) {
466 if (fd
->recv
.put
< fd
->recv
.get
)
467 fd
->line_len
= fd
->recv
.size
+ pos
+ 1 - fd
->recv
.get
;
469 fd
->line_len
= pos
+ 1 - fd
->recv
.get
;
475 if (fd
->recv
.put
== fd
->recv
.size
)
478 while (fd
->line_len
> 0) {
479 struct io_fd
*old_active
;
482 old_active
= active_fd
;
486 ioset_find_line_length(fd
);
489 if (old_active
!= fd
)
490 active_fd
= old_active
;
498 ioset_line_read(struct io_fd
*fd
, char *dest
, int max
) {
503 line_len
= fd
->line_len
;
504 if ((fd
->state
== IO_CLOSED
) && (!ioq_get_avail(&fd
->recv
) || (line_len
< 0)))
510 avail
= ioq_get_avail(&fd
->recv
);
512 memcpy(dest
, fd
->recv
.buf
+ fd
->recv
.get
, avail
);
513 assert(fd
->recv
.get
+ avail
== fd
->recv
.size
);
519 memcpy(dest
+ done
, fd
->recv
.buf
+ fd
->recv
.get
, max
- done
);
520 fd
->recv
.get
+= max
- done
;
521 if (fd
->recv
.get
== fd
->recv
.size
)
524 ioset_find_line_length(fd
);
529 ioset_events(struct io_fd
*fd
, int readable
, int writable
)
531 if (!fd
|| (!readable
&& !writable
))
538 if (active_fd
&& readable
)
542 assert(active_fd
== NULL
|| active_fd
== fd
);
543 if (active_fd
&& readable
) {
547 if (getsockopt(fd
->fd
, SOL_SOCKET
, SO_ERROR
, &rc
, &arglen
) < 0)
549 fd
->state
= IO_CLOSED
;
551 fd
->connect_cb(fd
, rc
);
552 } else if (active_fd
&& writable
) {
553 fd
->state
= IO_CONNECTED
;
555 fd
->connect_cb(fd
, 0);
560 /* and fall through */
562 assert(active_fd
== NULL
|| active_fd
== fd
);
563 if (active_fd
&& readable
) {
565 ioset_buffered_read(fd
);
570 assert(active_fd
== NULL
|| active_fd
== fd
);
571 if (active_fd
&& writable
)
579 extern struct io_fd
*socket_io_fd
;
580 struct timeval timeout
;
583 while (!quit_services
) {
584 while (!socket_io_fd
)
587 /* How long to sleep? (fill in select_timeout) */
588 wakey
= timeq_next();
592 timeout
.tv_sec
= wakey
- now
;
595 if (engine
->loop(&timeout
))
598 /* Call any timeq events we need to call. */
605 extern char *services_config
;
606 conf_read(services_config
);
613 ioset_write(struct io_fd
*fd
, const char *buf
, unsigned int nbw
) {
615 while (ioq_used(&fd
->send
) + nbw
>= fd
->send
.size
)
617 avail
= ioq_put_avail(&fd
->send
);
619 memcpy(fd
->send
.buf
+ fd
->send
.put
, buf
, avail
);
624 memcpy(fd
->send
.buf
+ fd
->send
.put
, buf
, nbw
);
626 if (fd
->send
.put
== fd
->send
.size
)
632 ioset_printf(struct io_fd
*fd
, const char *fmt
, ...) {
638 res
= vsnprintf(tmpbuf
, sizeof(tmpbuf
), fmt
, ap
);
640 if (res
> 0 && (size_t)res
<= sizeof(tmpbuf
))
641 ioset_write(fd
, tmpbuf
, res
);
646 ioset_set_time(unsigned long new_now
) {
647 clock_skew
= new_now
- time(NULL
);