]> jfr.im git - irc/weechat/qweechat.git/blob - qweechat/weechat/protocol.py
Remove "Running with PySide6" from about dialog
[irc/weechat/qweechat.git] / qweechat / weechat / protocol.py
1 # -*- coding: utf-8 -*-
2 #
3 # protocol.py - decode binary messages received from WeeChat/relay
4 #
5 # Copyright (C) 2011-2021 Sébastien Helleu <flashcode@flashtux.org>
6 #
7 # This file is part of QWeeChat, a Qt remote GUI for WeeChat.
8 #
9 # QWeeChat is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # QWeeChat is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 #
24 # For info about protocol and format of messages, please read document
25 # "WeeChat Relay Protocol", available at: https://weechat.org/doc/
26 #
27 # History:
28 #
29 # 2011-11-23, Sébastien Helleu <flashcode@flashtux.org>:
30 # start dev
31 #
32
33 import collections
34 import struct
35 import zlib
36
37
38 class WeechatDict(collections.OrderedDict):
39 def __str__(self):
40 return '{%s}' % ', '.join(
41 ['%s: %s' % (repr(key), repr(self[key])) for key in self])
42
43
44 class WeechatObject:
45 def __init__(self, objtype, value, separator='\n'):
46 self.objtype = objtype
47 self.value = value
48 self.separator = separator
49 self.indent = ' ' if separator == '\n' else ''
50 self.separator1 = '\n%s' % self.indent if separator == '\n' else ''
51
52 def _str_value(self, v):
53 if type(v) is str and v is not None:
54 return '\'%s\'' % v
55 return str(v)
56
57 def _str_value_hdata(self):
58 lines = ['%skeys: %s%s%spath: %s' % (self.separator1,
59 str(self.value['keys']),
60 self.separator,
61 self.indent,
62 str(self.value['path']))]
63 for i, item in enumerate(self.value['items']):
64 lines.append(' item %d:%s%s' % (
65 (i + 1), self.separator,
66 self.separator.join(
67 ['%s%s: %s' % (self.indent * 2, key,
68 self._str_value(value))
69 for key, value in item.items()])))
70 return '\n'.join(lines)
71
72 def _str_value_infolist(self):
73 lines = ['%sname: %s' % (self.separator1, self.value['name'])]
74 for i, item in enumerate(self.value['items']):
75 lines.append(' item %d:%s%s' % (
76 (i + 1), self.separator,
77 self.separator.join(
78 ['%s%s: %s' % (self.indent * 2, key,
79 self._str_value(value))
80 for key, value in item.items()])))
81 return '\n'.join(lines)
82
83 def _str_value_other(self):
84 return self._str_value(self.value)
85
86 def __str__(self):
87 self._obj_cb = {
88 'hda': self._str_value_hdata,
89 'inl': self._str_value_infolist,
90 }
91 return '%s: %s' % (self.objtype,
92 self._obj_cb.get(self.objtype,
93 self._str_value_other)())
94
95
96 class WeechatObjects(list):
97 def __init__(self, separator='\n'):
98 self.separator = separator
99
100 def __str__(self):
101 return self.separator.join([str(obj) for obj in self])
102
103
104 class WeechatMessage:
105 def __init__(self, size, size_uncompressed, compression, uncompressed,
106 msgid, objects):
107 self.size = size
108 self.size_uncompressed = size_uncompressed
109 self.compression = compression
110 self.uncompressed = uncompressed
111 self.msgid = msgid
112 self.objects = objects
113
114 def __str__(self):
115 if self.compression != 0:
116 return 'size: %d/%d (%d%%), id=\'%s\', objects:\n%s' % (
117 self.size, self.size_uncompressed,
118 100 - ((self.size * 100) // self.size_uncompressed),
119 self.msgid, self.objects)
120 else:
121 return 'size: %d, id=\'%s\', objects:\n%s' % (self.size,
122 self.msgid,
123 self.objects)
124
125
126 class Protocol:
127 """Decode binary message received from WeeChat/relay."""
128
129 def __init__(self):
130 self._obj_cb = {
131 'chr': self._obj_char,
132 'int': self._obj_int,
133 'lon': self._obj_long,
134 'str': self._obj_str,
135 'buf': self._obj_buffer,
136 'ptr': self._obj_ptr,
137 'tim': self._obj_time,
138 'htb': self._obj_hashtable,
139 'hda': self._obj_hdata,
140 'inf': self._obj_info,
141 'inl': self._obj_infolist,
142 'arr': self._obj_array,
143 }
144
145 def _obj_type(self):
146 """Read type in data (3 chars)."""
147 if len(self.data) < 3:
148 self.data = ''
149 return ''
150 objtype = self.data[0:3].decode()
151 self.data = self.data[3:]
152 return objtype
153
154 def _obj_len_data(self, length_size):
155 """Read length (1 or 4 bytes), then value with this length."""
156 if len(self.data) < length_size:
157 self.data = ''
158 return None
159 if length_size == 1:
160 length = struct.unpack('B', self.data[0:1])[0]
161 self.data = self.data[1:]
162 else:
163 length = self._obj_int()
164 if length < 0:
165 return None
166 if length > 0:
167 value = self.data[0:length]
168 self.data = self.data[length:]
169 else:
170 value = ''
171 return value
172
173 def _obj_char(self):
174 """Read a char in data."""
175 if len(self.data) < 1:
176 return 0
177 value = struct.unpack('b', self.data[0:1])[0]
178 self.data = self.data[1:]
179 return value
180
181 def _obj_int(self):
182 """Read an integer in data (4 bytes)."""
183 if len(self.data) < 4:
184 self.data = ''
185 return 0
186 value = struct.unpack('>i', self.data[0:4])[0]
187 self.data = self.data[4:]
188 return value
189
190 def _obj_long(self):
191 """Read a long integer in data (length on 1 byte + value as string)."""
192 value = self._obj_len_data(1)
193 if value is None:
194 return None
195 return int(value)
196
197 def _obj_str(self):
198 """Read a string in data (length on 4 bytes + content)."""
199 value = self._obj_len_data(4)
200 if value in ("", None):
201 return ""
202 return value.decode()
203
204 def _obj_buffer(self):
205 """Read a buffer in data (length on 4 bytes + data)."""
206 return self._obj_len_data(4)
207
208 def _obj_ptr(self):
209 """Read a pointer in data (length on 1 byte + value as string)."""
210 value = self._obj_len_data(1)
211 if value is None:
212 return None
213 return '0x%s' % value
214
215 def _obj_time(self):
216 """Read a time in data (length on 1 byte + value as string)."""
217 value = self._obj_len_data(1)
218 if value is None:
219 return None
220 return int(value)
221
222 def _obj_hashtable(self):
223 """
224 Read a hashtable in data
225 (type for keys + type for values + count + items).
226 """
227 type_keys = self._obj_type()
228 type_values = self._obj_type()
229 count = self._obj_int()
230 hashtable = WeechatDict()
231 for _ in range(count):
232 key = self._obj_cb[type_keys]()
233 value = self._obj_cb[type_values]()
234 hashtable[key] = value
235 return hashtable
236
237 def _obj_hdata(self):
238 """Read a hdata in data."""
239 path = self._obj_str()
240 keys = self._obj_str()
241 count = self._obj_int()
242 list_path = path.split('/') if path else []
243 list_keys = keys.split(',') if keys else []
244 keys_types = []
245 dict_keys = WeechatDict()
246 for key in list_keys:
247 items = key.split(':')
248 keys_types.append(items)
249 dict_keys[items[0]] = items[1]
250 items = []
251 for _ in range(count):
252 item = WeechatDict()
253 item['__path'] = []
254 pointers = []
255 for _ in enumerate(list_path):
256 pointers.append(self._obj_ptr())
257 for key, objtype in keys_types:
258 item[key] = self._obj_cb[objtype]()
259 item['__path'] = pointers
260 items.append(item)
261 return {
262 'path': list_path,
263 'keys': dict_keys,
264 'count': count,
265 'items': items,
266 }
267
268 def _obj_info(self):
269 """Read an info in data."""
270 name = self._obj_str()
271 value = self._obj_str()
272 return (name, value)
273
274 def _obj_infolist(self):
275 """Read an infolist in data."""
276 name = self._obj_str()
277 count_items = self._obj_int()
278 items = []
279 for _ in range(count_items):
280 count_vars = self._obj_int()
281 variables = WeechatDict()
282 for _ in range(count_vars):
283 var_name = self._obj_str()
284 var_type = self._obj_type()
285 var_value = self._obj_cb[var_type]()
286 variables[var_name] = var_value
287 items.append(variables)
288 return {
289 'name': name,
290 'items': items
291 }
292
293 def _obj_array(self):
294 """Read an array of values in data."""
295 type_values = self._obj_type()
296 count_values = self._obj_int()
297 values = []
298 for _ in range(count_values):
299 values.append(self._obj_cb[type_values]())
300 return values
301
302 def decode(self, data, separator='\n'):
303 """Decode binary data and return list of objects."""
304 self.data = data
305 size = len(self.data)
306 size_uncompressed = size
307 uncompressed = None
308 # uncompress data (if it is compressed)
309 compression = struct.unpack('b', self.data[4:5])[0]
310 if compression:
311 uncompressed = zlib.decompress(self.data[5:])
312 size_uncompressed = len(uncompressed) + 5
313 uncompressed = b'%s%s%s' % (struct.pack('>i', size_uncompressed),
314 struct.pack('b', 0), uncompressed)
315 self.data = uncompressed
316 else:
317 uncompressed = self.data[:]
318 # skip length and compression flag
319 self.data = self.data[5:]
320 # read id
321 msgid = self._obj_str()
322 if msgid is None:
323 msgid = ''
324 # read objects
325 objects = WeechatObjects(separator=separator)
326 while len(self.data) > 0:
327 objtype = self._obj_type()
328 value = self._obj_cb[objtype]()
329 objects.append(WeechatObject(objtype, value, separator=separator))
330 return WeechatMessage(size, size_uncompressed, compression,
331 uncompressed, msgid, objects)
332
333
334 def hex_and_ascii(data, bytes_per_line=10):
335 """Convert a QByteArray to hex + ascii output."""
336 num_lines = ((len(data) - 1) // bytes_per_line) + 1
337 if num_lines == 0:
338 return ''
339 lines = []
340 for i in range(num_lines):
341 str_hex = []
342 str_ascii = []
343 for j in range(bytes_per_line):
344 # We can't easily iterate over individual bytes, so we are going to
345 # do it this way.
346 index = (i*bytes_per_line) + j
347 char = data[index:index+1]
348 if not char:
349 char = b'x'
350 byte = struct.unpack('B', char)[0]
351 str_hex.append(b'%02X' % int(byte))
352 if byte >= 32 and byte <= 127:
353 str_ascii.append(char)
354 else:
355 str_ascii.append(b'.')
356 fmt = b'%%-%ds %%s' % ((bytes_per_line * 3) - 1)
357 lines.append(fmt % (b' '.join(str_hex),
358 b''.join(str_ascii)))
359 return b'\n'.join(lines)