]>
jfr.im git - irc/weechat/qweechat.git/blob - qweechat/weechat/protocol.py
1 # -*- coding: utf-8 -*-
3 # protocol.py - decode binary messages received from WeeChat/relay
5 # Copyright (C) 2011-2021 Sébastien Helleu <flashcode@flashtux.org>
7 # This file is part of QWeeChat, a Qt remote GUI for WeeChat.
9 # QWeeChat is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # QWeeChat is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
24 # For info about protocol and format of messages, please read document
25 # "WeeChat Relay Protocol", available at: https://weechat.org/doc/
29 # 2011-11-23, Sébastien Helleu <flashcode@flashtux.org>:
38 class WeechatDict(collections
.OrderedDict
):
40 return '{%s}' % ', '.join(
41 ['%s: %s' % (repr(key
), repr(self
[key
])) for key
in self
])
45 def __init__(self
, objtype
, value
, separator
='\n'):
46 self
.objtype
= objtype
48 self
.separator
= separator
49 self
.indent
= ' ' if separator
== '\n' else ''
50 self
.separator1
= '\n%s' % self
.indent
if separator
== '\n' else ''
52 def _str_value(self
, v
):
53 if type(v
) is str and v
is not None:
57 def _str_value_hdata(self
):
58 lines
= ['%skeys: %s%s%spath: %s' % (self
.separator1
,
59 str(self
.value
['keys']),
62 str(self
.value
['path']))]
63 for i
, item
in enumerate(self
.value
['items']):
64 lines
.append(' item %d:%s%s' % (
65 (i
+ 1), self
.separator
,
67 ['%s%s: %s' % (self
.indent
* 2, key
,
68 self
._str
_value
(value
))
69 for key
, value
in item
.items()])))
70 return '\n'.join(lines
)
72 def _str_value_infolist(self
):
73 lines
= ['%sname: %s' % (self
.separator1
, self
.value
['name'])]
74 for i
, item
in enumerate(self
.value
['items']):
75 lines
.append(' item %d:%s%s' % (
76 (i
+ 1), self
.separator
,
78 ['%s%s: %s' % (self
.indent
* 2, key
,
79 self
._str
_value
(value
))
80 for key
, value
in item
.items()])))
81 return '\n'.join(lines
)
83 def _str_value_other(self
):
84 return self
._str
_value
(self
.value
)
88 'hda': self
._str
_value
_hdata
,
89 'inl': self
._str
_value
_infolist
,
91 return '%s: %s' % (self
.objtype
,
92 self
._obj
_cb
.get(self
.objtype
,
93 self
._str
_value
_other
)())
96 class WeechatObjects(list):
97 def __init__(self
, separator
='\n'):
98 self
.separator
= separator
101 return self
.separator
.join([str(obj
) for obj
in self
])
104 class WeechatMessage
:
105 def __init__(self
, size
, size_uncompressed
, compression
, uncompressed
,
108 self
.size_uncompressed
= size_uncompressed
109 self
.compression
= compression
110 self
.uncompressed
= uncompressed
112 self
.objects
= objects
115 if self
.compression
!= 0:
116 return 'size: %d/%d (%d%%), id=\'%s\', objects:\n%s' % (
117 self
.size
, self
.size_uncompressed
,
118 100 - ((self
.size
* 100) // self
.size_uncompressed
),
119 self
.msgid
, self
.objects
)
121 return 'size: %d, id=\'%s\', objects:\n%s' % (self
.size
,
127 """Decode binary message received from WeeChat/relay."""
131 'chr': self
._obj
_char
,
132 'int': self
._obj
_int
,
133 'lon': self
._obj
_long
,
134 'str': self
._obj
_str
,
135 'buf': self
._obj
_buffer
,
136 'ptr': self
._obj
_ptr
,
137 'tim': self
._obj
_time
,
138 'htb': self
._obj
_hashtable
,
139 'hda': self
._obj
_hdata
,
140 'inf': self
._obj
_info
,
141 'inl': self
._obj
_infolist
,
142 'arr': self
._obj
_array
,
146 """Read type in data (3 chars)."""
147 if len(self
.data
) < 3:
150 objtype
= self
.data
[0:3].decode()
151 self
.data
= self
.data
[3:]
154 def _obj_len_data(self
, length_size
):
155 """Read length (1 or 4 bytes), then value with this length."""
156 if len(self
.data
) < length_size
:
160 length
= struct
.unpack('B', self
.data
[0:1])[0]
161 self
.data
= self
.data
[1:]
163 length
= self
._obj
_int
()
167 value
= self
.data
[0:length
]
168 self
.data
= self
.data
[length
:]
174 """Read a char in data."""
175 if len(self
.data
) < 1:
177 value
= struct
.unpack('b', self
.data
[0:1])[0]
178 self
.data
= self
.data
[1:]
182 """Read an integer in data (4 bytes)."""
183 if len(self
.data
) < 4:
186 value
= struct
.unpack('>i', self
.data
[0:4])[0]
187 self
.data
= self
.data
[4:]
191 """Read a long integer in data (length on 1 byte + value as string)."""
192 value
= self
._obj
_len
_data
(1)
198 """Read a string in data (length on 4 bytes + content)."""
199 value
= self
._obj
_len
_data
(4)
200 if value
in ("", None):
202 return value
.decode()
204 def _obj_buffer(self
):
205 """Read a buffer in data (length on 4 bytes + data)."""
206 return self
._obj
_len
_data
(4)
209 """Read a pointer in data (length on 1 byte + value as string)."""
210 value
= self
._obj
_len
_data
(1)
213 return '0x%s' % value
216 """Read a time in data (length on 1 byte + value as string)."""
217 value
= self
._obj
_len
_data
(1)
222 def _obj_hashtable(self
):
224 Read a hashtable in data
225 (type for keys + type for values + count + items).
227 type_keys
= self
._obj
_type
()
228 type_values
= self
._obj
_type
()
229 count
= self
._obj
_int
()
230 hashtable
= WeechatDict()
231 for _
in range(count
):
232 key
= self
._obj
_cb
[type_keys
]()
233 value
= self
._obj
_cb
[type_values
]()
234 hashtable
[key
] = value
237 def _obj_hdata(self
):
238 """Read a hdata in data."""
239 path
= self
._obj
_str
()
240 keys
= self
._obj
_str
()
241 count
= self
._obj
_int
()
242 list_path
= path
.split('/') if path
else []
243 list_keys
= keys
.split(',') if keys
else []
245 dict_keys
= WeechatDict()
246 for key
in list_keys
:
247 items
= key
.split(':')
248 keys_types
.append(items
)
249 dict_keys
[items
[0]] = items
[1]
251 for _
in range(count
):
255 for _
in enumerate(list_path
):
256 pointers
.append(self
._obj
_ptr
())
257 for key
, objtype
in keys_types
:
258 item
[key
] = self
._obj
_cb
[objtype
]()
259 item
['__path'] = pointers
269 """Read an info in data."""
270 name
= self
._obj
_str
()
271 value
= self
._obj
_str
()
274 def _obj_infolist(self
):
275 """Read an infolist in data."""
276 name
= self
._obj
_str
()
277 count_items
= self
._obj
_int
()
279 for _
in range(count_items
):
280 count_vars
= self
._obj
_int
()
281 variables
= WeechatDict()
282 for _
in range(count_vars
):
283 var_name
= self
._obj
_str
()
284 var_type
= self
._obj
_type
()
285 var_value
= self
._obj
_cb
[var_type
]()
286 variables
[var_name
] = var_value
287 items
.append(variables
)
293 def _obj_array(self
):
294 """Read an array of values in data."""
295 type_values
= self
._obj
_type
()
296 count_values
= self
._obj
_int
()
298 for _
in range(count_values
):
299 values
.append(self
._obj
_cb
[type_values
]())
302 def decode(self
, data
, separator
='\n'):
303 """Decode binary data and return list of objects."""
305 size
= len(self
.data
)
306 size_uncompressed
= size
308 # uncompress data (if it is compressed)
309 compression
= struct
.unpack('b', self
.data
[4:5])[0]
311 uncompressed
= zlib
.decompress(self
.data
[5:])
312 size_uncompressed
= len(uncompressed
) + 5
313 uncompressed
= b
'%s%s%s' % (struct
.pack('>i', size_uncompressed
),
314 struct
.pack('b', 0), uncompressed
)
315 self
.data
= uncompressed
317 uncompressed
= self
.data
[:]
318 # skip length and compression flag
319 self
.data
= self
.data
[5:]
321 msgid
= self
._obj
_str
()
325 objects
= WeechatObjects(separator
=separator
)
326 while len(self
.data
) > 0:
327 objtype
= self
._obj
_type
()
328 value
= self
._obj
_cb
[objtype
]()
329 objects
.append(WeechatObject(objtype
, value
, separator
=separator
))
330 return WeechatMessage(size
, size_uncompressed
, compression
,
331 uncompressed
, msgid
, objects
)
334 def hex_and_ascii(data
, bytes_per_line
=10):
335 """Convert a QByteArray to hex + ascii output."""
336 num_lines
= ((len(data
) - 1) // bytes_per_line
) + 1
340 for i
in range(num_lines
):
343 for j
in range(bytes_per_line
):
344 # We can't easily iterate over individual bytes, so we are going to
346 index
= (i
*bytes_per_line
) + j
347 char
= data
[index
:index
+1]
350 byte
= struct
.unpack('B', char
)[0]
351 str_hex
.append(b
'%02X' % int(byte
))
352 if byte
>= 32 and byte
<= 127:
353 str_ascii
.append(char
)
355 str_ascii
.append(b
'.')
356 fmt
= b
'%%-%ds %%s' % ((bytes_per_line
* 3) - 1)
357 lines
.append(fmt
% (b
' '.join(str_hex
),
358 b
''.join(str_ascii
)))
359 return b
'\n'.join(lines
)