]>
Commit | Line | Data |
---|---|---|
1 | import asyncio | |
2 | import contextlib | |
3 | import os | |
4 | import signal | |
5 | import threading | |
6 | ||
7 | from .common import FileDownloader | |
8 | from .external import FFmpegFD | |
9 | from ..dependencies import websockets | |
10 | ||
11 | ||
class FFmpegSinkFD(FileDownloader):
    """ A sink to ffmpeg for downloading fragments in any form """

    def real_download(self, filename, info_dict):
        """
        Download by piping the data produced by real_connection() into ffmpeg.

        A nested FFmpegFD subclass is run against a copy of info_dict whose
        'url' is replaced by '-'. When its on_process_started() hook is called,
        a daemon thread is started that runs real_connection() in its own
        asyncio event loop, with the given stdin handle as the sink.

        @param filename   passed through to the nested downloader's download()
        @param info_dict  passed unmodified to real_connection(); only a
                          shallow copy of it gets the '-' url
        @returns          whatever the nested downloader's download() returns
        """
        info_copy = info_dict.copy()
        # NOTE(review): '-' presumably tells ffmpeg to read its input from stdin;
        # confirm against FFmpegFD
        info_copy['url'] = '-'

        async def call_conn(proc, stdin):
            """ Run real_connection() against stdin, then close the pipe and raise SIGINT """
            try:
                await self.real_connection(stdin, info_dict)
            except OSError:
                # Deliberately best-effort: an OSError here (e.g. a write to a
                # pipe that has gone away) just ends the transfer
                pass
            finally:
                # flush() and close() get separate suppress blocks: with a shared
                # one, an OSError raised by flush() would skip close() entirely
                with contextlib.suppress(OSError):
                    stdin.flush()
                with contextlib.suppress(OSError):
                    stdin.close()
                # Send SIGINT to our own process once the connection is over.
                # NOTE(review): presumably this interrupts the main thread's wait
                # on ffmpeg; confirm against FFmpegFD
                os.kill(os.getpid(), signal.SIGINT)

        class FFmpegStdinFD(FFmpegFD):
            @classmethod
            def get_basename(cls):
                # Report FFmpegFD's own basename rather than one resolved for this subclass
                return FFmpegFD.get_basename()

            def on_process_started(self, proc, stdin):
                # asyncio.run() gives the coroutine a fresh event loop inside the
                # worker thread; daemon=True so the thread never blocks interpreter exit
                thread = threading.Thread(target=asyncio.run, daemon=True, args=(call_conn(proc, stdin), ))
                thread.start()

        return FFmpegStdinFD(self.ydl, self.params or {}).download(filename, info_copy)

    async def real_connection(self, sink, info_dict):
        """
        Override this in subclasses

        @param sink       writable object that receives the downloaded data
        @param info_dict  the info_dict originally given to real_download()
        """
        raise NotImplementedError('This method must be implemented by subclasses')
44 | ||
45 | ||
class WebSocketFragmentFD(FFmpegSinkFD):
    """ Forwards every message received over a WebSocket connection to the sink """

    async def real_connection(self, sink, info_dict):
        """
        Connect to info_dict['url'] and write each received message to sink.

        The optional info_dict['http_headers'] are sent as extra headers.
        Text messages are encoded as UTF-8 before being written; other
        messages are written unchanged. The loop has no exit condition of
        its own, so it only ends when an exception propagates out of it.
        """
        url = info_dict['url']
        headers = info_dict.get('http_headers', {})
        async with websockets.connect(url, extra_headers=headers) as ws:
            while True:
                message = await ws.recv()
                payload = message.encode('utf8') if isinstance(message, str) else message
                sink.write(payload)