# module helper functions, see modules/modtest.py for usage
# This file is released into the public domain; see http://unlicense.org/
+import abc
import sys
import socket
from functools import wraps
KNOWN = 1
AUTHED = 0 # Users which are known to be authed
ANYONE = -1 # non-authed users have glevel set to -1
- IGNORED = -2 # The default reqglevel is ANYONE, so any commands will be ignored from IGNORED users unless the command reglevel=-2
+ IGNORED = -2 # If the user is IGNORED, no hooks or chanhooks will fire for their messages. numhooks can still be fired.
glevs = {
'OWNER': OWNER,
'MANAGER': MANAGER,
WRONGARGS = "Wrong number of arguments."
def __init__(self, name):
+ self.Socketlike = Socketlike
self.hooks = {} # {command:handler}
self.chanhooks = {} # {channel:handler}
self.exceptionhooks = [] # [(exception,handler)]
return self._hooksocket(socket.AF_UNIX, socket.SOCK_STREAM, path, data)
def _hooksocket(self, af, ty, address, data):
def realhook(cls):
- if not (hasattr(cls, 'getdata') and callable(cls.getdata)):
- # Check early that the object implements getdata.
- # If getdata ever returns a non-empty list, then a parse method must also exist, but we don't check that.
+ if not issubclass(cls, Socketlike):
raise Exception('Attempted to hook a socket without a class to process data')
self.sockhooks.append((af, ty, address, cls, data))
if self.parent is not None:
else:
return error('unknown parent')
+
def flags(self, *flags):
    """Parse "-flag" words out of a command's arguments, like `MODUNLOAD -AUTOLOAD somemodule`.

        @lib.hook()
        @lib.flags('autoload', 'force')
        def baz(bot, user, chan, realtarget, flags, *args)

    Note the added `flags` argument, which will be a dict mapping every declared
    flag to a bool - in this case `{'autoload': True, 'force': False}`.

    A word counts as a flag candidate when it starts with "-" and has at least one
    more character. Matching is case-insensitive: an exact match wins, otherwise the
    word may be an unambiguous prefix of one declared flag. Matched words are removed
    from the arguments handed to the wrapped function; everything else is passed
    through untouched and in order.

    If a word is a prefix of more than one declared flag (and not an exact match),
    the wrapped function is not called and an error string is returned instead.
    """
    # (lowercased name, name as declared): match on the former, report under the latter,
    # so flags declared with uppercase letters can still be matched.
    declared = [(f.lower(), f) for f in flags]

    def realhook(func):
        # Marker read by other decorators (via hasattr) to learn that a flags dict
        # is inserted in front of the positional arguments.
        func.flags = [lowered for lowered, _ in declared]

        @wraps(func)
        def parseargs(bot, user, chan, realtarget, *_args):
            found_flags = {f: False for f in flags}
            remaining = []  # arguments that were not consumed as flags
            for arg in _args:
                # startswith() instead of arg[0] so an empty argument cannot raise IndexError.
                if not (arg.startswith("-") and len(arg) > 1):
                    remaining.append(arg)
                    continue

                possible_flag = arg[1:].lower()
                # Exact matches are looked up first across *all* flags, so that a flag
                # which is itself a prefix of other flags can still be selected.
                matches = [name for lowered, name in declared if lowered == possible_flag]
                if not matches:
                    matches = [name for lowered, name in declared if lowered.startswith(possible_flag)]
                    if len(matches) > 1:  # prefix of several flags: refuse to guess
                        return 'Error: %s is a prefix of multiple flags (%s, %s).' % (possible_flag, matches[0], matches[1])

                if matches:
                    for name in matches:
                        found_flags[name] = True
                else:
                    # Looks like a flag but matches nothing declared: leave it for the handler.
                    remaining.append(arg)

            return func(bot, user, chan, realtarget, found_flags, *remaining)

        return parseargs
    return realhook
+
+
def argsEQ(self, num):
def realhook(func):
@wraps(func)
def checkargs(bot, user, chan, realtarget, *args):
- if len(args) == num:
+ adjuster = 0
+ if hasattr(checkargs, 'flags'):
+ adjuster = 1
+ if len(args)-adjuster == num:
return func(bot, user, chan, realtarget, *args)
else:
bot.msg(user, self.WRONGARGS)
def realhook(func):
@wraps(func)
def checkargs(bot, user, chan, realtarget, *args):
- if len(args) >= num:
+ adjuster = 0
+ if hasattr(checkargs, 'flags'):
+ adjuster = 1
+ if len(args)-adjuster >= num:
return func(bot, user, chan, realtarget, *args)
else:
bot.msg(user, self.WRONGARGS)
return func
return realhook
-class _ListenSocket(object):
class Socketlike(abc.ABC):
    """Base class for objects that wrap a socket and feed data to the core.

    Subclasses must implement parse(); getdata() has a line-oriented default.
    Classes that do not inherit from Socketlike are still accepted by
    issubclass() as long as they provide both `parse` and `getdata`.
    """

    def __init__(self, sock, data):
        """This default method saves the socket in self.sock and creates self.buffer for getdata(). The data is discarded."""
        self.sock = sock
        self.buffer = b''

    def getdata(self):
        """This default method gets LF or CRLF separated lines from the socket and returns an array of completely-seen lines to the core.
        This should work well for most line-based protocols (like IRC).

        Returns a (possibly empty) list of decoded lines without their line
        terminator, or None once the peer has closed the connection and the
        buffer has been drained.
        """
        recvd = self.sock.recv(8192)
        if recvd == b"":  # EOF
            if not self.buffer:
                # Nothing left in the buffer. Return None to signal the core to close this socket.
                return None
            # Process what's left in the buffer. We'll get called again after.
            remaining_buf = self.buffer.decode('utf-8', 'backslashreplace')
            self.buffer = b""
            return [remaining_buf]

        self.buffer += recvd
        # Everything before the last LF is a complete line; the tail (possibly
        # empty) is an unfinished line kept for the next call.
        *complete, self.buffer = self.buffer.split(b"\n")
        # Strip the CR of CRLF-terminated lines so callers see bare lines.
        return [raw.decode('utf-8', 'backslashreplace').rstrip("\r") for raw in complete]

    def __str__(self):
        return '%s#%d' % (self.__class__.__name__, self.sock.fileno())

    def __repr__(self):
        # getpeername() is (host, port) for IPv4, a longer tuple for IPv6, a
        # path for AF_UNIX, and raises OSError on unconnected/listening sockets.
        try:
            peer = self.sock.getpeername()
        except OSError:
            peer = 'unconnected'
        if isinstance(peer, tuple) and len(peer) >= 2:
            peer = '%s:%d' % peer[:2]
        return '<%s.%s #%d %s>' % (self.__class__.__module__, self.__class__.__name__, self.sock.fileno(), peer)

    @abc.abstractmethod
    def parse(self, chunk):
        """Handle one item returned by getdata(). Must be provided by subclasses."""

    @classmethod
    def __subclasshook__(cls, C):
        # Structural check: each required method may come from a different
        # class in the MRO (e.g. getdata from a mixin, parse from the class itself).
        if cls is Socketlike:
            if all(any(method in B.__dict__ for B in C.__mro__) for method in ('parse', 'getdata')):
                return True
        return NotImplemented
+
+class _ListenSocket(Socketlike):
def __init__(self, lib, sock, cls, data):
self.clients = []
self.lib = lib
self.clients.remove((client,obj))
return close
def parse(self, chunk=None):
    """No-op, present only because Socketlike requires a parse method.

    getdata will never return a non-empty array, so parse will never be called.
    The `chunk` parameter mirrors the abstract Socketlike.parse(self, chunk)
    signature so that a call with a chunk would not raise TypeError; it is ignored.
    """
    pass
+
def getdata(self):
client, addr = self.sock.accept()
obj = self.cls(client, self.data)