]>
Commit | Line | Data |
---|---|---|
1 | /* | |
2 | * PQSQL module | |
3 | * | |
4 | * 99% of the handling is stolen from Q9. | |
5 | */ | |
6 | ||
7 | #include "../core/config.h" | |
8 | #include "../core/error.h" | |
9 | #include "../irc/irc_config.h" | |
10 | #include "../core/events.h" | |
11 | #include "../core/hooks.h" | |
12 | #include "../core/nsmalloc.h" | |
13 | #include "../lib/irc_string.h" | |
14 | #include "../lib/version.h" | |
15 | #include "../lib/strlfunc.h" | |
16 | #include "pqsql.h" | |
17 | ||
18 | #define BUILDING_DBAPI | |
19 | #include "../dbapi/dbapi.h" | |
20 | ||
21 | #include <stdlib.h> | |
22 | #include <sys/poll.h> | |
23 | #include <stdarg.h> | |
24 | #include <string.h> | |
25 | ||
26 | MODULE_VERSION(""); | |
27 | ||
28 | /* It's possible that we might want to do a very long query, longer than the | |
29 | * IRC-oriented SSTRING_MAX value. One option would be to increase | |
30 | * SSTRING_MAX, but the whole purpose of sstrings is to efficiently deal | |
31 | * with situations where the malloc() padding overhead is large compared to | |
32 | * string length and strings are frequently recycled. Since neither of | |
33 | * these is necessarily true for longer strings it makes more sense to use | |
34 | * malloc() for them. | |
35 | * | |
36 | * So, query always points at the query string. If it fits in an sstring, | |
37 | * query_ss will point at the sstring for freeing purposes. If query_ss is | |
38 | * NULL then it was malloc'd so should be free()'d directly. | |
39 | */ | |
40 | typedef struct pqasyncquery_s { | |
41 | sstring *query_ss; | |
42 | char *query; | |
43 | void *tag; | |
44 | PQQueryHandler handler; | |
45 | int flags; | |
46 | PQModuleIdentifier identifier; | |
47 | struct pqasyncquery_s *next; | |
48 | } pqasyncquery_s; | |
49 | ||
50 | typedef struct pqtableloaderinfo_s | |
51 | { | |
52 | sstring *tablename; | |
53 | PQQueryHandler init, data, fini; | |
54 | void *tag; | |
55 | } pqtableloaderinfo_s; | |
56 | ||
57 | pqasyncquery_s *queryhead = NULL, *querytail = NULL; | |
58 | ||
59 | static int dbconnected = 0; | |
60 | static PQModuleIdentifier moduleid = 0; | |
61 | static PGconn *dbconn; | |
62 | ||
63 | void dbhandler(int fd, short revents); | |
64 | void pqstartloadtable(PGconn *dbconn, void *arg); | |
65 | void dbstatus(int hooknum, void *arg); | |
66 | void disconnectdb(void); | |
67 | void connectdb(void); | |
68 | char* pqlasterror(PGconn * pgconn); | |
69 | ||
/* Module load hook: try to bring up the database connection. */
void _init(void)
{
  connectdb();
}
73 | ||
74 | void _fini(void) { | |
75 | disconnectdb(); | |
76 | ||
77 | nscheckfreeall(POOL_PQSQL); | |
78 | } | |
79 | ||
80 | PQModuleIdentifier pqgetid(void) { | |
81 | moduleid++; | |
82 | if(moduleid < 10) | |
83 | moduleid = 10; | |
84 | ||
85 | return moduleid; | |
86 | } | |
87 | ||
88 | void pqfreeid(PQModuleIdentifier identifier) { | |
89 | pqasyncquery_s *q, *p; | |
90 | ||
91 | if(identifier == 0 || !queryhead) | |
92 | return; | |
93 | ||
94 | if(queryhead->identifier == identifier) { | |
95 | (queryhead->handler)(NULL, queryhead->tag); | |
96 | queryhead->identifier = QH_ALREADYFIRED; | |
97 | } | |
98 | ||
99 | for(p=queryhead,q=queryhead->next;q;) { | |
100 | if(q->identifier == identifier) { | |
101 | (q->handler)(NULL, q->tag); | |
102 | p->next = q->next; | |
103 | ||
104 | if (q->query_ss) { | |
105 | freesstring(q->query_ss); | |
106 | } else { | |
107 | nsfree(POOL_PQSQL, q->query); | |
108 | } | |
109 | nsfree(POOL_PQSQL, q); | |
110 | q = p->next; | |
111 | } else { | |
112 | p = q; | |
113 | q = q->next; | |
114 | } | |
115 | } | |
116 | ||
117 | querytail = p; | |
118 | } | |
119 | ||
120 | void connectdb(void) { | |
121 | sstring *dbhost, *dbusername, *dbpassword, *dbdatabase, *dbport; | |
122 | char connectstr[1024]; | |
123 | ||
124 | if(pqconnected()) | |
125 | return; | |
126 | ||
127 | /* stolen from chanserv as I'm lazy */ | |
128 | dbhost = getcopyconfigitem("pqsql", "host", "UNIX", HOSTLEN); | |
129 | dbusername = getcopyconfigitem("pqsql", "username", "newserv", 20); | |
130 | dbpassword = getcopyconfigitem("pqsql", "password", "moo", 20); | |
131 | dbdatabase = getcopyconfigitem("pqsql", "database", "newserv", 20); | |
132 | dbport = getcopyconfigitem("pqsql", "port", "431", 8); | |
133 | ||
134 | if(!dbhost || !dbusername || !dbpassword || !dbdatabase || !dbport) { | |
135 | /* freesstring allows NULL */ | |
136 | freesstring(dbhost); | |
137 | freesstring(dbusername); | |
138 | freesstring(dbpassword); | |
139 | freesstring(dbdatabase); | |
140 | freesstring(dbport); | |
141 | return; | |
142 | } | |
143 | ||
144 | if (!strcmp(dbhost->content,"UNIX")) { | |
145 | snprintf(connectstr, sizeof(connectstr), "dbname=%s user=%s password=%s", dbdatabase->content, dbusername->content, dbpassword->content); | |
146 | } else { | |
147 | snprintf(connectstr, sizeof(connectstr), "host=%s port=%s dbname=%s user=%s password=%s", dbhost->content, dbport->content, dbdatabase->content, dbusername->content, dbpassword->content); | |
148 | } | |
149 | ||
150 | freesstring(dbhost); | |
151 | freesstring(dbusername); | |
152 | freesstring(dbpassword); | |
153 | freesstring(dbdatabase); | |
154 | freesstring(dbport); | |
155 | ||
156 | Error("pqsql", ERR_INFO, "Attempting database connection: %s", connectstr); | |
157 | ||
158 | /* Blocking connect for now.. */ | |
159 | dbconn = PQconnectdb(connectstr); | |
160 | ||
161 | if (!dbconn || (PQstatus(dbconn) != CONNECTION_OK)) { | |
162 | Error("pqsql", ERR_ERROR, "Unable to connect to db: %s", pqlasterror(dbconn)); | |
163 | return; | |
164 | } | |
165 | Error("pqsql", ERR_INFO, "Connected!"); | |
166 | ||
167 | dbconnected = 1; | |
168 | ||
169 | PQsetnonblocking(dbconn, 1); | |
170 | ||
171 | /* this kicks ass, thanks splidge! */ | |
172 | registerhandler(PQsocket(dbconn), POLLIN, dbhandler); | |
173 | registerhook(HOOK_CORE_STATSREQUEST, dbstatus); | |
174 | } | |
175 | ||
176 | void dbhandler(int fd, short revents) { | |
177 | PGresult *res; | |
178 | pqasyncquery_s *qqp; | |
179 | ||
180 | if(revents & POLLIN) { | |
181 | PQconsumeInput(dbconn); | |
182 | ||
183 | if(!PQisBusy(dbconn)) { /* query is complete */ | |
184 | if(queryhead->handler && queryhead->identifier != QH_ALREADYFIRED) | |
185 | (queryhead->handler)(dbconn, queryhead->tag); | |
186 | ||
187 | while((res = PQgetResult(dbconn))) { | |
188 | if(queryhead->identifier != QH_ALREADYFIRED) { | |
189 | switch(PQresultStatus(res)) { | |
190 | case PGRES_TUPLES_OK: | |
191 | if(!(queryhead->flags & DB_CALL)) | |
192 | Error("pqsql", ERR_WARNING, "Unhandled tuples output (query: %s)", queryhead->query); | |
193 | break; | |
194 | ||
195 | case PGRES_NONFATAL_ERROR: | |
196 | case PGRES_FATAL_ERROR: | |
197 | /* if a create query returns an error assume it went ok, paul will winge about this */ | |
198 | if(!(queryhead->flags & DB_CREATE)) | |
199 | Error("pqsql", ERR_WARNING, "Unhandled error response (query: %s): %s", queryhead->query, PQresultErrorMessage(res)); | |
200 | break; | |
201 | ||
202 | default: | |
203 | break; | |
204 | } | |
205 | } | |
206 | ||
207 | PQclear(res); | |
208 | } | |
209 | ||
210 | /* Free the query and advance */ | |
211 | qqp = queryhead; | |
212 | if(queryhead == querytail) | |
213 | querytail = NULL; | |
214 | ||
215 | queryhead = queryhead->next; | |
216 | ||
217 | if (qqp->query_ss) { | |
218 | freesstring(qqp->query_ss); | |
219 | qqp->query_ss=NULL; | |
220 | qqp->query=NULL; | |
221 | } else if (qqp->query) { | |
222 | nsfree(POOL_PQSQL, qqp->query); | |
223 | qqp->query=NULL; | |
224 | } | |
225 | nsfree(POOL_PQSQL, qqp); | |
226 | ||
227 | if(queryhead) { /* Submit the next query */ | |
228 | PQsendQuery(dbconn, queryhead->query); | |
229 | PQflush(dbconn); | |
230 | } | |
231 | } | |
232 | } | |
233 | } | |
234 | ||
235 | /* sorry Q9 */ | |
236 | void pqasyncqueryf(int identifier, PQQueryHandler handler, void *tag, int flags, char *format, ...) { | |
237 | char querybuf[8192]; | |
238 | int len; | |
239 | pqasyncquery_s *qp; | |
240 | va_list va; | |
241 | ||
242 | if(!pqconnected()) | |
243 | return; | |
244 | ||
245 | va_start(va, format); | |
246 | len = vsnprintf(querybuf, sizeof(querybuf), format, va); | |
247 | va_end(va); | |
248 | ||
249 | /* PPA: no check here... */ | |
250 | qp = (pqasyncquery_s *)nsmalloc(POOL_PQSQL, sizeof(pqasyncquery_s)); | |
251 | ||
252 | if(!qp) | |
253 | Error("pqsql",ERR_STOP,"malloc() failed in pqsql.c"); | |
254 | ||
255 | /* Use sstring or allocate (see above rant) */ | |
256 | if (len > SSTRING_MAX) { | |
257 | qp->query = (char *)nsmalloc(POOL_PQSQL, len+1); | |
258 | strcpy(qp->query,querybuf); | |
259 | qp->query_ss=NULL; | |
260 | } else { | |
261 | qp->query_ss = getsstring(querybuf, len); | |
262 | qp->query = qp->query_ss->content; | |
263 | } | |
264 | qp->tag = tag; | |
265 | qp->handler = handler; | |
266 | qp->next = NULL; /* shove them at the end */ | |
267 | qp->flags = flags; | |
268 | qp->identifier = identifier; | |
269 | ||
270 | if(querytail) { | |
271 | querytail->next = qp; | |
272 | querytail = qp; | |
273 | } else { | |
274 | querytail = queryhead = qp; | |
275 | PQsendQuery(dbconn, qp->query); | |
276 | PQflush(dbconn); | |
277 | } | |
278 | } | |
279 | ||
280 | void pqloadtable(char *tablename, PQQueryHandler init, PQQueryHandler data, PQQueryHandler fini, void *tag) | |
281 | { | |
282 | pqtableloaderinfo_s *tli; | |
283 | ||
284 | tli=(pqtableloaderinfo_s *)nsmalloc(POOL_PQSQL, sizeof(pqtableloaderinfo_s)); | |
285 | tli->tablename=getsstring(tablename, 100); | |
286 | tli->init=init; | |
287 | tli->data=data; | |
288 | tli->fini=fini; | |
289 | tli->tag=tag; | |
290 | pqasyncquery(pqstartloadtable, tli, "SELECT COUNT(*) FROM %s", tli->tablename->content); | |
291 | } | |
292 | ||
293 | void pqstartloadtable(PGconn *dbconn, void *arg) | |
294 | { | |
295 | PGresult *res; | |
296 | unsigned long i, count, tablecrc; | |
297 | pqtableloaderinfo_s *tli = arg; | |
298 | ||
299 | res = PQgetResult(dbconn); | |
300 | ||
301 | if (PQresultStatus(res) != PGRES_TUPLES_OK && PQresultStatus(res) != PGRES_COMMAND_OK) { | |
302 | Error("pqsql", ERR_ERROR, "Error getting row count for %s.", tli->tablename->content); | |
303 | return; | |
304 | } | |
305 | ||
306 | if (PQnfields(res) != 1) { | |
307 | Error("pqsql", ERR_ERROR, "Count query format error for %s.", tli->tablename->content); | |
308 | return; | |
309 | } | |
310 | ||
311 | tablecrc=crc32(tli->tablename->content); | |
312 | count=strtoul(PQgetvalue(res, 0, 0), NULL, 10); | |
313 | PQclear(res); | |
314 | ||
315 | Error("pqsql", ERR_INFO, "Found %lu entries in table %s, scheduling load.", count, tli->tablename->content); | |
316 | ||
317 | pqasyncquery(tli->init, tli->tag, "BEGIN"); | |
318 | pqasyncquery(NULL, NULL, "DECLARE table%lx%lx CURSOR FOR SELECT * FROM %s", tablecrc, count, tli->tablename->content); | |
319 | ||
320 | for (i=0;(count - i) > 1000; i+=1000) | |
321 | pqasyncquery(tli->data, tli->tag, "FETCH 1000 FROM table%lx%lx", tablecrc, count); | |
322 | ||
323 | pqasyncquery(tli->data, tli->tag, "FETCH ALL FROM table%lx%lx", tablecrc, count); | |
324 | ||
325 | pqasyncquery(NULL, NULL, "CLOSE table%lx%lx", tablecrc, count); | |
326 | pqasyncquery(tli->fini, tli->tag, "COMMIT"); | |
327 | ||
328 | freesstring(tli->tablename); | |
329 | nsfree(POOL_PQSQL, tli); | |
330 | } | |
331 | ||
332 | void disconnectdb(void) { | |
333 | pqasyncquery_s *qqp = queryhead, *nqqp; | |
334 | ||
335 | if(!pqconnected()) | |
336 | return; | |
337 | ||
338 | /* do this first else we may get conflicts */ | |
339 | deregisterhandler(PQsocket(dbconn), 0); | |
340 | ||
341 | /* Throw all the queued queries away, beware of data malloc()ed inside the query item.. */ | |
342 | while(qqp) { | |
343 | nqqp = qqp->next; | |
344 | if (qqp->query_ss) { | |
345 | freesstring(qqp->query_ss); | |
346 | qqp->query_ss=NULL; | |
347 | qqp->query=NULL; | |
348 | } else if (qqp->query) { | |
349 | nsfree(POOL_PQSQL, qqp->query); | |
350 | qqp->query=NULL; | |
351 | } | |
352 | nsfree(POOL_PQSQL, qqp); | |
353 | qqp = nqqp; | |
354 | } | |
355 | ||
356 | deregisterhook(HOOK_CORE_STATSREQUEST, dbstatus); | |
357 | PQfinish(dbconn); | |
358 | dbconn = NULL; /* hmm? */ | |
359 | ||
360 | dbconnected = 0; | |
361 | } | |
362 | ||
363 | /* more stolen code from Q9 */ | |
364 | void dbstatus(int hooknum, void *arg) { | |
365 | if ((long)arg > 10) { | |
366 | int i = 0; | |
367 | pqasyncquery_s *qqp; | |
368 | char message[100]; | |
369 | ||
370 | if(queryhead) | |
371 | for(qqp=queryhead;qqp;qqp=qqp->next) | |
372 | i++; | |
373 | ||
374 | snprintf(message, sizeof(message), "PQSQL : %6d queries queued.",i); | |
375 | ||
376 | triggerhook(HOOK_CORE_STATSREPLY, message); | |
377 | } | |
378 | } | |
379 | ||
380 | int pqconnected(void) { | |
381 | return dbconnected; | |
382 | } | |
383 | ||
384 | char* pqlasterror(PGconn * pgconn) { | |
385 | static char errormsg[PQ_ERRORMSG_LENGTH + 1]; | |
386 | int i; | |
387 | if(!pgconn) | |
388 | return "PGCONN NULL"; | |
389 | strlcpy(errormsg, PQerrorMessage(pgconn), PQ_ERRORMSG_LENGTH); | |
390 | for(i=0;i<errormsg[i];i++) { | |
391 | if((errormsg[i] == '\r') || (errormsg[i] == '\n')) | |
392 | errormsg[i] = ' '; | |
393 | ||
394 | } | |
395 | return errormsg; | |
396 | } | |
397 | ||
398 | PQResult *pqgetresult(PGconn *c) { | |
399 | PQResult *r; | |
400 | if(!c) | |
401 | return NULL; | |
402 | ||
403 | r = (PQResult *)nsmalloc(POOL_PQSQL, sizeof(PQResult)); | |
404 | r->row = -1; | |
405 | r->result = PQgetResult(c); | |
406 | r->rows = PQntuples(r->result); | |
407 | ||
408 | return r; | |
409 | } | |
410 | ||
411 | int pqfetchrow(PQResult *res) { | |
412 | if(res->row + 1 == res->rows) | |
413 | return 0; | |
414 | ||
415 | res->row++; | |
416 | ||
417 | return 1; | |
418 | } | |
419 | ||
420 | char *pqgetvalue(PQResult *res, int column) { | |
421 | return PQgetvalue(res->result, res->row, column); | |
422 | } | |
423 | ||
424 | void pqclear(PQResult *res) { | |
425 | if(!res) | |
426 | return; | |
427 | ||
428 | if(res->result) | |
429 | PQclear(res->result); | |
430 | ||
431 | nsfree(POOL_PQSQL, res); | |
432 | } |