]> jfr.im git - irc/weechat/qweechat.git/blob - qweechat/weechat/testproto.py
Update copyright dates
[irc/weechat/qweechat.git] / qweechat / weechat / testproto.py
1 # -*- coding: utf-8 -*-
2 #
3 # testproto.py - command-line program for testing WeeChat/relay protocol
4 #
5 # Copyright (C) 2013-2019 Sébastien Helleu <flashcode@flashtux.org>
6 #
7 # This file is part of QWeeChat, a Qt remote GUI for WeeChat.
8 #
9 # QWeeChat is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # QWeeChat is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 """
24 Command-line program for testing WeeChat/relay protocol.
25 """
26
27 from __future__ import print_function
28
29 import argparse
30 import os
31 import select
32 import shlex
33 import socket
34 import struct
35 import sys
36 import time
37 import traceback
38
39 import protocol # WeeChat/relay protocol
40 from .. version import qweechat_version
41
# Program name, substituted into the usage examples of the command-line help.
NAME = 'qweechat-testproto'
43
44
class TestProto(object):
    """
    Test of WeeChat/relay protocol.

    Holds one TCP connection to a WeeChat relay: commands are sent as
    newline-terminated lines, binary messages received from the relay are
    decoded with the "protocol" module and displayed on standard output.

    All network data is handled as bytes, so the class works the same way
    on Python 2 and Python 3.
    """

    def __init__(self, args):
        """
        Store the parsed command-line arguments (attributes used: hostname,
        port, ipv6, debug). No connection is made here, see connect().
        """
        self.args = args
        self.sock = None
        self.has_quit = False
        self.address = '{self.args.hostname}/{self.args.port} ' \
            '(IPv{0})'.format(6 if self.args.ipv6 else 4, self=self)

    @staticmethod
    def _to_text(data):
        """
        Return data as a native string suitable for display.

        Native strings are returned unchanged (this includes Python 2 byte
        strings); Python 3 bytes are decoded as UTF-8, invalid sequences
        being replaced instead of raising an error.
        """
        if isinstance(data, str):
            return data
        return data.decode('utf-8', 'replace')

    @staticmethod
    def _next_message(recvbuf):
        """
        Extract the first complete binary message from a receive buffer.

        The first 4 bytes of a message are its total length (big-endian
        integer, the 4 bytes of the length being included).

        Return a tuple (message, remainder): message is None if the buffer
        does not yet hold a complete message (the buffer is then returned
        unchanged as remainder).
        Raise ValueError if the announced length is smaller than the length
        field itself (such a message can not exist and would never be
        consumed from the buffer).
        """
        if len(recvbuf) < 4:
            return None, recvbuf
        length = struct.unpack('>i', recvbuf[0:4])[0]
        if length < 4:
            raise ValueError('invalid message length: {0}'.format(length))
        if len(recvbuf) < length:
            # partial message, just wait for the end of message
            return None, recvbuf
        return recvbuf[0:length], recvbuf[length:]

    def connect(self):
        """
        Connect to WeeChat/relay.
        Return True if OK, False if error.
        """
        inet = socket.AF_INET6 if self.args.ipv6 else socket.AF_INET
        try:
            self.sock = socket.socket(inet, socket.SOCK_STREAM)
            self.sock.connect((self.args.hostname, self.args.port))
        except Exception:
            if self.sock is not None:
                self.sock.close()
                # no usable socket: nothing to close when object is deleted
                self.sock = None
            print('Failed to connect to', self.address)
            return False
        print('Connected to', self.address)
        return True

    def send(self, messages):
        """
        Send one or more commands (separated by newlines) to WeeChat/relay.

        The commands can be given as text (encoded as UTF-8 before sending)
        or as bytes. Each command sent is echoed on standard output; the
        command "quit" also sets the flag has_quit, which ends the main
        loop.
        Return True if OK, False if error.
        """
        try:
            if not isinstance(messages, bytes):
                messages = messages.encode('utf-8')
            for msg in messages.split(b'\n'):
                if msg == b'quit':
                    self.has_quit = True
                self.sock.sendall(msg + b'\n')
                print('\x1b[33m<-- ' + self._to_text(msg) + '\x1b[0m')
        except Exception:
            traceback.print_exc()
            print('Failed to send message')
            return False
        return True

    def decode(self, message):
        """
        Decode a binary message received from WeeChat/relay.

        The decoded message is displayed; with debug level >= 2, the raw
        uncompressed message is displayed as well.
        Return True if OK, False if error.
        """
        try:
            proto = protocol.Protocol()
            msgd = proto.decode(message,
                                separator='\n' if self.args.debug > 0
                                else ', ')
            print('')
            if self.args.debug >= 2 and msgd.uncompressed:
                # display raw message
                print('\x1b[32m--> message uncompressed ({0} bytes):\n'
                      '{1}\x1b[0m'
                      ''.format(msgd.size_uncompressed,
                                protocol.hex_and_ascii(msgd.uncompressed, 20)))
            # display decoded message
            print('\x1b[32m--> {0}\x1b[0m'.format(msgd))
        except Exception:
            traceback.print_exc()
            print('Error while decoding message from WeeChat')
            return False
        return True

    def send_stdin(self):
        """
        Send commands from standard input if some data is available.

        At most 4096 bytes are read, in a single read. If standard input
        was readable, it is then replaced by the terminal (/dev/tty), so
        that the main loop reads commands typed by the user.
        Return True if OK (it's OK if stdin has no commands),
        False if error.
        """
        inr = select.select([sys.stdin], [], [], 0)[0]
        if inr:
            data = os.read(sys.stdin.fileno(), 4096)
            if data:
                if not self.send(data.strip()):
                    return False
            # open stdin to read user commands
            sys.stdin = open('/dev/tty')
        return True

    def mainloop(self):
        """
        Main loop: read keyboard, send commands, read socket,
        decode/display binary messages received from WeeChat/relay.

        The loop ends when the command "quit" has been sent or when the
        relay closes the connection. On an unexpected error (or Ctrl-C),
        the traceback is displayed and "quit" is sent to the relay.
        Return 0 if OK, 4 if send error, 5 if decode error.
        """
        if self.has_quit:
            return 0
        message = b''  # command being typed (no newline received yet)
        recvbuf = b''  # data received from relay, not yet decoded
        prompt = '\x1b[36mrelay> \x1b[0m'
        sys.stdout.write(prompt)
        sys.stdout.flush()
        try:
            while not self.has_quit:
                inr = select.select([sys.stdin, self.sock], [], [], 1)[0]
                for _file in inr:
                    if _file is sys.stdin:
                        buf = os.read(_file.fileno(), 4096)
                        if buf:
                            message += buf
                            if b'\n' in message:
                                # send complete lines, keep the last
                                # (incomplete) one for the next read
                                messages = message.split(b'\n')
                                msgsent = b'\n'.join(messages[:-1])
                                if msgsent and not self.send(msgsent):
                                    return 4
                                message = messages[-1]
                                sys.stdout.write(
                                    prompt + self._to_text(message))
                                sys.stdout.flush()
                    else:
                        buf = _file.recv(4096)
                        if not buf:
                            # empty read: connection closed by the relay;
                            # the socket would stay readable forever, so
                            # leave instead of looping at full speed
                            print('')
                            print('Connection closed by', self.address)
                            return 0
                        recvbuf += buf
                        # decode all complete messages in the buffer
                        while True:
                            try:
                                msg, recvbuf = self._next_message(recvbuf)
                            except ValueError as exc:
                                print('')
                                print('Error while decoding message from '
                                      'WeeChat: {0}'.format(exc))
                                return 5
                            if msg is None:
                                break
                            if not self.decode(msg):
                                return 5
                        sys.stdout.write(prompt + self._to_text(message))
                        sys.stdout.flush()
        except (Exception, KeyboardInterrupt):
            # KeyboardInterrupt is caught on purpose: Ctrl-C must still
            # tell the relay that the client is leaving
            traceback.print_exc()
            self.send('quit')
        return 0

    def __del__(self):
        """Close the connection with WeeChat/relay, if there is one."""
        sock = getattr(self, 'sock', None)
        if sock is None:
            # never connected (or connection failed): nothing to close
            return
        print('Closing connection with', self.address)
        # NOTE(review): presumably this delay lets the relay process the
        # last data sent before the socket is closed -- confirm
        time.sleep(0.5)
        sock.close()
193
194
def main():
    """
    Entry point of the program.

    Build the command-line parser, connect to WeeChat/relay, send the
    commands available on standard input, then run the interactive loop.
    The process always ends with sys.exit(): 0 if OK (also when the help
    is displayed because no argument was given), 3 on connection error,
    4 on send error, otherwise the value returned by the main loop.
    """
    # text displayed after the description of arguments in the help
    help_epilog = '''
Environment variable "QWEECHAT_PROTO_OPTIONS" can be set with default options.
Argument "@file.txt" can be used to read default options in a file.

Some commands can be piped to the script, for example:
echo "init password=xxxx" | {name} localhost 5000
{name} localhost 5000 < commands.txt

The script returns:
0: OK
2: wrong arguments (command line)
3: connection error
4: send error (message sent to WeeChat)
5: decode error (message received from WeeChat)
'''.format(name=NAME)

    # command-line parser
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        fromfile_prefix_chars='@',
        description='Command-line program for testing WeeChat/relay protocol.',
        epilog=help_epilog,
    )
    parser.add_argument(
        '-6', '--ipv6',
        action='store_true',
        help='connect using IPv6',
    )
    parser.add_argument(
        '-d', '--debug',
        action='count',
        default=0,
        help='debug mode: long objects view (-dd: display raw messages)',
    )
    parser.add_argument(
        '-v', '--version',
        action='version',
        version=qweechat_version(),
    )
    parser.add_argument(
        'hostname',
        help='hostname (or IP address) of machine running WeeChat/relay',
    )
    parser.add_argument(
        'port',
        type=int,
        help='port of machine running WeeChat/relay',
    )

    # without any argument on command line: display help and leave
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(0)

    # default options from environment come first, then the command line
    default_options = os.getenv('QWEECHAT_PROTO_OPTIONS') or ''
    options = parser.parse_args(shlex.split(default_options) + sys.argv[1:])

    client = TestProto(options)

    # connection to WeeChat/relay
    connected = client.connect()
    if not connected:
        sys.exit(3)

    # commands piped to the script (if any) are sent first
    piped_ok = client.send_stdin()
    if not piped_ok:
        sys.exit(4)

    # interactive loop: wait for commands, display messages received
    exit_code = client.mainloop()
    del client
    sys.exit(exit_code)
249
250
# Run the program only when the file is executed as a script (not on import).
if __name__ == "__main__":
    main()