]> jfr.im git - yt-dlp.git/blob - yt_dlp/downloader/mhtml.py
[cleanup] Add more ruff rules (#10149)
[yt-dlp.git] / yt_dlp / downloader / mhtml.py
1 import io
2 import quopri
3 import re
4 import uuid
5
6 from .fragment import FragmentFD
7 from ..compat import imghdr
8 from ..utils import escapeHTML, formatSeconds, srt_subtitles_timecode, urljoin
9 from ..version import __version__ as YT_DLP_VERSION
10
11
12 class MhtmlFD(FragmentFD):
13 _STYLESHEET = '''\
14 html, body {
15 margin: 0;
16 padding: 0;
17 height: 100vh;
18 }
19
20 html {
21 overflow-y: scroll;
22 scroll-snap-type: y mandatory;
23 }
24
25 body {
26 scroll-snap-type: y mandatory;
27 display: flex;
28 flex-flow: column;
29 }
30
31 body > figure {
32 max-width: 100vw;
33 max-height: 100vh;
34 scroll-snap-align: center;
35 }
36
37 body > figure > figcaption {
38 text-align: center;
39 height: 2.5em;
40 }
41
42 body > figure > img {
43 display: block;
44 margin: auto;
45 max-width: 100%;
46 max-height: calc(100vh - 5em);
47 }
48 '''
49 _STYLESHEET = re.sub(r'\s+', ' ', _STYLESHEET)
50 _STYLESHEET = re.sub(r'\B \B|(?<=[\w\-]) (?=[^\w\-])|(?<=[^\w\-]) (?=[\w\-])', '', _STYLESHEET)
51
52 @staticmethod
53 def _escape_mime(s):
54 return '=?utf-8?Q?' + (b''.join(
55 bytes((b,)) if b >= 0x20 else b'=%02X' % b
56 for b in quopri.encodestring(s.encode(), header=True)
57 )).decode('us-ascii') + '?='
58
59 def _gen_cid(self, i, fragment, frag_boundary):
60 return f'{i}.{frag_boundary}@yt-dlp.github.io.invalid'
61
62 def _gen_stub(self, *, fragments, frag_boundary, title):
63 output = io.StringIO()
64
65 output.write(
66 '<!DOCTYPE html>'
67 '<html>'
68 '<head>'
69 f'<meta name="generator" content="yt-dlp {escapeHTML(YT_DLP_VERSION)}">'
70 f'<title>{escapeHTML(title)}</title>'
71 f'<style>{self._STYLESHEET}</style>'
72 '<body>')
73
74 t0 = 0
75 for i, frag in enumerate(fragments):
76 output.write('<figure>')
77 try:
78 t1 = t0 + frag['duration']
79 output.write((
80 '<figcaption>Slide #{num}: {t0} {t1} (duration: {duration})</figcaption>'
81 ).format(
82 num=i + 1,
83 t0=srt_subtitles_timecode(t0),
84 t1=srt_subtitles_timecode(t1),
85 duration=formatSeconds(frag['duration'], msec=True),
86 ))
87 except (KeyError, ValueError, TypeError):
88 t1 = None
89 output.write(f'<figcaption>Slide #{i + 1}</figcaption>')
90 output.write(f'<img src="cid:{self._gen_cid(i, frag, frag_boundary)}">')
91 output.write('</figure>')
92 t0 = t1
93
94 return output.getvalue()
95
96 def real_download(self, filename, info_dict):
97 fragment_base_url = info_dict.get('fragment_base_url')
98 fragments = info_dict['fragments'][:1] if self.params.get(
99 'test', False) else info_dict['fragments']
100 title = info_dict.get('title', info_dict['format_id'])
101 origin = info_dict.get('webpage_url', info_dict['url'])
102
103 ctx = {
104 'filename': filename,
105 'total_frags': len(fragments),
106 }
107
108 self._prepare_and_start_frag_download(ctx, info_dict)
109
110 extra_state = ctx.setdefault('extra_state', {
111 'header_written': False,
112 'mime_boundary': str(uuid.uuid4()).replace('-', ''),
113 })
114
115 frag_boundary = extra_state['mime_boundary']
116
117 if not extra_state['header_written']:
118 stub = self._gen_stub(
119 fragments=fragments,
120 frag_boundary=frag_boundary,
121 title=title,
122 )
123
124 ctx['dest_stream'].write((
125 'MIME-Version: 1.0\r\n'
126 'From: <nowhere@yt-dlp.github.io.invalid>\r\n'
127 'To: <nowhere@yt-dlp.github.io.invalid>\r\n'
128 f'Subject: {self._escape_mime(title)}\r\n'
129 'Content-type: multipart/related; '
130 f'boundary="{frag_boundary}"; '
131 'type="text/html"\r\n'
132 f'X.yt-dlp.Origin: {origin}\r\n'
133 '\r\n'
134 f'--{frag_boundary}\r\n'
135 'Content-Type: text/html; charset=utf-8\r\n'
136 f'Content-Length: {len(stub)}\r\n'
137 '\r\n'
138 f'{stub}\r\n').encode())
139 extra_state['header_written'] = True
140
141 for i, fragment in enumerate(fragments):
142 if (i + 1) <= ctx['fragment_index']:
143 continue
144
145 fragment_url = fragment.get('url')
146 if not fragment_url:
147 assert fragment_base_url
148 fragment_url = urljoin(fragment_base_url, fragment['path'])
149
150 success = self._download_fragment(ctx, fragment_url, info_dict)
151 if not success:
152 continue
153 frag_content = self._read_fragment(ctx)
154
155 frag_header = io.BytesIO()
156 frag_header.write(
157 b'--%b\r\n' % frag_boundary.encode('us-ascii'))
158 frag_header.write(
159 b'Content-ID: <%b>\r\n' % self._gen_cid(i, fragment, frag_boundary).encode('us-ascii'))
160 frag_header.write(
161 b'Content-type: %b\r\n' % f'image/{imghdr.what(h=frag_content) or "jpeg"}'.encode())
162 frag_header.write(
163 b'Content-length: %u\r\n' % len(frag_content))
164 frag_header.write(
165 b'Content-location: %b\r\n' % fragment_url.encode('us-ascii'))
166 frag_header.write(
167 b'X.yt-dlp.Duration: %f\r\n' % fragment['duration'])
168 frag_header.write(b'\r\n')
169 self._append_fragment(
170 ctx, frag_header.getvalue() + frag_content + b'\r\n')
171
172 ctx['dest_stream'].write(
173 b'--%b--\r\n\r\n' % frag_boundary.encode('us-ascii'))
174 return self._finish_frag_download(ctx, info_dict)