]> jfr.im git - irc/weechat/qweechat.git/blob - qweechat/weechat/testproto.py.bak
af8a3965ded3daf62980eab3d3c3e9e977d90499
[irc/weechat/qweechat.git] / qweechat / weechat / testproto.py.bak
1 # -*- coding: utf-8 -*-
2 #
3 # testproto.py - command-line program for testing WeeChat/relay protocol
4 #
5 # Copyright (C) 2013-2021 Sébastien Helleu <flashcode@flashtux.org>
6 #
7 # This file is part of QWeeChat, a Qt remote GUI for WeeChat.
8 #
9 # QWeeChat is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # QWeeChat is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 """
24 Command-line program for testing WeeChat/relay protocol.
25 """
26
27 from __future__ import print_function
28
29 import argparse
30 import os
31 import select
32 import shlex
33 import socket
34 import struct
35 import sys
36 import time
37 import traceback
38
39 import protocol # WeeChat/relay protocol
40 # from .. version import qweechat_version
41 qweechat_version = '1.1'
42
43 NAME = 'qweechat-testproto'
44
45
class TestProto(object):
    """
    Test of WeeChat/relay protocol.

    The object holds the parsed command-line arguments, the TCP socket
    connected to the relay and a flag telling whether the "quit" command
    has already been sent.

    All data exchanged with the socket and with standard input is handled
    as bytes, so the class behaves the same way with Python 2 and Python 3.
    """

    def __init__(self, args):
        """
        Store the arguments; no connection is made here (see connect()).

        "args" must provide the attributes "hostname", "port", "ipv6" and
        "debug" (the ones defined by the command-line parser).
        """
        self.args = args
        self.sock = None
        self.has_quit = False
        # human-readable address, used only in messages displayed
        self.address = '{0}/{1} (IPv{2})'.format(
            args.hostname, args.port, 6 if args.ipv6 else 4)

    @staticmethod
    def _to_bytes(data):
        """Return data as bytes (text is encoded as UTF-8)."""
        if isinstance(data, bytes):
            return data
        return data.encode('utf-8')

    @staticmethod
    def _to_text(data):
        """Return data as a native string that can be displayed."""
        if isinstance(data, str):
            # Python 2 bytes or Python 3 text: nothing to do
            return data
        return data.decode('utf-8', 'replace')

    @staticmethod
    def _next_message(recvbuf):
        """
        Extract the first complete message from the receive buffer.

        The first 4 bytes of a message are its total length (big-endian
        signed integer), and this length includes these 4 bytes.

        Return a tuple (message, remainder): message is None if the
        buffer does not yet contain a complete message.
        Raise ValueError if the length is smaller than the length field
        itself (the stream can not be decoded any more).
        """
        if len(recvbuf) < 4:
            return None, recvbuf
        length = struct.unpack('>i', recvbuf[0:4])[0]
        if length < 4:
            raise ValueError('invalid message length: {0}'.format(length))
        if len(recvbuf) < length:
            # partial message, just wait for the end of message
            return None, recvbuf
        return recvbuf[:length], recvbuf[length:]

    def connect(self):
        """
        Connect to WeeChat/relay.
        Return True if OK, False if error.
        """
        inet = socket.AF_INET6 if self.args.ipv6 else socket.AF_INET
        try:
            self.sock = socket.socket(inet, socket.SOCK_STREAM)
            self.sock.connect((self.args.hostname, self.args.port))
        except Exception:
            if self.sock is not None:
                self.sock.close()
                # forget the socket, so that it is not closed twice
                self.sock = None
            print('Failed to connect to', self.address)
            return False
        print('Connected to', self.address)
        return True

    def send(self, messages):
        """
        Send a text message to WeeChat/relay.

        "messages" is a string or bytes with one or more commands,
        separated by newlines; each command is sent followed by a newline.
        The flag "has_quit" is set when the command "quit" is sent.

        Return True if OK, False if error.
        """
        try:
            for msg in self._to_bytes(messages).split(b'\n'):
                if msg == b'quit':
                    self.has_quit = True
                self.sock.sendall(msg + b'\n')
                print('\x1b[33m<-- ' + self._to_text(msg) + '\x1b[0m')
        except Exception:
            traceback.print_exc()
            print('Failed to send message')
            return False
        return True

    def decode(self, message):
        """
        Decode a binary message received from WeeChat/relay and display it.

        With debug level >= 1, objects are separated by newlines instead
        of commas; with debug level >= 2, the raw uncompressed message is
        displayed as well.

        Return True if OK, False if error.
        """
        try:
            proto = protocol.Protocol()
            msgd = proto.decode(message,
                                separator='\n' if self.args.debug > 0
                                else ', ')
            print('')
            if self.args.debug >= 2 and msgd.uncompressed:
                # display raw message
                print('\x1b[32m--> message uncompressed ({0} bytes):\n'
                      '{1}\x1b[0m'
                      ''.format(msgd.size_uncompressed,
                                protocol.hex_and_ascii(msgd.uncompressed,
                                                       20)))
            # display decoded message
            print('\x1b[32m--> {0}\x1b[0m'.format(msgd))
        except Exception:
            traceback.print_exc()
            print('Error while decoding message from WeeChat')
            return False
        return True

    def send_stdin(self):
        """
        Send commands from standard input if some data is available.

        All the data immediately available is read (not only the first
        4096 bytes), then standard input is replaced by the terminal,
        to read the commands typed by the user.

        Return True if OK (it's OK if stdin has no commands),
        False if error.
        """
        chunks = []
        stdin_ready = False
        # read while data is available, without ever blocking
        while select.select([sys.stdin], [], [], 0)[0]:
            stdin_ready = True
            data = os.read(sys.stdin.fileno(), 4096)
            if not data:
                # end of file
                break
            chunks.append(data)
        if not stdin_ready:
            return True
        data = b''.join(chunks)
        if data and not self.send(data.strip()):
            return False
        # open stdin to read user commands
        sys.stdin = open('/dev/tty')
        return True

    def mainloop(self):
        """
        Main loop: read keyboard, send commands, read socket,
        decode/display binary messages received from WeeChat/relay.

        The loop ends when "quit" has been sent, on end of file on
        standard input, when the relay closes the connection, on
        keyboard interrupt or on unexpected error.

        Return 0 if OK, 4 if send error, 5 if decode error.
        """
        if self.has_quit:
            return 0
        message = b''
        recvbuf = b''
        prompt = '\x1b[36mrelay> \x1b[0m'
        sys.stdout.write(prompt)
        sys.stdout.flush()
        try:
            running = True
            while running and not self.has_quit:
                inr = select.select([sys.stdin, self.sock], [], [], 1)[0]
                for _file in inr:
                    if _file is sys.stdin:
                        buf = os.read(_file.fileno(), 4096)
                        if not buf:
                            # end of file on stdin: leave the loop, an
                            # empty read would be returned forever
                            print('')
                            running = False
                            break
                        message += buf
                        # send all complete lines, keep the partial one
                        lines, newline, partial = message.rpartition(b'\n')
                        if newline:
                            if lines and not self.send(lines):
                                return 4
                            message = partial
                            sys.stdout.write(prompt +
                                             self._to_text(message))
                            sys.stdout.flush()
                    else:
                        buf = _file.recv(4096)
                        if not buf:
                            # connection closed by the relay: nothing
                            # more to read, and "quit" can not be sent
                            print('')
                            print('Connection closed by', self.address)
                            return 0
                        recvbuf += buf
                        while True:
                            try:
                                msg, recvbuf = self._next_message(recvbuf)
                            except ValueError:
                                traceback.print_exc()
                                print('Error while decoding message '
                                      'from WeeChat')
                                return 5
                            if msg is None:
                                break
                            if not self.decode(msg):
                                return 5
                        sys.stdout.write(prompt + self._to_text(message))
                        sys.stdout.flush()
        except KeyboardInterrupt:
            # ctrl-c: just leave the loop
            print('')
        except Exception:
            traceback.print_exc()
        if not self.has_quit:
            # do not send "quit" a second time
            self.send('quit')
        return 0

    def close(self):
        """
        Close the connection with WeeChat/relay.

        Nothing is done if there is no socket (never connected,
        connection failed or already closed).
        """
        sock = getattr(self, 'sock', None)
        self.sock = None
        if sock is None:
            return
        print('Closing connection with', getattr(self, 'address', '?'))
        # NOTE(review): this delay comes from the original code, presumably
        # to let pending data be exchanged before closing -- confirm
        time.sleep(0.5)
        sock.close()

    def __del__(self):
        self.close()
194
195
def main():
    """
    Main function.

    Parse the arguments, connect to WeeChat/relay, send the commands
    available on standard input, then run the main loop.

    The default options read in the environment variable
    "QWEECHAT_PROTO_OPTIONS" are placed before the command-line arguments.
    The help is displayed only if there are no arguments at all (neither
    in the environment variable, nor on the command line).

    The function always ends with sys.exit(): see the return codes in the
    help of the script.
    """
    # parse command line arguments
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        fromfile_prefix_chars='@',
        description='Command-line program for testing WeeChat/relay protocol.',
        epilog='''
Environment variable "QWEECHAT_PROTO_OPTIONS" can be set with default options.
Argument "@file.txt" can be used to read default options in a file.

Some commands can be piped to the script, for example:
  echo "init password=xxxx" | {name} localhost 5000
  {name} localhost 5000 < commands.txt

The script returns:
  0: OK
  2: wrong arguments (command line)
  3: connection error
  4: send error (message sent to WeeChat)
  5: decode error (message received from WeeChat)
'''.format(name=NAME))
    parser.add_argument('-6', '--ipv6', action='store_true',
                        help='connect using IPv6')
    parser.add_argument('-d', '--debug', action='count', default=0,
                        help='debug mode: long objects view '
                        '(-dd: display raw messages)')
    parser.add_argument('-v', '--version', action='version',
                        version=qweechat_version)
    parser.add_argument('hostname',
                        help='hostname (or IP address) of machine running '
                        'WeeChat/relay')
    parser.add_argument('port', type=int,
                        help='port of machine running WeeChat/relay')

    # merge default options with the command line before checking whether
    # something was given: the environment variable alone may be enough
    default_options = shlex.split(os.getenv('QWEECHAT_PROTO_OPTIONS') or '')
    arguments = default_options + sys.argv[1:]
    if not arguments:
        parser.print_help()
        sys.exit(0)
    _args = parser.parse_args(arguments)

    test = TestProto(_args)

    # connect to WeeChat/relay
    if not test.connect():
        sys.exit(3)

    # send commands from standard input if some data is available
    if not test.send_stdin():
        sys.exit(4)

    # main loop (wait commands, display messages received)
    returncode = test.mainloop()
    # deleting the object closes the connection before exiting
    del test
    sys.exit(returncode)


if __name__ == "__main__":
    main()