]> jfr.im git - irc/rqf/shadowircd.git/blob - servlink/io.c
[svn] - the new plan:
[irc/rqf/shadowircd.git] / servlink / io.c
1 /************************************************************************
2 * IRC - Internet Relay Chat, servlink/io.c
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 1, or (at your option)
7 * any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17 *
18 * $Id: io.c 1285 2006-05-05 15:03:53Z nenolod $
19 */
20
21 #include "setup.h"
22
23 #include <sys/types.h>
24 #include <sys/socket.h>
25
26 #include <assert.h>
27 #include <errno.h>
28 #include <string.h>
29 #include <stdlib.h>
30 #include <stdarg.h>
31 #include <stdio.h>
32 #include <unistd.h>
33
34 #ifdef HAVE_LIBZ
35 #include <zlib.h>
36 #endif
37
38 #include "servlink.h"
39 #include "io.h"
40 #include "control.h"
41
/* classifies the result of a read/write/select call; defined later in this file */
static int check_error(int ret, int io, int fd);
43
44 static const char *
45 fd_name(int fd)
46 {
47 if(fd == CONTROL.fd)
48 return "control";
49 if(fd == LOCAL.fd)
50 return "data";
51 if(fd == REMOTE.fd)
52 return "network";
53
54 /* uh oh... */
55 return "unknown";
56 }
57
#ifdef HAVE_LIBZ
/* staging buffers used on either side of deflate()/inflate() */
static unsigned char tmp_buf[BUFLEN];
static unsigned char tmp2_buf[BUFLEN];
#endif

/*
 * Reply being sent on the control fd: when a write is incomplete,
 * ctrl_len bytes starting at ctrl_buf + ctrl_ofs are still pending.
 */
static unsigned char ctrl_buf[256] = { 0 };
static unsigned int ctrl_len = 0;
static unsigned int ctrl_ofs = 0;
66
67 void
68 io_loop(int nfds)
69 {
70 fd_set rfds;
71 fd_set wfds;
72 int i, ret;
73
74 /* loop forever */
75 for (;;)
76 {
77 FD_ZERO(&rfds);
78 FD_ZERO(&wfds);
79
80 for (i = 0; i < 3; i++)
81 {
82 if(fds[i].read_cb)
83 FD_SET(fds[i].fd, &rfds);
84 if(fds[i].write_cb)
85 FD_SET(fds[i].fd, &wfds);
86 }
87
88 /* we have <3 fds ever, so I don't think select is too painful */
89 ret = select(nfds, &rfds, &wfds, NULL, NULL);
90
91 if(ret < 0)
92 {
93 check_error(ret, IO_SELECT, -1); /* exit on fatal errors */
94 }
95 else if(ret > 0)
96 {
97 /* call any callbacks */
98 for (i = 0; i < 3; i++)
99 {
100 if(FD_ISSET(fds[i].fd, &rfds) && fds[i].read_cb)
101 (*fds[i].read_cb) ();
102 if(FD_ISSET(fds[i].fd, &wfds) && fds[i].write_cb)
103 (*fds[i].write_cb) ();
104 }
105 }
106 }
107 }
108
109 void
110 send_data_blocking(int fd, unsigned char *data, int datalen)
111 {
112 int ret;
113 fd_set wfds;
114
115 while (1)
116 {
117 ret = write(fd, data, datalen);
118
119 if(ret == datalen)
120 return;
121 else if(ret > 0)
122 {
123 data += ret;
124 datalen -= ret;
125 }
126
127 ret = check_error(ret, IO_WRITE, fd);
128
129 FD_ZERO(&wfds);
130 FD_SET(fd, &wfds);
131
132 /* sleep until we can write to the fd */
133 while (1)
134 {
135 ret = select(fd + 1, NULL, &wfds, NULL, NULL);
136
137 if(ret > 0) /* break out so we can write */
138 break;
139
140 if(ret < 0) /* error ? */
141 check_error(ret, IO_SELECT, fd); /* exit on fatal errors */
142
143 /* loop on non-fatal errors */
144 }
145 }
146 }
147
148 /*
149 * process_sendq:
150 *
151 * used before CMD_INIT to pass contents of SendQ from ircd
152 * to servlink. This data must _not_ be encrypted/compressed.
153 */
154 void
155 process_sendq(struct ctrl_command *cmd)
156 {
157 send_data_blocking(REMOTE.fd, cmd->data, cmd->datalen);
158 }
159
160 /*
161 * process_recvq:
162 *
163 * used before CMD_INIT to pass contents of RecvQ from ircd
164 * to servlink. This data must be decrypted/decopmressed before
165 * sending back to the ircd.
166 */
167 void
168 process_recvq(struct ctrl_command *cmd)
169 {
170 int ret;
171 unsigned char *buf;
172 int blen;
173 unsigned char *data = cmd->data;
174 unsigned int datalen = cmd->datalen;
175
176 buf = data;
177 blen = datalen;
178 ret = -1;
179 if(datalen > READLEN)
180 send_error("Error processing INJECT_RECVQ - buffer too long (%d > %d)",
181 datalen, READLEN);
182
183 #ifdef HAVE_LIBZ
184 if(in_state.zip)
185 {
186 /* decompress data */
187 in_state.zip_state.z_stream.next_in = buf;
188 in_state.zip_state.z_stream.avail_in = blen;
189 in_state.zip_state.z_stream.next_out = tmp2_buf;
190 in_state.zip_state.z_stream.avail_out = BUFLEN;
191
192 buf = tmp2_buf;
193 while (in_state.zip_state.z_stream.avail_in)
194 {
195 if((ret = inflate(&in_state.zip_state.z_stream, Z_NO_FLUSH)) != Z_OK)
196 {
197 if(!strncmp("ERROR ", (char *)in_state.zip_state.z_stream.next_in, 6))
198 send_error("Received uncompressed ERROR");
199 else
200 send_error("Inflate failed: %s", zError(ret));
201 }
202 blen = BUFLEN - in_state.zip_state.z_stream.avail_out;
203
204 if(in_state.zip_state.z_stream.avail_in)
205 {
206 send_data_blocking(LOCAL.fd, buf, blen);
207 blen = 0;
208 in_state.zip_state.z_stream.next_out = buf;
209 in_state.zip_state.z_stream.avail_out = BUFLEN;
210 }
211 }
212
213 if(!blen)
214 return;
215 }
216 #endif
217
218 send_data_blocking(LOCAL.fd, buf, blen);
219 }
220
#ifdef HAVE_LIBZ
/*
 * store_be32
 *  - write v at dst as four bytes, most significant byte first,
 *    and return the position just past them.
 */
static unsigned char *
store_be32(unsigned char *dst, u_int32_t v)
{
	int shift;

	for (shift = 24; shift >= 0; shift -= 8)
		*dst++ = ((v >> shift) & 0xFF);

	return dst;
}
#endif

/*
 * send_zipstats
 *  - build an RPL_ZIPSTATS reply in ctrl_buf carrying four 32-bit
 *    zlib counters (incoming total_out, incoming total_in, outgoing
 *    total_in, outgoing total_out), reset those counters, and write
 *    the reply to the control fd.  If the write is incomplete the
 *    remainder is left for write_ctrl and reading of the control fd
 *    is suspended.
 *
 *    Without zlib support this only reports an error.
 *    The command argument is not used.
 */
void
send_zipstats(struct ctrl_command *unused)
{
#ifdef HAVE_LIBZ
	unsigned char *p = ctrl_buf;
	int total;
	int sent;

	if(!in_state.active || !out_state.active)
		send_error("Error processing CMD_ZIPSTATS - link is not active!");
	if(!in_state.zip || !out_state.zip)
		send_error("Error processing CMD_ZIPSTATS - link is not compressed!");

	/* reply type, then two length bytes (0, 16) for the 16 payload bytes below */
	*p++ = RPL_ZIPSTATS;
	*p++ = 0;
	*p++ = 16;

	p = store_be32(p, (u_int32_t) in_state.zip_state.z_stream.total_out);
	p = store_be32(p, (u_int32_t) in_state.zip_state.z_stream.total_in);
	p = store_be32(p, (u_int32_t) out_state.zip_state.z_stream.total_in);
	p = store_be32(p, (u_int32_t) out_state.zip_state.z_stream.total_out);

	/* counters restart from zero after every report */
	in_state.zip_state.z_stream.total_in = 0;
	in_state.zip_state.z_stream.total_out = 0;
	out_state.zip_state.z_stream.total_in = 0;
	out_state.zip_state.z_stream.total_out = 0;

	total = (int) (p - ctrl_buf);
	sent = check_error(write(CONTROL.fd, ctrl_buf, total), IO_WRITE, CONTROL.fd);
	if(sent < total)
	{
		/* write incomplete, register write cb */
		CONTROL.write_cb = write_ctrl;
		/* deregister read_cb */
		CONTROL.read_cb = NULL;
		ctrl_ofs = sent;
		ctrl_len = total - sent;
	}
#else
	send_error("can't send_zipstats -- no zlib support!");
#endif
}
281
282 /* send_error
283 * - we ran into some problem, make a last ditch effort to
284 * flush the control fd sendq, then (blocking) send an
285 * error message over the control fd.
286 */
287 void
288 send_error(const char *message, ...)
289 {
290 va_list args;
291 static int sending_error = 0;
292 struct linger linger_opt = { 1, 30 }; /* wait 30 seconds */
293 int len;
294
295 if(sending_error)
296 exit(1); /* we did _try_ */
297
298 sending_error = 1;
299
300 if(ctrl_len) /* attempt to flush any data we have... */
301 {
302 send_data_blocking(CONTROL.fd, (ctrl_buf + ctrl_ofs), ctrl_len);
303 }
304
305 /* prepare the message, in in_buf, since we won't be using it again.. */
306 in_state.buf[0] = RPL_ERROR;
307 in_state.buf[1] = 0;
308 in_state.buf[2] = 0;
309
310 va_start(args, message);
311 len = vsprintf((char *) in_state.buf + 3, message, args);
312 va_end(args);
313
314 in_state.buf[3 + len++] = '\0';
315 in_state.buf[1] = len >> 8;
316 in_state.buf[2] = len & 0xFF;
317 len += 3;
318
319 send_data_blocking(CONTROL.fd, in_state.buf, len);
320
321 /* XXX - is this portable?
322 * this obviously will fail on a non socket.. */
323 setsockopt(CONTROL.fd, SOL_SOCKET, SO_LINGER, &linger_opt, sizeof(struct linger));
324
325 /* well, we've tried... */
326 exit(1); /* now abort */
327 }
328
329 /* read_ctrl
330 * called when a command is waiting on the control pipe
331 */
332 void
333 read_ctrl(void)
334 {
335 int ret;
336 unsigned char tmp[2];
337 unsigned char *len;
338 struct command_def *cdef;
339 static struct ctrl_command cmd = { 0, 0, 0, 0, NULL };
340
341 if(cmd.command == 0) /* we don't have a command yet */
342 {
343 cmd.gotdatalen = 0;
344 cmd.datalen = 0;
345 cmd.readdata = 0;
346 cmd.data = NULL;
347
348 /* read the command */
349 if(!(ret = check_error(read(CONTROL.fd, tmp, 1), IO_READ, CONTROL.fd)))
350 return;
351
352 cmd.command = tmp[0];
353 }
354
355 for (cdef = command_table; cdef->commandid; cdef++)
356 {
357 if((int)cdef->commandid == cmd.command)
358 break;
359 }
360
361 if(!cdef->commandid)
362 {
363 send_error("Unsupported command (servlink/ircd out of sync?): %d", cmd.command);
364 /* NOTREACHED */
365 }
366
367 /* read datalen for commands including data */
368 if(cdef->flags & COMMAND_FLAG_DATA)
369 {
370 if(cmd.gotdatalen < 2)
371 {
372 len = tmp;
373 if(!(ret = check_error(read(CONTROL.fd, len,
374 (2 - cmd.gotdatalen)), IO_READ, CONTROL.fd)))
375 return;
376
377 if(cmd.gotdatalen == 0)
378 {
379 cmd.datalen = len[0] << 8;
380 cmd.gotdatalen++;
381 ret--;
382 len++;
383 }
384 if(ret && (cmd.gotdatalen == 1))
385 {
386 cmd.datalen |= len[0];
387 cmd.gotdatalen++;
388 if(cmd.datalen > 0)
389 cmd.data = calloc(cmd.datalen, 1);
390 }
391 }
392 }
393
394 if(cmd.readdata < cmd.datalen) /* try to get any remaining data */
395 {
396 if(!(ret = check_error(read(CONTROL.fd,
397 (cmd.data + cmd.readdata),
398 cmd.datalen - cmd.readdata), IO_READ, CONTROL.fd)))
399 return;
400
401 cmd.readdata += ret;
402 if(cmd.readdata < cmd.datalen)
403 return;
404 }
405
406 /* we now have the command and any data */
407 (*cdef->handler) (&cmd);
408
409 if(cmd.datalen > 0)
410 free(cmd.data);
411 cmd.command = 0;
412 }
413
414 void
415 write_ctrl(void)
416 {
417 int ret;
418
419 assert(ctrl_len);
420
421 if(!(ret = check_error(write(CONTROL.fd, (ctrl_buf + ctrl_ofs),
422 ctrl_len), IO_WRITE, CONTROL.fd)))
423 return; /* no data waiting */
424
425 ctrl_len -= ret;
426
427 if(!ctrl_len)
428 {
429 /* write completed, de-register write cb */
430 CONTROL.write_cb = NULL;
431 /* reregister read_cb */
432 CONTROL.read_cb = read_ctrl;
433 ctrl_ofs = 0;
434 }
435 else
436 ctrl_ofs += ret;
437 }
438
439 void
440 read_data(void)
441 {
442 int ret, ret2;
443 unsigned char *buf = out_state.buf;
444 int blen;
445 ret2 = -1;
446 assert(!out_state.len);
447
448 #if defined(HAVE_LIBZ)
449 if(out_state.zip || out_state.crypt)
450 buf = tmp_buf;
451 #endif
452
453 while ((ret = check_error(read(LOCAL.fd, buf, READLEN), IO_READ, LOCAL.fd)))
454 {
455 blen = ret;
456 #ifdef HAVE_LIBZ
457 if(out_state.zip)
458 {
459 out_state.zip_state.z_stream.next_in = buf;
460 out_state.zip_state.z_stream.avail_in = ret;
461
462 buf = out_state.buf;
463 out_state.zip_state.z_stream.next_out = buf;
464 out_state.zip_state.z_stream.avail_out = BUFLEN;
465 if(!(ret2 = deflate(&out_state.zip_state.z_stream,
466 Z_PARTIAL_FLUSH)) == Z_OK)
467 send_error("error compressing outgoing data - deflate returned: %s",
468 zError(ret2));
469
470 if(!out_state.zip_state.z_stream.avail_out)
471 send_error("error compressing outgoing data - avail_out == 0");
472 if(out_state.zip_state.z_stream.avail_in)
473 send_error("error compressing outgoing data - avail_in != 0");
474
475 blen = BUFLEN - out_state.zip_state.z_stream.avail_out;
476 }
477 #endif
478
479
480 ret = check_error(write(REMOTE.fd, out_state.buf, blen), IO_WRITE, REMOTE.fd);
481 if(ret < blen)
482 {
483 /* write incomplete, register write cb */
484 REMOTE.write_cb = write_net;
485 /* deregister read_cb */
486 LOCAL.read_cb = NULL;
487 out_state.ofs = ret;
488 out_state.len = blen - ret;
489 return;
490 }
491 #if defined(HAVE_LIBZ)
492 if(out_state.zip)
493 buf = tmp_buf;
494 #endif
495 }
496
497 }
498
499 void
500 write_net(void)
501 {
502 int ret;
503
504 assert(out_state.len);
505
506 if(!(ret = check_error(write(REMOTE.fd,
507 (out_state.buf + out_state.ofs),
508 out_state.len), IO_WRITE, REMOTE.fd)))
509 return; /* no data waiting */
510
511 out_state.len -= ret;
512
513 if(!out_state.len)
514 {
515 /* write completed, de-register write cb */
516 REMOTE.write_cb = NULL;
517 /* reregister read_cb */
518 LOCAL.read_cb = read_data;
519 out_state.ofs = 0;
520 }
521 else
522 out_state.ofs += ret;
523 }
524
525 void
526 read_net(void)
527 {
528 int ret;
529 int ret2;
530 unsigned char *buf = in_state.buf;
531 int blen;
532 ret2 = -1;
533 assert(!in_state.len);
534
535 #if defined(HAVE_LIBZ)
536 if(in_state.zip)
537 buf = tmp_buf;
538 #endif
539
540 while ((ret = check_error(read(REMOTE.fd, buf, READLEN), IO_READ, REMOTE.fd)))
541 {
542 blen = ret;
543 #ifdef HAVE_LIBZ
544 if(in_state.zip)
545 {
546 /* decompress data */
547 in_state.zip_state.z_stream.next_in = buf;
548 in_state.zip_state.z_stream.avail_in = ret;
549 in_state.zip_state.z_stream.next_out = in_state.buf;
550 in_state.zip_state.z_stream.avail_out = BUFLEN;
551
552 while (in_state.zip_state.z_stream.avail_in)
553 {
554 if((ret2 = inflate(&in_state.zip_state.z_stream,
555 Z_NO_FLUSH)) != Z_OK)
556 {
557 if(!strncmp("ERROR ", (char *)buf, 6))
558 send_error("Received uncompressed ERROR");
559 send_error("Inflate failed: %s", zError(ret2));
560 }
561 blen = BUFLEN - in_state.zip_state.z_stream.avail_out;
562
563 if(in_state.zip_state.z_stream.avail_in)
564 {
565 if(blen)
566 {
567 send_data_blocking(LOCAL.fd, in_state.buf, blen);
568 blen = 0;
569 }
570
571 in_state.zip_state.z_stream.next_out = in_state.buf;
572 in_state.zip_state.z_stream.avail_out = BUFLEN;
573 }
574 }
575
576 if(!blen)
577 return; /* that didn't generate any decompressed input.. */
578 }
579 #endif
580
581 ret = check_error(write(LOCAL.fd, in_state.buf, blen), IO_WRITE, LOCAL.fd);
582
583 if(ret < blen)
584 {
585 in_state.ofs = ret;
586 in_state.len = blen - ret;
587 /* write incomplete, register write cb */
588 LOCAL.write_cb = write_data;
589 /* deregister read_cb */
590 REMOTE.read_cb = NULL;
591 return;
592 }
593 #if defined(HAVE_LIBZ)
594 if(in_state.zip)
595 buf = tmp_buf;
596 #endif
597 }
598 }
599
600 void
601 write_data(void)
602 {
603 int ret;
604
605 assert(in_state.len);
606
607 if(!(ret = check_error(write(LOCAL.fd,
608 (in_state.buf + in_state.ofs),
609 in_state.len), IO_WRITE, LOCAL.fd)))
610 return;
611
612 in_state.len -= ret;
613
614 if(!in_state.len)
615 {
616 /* write completed, de-register write cb */
617 LOCAL.write_cb = NULL;
618 /* reregister read_cb */
619 REMOTE.read_cb = read_net;
620 in_state.ofs = 0;
621 }
622 else
623 in_state.ofs += ret;
624 }
625
/*
 * check_error
 *  - interpret the return value of a read/write/select call.
 *
 *    ret - the value the call returned
 *    io  - which kind of operation it was (given to IO_TYPE for the message)
 *    fd  - the descriptor involved (given to FD_NAME for the message)
 *
 *    Returns ret when it is positive, or 0 when the call failed with
 *    one of the errno values treated as non-fatal below.  On EOF
 *    (ret == 0) or any other errno the problem is reported through
 *    send_error() and the process exits.
 */
int
check_error(int ret, int io, int fd)
{
	int transient = 0;

	if(ret > 0)		/* no error */
		return ret;

	if(ret == 0)		/* EOF */
	{
		send_error("%s failed on %s: EOF", IO_TYPE(io), FD_NAME(fd));
		exit(1);	/* NOTREACHED */
	}

	/* ret == -1.. */
	switch (errno)
	{
	case EINPROGRESS:
	case EWOULDBLOCK:
#if EAGAIN != EWOULDBLOCK
	case EAGAIN:
#endif
	case EALREADY:
	case EINTR:
#ifdef ERESTART
	case ERESTART:
#endif
		transient = 1;
		break;
	default:
		break;
	}

	if(transient)
		return 0;	/* non-fatal error, 0 bytes read */

	/* fatal error */
	send_error("%s failed on %s: %s", IO_TYPE(io), FD_NAME(fd), strerror(errno));
	exit(1);		/* NOTREACHED */
}