]> jfr.im git - irc/rizon/acid.git/blob - acid/src/main/java/net/rizon/acid/sql/SQL.java
562b680ec93fd61c1c610df3c8471cafd3208d64
[irc/rizon/acid.git] / acid / src / main / java / net / rizon / acid / sql / SQL.java
1 package net.rizon.acid.sql;
2
3 import java.sql.Connection;
4 import java.sql.DriverManager;
5 import java.sql.PreparedStatement;
6 import java.sql.ResultSet;
7 import java.sql.SQLException;
8 import java.util.LinkedList;
9 import net.rizon.acid.conf.Database;
10 import net.rizon.acid.core.Acidictive;
11 import org.slf4j.LoggerFactory;
12
13 public class SQL extends Thread
14 {
15 private static final org.slf4j.Logger log = LoggerFactory.getLogger(SQL.class);
16
17 private PreparedStatement statement;
18 private ResultSet result;
19 private String url, username, password;
20 private volatile Connection con = null;
21 private volatile boolean shuttingDown = false;
22 private long last_connect = 0;
23 private volatile Object queryLock = new Object();
24 private volatile LinkedList<PreparedStatement> pendingQueries = new LinkedList<PreparedStatement>();
25
26 public SQL(final String url, final String username, final String password) throws ClassNotFoundException, SQLException
27 {
28 Class.forName("com.mysql.jdbc.Driver");
29
30 this.url = url;
31 this.username = username;
32 this.password = password;
33
34 this.connect();
35 this.start();
36 }
37
38 public String version()
39 {
40 try
41 {
42 if (this.con != null)
43 return this.con.getMetaData().getDatabaseProductName() + "-" + this.con.getMetaData().getDatabaseProductVersion();
44 }
45 catch (SQLException ex) { }
46 return "Unknown";
47 }
48
49 public void shutdown()
50 {
51 this.shuttingDown = true;
52
53 log.info("Flushing pending SQL queries...");
54
55 synchronized (this.queryLock)
56 {
57 this.queryLock.notify();
58 }
59
60 while (this.isAlive())
61 {
62 try
63 {
64 Thread.sleep(100L);
65 }
66 catch (InterruptedException ex) { }
67 }
68
69 log.info("All SQL queries successfully flushed");
70
71 try { this.con.close(); }
72 catch (Exception ex) { }
73 }
74
75 private void connect() throws SQLException
76 {
77 if (this.last_connect > System.currentTimeMillis() - 60 * 1000) // one minute
78 throw new SQLException("Reconnecting too fast");
79 this.last_connect = System.currentTimeMillis();
80 this.con = DriverManager.getConnection(this.url, this.username, this.password);
81
82 log.info("Successfully connected to " + this.version() + " using " + this.con.getMetaData().getDriverName() + " (" + this.con.getMetaData().getDriverVersion() + ")");
83 }
84
85 public PreparedStatement prepare(final String statement) throws SQLException
86 {
87 this.close(this.statement, this.result);
88
89 if (this.con == null || this.con.isClosed())
90 {
91 try
92 {
93 this.connect();
94 }
95 catch (SQLException ex)
96 {
97 handleException("Unable to connect to SQL", ex);
98 throw ex;
99 }
100 }
101
102 this.statement = this.con.prepareStatement(statement);
103 return this.statement;
104 }
105
106 public void executeThread(PreparedStatement statement)
107 {
108 this.statement = null;
109 this.result = null;
110
111 synchronized (this.queryLock)
112 {
113 this.pendingQueries.addLast(statement);
114 this.queryLock.notify();
115 }
116 }
117
118 public int executeUpdateBlocking(PreparedStatement statement) throws SQLException
119 {
120 int i = statement.executeUpdate();
121 log.debug("Successfully executed " + statement);
122 return i;
123 }
124
125 public ResultSet executeQuery(PreparedStatement statement) throws SQLException
126 {
127 this.result = statement.executeQuery();
128
129 log.debug("Successfully executed " + statement);
130
131 return this.result;
132 }
133
134 public PreparedStatement persist()
135 {
136 PreparedStatement stmt = this.statement;
137 this.statement = null;
138 this.result = null;
139 return stmt;
140 }
141
142 public void close(PreparedStatement p, ResultSet r)
143 {
144 try { if (r != null) r.close(); }
145 catch (Exception ex) { }
146
147 try { if (p != null) p.close(); }
148 catch (Exception ex) { }
149 }
150
151 public int getPendingQueryCount()
152 {
153 synchronized (this.queryLock)
154 {
155 return this.pendingQueries.size();
156 }
157 }
158
159 public void setAutoCommit(boolean state) throws SQLException
160 {
161 this.con.setAutoCommit(state);
162 }
163
164 @Override
165 public void run()
166 {
167 long lastQuery = System.currentTimeMillis();
168 while (true)
169 {
170 PreparedStatement q = null;
171
172 synchronized (this.queryLock)
173 {
174 if (this.pendingQueries.isEmpty() == false)
175 q = this.pendingQueries.remove();
176 else
177 {
178 if (this.shuttingDown)
179 break;
180
181 try
182 {
183 this.queryLock.wait(60 * 1000L);
184 }
185 catch (InterruptedException e) { }
186 }
187 }
188
189 if (q != null)
190 {
191 try
192 {
193 q.executeUpdate();
194 lastQuery = System.currentTimeMillis();
195
196 log.debug("Successfully executed " + q + " in worker thread");
197 }
198 catch (SQLException ex)
199 {
200 handleException("Unable to execute query in worker thread: " + q, ex);
201 }
202
203 this.close(q, null);
204 }
205
206 if (System.currentTimeMillis() - lastQuery > 60 * 1000)
207 {
208 lastQuery = System.currentTimeMillis();
209
210 try (PreparedStatement stmt = this.con.prepareStatement("SELECT 1"))
211 {
212 stmt.execute();
213 }
214 catch (SQLException ex)
215 {
216 ex.printStackTrace();
217 }
218 }
219 }
220 }
221
222 private static long lastWarn = 0;
223 public static void handleException(final String reason, SQLException ex)
224 {
225 long now = System.currentTimeMillis() / 1000L;
226 if (lastWarn + 60 < now)
227 {
228 log.error(reason, ex);
229 lastWarn = now;
230 }
231 }
232
233 public static SQL getConnection(final String name)
234 {
235 try
236 {
237 for (Database d : Acidictive.conf.database)
238 {
239 if (!d.name.equals(name))
240 continue;
241
242 return new SQL(d.host, d.user, d.pass);
243 }
244 }
245 catch (ClassNotFoundException ex)
246 {
247 throw new RuntimeException(ex.getMessage());
248 }
249 catch (SQLException e)
250 {
251 throw new RuntimeException(e.getMessage());
252 }
253
254 return null;
255 }
256 }