]> jfr.im git - yt-dlp.git/blob - yt_dlp/webvtt.py
[cleanup] Sort imports
[yt-dlp.git] / yt_dlp / webvtt.py
1 """
2 A partial parser for WebVTT segments. Interprets enough of the WebVTT stream
3 to be able to assemble a single stand-alone subtitle file, suitably adjusting
4 timestamps on the way, while everything else is passed through unmodified.
5
6 Regular expressions based on the W3C WebVTT specification
7 <https://www.w3.org/TR/webvtt1/>. The X-TIMESTAMP-MAP extension is described
8 in RFC 8216 §3.5 <https://tools.ietf.org/html/rfc8216#section-3.5>.
9 """
10
11 import io
12 import re
13
14 from .compat import compat_Match, compat_Pattern
15 from .utils import int_or_none, timetuple_from_msec
16
17
class _MatchParser:
    """
    A cursor over a string: remembers how far parsing has progressed and
    moves forward whenever a syntax element has been recognised.
    """

    def __init__(self, string):
        self._data, self._pos = string, 0

    def match(self, r):
        """
        Test `r` against the data at the current position without moving.

        For a compiled pattern the result of its `match()` is returned; for a
        plain string the result is its length if the data continues with it,
        otherwise None. Anything else raises ValueError.
        """
        if isinstance(r, compat_Pattern):
            return r.match(self._data, self._pos)
        if not isinstance(r, str):
            raise ValueError(r)
        return len(r) if self._data.startswith(r, self._pos) else None

    def advance(self, by):
        """
        Move the position forward by the extent of `by` (a match object,
        a string, an integer count, or None for "stay put") and hand `by`
        back to the caller. Unsupported types raise ValueError.
        """
        if by is not None:
            if isinstance(by, compat_Match):
                step = len(by.group(0))
            elif isinstance(by, str):
                step = len(by)
            elif isinstance(by, int):
                step = by
            else:
                raise ValueError(by)
            self._pos += step
        return by

    def consume(self, r):
        """Match `r` and, if it matched, step over it; returns the match result."""
        found = self.match(r)
        return self.advance(found)

    def child(self):
        """Create a child parser that starts at the current position."""
        return _MatchChildParser(self)
56
57
class _MatchChildParser(_MatchParser):
    """
    A parser state derived from a parent: it walks the very same data but
    keeps its own position, so a tentative parse can simply be dropped
    (backtracked) without disturbing the parent.
    """

    def __init__(self, parent):
        super().__init__(parent._data)
        # Start where the parent currently is
        self._pos = parent._pos
        self.__parent = parent

    def commit(self):
        """
        Copy this child's position into the parent state and return the parent.
        """
        parent = self.__parent
        parent._pos = self._pos
        return parent
77
78
class ParseError(Exception):
    """
    Raised when the input cannot be parsed; the message reports the parser
    position and up to 20 characters of the data found there.
    """

    def __init__(self, parser):
        pos = parser._pos
        snippet = parser._data[pos:pos + 20]
        super().__init__("Parse error at position %u (near %r)" % (pos, snippet))
84
85
# While the specification <https://www.w3.org/TR/webvtt1/#webvtt-timestamp>
# prescribes that hours must be *2 or more* digits, timestamps with a single
# digit for the hour part have been seen in the wild.
# See https://github.com/yt-dlp/yt-dlp/issues/921
#
# Groups: (1) hours (optional), (2) minutes, (3) seconds, (4) milliseconds.
# NOTE: the milliseconds group is optional as well, so group 4 may be None.
_REGEX_TS = re.compile(r'''(?x)
    (?:([0-9]{1,}):)?
    ([0-9]{2}):
    ([0-9]{2})\.
    ([0-9]{3})?
''')
# Matches only at the very end of the input
_REGEX_EOF = re.compile(r'\Z')
# A single line terminator: CRLF, CR or LF
_REGEX_NL = re.compile(r'(?:\r\n|[\r\n])')
# A run of one or more line terminators
_REGEX_BLANK = re.compile(r'(?:\r\n|[\r\n])+')
99
100
def _parse_ts(ts):
    """
    Convert a parsed WebVTT timestamp (a re.Match obtained from _REGEX_TS)
    into an MPEG PES timestamp: a tick counter at 90 kHz resolution.

    The match must expose four groups: hours, minutes, seconds, milliseconds.
    The hours and milliseconds groups are optional in _REGEX_TS, so either
    may be None; a missing part counts as zero.
    """

    hours, minutes, seconds, millis = ts.groups()
    # Total in milliseconds first; one millisecond is 90 ticks at 90 kHz
    return 90 * sum((
        int(hours or 0) * 3600000,
        int(minutes) * 60000,
        int(seconds) * 1000,
        int(millis or 0),
    ))
114
115
def _format_ts(ts):
    """
    Render an MPEG PES timestamp (90 kHz ticks) as a WebVTT timestamp.
    Precision below one millisecond is lost.
    """
    # Adding 45 (half of 90) before the floor division rounds to nearest
    msec = int((ts + 45) // 90)
    return '%02u:%02u:%02u.%03u' % timetuple_from_msec(msec)
122
123
class Block:
    """
    Base class for WebVTT blocks. Keyword arguments given to the
    constructor are stored as attributes of the instance.
    """

    def __init__(self, **kwargs):
        for name in kwargs:
            setattr(self, name, kwargs[name])

    @classmethod
    def parse(cls, parser):
        """
        Try to recognise this kind of block at the parser's position using
        the class attribute `_REGEX`. On success the parser is moved past
        the block and an instance holding the matched text in `raw` is
        returned; otherwise None is returned and the parser is untouched.
        """
        matched = parser.match(cls._REGEX)
        if matched:
            parser.advance(matched)
            return cls(raw=matched.group(0))
        return None

    def write_into(self, stream):
        """Write the block's original text (`raw`) to `stream`."""
        stream.write(self.raw)
143
144
class HeaderBlock(Block):
    """
    Marker base class for WebVTT blocks that are only allowed in the
    header part of the file, that is, ahead of every cue block.
    """
152
153
class Magic(HeaderBlock):
    """
    The "WEBVTT" signature line that opens a file, optionally followed by
    an X-TIMESTAMP-MAP line.

    Instances carry three attributes:
      extra  -- the text following "WEBVTT" on the signature line (including
                its leading space or tab), or None if there was none
      local  -- the LOCAL timestamp of X-TIMESTAMP-MAP in 90 kHz ticks, or None
      mpegts -- the MPEGTS value of X-TIMESTAMP-MAP as an int, or None
    """

    _REGEX = re.compile(r'\ufeff?WEBVTT([ \t][^\r\n]*)?(?:\r\n|[\r\n])')

    # XXX: The X-TIMESTAMP-MAP extension is described in RFC 8216 §3.5
    # <https://tools.ietf.org/html/rfc8216#section-3.5>, but the RFC
    # doesn't specify the exact grammar nor where in the WebVTT
    # syntax it should be placed; the below has been devised based
    # on usage in the wild
    #
    # And strictly speaking, the presence of this extension violates
    # the W3C WebVTT spec. Oh well.

    _REGEX_TSMAP = re.compile(r'X-TIMESTAMP-MAP=')
    _REGEX_TSMAP_LOCAL = re.compile(r'LOCAL:')
    _REGEX_TSMAP_MPEGTS = re.compile(r'MPEGTS:([0-9]+)')
    _REGEX_TSMAP_SEP = re.compile(r'[ \t]*,[ \t]*')

    @classmethod
    def __parse_tsmap(cls, parser):
        """
        Parse the value of an X-TIMESTAMP-MAP line (everything after the
        "=") up to and including its line terminator.

        Returns a `(local, mpegts)` tuple; a component that does not appear
        in the line is None. Raises ParseError on malformed input.
        """
        parser = parser.child()

        # Either component may be absent from the line, so both need a
        # defined value before the loop; otherwise the return statement
        # below would hit an unbound local.
        local, mpegts = None, None

        while True:
            m = parser.consume(cls._REGEX_TSMAP_LOCAL)
            if m:
                m = parser.consume(_REGEX_TS)
                if m is None:
                    raise ParseError(parser)
                local = _parse_ts(m)
            else:
                m = parser.consume(cls._REGEX_TSMAP_MPEGTS)
                if m:
                    mpegts = int_or_none(m.group(1))
                    if mpegts is None:
                        raise ParseError(parser)
                else:
                    raise ParseError(parser)
            # A separator means another component follows; a line
            # terminator ends the map; anything else is an error
            if parser.consume(cls._REGEX_TSMAP_SEP):
                continue
            if parser.consume(_REGEX_NL):
                break
            raise ParseError(parser)

        parser.commit()
        return local, mpegts

    @classmethod
    def parse(cls, parser):
        """
        Parse the signature line, an optional X-TIMESTAMP-MAP line and the
        line terminator that must follow them.

        Returns a Magic instance and advances `parser` past the header.
        Raises ParseError if the signature is missing or the header is
        malformed; in that case `parser` is left where it was.
        """
        parser = parser.child()

        m = parser.consume(cls._REGEX)
        if not m:
            raise ParseError(parser)

        extra = m.group(1)
        local, mpegts = None, None
        if parser.consume(cls._REGEX_TSMAP):
            local, mpegts = cls.__parse_tsmap(parser)
        if not parser.consume(_REGEX_NL):
            raise ParseError(parser)
        parser.commit()
        return cls(extra=extra, mpegts=mpegts, local=local)

    def write_into(self, stream):
        """
        Write the header to `stream`: the "WEBVTT" line (with `extra`, if
        any), an X-TIMESTAMP-MAP line when `local` or `mpegts` is truthy
        (a missing counterpart is written as zero), and a blank line.
        """
        stream.write('WEBVTT')
        if self.extra is not None:
            stream.write(self.extra)
        stream.write('\n')
        if self.local or self.mpegts:
            stream.write('X-TIMESTAMP-MAP=LOCAL:')
            stream.write(_format_ts(self.local if self.local is not None else 0))
            stream.write(',MPEGTS:')
            stream.write(str(self.mpegts if self.mpegts is not None else 0))
            stream.write('\n')
        stream.write('\n')
230
231
class StyleBlock(HeaderBlock):
    """
    A STYLE block. The contents are not interpreted.
    """

    # "STYLE" (optionally followed by spaces/tabs) on a line of its own, then
    # any number of non-empty lines not containing "-->", then a line
    # terminator ending the block.
    _REGEX = re.compile(r'''(?x)
        STYLE[\ \t]*(?:\r\n|[\r\n])
        ((?:(?!-->)[^\r\n])+(?:\r\n|[\r\n]))*
        (?:\r\n|[\r\n])
    ''')
238
239
class RegionBlock(HeaderBlock):
    """
    A REGION block. The contents are not interpreted.
    """

    # "REGION" (optionally followed by spaces/tabs), then any number of
    # non-empty lines not containing "-->", then a line terminator ending
    # the block.
    #
    # The W3C syntax puts "REGION" on a line of its own with the settings on
    # the following lines, so a line terminator has to be accepted right
    # after the keyword; without it only "REGION\n" would match and the
    # settings lines would be left behind as unparseable input. The
    # terminator is optional so that settings starting on the same line as
    # the keyword keep matching as they did before.
    _REGEX = re.compile(r'''(?x)
        REGION[\ \t]*(?:\r\n|[\r\n])?
        ((?:(?!-->)[^\r\n])+(?:\r\n|[\r\n]))*
        (?:\r\n|[\r\n])
    ''')
246
247
class CommentBlock(Block):
    """
    A NOTE (comment) block. The contents are not interpreted.
    """

    # "NOTE" followed by a space, tab or line terminator, then any number of
    # non-empty lines not containing "-->", then a line terminator ending
    # the block.
    _REGEX = re.compile(r'''(?x)
        NOTE(?:\r\n|[\ \t\r\n])
        ((?:(?!-->)[^\r\n])+(?:\r\n|[\r\n]))*
        (?:\r\n|[\r\n])
    ''')
254
255
class CueBlock(Block):
    """
    A cue block. The payload is not interpreted.

    Instances carry these attributes:
      id       -- the cue identifier line (without line terminator), or None
      start    -- start time, as returned by _parse_ts
      end      -- end time, as returned by _parse_ts
      settings -- the cue settings text following the end time, or None
      text     -- the raw payload lines, including their line terminators
    """

    _REGEX_ID = re.compile(r'((?:(?!-->)[^\r\n])+)(?:\r\n|[\r\n])')
    _REGEX_ARROW = re.compile(r'[ \t]+-->[ \t]+')
    _REGEX_SETTINGS = re.compile(r'[ \t]+((?:(?!-->)[^\r\n])+)')
    _REGEX_PAYLOAD = re.compile(r'[^\r\n]+(?:\r\n|[\r\n])?')

    @classmethod
    def parse(cls, parser):
        """
        Try to parse a cue block at the parser's position.

        Returns a CueBlock and advances `parser` past the cue on success.
        Returns None, leaving `parser` untouched, if the input at this
        position is not a well-formed cue.
        """
        # Work on a child state so that a failed attempt needs no undoing
        parser = parser.child()

        cue_id = None
        m = parser.consume(cls._REGEX_ID)
        if m:
            cue_id = m.group(1)

        m0 = parser.consume(_REGEX_TS)
        if not m0:
            return None
        if not parser.consume(cls._REGEX_ARROW):
            return None
        m1 = parser.consume(_REGEX_TS)
        if not m1:
            return None
        m2 = parser.consume(cls._REGEX_SETTINGS)
        if not parser.consume(_REGEX_NL):
            return None

        start = _parse_ts(m0)
        end = _parse_ts(m1)
        settings = m2.group(1) if m2 is not None else None

        # The payload is every following non-empty line
        text = io.StringIO()
        while True:
            m = parser.consume(cls._REGEX_PAYLOAD)
            if not m:
                break
            text.write(m.group(0))

        parser.commit()
        return cls(
            id=cue_id,
            start=start, end=end, settings=settings,
            text=text.getvalue()
        )

    def write_into(self, stream):
        """
        Write the cue to `stream`: the identifier line (if any), the timing
        line with optional settings, the payload and a closing newline.
        """
        if self.id is not None:
            stream.write(self.id)
            stream.write('\n')
        stream.write(_format_ts(self.start))
        stream.write(' --> ')
        stream.write(_format_ts(self.end))
        if self.settings is not None:
            stream.write(' ')
            stream.write(self.settings)
        stream.write('\n')
        stream.write(self.text)
        stream.write('\n')

    @property
    def as_json(self):
        """The cue's attributes as a plain dict (see from_json)."""
        return {
            'id': self.id,
            'start': self.start,
            'end': self.end,
            'text': self.text,
            'settings': self.settings,
        }

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # instead of failing on a missing `as_json` attribute
        if not isinstance(other, CueBlock):
            return NotImplemented
        return self.as_json == other.as_json

    @classmethod
    def from_json(cls, json):
        """Build a cue from a dict shaped like the one returned by as_json."""
        return cls(
            id=json['id'],
            start=json['start'],
            end=json['end'],
            text=json['text'],
            settings=json['settings']
        )

    def hinges(self, other):
        """
        Tell whether `other` directly continues this cue: both have the same
        text and settings, this cue ends exactly where `other` starts, and
        neither cue ends before it starts.
        """
        if self.text != other.text:
            return False
        if self.settings != other.settings:
            return False
        return self.start <= self.end == other.start <= other.end
348
349
def parse_fragment(frag_content):
    """
    Generate (partially) parsed WebVTT blocks from a bytes object holding
    the raw contents of a WebVTT file.

    The Magic header block comes first, followed by header blocks (region,
    style, comment) and then by comment and cue blocks. ParseError is raised
    when the input cannot be parsed.
    """

    parser = _MatchParser(frag_content.decode('utf-8'))

    def first_block(candidates):
        # Try each block type in order; the first one that parses wins
        for block_type in candidates:
            parsed = block_type.parse(parser)
            if parsed:
                return parsed
        return None

    yield Magic.parse(parser)

    # Header part: ends at the first thing that is not a header block
    header_types = (RegionBlock, StyleBlock, CommentBlock)
    while not parser.match(_REGEX_EOF):
        if parser.consume(_REGEX_BLANK):
            continue
        block = first_block(header_types)
        if not block:
            break
        yield block  # XXX: or skip comment blocks

    # Cue part: everything up to EOF has to be a comment or a cue
    body_types = (CommentBlock, CueBlock)
    while not parser.match(_REGEX_EOF):
        if parser.consume(_REGEX_BLANK):
            continue
        block = first_block(body_types)
        if not block:
            raise ParseError(parser)
        yield block  # XXX: or skip comment blocks
390 continue
391
392 raise ParseError(parser)