/*
 * IRC - Internet Relay Chat, ircd/msgq.c
 * Copyright (C) 2000 Kevin L. Mitchell <klmitch@mit.edu>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 1, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
/** @file
 * @brief Outbound message queue implementation.
 * @version $Id: msgq.c,v 1.12.2.1 2006/03/14 03:56:58 entrope Exp $
 */
27 #include "ircd_alloc.h"
28 #include "ircd_defs.h"
29 #include "ircd_features.h"
31 #include "ircd_reply.h"
32 #include "ircd_snprintf.h"
38 /* #include <assert.h> -- Now using assert in ircd_log.h */
41 #include <sys/types.h>
42 #include <sys/uio.h> /* struct iovec */
#define MB_BASE_SHIFT 5 /**< Log2 of smallest message body to allocate. */
#define MB_MAX_SHIFT  9 /**< Log2 of largest message body to allocate. */
/** Buffer for a single message.
 *
 * The text is stored inline after the header: buffers are allocated as
 * sizeof(struct MsgBuf) plus the bucket size, so \a msg must remain a
 * one-element array (the extra byte leaves room for the terminating NUL
 * written just past a completely full buffer).
 */
struct MsgBuf {
  struct MsgBuf *next;    /**< next msg in global queue */
  struct MsgBuf **prev_p; /**< what points to us in linked list */
  struct MsgBuf *real;    /**< the actual MsgBuf we're attaching */
  unsigned int ref;       /**< reference count */
  unsigned int length;    /**< length of message */
  unsigned int power;     /**< size of buffer (power of 2) */
  char msg[1];            /**< the message */
};
/** Return allocated length of the buffer of \a buf (2 to the power buf->power). */
#define bufsize(buf) (1 << (buf)->power)
/** Message body for a particular destination.
 *
 * One of these is linked into a queue list for every queued message; it
 * records how much of the shared MsgBuf has already been sent.
 */
struct Msg {
  struct Msg *next;   /**< next msg */
  unsigned int sent;  /**< bytes in msg that have already been sent */
  struct MsgBuf *msg; /**< actual message in queue */
};
68 /** Statistics tracking for message sizes. */
70 unsigned int msgs
; /**< total number of messages */
71 unsigned int sizes
[BUFSIZE
]; /**< histogram of message sizes */
74 /** Global tracking data for message buffers. */
76 struct MsgBuf
*msglist
; /**< list of in-use MsgBuf's */
78 unsigned int alloc
; /**< number of Msg's allocated */
79 unsigned int used
; /**< number of Msg's in use */
80 struct Msg
*free
; /**< freelist of Msg's */
81 } msgs
; /**< tracking info for Msg structs */
82 size_t tot_bufsize
; /**< total amount of memory in buffers */
83 /** Array of MsgBuf information, one entry for each used bucket size. */
85 unsigned int alloc
; /**< total MsgBuf's of this size */
86 unsigned int used
; /**< number of MsgBuf's of this size in use */
87 struct MsgBuf
*free
; /**< list of free MsgBuf's */
88 } msgBufs
[MB_MAX_SHIFT
- MB_BASE_SHIFT
+ 1];
89 struct MsgSizes sizes
; /**< histogram of message sizes */
93 * This routine is used to remove a certain amount of data from a given
94 * queue and release the Msg (and MsgBuf) structure if needed
96 /** Remove some data from a list within a message queue.
97 * @param[in,out] mq Message queue to remove from.
98 * @param[in,out] qlist Particular list within queue to remove from.
99 * @param[in,out] length_p Number of bytes left to remove.
102 msgq_delmsg(struct MsgQ
*mq
, struct MsgQList
*qlist
, unsigned int *length_p
)
109 assert(0 != qlist
->head
);
110 assert(0 != length_p
);
112 m
= qlist
->head
; /* find the msg we're deleting from */
114 msglen
= m
->msg
->length
- m
->sent
; /* calculate how much is left */
116 if (*length_p
>= msglen
) { /* deleted it all? */
117 mq
->length
-= msglen
; /* decrement length */
118 mq
->count
--; /* decrement the message count */
121 msgq_clean(m
->msg
); /* free up the struct MsgBuf */
122 m
->msg
= 0; /* don't let it point anywhere nasty, please */
124 if (qlist
->head
== qlist
->tail
) /* figure out if we emptied the queue */
125 qlist
->head
= qlist
->tail
= 0;
127 qlist
->head
= m
->next
; /* just shift the list down some */
129 MQData
.msgs
.used
--; /* struct Msg is not in use anymore */
131 m
->next
= MQData
.msgs
.free
; /* throw it onto the free list */
132 MQData
.msgs
.free
= m
;
134 mq
->length
-= *length_p
; /* decrement queue length */
135 m
->sent
+= *length_p
; /* this much of the message has been sent */
136 *length_p
= 0; /* we've dealt with it all */
140 /** Initialize \a mq.
141 * @param[in] mq MsgQ to initialize.
144 msgq_init(struct MsgQ
*mq
)
156 /** Delete bytes from the front of a message queue.
157 * @param[in] mq Queue to drop data from.
158 * @param[in] length Number of bytes to drop.
161 msgq_delete(struct MsgQ
*mq
, unsigned int length
)
166 if (mq
->queue
.head
&& mq
->queue
.head
->sent
> 0) /* partial msg on norm q */
167 msgq_delmsg(mq
, &mq
->queue
, &length
);
168 else if (mq
->prio
.head
) /* message (partial or complete) on prio queue */
169 msgq_delmsg(mq
, &mq
->prio
, &length
);
170 else if (mq
->queue
.head
) /* message on normal queue */
171 msgq_delmsg(mq
, &mq
->queue
, &length
);
177 /** Map data from a message queue to an I/O vector.
178 * @param[in] mq Message queue to send from.
179 * @param[out] iov Output vector.
180 * @param[in] count Number of elements in \a iov.
181 * @param[out] len Number of bytes mapped from \a mq to \a iov.
182 * @return Number of elements filled in \a iov.
185 msgq_mapiov(const struct MsgQ
*mq
, struct iovec
*iov
, int count
,
197 if (mq
->length
<= 0) /* no data to map */
200 if (mq
->queue
.head
&& mq
->queue
.head
->sent
> 0) { /* partial msg on norm q */
201 iov
[i
].iov_base
= mq
->queue
.head
->msg
->msg
+ mq
->queue
.head
->sent
;
202 iov
[i
].iov_len
= mq
->queue
.head
->msg
->length
- mq
->queue
.head
->sent
;
203 *len
+= iov
[i
].iov_len
;
205 queue
= mq
->queue
.head
->next
; /* where we start later... */
207 i
++; /* filled an iovec... */
208 if (!--count
) /* check for space */
211 queue
= mq
->queue
.head
; /* start at head of queue */
213 if (mq
->prio
.head
&& mq
->prio
.head
->sent
> 0) { /* partial msg on prio q */
214 iov
[i
].iov_base
= mq
->prio
.head
->msg
->msg
+ mq
->prio
.head
->sent
;
215 iov
[i
].iov_len
= mq
->prio
.head
->msg
->length
- mq
->prio
.head
->sent
;
216 *len
+= iov
[i
].iov_len
;
218 prio
= mq
->prio
.head
->next
; /* where we start later... */
220 i
++; /* filled an iovec... */
221 if (!--count
) /* check for space */
224 prio
= mq
->prio
.head
; /* start at head of prio */
226 for (; prio
; prio
= prio
->next
) { /* go through prio queue */
227 iov
[i
].iov_base
= prio
->msg
->msg
; /* store message */
228 iov
[i
].iov_len
= prio
->msg
->length
;
229 *len
+= iov
[i
].iov_len
;
231 i
++; /* filled an iovec... */
232 if (!--count
) /* check for space */
236 for (; queue
; queue
= queue
->next
) { /* go through normal queue */
237 iov
[i
].iov_base
= queue
->msg
->msg
;
238 iov
[i
].iov_len
= queue
->msg
->length
;
239 *len
+= iov
[i
].iov_len
;
241 i
++; /* filled an iovec... */
242 if (!--count
) /* check for space */
249 /** Allocate a message buffer large enough to hold \a length bytes.
250 * TODO: \a in_mb needs better documentation.
251 * @param[in] in_mb Some other message buffer(?).
252 * @param[in] length Number of bytes of space to reserve in output.
253 * @return Pointer to some usable message buffer.
255 static struct MsgBuf
*
256 msgq_alloc(struct MsgBuf
*in_mb
, int length
)
261 /* Find the power of two size that will accommodate the message */
262 for (power
= MB_BASE_SHIFT
; power
< MB_MAX_SHIFT
+ 1; power
++)
263 if ((length
- 1) >> power
== 0)
265 assert((1 << power
) >= length
);
266 assert((1 << power
) <= 512);
267 length
= 1 << power
; /* reset the length */
269 /* If the message needs a buffer of exactly the existing size, just use it */
270 if (in_mb
&& in_mb
->power
== power
) {
271 in_mb
->real
= in_mb
; /* real buffer is this buffer */
275 /* Try popping one off the freelist first */
276 if ((mb
= MQData
.msgBufs
[power
- MB_BASE_SHIFT
].free
)) {
277 MQData
.msgBufs
[power
- MB_BASE_SHIFT
].free
= mb
->next
;
278 } else if (MQData
.tot_bufsize
< feature_int(FEAT_BUFFERPOOL
)) {
279 /* Allocate another if we won't bust the BUFFERPOOL */
280 Debug((DEBUG_MALLOC
, "Allocating MsgBuf of length %d (total size %zu)",
281 length
, sizeof(struct MsgBuf
) + length
));
282 mb
= (struct MsgBuf
*)MyMalloc(sizeof(struct MsgBuf
) + length
);
283 MQData
.msgBufs
[power
- MB_BASE_SHIFT
].alloc
++;
284 mb
->power
= power
; /* remember size */
285 MQData
.tot_bufsize
+= length
;
289 MQData
.msgBufs
[power
- MB_BASE_SHIFT
].used
++; /* how many are we using? */
291 mb
->real
= 0; /* essential initializations */
294 if (in_mb
) /* remember who's the *real* buffer */
296 } else if (in_mb
) /* just use the input buffer */
297 mb
= in_mb
->real
= in_mb
;
299 return mb
; /* return the buffer */
302 /** Deallocate unused message buffers.
305 msgq_clear_freembs(void)
310 /* Walk through the various size classes */
311 for (i
= MB_BASE_SHIFT
; i
< MB_MAX_SHIFT
+ 1; i
++)
312 /* walk down the free list */
313 while ((mb
= MQData
.msgBufs
[i
- MB_BASE_SHIFT
].free
)) {
314 MQData
.msgBufs
[i
- MB_BASE_SHIFT
].free
= mb
->next
; /* shift free list */
315 MQData
.msgBufs
[i
- MB_BASE_SHIFT
].alloc
--; /* reduce allocation count */
316 MQData
.tot_bufsize
-= 1 << i
; /* reduce total buffer allocation count */
317 MyFree(mb
); /* and free the buffer */
321 /** Format a message buffer for a client from a format string.
322 * @param[in] dest %Client that receives the data (may be NULL).
323 * @param[in] format Format string for message.
324 * @param[in] vl Argument list for \a format.
325 * @return Allocated MsgBuf.
328 msgq_vmake(struct Client
*dest
, const char *format
, va_list vl
)
334 if (!(mb
= msgq_alloc(0, BUFSIZE
))) {
335 if (feature_bool(FEAT_HAS_FERGUSON_FLUSHER
)) {
337 * from "Married With Children" episode were Al bought a REAL toilet
338 * on the black market because he was tired of the wimpy water
339 * conserving toilets they make these days --Bleep
342 * Apparently this doesn't work, the server _has_ to
343 * dump a few clients to handle the load. A fully loaded
344 * server cannot handle a net break without dumping some
345 * clients. If we flush the connections here under a full
346 * load we may end up starving the kernel for mbufs and
350 * attempt to recover from buffer starvation before
351 * bailing this may help servers running out of memory
353 flush_connections(0);
354 mb
= msgq_alloc(0, BUFSIZE
);
356 if (!mb
) { /* OK, try clearing the buffer free list */
357 msgq_clear_freembs();
358 mb
= msgq_alloc(0, BUFSIZE
);
360 if (!mb
) { /* OK, try killing a client */
361 kill_highest_sendq(0); /* Don't kill any server connections */
362 msgq_clear_freembs(); /* Release whatever was just freelisted */
363 mb
= msgq_alloc(0, BUFSIZE
);
365 if (!mb
) { /* hmmm... */
366 kill_highest_sendq(1); /* Try killing a server connection now */
367 msgq_clear_freembs(); /* Clear freelist again */
368 mb
= msgq_alloc(0, BUFSIZE
);
370 if (!mb
) /* AIEEEE! */
371 server_panic("Unable to allocate buffers!");
374 mb
->next
= MQData
.msglist
; /* initialize the msgbuf */
375 mb
->prev_p
= &MQData
.msglist
;
377 /* fill the buffer */
378 mb
->length
= ircd_vsnprintf(dest
, mb
->msg
, bufsize(mb
) - 1, format
, vl
);
380 if (mb
->length
> bufsize(mb
) - 2)
381 mb
->length
= bufsize(mb
) - 2;
383 mb
->msg
[mb
->length
++] = '\r'; /* add \r\n to buffer */
384 mb
->msg
[mb
->length
++] = '\n';
385 mb
->msg
[mb
->length
] = '\0'; /* not strictly necessary */
387 assert(mb
->length
<= bufsize(mb
));
389 if (MQData
.msglist
) /* link it into the list */
390 MQData
.msglist
->prev_p
= &mb
->next
;
/** Format a message buffer for a client from a format string.
 *
 * Variadic front end: collects the arguments into a va_list and hands
 * them to msgq_vmake().
 *
 * @param[in] dest %Client that receives the data (may be NULL).
 * @param[in] format Format string for message.
 * @return Allocated MsgBuf.
 */
struct MsgBuf *
msgq_make(struct Client *dest, const char *format, ...)
{
  va_list vl;
  struct MsgBuf *mb;

  va_start(vl, format);
  mb = msgq_vmake(dest, format, vl);
  va_end(vl);

  return mb;
}
414 /** Append text to an existing message buffer.
415 * @param[in] dest %Client for whom to format the message.
416 * @param[in] mb Message buffer to append to.
417 * @param[in] format Format string of what to append.
420 msgq_append(struct Client
*dest
, struct MsgBuf
*mb
, const char *format
, ...)
426 assert(0 == mb
->real
);
428 assert(2 < mb
->length
);
429 assert(bufsize(mb
) >= mb
->length
);
431 mb
->length
-= 2; /* back up to before \r\n */
433 va_start(vl
, format
); /* append to the buffer */
434 mb
->length
+= ircd_vsnprintf(dest
, mb
->msg
+ mb
->length
,
435 bufsize(mb
) - mb
->length
- 1, format
, vl
);
438 if (mb
->length
> bufsize(mb
) - 2)
439 mb
->length
= bufsize(mb
) - 2;
441 mb
->msg
[mb
->length
++] = '\r'; /* add \r\n to buffer */
442 mb
->msg
[mb
->length
++] = '\n';
443 mb
->msg
[mb
->length
] = '\0'; /* not strictly necessary */
445 assert(mb
->length
<= bufsize(mb
));
448 /** Decrement the reference count on \a mb, freeing it if needed.
449 * @param[in] mb MsgBuf to release.
452 msgq_clean(struct MsgBuf
*mb
)
457 if (!--mb
->ref
) { /* deallocate the message */
459 *mb
->prev_p
= mb
->next
; /* clip it out of active MsgBuf's list */
461 mb
->next
->prev_p
= mb
->prev_p
;
464 if (mb
->real
&& mb
->real
!= mb
) /* clean up the real buffer */
465 msgq_clean(mb
->real
);
467 mb
->next
= MQData
.msgBufs
[mb
->power
- MB_BASE_SHIFT
].free
;
468 MQData
.msgBufs
[mb
->power
- MB_BASE_SHIFT
].free
= mb
;
469 MQData
.msgBufs
[mb
->power
- MB_BASE_SHIFT
].used
--;
475 /** Append a message to a peer's message queue.
476 * @param[in] mq Message queue to append to.
477 * @param[in] mb Message to append.
478 * @param[in] prio If non-zero, use the high-priority (lag-busting) message list; else use the normal list.
481 msgq_add(struct MsgQ
*mq
, struct MsgBuf
*mb
, int prio
)
483 struct MsgQList
*qlist
;
489 assert(0 < mb
->length
);
491 Debug((DEBUG_SEND
, "Adding buffer %p [%.*s] length %u to %s queue", mb
,
492 mb
->length
- 2, mb
->msg
, mb
->length
, prio
? "priority" : "normal"));
494 qlist
= prio
? &mq
->prio
: &mq
->queue
;
496 if (!(msg
= MQData
.msgs
.free
)) { /* do I need to allocate one? */
497 msg
= (struct Msg
*)MyMalloc(sizeof(struct Msg
));
498 MQData
.msgs
.alloc
++; /* we allocated another */
499 } else /* shift the free list */
500 MQData
.msgs
.free
= MQData
.msgs
.free
->next
;
502 MQData
.msgs
.used
++; /* we're using another */
504 msg
->next
= 0; /* initialize the msg */
507 /* Get the real buffer, allocating one if necessary */
511 MQData
.sizes
.msgs
++; /* update histogram counts */
512 MQData
.sizes
.sizes
[mb
->length
- 1]++;
514 tmp
= msgq_alloc(mb
, mb
->length
); /* allocate a close-fitting buffer */
516 if (tmp
!= mb
) { /* OK, prepare the new "real" buffer */
517 Debug((DEBUG_SEND
, "Copying old buffer %p [%.*s] length %u into new "
518 "buffer %p size %u", mb
, mb
->length
- 2, mb
->msg
, mb
->length
,
520 memcpy(tmp
->msg
, mb
->msg
, mb
->length
+ 1); /* copy string over */
521 tmp
->length
= mb
->length
;
523 tmp
->next
= mb
->next
; /* replace it in the list, now */
525 tmp
->next
->prev_p
= &tmp
->next
;
526 tmp
->prev_p
= mb
->prev_p
;
529 mb
->next
= 0; /* this one's no longer in the list */
534 mb
= mb
->real
; /* work with the real buffer */
535 mb
->ref
++; /* increment the ref count on the buffer */
537 msg
->msg
= mb
; /* point at the real message buffer now */
539 if (!qlist
->head
) /* queue list was empty; head and tail point to msg */
540 qlist
->head
= qlist
->tail
= msg
;
542 assert(0 != qlist
->tail
);
544 qlist
->tail
->next
= msg
; /* queue had something in it; add to end */
548 mq
->length
+= mb
->length
; /* update the queue length */
549 mq
->count
++; /* and the queue count */
552 /** Report memory statistics for message buffers.
553 * @param[in] cptr Client requesting information.
554 * @param[out] msg_alloc Receives number of bytes allocated in Msg structs.
555 * @param[out] msgbuf_alloc Receives number of bytes allocated in MsgBuf structs.
558 msgq_count_memory(struct Client
*cptr
, size_t *msg_alloc
, size_t *msgbuf_alloc
)
561 size_t total
= 0, size
;
564 assert(0 != msg_alloc
);
565 assert(0 != msgbuf_alloc
);
567 /* Data for Msg's is simple, so just send it */
568 send_reply(cptr
, SND_EXPLICIT
| RPL_STATSDEBUG
,
569 ":Msgs allocated %d(%zu) used %d(%zu) text %zu",
570 MQData
.msgs
.alloc
, MQData
.msgs
.alloc
* sizeof(struct Msg
),
571 MQData
.msgs
.used
, MQData
.msgs
.used
* sizeof(struct Msg
),
573 /* count_memory() wants to know the total */
574 *msg_alloc
= MQData
.msgs
.alloc
* sizeof(struct Msg
);
576 /* Ok, now walk through each size class */
577 for (i
= MB_BASE_SHIFT
; i
< MB_MAX_SHIFT
+ 1; i
++) {
578 size
= sizeof(struct MsgBuf
) + (1 << i
); /* total size of a buffer */
580 /* Send information for this buffer size class */
581 send_reply(cptr
, SND_EXPLICIT
| RPL_STATSDEBUG
,
582 ":MsgBufs of size %zu allocated %d(%zu) used %d(%zu)", 1 << i
,
583 MQData
.msgBufs
[i
- MB_BASE_SHIFT
].alloc
,
584 MQData
.msgBufs
[i
- MB_BASE_SHIFT
].alloc
* size
,
585 MQData
.msgBufs
[i
- MB_BASE_SHIFT
].used
,
586 MQData
.msgBufs
[i
- MB_BASE_SHIFT
].used
* size
);
588 /* count_memory() wants to know the total */
589 total
+= MQData
.msgBufs
[i
- MB_BASE_SHIFT
].alloc
* size
;
591 *msgbuf_alloc
= total
;
594 /** Report remaining space in a MsgBuf.
595 * @param[in] mb Message buffer to check.
596 * @return Number of additional bytes that can be appended to the message.
599 msgq_bufleft(struct MsgBuf
*mb
)
603 return bufsize(mb
) - mb
->length
; /* \r\n counted in mb->length */
606 /** Send histogram of message lengths to a client.
607 * @param[in] cptr Client requesting statistics.
608 * @param[in] sd Stats descriptor for request (ignored).
609 * @param[in] param Extra parameter from user (ignored).
612 msgq_histogram(struct Client
*cptr
, const struct StatDesc
*sd
, char *param
)
614 struct MsgSizes tmp
= MQData
.sizes
; /* All hail structure copy! */
617 send_reply(cptr
, SND_EXPLICIT
| RPL_STATSDEBUG
,
618 ":Histogram of message lengths (%lu messages)", tmp
.msgs
);
619 for (i
= 0; i
+ 16 <= BUFSIZE
; i
+= 16)
620 send_reply(cptr
, SND_EXPLICIT
| RPL_STATSDEBUG
, ":% 4d: %u %u %u %u "
621 "%u %u %u %u %u %u %u %u %u %u %u %u", i
+ 1,
622 tmp
.sizes
[i
+ 0], tmp
.sizes
[i
+ 1], tmp
.sizes
[i
+ 2],
623 tmp
.sizes
[i
+ 3], tmp
.sizes
[i
+ 4], tmp
.sizes
[i
+ 5],
624 tmp
.sizes
[i
+ 6], tmp
.sizes
[i
+ 7], tmp
.sizes
[i
+ 8],
625 tmp
.sizes
[i
+ 9], tmp
.sizes
[i
+ 10], tmp
.sizes
[i
+ 11],
626 tmp
.sizes
[i
+ 12], tmp
.sizes
[i
+ 13], tmp
.sizes
[i
+ 14],