]> jfr.im git - irc/quakenet/newserv.git/blob - pqsql/pqsql.c
BUILD: add require-all build mode
[irc/quakenet/newserv.git] / pqsql / pqsql.c
1 /*
2 * PQSQL module
3 *
4 * 99% of the handling is stolen from Q9.
5 */
6
7 #include "../core/config.h"
8 #include "../core/error.h"
9 #include "../irc/irc_config.h"
10 #include "../core/events.h"
11 #include "../core/hooks.h"
12 #include "../core/nsmalloc.h"
13 #include "../lib/irc_string.h"
14 #include "../lib/version.h"
15 #include "../lib/strlfunc.h"
16 #include "pqsql.h"
17
18 #define BUILDING_DBAPI
19 #include "../dbapi/dbapi.h"
20
21 #include <stdlib.h>
22 #include <sys/poll.h>
23 #include <stdarg.h>
24 #include <string.h>
25
26 MODULE_VERSION("");
27
28 /* It's possible that we might want to do a very long query, longer than the
29 * IRC-oriented SSTRING_MAX value. One option would be to increase
30 * SSTRING_MAX, but the whole purpose of sstring's is to efficiently deal
31 * with situations where the malloc() padding overhead is large compared to
32 * string length and strings are frequently recycled. Since neither of
33 * these are necessarily true for longer strings it makes more sense to use
34 * malloc() for them.
35 *
36 * So, query always points at the query string. If it fitted in a sstring,
37 * query_ss will point at the sstring for freeing purposes. If query_ss is
38 * NULL then it was malloc'd so should be free()'d directly.
39 */
40 typedef struct pqasyncquery_s {
41 sstring *query_ss;
42 char *query;
43 void *tag;
44 PQQueryHandler handler;
45 int flags;
46 PQModuleIdentifier identifier;
47 struct pqasyncquery_s *next;
48 } pqasyncquery_s;
49
50 typedef struct pqtableloaderinfo_s
51 {
52 sstring *tablename;
53 PQQueryHandler init, data, fini;
54 void *tag;
55 } pqtableloaderinfo_s;
56
57 pqasyncquery_s *queryhead = NULL, *querytail = NULL;
58
59 static int dbconnected = 0;
60 static PQModuleIdentifier moduleid = 0;
61 static PGconn *dbconn;
62
63 void dbhandler(int fd, short revents);
64 void pqstartloadtable(PGconn *dbconn, void *arg);
65 void dbstatus(int hooknum, void *arg);
66 void disconnectdb(void);
67 void connectdb(void);
68 char* pqlasterror(PGconn * pgconn);
69
/* Module load hook: try to bring up the database connection. */
void _init(void)
{
  connectdb();
}
73
74 void _fini(void) {
75 disconnectdb();
76
77 nscheckfreeall(POOL_PQSQL);
78 }
79
80 PQModuleIdentifier pqgetid(void) {
81 moduleid++;
82 if(moduleid < 10)
83 moduleid = 10;
84
85 return moduleid;
86 }
87
88 void pqfreeid(PQModuleIdentifier identifier) {
89 pqasyncquery_s *q, *p;
90
91 if(identifier == 0 || !queryhead)
92 return;
93
94 if(queryhead->identifier == identifier) {
95 (queryhead->handler)(NULL, queryhead->tag);
96 queryhead->identifier = QH_ALREADYFIRED;
97 }
98
99 for(p=queryhead,q=queryhead->next;q;) {
100 if(q->identifier == identifier) {
101 (q->handler)(NULL, q->tag);
102 p->next = q->next;
103
104 if (q->query_ss) {
105 freesstring(q->query_ss);
106 } else {
107 nsfree(POOL_PQSQL, q->query);
108 }
109 nsfree(POOL_PQSQL, q);
110 q = p->next;
111 } else {
112 p = q;
113 q = q->next;
114 }
115 }
116
117 querytail = p;
118 }
119
120 void connectdb(void) {
121 sstring *dbhost, *dbusername, *dbpassword, *dbdatabase, *dbport;
122 char connectstr[1024];
123
124 if(pqconnected())
125 return;
126
127 /* stolen from chanserv as I'm lazy */
128 dbhost = getcopyconfigitem("pqsql", "host", "UNIX", HOSTLEN);
129 dbusername = getcopyconfigitem("pqsql", "username", "newserv", 20);
130 dbpassword = getcopyconfigitem("pqsql", "password", "moo", 20);
131 dbdatabase = getcopyconfigitem("pqsql", "database", "newserv", 20);
132 dbport = getcopyconfigitem("pqsql", "port", "431", 8);
133
134 if(!dbhost || !dbusername || !dbpassword || !dbdatabase || !dbport) {
135 /* freesstring allows NULL */
136 freesstring(dbhost);
137 freesstring(dbusername);
138 freesstring(dbpassword);
139 freesstring(dbdatabase);
140 freesstring(dbport);
141 return;
142 }
143
144 if (!strcmp(dbhost->content,"UNIX")) {
145 snprintf(connectstr, sizeof(connectstr), "dbname=%s user=%s password=%s", dbdatabase->content, dbusername->content, dbpassword->content);
146 } else {
147 snprintf(connectstr, sizeof(connectstr), "host=%s port=%s dbname=%s user=%s password=%s", dbhost->content, dbport->content, dbdatabase->content, dbusername->content, dbpassword->content);
148 }
149
150 freesstring(dbhost);
151 freesstring(dbusername);
152 freesstring(dbpassword);
153 freesstring(dbdatabase);
154 freesstring(dbport);
155
156 Error("pqsql", ERR_INFO, "Attempting database connection: %s", connectstr);
157
158 /* Blocking connect for now.. */
159 dbconn = PQconnectdb(connectstr);
160
161 if (!dbconn || (PQstatus(dbconn) != CONNECTION_OK)) {
162 Error("pqsql", ERR_ERROR, "Unable to connect to db: %s", pqlasterror(dbconn));
163 return;
164 }
165 Error("pqsql", ERR_INFO, "Connected!");
166
167 dbconnected = 1;
168
169 PQsetnonblocking(dbconn, 1);
170
171 /* this kicks ass, thanks splidge! */
172 registerhandler(PQsocket(dbconn), POLLIN, dbhandler);
173 registerhook(HOOK_CORE_STATSREQUEST, dbstatus);
174 }
175
176 void dbhandler(int fd, short revents) {
177 PGresult *res;
178 pqasyncquery_s *qqp;
179
180 if(revents & POLLIN) {
181 PQconsumeInput(dbconn);
182
183 if(!PQisBusy(dbconn)) { /* query is complete */
184 if(queryhead->handler && queryhead->identifier != QH_ALREADYFIRED)
185 (queryhead->handler)(dbconn, queryhead->tag);
186
187 while((res = PQgetResult(dbconn))) {
188 if(queryhead->identifier != QH_ALREADYFIRED) {
189 switch(PQresultStatus(res)) {
190 case PGRES_TUPLES_OK:
191 if(!(queryhead->flags & DB_CALL))
192 Error("pqsql", ERR_WARNING, "Unhandled tuples output (query: %s)", queryhead->query);
193 break;
194
195 case PGRES_NONFATAL_ERROR:
196 case PGRES_FATAL_ERROR:
197 /* if a create query returns an error assume it went ok, paul will winge about this */
198 if(!(queryhead->flags & DB_CREATE))
199 Error("pqsql", ERR_WARNING, "Unhandled error response (query: %s): %s", queryhead->query, PQresultErrorMessage(res));
200 break;
201
202 default:
203 break;
204 }
205 }
206
207 PQclear(res);
208 }
209
210 /* Free the query and advance */
211 qqp = queryhead;
212 if(queryhead == querytail)
213 querytail = NULL;
214
215 queryhead = queryhead->next;
216
217 if (qqp->query_ss) {
218 freesstring(qqp->query_ss);
219 qqp->query_ss=NULL;
220 qqp->query=NULL;
221 } else if (qqp->query) {
222 nsfree(POOL_PQSQL, qqp->query);
223 qqp->query=NULL;
224 }
225 nsfree(POOL_PQSQL, qqp);
226
227 if(queryhead) { /* Submit the next query */
228 PQsendQuery(dbconn, queryhead->query);
229 PQflush(dbconn);
230 }
231 }
232 }
233 }
234
235 /* sorry Q9 */
236 void pqasyncqueryf(int identifier, PQQueryHandler handler, void *tag, int flags, char *format, ...) {
237 char querybuf[8192];
238 int len;
239 pqasyncquery_s *qp;
240 va_list va;
241
242 if(!pqconnected())
243 return;
244
245 va_start(va, format);
246 len = vsnprintf(querybuf, sizeof(querybuf), format, va);
247 va_end(va);
248
249 /* PPA: no check here... */
250 qp = (pqasyncquery_s *)nsmalloc(POOL_PQSQL, sizeof(pqasyncquery_s));
251
252 if(!qp)
253 Error("pqsql",ERR_STOP,"malloc() failed in pqsql.c");
254
255 /* Use sstring or allocate (see above rant) */
256 if (len > SSTRING_MAX) {
257 qp->query = (char *)nsmalloc(POOL_PQSQL, len+1);
258 strcpy(qp->query,querybuf);
259 qp->query_ss=NULL;
260 } else {
261 qp->query_ss = getsstring(querybuf, len);
262 qp->query = qp->query_ss->content;
263 }
264 qp->tag = tag;
265 qp->handler = handler;
266 qp->next = NULL; /* shove them at the end */
267 qp->flags = flags;
268 qp->identifier = identifier;
269
270 if(querytail) {
271 querytail->next = qp;
272 querytail = qp;
273 } else {
274 querytail = queryhead = qp;
275 PQsendQuery(dbconn, qp->query);
276 PQflush(dbconn);
277 }
278 }
279
280 void pqloadtable(char *tablename, PQQueryHandler init, PQQueryHandler data, PQQueryHandler fini, void *tag)
281 {
282 pqtableloaderinfo_s *tli;
283
284 tli=(pqtableloaderinfo_s *)nsmalloc(POOL_PQSQL, sizeof(pqtableloaderinfo_s));
285 tli->tablename=getsstring(tablename, 100);
286 tli->init=init;
287 tli->data=data;
288 tli->fini=fini;
289 tli->tag=tag;
290 pqasyncquery(pqstartloadtable, tli, "SELECT COUNT(*) FROM %s", tli->tablename->content);
291 }
292
293 void pqstartloadtable(PGconn *dbconn, void *arg)
294 {
295 PGresult *res;
296 unsigned long i, count, tablecrc;
297 pqtableloaderinfo_s *tli = arg;
298
299 res = PQgetResult(dbconn);
300
301 if (PQresultStatus(res) != PGRES_TUPLES_OK && PQresultStatus(res) != PGRES_COMMAND_OK) {
302 Error("pqsql", ERR_ERROR, "Error getting row count for %s.", tli->tablename->content);
303 return;
304 }
305
306 if (PQnfields(res) != 1) {
307 Error("pqsql", ERR_ERROR, "Count query format error for %s.", tli->tablename->content);
308 return;
309 }
310
311 tablecrc=irc_crc32(tli->tablename->content);
312 count=strtoul(PQgetvalue(res, 0, 0), NULL, 10);
313 PQclear(res);
314
315 Error("pqsql", ERR_INFO, "Found %lu entries in table %s, scheduling load.", count, tli->tablename->content);
316
317 pqasyncquery(tli->init, tli->tag, "BEGIN");
318 pqasyncquery(NULL, NULL, "DECLARE table%lx%lx CURSOR FOR SELECT * FROM %s", tablecrc, count, tli->tablename->content);
319
320 for (i=0;(count - i) > 1000; i+=1000)
321 pqasyncquery(tli->data, tli->tag, "FETCH 1000 FROM table%lx%lx", tablecrc, count);
322
323 pqasyncquery(tli->data, tli->tag, "FETCH ALL FROM table%lx%lx", tablecrc, count);
324
325 pqasyncquery(NULL, NULL, "CLOSE table%lx%lx", tablecrc, count);
326 pqasyncquery(tli->fini, tli->tag, "COMMIT");
327
328 freesstring(tli->tablename);
329 nsfree(POOL_PQSQL, tli);
330 }
331
332 void disconnectdb(void) {
333 pqasyncquery_s *qqp = queryhead, *nqqp;
334
335 if(!pqconnected())
336 return;
337
338 /* do this first else we may get conflicts */
339 deregisterhandler(PQsocket(dbconn), 0);
340
341 /* Throw all the queued queries away, beware of data malloc()ed inside the query item.. */
342 while(qqp) {
343 nqqp = qqp->next;
344 if (qqp->query_ss) {
345 freesstring(qqp->query_ss);
346 qqp->query_ss=NULL;
347 qqp->query=NULL;
348 } else if (qqp->query) {
349 nsfree(POOL_PQSQL, qqp->query);
350 qqp->query=NULL;
351 }
352 nsfree(POOL_PQSQL, qqp);
353 qqp = nqqp;
354 }
355
356 deregisterhook(HOOK_CORE_STATSREQUEST, dbstatus);
357 PQfinish(dbconn);
358 dbconn = NULL; /* hmm? */
359
360 dbconnected = 0;
361 }
362
363 /* more stolen code from Q9 */
364 void dbstatus(int hooknum, void *arg) {
365 if ((long)arg > 10) {
366 int i = 0;
367 pqasyncquery_s *qqp;
368 char message[100];
369
370 if(queryhead)
371 for(qqp=queryhead;qqp;qqp=qqp->next)
372 i++;
373
374 snprintf(message, sizeof(message), "PQSQL : %6d queries queued.",i);
375
376 triggerhook(HOOK_CORE_STATSREPLY, message);
377 }
378 }
379
380 int pqconnected(void) {
381 return dbconnected;
382 }
383
384 char* pqlasterror(PGconn * pgconn) {
385 static char errormsg[PQ_ERRORMSG_LENGTH + 1];
386 int i;
387 if(!pgconn)
388 return "PGCONN NULL";
389 strlcpy(errormsg, PQerrorMessage(pgconn), PQ_ERRORMSG_LENGTH);
390 for(i=0;i<errormsg[i];i++) {
391 if((errormsg[i] == '\r') || (errormsg[i] == '\n'))
392 errormsg[i] = ' ';
393
394 }
395 return errormsg;
396 }
397
398 PQResult *pqgetresult(PGconn *c) {
399 PQResult *r;
400 if(!c)
401 return NULL;
402
403 r = (PQResult *)nsmalloc(POOL_PQSQL, sizeof(PQResult));
404 r->row = -1;
405 r->result = PQgetResult(c);
406 r->rows = PQntuples(r->result);
407
408 return r;
409 }
410
411 int pqfetchrow(PQResult *res) {
412 if(res->row + 1 == res->rows)
413 return 0;
414
415 res->row++;
416
417 return 1;
418 }
419
420 char *pqgetvalue(PQResult *res, int column) {
421 return PQgetvalue(res->result, res->row, column);
422 }
423
424 void pqclear(PQResult *res) {
425 if(!res)
426 return;
427
428 if(res->result)
429 PQclear(res->result);
430
431 nsfree(POOL_PQSQL, res);
432 }