]> jfr.im git - irc/quakenet/newserv.git/blob - pqsql/pqsql.c
46a547ba2a343fa899e766504a37da677a1bb822
[irc/quakenet/newserv.git] / pqsql / pqsql.c
1 /*
2 * PQSQL module
3 *
4 * 99% of the handling is stolen from Q9.
5 */
6
7 #include "../core/config.h"
8 #include "../core/error.h"
9 #include "../irc/irc_config.h"
10 #include "../core/events.h"
11 #include "../core/hooks.h"
12 #include "../core/nsmalloc.h"
13 #include "../lib/irc_string.h"
14 #include "../lib/version.h"
15 #include "../lib/strlfunc.h"
16 #include "pqsql.h"
17
18 #define BUILDING_DBAPI
19 #include "../dbapi/dbapi.h"
20
21 #include <stdlib.h>
22 #include <sys/poll.h>
23 #include <stdarg.h>
24 #include <string.h>
25
26 MODULE_VERSION("");
27
28 /* It's possible that we might want to do a very long query, longer than the
29 * IRC-oriented SSTRING_MAX value. One option would be to increase
30 * SSTRING_MAX, but the whole purpose of sstring's is to efficiently deal
31 * with situations where the malloc() padding overhead is large compared to
32 * string length and strings are frequently recycled. Since neither of
33 * these are necessarily true for longer strings it makes more sense to use
34 * malloc() for them.
35 *
36 * So, query always points at the query string. If it fitted in a sstring,
37 * query_ss will point at the sstring for freeing purposes. If query_ss is
38 * NULL then it was malloc'd so should be free()'d directly.
39 */
40 typedef struct pqasyncquery_s {
41 sstring *query_ss;
42 char *query;
43 void *tag;
44 PQQueryHandler handler;
45 int flags;
46 PQModuleIdentifier identifier;
47 struct pqasyncquery_s *next;
48 } pqasyncquery_s;
49
50 typedef struct pqtableloaderinfo_s
51 {
52 sstring *tablename;
53 PQQueryHandler init, data, fini;
54 void *tag;
55 } pqtableloaderinfo_s;
56
57 pqasyncquery_s *queryhead = NULL, *querytail = NULL;
58
59 static int dbconnected = 0;
60 static PQModuleIdentifier moduleid = 0;
61 static PGconn *dbconn;
62
63 void dbhandler(int fd, short revents);
64 void pqstartloadtable(PGconn *dbconn, void *arg);
65 void dbstatus(int hooknum, void *arg);
66 void disconnectdb(void);
67 void connectdb(void);
68 char* pqlasterror(PGconn * pgconn);
69
70 void _init(void) {
71 connectdb();
72 }
73
74 void _fini(void) {
75 disconnectdb();
76
77 nscheckfreeall(POOL_PQSQL);
78 }
79
80 PQModuleIdentifier pqgetid(void) {
81 moduleid++;
82 if(moduleid < 10)
83 moduleid = 10;
84
85 return moduleid;
86 }
87
88 void pqfreeid(PQModuleIdentifier identifier) {
89 pqasyncquery_s *q, *p;
90
91 if(identifier == 0 || !queryhead)
92 return;
93
94 if(queryhead->identifier == identifier) {
95 (queryhead->handler)(NULL, queryhead->tag);
96 queryhead->identifier = QH_ALREADYFIRED;
97 }
98
99 for(p=queryhead,q=queryhead->next;q;) {
100 if(q->identifier == identifier) {
101 (q->handler)(NULL, q->tag);
102 p->next = q->next;
103
104 if (q->query_ss) {
105 freesstring(q->query_ss);
106 } else {
107 nsfree(POOL_PQSQL, q->query);
108 }
109 nsfree(POOL_PQSQL, q);
110 q = p->next;
111 } else {
112 p = q;
113 q = q->next;
114 }
115 }
116
117 querytail = p;
118 }
119
120 void connectdb(void) {
121 sstring *dbhost, *dbusername, *dbpassword, *dbdatabase, *dbport;
122 char connectstr[1024];
123
124 if(pqconnected())
125 return;
126
127 /* stolen from chanserv as I'm lazy */
128 dbhost = getcopyconfigitem("pqsql", "host", "UNIX", HOSTLEN);
129 dbusername = getcopyconfigitem("pqsql", "username", "newserv", 20);
130 dbpassword = getcopyconfigitem("pqsql", "password", "moo", 20);
131 dbdatabase = getcopyconfigitem("pqsql", "database", "newserv", 20);
132 dbport = getcopyconfigitem("pqsql", "port", "431", 8);
133
134 if(!dbhost || !dbusername || !dbpassword || !dbdatabase || !dbport) {
135 /* freesstring allows NULL */
136 freesstring(dbhost);
137 freesstring(dbusername);
138 freesstring(dbpassword);
139 freesstring(dbdatabase);
140 freesstring(dbport);
141 return;
142 }
143
144 if (!strcmp(dbhost->content,"UNIX")) {
145 snprintf(connectstr, sizeof(connectstr), "dbname=%s user=%s password=%s", dbdatabase->content, dbusername->content, dbpassword->content);
146 } else {
147 snprintf(connectstr, sizeof(connectstr), "host=%s port=%s dbname=%s user=%s password=%s", dbhost->content, dbport->content, dbdatabase->content, dbusername->content, dbpassword->content);
148 }
149
150 freesstring(dbhost);
151 freesstring(dbusername);
152 freesstring(dbpassword);
153 freesstring(dbdatabase);
154 freesstring(dbport);
155
156 Error("pqsql", ERR_INFO, "Attempting database connection: %s", connectstr);
157
158 /* Blocking connect for now.. */
159 dbconn = PQconnectdb(connectstr);
160
161 if (!dbconn || (PQstatus(dbconn) != CONNECTION_OK)) {
162 Error("pqsql", ERR_ERROR, "Unable to connect to db: %s", pqlasterror(dbconn));
163 return;
164 }
165 Error("pqsql", ERR_INFO, "Connected!");
166
167 dbconnected = 1;
168
169 PQsetnonblocking(dbconn, 1);
170
171 /* this kicks ass, thanks splidge! */
172 registerhandler(PQsocket(dbconn), POLLIN, dbhandler);
173 registerhook(HOOK_CORE_STATSREQUEST, dbstatus);
174 }
175
176 void dbhandler(int fd, short revents) {
177 PGresult *res;
178 pqasyncquery_s *qqp;
179
180 if(revents & POLLIN) {
181 PQconsumeInput(dbconn);
182
183 if(!PQisBusy(dbconn)) { /* query is complete */
184 if(queryhead->handler && queryhead->identifier != QH_ALREADYFIRED)
185 (queryhead->handler)(dbconn, queryhead->tag);
186
187 while((res = PQgetResult(dbconn))) {
188 if(queryhead->identifier != QH_ALREADYFIRED) {
189 switch(PQresultStatus(res)) {
190 case PGRES_TUPLES_OK:
191 Error("pqsql", ERR_WARNING, "Unhandled tuples output (query: %s)", queryhead->query);
192 break;
193
194 case PGRES_NONFATAL_ERROR:
195 case PGRES_FATAL_ERROR:
196 /* if a create query returns an error assume it went ok, paul will winge about this */
197 if(!(queryhead->flags & DB_CREATE))
198 Error("pqsql", ERR_WARNING, "Unhandled error response (query: %s)", queryhead->query);
199 break;
200
201 default:
202 break;
203 }
204 }
205
206 PQclear(res);
207 }
208
209 /* Free the query and advance */
210 qqp = queryhead;
211 if(queryhead == querytail)
212 querytail = NULL;
213
214 queryhead = queryhead->next;
215
216 if (qqp->query_ss) {
217 freesstring(qqp->query_ss);
218 qqp->query_ss=NULL;
219 qqp->query=NULL;
220 } else if (qqp->query) {
221 nsfree(POOL_PQSQL, qqp->query);
222 qqp->query=NULL;
223 }
224 nsfree(POOL_PQSQL, qqp);
225
226 if(queryhead) { /* Submit the next query */
227 PQsendQuery(dbconn, queryhead->query);
228 PQflush(dbconn);
229 }
230 }
231 }
232 }
233
234 /* sorry Q9 */
235 void pqasyncqueryf(int identifier, PQQueryHandler handler, void *tag, int flags, char *format, ...) {
236 char querybuf[8192];
237 int len;
238 pqasyncquery_s *qp;
239 va_list va;
240
241 if(!pqconnected())
242 return;
243
244 va_start(va, format);
245 len = vsnprintf(querybuf, sizeof(querybuf), format, va);
246 va_end(va);
247
248 /* PPA: no check here... */
249 qp = (pqasyncquery_s *)nsmalloc(POOL_PQSQL, sizeof(pqasyncquery_s));
250
251 if(!qp)
252 Error("pqsql",ERR_STOP,"malloc() failed in pqsql.c");
253
254 /* Use sstring or allocate (see above rant) */
255 if (len > SSTRING_MAX) {
256 qp->query = (char *)nsmalloc(POOL_PQSQL, len+1);
257 strcpy(qp->query,querybuf);
258 qp->query_ss=NULL;
259 } else {
260 qp->query_ss = getsstring(querybuf, len);
261 qp->query = qp->query_ss->content;
262 }
263 qp->tag = tag;
264 qp->handler = handler;
265 qp->next = NULL; /* shove them at the end */
266 qp->flags = flags;
267 qp->identifier = identifier;
268
269 if(querytail) {
270 querytail->next = qp;
271 querytail = qp;
272 } else {
273 querytail = queryhead = qp;
274 PQsendQuery(dbconn, qp->query);
275 PQflush(dbconn);
276 }
277 }
278
279 void pqloadtable(char *tablename, PQQueryHandler init, PQQueryHandler data, PQQueryHandler fini, void *tag)
280 {
281 pqtableloaderinfo_s *tli;
282
283 tli=(pqtableloaderinfo_s *)nsmalloc(POOL_PQSQL, sizeof(pqtableloaderinfo_s));
284 tli->tablename=getsstring(tablename, 100);
285 tli->init=init;
286 tli->data=data;
287 tli->fini=fini;
288 tli->tag=tag;
289 pqasyncquery(pqstartloadtable, tli, "SELECT COUNT(*) FROM %s", tli->tablename->content);
290 }
291
292 void pqstartloadtable(PGconn *dbconn, void *arg)
293 {
294 PGresult *res;
295 unsigned long i, count, tablecrc;
296 pqtableloaderinfo_s *tli = arg;
297
298 res = PQgetResult(dbconn);
299
300 if (PQresultStatus(res) != PGRES_TUPLES_OK && PQresultStatus(res) != PGRES_COMMAND_OK) {
301 Error("pqsql", ERR_ERROR, "Error getting row count for %s.", tli->tablename->content);
302 return;
303 }
304
305 if (PQnfields(res) != 1) {
306 Error("pqsql", ERR_ERROR, "Count query format error for %s.", tli->tablename->content);
307 return;
308 }
309
310 tablecrc=crc32(tli->tablename->content);
311 count=strtoul(PQgetvalue(res, 0, 0), NULL, 10);
312 PQclear(res);
313
314 Error("pqsql", ERR_INFO, "Found %lu entries in table %s, scheduling load.", count, tli->tablename->content);
315
316 pqasyncquery(tli->init, tli->tag, "BEGIN");
317 pqasyncquery(NULL, NULL, "DECLARE table%lx%lx CURSOR FOR SELECT * FROM %s", tablecrc, count, tli->tablename->content);
318
319 for (i=0;(count - i) > 1000; i+=1000)
320 pqasyncquery(tli->data, tli->tag, "FETCH 1000 FROM table%lx%lx", tablecrc, count);
321
322 pqasyncquery(tli->data, tli->tag, "FETCH ALL FROM table%lx%lx", tablecrc, count);
323
324 pqasyncquery(NULL, NULL, "CLOSE table%lx%lx", tablecrc, count);
325 pqasyncquery(tli->fini, tli->tag, "COMMIT");
326
327 freesstring(tli->tablename);
328 nsfree(POOL_PQSQL, tli);
329 }
330
331 void disconnectdb(void) {
332 pqasyncquery_s *qqp = queryhead, *nqqp;
333
334 if(!pqconnected())
335 return;
336
337 /* do this first else we may get conflicts */
338 deregisterhandler(PQsocket(dbconn), 0);
339
340 /* Throw all the queued queries away, beware of data malloc()ed inside the query item.. */
341 while(qqp) {
342 nqqp = qqp->next;
343 if (qqp->query_ss) {
344 freesstring(qqp->query_ss);
345 qqp->query_ss=NULL;
346 qqp->query=NULL;
347 } else if (qqp->query) {
348 nsfree(POOL_PQSQL, qqp->query);
349 qqp->query=NULL;
350 }
351 nsfree(POOL_PQSQL, qqp);
352 qqp = nqqp;
353 }
354
355 deregisterhook(HOOK_CORE_STATSREQUEST, dbstatus);
356 PQfinish(dbconn);
357 dbconn = NULL; /* hmm? */
358
359 dbconnected = 0;
360 }
361
362 /* more stolen code from Q9 */
363 void dbstatus(int hooknum, void *arg) {
364 if ((long)arg > 10) {
365 int i = 0;
366 pqasyncquery_s *qqp;
367 char message[100];
368
369 if(queryhead)
370 for(qqp=queryhead;qqp;qqp=qqp->next)
371 i++;
372
373 snprintf(message, sizeof(message), "PQSQL : %6d queries queued.",i);
374
375 triggerhook(HOOK_CORE_STATSREPLY, message);
376 }
377 }
378
379 int pqconnected(void) {
380 return dbconnected;
381 }
382
383 char* pqlasterror(PGconn * pgconn) {
384 static char errormsg[PQ_ERRORMSG_LENGTH + 1];
385 int i;
386 if(!pgconn)
387 return "PGCONN NULL";
388 strlcpy(errormsg, PQerrorMessage(pgconn), PQ_ERRORMSG_LENGTH);
389 for(i=0;i<errormsg[i];i++) {
390 if((errormsg[i] == '\r') || (errormsg[i] == '\n'))
391 errormsg[i] = ' ';
392
393 }
394 return errormsg;
395 }
396
397 PQResult *pqgetresult(PGconn *c) {
398 PQResult *r;
399 if(!c)
400 return NULL;
401
402 r = (PQResult *)nsmalloc(POOL_PQSQL, sizeof(PQResult));
403 r->row = -1;
404 r->result = PQgetResult(c);
405 r->rows = PQntuples(r->result);
406
407 return r;
408 }
409
410 int pqfetchrow(PQResult *res) {
411 if(res->row + 1 == res->rows)
412 return 0;
413
414 res->row++;
415
416 return 1;
417 }
418
419 char *pqgetvalue(PQResult *res, int column) {
420 return PQgetvalue(res->result, res->row, column);
421 }
422
423 void pqclear(PQResult *res) {
424 if(!res)
425 return;
426
427 if(res->result)
428 PQclear(res->result);
429
430 nsfree(POOL_PQSQL, res);
431 }