1 # -*- coding: utf-8 -*-
3 # network.py - I/O with WeeChat/relay
5 # Copyright (C) 2011-2022 SĂ©bastien Helleu <flashcode@flashtux.org>
7 # This file is part of QWeeChat, a Qt remote GUI for WeeChat.
9 # QWeeChat is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # QWeeChat is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
23 """I/O with WeeChat/relay."""
29 from PySide6
import QtCore
, QtNetwork
31 from qweechat
import config
32 from qweechat
.debug
import DebugDialog
35 # list of supported hash algorithms on our side
36 # (the hash algorithm will be negotiated with the remote WeeChat)
44 _HASH_ALGOS
= ':'.join(_HASH_ALGOS_LIST
)
46 # handshake with remote WeeChat (before init)
47 _PROTO_HANDSHAKE
= f
'(handshake) handshake password_hash_algo={_HASH_ALGOS}\n'
49 # initialize with the password (plain text)
50 _PROTO_INIT_PWD
= 'init password=%(password)s%(totp)s\n' # nosec
52 # initialize with the hashed password
53 _PROTO_INIT_HASH
= ('init password_hash='
54 '%(algo)s:%(salt)s%(iter)s:%(hash)s%(totp)s\n')
58 '(listbuffers) hdata buffer:gui_buffers(*) number,full_name,short_name,'
59 'type,nicklist,title,local_variables',
61 '(listlines) hdata buffer:gui_buffers(*)/own_lines/last_line(-%(lines)d)/'
62 'data date,displayed,prefix,message',
63 # get nicklist for all buffers
64 '(nicklist) nicklist',
65 # enable synchronization
69 STATUS_DISCONNECTED
= 'disconnected'
70 STATUS_CONNECTING
= 'connecting'
71 STATUS_AUTHENTICATING
= 'authenticating'
72 STATUS_CONNECTED
= 'connected'
75 STATUS_DISCONNECTED
: {
76 'label': 'Disconnected',
78 'icon': 'dialog-close.png',
81 'label': 'Connecting…',
83 'icon': 'dialog-warning.png',
85 STATUS_AUTHENTICATING
: {
86 'label': 'Authenticating…',
88 'icon': 'dialog-password.png',
93 'icon': 'dialog-ok-apply.png',
98 class Network(QtCore
.QObject
):
99 """I/O with WeeChat/relay."""
101 statusChanged
= QtCore
.Signal(str, str)
102 messageFromWeechat
= QtCore
.Signal(QtCore
.QByteArray
)
104 def __init__(self
, *args
):
105 super().__init
__(*args
)
106 self
._init
_connection
()
107 self
.debug_lines
= []
108 self
.debug_dialog
= None
109 self
._lines
= config
.CONFIG_DEFAULT_RELAY_LINES
110 self
._buffer
= QtCore
.QByteArray()
111 self
._socket
= QtNetwork
.QSslSocket()
112 self
._socket
.connected
.connect(self
._socket
_connected
)
113 self
._socket
.readyRead
.connect(self
._socket
_read
)
114 self
._socket
.disconnected
.connect(self
._socket
_disconnected
)
116 def _init_connection(self
):
117 self
.status
= STATUS_DISCONNECTED
118 self
._hostname
= None
121 self
._password
= None
123 self
._handshake
_received
= False
124 self
._handshake
_timer
= None
125 self
._handshake
_timer
= False
126 self
._pwd
_hash
_algo
= None
127 self
._pwd
_hash
_iter
= 0
128 self
._server
_nonce
= None
130 def set_status(self
, status
):
131 """Set current status."""
133 self
.statusChanged
.emit(status
, None)
135 def pbkdf2(self
, hash_name
, salt
):
136 """Return hashed password with PBKDF2-HMAC."""
137 return hashlib
.pbkdf2_hmac(
139 password
=self
._password
.encode('utf-8'),
141 iterations
=self
._pwd
_hash
_iter
,
144 def _build_init_command(self
):
145 """Build the init command to send to WeeChat."""
146 totp
= f
',totp={self._totp}' if self
._totp
else ''
147 if self
._pwd
_hash
_algo
== 'plain': # nosec
148 cmd
= _PROTO_INIT_PWD
% {
149 'password': self
._password
,
153 client_nonce
= secrets
.token_bytes(16)
154 salt
= self
._server
_nonce
+ client_nonce
157 if self
._pwd
_hash
_algo
== 'pbkdf2+sha512': # nosec
158 pwd_hash
= self
.pbkdf2('sha512', salt
)
159 iterations
= f
':{self._pwd_hash_iter}'
160 elif self
._pwd
_hash
_algo
== 'pbkdf2+sha256': # nosec
161 pwd_hash
= self
.pbkdf2('sha256', salt
)
162 iterations
= f
':{self._pwd_hash_iter}'
163 elif self
._pwd
_hash
_algo
== 'sha512': # nosec
164 pwd
= salt
+ self
._password
.encode('utf-8')
165 pwd_hash
= hashlib
.sha512(pwd
).hexdigest()
166 elif self
._pwd
_hash
_algo
== 'sha256': # nosec
167 pwd
= salt
+ self
._password
.encode('utf-8')
168 pwd_hash
= hashlib
.sha256(pwd
).hexdigest()
171 cmd
= _PROTO_INIT_HASH
% {
172 'algo': self
._pwd
_hash
_algo
,
173 'salt': bytearray(salt
).hex(),
180 def _build_sync_command(self
):
181 """Build the sync commands to send to WeeChat."""
182 cmd
= '\n'.join(_PROTO_SYNC_CMDS
) + '\n'
183 return cmd
% {'lines': self._lines}
185 def handshake_timer_expired(self
):
186 if self
.status
== STATUS_AUTHENTICATING
:
187 self
._pwd
_hash
_algo
= 'plain' # nosec
188 self
.send_to_weechat(self
._build
_init
_command
())
190 self
.set_status(STATUS_CONNECTED
)
192 def _socket_connected(self
):
193 """Slot: socket connected."""
194 self
.set_status(STATUS_AUTHENTICATING
)
195 self
.send_to_weechat(_PROTO_HANDSHAKE
)
196 self
._handshake
_timer
= QtCore
.QTimer()
197 self
._handshake
_timer
.setSingleShot(True)
198 self
._handshake
_timer
.setInterval(2000)
199 self
._handshake
_timer
.timeout
.connect(self
.handshake_timer_expired
)
200 self
._handshake
_timer
.start()
202 def _socket_read(self
):
203 """Slot: data available on socket."""
204 data
= self
._socket
.readAll()
205 self
._buffer
.append(data
)
206 while len(self
._buffer
) >= 4:
208 length
= struct
.unpack('>i', self
._buffer
[0:4].data())[0]
209 if len(self
._buffer
) < length
:
210 # partial message, just wait for end of message
212 # more than one message?
213 if length
< len(self
._buffer
):
214 # save beginning of another message
215 remainder
= self
._buffer
[length
:]
216 self
._buffer
= self
._buffer
[0:length
]
217 self
.messageFromWeechat
.emit(self
._buffer
)
218 if not self
.is_connected():
222 self
._buffer
.append(remainder
)
224 def _socket_disconnected(self
):
225 """Slot: socket disconnected."""
226 if self
._handshake
_timer
:
227 self
._handshake
_timer
.stop()
228 self
._init
_connection
()
229 self
.set_status(STATUS_DISCONNECTED
)
231 def is_connected(self
):
232 """Return True if the socket is connected, False otherwise."""
233 return self
._socket
.state() == QtNetwork
.QAbstractSocket
.ConnectedState
236 """Return True if SSL is used, False otherwise."""
239 def connect_weechat(self
, hostname
, port
, ssl
, password
, totp
, lines
):
240 """Connect to WeeChat."""
241 self
._hostname
= hostname
243 self
._port
= int(port
)
247 self
._password
= password
250 self
._lines
= int(lines
)
252 self
._lines
= config
.CONFIG_DEFAULT_RELAY_LINES
253 if self
._socket
.state() == QtNetwork
.QAbstractSocket
.ConnectedState
:
255 if self
._socket
.state() != QtNetwork
.QAbstractSocket
.UnconnectedState
:
258 self
._socket
.ignoreSslErrors()
259 self
._socket
.connectToHostEncrypted(self
._hostname
, self
._port
)
261 self
._socket
.connectToHost(self
._hostname
, self
._port
)
262 self
.set_status(STATUS_CONNECTING
)
264 def disconnect_weechat(self
):
265 """Disconnect from WeeChat."""
266 if self
._socket
.state() == QtNetwork
.QAbstractSocket
.UnconnectedState
:
267 self
.set_status(STATUS_DISCONNECTED
)
269 if self
._socket
.state() == QtNetwork
.QAbstractSocket
.ConnectedState
:
270 self
.send_to_weechat('quit\n')
271 self
._socket
.waitForBytesWritten(1000)
273 self
.set_status(STATUS_DISCONNECTED
)
276 def send_to_weechat(self
, message
):
277 """Send a message to WeeChat."""
278 self
.debug_print(0, '<==', message
, forcecolor
='#AA0000')
279 self
._socket
.write(message
.encode('utf-8'))
281 def init_with_handshake(self
, response
):
282 """Initialize with WeeChat using the handshake response."""
283 self
._pwd
_hash
_algo
= response
['password_hash_algo']
284 self
._pwd
_hash
_iter
= int(response
['password_hash_iterations'])
285 self
._server
_nonce
= bytearray
.fromhex(response
['nonce'])
286 if self
._pwd
_hash
_algo
:
287 cmd
= self
._build
_init
_command
()
289 self
.send_to_weechat(cmd
)
291 self
.set_status(STATUS_CONNECTED
)
293 # failed to initialize: disconnect
294 self
.disconnect_weechat()
296 def desync_weechat(self
):
297 """Desynchronize from WeeChat."""
298 self
.send_to_weechat('desync\n')
300 def sync_weechat(self
):
301 """Synchronize with WeeChat."""
302 self
.send_to_weechat(self
._build
_sync
_command
())
304 def status_label(self
, status
):
305 """Return the label for a given status."""
306 return NETWORK_STATUS
.get(status
, {}).get('label', '')
308 def status_color(self
, status
):
309 """Return the color for a given status."""
310 return NETWORK_STATUS
.get(status
, {}).get('color', 'black')
312 def status_icon(self
, status
):
313 """Return the name of icon for a given status."""
314 return NETWORK_STATUS
.get(status
, {}).get('icon', '')
316 def get_options(self
):
317 """Get connection options."""
319 'hostname': self
._hostname
,
321 'ssl': 'on' if self
._ssl
else 'off',
322 'password': self
._password
,
323 'lines': str(self
._lines
),
326 def debug_print(self
, *args
, **kwargs
):
327 """Display a debug message."""
328 self
.debug_lines
.append((args
, kwargs
))
329 if self
.debug_dialog
:
330 self
.debug_dialog
.chat
.display(*args
, **kwargs
)
332 def _debug_dialog_closed(self
, result
):
333 """Called when debug dialog is closed."""
334 self
.debug_dialog
= None
336 def debug_input_text_sent(self
, text
):
337 """Send debug buffer input to WeeChat."""
338 if self
.network
.is_connected():
341 if text
.startswith('(') and pos
>= 0:
342 text
= '(debug_%s)%s' % (text
[1:pos
], text
[pos
+1:])
344 text
= '(debug) %s' % text
345 self
.network
.debug_print(0, '<==', text
, forcecolor
='#AA0000')
346 self
.network
.send_to_weechat(text
+ '\n')
348 def open_debug_dialog(self
):
349 """Open a dialog with debug messages."""
350 if not self
.debug_dialog
:
351 self
.debug_dialog
= DebugDialog()
352 self
.debug_dialog
.input.textSent
.connect(
353 self
.debug_input_text_sent
)
354 self
.debug_dialog
.finished
.connect(self
._debug
_dialog
_closed
)
355 self
.debug_dialog
.display_lines(self
.debug_lines
)
356 self
.debug_dialog
.chat
.scroll_bottom()