X-Git-Url: https://jfr.im/git/irc/quakenet/newserv.git/blobdiff_plain/7cc1a026e336d9a38583f009cde049aba78ee342..18afb5fb8815bebadb2ff5f4465c326d1b4ce3d7:/pqsql/pqsql.c diff --git a/pqsql/pqsql.c b/pqsql/pqsql.c index 72036c3d..ea9fd102 100644 --- a/pqsql/pqsql.c +++ b/pqsql/pqsql.c @@ -9,29 +9,63 @@ #include "../irc/irc_config.h" #include "../core/events.h" #include "../core/hooks.h" +#include "../core/nsmalloc.h" +#include "../lib/irc_string.h" +#include "../lib/version.h" +#include "../lib/strlfunc.h" #include "pqsql.h" +#define BUILDING_DBAPI +#include "../dbapi/dbapi.h" + #include #include #include +#include + +MODULE_VERSION(""); +/* It's possible that we might want to do a very long query, longer than the + * IRC-oriented SSTRING_MAX value. One option would be to increase + * SSTRING_MAX, but the whole purpose of sstring's is to efficiently deal + * with situations where the malloc() padding overhead is large compared to + * string length and strings are frequently recycled. Since neither of + * these are necessarily true for longer strings it makes more sense to use + * malloc() for them. + * + * So, query always points at the query string. If it fitted in a sstring, + * query_ss will point at the sstring for freeing purposes. If query_ss is + * NULL then it was malloc'd so should be free()'d directly. + */ typedef struct pqasyncquery_s { - sstring *query; + sstring *query_ss; + char *query; void *tag; PQQueryHandler handler; int flags; + PQModuleIdentifier identifier; struct pqasyncquery_s *next; } pqasyncquery_s; +typedef struct pqtableloaderinfo_s +{ + sstring *tablename; + PQQueryHandler init, data, fini; + void *tag; +} pqtableloaderinfo_s; + pqasyncquery_s *queryhead = NULL, *querytail = NULL; -int dbconnected = 0; -PGconn *dbconn; +static int dbconnected = 0; +static PQModuleIdentifier moduleid = 0; +static PGconn *dbconn; void dbhandler(int fd, short revents); +void pqstartloadtable(PGconn *dbconn, void *arg); void dbstatus(int hooknum, void *arg); void disconnectdb(void); void connectdb(void); +char* pqlasterror(PGconn * pgconn); void _init(void) { connectdb(); @@ -39,6 +73,48 @@ void _init(void) { void _fini(void) { disconnectdb(); + + nscheckfreeall(POOL_PQSQL); +} + +PQModuleIdentifier pqgetid(void) { + moduleid++; + if(moduleid < 10) + moduleid = 10; + + return moduleid; +} + +void pqfreeid(PQModuleIdentifier identifier) { + pqasyncquery_s *q, *p; + + if(identifier == 0 || !queryhead) + return; + + if(queryhead->identifier == identifier) { + (queryhead->handler)(NULL, queryhead->tag); + queryhead->identifier = QH_ALREADYFIRED; + } + + for(p=queryhead,q=queryhead->next;q;) { + if(q->identifier == identifier) { + (q->handler)(NULL, q->tag); + p->next = q->next; + + if (q->query_ss) { + freesstring(q->query_ss); + } else { + nsfree(POOL_PQSQL, q->query); + } + nsfree(POOL_PQSQL, q); + q = p->next; + } else { + p = q; + q = q->next; + } + } + + querytail = p; } void connectdb(void) { @@ -49,7 +125,7 @@ void connectdb(void) { return; /* stolen from chanserv as I'm lazy */ - dbhost = getcopyconfigitem("pqsql", "host", "localhost", HOSTLEN); + dbhost = getcopyconfigitem("pqsql", "host", "UNIX", HOSTLEN); dbusername = getcopyconfigitem("pqsql", "username", "newserv", 20); dbpassword = getcopyconfigitem("pqsql", "password", "moo", 20); dbdatabase = getcopyconfigitem("pqsql", "database", "newserv", 20); @@ -64,9 +140,13 @@ void connectdb(void) { freesstring(dbport); return; } - - snprintf(connectstr, sizeof(connectstr), "dbname=%s user=%s password=%s", dbdatabase->content, dbusername->content, dbpassword->content); + if (!strcmp(dbhost->content,"UNIX")) { + snprintf(connectstr, sizeof(connectstr), "dbname=%s user=%s password=%s", dbdatabase->content, dbusername->content, dbpassword->content); + } else { + snprintf(connectstr, sizeof(connectstr), "host=%s port=%s dbname=%s user=%s password=%s", dbhost->content, dbport->content, dbdatabase->content, dbusername->content, dbpassword->content); + } + freesstring(dbhost); freesstring(dbusername); freesstring(dbpassword); @@ -79,7 +159,7 @@ void connectdb(void) { dbconn = PQconnectdb(connectstr); if (!dbconn || (PQstatus(dbconn) != CONNECTION_OK)) { - Error("pqsql", ERR_ERROR, "Unable to connect to db."); + Error("pqsql", ERR_ERROR, "Unable to connect to db: %s", pqlasterror(dbconn)); return; } Error("pqsql", ERR_INFO, "Connected!"); @@ -101,24 +181,27 @@ void dbhandler(int fd, short revents) { PQconsumeInput(dbconn); if(!PQisBusy(dbconn)) { /* query is complete */ - if(queryhead->handler) + if(queryhead->handler && queryhead->identifier != QH_ALREADYFIRED) (queryhead->handler)(dbconn, queryhead->tag); while((res = PQgetResult(dbconn))) { - switch(PQresultStatus(res)) { - case PGRES_TUPLES_OK: - Error("pqsql", ERR_WARNING, "Unhandled tuples output (query: %s)", queryhead->query->content); - break; - - case PGRES_NONFATAL_ERROR: - case PGRES_FATAL_ERROR: - /* if a create query returns an error assume it went ok, paul will winge about this */ - if(!(queryhead->flags & QH_CREATE)) - Error("pqsql", ERR_WARNING, "Unhandled error response (query: %s)", queryhead->query->content); - break; + if(queryhead->identifier != QH_ALREADYFIRED) { + switch(PQresultStatus(res)) { + case PGRES_TUPLES_OK: + if(!(queryhead->flags & DB_CALL)) + Error("pqsql", ERR_WARNING, "Unhandled tuples output (query: %s)", queryhead->query); + break; + + case PGRES_NONFATAL_ERROR: + case PGRES_FATAL_ERROR: + /* if a create query returns an error assume it went ok, paul will winge about this */ + if(!(queryhead->flags & DB_CREATE)) + Error("pqsql", ERR_WARNING, "Unhandled error response (query: %s): %s", queryhead->query, PQresultErrorMessage(res)); + break; - default: - break; + default: + break; + } } PQclear(res); @@ -131,11 +214,18 @@ void dbhandler(int fd, short revents) { queryhead = queryhead->next; - freesstring(qqp->query); - free(qqp); + if (qqp->query_ss) { + freesstring(qqp->query_ss); + qqp->query_ss=NULL; + qqp->query=NULL; + } else if (qqp->query) { + nsfree(POOL_PQSQL, qqp->query); + qqp->query=NULL; + } + nsfree(POOL_PQSQL, qqp); if(queryhead) { /* Submit the next query */ - PQsendQuery(dbconn, queryhead->query->content); + PQsendQuery(dbconn, queryhead->query); PQflush(dbconn); } } @@ -143,11 +233,11 @@ void dbhandler(int fd, short revents) { } /* sorry Q9 */ -void pqasyncqueryf(PQQueryHandler handler, void *tag, int flags, char *format, ...) { +void pqasyncqueryf(int identifier, PQQueryHandler handler, void *tag, int flags, char *format, ...) { char querybuf[8192]; - va_list va; int len; pqasyncquery_s *qp; + va_list va; if(!pqconnected()) return; @@ -157,26 +247,88 @@ void pqasyncqueryf(PQQueryHandler handler, void *tag, int flags, char *format, . va_end(va); /* PPA: no check here... */ - qp = (pqasyncquery_s *)malloc(sizeof(pqasyncquery_s)); + qp = (pqasyncquery_s *)nsmalloc(POOL_PQSQL, sizeof(pqasyncquery_s)); + if(!qp) - return; + Error("pqsql",ERR_STOP,"malloc() failed in pqsql.c"); - qp->query = getsstring(querybuf, len); + /* Use sstring or allocate (see above rant) */ + if (len > SSTRING_MAX) { + qp->query = (char *)nsmalloc(POOL_PQSQL, len+1); + strcpy(qp->query,querybuf); + qp->query_ss=NULL; + } else { + qp->query_ss = getsstring(querybuf, len); + qp->query = qp->query_ss->content; + } qp->tag = tag; qp->handler = handler; qp->next = NULL; /* shove them at the end */ qp->flags = flags; + qp->identifier = identifier; if(querytail) { querytail->next = qp; querytail = qp; } else { querytail = queryhead = qp; - PQsendQuery(dbconn, qp->query->content); + PQsendQuery(dbconn, qp->query); PQflush(dbconn); } } +void pqloadtable(char *tablename, PQQueryHandler init, PQQueryHandler data, PQQueryHandler fini, void *tag) +{ + pqtableloaderinfo_s *tli; + + tli=(pqtableloaderinfo_s *)nsmalloc(POOL_PQSQL, sizeof(pqtableloaderinfo_s)); + tli->tablename=getsstring(tablename, 100); + tli->init=init; + tli->data=data; + tli->fini=fini; + tli->tag=tag; + pqasyncquery(pqstartloadtable, tli, "SELECT COUNT(*) FROM %s", tli->tablename->content); +} + +void pqstartloadtable(PGconn *dbconn, void *arg) +{ + PGresult *res; + unsigned long i, count, tablecrc; + pqtableloaderinfo_s *tli = arg; + + res = PQgetResult(dbconn); + + if (PQresultStatus(res) != PGRES_TUPLES_OK && PQresultStatus(res) != PGRES_COMMAND_OK) { + Error("pqsql", ERR_ERROR, "Error getting row count for %s.", tli->tablename->content); + return; + } + + if (PQnfields(res) != 1) { + Error("pqsql", ERR_ERROR, "Count query format error for %s.", tli->tablename->content); + return; + } + + tablecrc=crc32(tli->tablename->content); + count=strtoul(PQgetvalue(res, 0, 0), NULL, 10); + PQclear(res); + + Error("pqsql", ERR_INFO, "Found %lu entries in table %s, scheduling load.", count, tli->tablename->content); + + pqasyncquery(tli->init, tli->tag, "BEGIN"); + pqasyncquery(NULL, NULL, "DECLARE table%lx%lx CURSOR FOR SELECT * FROM %s", tablecrc, count, tli->tablename->content); + + for (i=0;(count - i) > 1000; i+=1000) + pqasyncquery(tli->data, tli->tag, "FETCH 1000 FROM table%lx%lx", tablecrc, count); + + pqasyncquery(tli->data, tli->tag, "FETCH ALL FROM table%lx%lx", tablecrc, count); + + pqasyncquery(NULL, NULL, "CLOSE table%lx%lx", tablecrc, count); + pqasyncquery(tli->fini, tli->tag, "COMMIT"); + + freesstring(tli->tablename); + nsfree(POOL_PQSQL, tli); +} + void disconnectdb(void) { pqasyncquery_s *qqp = queryhead, *nqqp; @@ -189,8 +341,15 @@ void disconnectdb(void) { /* Throw all the queued queries away, beware of data malloc()ed inside the query item.. */ while(qqp) { nqqp = qqp->next; - freesstring(qqp->query); - free(qqp); + if (qqp->query_ss) { + freesstring(qqp->query_ss); + qqp->query_ss=NULL; + qqp->query=NULL; + } else if (qqp->query) { + nsfree(POOL_PQSQL, qqp->query); + qqp->query=NULL; + } + nsfree(POOL_PQSQL, qqp); qqp = nqqp; } @@ -203,7 +362,7 @@ void disconnectdb(void) { /* more stolen code from Q9 */ void dbstatus(int hooknum, void *arg) { - if ((int)arg > 10) { + if ((long)arg > 10) { int i = 0; pqasyncquery_s *qqp; char message[100]; @@ -212,7 +371,7 @@ void dbstatus(int hooknum, void *arg) { for(qqp=queryhead;qqp;qqp=qqp->next) i++; - snprintf(message, sizeof(message), "PQSQL : %6d database queries queued.",i); + snprintf(message, sizeof(message), "PQSQL : %6d queries queued.",i); triggerhook(HOOK_CORE_STATSREPLY, message); } @@ -221,3 +380,53 @@ void dbstatus(int hooknum, void *arg) { int pqconnected(void) { return dbconnected; } + +char* pqlasterror(PGconn * pgconn) { + static char errormsg[PQ_ERRORMSG_LENGTH + 1]; + int i; + if(!pgconn) + return "PGCONN NULL"; + strlcpy(errormsg, PQerrorMessage(pgconn), PQ_ERRORMSG_LENGTH); + for(i=0;irow = -1; + r->result = PQgetResult(c); + r->rows = PQntuples(r->result); + + return r; +} + +int pqfetchrow(PQResult *res) { + if(res->row + 1 == res->rows) + return 0; + + res->row++; + + return 1; +} + +char *pqgetvalue(PQResult *res, int column) { + return PQgetvalue(res->result, res->row, column); +} + +void pqclear(PQResult *res) { + if(!res) + return; + + if(res->result) + PQclear(res->result); + + nsfree(POOL_PQSQL, res); +}