X-Git-Url: https://jfr.im/git/irc/quakenet/newserv.git/blobdiff_plain/008e57e52d3e331d61915860c5ea24d79be64eb8..8855bb48b449ed06cfd3ce528b3c0a77c37cb24b:/sqlite/sqlite.c?ds=inline diff --git a/sqlite/sqlite.c b/sqlite/sqlite.c index 13ad66c2..fa852571 100644 --- a/sqlite/sqlite.c +++ b/sqlite/sqlite.c @@ -2,26 +2,30 @@ * SQLite module */ +#include +#include +#include +#include + +#ifndef __USE_POSIX199309 +#define __USE_POSIX199309 +#endif + +#include + #include "../core/config.h" #include "../core/error.h" -#include "../irc/irc_config.h" #include "../core/events.h" #include "../core/hooks.h" -#include "../lib/irc_string.h" #include "../lib/version.h" #include "../lib/strlfunc.h" #include "../core/nsmalloc.h" -#include "sqlite.h" +#include "../core/schedule.h" #define BUILDING_DBAPI #include "../dbapi/dbapi.h" -#include -#include -#include -#include -#define __USE_POSIX199309 -#include +#include "sqlite.h" MODULE_VERSION(""); @@ -29,68 +33,174 @@ static int dbconnected = 0; static struct sqlite3 *conn; static SQLiteModuleIdentifier modid; +struct sqlitequeue { + sqlite3_stmt *statement; + SQLiteQueryHandler handler; + void *tag; + int identifier; + struct sqlitequeue *next; +}; + +static struct sqlitequeue *head, *tail; +static int queuesize; +static void *processsched; +static int inited; + #define SYNC_MODE "OFF" +static void sqlitequeueprocessor(void *arg); +static void dbstatus(int hooknum, void *arg); + void _init(void) { sstring *dbfile; int rc; dbfile = getcopyconfigitem("sqlite", "file", "newserv.db", 100); - if(!dbfile) + if(!dbfile) { + Error("sqlite", ERR_ERROR, "Unable to get config settings."); return; + } + processsched = schedulerecurring(time(NULL), 0, 1, &sqlitequeueprocessor, NULL); + if(!processsched) { + Error("sqlite", ERR_ERROR, "Unable to schedule query processor."); + freesstring(dbfile); + return; + } + + + if(sqlite3_initialize() != SQLITE_OK) { + Error("sqlite", ERR_ERROR, "Unable to initialise sqlite"); + return; + } + sqlite3_config(SQLITE_CONFIG_SINGLETHREAD); + + inited = 1; + rc = sqlite3_open(dbfile->content, &conn); freesstring(dbfile); if(rc) { Error("sqlite", ERR_ERROR, "Unable to connect to database: %s", sqlite3_errmsg(conn)); + deleteschedule(processsched, &sqlitequeueprocessor, NULL); return; } dbconnected = 1; sqliteasyncqueryf(0, NULL, NULL, 0, "PRAGMA synchronous=" SYNC_MODE ";"); + registerhook(HOOK_CORE_STATSREQUEST, dbstatus); } void _fini(void) { - if(!sqliteconnected()) - return; + struct sqlitequeue *q, *nq; + + if(sqliteconnected()) { + deregisterhook(HOOK_CORE_STATSREQUEST, dbstatus); + deleteschedule(processsched, &sqlitequeueprocessor, NULL); + + /* we assume every module that's being unloaded + * has us as a dependency and will have cleaned up + * their queries by using freeid.. + */ + for(q=head;q;q=nq) { + nq = q->next; + sqlite3_finalize(q->statement); + nsfree(POOL_SQLITE, q); + } + + sqlite3_close(conn); + + dbconnected = 0; + } - sqlite3_close(conn); + if(inited) { + sqlite3_shutdown(); + inited = 0; + } - dbconnected = 0; nscheckfreeall(POOL_SQLITE); } -int sqlitestep(sqlite3_stmt *s) { - int rc; - struct timespec t; +/* busy processing is done externally as that varies depending on what you are... */ +static void processstatement(int rc, sqlite3_stmt *s, SQLiteQueryHandler handler, void *tag, char *querybuf) { + if(handler) { /* the handler deals with the cleanup */ + SQLiteResult *r; - t.tv_sec = 0; + if((rc != SQLITE_ROW) && (rc != SQLITE_DONE)) { + Error("sqlite", ERR_WARNING, "SQL error %d: %s (query: %s)", rc, sqlite3_errmsg(conn), querybuf); + handler(NULL, tag); + return; + } - for(;;) { - rc = sqlite3_step(s); - if((rc == SQLITE_ROW) || (rc == SQLITE_OK) || (rc == SQLITE_DONE)) - return rc; - if(rc == SQLITE_BUSY) { - t.tv_nsec = rand() % 50 + 50; - Error("sqlite", ERR_WARNING, "SQLite is busy, retrying in %dns...", t.tv_nsec); - nanosleep(&t, NULL); - } else { - Error("sqlite", ERR_WARNING, "SQL error %d: %s", rc, sqlite3_errmsg(conn)); - return 0; + r = (SQLiteResult *)nsmalloc(POOL_SQLITE, sizeof(SQLiteResult)); + r->r = s; + r->first = 1; + if(rc == SQLITE_ROW) { + r->final = 0; + } else if(rc == SQLITE_DONE) { + r->final = 1; } + handler(r, tag); + } else { + if(rc == SQLITE_ROW) { + Error("sqlite", ERR_WARNING, "Unhandled data from query: %s", querybuf); + } else if(rc != SQLITE_DONE) { + Error("sqlite", ERR_WARNING, "SQL error %d: %s (query: %s)", rc, sqlite3_errmsg(conn), querybuf); + } + sqlite3_finalize(s); + } +} + +static void pushqueue(sqlite3_stmt *s, int identifier, SQLiteQueryHandler handler, void *tag) { + struct sqlitequeue *q = (struct sqlitequeue *)nsmalloc(POOL_SQLITE, sizeof(struct sqlitequeue)); + + q->identifier = identifier; + q->handler = handler; + q->tag = tag; + q->next = NULL; + q->statement = s; + + if(!tail) { + head = q; + } else { + tail->next = q; + } + tail = q; + queuesize++; +} + +static struct sqlitequeue *peekqueue(void) { + return head; +} + +/* a weird pop that doesn't return the value */ +static void popqueue(void) { + struct sqlitequeue *q; + if(!head) + return; + + q = head; + if(head == tail) { + head = NULL; + tail = NULL; + } else { + head = head->next; } + + nsfree(POOL_SQLITE, q); + queuesize--; + return; } void sqliteasyncqueryf(int identifier, SQLiteQueryHandler handler, void *tag, int flags, char *format, ...) { char querybuf[8192]; - va_list va; int len; int rc; sqlite3_stmt *s; - + va_list va; + if(!sqliteconnected()) return; @@ -107,45 +217,76 @@ void sqliteasyncqueryf(int identifier, SQLiteQueryHandler handler, void *tag, in return; } - if(handler) { - handler(s, tag); - } else { - rc = sqlitestep(s); - sqlite3_finalize(s); - if(!rc) - Error("sqlite", ERR_WARNING, "SQL error %d: %s (query: %s)", rc, sqlite3_errmsg(conn), querybuf); + if(head) { /* stuff already queued */ + pushqueue(s, identifier, handler, tag); + return; } + + rc = sqlite3_step(s); + if(rc == SQLITE_BUSY) { + pushqueue(s, identifier, handler, tag); + return; + } + + processstatement(rc, s, handler, tag, querybuf); } int sqliteconnected(void) { return dbconnected; } -void sqliteescapestring(char *buf, char *src, unsigned int len) { +size_t sqliteescapestring(char *buf, char *src, unsigned int len) { unsigned int i; - char *p; + char *p, *d; - for(p=src,i=0;ir = r; - return r2; + return r; } int sqlitefetchrow(SQLiteResult *r) { - if(sqlitestep(r->r) != SQLITE_ROW) + int rc, v; + struct timespec t; + + if(!r || !r->r || r->final) + return 0; + + if(r->first) { /* we've extracted the first row already */ + r->first = 0; + return 1; + } + + t.tv_sec = 0; + for(;;) { + rc = sqlite3_step(r->r); + if(rc != SQLITE_BUSY) + break; + + v = rand() % 50 + 50; + t.tv_nsec = v * 1000000; + Error("sqlite", ERR_WARNING, "SQLite is busy, retrying in %fs...", (double)v / 1000); + nanosleep(&t, NULL); + } + + if(rc == SQLITE_DONE) { + r->final = 1; return 0; + } + + if(rc != SQLITE_ROW) { + Error("sqlite", ERR_WARNING, "SQL error %d: %s", rc, sqlite3_errmsg(conn)); + r->final = 1; + return 0; + } return 1; } @@ -169,6 +310,7 @@ int sqlitequerysuccessful(SQLiteResult *r) { struct sqlitetableloader { SQLiteQueryHandler init, data, fini; + void *tag; char tablename[]; }; @@ -176,20 +318,20 @@ static void loadtablerows(SQLiteConn *c, void *tag) { struct sqlitetableloader *t = (struct sqlitetableloader *)tag; if(!c) { /* pqsql doesnt call the handlers so we don't either */ - free(t); + nsfree(POOL_SQLITE, t); return; } /* the handlers do all the checking and cleanup */ if(t->init) - (t->init)(c, NULL); + (t->init)(NULL, t->tag); - (t->data)(c, NULL); + (t->data)(c, t->tag); if(t->fini) - (t->fini)(c, NULL); + (t->fini)(NULL, t->tag); - free(t); + nsfree(POOL_SQLITE, t); } static void loadtablecount(SQLiteConn *c, void *tag) { @@ -197,13 +339,13 @@ static void loadtablecount(SQLiteConn *c, void *tag) { SQLiteResult *r = NULL; if(!c) { /* unloaded */ - free(t); + nsfree(POOL_SQLITE, t); return; } if(!(r = sqlitegetresult(c)) || !sqlitefetchrow(r)) { Error("sqlite", ERR_ERROR, "Error getting row count for %s.", t->tablename); - free(t); + nsfree(POOL_SQLITE, t); if(r) sqliteclear(r); @@ -216,7 +358,7 @@ static void loadtablecount(SQLiteConn *c, void *tag) { sqliteasyncqueryf(0, loadtablerows, t, 0, "SELECT * FROM %s", t->tablename); } -void sqliteloadtable(char *tablename, SQLiteQueryHandler init, SQLiteQueryHandler data, SQLiteQueryHandler fini) { +void sqliteloadtable(char *tablename, SQLiteQueryHandler init, SQLiteQueryHandler data, SQLiteQueryHandler fini, void *tag) { struct sqlitetableloader *t; int len; @@ -225,18 +367,23 @@ void sqliteloadtable(char *tablename, SQLiteQueryHandler init, SQLiteQueryHandle len = strlen(tablename); - t = (struct sqlitetableloader *)malloc(sizeof(struct sqlitetableloader) + len + 1); + t = (struct sqlitetableloader *)nsmalloc(POOL_SQLITE, sizeof(struct sqlitetableloader) + len + 1); memcpy(t->tablename, tablename, len + 1); t->init = init; t->data = data; t->fini = fini; + t->tag = tag; sqliteasyncqueryf(0, loadtablecount, t, 0, "SELECT COUNT(*) FROM %s", tablename); } -void sqlitecreateschema(char *schema) { +void sqliteattach(char *schema) { sqliteasyncqueryf(0, NULL, NULL, 0, "ATTACH DATABASE '%s.db' AS %s", schema, schema); - sqliteasyncqueryf(0, NULL, NULL, 0, "PRAGMA %s.synchronous=" SYNC_MODE ";", schema); + sqliteasyncqueryf(0, NULL, NULL, 0, "PRAGMA %s.synchronous=" SYNC_MODE, schema); +} + +void sqlitedetach(char *schema) { + sqliteasyncqueryf(0, NULL, NULL, 0, "DETACH DATABASE %s", schema); } int sqlitegetid(void) { @@ -248,6 +395,60 @@ int sqlitegetid(void) { } void sqlitefreeid(int id) { + struct sqlitequeue *q, *pq; + if(id == 0) + return; + for(pq=NULL,q=head;q;) { + if(q->identifier == id) { + if(pq) { + pq->next = q->next; + if(q == tail) + tail = pq; + } else { /* head */ + head = q->next; + if(q == tail) + tail = NULL; + } + sqlite3_finalize(q->statement); + + q->handler(NULL, q->tag); + nsfree(POOL_SQLITE, q); + + queuesize--; + if(pq) { + q = pq->next; + } else { + q = head; + } + } else { + pq = q; + q = q->next; + } + } +} + +static void sqlitequeueprocessor(void *arg) { + struct sqlitequeue *q = peekqueue(); + + while(q) { + int rc = sqlite3_step(q->statement); + if(rc == SQLITE_BUSY) + return; + + processstatement(rc, q->statement, q->handler, q->tag, "??"); + popqueue(); + + q = peekqueue(); + } +} + +static void dbstatus(int hooknum, void *arg) { + if((long)arg > 10) { + char message[100]; + + snprintf(message, sizeof(message), "SQLite : %6d queries queued.", queuesize); + triggerhook(HOOK_CORE_STATSREPLY, message); + } }