]> jfr.im git - irc/quakenet/qwebirc.git/blob - qwebirc/engines/ajaxengine.py
Add admin engine and reorganise a lot of directory structure.
[irc/quakenet/qwebirc.git] / qwebirc / engines / ajaxengine.py
1 from twisted.web import resource, server, static
2 from twisted.names import client
3 from twisted.internet import reactor
4 from authgateengine import login_optional, getSessionData
5 import simplejson, md5, sys, os, time, config, weakref, traceback
6 import qwebirc.ircclient as ircclient
7 from adminengine import AdminEngineAction
8 from qwebirc.util import HitCounter
9
10 Sessions = {}
11
12 def get_session_id():
13 return md5.md5(os.urandom(16)).hexdigest()
14
15 class BufferOverflowException(Exception):
16 pass
17
18 class AJAXException(Exception):
19 pass
20
21 class IDGenerationException(Exception):
22 pass
23
24 NOT_DONE_YET = None
25
26 def jsondump(fn):
27 def decorator(*args, **kwargs):
28 try:
29 x = fn(*args, **kwargs)
30 if x is None:
31 return server.NOT_DONE_YET
32 x = (True, x)
33 except AJAXException, e:
34 x = (False, e[0])
35
36 return simplejson.dumps(x)
37 return decorator
38
39 def cleanupSession(id):
40 try:
41 del Sessions[id]
42 except KeyError:
43 pass
44
45 class IRCSession:
46 def __init__(self, id):
47 self.id = id
48 self.subscriptions = []
49 self.buffer = []
50 self.throttle = 0
51 self.schedule = None
52 self.closed = False
53 self.cleanupschedule = None
54
55 def subscribe(self, channel):
56 if len(self.subscriptions) >= config.MAXSUBSCRIPTIONS:
57 self.subscriptions.pop(0).close()
58
59 self.subscriptions.append(channel)
60 self.flush()
61
62 def flush(self, scheduled=False):
63 if scheduled:
64 self.schedule = None
65
66 if not self.buffer or not self.subscriptions:
67 return
68
69 t = time.time()
70
71 if t < self.throttle:
72 if not self.schedule:
73 self.schedule = reactor.callLater(self.throttle - t, self.flush, True)
74 return
75 else:
76 # process the rest of the packet
77 if not scheduled:
78 if not self.schedule:
79 self.schedule = reactor.callLater(0, self.flush, True)
80 return
81
82 self.throttle = t + config.UPDATE_FREQ
83
84 encdata = simplejson.dumps(self.buffer)
85 self.buffer = []
86
87 newsubs = []
88 for x in self.subscriptions:
89 if x.write(encdata):
90 newsubs.append(x)
91
92 self.subscriptions = newsubs
93 if self.closed and not self.subscriptions:
94 cleanupSession(self.id)
95
96 def event(self, data):
97 bufferlen = sum(map(len, self.buffer))
98 if bufferlen + len(data) > config.MAXBUFLEN:
99 self.buffer = []
100 self.client.error("Buffer overflow")
101 return
102
103 self.buffer.append(data)
104 self.flush()
105
106 def push(self, data):
107 if not self.closed:
108 self.client.write(data)
109
110 def disconnect(self):
111 # keep the session hanging around for a few seconds so the
112 # client has a chance to see what the issue was
113 self.closed = True
114
115 reactor.callLater(5, cleanupSession, self.id)
116
117 class Channel:
118 def __init__(self, request):
119 self.request = request
120
121 class SingleUseChannel(Channel):
122 def write(self, data):
123 self.request.write(data)
124 self.request.finish()
125 return False
126
127 def close(self):
128 self.request.finish()
129
130 class MultipleUseChannel(Channel):
131 def write(self, data):
132 self.request.write(data)
133 return True
134
135 class AJAXEngine(resource.Resource):
136 isLeaf = True
137
138 def __init__(self, prefix):
139 self.prefix = prefix
140 self.__connect_hit = HitCounter()
141 self.__total_hit = HitCounter()
142
143 @jsondump
144 def render_POST(self, request):
145 path = request.path[len(self.prefix):]
146 if path[0] == "/":
147 handler = self.COMMANDS.get(path[1:])
148 if handler is not None:
149 return handler(self, request)
150 raise AJAXException("404")
151
152 # def render_GET(self, request):
153 # return self.render_POST(request)
154
155 def newConnection(self, request):
156 ticket = login_optional(request)
157
158 _, ip, port = request.transport.getPeer()
159
160 nick, ident, realname = request.args.get("nick"), "webchat", config.REALNAME
161
162 if not nick:
163 raise AJAXException("Nickname not supplied")
164
165 nick = nick[0]
166
167 for i in xrange(10):
168 id = get_session_id()
169 if not Sessions.get(id):
170 break
171 else:
172 raise IDGenerationException()
173
174 session = IRCSession(id)
175
176 qticket = getSessionData(request).get("qticket")
177 if qticket is None:
178 perform = None
179 else:
180 perform = ["PRIVMSG %s :TICKETAUTH %s" % (config.QBOT, qticket)]
181
182 self.__connect_hit()
183 client = ircclient.createIRC(session, nick=nick, ident=ident, ip=ip, realname=realname, perform=perform)
184 session.client = client
185
186 Sessions[id] = session
187
188 return id
189
190 def getSession(self, request):
191 sessionid = request.args.get("s")
192 if sessionid is None:
193 raise AJAXException("Bad session ID")
194
195 session = Sessions.get(sessionid[0])
196 if not session:
197 raise AJAXException("Bad session ID")
198 return session
199
200 def subscribe(self, request):
201 self.getSession(request).subscribe(SingleUseChannel(request))
202 return NOT_DONE_YET
203
204 def push(self, request):
205 command = request.args.get("c")
206 if command is None:
207 raise AJAXException("No command specified")
208 self.__total_hit()
209
210 command = command[0]
211
212 session = self.getSession(request)
213
214 try:
215 decoded = command.decode("utf-8")
216 except UnicodeDecodeError:
217 decoded = command.decode("iso-8859-1", "ignore")
218
219 if len(decoded) > config.MAXLINELEN:
220 session.disconnect()
221 raise AJAXException("Line too long")
222
223 try:
224 session.push(decoded)
225 except AttributeError: # occurs when we haven't noticed an error
226 session.disconnect()
227 raise AJAXException("Connection closed by server.")
228 except Exception, e: # catch all
229 session.disconnect()
230 traceback.print_exc(file=sys.stderr)
231 raise AJAXException("Unknown error.")
232
233 return True
234
235 def closeById(self, k):
236 s = Sessions.get(k)
237 if s is None:
238 return
239 s.client.client.error("Closed by admin interface")
240
241 @property
242 def adminEngine(self):
243 return {
244 "Sessions": [(str(v.client.client), AdminEngineAction("close", self.closeById, k)) for k, v in Sessions.iteritems() if not v.closed],
245 "Connections": [(self.__connect_hit,)],
246 "Total hits": [(self.__total_hit,)],
247 }
248
249 COMMANDS = dict(p=push, n=newConnection, s=subscribe)
250