1 # -*- coding: utf-8 -*-
3 # testproto.py - command-line program for testing WeeChat/relay protocol
5 # Copyright (C) 2013-2014 Sébastien Helleu <flashcode@flashtux.org>
7 # This file is part of QWeeChat, a Qt remote GUI for WeeChat.
9 # QWeeChat is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # QWeeChat is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
24 Command-line program for testing WeeChat/relay protocol.
27 from __future__
import print_function
39 import protocol
# WeeChat/relay protocol
# Program name; substituted for the {name} placeholder in the usage
# examples of the command-line help text built for argparse.
NAME = 'qweechat-testproto'
44 class TestProto(object):
45 """Test of WeeChat/relay protocol."""
47 def __init__(self
, args
):
51 self
.address
= '{self.args.hostname}/{self.args.port} ' \
52 '(IPv{0})'.format(6 if self
.args
.ipv6
else 4, self
=self
)
56 Connect to WeeChat/relay.
57 Return True if OK, False if error.
59 inet
= socket
.AF_INET6
if self
.args
.ipv6
else socket
.AF_INET
61 self
.sock
= socket
.socket(inet
, socket
.SOCK_STREAM
)
62 self
.sock
.connect((self
.args
.hostname
, self
.args
.port
))
66 print('Failed to connect to', self
.address
)
68 print('Connected to', self
.address
)
71 def send(self
, messages
):
73 Send a text message to WeeChat/relay.
74 Return True if OK, False if error.
77 for msg
in messages
.split('\n'):
80 self
.sock
.sendall(msg
+ '\n')
81 print('\x1b[33m<-- ' + msg
+ '\x1b[0m')
84 print('Failed to send message')
88 def decode(self
, message
):
90 Decode a binary message received from WeeChat/relay.
91 Return True if OK, False if error.
94 proto
= protocol
.Protocol()
95 msgd
= proto
.decode(message
,
96 separator
='\n' if self
.args
.verbose
> 0
99 if self
.args
.verbose
>= 2 and msgd
.uncompressed
:
100 # display raw message
101 print('\x1b[32m--> message uncompressed ({0} bytes):\n'
103 ''.format(msgd
.size_uncompressed
,
104 protocol
.hex_and_ascii(msgd
.uncompressed
, 20)))
105 # display decoded message
106 print('\x1b[32m--> {0}\x1b[0m'.format(msgd
))
108 traceback
.print_exc()
109 print('Error while decoding message from WeeChat')
113 def send_stdin(self
):
115 Send commands from standard input if some data is available.
116 Return True if OK (it's OK if stdin has no commands),
119 inr
= select
.select([sys
.stdin
], [], [], 0)[0]
121 data
= os
.read(sys
.stdin
.fileno(), 4096)
123 if not self
.send(data
.strip()):
126 # open stdin to read user commands
127 sys
.stdin
= open('/dev/tty')
132 Main loop: read keyboard, send commands, read socket,
133 decode/display binary messages received from WeeChat/relay.
134 Return 0 if OK, 4 if send error, 5 if decode error.
140 prompt
= '\x1b[36mrelay> \x1b[0m'
141 sys
.stdout
.write(prompt
)
144 while not self
.has_quit
:
145 inr
= select
.select([sys
.stdin
, self
.sock
], [], [], 1)[0]
147 if _file
== sys
.stdin
:
148 buf
= os
.read(_file
.fileno(), 4096)
152 messages
= message
.split('\n')
153 msgsent
= '\n'.join(messages
[:-1])
154 if msgsent
and not self
.send(msgsent
):
156 message
= messages
[-1]
157 sys
.stdout
.write(prompt
+ message
)
160 buf
= _file
.recv(4096)
163 while len(recvbuf
) >= 4:
165 length
= struct
.unpack('>i', recvbuf
[0:4])[0]
166 if len(recvbuf
) < length
:
167 # partial message, just wait for the
170 # more than one message?
171 if length
< len(recvbuf
):
172 # save beginning of another message
173 remainder
= recvbuf
[length
:]
174 recvbuf
= recvbuf
[0:length
]
175 if not self
.decode(recvbuf
):
181 sys
.stdout
.write(prompt
+ message
)
184 traceback
.print_exc()
189 print('Closing connection with', self
.address
)
196 # parse command line arguments
197 parser
= argparse
.ArgumentParser(
198 formatter_class
=argparse
.RawDescriptionHelpFormatter
,
199 fromfile_prefix_chars
='@',
200 description
='Command-line program for testing WeeChat/relay protocol.',
202 Environment variable "QWEECHAT_PROTO_OPTIONS" can be set with default options.
203 Argument "@file.txt" can be used to read default options in a file.
205 Some commands can be piped to the script, for example:
206 echo "init password=xxxx" | {name} localhost 5000
207 {name} localhost 5000 < commands.txt
211 2: wrong arguments (command line)
213 4: send error (message sent to WeeChat)
214 5: decode error (message received from WeeChat)
215 '''.format(name
=NAME
))
216 parser
.add_argument('-6', '--ipv6', action
='store_true',
217 help='connect using IPv6')
218 parser
.add_argument('-v', '--verbose', action
='count', default
=0,
219 help='verbose mode: long objects view '
220 '(-vv: display raw messages)')
221 parser
.add_argument('hostname',
222 help='hostname (or IP address) of machine running '
224 parser
.add_argument('port', type=int,
225 help='port of machine running WeeChat/relay')
226 if len(sys
.argv
) == 1:
229 _args
= parser
.parse_args(
230 shlex
.split(os
.getenv('QWEECHAT_PROTO_OPTIONS') or '') + sys
.argv
[1:])
232 test
= TestProto(_args
)
234 # connect to WeeChat/relay
235 if not test
.connect():
238 # send commands from standard input if some data is available
239 if not test
.send_stdin():
242 # main loop (wait commands, display messages received)
243 returncode
= test
.mainloop()
248 if __name__
== "__main__":