]> jfr.im git - irc/quakenet/newserv.git/blob - pqsql/pqsql.c
allow forcing of settime commands (for devs only)
[irc/quakenet/newserv.git] / pqsql / pqsql.c
1 /*
2 * PQSQL module
3 *
4 * 99% of the handling is stolen from Q9.
5 */
6
7 #include "../core/config.h"
8 #include "../core/error.h"
9 #include "../irc/irc_config.h"
10 #include "../core/events.h"
11 #include "../core/hooks.h"
12 #include "../core/nsmalloc.h"
13 #include "../lib/irc_string.h"
14 #include "../lib/version.h"
15 #include "../lib/strlfunc.h"
16 #include "pqsql.h"
17
18 #define BUILDING_DBAPI
19 #include "../dbapi/dbapi.h"
20
21 #include <stdlib.h>
22 #include <sys/poll.h>
23 #include <stdarg.h>
24 #include <string.h>
25
26 MODULE_VERSION("");
27
28 /* It's possible that we might want to do a very long query, longer than the
29 * IRC-oriented SSTRING_MAX value. One option would be to increase
30 * SSTRING_MAX, but the whole purpose of sstring's is to efficiently deal
31 * with situations where the malloc() padding overhead is large compared to
32 * string length and strings are frequently recycled. Since neither of
33 * these are necessarily true for longer strings it makes more sense to use
34 * malloc() for them.
35 *
36 * So, query always points at the query string. If it fitted in a sstring,
37 * query_ss will point at the sstring for freeing purposes. If query_ss is
38 * NULL then it was malloc'd so should be free()'d directly.
39 */
40 typedef struct pqasyncquery_s {
41 sstring *query_ss;
42 char *query;
43 void *tag;
44 PQQueryHandler handler;
45 int flags;
46 PQModuleIdentifier identifier;
47 struct pqasyncquery_s *next;
48 } pqasyncquery_s;
49
50 typedef struct pqtableloaderinfo_s
51 {
52 sstring *tablename;
53 PQQueryHandler init, data, fini;
54 } pqtableloaderinfo_s;
55
56 pqasyncquery_s *queryhead = NULL, *querytail = NULL;
57
58 static int dbconnected = 0;
59 static PQModuleIdentifier moduleid = 0;
60 static PGconn *dbconn;
61
62 void dbhandler(int fd, short revents);
63 void pqstartloadtable(PGconn *dbconn, void *arg);
64 void dbstatus(int hooknum, void *arg);
65 void disconnectdb(void);
66 void connectdb(void);
67 char* pqlasterror(PGconn * pgconn);
68
/*
 * Module initialisation (presumably invoked by the module loader, going by
 * the _init name): tries to open the database connection.
 */
void _init(void)
{
  connectdb();
}
72
73 void _fini(void) {
74 disconnectdb();
75
76 nscheckfreeall(POOL_PQSQL);
77 }
78
79 PQModuleIdentifier pqgetid(void) {
80 moduleid++;
81 if(moduleid < 10)
82 moduleid = 10;
83
84 return moduleid;
85 }
86
87 void pqfreeid(PQModuleIdentifier identifier) {
88 pqasyncquery_s *q, *p;
89
90 if(identifier == 0 || !queryhead)
91 return;
92
93 if(queryhead->identifier == identifier) {
94 (queryhead->handler)(NULL, queryhead->tag);
95 queryhead->identifier = QH_ALREADYFIRED;
96 }
97
98 for(p=queryhead,q=queryhead->next;q;) {
99 if(q->identifier == identifier) {
100 (q->handler)(NULL, q->tag);
101 p->next = q->next;
102
103 if (q->query_ss) {
104 freesstring(q->query_ss);
105 } else {
106 nsfree(POOL_PQSQL, q->query);
107 }
108 nsfree(POOL_PQSQL, q);
109 q = p->next;
110 } else {
111 p = q;
112 q = q->next;
113 }
114 }
115
116 querytail = p;
117 }
118
119 void connectdb(void) {
120 sstring *dbhost, *dbusername, *dbpassword, *dbdatabase, *dbport;
121 char connectstr[1024];
122
123 if(pqconnected())
124 return;
125
126 /* stolen from chanserv as I'm lazy */
127 dbhost = getcopyconfigitem("pqsql", "host", "UNIX", HOSTLEN);
128 dbusername = getcopyconfigitem("pqsql", "username", "newserv", 20);
129 dbpassword = getcopyconfigitem("pqsql", "password", "moo", 20);
130 dbdatabase = getcopyconfigitem("pqsql", "database", "newserv", 20);
131 dbport = getcopyconfigitem("pqsql", "port", "431", 8);
132
133 if(!dbhost || !dbusername || !dbpassword || !dbdatabase || !dbport) {
134 /* freesstring allows NULL */
135 freesstring(dbhost);
136 freesstring(dbusername);
137 freesstring(dbpassword);
138 freesstring(dbdatabase);
139 freesstring(dbport);
140 return;
141 }
142
143 if (!strcmp(dbhost->content,"UNIX")) {
144 snprintf(connectstr, sizeof(connectstr), "dbname=%s user=%s password=%s", dbdatabase->content, dbusername->content, dbpassword->content);
145 } else {
146 snprintf(connectstr, sizeof(connectstr), "host=%s port=%s dbname=%s user=%s password=%s", dbhost->content, dbport->content, dbdatabase->content, dbusername->content, dbpassword->content);
147 }
148
149 freesstring(dbhost);
150 freesstring(dbusername);
151 freesstring(dbpassword);
152 freesstring(dbdatabase);
153 freesstring(dbport);
154
155 Error("pqsql", ERR_INFO, "Attempting database connection: %s", connectstr);
156
157 /* Blocking connect for now.. */
158 dbconn = PQconnectdb(connectstr);
159
160 if (!dbconn || (PQstatus(dbconn) != CONNECTION_OK)) {
161 Error("pqsql", ERR_ERROR, "Unable to connect to db: %s", pqlasterror(dbconn));
162 return;
163 }
164 Error("pqsql", ERR_INFO, "Connected!");
165
166 dbconnected = 1;
167
168 PQsetnonblocking(dbconn, 1);
169
170 /* this kicks ass, thanks splidge! */
171 registerhandler(PQsocket(dbconn), POLLIN, dbhandler);
172 registerhook(HOOK_CORE_STATSREQUEST, dbstatus);
173 }
174
175 void dbhandler(int fd, short revents) {
176 PGresult *res;
177 pqasyncquery_s *qqp;
178
179 if(revents & POLLIN) {
180 PQconsumeInput(dbconn);
181
182 if(!PQisBusy(dbconn)) { /* query is complete */
183 if(queryhead->handler && queryhead->identifier != QH_ALREADYFIRED)
184 (queryhead->handler)(dbconn, queryhead->tag);
185
186 while((res = PQgetResult(dbconn))) {
187 if(queryhead->identifier != QH_ALREADYFIRED) {
188 switch(PQresultStatus(res)) {
189 case PGRES_TUPLES_OK:
190 Error("pqsql", ERR_WARNING, "Unhandled tuples output (query: %s)", queryhead->query);
191 break;
192
193 case PGRES_NONFATAL_ERROR:
194 case PGRES_FATAL_ERROR:
195 /* if a create query returns an error assume it went ok, paul will winge about this */
196 if(!(queryhead->flags & DB_CREATE))
197 Error("pqsql", ERR_WARNING, "Unhandled error response (query: %s)", queryhead->query);
198 break;
199
200 default:
201 break;
202 }
203 }
204
205 PQclear(res);
206 }
207
208 /* Free the query and advance */
209 qqp = queryhead;
210 if(queryhead == querytail)
211 querytail = NULL;
212
213 queryhead = queryhead->next;
214
215 if (qqp->query_ss) {
216 freesstring(qqp->query_ss);
217 qqp->query_ss=NULL;
218 qqp->query=NULL;
219 } else if (qqp->query) {
220 nsfree(POOL_PQSQL, qqp->query);
221 qqp->query=NULL;
222 }
223 nsfree(POOL_PQSQL, qqp);
224
225 if(queryhead) { /* Submit the next query */
226 PQsendQuery(dbconn, queryhead->query);
227 PQflush(dbconn);
228 }
229 }
230 }
231 }
232
233 /* sorry Q9 */
234 void pqasyncqueryf(int identifier, PQQueryHandler handler, void *tag, int flags, char *format, ...) {
235 char querybuf[8192];
236 va_list va;
237 int len;
238 pqasyncquery_s *qp;
239
240 if(!pqconnected())
241 return;
242
243 va_start(va, format);
244 len = vsnprintf(querybuf, sizeof(querybuf), format, va);
245 va_end(va);
246
247 /* PPA: no check here... */
248 qp = (pqasyncquery_s *)nsmalloc(POOL_PQSQL, sizeof(pqasyncquery_s));
249
250 if(!qp)
251 Error("pqsql",ERR_STOP,"malloc() failed in pqsql.c");
252
253 /* Use sstring or allocate (see above rant) */
254 if (len > SSTRING_MAX) {
255 qp->query = (char *)nsmalloc(POOL_PQSQL, len+1);
256 strcpy(qp->query,querybuf);
257 qp->query_ss=NULL;
258 } else {
259 qp->query_ss = getsstring(querybuf, len);
260 qp->query = qp->query_ss->content;
261 }
262 qp->tag = tag;
263 qp->handler = handler;
264 qp->next = NULL; /* shove them at the end */
265 qp->flags = flags;
266 qp->identifier = identifier;
267
268 if(querytail) {
269 querytail->next = qp;
270 querytail = qp;
271 } else {
272 querytail = queryhead = qp;
273 PQsendQuery(dbconn, qp->query);
274 PQflush(dbconn);
275 }
276 }
277
278 void pqloadtable(char *tablename, PQQueryHandler init, PQQueryHandler data, PQQueryHandler fini)
279 {
280 pqtableloaderinfo_s *tli;
281
282 tli=(pqtableloaderinfo_s *)nsmalloc(POOL_PQSQL, sizeof(pqtableloaderinfo_s));
283 tli->tablename=getsstring(tablename, 100);
284 tli->init=init;
285 tli->data=data;
286 tli->fini=fini;
287 pqasyncquery(pqstartloadtable, tli, "SELECT COUNT(*) FROM %s", tli->tablename->content);
288 }
289
290 void pqstartloadtable(PGconn *dbconn, void *arg)
291 {
292 PGresult *res;
293 unsigned long i, count, tablecrc;
294 pqtableloaderinfo_s *tli = arg;
295
296 res = PQgetResult(dbconn);
297
298 if (PQresultStatus(res) != PGRES_TUPLES_OK && PQresultStatus(res) != PGRES_COMMAND_OK) {
299 Error("pqsql", ERR_ERROR, "Error getting row count for %s.", tli->tablename->content);
300 return;
301 }
302
303 if (PQnfields(res) != 1) {
304 Error("pqsql", ERR_ERROR, "Count query format error for %s.", tli->tablename->content);
305 return;
306 }
307
308 tablecrc=crc32(tli->tablename->content);
309 count=strtoul(PQgetvalue(res, 0, 0), NULL, 10);
310 PQclear(res);
311
312 Error("pqsql", ERR_INFO, "Found %lu entries in table %s, scheduling load.", count, tli->tablename->content);
313
314 pqasyncquery(tli->init, NULL, "BEGIN");
315 pqasyncquery(NULL, NULL, "DECLARE table%lx%lx CURSOR FOR SELECT * FROM %s", tablecrc, count, tli->tablename->content);
316
317 for (i=0;(count - i) > 1000; i+=1000)
318 pqasyncquery(tli->data, NULL, "FETCH 1000 FROM table%lx%lx", tablecrc, count);
319
320 pqasyncquery(tli->data, NULL, "FETCH ALL FROM table%lx%lx", tablecrc, count);
321
322 pqasyncquery(NULL, NULL, "CLOSE table%lx%lx", tablecrc, count);
323 pqasyncquery(tli->fini, NULL, "COMMIT");
324
325 freesstring(tli->tablename);
326 nsfree(POOL_PQSQL, tli);
327 }
328
329 void disconnectdb(void) {
330 pqasyncquery_s *qqp = queryhead, *nqqp;
331
332 if(!pqconnected())
333 return;
334
335 /* do this first else we may get conflicts */
336 deregisterhandler(PQsocket(dbconn), 0);
337
338 /* Throw all the queued queries away, beware of data malloc()ed inside the query item.. */
339 while(qqp) {
340 nqqp = qqp->next;
341 if (qqp->query_ss) {
342 freesstring(qqp->query_ss);
343 qqp->query_ss=NULL;
344 qqp->query=NULL;
345 } else if (qqp->query) {
346 nsfree(POOL_PQSQL, qqp->query);
347 qqp->query=NULL;
348 }
349 nsfree(POOL_PQSQL, qqp);
350 qqp = nqqp;
351 }
352
353 deregisterhook(HOOK_CORE_STATSREQUEST, dbstatus);
354 PQfinish(dbconn);
355 dbconn = NULL; /* hmm? */
356
357 dbconnected = 0;
358 }
359
360 /* more stolen code from Q9 */
361 void dbstatus(int hooknum, void *arg) {
362 if ((long)arg > 10) {
363 int i = 0;
364 pqasyncquery_s *qqp;
365 char message[100];
366
367 if(queryhead)
368 for(qqp=queryhead;qqp;qqp=qqp->next)
369 i++;
370
371 snprintf(message, sizeof(message), "PQSQL : %6d queries queued.",i);
372
373 triggerhook(HOOK_CORE_STATSREPLY, message);
374 }
375 }
376
377 int pqconnected(void) {
378 return dbconnected;
379 }
380
381 char* pqlasterror(PGconn * pgconn) {
382 static char errormsg[PQ_ERRORMSG_LENGTH + 1];
383 int i;
384 if(!pgconn)
385 return "PGCONN NULL";
386 strlcpy(errormsg, PQerrorMessage(pgconn), PQ_ERRORMSG_LENGTH);
387 for(i=0;i<errormsg[i];i++) {
388 if((errormsg[i] == '\r') || (errormsg[i] == '\n'))
389 errormsg[i] = ' ';
390
391 }
392 return errormsg;
393 }
394
395 PQResult *pqgetresult(PGconn *c) {
396 PQResult *r;
397 if(!c)
398 return NULL;
399
400 r = (PQResult *)nsmalloc(POOL_PQSQL, sizeof(PQResult));
401 r->row = -1;
402 r->result = PQgetResult(c);
403 r->rows = PQntuples(r->result);
404
405 return r;
406 }
407
408 int pqfetchrow(PQResult *res) {
409 if(res->row + 1 == res->rows)
410 return 0;
411
412 res->row++;
413
414 return 1;
415 }
416
417 char *pqgetvalue(PQResult *res, int column) {
418 return PQgetvalue(res->result, res->row, column);
419 }
420
421 void pqclear(PQResult *res) {
422 if(!res)
423 return;
424
425 if(res->result)
426 PQclear(res->result);
427
428 nsfree(POOL_PQSQL, res);
429 }