]> jfr.im git - irc/quakenet/newserv.git/blobdiff - pqsql/pqsql.c
Merge branch 'master' into chanserv-live
[irc/quakenet/newserv.git] / pqsql / pqsql.c
index 515f070aa5c8c2799cec79eddd8a1fea815dd31b..c2174da90c9e1e656dc25ec10fd036bca81f1d25 100644 (file)
@@ -9,12 +9,19 @@
 #include "../irc/irc_config.h"
 #include "../core/events.h"
 #include "../core/hooks.h"
+#include "../core/nsmalloc.h"
+#include "../lib/irc_string.h"
 #include "../lib/version.h"
+#include "../lib/strlfunc.h"
 #include "pqsql.h"
 
+#define BUILDING_DBAPI
+#include "../dbapi/dbapi.h"
+
 #include <stdlib.h>
 #include <sys/poll.h>
 #include <stdarg.h>
+#include <string.h>
 
 MODULE_VERSION("");
 
@@ -36,18 +43,29 @@ typedef struct pqasyncquery_s {
   void *tag;
   PQQueryHandler handler;
   int flags;
+  PQModuleIdentifier identifier;
   struct pqasyncquery_s *next;
 } pqasyncquery_s;
 
+typedef struct pqtableloaderinfo_s
+{
+    sstring *tablename;
+    PQQueryHandler init, data, fini;
+    void *tag;
+} pqtableloaderinfo_s;
+
 pqasyncquery_s *queryhead = NULL, *querytail = NULL;
 
-int dbconnected = 0;
-PGconn *dbconn;
+static int dbconnected = 0;
+static PQModuleIdentifier moduleid = 0;
+static PGconn *dbconn;
 
 void dbhandler(int fd, short revents);
+void pqstartloadtable(PGconn *dbconn, void *arg);
 void dbstatus(int hooknum, void *arg);
 void disconnectdb(void);
 void connectdb(void);
+char* pqlasterror(PGconn * pgconn);
 
 void _init(void) {
   connectdb();
@@ -55,6 +73,48 @@ void _init(void) {
 
 void _fini(void) {
   disconnectdb();
+
+  nscheckfreeall(POOL_PQSQL);
+}
+
+PQModuleIdentifier pqgetid(void) {
+  moduleid++;
+  if(moduleid < 10)
+    moduleid = 10;
+
+  return moduleid;
+}
+
+void pqfreeid(PQModuleIdentifier identifier) {
+  pqasyncquery_s *q, *p;
+  
+  if(identifier == 0 || !queryhead)
+    return;
+
+  if(queryhead->identifier == identifier) {
+    (queryhead->handler)(NULL, queryhead->tag);
+    queryhead->identifier = QH_ALREADYFIRED;
+  }
+
+  for(p=queryhead,q=queryhead->next;q;) {
+    if(q->identifier == identifier) {
+      (q->handler)(NULL, q->tag);
+      p->next = q->next;
+
+      if (q->query_ss) {
+        freesstring(q->query_ss);
+      } else {
+        nsfree(POOL_PQSQL, q->query);
+      }
+      nsfree(POOL_PQSQL, q);
+      q = p->next;
+    } else {
+      p = q;
+      q = q->next;
+    }
+  }
+
+  querytail = p;
 }
 
 void connectdb(void) {
@@ -65,7 +125,7 @@ void connectdb(void) {
     return;
 
   /* stolen from chanserv as I'm lazy */
-  dbhost = getcopyconfigitem("pqsql", "host", "localhost", HOSTLEN);
+  dbhost = getcopyconfigitem("pqsql", "host", "UNIX", HOSTLEN);
   dbusername = getcopyconfigitem("pqsql", "username", "newserv", 20);
   dbpassword = getcopyconfigitem("pqsql", "password", "moo", 20);
   dbdatabase = getcopyconfigitem("pqsql", "database", "newserv", 20);
@@ -80,9 +140,13 @@ void connectdb(void) {
     freesstring(dbport);
     return;
   }
-
-  snprintf(connectstr, sizeof(connectstr), "dbname=%s user=%s password=%s", dbdatabase->content, dbusername->content, dbpassword->content);
   
+  if (!strcmp(dbhost->content,"UNIX")) {
+    snprintf(connectstr, sizeof(connectstr), "dbname=%s user=%s password=%s", dbdatabase->content, dbusername->content, dbpassword->content);
+  } else {
+    snprintf(connectstr, sizeof(connectstr), "host=%s port=%s dbname=%s user=%s password=%s", dbhost->content, dbport->content, dbdatabase->content, dbusername->content, dbpassword->content);
+  }  
+
   freesstring(dbhost);
   freesstring(dbusername);
   freesstring(dbpassword);
@@ -95,7 +159,7 @@ void connectdb(void) {
   dbconn = PQconnectdb(connectstr);
   
   if (!dbconn || (PQstatus(dbconn) != CONNECTION_OK)) {
-    Error("pqsql", ERR_ERROR, "Unable to connect to db.");
+    Error("pqsql", ERR_ERROR, "Unable to connect to db: %s", pqlasterror(dbconn));
     return;
   }
   Error("pqsql", ERR_INFO, "Connected!");
@@ -117,24 +181,27 @@ void dbhandler(int fd, short revents) {
     PQconsumeInput(dbconn);
     
     if(!PQisBusy(dbconn)) { /* query is complete */
-      if(queryhead->handler)
+      if(queryhead->handler && queryhead->identifier != QH_ALREADYFIRED)
         (queryhead->handler)(dbconn, queryhead->tag);
 
       while((res = PQgetResult(dbconn))) {
-        switch(PQresultStatus(res)) {
-          case PGRES_TUPLES_OK:
-            Error("pqsql", ERR_WARNING, "Unhandled tuples output (query: %s)", queryhead->query->content);
-            break;
-
-          case PGRES_NONFATAL_ERROR:
-          case PGRES_FATAL_ERROR:
-            /* if a create query returns an error assume it went ok, paul will winge about this */
-            if(!(queryhead->flags & QH_CREATE))
-              Error("pqsql", ERR_WARNING, "Unhandled error response (query: %s)", queryhead->query->content);
-            break;
+        if(queryhead->identifier != QH_ALREADYFIRED) {
+          switch(PQresultStatus(res)) {
+            case PGRES_TUPLES_OK:
+              if(!(queryhead->flags & DB_CALL))
+                Error("pqsql", ERR_WARNING, "Unhandled tuples output (query: %s)", queryhead->query);
+              break;
+
+            case PGRES_NONFATAL_ERROR:
+            case PGRES_FATAL_ERROR:
+              /* if a create query returns an error assume it went ok, paul will winge about this */
+              if(!(queryhead->flags & DB_CREATE))
+                Error("pqsql", ERR_WARNING, "Unhandled error response (query: %s): %s", queryhead->query, PQresultErrorMessage(res));
+              break;
          
-          default:
-            break;
+            default:
+              break;
+          }
         }
        
         PQclear(res);
@@ -152,13 +219,13 @@ void dbhandler(int fd, short revents) {
         qqp->query_ss=NULL;
         qqp->query=NULL;
       } else if (qqp->query) {
-        free(qqp->query);
+        nsfree(POOL_PQSQL, qqp->query);
         qqp->query=NULL;
       }
-      free(qqp);
+      nsfree(POOL_PQSQL, qqp);
 
       if(queryhead) { /* Submit the next query */            
-        PQsendQuery(dbconn, queryhead->query->content);
+        PQsendQuery(dbconn, queryhead->query);
         PQflush(dbconn);
       }
     }
@@ -166,11 +233,11 @@ void dbhandler(int fd, short revents) {
 }
 
 /* sorry Q9 */
-void pqasyncqueryf(PQQueryHandler handler, void *tag, int flags, char *format, ...) {
+void pqasyncqueryf(int identifier, PQQueryHandler handler, void *tag, int flags, char *format, ...) {
   char querybuf[8192];
-  va_list va;
   int len;
   pqasyncquery_s *qp;
+  va_list va;
 
   if(!pqconnected())
     return;
@@ -180,14 +247,14 @@ void pqasyncqueryf(PQQueryHandler handler, void *tag, int flags, char *format, .
   va_end(va);
 
   /* PPA: no check here... */
-  qp = (pqasyncquery_s *)malloc(sizeof(pqasyncquery_s));
+  qp = (pqasyncquery_s *)nsmalloc(POOL_PQSQL, sizeof(pqasyncquery_s));
 
   if(!qp)
     Error("pqsql",ERR_STOP,"malloc() failed in pqsql.c");
 
   /* Use sstring or allocate (see above rant) */
   if (len > SSTRING_MAX) {
-    qp->query = (char *)malloc(strlen(len)+1);
+    qp->query = (char *)nsmalloc(POOL_PQSQL, len+1);
     strcpy(qp->query,querybuf);
     qp->query_ss=NULL;
   } else {
@@ -198,17 +265,70 @@ void pqasyncqueryf(PQQueryHandler handler, void *tag, int flags, char *format, .
   qp->handler = handler;
   qp->next = NULL; /* shove them at the end */
   qp->flags = flags;
+  qp->identifier = identifier;
 
   if(querytail) {
     querytail->next = qp;
     querytail = qp;
   } else {
     querytail = queryhead = qp;
-    PQsendQuery(dbconn, qp->query->content);
+    PQsendQuery(dbconn, qp->query);
     PQflush(dbconn);
   }
 }
 
+void pqloadtable(char *tablename, PQQueryHandler init, PQQueryHandler data, PQQueryHandler fini, void *tag)
+{
+  pqtableloaderinfo_s *tli;
+
+  tli=(pqtableloaderinfo_s *)nsmalloc(POOL_PQSQL, sizeof(pqtableloaderinfo_s));
+  tli->tablename=getsstring(tablename, 100);
+  tli->init=init;
+  tli->data=data;
+  tli->fini=fini;
+  tli->tag=tag;
+  pqasyncquery(pqstartloadtable, tli, "SELECT COUNT(*) FROM %s", tli->tablename->content);
+}
+
+void pqstartloadtable(PGconn *dbconn, void *arg)
+{
+  PGresult *res;
+  unsigned long i, count, tablecrc;
+  pqtableloaderinfo_s *tli = arg;
+
+  res = PQgetResult(dbconn);
+
+  if (PQresultStatus(res) != PGRES_TUPLES_OK && PQresultStatus(res) != PGRES_COMMAND_OK) {
+    Error("pqsql", ERR_ERROR, "Error getting row count for %s.", tli->tablename->content);
+    return;
+  }
+
+  if (PQnfields(res) != 1) {
+    Error("pqsql", ERR_ERROR, "Count query format error for %s.", tli->tablename->content);
+    return;
+  }
+
+  tablecrc=irc_crc32(tli->tablename->content);
+  count=strtoul(PQgetvalue(res, 0, 0), NULL, 10);
+  PQclear(res);
+
+  Error("pqsql", ERR_INFO, "Found %lu entries in table %s, scheduling load.", count, tli->tablename->content);
+
+  pqasyncquery(tli->init, tli->tag, "BEGIN");
+  pqasyncquery(NULL, NULL, "DECLARE table%lx%lx CURSOR FOR SELECT * FROM %s", tablecrc, count, tli->tablename->content);
+
+  for (i=0;(count - i) > 1000; i+=1000)
+    pqasyncquery(tli->data, tli->tag, "FETCH 1000 FROM table%lx%lx", tablecrc, count);
+
+  pqasyncquery(tli->data, tli->tag, "FETCH ALL FROM table%lx%lx", tablecrc, count);
+
+  pqasyncquery(NULL, NULL, "CLOSE table%lx%lx", tablecrc, count);
+  pqasyncquery(tli->fini, tli->tag, "COMMIT");
+
+  freesstring(tli->tablename);
+  nsfree(POOL_PQSQL, tli);
+}
+
 void disconnectdb(void) {
   pqasyncquery_s *qqp = queryhead, *nqqp;
 
@@ -222,14 +342,14 @@ void disconnectdb(void) {
   while(qqp) {
     nqqp = qqp->next;
     if (qqp->query_ss) {
-      freesstring(qqp->query);
+      freesstring(qqp->query_ss);
       qqp->query_ss=NULL;
       qqp->query=NULL;
     } else if (qqp->query) {
-      free(qqp->query);
+      nsfree(POOL_PQSQL, qqp->query);
       qqp->query=NULL;
     }
-    free(qqp);
+    nsfree(POOL_PQSQL, qqp);
     qqp = nqqp;
   }
 
@@ -260,3 +380,53 @@ void dbstatus(int hooknum, void *arg) {
 int pqconnected(void) {
   return dbconnected;
 }
+
+char* pqlasterror(PGconn * pgconn) {
+  static char errormsg[PQ_ERRORMSG_LENGTH + 1];
+  int i;
+  if(!pgconn) 
+    return "PGCONN NULL";
+  strlcpy(errormsg, PQerrorMessage(pgconn), PQ_ERRORMSG_LENGTH);
+  for(i=0;i<errormsg[i];i++) {
+    if((errormsg[i] == '\r') || (errormsg[i] == '\n'))
+      errormsg[i] = ' ';
+    
+  }
+  return errormsg;
+}
+
+PQResult *pqgetresult(PGconn *c) {
+  PQResult *r;
+  if(!c)
+    return NULL;
+
+  r = (PQResult *)nsmalloc(POOL_PQSQL, sizeof(PQResult));
+  r->row = -1;
+  r->result = PQgetResult(c);
+  r->rows = PQntuples(r->result);
+
+  return r;
+}
+
+int pqfetchrow(PQResult *res) {
+  if(res->row + 1 == res->rows)
+    return 0;
+
+  res->row++;
+
+  return 1;
+}
+
+char *pqgetvalue(PQResult *res, int column) {
+  return PQgetvalue(res->result, res->row, column);
+}
+
+void pqclear(PQResult *res) {
+  if(!res)
+    return;
+
+  if(res->result)
+    PQclear(res->result);
+
+  nsfree(POOL_PQSQL, res);
+}