class LenientJSONDecoder(json.JSONDecoder):
    """JSON decoder that can pre-process, truncate-tolerate and repair its input.

    @param transform_source  Optional callable applied to the input string
                             before any decoding is attempted.
    @param ignore_extra      If true, leading whitespace is stripped and only
                             the first JSON value is decoded; anything that
                             follows it is ignored.
    @param close_objects     Number of unterminated objects/arrays that the
                             decoder may try to close. Closing one container
                             can take two retries (append a comma to learn
                             which container is open, then swap that comma for
                             the matching closing bracket), hence the budget
                             of ``2 * close_objects`` retries.
    Remaining positional/keyword arguments are passed to json.JSONDecoder.
    """

    # TODO: Write tests
    def __init__(self, *args, transform_source=None, ignore_extra=False, close_objects=0, **kwargs):
        self.transform_source, self.ignore_extra = transform_source, ignore_extra
        # Clamp so that a negative value simply disables the repair logic
        self._close_attempts = max(0, 2 * close_objects)
        super().__init__(*args, **kwargs)

    @staticmethod
    def _close_object(err):
        """Return a repaired copy of the document that failed with `err`.

        The document is cut at the error position. Returns None when the
        error is not one that this heuristic knows how to repair.
        """
        doc = err.doc[:err.pos]
        # We need to add comma first to get the correct error message
        if err.msg.startswith("Expecting ','"):
            return doc + ','
        elif not doc.endswith(','):
            return None

        # The comma added above made the parser reveal the open container:
        # an object expects a property name, an array expects a value
        if err.msg.startswith('Expecting property name'):
            return doc[:-1] + '}'
        elif err.msg.startswith('Expecting value'):
            return doc[:-1] + ']'
        return None

    def decode(self, s):
        """Decode `s`, retrying with closed containers if so configured.

        Raises json.JSONDecodeError (with a short excerpt of the offending
        text appended to the message) when the input cannot be decoded.
        """
        if self.transform_source:
            s = self.transform_source(s)
        for attempt in range(self._close_attempts + 1):
            try:
                if self.ignore_extra:
                    return self.raw_decode(s.lstrip())[0]
                return super().decode(s)
            except json.JSONDecodeError as e:
                if e.pos is None:
                    raise
                if attempt < self._close_attempts:
                    repaired = self._close_object(e)
                    if repaired is not None:
                        s = repaired
                        continue
                # Use the document the parser actually saw: `e.pos` is relative
                # to it (not to the unstripped `s`), and `s` must not be
                # clobbered by a failed repair. Clamp the start so that errors
                # near the beginning do not produce a wrapped-around slice
                doc, pos = e.doc, e.pos
                excerpt = doc[max(0, pos - 10):pos + 10]
                raise type(e)(f'{e.msg} in {excerpt!r}', doc, pos)
        # Unreachable: the last iteration always returns or raises
        raise AssertionError('Too many attempts to decode JSON')
def sanitize_open(filename, open_mode):