1 # -*- coding: utf-8 -*-
3 # testproto.py - command-line program for testing WeeChat/relay protocol
5 # Copyright (C) 2013-2019 Sébastien Helleu <flashcode@flashtux.org>
7 # This file is part of QWeeChat, a Qt remote GUI for WeeChat.
9 # QWeeChat is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # QWeeChat is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
24 Command-line program for testing WeeChat/relay protocol.
27 from __future__
import print_function
39 import protocol
# WeeChat/relay protocol
40 from .. version
import qweechat_version
42 NAME
= 'qweechat-testproto'
45 class TestProto(object):
46 """Test of WeeChat/relay protocol."""
48 def __init__(self
, args
):
52 self
.address
= '{self.args.hostname}/{self.args.port} ' \
53 '(IPv{0})'.format(6 if self
.args
.ipv6
else 4, self
=self
)
57 Connect to WeeChat/relay.
58 Return True if OK, False if error.
60 inet
= socket
.AF_INET6
if self
.args
.ipv6
else socket
.AF_INET
62 self
.sock
= socket
.socket(inet
, socket
.SOCK_STREAM
)
63 self
.sock
.connect((self
.args
.hostname
, self
.args
.port
))
67 print('Failed to connect to', self
.address
)
69 print('Connected to', self
.address
)
72 def send(self
, messages
):
74 Send a text message to WeeChat/relay.
75 Return True if OK, False if error.
78 for msg
in messages
.split('\n'):
81 self
.sock
.sendall(msg
+ '\n')
82 print('\x1b[33m<-- ' + msg
+ '\x1b[0m')
85 print('Failed to send message')
89 def decode(self
, message
):
91 Decode a binary message received from WeeChat/relay.
92 Return True if OK, False if error.
95 proto
= protocol
.Protocol()
96 msgd
= proto
.decode(message
,
97 separator
='\n' if self
.args
.debug
> 0
100 if self
.args
.debug
>= 2 and msgd
.uncompressed
:
101 # display raw message
102 print('\x1b[32m--> message uncompressed ({0} bytes):\n'
104 ''.format(msgd
.size_uncompressed
,
105 protocol
.hex_and_ascii(msgd
.uncompressed
, 20)))
106 # display decoded message
107 print('\x1b[32m--> {0}\x1b[0m'.format(msgd
))
109 traceback
.print_exc()
110 print('Error while decoding message from WeeChat')
114 def send_stdin(self
):
116 Send commands from standard input if some data is available.
117 Return True if OK (it's OK if stdin has no commands),
120 inr
= select
.select([sys
.stdin
], [], [], 0)[0]
122 data
= os
.read(sys
.stdin
.fileno(), 4096)
124 if not self
.send(data
.strip()):
127 # open stdin to read user commands
128 sys
.stdin
= open('/dev/tty')
133 Main loop: read keyboard, send commands, read socket,
134 decode/display binary messages received from WeeChat/relay.
135 Return 0 if OK, 4 if send error, 5 if decode error.
141 prompt
= '\x1b[36mrelay> \x1b[0m'
142 sys
.stdout
.write(prompt
)
145 while not self
.has_quit
:
146 inr
= select
.select([sys
.stdin
, self
.sock
], [], [], 1)[0]
148 if _file
== sys
.stdin
:
149 buf
= os
.read(_file
.fileno(), 4096)
153 messages
= message
.split('\n')
154 msgsent
= '\n'.join(messages
[:-1])
155 if msgsent
and not self
.send(msgsent
):
157 message
= messages
[-1]
158 sys
.stdout
.write(prompt
+ message
)
161 buf
= _file
.recv(4096)
164 while len(recvbuf
) >= 4:
166 length
= struct
.unpack('>i', recvbuf
[0:4])[0]
167 if len(recvbuf
) < length
:
168 # partial message, just wait for the
171 # more than one message?
172 if length
< len(recvbuf
):
173 # save beginning of another message
174 remainder
= recvbuf
[length
:]
175 recvbuf
= recvbuf
[0:length
]
176 if not self
.decode(recvbuf
):
182 sys
.stdout
.write(prompt
+ message
)
185 traceback
.print_exc()
190 print('Closing connection with', self
.address
)
197 # parse command line arguments
198 parser
= argparse
.ArgumentParser(
199 formatter_class
=argparse
.RawDescriptionHelpFormatter
,
200 fromfile_prefix_chars
='@',
201 description
='Command-line program for testing WeeChat/relay protocol.',
203 Environment variable "QWEECHAT_PROTO_OPTIONS" can be set with default options.
204 Argument "@file.txt" can be used to read default options in a file.
206 Some commands can be piped to the script, for example:
207 echo "init password=xxxx" | {name} localhost 5000
208 {name} localhost 5000 < commands.txt
212 2: wrong arguments (command line)
214 4: send error (message sent to WeeChat)
215 5: decode error (message received from WeeChat)
216 '''.format(name
=NAME
))
217 parser
.add_argument('-6', '--ipv6', action
='store_true',
218 help='connect using IPv6')
219 parser
.add_argument('-d', '--debug', action
='count', default
=0,
220 help='debug mode: long objects view '
221 '(-dd: display raw messages)')
222 parser
.add_argument('-v', '--version', action
='version',
223 version
=qweechat_version())
224 parser
.add_argument('hostname',
225 help='hostname (or IP address) of machine running '
227 parser
.add_argument('port', type=int,
228 help='port of machine running WeeChat/relay')
229 if len(sys
.argv
) == 1:
232 _args
= parser
.parse_args(
233 shlex
.split(os
.getenv('QWEECHAT_PROTO_OPTIONS') or '') + sys
.argv
[1:])
235 test
= TestProto(_args
)
237 # connect to WeeChat/relay
238 if not test
.connect():
241 # send commands from standard input if some data is available
242 if not test
.send_stdin():
245 # main loop (wait commands, display messages received)
246 returncode
= test
.mainloop()
251 if __name__
== "__main__":