]>
Commit | Line | Data |
---|---|---|
99844c15 | 1 | from twisted.web import resource, server, static, error as http_error |
9e769c12 | 2 | from twisted.names import client |
265f5ce3 | 3 | from twisted.internet import reactor, error |
ace37679 | 4 | from authgateengine import login_optional, getSessionData |
becfa850 | 5 | import md5, sys, os, time, config, qwebirc.config_options as config_options, traceback, socket |
85f01e3f CP |
6 | import qwebirc.ircclient as ircclient |
7 | from adminengine import AdminEngineAction | |
8 | from qwebirc.util import HitCounter | |
28c4ad01 | 9 | import qwebirc.dns as qdns |
becfa850 | 10 | import qwebirc.util.qjson as json |
c60795d6 CP |
11 | import urlparse |
12 | ||
# Transports offered to clients; long polling is always available.
TRANSPORTS = ["longpoll"]

# WebSocket support is optional: it is enabled only if autobahn imports.
try:
    import autobahn.websocket
    import autobahn.resource
except ImportError:
    has_websocket = False
else:
    has_websocket = True
    TRANSPORTS.append("websocket")
22 | ||
# Reason given to a client that names a session id the server does not know.
BAD_SESSION_MESSAGE = (
    "Invalid session, this most likely means the server has restarted; "
    "close this dialog and then try refreshing the page."
)

# Registry of live sessions, keyed by session id.
Sessions = dict()
25 | ||
def get_session_id():
    """Return a fresh random session id as 32 lowercase hex characters.

    The id is the MD5 hex digest of 16 bytes read from os.urandom(); the
    hash only serves to turn the random bytes into a hex string.
    """
    # hashlib supersedes the deprecated standalone md5 module and produces
    # the same digest. Imported locally so the function is self-contained.
    import hashlib
    return hashlib.md5(os.urandom(16)).hexdigest()
8dc46dfa CP |
28 | |
class BufferOverflowException(Exception):
    """Buffer overflow error type (not raised by any code visible in this file)."""
31 | ||
f59585a7 CP |
class AJAXException(Exception):
    """Error whose first argument is a message to report back to the AJAX client."""
34 | ||
4094890f CP |
class IDGenerationException(Exception):
    """Raised when no unused session id could be generated."""
37 | ||
class LineTooLongException(Exception):
    """Raised when a line pushed by the client exceeds config.MAXLINELEN."""
bdd008f9 | 40 | |
# Pre-encoded empty JSON list, written to a subscriber whose wait timed out.
EMPTY_JSON_LIST = json.dumps(list())
9e769c12 | 42 | |
8dc46dfa CP |
def cleanupSession(id):
    """Drop the session registered under *id*; unknown ids are ignored."""
    Sessions.pop(id, None)
48 | ||
9e769c12 CP |
class IRCSession:
    """Server-side state for one web client's IRC session.

    Events destined for the browser are queued in ``buffer`` and delivered,
    JSON-encoded, to every subscribed channel object. Deliveries are spaced
    at least ``config.UPDATE_FREQ`` seconds apart.

    The ``client`` attribute is not created here; it is assigned from outside
    once the IRC connection object exists, so methods that use it raise
    AttributeError until then.
    """

    def __init__(self, id):
        self.id = id
        self.subscriptions = []      # channel objects waiting for data
        self.buffer = []             # events queued for the next flush
        self.buflen = 0              # running sum of len(event) over buffer
        self.throttle = 0            # earliest time.time() of next delivery
        self.schedule = None         # pending delayed flush, if any
        self.closed = False
        self.cleanupschedule = None  # not used by any method of this class

    def subscribe(self, channel):
        """Register *channel* for delivery and attempt a flush.

        When config.MAXSUBSCRIPTIONS channels are already registered, the
        oldest one is closed and dropped to make room.
        """
        if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
            self.subscriptions.pop(0).close()

        self.subscriptions.append(channel)
        self.flush()

    def unsubscribe(self, channel):
        """Remove *channel* if it is registered; otherwise do nothing."""
        try:
            self.subscriptions.remove(channel)
        except ValueError:
            pass

    def timeout(self, channel):
        """Answer *channel* with an empty JSON list and unregister it.

        Nothing is done while a delayed flush is pending.
        """
        if self.schedule:
            return

        channel.write(EMPTY_JSON_LIST)
        if channel in self.subscriptions:
            self.subscriptions.remove(channel)

    def flush(self, scheduled=False):
        """Deliver the queued events to all subscribers, subject to throttling.

        *scheduled* is True when the call comes from a reactor delayed call.
        A direct (unscheduled) call never writes; it only arranges a delayed
        call, unless one is already pending. Subscribers whose write() returns
        a false value are dropped afterwards.
        """
        if scheduled:
            self.schedule = None

        if not self.buffer or not self.subscriptions:
            return

        t = time.time()

        if t < self.throttle:
            # Too soon after the previous delivery: retry once the throttle
            # window has passed.
            if not self.schedule:
                self.schedule = reactor.callLater(self.throttle - t, self.flush, True)
            return
        else:
            # process the rest of the packet
            if not scheduled:
                if not self.schedule:
                    self.schedule = reactor.callLater(0, self.flush, True)
                return

        self.throttle = t + config.UPDATE_FREQ

        encdata = json.dumps(self.buffer)
        self.buffer = []
        self.buflen = 0

        newsubs = []
        for x in self.subscriptions:
            if x.write(encdata):
                newsubs.append(x)

        self.subscriptions = newsubs
        if self.closed and not self.subscriptions:
            cleanupSession(self.id)

    def event(self, data):
        """Queue *data* for delivery to the browser and attempt a flush.

        If adding len(data) would push the running total past
        config.MAXBUFLEN, the queue is discarded and the IRC client is told
        about the overflow instead.
        """
        newbuflen = self.buflen + len(data)
        if newbuflen > config.MAXBUFLEN:
            # Reset the counter together with the buffer, and do it before
            # notifying the client. Otherwise later events are measured
            # against a total for data that is no longer queued.
            self.buffer = []
            self.buflen = 0
            self.client.error("Buffer overflow.")
            return

        self.buffer.append(data)
        self.buflen = newbuflen
        self.flush()

    def push(self, data):
        """Write *data* to the IRC client; ignored once the session is closed.

        Raises LineTooLongException when len(data) exceeds config.MAXLINELEN.
        """
        if self.closed:
            return

        if len(data) > config.MAXLINELEN:
            raise LineTooLongException

        self.client.write(data)

    def disconnect(self):
        """Mark the session closed and schedule its removal from Sessions."""
        # keep the session hanging around for a few seconds so the
        # client has a chance to see what the issue was
        self.closed = True

        reactor.callLater(5, cleanupSession, self.id)
142 | ||
28c4ad01 CP |
# DANGER! Breach of encapsulation!
def connect_notice(line):
    """Build the event tuple for a qwebirc NOTICE carrying *line*."""
    text = "*** (qwebirc) %s" % line
    return ("c", "NOTICE", "", ("AUTH", text))
146 | ||
class RequestChannel(object):
    """One-shot delivery channel backed by a pending HTTP request."""

    def __init__(self, request):
        self.request = request

    def write(self, data):
        """Send *data* as the response and finish the request.

        Always returns False, so a caller that keeps only channels whose
        write() returned a true value will drop this one.
        """
        pending = self.request
        pending.write(data)
        pending.finish()
        return False

    def close(self):
        """Finish the request without writing anything."""
        self.request.finish()
158 | ||
9e769c12 CP |
class AJAXEngine(resource.Resource):
    """Twisted web resource exposing the AJAX (long-poll) interface.

    POST requests to ``<prefix>/<command>`` are dispatched through COMMANDS:
    ``n`` opens a new session, ``s`` subscribes to its events and ``p``
    pushes a line to the IRC client.
    """
    isLeaf = True

    def __init__(self, prefix):
        self.prefix = prefix
        self.__connect_hit = HitCounter()
        self.__total_hit = HitCounter()

    def render_POST(self, request):
        """Dispatch the request to the handler named by the path remainder.

        An AJAXException raised by the handler is reported to the client as
        the JSON pair (False, message). Unknown paths yield the string "404".
        """
        path = request.path[len(self.prefix):]
        # startswith() also copes with an empty remainder, where indexing
        # path[0] would raise IndexError.
        if path.startswith("/"):
            handler = self.COMMANDS.get(path[1:])
            if handler is not None:
                try:
                    return handler(self, request)
                except AJAXException as e:
                    return json.dumps((False, e.args[0]))

        return "404"  ## TODO: tidy up

    def newConnection(self, request):
        """Create a session, start the IRC connection and return its id.

        Returns the JSON encoding of (True, session id, TRANSPORTS).
        Raises AJAXException when no nickname was supplied and
        IDGenerationException when ten attempts produce no unused id.
        """
        login_optional(request)  # return value is not needed here

        ip = request.getClientIP()

        nick = request.args.get("nick")
        if not nick:
            raise AJAXException("Nickname not supplied.")
        nick = ircclient.irc_decode(nick[0])

        password = request.args.get("password")
        if password is not None:
            password = ircclient.irc_decode(password[0])

        for _ in range(10):
            session_id = get_session_id()
            if not Sessions.get(session_id):
                break
        else:
            raise IDGenerationException()

        session = IRCSession(session_id)

        qticket = getSessionData(request).get("qticket")
        if qticket is None:
            perform = None
        else:
            # Address the auth service as nick@server, taken from its
            # nick!user@server mask.
            service_mask = config.AUTH_SERVICE
            msg_mask = service_mask.split("!")[0] + "@" + service_mask.split("@", 1)[1]
            perform = ["PRIVMSG %s :TICKETAUTH %s" % (msg_mask, qticket)]

        ident, realname = config.IDENT, config.REALNAME
        if ident is config_options.IDENT_HEX or ident is None:  # latter is legacy
            ident = socket.inet_aton(ip).encode("hex")
        elif ident is config_options.IDENT_NICKNAME:
            ident = nick

        self.__connect_hit()

        def proceed(hostname):
            # Create the IRC client and attach it to the session.
            kwargs = dict(nick=nick, ident=ident, ip=ip, realname=realname, perform=perform, hostname=hostname)
            if password is not None:
                kwargs["password"] = password

            client = ircclient.createIRC(session, **kwargs)
            session.client = client

        if not hasattr(config, "WEBIRC_MODE") or config.WEBIRC_MODE == "hmac":
            proceed(None)
        else:
            # Any other mode: resolve the client's hostname first, falling
            # back to the bare IP when the lookup fails.
            notice = lambda x: session.event(connect_notice(x))
            notice("Looking up your hostname...")

            def callback(hostname):
                notice("Found your hostname.")
                proceed(hostname)

            def errback(failure):
                notice("Couldn't look up your hostname!")
                proceed(ip)

            qdns.lookupAndVerifyPTR(ip, timeout=[config.DNS_TIMEOUT]).addCallbacks(callback, errback)

        Sessions[session_id] = session

        return json.dumps((True, session_id, TRANSPORTS))

    def getSession(self, request):
        """Return the session named by the request's ``s`` argument.

        Raises AJAXException when the argument is missing or names no
        known session.
        """
        sessionid = request.args.get("s")
        if sessionid is None:
            raise AJAXException(BAD_SESSION_MESSAGE)

        session = Sessions.get(sessionid[0])
        if not session:
            raise AJAXException(BAD_SESSION_MESSAGE)
        return session

    def subscribe(self, request):
        """Park the request as a subscriber of its session (long poll).

        The response is produced later, either by the session writing
        events or by the timeout; NOT_DONE_YET is returned meanwhile.
        """
        request.channel.cancelTimeout()

        channel = RequestChannel(request)
        session = self.getSession(request)
        notifier = request.notifyFinish()
        session.subscribe(channel)

        timeout_entry = reactor.callLater(config.HTTP_AJAX_REQUEST_TIMEOUT, session.timeout, channel)

        def cancel_timeout(result):
            # Unsubscribe the channel that was subscribed above. Passing
            # the engine itself would never match a subscription and would
            # leave the finished request registered.
            session.unsubscribe(channel)
            try:
                timeout_entry.cancel()
            except error.AlreadyCalled:
                pass

        notifier.addCallbacks(cancel_timeout, cancel_timeout)
        return server.NOT_DONE_YET

    def push(self, request):
        """Forward the request's ``c`` argument to the session's IRC client.

        Returns the JSON encoding of (True, True). Any failure while pushing
        disconnects the session and is reported as an AJAXException.
        """
        command = request.args.get("c")
        if command is None:
            raise AJAXException("No command specified.")
        self.__total_hit()

        session = self.getSession(request)
        try:
            session.push(ircclient.irc_decode(command[0]))
        except AttributeError:  # occurs when we haven't noticed an error
            session.disconnect()
            raise AJAXException("Connection closed by server; try reconnecting by reloading the page.")
        except Exception:  # catch all
            session.disconnect()
            traceback.print_exc(file=sys.stderr)
            raise AJAXException("Unknown error.")

        return json.dumps((True, True))

    def closeById(self, k):
        """Ask the client of session *k* to error out; unknown ids are ignored."""
        s = Sessions.get(k)
        if s is None:
            return
        s.client.client.error("Closed by admin interface")

    @property
    def adminEngine(self):
        """Data for the admin interface: open sessions and hit counters."""
        return {
            "Sessions": [(str(v.client.client), AdminEngineAction("close", self.closeById, k)) for k, v in Sessions.items() if not v.closed],
            "Connections": [(self.__connect_hit,)],
            "Total hits": [(self.__total_hit,)],
        }

    # Maps the path component after the prefix to its handler.
    COMMANDS = dict(p=push, n=newConnection, s=subscribe)
b5c84380 | 307 | |
c60795d6 CP |
if has_websocket:
    class WebSocketChannel(object):
        """Delivery channel that forwards session data over a websocket."""

        def __init__(self, channel):
            self.channel = channel

        def write(self, data):
            """Send *data* as a "c" message; True keeps the subscription."""
            self.channel.send("c", data)
            return True

        def close(self):
            self.channel.close()

    class WebSocketEngineProtocol(autobahn.websocket.WebSocketServerProtocol):
        """Websocket protocol: authenticate with a session id, then relay.

        The first message must be "s" + session id. Once authenticated,
        "p" + line messages are pushed to the session. Anything else closes
        the connection.
        """
        AWAITING_AUTH, AUTHED = 0, 1

        def __init__(self, *args, **kwargs):
            self.__state = self.AWAITING_AUTH
            self.__session = None
            self.__channel = None
            self.__timeout = None

        def onOpen(self):
            # Give the peer five seconds to authenticate.
            self.__timeout = reactor.callLater(5, self.close, "Authentication timeout")

        def onClose(self, wasClean, code, reason):
            self.__cancelTimeout()
            if self.__session:
                self.__session.unsubscribe(self.__channel)
                self.__session = None

        def onMessage(self, msg, binary):
            # we don't bother checking the Origin header, as if you can auth then you've been able to pass the browser's
            # normal origin handling (POSTed the new connection request and managed to get the session id)
            state = self.__state
            message_type, message = msg[:1], msg[1:]
            if state == self.AWAITING_AUTH:
                if message_type == "s":  # subscribe
                    session = Sessions.get(message)
                    if not session:
                        self.close(BAD_SESSION_MESSAGE)
                        return

                    self.__cancelTimeout()
                    self.__session = session
                    self.send("s", "True")
                    self.__state = self.AUTHED
                    self.__channel = WebSocketChannel(self)
                    session.subscribe(self.__channel)
                    return
            elif state == self.AUTHED:
                if message_type == "p":  # push
                    self.__session.push(ircclient.irc_decode(message))
                    return

            self.close("Bad message type")

        def __cancelTimeout(self):
            """Cancel the authentication timeout if it is still pending."""
            timeout, self.__timeout = self.__timeout, None
            # close() is also the timeout's own callback. By then the delayed
            # call has already run and cancel() would raise AlreadyCalled,
            # aborting the close. active() is False in that case.
            if timeout is not None and timeout.active():
                timeout.cancel()

        def close(self, reason=None):
            """Send a close frame (4999 with *reason*, else 4998) and unsubscribe."""
            self.__cancelTimeout()
            if reason:
                self.sendClose(4999, reason)
            else:
                self.sendClose(4998)

            if self.__session:
                self.__session.unsubscribe(self.__channel)
                self.__session = None

        def send(self, message_type, message):
            """Send *message* prefixed with its one-character type."""
            self.sendMessage(message_type + message)

    class WebSocketResource(autobahn.resource.WebSocketResource):
        """WebSocketResource that cancels the HTTP channel timeout first."""

        def render(self, request):
            request.channel.cancelTimeout()
            return autobahn.resource.WebSocketResource.render(self, request)

    def WebSocketEngine(path=None):
        """Build the websocket resource; the port comes from config.BASE_URL.

        *path* is accepted but not used. Raises Exception when BASE_URL has
        no explicit port and its scheme is neither http nor https.
        """
        parsed = urlparse.urlparse(config.BASE_URL)
        port = parsed.port
        if port is None:
            if parsed.scheme == "http":
                port = 80
            elif parsed.scheme == "https":
                port = 443
            else:
                raise Exception("Unable to determine port from BASE_URL: " + config.BASE_URL)

        factory = autobahn.websocket.WebSocketServerFactory("ws://localhost:%d" % port)
        factory.protocol = WebSocketEngineProtocol
        factory.setProtocolOptions(maxMessagePayloadSize=512, maxFramePayloadSize=512, tcpNoDelay=False)
        resource = WebSocketResource(factory)
        return resource