]> jfr.im git - yt-dlp.git/blob - yt_dlp/downloader/websocket.py
[MainStreaming] Add extractor (#2180)
[yt-dlp.git] / yt_dlp / downloader / websocket.py
1 import os
2 import signal
3 import asyncio
4 import threading
5
6 try:
7 import websockets
8 has_websockets = True
9 except ImportError:
10 has_websockets = False
11
12 from .common import FileDownloader
13 from .external import FFmpegFD
14
15
16 class FFmpegSinkFD(FileDownloader):
17 """ A sink to ffmpeg for downloading fragments in any form """
18
19 def real_download(self, filename, info_dict):
20 info_copy = info_dict.copy()
21 info_copy['url'] = '-'
22
23 async def call_conn(proc, stdin):
24 try:
25 await self.real_connection(stdin, info_dict)
26 except (BrokenPipeError, OSError):
27 pass
28 finally:
29 try:
30 stdin.flush()
31 stdin.close()
32 except OSError:
33 pass
34 os.kill(os.getpid(), signal.SIGINT)
35
36 class FFmpegStdinFD(FFmpegFD):
37 @classmethod
38 def get_basename(cls):
39 return FFmpegFD.get_basename()
40
41 def on_process_started(self, proc, stdin):
42 thread = threading.Thread(target=asyncio.run, daemon=True, args=(call_conn(proc, stdin), ))
43 thread.start()
44
45 return FFmpegStdinFD(self.ydl, self.params or {}).download(filename, info_copy)
46
47 async def real_connection(self, sink, info_dict):
48 """ Override this in subclasses """
49 raise NotImplementedError('This method must be implemented by subclasses')
50
51
52 class WebSocketFragmentFD(FFmpegSinkFD):
53 async def real_connection(self, sink, info_dict):
54 async with websockets.connect(info_dict['url'], extra_headers=info_dict.get('http_headers', {})) as ws:
55 while True:
56 recv = await ws.recv()
57 if isinstance(recv, str):
58 recv = recv.encode('utf8')
59 sink.write(recv)