]> jfr.im git - irc/weechat/qweechat.git/commitdiff
testproto.py: use argparse module to parse command line arguments, full PEP8 compliance
authorSebastien Helleu <redacted>
Sun, 3 Nov 2013 12:44:43 +0000 (13:44 +0100)
committerSebastien Helleu <redacted>
Sun, 3 Nov 2013 12:44:43 +0000 (13:44 +0100)
src/qweechat/weechat/testproto.py

index 84d8c0aae639a713565a2d7aa892a958a8fd4e69..54065feac0261c84a6d561c3dd2a8ee3bce7856d 100755 (executable)
 # along with QWeeChat.  If not, see <http://www.gnu.org/licenses/>.
 #
 
-#
-# Usage:  python testproto.py [-h] [-v] [-6] <hostname> <port>
-#
-# With initial commands: echo "init password=xxxx" | python testproto.py localhost 5000
-#                        python testproto.py localhost 5000 < commands.txt
-#
-# Return code:
-#   0: OK
-#   1: missing/invalid arguments (hostname or port)
-#   2: connection to WeeChat/relay failed
-#   3: I/O error with WeeChat/relay
-#
+from __future__ import print_function
+
+import argparse
+import os
+import select
+import shlex
+import socket
+import struct
+import sys
+import time
+import traceback
 
-import os, sys, socket, select, struct, time
 import protocol  # WeeChat/relay protocol
 
-options = { 'h': 0, 'v': 0, '6': 0 }
-hostname = None
-port = None
-
-def usage():
-    """Display usage."""
-    print('\nSyntax: python %s [-h] [-v] [-6] <hostname> <port>\n' % sys.argv[0])
-    print('  -h              display this help')
-    print('  -v              verbose mode: long objects view (two -v: display raw messages)')
-    print('  -6              connect using IPv6')
-    print('  hostname, port  hostname (or IP address) and port of machine running WeeChat relay')
-    print('')
-    print('Some commands can be piped to the script, for example:')
-    print('  echo "init password=xxxx" | python %s localhost 5000' % sys.argv[0])
-    print('  python %s localhost 5000 < commands.txt' % sys.argv[0])
-    print('')
-
-def connect(address, ipv6):
-    """Connect to WeeChat/relay."""
-    inet = socket.AF_INET6 if ipv6 else socket.AF_INET
-    sock = None
-    try:
-        sock = socket.socket(inet, socket.SOCK_STREAM)
-        sock.connect(address)
-    except:
-        if sock:
-            sock.close()
-        print('Failed to connect to %s/%d using %s' % (address[0], address[1],
-                                                       'IPv4' if inet == socket.AF_INET else 'IPv6'))
-        return (None, None)
-    print('Connected to %s/%d (%s)' % (hostname, port,
-                                       'IPv4' if inet == socket.AF_INET else 'IPv6'))
-    return (sock, inet)
-
-def send(sock, messages):
-    """Send a text message to WeeChat/relay."""
-    has_quit = False
-    try:
-        for msg in messages.split('\n'):
-            if msg == 'quit':
-                has_quit = True
-            sock.sendall(msg + '\n')
-            print('\x1b[33m<-- %s\x1b[0m' % msg)
-    except:
-        print('Failed to send message')
-        return (False, has_quit)
-    return (True, has_quit)
-
-def decode(message):
-    """Decode a binary message received from WeeChat/relay."""
-    global options
-    try:
-        proto = protocol.Protocol()
-        message = proto.decode(message, separator='\n' if options['v'] else ', ')
-        print('')
-        if options['v'] >= 2 and message.uncompressed:
-            # display raw message
-            print('\x1b[32m--> message uncompressed (%d bytes):\n%s\x1b[0m'
-                  % (message.size_uncompressed,
-                     protocol.hex_and_ascii(message.uncompressed, 20)))
-        # display decoded message
-        print('\x1b[32m--> %s\x1b[0m' % message)
-    except:
-        print('Error while decoding message from WeeChat')
-        return False
-    return True
-
-def mainloop(sock):
-    """Main loop: read keyboard, send commands, read socket and decode and display received binary messages."""
-    message = ''
-    recvbuf = ''
-    prompt = '\x1b[36mrelay> \x1b[0m'
-    sys.stdout.write(prompt)
-    sys.stdout.flush()
-    try:
-        while True:
-            inr, outr, exceptr = select.select([sys.stdin, sock], [], [], 1)
-            for fd in inr:
-                if fd == sys.stdin:
-                    buf = os.read(fd.fileno(), 4096)
-                    if buf:
-                        message += buf
-                        if '\n' in message:
-                            messages = message.split('\n')
-                            msgsent = '\n'.join(messages[:-1])
-                            if msgsent:
-                                (send_ok, has_quit) = send(sock, msgsent)
-                                if not send_ok:
-                                    return 3
-                                if has_quit:
-                                    return 0
-                            message = messages[-1]
+
+class TestProto:
+
+    def __init__(self, args):
+        self.args = args
+        self.sock = None
+        self.has_quit = False
+        self.address = '{self.args.hostname}/{self.args.port} ' \
+            '(IPv{0})'.format(6 if self.args.ipv6 else 4, self=self)
+
+    def connect(self):
+        """
+        Connect to WeeChat/relay.
+        Return True if OK, False if error.
+        """
+        inet = socket.AF_INET6 if self.args.ipv6 else socket.AF_INET
+        try:
+            self.sock = socket.socket(inet, socket.SOCK_STREAM)
+            self.sock.connect((self.args.hostname, self.args.port))
+        except:
+            if self.sock:
+                self.sock.close()
+            print('Failed to connect to', self.address)
+            return False
+        print('Connected to', self.address)
+        return True
+
+    def send(self, messages):
+        """
+        Send a text message to WeeChat/relay.
+        Return True if OK, False if error.
+        """
+        try:
+            for msg in messages.split('\n'):
+                if msg == 'quit':
+                    self.has_quit = True
+                self.sock.sendall(msg + '\n')
+                print('\x1b[33m<-- ' + msg + '\x1b[0m')
+        except:
+            traceback.print_exc()
+            print('Failed to send message')
+            return False
+        return True
+
+    def decode(self, message):
+        """
+        Decode a binary message received from WeeChat/relay.
+        Return True if OK, False if error.
+        """
+        try:
+            proto = protocol.Protocol()
+            msgd = proto.decode(message,
+                                separator='\n' if self.args.verbose > 0
+                                else ', ')
+            print('')
+            if self.args.verbose >= 2 and msgd.uncompressed:
+                # display raw message
+                print('\x1b[32m--> message uncompressed ({0} bytes):\n'
+                      '{1}\x1b[0m'
+                      ''.format(msgd.size_uncompressed,
+                                protocol.hex_and_ascii(msgd.uncompressed, 20)))
+            # display decoded message
+            print('\x1b[32m--> {0}\x1b[0m'.format(msgd))
+        except:
+            traceback.print_exc()
+            print('Error while decoding message from WeeChat')
+            return False
+        return True
+
+    def send_stdin(self):
+        """
+        Send commands from standard input if some data is available.
+        Return True if OK (it's OK if stdin has no commands),
+        False if error.
+        """
+        inr, outr, exceptr = select.select([sys.stdin], [], [], 0)
+        if inr:
+            data = os.read(sys.stdin.fileno(), 4096)
+            if data:
+                if not test.send(data.strip()):
+                    #self.sock.close()
+                    return False
+            # open stdin to read user commands
+            sys.stdin = open('/dev/tty')
+        return True
+
+    def mainloop(self):
+        """
+        Main loop: read keyboard, send commands, read socket,
+        decode/display binary messages received from WeeChat/relay.
+        Return 0 if OK, 4 if send error, 5 if decode error.
+        """
+        if self.has_quit:
+            return 0
+        message = ''
+        recvbuf = ''
+        prompt = '\x1b[36mrelay> \x1b[0m'
+        sys.stdout.write(prompt)
+        sys.stdout.flush()
+        try:
+            while not self.has_quit:
+                inr, outr, exceptr = select.select([sys.stdin, self.sock],
+                                                   [], [], 1)
+                for fd in inr:
+                    if fd == sys.stdin:
+                        buf = os.read(fd.fileno(), 4096)
+                        if buf:
+                            message += buf
+                            if '\n' in message:
+                                messages = message.split('\n')
+                                msgsent = '\n'.join(messages[:-1])
+                                if msgsent and not self.send(msgsent):
+                                    return 4
+                                message = messages[-1]
+                                sys.stdout.write(prompt + message)
+                                sys.stdout.flush()
+                    else:
+                        buf = fd.recv(4096)
+                        if buf:
+                            recvbuf += buf
+                            while len(recvbuf) >= 4:
+                                remainder = None
+                                length = struct.unpack('>i', recvbuf[0:4])[0]
+                                if len(recvbuf) < length:
+                                    # partial message, just wait for the
+                                    # end of message
+                                    break
+                                # more than one message?
+                                if length < len(recvbuf):
+                                    # save beginning of another message
+                                    remainder = recvbuf[length:]
+                                    recvbuf = recvbuf[0:length]
+                                if not self.decode(recvbuf):
+                                    return 5
+                                if remainder:
+                                    recvbuf = remainder
+                                else:
+                                    recvbuf = ''
                             sys.stdout.write(prompt + message)
                             sys.stdout.flush()
-                else:
-                    buf = fd.recv(4096)
-                    if buf:
-                        recvbuf += buf
-                        while len(recvbuf) >= 4:
-                            remainder = None
-                            length = struct.unpack('>i', recvbuf[0:4])[0]
-                            if len(recvbuf) < length:
-                                # partial message, just wait for end of message
-                                break
-                            # more than one message?
-                            if length < len(recvbuf):
-                                # save beginning of another message
-                                remainder = recvbuf[length:]
-                                recvbuf = recvbuf[0:length]
-                            if not decode(recvbuf):
-                                return 3
-                            if remainder:
-                                recvbuf = remainder
-                            else:
-                                recvbuf = ''
-                        sys.stdout.write(prompt + message)
-                        sys.stdout.flush()
-    except:
-        send(sock, 'quit')
-
-# display help if arguments are missing
-if len(sys.argv) < 3:
-    usage()
-    sys.exit(1)
-
-# read command line arguments
-try:
-    for arg in sys.argv[1:]:
-        if arg[0] == '-':
-            for opt in arg[1:]:
-                options[opt] = options.get(opt, 0) + 1
-        elif hostname:
-            port = int(arg)
-        else:
-            hostname = arg
-except:
-    print('Invalid arguments')
-    sys.exit(1)
-
-if options['h']:
-    usage()
-    sys.exit(0)
-
-# connect to WeeChat/relay
-(sock, inet) = connect((hostname, port), options['6'])
-if not sock:
-    sys.exit(2)
-
-# send commands from standard input if some data is available
-has_quit = False
-inr, outr, exceptr = select.select([sys.stdin], [], [], 0)
-if inr:
-    data = os.read(sys.stdin.fileno(), 4096)
-    if data:
-        (send_ok, has_quit) = send(sock, data.strip())
-        if not send_ok:
-            sock.close()
-            sys.exit(3)
-    # open stdin to read user commands
-    sys.stdin = open('/dev/tty')
-
-# main loop (wait commands, display messages received)
-if not has_quit:
-    mainloop(sock)
-time.sleep(0.5)
-sock.close()
+        except:
+            traceback.print_exc()
+            self.send('quit')
+        return 0
+
+    def __del__(self):
+        print('Closing connection with', self.address)
+        time.sleep(0.5)
+        self.sock.close()
+
+
+if __name__ == "__main__":
+    # parse command line arguments
+    parser = argparse.ArgumentParser(
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        fromfile_prefix_chars='@',
+        description='Command-line program for testing protocol WeeChat/relay.',
+        epilog='''
+Environment variable "TESTPROTO_OPTIONS" can be set with default options.
+Argument "@file.txt" can be used to read default options in a file.
+
+Some commands can be piped to the script, for example:
+  echo "init password=xxxx" | python {0} localhost 5000
+  python {0} localhost 5000 < commands.txt
+
+The script returns:
+  0: OK
+  2: wrong arguments (command line)
+  3: connection error
+  4: send error (message sent to WeeChat)
+  5: decode error (message received from WeeChat)
+'''.format(sys.argv[0]))
+    parser.add_argument('-6', '--ipv6', action='store_true',
+                        help='connect using IPv6')
+    parser.add_argument('-v', '--verbose', action='count', default=0,
+                        help='verbose mode: long objects view '
+                        '(-vv: display raw messages)')
+    parser.add_argument('hostname',
+                        help='hostname (or IP address) of machine running '
+                        'WeeChat/relay')
+    parser.add_argument('port', type=int,
+                        help='port of machine running WeeChat/relay')
+    if len(sys.argv) == 1:
+        parser.print_help()
+        sys.exit(0)
+    args = parser.parse_args(
+        shlex.split(os.getenv('TESTPROTO_OPTIONS') or '') + sys.argv[1:])
+
+    test = TestProto(args)
+
+    # connect to WeeChat/relay
+    if not test.connect():
+        sys.exit(3)
+
+    # send commands from standard input if some data is available
+    if not test.send_stdin():
+        sys.exit(4)
+
+    # main loop (wait commands, display messages received)
+    rc = test.mainloop()
+    del test
+    sys.exit(rc)