1 # -*- coding: utf-8 -*-
3 # testproto.py - command-line program for testing WeeChat/relay protocol
5 # Copyright (C) 2013-2021 Sébastien Helleu <flashcode@flashtux.org>
7 # This file is part of QWeeChat, a Qt remote GUI for WeeChat.
9 # QWeeChat is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # QWeeChat is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
24 Command-line program for testing WeeChat/relay protocol.
27 from __future__ import print_function
39 import protocol # WeeChat/relay protocol
40 # from .. version import qweechat_version
# Version string handed to the `--version` option of the argument parser;
# hard-coded here in place of the package import commented out just above.
41 qweechat_version = '1.1'
# Program name substituted for `{name}` in the usage examples of the help
# epilog.
43 NAME = 'qweechat-testproto'
# Interactive test client for the WeeChat/relay protocol: opens a TCP
# connection, sends newline-terminated text commands and prints the decoded
# binary replies.
# NOTE(review): the number that starts every line of this block is a leftover
# original line index; those indexes have gaps, so several statements (and
# some `def` lines) of this class are not visible in this view.
46 class TestProto(object):
47 """Test of WeeChat/relay protocol."""
49 def __init__(self, args):
# Human-readable endpoint label of the form 'hostname/port (IPv4)' or
# 'hostname/port (IPv6)'; the visible code uses it only in status messages.
# Passing `self=self` lets the format string reach self.args.* by attribute
# lookup inside the replacement fields.
# NOTE(review): the assignment of self.args is not visible in this view.
53 self.address = '{self.args.hostname}/{self.args.port} ' \
54 '(IPv{0})'.format(6 if self.args.ipv6 else 4, self=self)
58 Connect to WeeChat/relay.
59 Return True if OK, False if error.
# NOTE(review): the `def` line of the method described by the two lines above
# is not visible in this view.
# The address family follows the --ipv6 command-line flag.
61 inet = socket.AF_INET6 if self.args.ipv6 else socket.AF_INET
# Plain TCP stream socket to (hostname, port).
63 self.sock = socket.socket(inet, socket.SOCK_STREAM)
64 self.sock.connect((self.args.hostname, self.args.port))
# Failure and success notices; the branching between them (and the return
# statements) is not visible in this view.
68 print('Failed to connect to', self.address)
70 print('Connected to', self.address)
73 def send(self, messages):
75 Send a text message to WeeChat/relay.
76 Return True if OK, False if error.
# `messages` may hold several commands separated by '\n'; each one is sent
# on its own line.
79 for msg in messages.split('\n'):
# NOTE(review): sendall() receives a str here. Python 3 sockets accept only
# bytes-like objects, so this presumably targets Python 2 -- confirm.
82 self.sock.sendall(msg + '\n')
# Echo the command in yellow (ANSI SGR 33), then reset attributes.
83 print('\x1b[33m<-- ' + msg + '\x1b[0m')
# Error notice; the handler lines around it are not visible in this view.
86 print('Failed to send message')
90 def decode(self, message):
92 Decode a binary message received from WeeChat/relay.
93 Return True if OK, False if error.
# A new protocol.Protocol() object is created for every message.
96 proto = protocol.Protocol()
# The separator is '\n' when at least one -d was given; the value used
# otherwise is not visible in this view.
97 msgd = proto.decode(message,
98 separator='\n' if self.args.debug > 0
# With -dd or more, and only when msgd.uncompressed is truthy, dump the
# uncompressed payload through protocol.hex_and_ascii in green (ANSI SGR 32).
# NOTE(review): the meaning of the literal 20 passed to hex_and_ascii is not
# shown here -- presumably a bytes-per-row width; confirm in protocol.py.
101 if self.args.debug >= 2 and msgd.uncompressed:
102 # display raw message
103 print('\x1b[32m--> message uncompressed ({0} bytes):\n'
105 ''.format(msgd.size_uncompressed,
106 protocol.hex_and_ascii(msgd.uncompressed, 20)))
107 # display decoded message
108 print('\x1b[32m--> {0}\x1b[0m'.format(msgd))
# Error path: full traceback plus a short notice; the enclosing handler lines
# are not visible in this view.
110 traceback.print_exc()
111 print('Error while decoding message from WeeChat')
115 def send_stdin(self):
117 Send commands from standard input if some data is available.
118 Return True if OK (it's OK if stdin has no commands),
# Zero timeout: poll stdin for readability without blocking.
121 inr = select.select([sys.stdin], [], [], 0)[0]
# Read at most 4096 bytes straight from the file descriptor.
# NOTE(review): on Python 3 os.read() returns bytes while send() splits on a
# str separator -- another hint that this presumably targets Python 2; confirm.
123 data = os.read(sys.stdin.fileno(), 4096)
125 if not self.send(data.strip()):
# NOTE(review): the file object opened below is not closed anywhere in the
# visible code, and '/dev/tty' is a POSIX-only path.
128 # open stdin to read user commands
129 sys.stdin = open('/dev/tty')
134 Main loop: read keyboard, send commands, read socket,
135 decode/display binary messages received from WeeChat/relay.
136 Return 0 if OK, 4 if send error, 5 if decode error.
# NOTE(review): the `def` line of this method and the initialisation of
# `message` and `recvbuf` are not visible in this view.
# Prompt shown in cyan (ANSI SGR 36).
142 prompt = '\x1b[36mrelay> \x1b[0m'
143 sys.stdout.write(prompt)
# Run until self.has_quit becomes true; where that flag is set is not visible
# in this view.
146 while not self.has_quit:
# Wait up to one second for keyboard input or data from the relay.
147 inr = select.select([sys.stdin, self.sock], [], [], 1)[0]
# Keyboard branch: read raw bytes from the stdin descriptor.
149 if _file == sys.stdin:
150 buf = os.read(_file.fileno(), 4096)
# Everything up to the last newline forms complete commands and is sent; the
# trailing partial line stays in `message` and is echoed again after the
# prompt.
154 messages = message.split('\n')
155 msgsent = '\n'.join(messages[:-1])
156 if msgsent and not self.send(msgsent):
158 message = messages[-1]
159 sys.stdout.write(prompt + message)
# Socket branch: read up to 4096 bytes from the relay.
162 buf = _file.recv(4096)
# Framing: a message begins with a 4-byte big-endian signed integer ('>i')
# giving the length of the whole message, these 4 bytes included --
# recvbuf[0:length] is what gets passed to decode().
165 while len(recvbuf) >= 4:
167 length = struct.unpack('>i', recvbuf[0:4])[0]
168 if len(recvbuf) < length:
169 # partial message, just wait for the
172 # more than one message?
173 if length < len(recvbuf):
174 # save beginning of another message
175 remainder = recvbuf[length:]
176 recvbuf = recvbuf[0:length]
177 if not self.decode(recvbuf):
# Redraw the prompt together with any partially typed command.
183 sys.stdout.write(prompt + message)
# Error path of the loop: print the traceback; the surrounding handler lines
# are not visible in this view.
186 traceback.print_exc()
# NOTE(review): the `def` line of the method that prints this notice is not
# visible in this view.
191 print('Closing connection with', self.address)
# Script entry logic: parse the command line, connect, forward any piped
# commands, then run the interactive loop.
# NOTE(review): the `def` line that owns these statements is not visible in
# this view, and the leftover line indexes show further gaps below.
198 # parse command line arguments
# RawDescriptionHelpFormatter keeps the hand-formatted help text as written;
# fromfile_prefix_chars='@' makes an argument such as "@file.txt" expand to
# the options stored in that file.
199 parser = argparse.ArgumentParser(
200 formatter_class=argparse.RawDescriptionHelpFormatter,
201 fromfile_prefix_chars='@',
202 description='Command-line program for testing WeeChat/relay protocol.',
204 Environment variable "QWEECHAT_PROTO_OPTIONS" can be set with default options.
205 Argument "@file.txt" can be used to read default options in a file.
207 Some commands can be piped to the script, for example:
208 echo "init password=xxxx" | {name} localhost 5000
209 {name} localhost 5000 < commands.txt
213 2: wrong arguments (command line)
215 4: send error (message sent to WeeChat)
216 5: decode error (message received from WeeChat)
217 '''.format(name=NAME))
# -6 switches the connection to IPv6.
218 parser.add_argument('-6', '--ipv6', action='store_true',
219 help='connect using IPv6')
# -d is counted, so "-dd" yields debug == 2.
220 parser.add_argument('-d', '--debug', action='count', default=0,
221 help='debug mode: long objects view '
222 '(-dd: display raw messages)')
# argparse's 'version' action prints the given string and exits.
223 parser.add_argument('-v', '--version', action='version',
224 version=qweechat_version)
# Two mandatory positionals: hostname and integer port.
225 parser.add_argument('hostname',
226 help='hostname (or IP address) of machine running '
228 parser.add_argument('port', type=int,
229 help='port of machine running WeeChat/relay')
# Script started without any argument; what happens in that case is not
# visible in this view.
230 if len(sys.argv) == 1:
# Options taken from QWEECHAT_PROTO_OPTIONS (split with shell-like rules by
# shlex; an unset or empty variable contributes nothing) are placed ahead of
# the real command-line arguments before parsing.
233 _args = parser.parse_args(
234 shlex.split(os.getenv('QWEECHAT_PROTO_OPTIONS') or '') + sys.argv[1:])
236 test = TestProto(_args)
# For the two checks below, the action taken on failure is not visible in
# this view.
238 # connect to WeeChat/relay
239 if not test.connect():
242 # send commands from standard input if some data is available
243 if not test.send_stdin():
246 # main loop (wait commands, display messages received)
# mainloop() documents its result as 0, 4 or 5; how returncode is used
# afterwards is not visible in this view.
247 returncode = test.mainloop()
# Runs only when the file is executed as a script rather than imported; the
# guarded statement is not visible in this view.
252 if __name__ == "__main__":