]> jfr.im git - irc/quakenet/qwebirc.git/blob - qwebirc/engines/ajaxengine.py
Merge in default.
[irc/quakenet/qwebirc.git] / qwebirc / engines / ajaxengine.py
1 from twisted.web import resource, server, static, error as http_error
2 from twisted.names import client
3 from twisted.internet import reactor, error
4 from authgateengine import login_optional, getSessionData
5 import simplejson, md5, sys, os, time, config, weakref, traceback, socket
6 import qwebirc.ircclient as ircclient
7 from adminengine import AdminEngineAction
8 from qwebirc.util import HitCounter
9 import qwebirc.dns as qdns
10 Sessions = {}
11
12 def get_session_id():
13 return md5.md5(os.urandom(16)).hexdigest()
14
15 class BufferOverflowException(Exception):
16 pass
17
18 class AJAXException(Exception):
19 pass
20
21 class IDGenerationException(Exception):
22 pass
23
24 class PassthruException(Exception):
25 pass
26
27 NOT_DONE_YET = None
28
29 def jsondump(fn):
30 def decorator(*args, **kwargs):
31 try:
32 x = fn(*args, **kwargs)
33 if x is None:
34 return server.NOT_DONE_YET
35 x = (True, x)
36 except AJAXException, e:
37 x = (False, e[0])
38 except PassthruException, e:
39 return str(e)
40
41 return simplejson.dumps(x)
42 return decorator
43
44 def cleanupSession(id):
45 try:
46 del Sessions[id]
47 except KeyError:
48 pass
49
50 class IRCSession:
51 def __init__(self, id):
52 self.id = id
53 self.subscriptions = []
54 self.buffer = []
55 self.throttle = 0
56 self.schedule = None
57 self.closed = False
58 self.cleanupschedule = None
59
60 def subscribe(self, channel, notifier):
61 timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, self.timeout, channel)
62 def cancel_timeout(result):
63 if channel in self.subscriptions:
64 self.subscriptions.remove(channel)
65 try:
66 timeout_entry.cancel()
67 except error.AlreadyCalled:
68 pass
69 notifier.addCallbacks(cancel_timeout, cancel_timeout)
70
71 if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
72 self.subscriptions.pop(0).close()
73
74 self.subscriptions.append(channel)
75 self.flush()
76
77 def timeout(self, channel):
78 if self.schedule:
79 return
80
81 channel.write(simplejson.dumps([]))
82 if channel in self.subscriptions:
83 self.subscriptions.remove(channel)
84
85 def flush(self, scheduled=False):
86 if scheduled:
87 self.schedule = None
88
89 if not self.buffer or not self.subscriptions:
90 return
91
92 t = time.time()
93
94 if t < self.throttle:
95 if not self.schedule:
96 self.schedule = reactor.callLater(self.throttle - t, self.flush, True)
97 return
98 else:
99 # process the rest of the packet
100 if not scheduled:
101 if not self.schedule:
102 self.schedule = reactor.callLater(0, self.flush, True)
103 return
104
105 self.throttle = t + config.UPDATE_FREQ
106
107 encdata = simplejson.dumps(self.buffer)
108 self.buffer = []
109
110 newsubs = []
111 for x in self.subscriptions:
112 if x.write(encdata):
113 newsubs.append(x)
114
115 self.subscriptions = newsubs
116 if self.closed and not self.subscriptions:
117 cleanupSession(self.id)
118
119 def event(self, data):
120 bufferlen = sum(map(len, self.buffer))
121 if bufferlen + len(data) > config.MAXBUFLEN:
122 self.buffer = []
123 self.client.error("Buffer overflow.")
124 return
125
126 self.buffer.append(data)
127 self.flush()
128
129 def push(self, data):
130 if not self.closed:
131 self.client.write(data)
132
133 def disconnect(self):
134 # keep the session hanging around for a few seconds so the
135 # client has a chance to see what the issue was
136 self.closed = True
137
138 reactor.callLater(5, cleanupSession, self.id)
139
140 # DANGER! Breach of encapsulation!
141 def connect_notice(line):
142 return "c", "NOTICE", "", ("AUTH", "*** (qwebirc) %s" % line)
143
144 class Channel:
145 def __init__(self, request):
146 self.request = request
147
148 class SingleUseChannel(Channel):
149 def write(self, data):
150 self.request.write(data)
151 self.request.finish()
152 return False
153
154 def close(self):
155 self.request.finish()
156
157 class MultipleUseChannel(Channel):
158 def write(self, data):
159 self.request.write(data)
160 return True
161
162 class AJAXEngine(resource.Resource):
163 isLeaf = True
164
165 def __init__(self, prefix):
166 self.prefix = prefix
167 self.__connect_hit = HitCounter()
168 self.__total_hit = HitCounter()
169
170 @jsondump
171 def render_POST(self, request):
172 path = request.path[len(self.prefix):]
173 if path[0] == "/":
174 handler = self.COMMANDS.get(path[1:])
175 if handler is not None:
176 return handler(self, request)
177
178 raise PassthruException, http_error.NoResource().render(request)
179
180 def newConnection(self, request):
181 ticket = login_optional(request)
182
183 _, ip, port = request.transport.getPeer()
184
185 nick = request.args.get("nick")
186 if not nick:
187 raise AJAXException, "Nickname not supplied."
188 nick = ircclient.irc_decode(nick[0])
189
190 for i in xrange(10):
191 id = get_session_id()
192 if not Sessions.get(id):
193 break
194 else:
195 raise IDGenerationException()
196
197 session = IRCSession(id)
198
199 qticket = getSessionData(request).get("qticket")
200 if qticket is None:
201 perform = None
202 else:
203 service_mask = config.AUTH_SERVICE
204 msg_mask = service_mask.split("!")[0] + "@" + service_mask.split("@", 1)[1]
205 perform = ["PRIVMSG %s :TICKETAUTH %s" % (msg_mask, qticket)]
206
207 ident, realname = config.IDENT, config.REALNAME
208 if ident is None:
209 ident = socket.inet_aton(ip).encode("hex")
210
211 self.__connect_hit()
212
213 def proceed(hostname):
214 client = ircclient.createIRC(session, nick=nick, ident=ident, ip=ip, realname=realname, perform=perform, hostname=hostname)
215 session.client = client
216
217 if config.WEBIRC_MODE != "hmac":
218 notice = lambda x: session.event(connect_notice(x))
219 notice("Looking up your hostname...")
220 def callback(hostname):
221 notice("Found your hostname.")
222 proceed(hostname)
223 def errback(failure):
224 notice("Couldn't look up your hostname!")
225 proceed(ip)
226 qdns.lookupAndVerifyPTR(ip, timeout=[config.DNS_TIMEOUT]).addCallbacks(callback, errback)
227 else:
228 proceed(None) # hmac doesn't care
229
230 Sessions[id] = session
231
232 return id
233
234 def getSession(self, request):
235 bad_session_message = "Invalid session, this most likely means the server has restarted; close this dialog and then try refreshing the page."
236
237 sessionid = request.args.get("s")
238 if sessionid is None:
239 raise AJAXException, bad_session_message
240
241 session = Sessions.get(sessionid[0])
242 if not session:
243 raise AJAXException, bad_session_message
244 return session
245
246 def subscribe(self, request):
247 request.channel.cancelTimeout()
248 self.getSession(request).subscribe(SingleUseChannel(request), request.notifyFinish())
249 return NOT_DONE_YET
250
251 def push(self, request):
252 command = request.args.get("c")
253 if command is None:
254 raise AJAXException, "No command specified."
255 self.__total_hit()
256
257 decoded = ircclient.irc_decode(command[0])
258
259 session = self.getSession(request)
260
261 if len(decoded) > config.MAXLINELEN:
262 session.disconnect()
263 raise AJAXException, "Line too long."
264
265 try:
266 session.push(decoded)
267 except AttributeError: # occurs when we haven't noticed an error
268 session.disconnect()
269 raise AJAXException, "Connection closed by server; try reconnecting by reloading the page."
270 except Exception, e: # catch all
271 session.disconnect()
272 traceback.print_exc(file=sys.stderr)
273 raise AJAXException, "Unknown error."
274
275 return True
276
277 def closeById(self, k):
278 s = Sessions.get(k)
279 if s is None:
280 return
281 s.client.client.error("Closed by admin interface")
282
283 @property
284 def adminEngine(self):
285 return {
286 "Sessions": [(str(v.client.client), AdminEngineAction("close", self.closeById, k)) for k, v in Sessions.iteritems() if not v.closed],
287 "Connections": [(self.__connect_hit,)],
288 "Total hits": [(self.__total_hit,)],
289 }
290
291 COMMANDS = dict(p=push, n=newConnection, s=subscribe)
292