-# coding: utf-8
-from __future__ import unicode_literals
-
import io
import quopri
import re
import uuid
from .fragment import FragmentFD
-from ..utils import (
- escapeHTML,
- formatSeconds,
- srt_subtitles_timecode,
- urljoin,
-)
+from ..compat import imghdr
+from ..utils import escapeHTML, formatSeconds, srt_subtitles_timecode, urljoin
from ..version import __version__ as YT_DLP_VERSION
class MhtmlFD(FragmentFD):
- FD_NAME = 'mhtml'
-
_STYLESHEET = """\
html, body {
margin: 0;
def _escape_mime(s):
return '=?utf-8?Q?' + (b''.join(
bytes((b,)) if b >= 0x20 else b'=%02X' % b
- for b in quopri.encodestring(s.encode('utf-8'), header=True)
+ for b in quopri.encodestring(s.encode(), header=True)
)).decode('us-ascii') + '?='
def _gen_cid(self, i, fragment, frag_boundary):
length=len(stub),
title=self._escape_mime(title),
stub=stub
- ).encode('utf-8'))
+ ).encode())
extra_state['header_written'] = True
for i, fragment in enumerate(fragments):
if (i + 1) <= ctx['fragment_index']:
continue
- fragment_url = urljoin(fragment_base_url, fragment['path'])
- success, frag_content = self._download_fragment(ctx, fragment_url, info_dict)
+ fragment_url = fragment.get('url')
+ if not fragment_url:
+ assert fragment_base_url
+ fragment_url = urljoin(fragment_base_url, fragment['path'])
+
+ success = self._download_fragment(ctx, fragment_url, info_dict)
if not success:
continue
-
- mime_type = b'image/jpeg'
- if frag_content.startswith(b'\x89PNG\r\n\x1a\n'):
- mime_type = b'image/png'
- if frag_content.startswith((b'GIF87a', b'GIF89a')):
- mime_type = b'image/gif'
- if frag_content.startswith(b'RIFF') and frag_content[8:12] == 'WEBP':
- mime_type = b'image/webp'
+ frag_content = self._read_fragment(ctx)
frag_header = io.BytesIO()
frag_header.write(
frag_header.write(
b'Content-ID: <%b>\r\n' % self._gen_cid(i, fragment, frag_boundary).encode('us-ascii'))
frag_header.write(
- b'Content-type: %b\r\n' % mime_type)
+ b'Content-type: %b\r\n' % f'image/{imghdr.what(h=frag_content) or "jpeg"}'.encode())
frag_header.write(
b'Content-length: %u\r\n' % len(frag_content))
frag_header.write(
ctx['dest_stream'].write(
b'--%b--\r\n\r\n' % frag_boundary.encode('us-ascii'))
- self._finish_frag_download(ctx, info_dict)
- return True
+ return self._finish_frag_download(ctx, info_dict)