]> jfr.im git - irc/quakenet/newserv.git/blob - pqsql/pqsql.c
687cdee9159c12589563668e0755cfb1fd4e1c46
[irc/quakenet/newserv.git] / pqsql / pqsql.c
1 /*
2 * PQSQL module
3 *
4 * 99% of the handling is stolen from Q9.
5 */
6
7 #include "../core/config.h"
8 #include "../core/error.h"
9 #include "../irc/irc_config.h"
10 #include "../core/events.h"
11 #include "../core/hooks.h"
12 #include "../lib/irc_string.h"
13 #include "../lib/version.h"
14 #include "../lib/strlfunc.h"
15 #include "pqsql.h"
16
17 #include <stdlib.h>
18 #include <sys/poll.h>
19 #include <stdarg.h>
20 #include <string.h>
21
22 MODULE_VERSION("");
23
24 /* It's possible that we might want to do a very long query, longer than the
25 * IRC-oriented SSTRING_MAX value. One option would be to increase
26 * SSTRING_MAX, but the whole purpose of sstring's is to efficiently deal
27 * with situations where the malloc() padding overhead is large compared to
28 * string length and strings are frequently recycled. Since neither of
29 * these are necessarily true for longer strings it makes more sense to use
30 * malloc() for them.
31 *
32 * So, query always points at the query string. If it fitted in a sstring,
33 * query_ss will point at the sstring for freeing purposes. If query_ss is
34 * NULL then it was malloc'd so should be free()'d directly.
35 */
36 typedef struct pqasyncquery_s {
37 sstring *query_ss;
38 char *query;
39 void *tag;
40 PQQueryHandler handler;
41 int flags;
42 struct pqasyncquery_s *next;
43 } pqasyncquery_s;
44
45 typedef struct pqtableloaderinfo_s
46 {
47 sstring *tablename;
48 PQQueryHandler init, data, fini;
49 } pqtableloaderinfo_s;
50
51 pqasyncquery_s *queryhead = NULL, *querytail = NULL;
52
53 int dbconnected = 0;
54 PGconn *dbconn;
55
56 void dbhandler(int fd, short revents);
57 void pqstartloadtable(PGconn *dbconn, void *arg);
58 void dbstatus(int hooknum, void *arg);
59 void disconnectdb(void);
60 void connectdb(void);
61
62 void _init(void) {
63 connectdb();
64 }
65
66 void _fini(void) {
67 disconnectdb();
68 }
69
70 void connectdb(void) {
71 sstring *dbhost, *dbusername, *dbpassword, *dbdatabase, *dbport;
72 char connectstr[1024];
73
74 if(pqconnected())
75 return;
76
77 /* stolen from chanserv as I'm lazy */
78 dbhost = getcopyconfigitem("pqsql", "host", "UNIX", HOSTLEN);
79 dbusername = getcopyconfigitem("pqsql", "username", "newserv", 20);
80 dbpassword = getcopyconfigitem("pqsql", "password", "moo", 20);
81 dbdatabase = getcopyconfigitem("pqsql", "database", "newserv", 20);
82 dbport = getcopyconfigitem("pqsql", "port", "431", 8);
83
84 if(!dbhost || !dbusername || !dbpassword || !dbdatabase || !dbport) {
85 /* freesstring allows NULL */
86 freesstring(dbhost);
87 freesstring(dbusername);
88 freesstring(dbpassword);
89 freesstring(dbdatabase);
90 freesstring(dbport);
91 return;
92 }
93
94 if (!strcmp(dbhost->content,"UNIX")) {
95 snprintf(connectstr, sizeof(connectstr), "dbname=%s user=%s password=%s", dbdatabase->content, dbusername->content, dbpassword->content);
96 } else {
97 snprintf(connectstr, sizeof(connectstr), "host=%s port=%s dbname=%s user=%s password=%s", dbhost->content, dbport->content, dbdatabase->content, dbusername->content, dbpassword->content);
98 }
99
100 freesstring(dbhost);
101 freesstring(dbusername);
102 freesstring(dbpassword);
103 freesstring(dbdatabase);
104 freesstring(dbport);
105
106 Error("pqsql", ERR_INFO, "Attempting database connection: %s", connectstr);
107
108 /* Blocking connect for now.. */
109 dbconn = PQconnectdb(connectstr);
110
111 if (!dbconn || (PQstatus(dbconn) != CONNECTION_OK)) {
112 Error("pqsql", ERR_ERROR, "Unable to connect to db: %s", pqlasterror(dbconn));
113 return;
114 }
115 Error("pqsql", ERR_INFO, "Connected!");
116
117 dbconnected = 1;
118
119 PQsetnonblocking(dbconn, 1);
120
121 /* this kicks ass, thanks splidge! */
122 registerhandler(PQsocket(dbconn), POLLIN, dbhandler);
123 registerhook(HOOK_CORE_STATSREQUEST, dbstatus);
124 }
125
126 void dbhandler(int fd, short revents) {
127 PGresult *res;
128 pqasyncquery_s *qqp;
129
130 if(revents & POLLIN) {
131 PQconsumeInput(dbconn);
132
133 if(!PQisBusy(dbconn)) { /* query is complete */
134 if(queryhead->handler)
135 (queryhead->handler)(dbconn, queryhead->tag);
136
137 while((res = PQgetResult(dbconn))) {
138 switch(PQresultStatus(res)) {
139 case PGRES_TUPLES_OK:
140 Error("pqsql", ERR_WARNING, "Unhandled tuples output (query: %s)", queryhead->query);
141 break;
142
143 case PGRES_NONFATAL_ERROR:
144 case PGRES_FATAL_ERROR:
145 /* if a create query returns an error assume it went ok, paul will winge about this */
146 if(!(queryhead->flags & QH_CREATE))
147 Error("pqsql", ERR_WARNING, "Unhandled error response (query: %s)", queryhead->query);
148 break;
149
150 default:
151 break;
152 }
153
154 PQclear(res);
155 }
156
157 /* Free the query and advance */
158 qqp = queryhead;
159 if(queryhead == querytail)
160 querytail = NULL;
161
162 queryhead = queryhead->next;
163
164 if (qqp->query_ss) {
165 freesstring(qqp->query_ss);
166 qqp->query_ss=NULL;
167 qqp->query=NULL;
168 } else if (qqp->query) {
169 free(qqp->query);
170 qqp->query=NULL;
171 }
172 free(qqp);
173
174 if(queryhead) { /* Submit the next query */
175 PQsendQuery(dbconn, queryhead->query);
176 PQflush(dbconn);
177 }
178 }
179 }
180 }
181
182 /* sorry Q9 */
183 void pqasyncqueryf(PQQueryHandler handler, void *tag, int flags, char *format, ...) {
184 char querybuf[8192];
185 va_list va;
186 int len;
187 pqasyncquery_s *qp;
188
189 if(!pqconnected())
190 return;
191
192 va_start(va, format);
193 len = vsnprintf(querybuf, sizeof(querybuf), format, va);
194 va_end(va);
195
196 /* PPA: no check here... */
197 qp = (pqasyncquery_s *)malloc(sizeof(pqasyncquery_s));
198
199 if(!qp)
200 Error("pqsql",ERR_STOP,"malloc() failed in pqsql.c");
201
202 /* Use sstring or allocate (see above rant) */
203 if (len > SSTRING_MAX) {
204 qp->query = (char *)malloc(len+1);
205 strcpy(qp->query,querybuf);
206 qp->query_ss=NULL;
207 } else {
208 qp->query_ss = getsstring(querybuf, len);
209 qp->query = qp->query_ss->content;
210 }
211 qp->tag = tag;
212 qp->handler = handler;
213 qp->next = NULL; /* shove them at the end */
214 qp->flags = flags;
215
216 if(querytail) {
217 querytail->next = qp;
218 querytail = qp;
219 } else {
220 querytail = queryhead = qp;
221 PQsendQuery(dbconn, qp->query);
222 PQflush(dbconn);
223 }
224 }
225
226 void pqloadtable(char *tablename, PQQueryHandler init, PQQueryHandler data, PQQueryHandler fini)
227 {
228 pqtableloaderinfo_s *tli;
229
230 tli=(pqtableloaderinfo_s *)malloc(sizeof(pqtableloaderinfo_s));
231 tli->tablename=getsstring(tablename, 100);
232 tli->init=init;
233 tli->data=data;
234 tli->fini=fini;
235 pqasyncquery(pqstartloadtable, tli, "SELECT COUNT(*) FROM %s", tli->tablename->content);
236 }
237
238 void pqstartloadtable(PGconn *dbconn, void *arg)
239 {
240 PGresult *res;
241 unsigned long i, count, tablecrc;
242 pqtableloaderinfo_s *tli = arg;
243
244 res = PQgetResult(dbconn);
245
246 if (PQresultStatus(res) != PGRES_TUPLES_OK && PQresultStatus(res) != PGRES_COMMAND_OK) {
247 Error("pqsql", ERR_ERROR, "Error getting row count for %s.", tli->tablename->content);
248 return;
249 }
250
251 if (PQnfields(res) != 1) {
252 Error("pqsql", ERR_ERROR, "Count query format error for %s.", tli->tablename->content);
253 return;
254 }
255
256 tablecrc=crc32(tli->tablename->content);
257 count=strtoul(PQgetvalue(res, 0, 0), NULL, 10);
258 PQclear(res);
259
260 Error("pqsql", ERR_INFO, "Found %lu entries in table %s, scheduling load.", count, tli->tablename->content);
261
262 pqasyncquery(tli->init, NULL, "BEGIN");
263 pqasyncquery(NULL, NULL, "DECLARE table%lx%lx CURSOR FOR SELECT * FROM %s", tablecrc, count, tli->tablename->content);
264
265 for (i=0;(count - i) > 1000; i+=1000)
266 pqasyncquery(tli->data, NULL, "FETCH 1000 FROM table%lx%lx", tablecrc, count);
267
268 pqasyncquery(tli->data, NULL, "FETCH ALL FROM table%lx%lx", tablecrc, count);
269
270 pqasyncquery(NULL, NULL, "CLOSE table%lx%lx", tablecrc, count);
271 pqasyncquery(tli->fini, NULL, "COMMIT");
272
273 freesstring(tli->tablename);
274 free(tli);
275 }
276
277 void disconnectdb(void) {
278 pqasyncquery_s *qqp = queryhead, *nqqp;
279
280 if(!pqconnected())
281 return;
282
283 /* do this first else we may get conflicts */
284 deregisterhandler(PQsocket(dbconn), 0);
285
286 /* Throw all the queued queries away, beware of data malloc()ed inside the query item.. */
287 while(qqp) {
288 nqqp = qqp->next;
289 if (qqp->query_ss) {
290 freesstring(qqp->query_ss);
291 qqp->query_ss=NULL;
292 qqp->query=NULL;
293 } else if (qqp->query) {
294 free(qqp->query);
295 qqp->query=NULL;
296 }
297 free(qqp);
298 qqp = nqqp;
299 }
300
301 deregisterhook(HOOK_CORE_STATSREQUEST, dbstatus);
302 PQfinish(dbconn);
303 dbconn = NULL; /* hmm? */
304
305 dbconnected = 0;
306 }
307
308 /* more stolen code from Q9 */
309 void dbstatus(int hooknum, void *arg) {
310 if ((long)arg > 10) {
311 int i = 0;
312 pqasyncquery_s *qqp;
313 char message[100];
314
315 if(queryhead)
316 for(qqp=queryhead;qqp;qqp=qqp->next)
317 i++;
318
319 snprintf(message, sizeof(message), "PQSQL : %6d queries queued.",i);
320
321 triggerhook(HOOK_CORE_STATSREPLY, message);
322 }
323 }
324
325 int pqconnected(void) {
326 return dbconnected;
327 }
328
329 char* pqlasterror(PGconn * pgconn) {
330 static char errormsg[PQ_ERRORMSG_LENGTH + 1];
331 int i;
332 if(!pgconn)
333 return "PGCONN NULL";
334 strlcpy(errormsg, PQerrorMessage(pgconn), PQ_ERRORMSG_LENGTH);
335 for(i=0;i<errormsg[i];i++) {
336 if((errormsg[i] == '\r') || (errormsg[i] == '\n'))
337 errormsg[i] = ' ';
338
339 }
340 return errormsg;
341 }