]> jfr.im git - irc/weechat/qweechat.git/blob - qweechat/network.py
d663dd8b93d2744833365aceed11e5ab94e6cf0f
[irc/weechat/qweechat.git] / qweechat / network.py
1 # -*- coding: utf-8 -*-
2 #
3 # network.py - I/O with WeeChat/relay
4 #
5 # Copyright (C) 2011-2021 SĂ©bastien Helleu <flashcode@flashtux.org>
6 #
7 # This file is part of QWeeChat, a Qt remote GUI for WeeChat.
8 #
9 # QWeeChat is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # QWeeChat is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 """I/O with WeeChat/relay."""
24
25 import hashlib
26 import secrets
27 import struct
28
29 from PySide6 import QtCore, QtNetwork
30
31 from qweechat import config
32 from qweechat.debug import DebugDialog
33
34
35 # list of supported hash algorithms on our side
36 # (the hash algorithm will be negotiated with the remote WeeChat)
37 _HASH_ALGOS_LIST = [
38 'plain',
39 'sha256',
40 'sha512',
41 'pbkdf2+sha256',
42 'pbkdf2+sha512',
43 ]
44 _HASH_ALGOS = ':'.join(_HASH_ALGOS_LIST)
45
46 # handshake with remote WeeChat (before init)
47 _PROTO_HANDSHAKE = f'(handshake) handshake password_hash_algo={_HASH_ALGOS}\n'
48
49 # initialize with the password (plain text)
50 _PROTO_INIT_PWD = 'init password=%(password)s%(totp)s\n'
51
52 # initialize with the hashed password
53 _PROTO_INIT_HASH = ('init password_hash='
54 '%(algo)s:%(salt)s%(iter)s:%(hash)s%(totp)s\n')
55
56 _PROTO_SYNC = [
57 # get buffers
58 '(listbuffers) hdata buffer:gui_buffers(*) number,full_name,short_name,'
59 'type,nicklist,title,local_variables',
60 # get lines
61 '(listlines) hdata buffer:gui_buffers(*)/own_lines/last_line(-%(lines)d)/'
62 'data date,displayed,prefix,message',
63 # get nicklist for all buffers
64 '(nicklist) nicklist',
65 # enable synchronization
66 'sync',
67 ]
68
69 STATUS_DISCONNECTED = 'disconnected'
70 STATUS_CONNECTING = 'connecting'
71 STATUS_AUTHENTICATING = 'authenticating'
72 STATUS_CONNECTED = 'connected'
73
74 NETWORK_STATUS = {
75 STATUS_DISCONNECTED: {
76 'label': 'Disconnected',
77 'color': '#aa0000',
78 'icon': 'dialog-close.png',
79 },
80 STATUS_CONNECTING: {
81 'label': 'Connecting…',
82 'color': '#dd5f00',
83 'icon': 'dialog-warning.png',
84 },
85 STATUS_AUTHENTICATING: {
86 'label': 'Authenticating…',
87 'color': '#007fff',
88 'icon': 'dialog-password.png',
89 },
90 STATUS_CONNECTED: {
91 'label': 'Connected',
92 'color': 'green',
93 'icon': 'dialog-ok-apply.png',
94 },
95 }
96
97
98 class Network(QtCore.QObject):
99 """I/O with WeeChat/relay."""
100
101 statusChanged = QtCore.Signal(str, str)
102 messageFromWeechat = QtCore.Signal(QtCore.QByteArray)
103
104 def __init__(self, *args):
105 super().__init__(*args)
106 self._init_connection()
107 self.debug_lines = []
108 self.debug_dialog = None
109 self._lines = config.CONFIG_DEFAULT_RELAY_LINES
110 self._buffer = QtCore.QByteArray()
111 self._socket = QtNetwork.QSslSocket()
112 self._socket.connected.connect(self._socket_connected)
113 self._socket.readyRead.connect(self._socket_read)
114 self._socket.disconnected.connect(self._socket_disconnected)
115
116 def _init_connection(self):
117 self.status = STATUS_DISCONNECTED
118 self._server = None
119 self._port = None
120 self._ssl = None
121 self._password = None
122 self._totp = None
123 self._handshake_received = False
124 self._handshake_timer = None
125 self._handshake_timer = False
126 self._pwd_hash_algo = None
127 self._pwd_hash_iter = 0
128 self._server_nonce = None
129
130 def set_status(self, status):
131 """Set current status."""
132 self.status = status
133 self.statusChanged.emit(status, None)
134
135 def pbkdf2(self, hash_name, salt):
136 """Return hashed password with PBKDF2-HMAC."""
137 return hashlib.pbkdf2_hmac(
138 hash_name,
139 password=self._password.encode('utf-8'),
140 salt=salt,
141 iterations=self._pwd_hash_iter,
142 ).hex()
143
144 def _build_init_command(self):
145 """Build the init command to send to WeeChat."""
146 totp = f',totp={self._totp}' if self._totp else ''
147 if self._pwd_hash_algo == 'plain':
148 cmd = _PROTO_INIT_PWD % {
149 'password': self._password,
150 'totp': totp,
151 }
152 else:
153 client_nonce = secrets.token_bytes(16)
154 salt = self._server_nonce + client_nonce
155 pwd_hash = None
156 iterations = ''
157 if self._pwd_hash_algo == 'pbkdf2+sha512':
158 pwd_hash = self.pbkdf2('sha512', salt)
159 iterations = f':{self._pwd_hash_iter}'
160 elif self._pwd_hash_algo == 'pbkdf2+sha256':
161 pwd_hash = self.pbkdf2('sha256', salt)
162 iterations = f':{self._pwd_hash_iter}'
163 elif self._pwd_hash_algo == 'sha512':
164 pwd = salt + self._password.encode('utf-8')
165 pwd_hash = hashlib.sha512(pwd).hexdigest()
166 elif self._pwd_hash_algo == 'sha256':
167 pwd = salt + self._password.encode('utf-8')
168 pwd_hash = hashlib.sha256(pwd).hexdigest()
169 if not pwd_hash:
170 return None
171 cmd = _PROTO_INIT_HASH % {
172 'algo': self._pwd_hash_algo,
173 'salt': bytearray(salt).hex(),
174 'iter': iterations,
175 'hash': pwd_hash,
176 'totp': totp,
177 }
178 return cmd
179
180 def _build_sync_command(self):
181 """Build the sync commands to send to WeeChat."""
182 cmd = '\n'.join(_PROTO_SYNC) + '\n'
183 return cmd % {'lines': self._lines}
184
185 def handshake_timer_expired(self):
186 if self.status == STATUS_AUTHENTICATING:
187 self._pwd_hash_algo = 'plain'
188 self.send_to_weechat(self._build_init_command())
189 self.sync_weechat()
190 self.set_status(STATUS_CONNECTED)
191
192 def _socket_connected(self):
193 """Slot: socket connected."""
194 self.set_status(STATUS_AUTHENTICATING)
195 self.send_to_weechat(_PROTO_HANDSHAKE)
196 self._handshake_timer = QtCore.QTimer()
197 self._handshake_timer.setSingleShot(True)
198 self._handshake_timer.setInterval(2000)
199 self._handshake_timer.timeout.connect(self.handshake_timer_expired)
200 self._handshake_timer.start()
201
202 def _socket_read(self):
203 """Slot: data available on socket."""
204 data = self._socket.readAll()
205 self._buffer.append(data)
206 while len(self._buffer) >= 4:
207 remainder = None
208 length = struct.unpack('>i', self._buffer[0:4].data())[0]
209 if len(self._buffer) < length:
210 # partial message, just wait for end of message
211 break
212 # more than one message?
213 if length < len(self._buffer):
214 # save beginning of another message
215 remainder = self._buffer[length:]
216 self._buffer = self._buffer[0:length]
217 self.messageFromWeechat.emit(self._buffer)
218 if not self.is_connected():
219 return
220 self._buffer.clear()
221 if remainder:
222 self._buffer.append(remainder)
223
224 def _socket_disconnected(self):
225 """Slot: socket disconnected."""
226 if self._handshake_timer:
227 self._handshake_timer.stop()
228 self._init_connection()
229 self.set_status(STATUS_DISCONNECTED)
230
231 def is_connected(self):
232 """Return True if the socket is connected, False otherwise."""
233 return self._socket.state() == QtNetwork.QAbstractSocket.ConnectedState
234
235 def is_ssl(self):
236 """Return True if SSL is used, False otherwise."""
237 return self._ssl
238
239 def connect_weechat(self, server, port, ssl, password, totp, lines):
240 """Connect to WeeChat."""
241 self._server = server
242 try:
243 self._port = int(port)
244 except ValueError:
245 self._port = 0
246 self._ssl = ssl
247 self._password = password
248 self._totp = totp
249 try:
250 self._lines = int(lines)
251 except ValueError:
252 self._lines = config.CONFIG_DEFAULT_RELAY_LINES
253 if self._socket.state() == QtNetwork.QAbstractSocket.ConnectedState:
254 return
255 if self._socket.state() != QtNetwork.QAbstractSocket.UnconnectedState:
256 self._socket.abort()
257 if self._ssl:
258 self._socket.ignoreSslErrors()
259 self._socket.connectToHostEncrypted(self._server, self._port)
260 else:
261 self._socket.connectToHost(self._server, self._port)
262 self.set_status(STATUS_CONNECTING)
263
264 def disconnect_weechat(self):
265 """Disconnect from WeeChat."""
266 if self._socket.state() == QtNetwork.QAbstractSocket.UnconnectedState:
267 self.set_status(STATUS_DISCONNECTED)
268 return
269 if self._socket.state() == QtNetwork.QAbstractSocket.ConnectedState:
270 self.send_to_weechat('quit\n')
271 self._socket.waitForBytesWritten(1000)
272 else:
273 self.set_status(STATUS_DISCONNECTED)
274 self._socket.abort()
275
276 def send_to_weechat(self, message):
277 """Send a message to WeeChat."""
278 self.debug_print(0, '<==', message, forcecolor='#AA0000')
279 self._socket.write(message.encode('utf-8'))
280
281 def init_with_handshake(self, response):
282 """Initialize with WeeChat using the handshake response."""
283 self._pwd_hash_algo = response['password_hash_algo']
284 self._pwd_hash_iter = int(response['password_hash_iterations'])
285 self._server_nonce = bytearray.fromhex(response['nonce'])
286 if self._pwd_hash_algo:
287 cmd = self._build_init_command()
288 if cmd:
289 self.send_to_weechat(cmd)
290 self.sync_weechat()
291 self.set_status(STATUS_CONNECTED)
292 return
293 # failed to initialize: disconnect
294 self.disconnect_weechat()
295
296 def desync_weechat(self):
297 """Desynchronize from WeeChat."""
298 self.send_to_weechat('desync\n')
299
300 def sync_weechat(self):
301 """Synchronize with WeeChat."""
302 self.send_to_weechat(self._build_sync_command())
303
304 def status_label(self, status):
305 """Return the label for a given status."""
306 return NETWORK_STATUS.get(status, {}).get('label', '')
307
308 def status_color(self, status):
309 """Return the color for a given status."""
310 return NETWORK_STATUS.get(status, {}).get('color', 'black')
311
312 def status_icon(self, status):
313 """Return the name of icon for a given status."""
314 return NETWORK_STATUS.get(status, {}).get('icon', '')
315
316 def get_options(self):
317 """Get connection options."""
318 return {
319 'server': self._server,
320 'port': self._port,
321 'ssl': 'on' if self._ssl else 'off',
322 'password': self._password,
323 'lines': str(self._lines),
324 }
325
326 def debug_print(self, *args, **kwargs):
327 """Display a debug message."""
328 self.debug_lines.append((args, kwargs))
329 if self.debug_dialog:
330 self.debug_dialog.chat.display(*args, **kwargs)
331
332 def _debug_dialog_closed(self, result):
333 """Called when debug dialog is closed."""
334 self.debug_dialog = None
335
336 def debug_input_text_sent(self, text):
337 """Send debug buffer input to WeeChat."""
338 if self.network.is_connected():
339 text = str(text)
340 pos = text.find(')')
341 if text.startswith('(') and pos >= 0:
342 text = '(debug_%s)%s' % (text[1:pos], text[pos+1:])
343 else:
344 text = '(debug) %s' % text
345 self.network.debug_print(0, '<==', text, forcecolor='#AA0000')
346 self.network.send_to_weechat(text + '\n')
347
348 def open_debug_dialog(self):
349 """Open a dialog with debug messages."""
350 if not self.debug_dialog:
351 self.debug_dialog = DebugDialog()
352 self.debug_dialog.input.textSent.connect(
353 self.debug_input_text_sent)
354 self.debug_dialog.finished.connect(self._debug_dialog_closed)
355 self.debug_dialog.display_lines(self.debug_lines)
356 self.debug_dialog.chat.scroll_bottom()