]> jfr.im git - yt-dlp.git/blob - yt_dlp/downloader/mhtml.py
[cleanup] Misc cleanup
[yt-dlp.git] / yt_dlp / downloader / mhtml.py
1 import io
2 import quopri
3 import re
4 import uuid
5
6 from .fragment import FragmentFD
7 from ..utils import escapeHTML, formatSeconds, srt_subtitles_timecode, urljoin
8 from ..version import __version__ as YT_DLP_VERSION
9
10
class MhtmlFD(FragmentFD):
    """Fragment downloader that bundles image fragments into one MHTML file.

    The output is a ``multipart/related`` MIME message. Its first part is a
    generated HTML page containing one ``<figure>`` per fragment; every
    following part is one downloaded fragment, which the page references
    through a ``cid:`` URL.
    """

    _STYLESHEET = """\
        html, body {
            margin: 0;
            padding: 0;
            height: 100vh;
        }

        html {
            overflow-y: scroll;
            scroll-snap-type: y mandatory;
        }

        body {
            scroll-snap-type: y mandatory;
            display: flex;
            flex-flow: column;
        }

        body > figure {
            max-width: 100vw;
            max-height: 100vh;
            scroll-snap-align: center;
        }

        body > figure > figcaption {
            text-align: center;
            height: 2.5em;
        }

        body > figure > img {
            display: block;
            margin: auto;
            max-width: 100%;
            max-height: calc(100vh - 5em);
        }
    """
    # Minify the stylesheet once, at class-creation time:
    # 1. collapse every run of whitespace into a single space;
    # 2. drop the spaces that sit next to punctuation (around `{`, `:`, `;`,
    #    `>` ...). A space between a word character and a hyphen is kept,
    #    which preserves the operator spacing in `calc(100vh - 5em)`.
    _STYLESHEET = re.sub(r'\s+', ' ', _STYLESHEET)
    _STYLESHEET = re.sub(r'\B \B|(?<=[\w\-]) (?=[^\w\-])|(?<=[^\w\-]) (?=[\w\-])', '', _STYLESHEET)

    @staticmethod
    def _escape_mime(s):
        """Return *s* as a single UTF-8 "Q"-encoded MIME encoded-word.

        A space becomes ``_``. Printable ASCII is emitted literally, except
        for ``=``, ``?`` and ``_``, which have a special meaning inside an
        encoded-word. Every other byte of the UTF-8 encoding is written as
        ``=XX`` (upper-case hex).

        The encoding is done by hand rather than via ``quopri.encodestring``:
        that helper inserts soft line breaks (``=`` followed by a newline)
        into long input, which must not appear inside an encoded-word, and it
        leaves ``?`` unescaped.
        """
        encoded = ''.join(
            '_' if byte == 0x20
            else chr(byte) if 0x20 < byte < 0x7F and byte not in b'=?_'
            else '=%02X' % byte
            for byte in s.encode())
        return '=?utf-8?Q?' + encoded + '?='

    def _gen_cid(self, i, fragment, frag_boundary):
        """Return the Content-ID used to reference fragment number *i*.

        The ID combines the fragment index with the MIME boundary of the
        file. *fragment* is accepted but not used.
        """
        return '%u.%s@yt-dlp.github.io.invalid' % (i, frag_boundary)

    def _gen_stub(self, *, fragments, frag_boundary, title):
        """Build the HTML page that displays all fragments as slides.

        Each fragment becomes a ``<figure>`` holding a caption and an
        ``<img>`` whose source is the fragment's ``cid:`` URL. Captions carry
        the start/end timecodes obtained by accumulating ``frag['duration']``.
        Once a duration is missing or unusable the running start time is
        unknown, so that caption and all following ones only show the slide
        number.

        Returns the page as a ``str``.
        """
        output = io.StringIO()

        output.write((
            '<!DOCTYPE html>'
            '<html>'
            '<head>'
            ''  '<meta name="generator" content="yt-dlp {version}">'
            ''  '<title>{title}</title>'
            ''  '<style>{styles}</style>'
            '<body>'
        ).format(
            version=escapeHTML(YT_DLP_VERSION),
            styles=self._STYLESHEET,
            title=escapeHTML(title)
        ))

        t0 = 0
        for i, frag in enumerate(fragments):
            output.write('<figure>')
            try:
                # Raises TypeError when t0 is None, i.e. after an earlier
                # fragment without a usable duration.
                t1 = t0 + frag['duration']
                caption = (
                    '<figcaption>Slide #{num}: {t0} {t1} (duration: {duration})</figcaption>'
                ).format(
                    num=i + 1,
                    t0=srt_subtitles_timecode(t0),
                    t1=srt_subtitles_timecode(t1),
                    duration=formatSeconds(frag['duration'], msec=True)
                )
            except (KeyError, ValueError, TypeError):
                t1 = None
                caption = '<figcaption>Slide #{num}</figcaption>'.format(num=i + 1)
            output.write(caption)
            output.write('<img src="cid:{cid}">'.format(
                cid=self._gen_cid(i, frag, frag_boundary)))
            output.write('</figure>')
            t0 = t1

        return output.getvalue()

    def real_download(self, filename, info_dict):
        """Download all fragments and write them into one MHTML file.

        Reads from *info_dict*: ``fragments`` (required), ``fragment_base_url``
        (needed for fragments that only have a ``path``), ``title`` (falling
        back to ``format_id``) and ``webpage_url`` (falling back to ``url``).
        When the ``test`` parameter is set only the first fragment is used.

        Fragments whose download fails are skipped. Returns ``True``.
        """
        fragment_base_url = info_dict.get('fragment_base_url')
        fragments = info_dict['fragments']
        if self.params.get('test', False):
            fragments = fragments[:1]
        # Only look up the fallback key when the preferred one is absent, so
        # a missing fallback does not raise when it is not needed.
        title = info_dict['title'] if 'title' in info_dict else info_dict['format_id']
        origin = info_dict['webpage_url'] if 'webpage_url' in info_dict else info_dict['url']

        ctx = {
            'filename': filename,
            'total_frags': len(fragments),
        }

        self._prepare_and_start_frag_download(ctx, info_dict)

        # A pre-existing 'extra_state' in ctx is reused as-is, keeping its
        # boundary and its record of whether the header was already written.
        extra_state = ctx.setdefault('extra_state', {
            'header_written': False,
            'mime_boundary': str(uuid.uuid4()).replace('-', ''),
        })

        frag_boundary = extra_state['mime_boundary']
        boundary_bytes = frag_boundary.encode('us-ascii')

        if not extra_state['header_written']:
            # Encode the page first: Content-Length must count bytes, and the
            # page may contain non-ASCII characters (e.g. in the title).
            stub = self._gen_stub(
                fragments=fragments,
                frag_boundary=frag_boundary,
                title=title
            ).encode()

            header = (
                'MIME-Version: 1.0\r\n'
                'From: <nowhere@yt-dlp.github.io.invalid>\r\n'
                'To: <nowhere@yt-dlp.github.io.invalid>\r\n'
                'Subject: {title}\r\n'
                'Content-type: multipart/related; '
                ''  'boundary="{boundary}"; '
                ''  'type="text/html"\r\n'
                'X.yt-dlp.Origin: {origin}\r\n'
                '\r\n'
                '--{boundary}\r\n'
                'Content-Type: text/html; charset=utf-8\r\n'
                'Content-Length: {length}\r\n'
                '\r\n'
            ).format(
                origin=origin,
                boundary=frag_boundary,
                length=len(stub),
                title=self._escape_mime(title)
            )
            ctx['dest_stream'].write(header.encode() + stub + b'\r\n')
            extra_state['header_written'] = True

        for i, fragment in enumerate(fragments):
            # Skip fragments already counted in ctx['fragment_index'].
            if (i + 1) <= ctx['fragment_index']:
                continue

            fragment_url = fragment.get('url')
            if not fragment_url:
                assert fragment_base_url
                fragment_url = urljoin(fragment_base_url, fragment['path'])

            success = self._download_fragment(ctx, fragment_url, info_dict)
            if not success:
                continue
            frag_content = self._read_fragment(ctx)

            # Sniff the image type from its magic bytes; default to JPEG.
            mime_type = b'image/jpeg'
            if frag_content.startswith(b'\x89PNG\r\n\x1a\n'):
                mime_type = b'image/png'
            elif frag_content.startswith((b'GIF87a', b'GIF89a')):
                mime_type = b'image/gif'
            elif frag_content.startswith(b'RIFF') and frag_content[8:12] == b'WEBP':
                mime_type = b'image/webp'

            frag_header = io.BytesIO()
            frag_header.write(b'--%b\r\n' % boundary_bytes)
            frag_header.write(
                b'Content-ID: <%b>\r\n' % self._gen_cid(i, fragment, frag_boundary).encode('us-ascii'))
            frag_header.write(b'Content-type: %b\r\n' % mime_type)
            frag_header.write(b'Content-length: %u\r\n' % len(frag_content))
            # NOTE(review): a non-ASCII fragment URL raises UnicodeEncodeError here.
            frag_header.write(
                b'Content-location: %b\r\n' % fragment_url.encode('us-ascii'))
            # The duration is optional (the HTML page tolerates its absence
            # too), so only emit the header when a numeric value is present.
            duration = fragment.get('duration')
            if isinstance(duration, (int, float)):
                frag_header.write(b'X.yt-dlp.Duration: %f\r\n' % duration)
            frag_header.write(b'\r\n')
            self._append_fragment(
                ctx, frag_header.getvalue() + frag_content + b'\r\n')

        # Closing delimiter of the multipart message.
        ctx['dest_stream'].write(b'--%b--\r\n\r\n' % boundary_bytes)
        self._finish_frag_download(ctx, info_dict)
        return True