]>
jfr.im git - irc/evilnet/x3.git/blob - src/ioset.c
1 /* ioset.h - srvx event loop
2 * Copyright 2002-2004 srvx Development Team
4 * This file is part of x3.
6 * x3 is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with srvx; if not, write to the Free Software Foundation,
18 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
30 #ifdef HAVE_SYS_SELECT_H
31 #include <sys/select.h>
33 #ifdef HAVE_SYS_SOCKET_H
34 #include <sys/socket.h>
41 #define IS_EOL(CH) ((CH) == ' \n ' )
43 extern int uplink_connect ( void );
44 static int clock_skew
;
48 static struct io_fd
** fds
;
49 static unsigned int fds_size
;
50 static fd_set read_fds
, write_fds
;
53 ioq_init ( struct ioq
* ioq
, int size
) {
54 ioq
-> buf
= malloc ( size
);
55 ioq
-> get
= ioq
-> put
= 0 ;
60 ioq_put_avail ( const struct ioq
* ioq
) {
61 /* Subtract 1 from ioq->get to be sure we don't fill the buffer
62 * and make it look empty even when there's data in it. */
63 if ( ioq
-> put
< ioq
-> get
)
64 return ioq
-> get
- ioq
-> put
- 1 ;
65 else if ( ioq
-> get
== 0 )
66 return ioq
-> size
- ioq
-> put
- 1 ;
68 return ioq
-> size
- ioq
-> put
;
72 ioq_get_avail ( const struct ioq
* ioq
) {
73 return (( ioq
-> put
< ioq
-> get
) ? ioq
-> size
: ioq
-> put
) - ioq
-> get
;
77 ioq_used ( const struct ioq
* ioq
) {
78 return (( ioq
-> put
< ioq
-> get
) ? ioq
-> size
: 0 ) + ioq
-> put
- ioq
-> get
;
82 ioq_grow ( struct ioq
* ioq
) {
83 int new_size
= ioq
-> size
<< 1 ;
84 char * new_buf
= malloc ( new_size
);
85 int get_avail
= ioq_get_avail ( ioq
);
86 memcpy ( new_buf
, ioq
-> buf
+ ioq
-> get
, get_avail
);
87 if ( ioq
-> put
< ioq
-> get
)
88 memcpy ( new_buf
+ get_avail
, ioq
-> buf
, ioq
-> put
);
90 ioq
-> put
= ioq_used ( ioq
);
94 return new_size
- ioq
-> put
;
108 log_module ( MAIN_LOG
, LOG_ERROR
, "Somebody called ioset_add( %d ) on a negative fd!" , fd
);
111 res
= calloc ( 1 , sizeof (* res
));
115 ioq_init (& res
-> send
, 1024 );
116 ioq_init (& res
-> recv
, 1024 );
117 if (( unsigned ) fd
>= fds_size
) {
118 unsigned int old_size
= fds_size
;
120 fds
= realloc ( fds
, fds_size
* sizeof (* fds
));
121 memset ( fds
+ old_size
, 0 , ( fds_size
- old_size
)* sizeof (* fds
));
124 flags
= fcntl ( fd
, F_GETFL
);
125 fcntl ( fd
, F_SETFL
, flags
| O_NONBLOCK
);
130 ioset_connect ( struct sockaddr
* local
, unsigned int sa_size
, const char * peer
, unsigned int port
, int blocking
, void * data
, void (* connect_cb
)( struct io_fd
* fd
, int error
)) {
133 struct addrinfo hints
, * ai
;
136 memset (& hints
, 0 , sizeof ( hints
));
137 hints
. ai_family
= local
? local
-> sa_family
: 0 ;
138 hints
. ai_socktype
= SOCK_STREAM
;
139 snprintf ( portnum
, sizeof ( portnum
), " %u " , port
);
140 if ( getaddrinfo ( peer
, portnum
, & hints
, & ai
)) {
141 log_module ( MAIN_LOG
, LOG_ERROR
, "getaddrinfo( %s , %s ) failed." , peer
, portnum
);
146 if (( fd
= socket ( local
-> sa_family
, SOCK_STREAM
, 0 )) < 0 ) {
147 log_module ( MAIN_LOG
, LOG_ERROR
, "socket() for %s returned errno %d ( %s )" , peer
, errno
, strerror ( errno
));
151 if ( bind ( fd
, local
, sa_size
) < 0 ) {
152 log_module ( MAIN_LOG
, LOG_ERROR
, "bind() of socket for %s (fd %d ) returned errno %d ( %s ). Will let operating system choose." , peer
, fd
, errno
, strerror ( errno
));
155 if (( fd
= socket ( PF_INET
, SOCK_STREAM
, 0 )) < 0 ) {
156 log_module ( MAIN_LOG
, LOG_ERROR
, "socket() for %s returned errno %d ( %s )." , peer
, errno
, strerror ( errno
));
163 res
= connect ( fd
, ai
-> ai_addr
, ai
-> ai_addrlen
);
164 io_fd
= ioset_add ( fd
);
166 io_fd
= ioset_add ( fd
);
167 res
= connect ( fd
, ai
-> ai_addr
, ai
-> ai_addrlen
);
175 io_fd
-> connect_cb
= connect_cb
;
178 case EINPROGRESS
: /* only if !blocking */
181 log_module ( MAIN_LOG
, LOG_ERROR
, "connect( %s : %d ) (fd %d ) returned errno %d ( %s )." , peer
, port
, io_fd
-> fd
, errno
, strerror ( errno
));
182 /* then fall through */
185 ioset_close ( io_fd
-> fd
, 1 );
190 connect_cb ( io_fd
, (( res
< 0 ) ? errno
: 0 ));
195 ioset_try_write ( struct io_fd
* fd
) {
197 unsigned int req
= ioq_get_avail (& fd
-> send
);
198 res
= write ( fd
-> fd
, fd
-> send
. buf
+ fd
-> send
. get
, req
);
204 log_module ( MAIN_LOG
, LOG_ERROR
, "write() on fd %d error %d : %s " , fd
-> fd
, errno
, strerror ( errno
));
208 if ( fd
-> send
. get
== fd
-> send
. size
)
214 ioset_close ( int fd
, int os_close
) {
216 if (!( fdp
= fds
[ fd
]))
220 fdp
-> destroy_cb ( fdp
);
221 if ( fdp
-> send
. get
!= fdp
-> send
. put
) {
222 int flags
= fcntl ( fd
, F_GETFL
);
223 fcntl ( fd
, F_SETFL
, flags
&~ O_NONBLOCK
);
224 ioset_try_write ( fdp
);
225 /* it may need to send the beginning of the buffer now.. */
226 if ( fdp
-> send
. get
!= fdp
-> send
. put
)
227 ioset_try_write ( fdp
);
234 FD_CLR ( fd
, & read_fds
);
235 FD_CLR ( fd
, & write_fds
);
239 ioset_find_line_length ( struct io_fd
* fd
) {
240 unsigned int pos
, max
, len
;
242 max
= ( fd
-> recv
. put
< fd
-> recv
. get
) ? fd
-> recv
. size
: fd
-> recv
. put
;
243 for ( pos
= fd
-> recv
. get
; pos
< max
; ++ pos
, ++ len
)
244 if ( IS_EOL ( fd
-> recv
. buf
[ pos
]))
245 return fd
-> line_len
= len
+ 1 ;
246 if ( fd
-> recv
. put
< fd
-> recv
. get
)
247 for ( pos
= 0 ; pos
< fd
-> recv
. put
; ++ pos
, ++ len
)
248 if ( IS_EOL ( fd
-> recv
. buf
[ pos
]))
249 return fd
-> line_len
= len
+ 1 ;
250 return fd
-> line_len
= 0 ;
254 ioset_buffered_read ( struct io_fd
* fd
) {
255 int put_avail
, nbr
, fdnum
;
257 if (!( put_avail
= ioq_put_avail (& fd
-> recv
)))
258 put_avail
= ioq_grow (& fd
-> recv
);
259 nbr
= read ( fd
-> fd
, fd
-> recv
. buf
+ fd
-> recv
. put
, put_avail
);
265 log_module ( MAIN_LOG
, LOG_ERROR
, "Unexpected read() error %d on fd %d : %s " , errno
, fd
-> fd
, strerror ( errno
));
266 /* Just flag it as EOF and call readable_cb() to notify the fd's owner. */
271 } else if ( nbr
== 0 ) {
276 if ( fd
-> line_len
== 0 ) {
278 for ( pos
= fd
-> recv
. put
; pos
< fd
-> recv
. put
+ nbr
; ++ pos
) {
279 if ( IS_EOL ( fd
-> recv
. buf
[ pos
])) {
280 if ( fd
-> recv
. put
< fd
-> recv
. get
)
281 fd
-> line_len
= fd
-> recv
. size
+ pos
+ 1 - fd
-> recv
. get
;
283 fd
-> line_len
= pos
+ 1 - fd
-> recv
. get
;
289 if ( fd
-> recv
. put
== fd
-> recv
. size
)
292 while ( fd
-> wants_reads
&& ( fd
-> line_len
> 0 )) {
295 break ; /* make sure they didn't close on us */
296 ioset_find_line_length ( fd
);
302 ioset_line_read ( struct io_fd
* fd
, char * dest
, int max
) {
304 if ( fd
-> eof
&& (! ioq_get_avail (& fd
-> recv
) || ( fd
-> line_len
< 0 )))
306 if ( fd
-> line_len
< 0 )
308 if ( fd
-> line_len
< max
)
310 avail
= ioq_get_avail (& fd
-> recv
);
312 memcpy ( dest
, fd
-> recv
. buf
+ fd
-> recv
. get
, avail
);
313 fd
-> recv
. get
+= avail
;
314 assert ( fd
-> recv
. get
== fd
-> recv
. size
);
320 memcpy ( dest
+ done
, fd
-> recv
. buf
+ fd
-> recv
. get
, max
- done
);
321 fd
-> recv
. get
+= max
- done
;
322 if ( fd
-> recv
. get
== fd
-> recv
. size
)
325 ioset_find_line_length ( fd
);
330 #define debug_fdsets(MSG, NFDS, READ_FDS, WRITE_FDS, EXCEPT_FDS, SELECT_TIMEOUT) (void)0
333 debug_fdsets ( const char * msg
, int nfds
, fd_set
* read_fds
, fd_set
* write_fds
, fd_set
* except_fds
, struct timeval
* select_timeout
) {
334 static const char * flag_text
[ 8 ] = { "---" , "r" , "w" , "rw" , "e" , "er" , "ew" , "erw" };
339 for ( pos
= ii
= 0 ; ii
< nfds
; ++ ii
) {
340 flags
= ( read_fds
&& FD_ISSET ( ii
, read_fds
)) ? 1 : 0 ;
341 flags
|= ( write_fds
&& FD_ISSET ( ii
, write_fds
)) ? 2 : 0 ;
342 flags
|= ( except_fds
&& FD_ISSET ( ii
, except_fds
)) ? 4 : 0 ;
345 pos
+= sprintf ( buf
+ pos
, " %d%s " , ii
, flag_text
[ flags
]);
347 gettimeofday (& now
, NULL
);
348 if ( select_timeout
) {
349 log_module ( MAIN_LOG
, LOG_DEBUG
, " %s , at " FMT_TIME_T
". %0 6ld: %s (timeout " FMT_TIME_T
". %0 6ld)" , msg
, now
. tv_sec
, now
. tv_usec
, buf
, select_timeout
-> tv_sec
, select_timeout
-> tv_usec
);
351 log_module ( MAIN_LOG
, LOG_DEBUG
, " %s , at " FMT_TIME_T
". %0 6ld: %s (no timeout)" , msg
, now
. tv_sec
, now
. tv_usec
, buf
);
358 extern struct io_fd
* socket_io_fd
;
359 struct timeval select_timeout
;
361 int select_result
, max_fd
;
365 while (! quit_services
) {
366 while (! socket_io_fd
)
369 /* How long to sleep? (fill in select_timeout) */
370 wakey
= timeq_next ();
371 if (( wakey
- now
) < 0 )
372 select_timeout
. tv_sec
= 0 ;
374 select_timeout
. tv_sec
= wakey
- now
;
375 select_timeout
. tv_usec
= 0 ;
377 /* Set up read_fds and write_fds fdsets. */
381 for ( nn
= 0 ; nn
< fds_size
; nn
++) {
386 FD_SET ( nn
, & read_fds
);
387 if (( fd
-> send
. get
!= fd
-> send
. put
) || ! fd
-> connected
)
388 FD_SET ( nn
, & write_fds
);
391 /* Check for activity, update time. */
392 debug_fdsets ( "Entering select" , max_fd
+ 1 , & read_fds
, & write_fds
, NULL
, & select_timeout
);
393 select_result
= select ( max_fd
+ 1 , & read_fds
, & write_fds
, NULL
, & select_timeout
);
394 debug_fdsets ( "After select" , max_fd
+ 1 , & read_fds
, & write_fds
, NULL
, & select_timeout
);
395 now
= time ( NULL
) + clock_skew
;
396 if ( select_result
< 0 ) {
397 if ( errno
!= EINTR
) {
398 log_module ( MAIN_LOG
, LOG_ERROR
, "select() error %d : %s " , errno
, strerror ( errno
));
404 /* Call back anybody that has connect or read activity and wants to know. */
405 for ( nn
= 0 ; nn
< fds_size
; nn
++) {
408 if ( FD_ISSET ( nn
, & read_fds
)) {
410 ioset_buffered_read ( fd
);
414 if ( FD_ISSET ( nn
, & write_fds
) && ! fd
-> connected
) {
419 if ( getsockopt ( fd
-> fd
, SOL_SOCKET
, SO_ERROR
, & rc
, & arglen
) < 0 )
423 fd
-> connect_cb ( fd
, rc
);
425 /* Note: check whether write FD is still set, since the
426 * connect_cb() might close the FD, making us dereference
427 * a free()'d pointer for the fd.
429 if ( FD_ISSET ( nn
, & write_fds
) && ( fd
-> send
. get
!= fd
-> send
. put
))
433 /* Call any timeq events we need to call. */
440 extern char * services_config
;
441 conf_read ( services_config
);
448 ioset_write ( struct io_fd
* fd
, const char * buf
, unsigned int nbw
) {
450 while ( ioq_used (& fd
-> send
) + nbw
>= fd
-> send
. size
)
452 avail
= ioq_put_avail (& fd
-> send
);
454 memcpy ( fd
-> send
. buf
+ fd
-> send
. put
, buf
, avail
);
459 memcpy ( fd
-> send
. buf
+ fd
-> send
. put
, buf
, nbw
);
461 if ( fd
-> send
. put
== fd
-> send
. size
)
466 ioset_set_time ( unsigned long new_now
) {
467 clock_skew
= new_now
- time ( NULL
);