import json
import time

from .fragment import FragmentFD
from ..compat import compat_urllib_error
from ..extractor.youtube import YoutubeBaseInfoExtractor as YT_BaseIE
from ..utils import RegexNotFoundError, dict_get, int_or_none, try_get


class YoutubeLiveChatFD(FragmentFD):
    """ Downloads YouTube live chats fragment by fragment """

    def real_download(self, filename, info_dict):
        """Download a YouTube live chat (or its replay) fragment by fragment.

        The page at ``info_dict['url']`` is fetched first to obtain the initial
        continuation token and the ytcfg (API key and innertube context).
        After that, continuations are followed until none is returned, and
        every chat action is appended to the output as one JSON document per
        line.

        ``info_dict['protocol']`` selects the mode: ``'youtube_live_chat'``
        for an ongoing stream, ``'youtube_live_chat_replay'`` for a replay.

        Returns True on success and False when a download or a required piece
        of page data is missing.
        """
        video_id = info_dict['video_id']
        self.to_screen('[%s] Downloading live chat' % self.FD_NAME)
        if not self.params.get('skip_download') and info_dict['protocol'] == 'youtube_live_chat':
            self.report_warning('Live chat download runs until the livestream ends. '
                                'If you wish to download the video simultaneously, run a separate yt-dlp instance')

        fragment_retries = self.params.get('fragment_retries', 0)
        test = self.params.get('test', False)

        # NOTE(review): the lines defining ``ctx`` were missing from the
        # damaged source; these keys are the presumed fragment-download
        # context -- confirm against FragmentFD.
        ctx = {
            'filename': filename,
            'live': True,
            'total_frags': None,
        }

        ie = YT_BaseIE(self.ydl)

        # Wall-clock start in milliseconds; live offsets are measured from it.
        start_time = int(time.time() * 1000)

        def dl_fragment(url, data=None, headers=None):
            """Download one fragment, merging ``headers`` over the base headers."""
            http_headers = info_dict.get('http_headers', {})
            if headers:
                # Copy first so info_dict's own header dict is not mutated.
                http_headers = http_headers.copy()
                http_headers.update(headers)
            return self._download_fragment(ctx, url, info_dict, http_headers, data)

        def parse_actions_replay(live_chat_continuation):
            """Append replay actions to the output and return the next step.

            Returns ``(continuation_id, offset, click_tracking_params)``.
            ``offset`` is the ``videoOffsetTimeMsec`` of the last replay item
            seen, or None when the fragment had none; in that case no
            continuation is looked up and ``continuation_id`` stays None.
            """
            offset = continuation_id = click_tracking_params = None
            processed_fragment = bytearray()
            for action in live_chat_continuation.get('actions', []):
                if 'replayChatItemAction' in action:
                    replay_chat_item_action = action['replayChatItemAction']
                    offset = int(replay_chat_item_action['videoOffsetTimeMsec'])
                processed_fragment.extend(
                    json.dumps(action, ensure_ascii=False).encode() + b'\n')
            if offset is not None:
                continuation = try_get(
                    live_chat_continuation,
                    lambda x: x['continuations'][0]['liveChatReplayContinuationData'], dict)
                if continuation:
                    continuation_id = continuation.get('continuation')
                    click_tracking_params = continuation.get('clickTrackingParams')
            self._append_fragment(ctx, processed_fragment)
            return continuation_id, offset, click_tracking_params

        def try_refresh_replay_beginning(live_chat_continuation):
            """Switch to the second sub-menu continuation if one is offered.

            Falls back to ``parse_actions_replay`` when the header carries no
            such reload continuation.
            """
            # choose the second option that contains the unfiltered live chat replay
            refresh_continuation = try_get(
                live_chat_continuation,
                lambda x: x['header']['liveChatHeaderRenderer']['viewSelector']['sortFilterSubMenuRenderer']['subMenuItems'][1]['continuation']['reloadContinuationData'], dict)
            if refresh_continuation:
                # no data yet but required to call _append_fragment
                self._append_fragment(ctx, b'')
                refresh_continuation_id = refresh_continuation.get('continuation')
                offset = 0
                click_tracking_params = refresh_continuation.get('trackingParams')
                return refresh_continuation_id, offset, click_tracking_params
            return parse_actions_replay(live_chat_continuation)

        # Last known live offset; kept across fragments so that actions
        # without a timestamp reuse the most recent value.
        live_offset = 0

        def parse_actions_live(live_chat_continuation):
            """Append live actions, wrapped in the replay format, to the output.

            Returns ``(continuation_id, live_offset, click_tracking_params)``.
            When the continuation data carries ``timeoutMs``, sleeps for that
            long before returning.
            """
            nonlocal live_offset
            continuation_id = click_tracking_params = None
            processed_fragment = bytearray()
            for action in live_chat_continuation.get('actions', []):
                timestamp = self.parse_live_timestamp(action)
                if timestamp is not None:
                    live_offset = timestamp - start_time
                # compatibility with replay format
                pseudo_action = {
                    'replayChatItemAction': {'actions': [action]},
                    'videoOffsetTimeMsec': str(live_offset),
                    # NOTE(review): this entry was missing from the damaged
                    # source and is restored on the assumption that live
                    # records are flagged this way -- confirm.
                    'isLive': True,
                }
                processed_fragment.extend(
                    json.dumps(pseudo_action, ensure_ascii=False).encode() + b'\n')
            continuation_data_getters = [
                lambda x: x['continuations'][0]['invalidationContinuationData'],
                lambda x: x['continuations'][0]['timedContinuationData'],
            ]
            continuation_data = try_get(live_chat_continuation, continuation_data_getters, dict)
            if continuation_data:
                continuation_id = continuation_data.get('continuation')
                click_tracking_params = continuation_data.get('clickTrackingParams')
                timeout_ms = int_or_none(continuation_data.get('timeoutMs'))
                if timeout_ms is not None:
                    time.sleep(timeout_ms / 1000)
            self._append_fragment(ctx, processed_fragment)
            return continuation_id, live_offset, click_tracking_params

        def download_and_parse_fragment(url, frag_index, request_data=None, headers=None):
            """Download and parse one fragment, retrying on HTTP errors.

            Returns ``(success, continuation_id, offset, click_tracking_params)``;
            the last three are None when ``success`` is False.
            """
            count = 0
            while count <= fragment_retries:
                try:
                    success = dl_fragment(url, request_data, headers)
                    if not success:
                        return False, None, None, None
                    raw_fragment = self._read_fragment(ctx)
                    try:
                        data = ie.extract_yt_initial_data(video_id, raw_fragment.decode('utf-8', 'replace'))
                    except RegexNotFoundError:
                        data = None
                    if not data:
                        # No initial data found in the body: parse it as JSON.
                        data = json.loads(raw_fragment)
                    live_chat_continuation = try_get(
                        data,
                        lambda x: x['continuationContents']['liveChatContinuation'], dict) or {}
                    if info_dict['protocol'] == 'youtube_live_chat_replay':
                        if frag_index == 1:
                            continuation_id, offset, click_tracking_params = try_refresh_replay_beginning(live_chat_continuation)
                        else:
                            continuation_id, offset, click_tracking_params = parse_actions_replay(live_chat_continuation)
                    elif info_dict['protocol'] == 'youtube_live_chat':
                        continuation_id, offset, click_tracking_params = parse_actions_live(live_chat_continuation)
                    return True, continuation_id, offset, click_tracking_params
                except compat_urllib_error.HTTPError as err:
                    count += 1
                    if count <= fragment_retries:
                        self.report_retry_fragment(err, frag_index, count, fragment_retries)
            if count > fragment_retries:
                self.report_error('giving up after %s fragment retries' % fragment_retries)
                return False, None, None, None

        self._prepare_and_start_frag_download(ctx, info_dict)

        success = dl_fragment(info_dict['url'])
        if not success:
            return False
        raw_fragment = self._read_fragment(ctx)
        try:
            data = ie.extract_yt_initial_data(video_id, raw_fragment.decode('utf-8', 'replace'))
        except RegexNotFoundError:
            return False
        continuation_id = try_get(
            data,
            lambda x: x['contents']['twoColumnWatchNextResults']['conversationBar']['liveChatRenderer']['continuations'][0]['reloadContinuationData']['continuation'])
        # no data yet but required to call _append_fragment
        self._append_fragment(ctx, b'')

        ytcfg = ie.extract_ytcfg(video_id, raw_fragment.decode('utf-8', 'replace'))

        if not ytcfg:
            return False
        api_key = try_get(ytcfg, lambda x: x['INNERTUBE_API_KEY'])
        innertube_context = try_get(ytcfg, lambda x: x['INNERTUBE_CONTEXT'])
        if not api_key or not innertube_context:
            return False
        visitor_data = try_get(innertube_context, lambda x: x['client']['visitorData'], str)
        if info_dict['protocol'] == 'youtube_live_chat_replay':
            url = 'https://www.youtube.com/youtubei/v1/live_chat/get_live_chat_replay?key=' + api_key
            chat_page_url = 'https://www.youtube.com/live_chat_replay?continuation=' + continuation_id
        elif info_dict['protocol'] == 'youtube_live_chat':
            url = 'https://www.youtube.com/youtubei/v1/live_chat/get_live_chat?key=' + api_key
            chat_page_url = 'https://www.youtube.com/live_chat?continuation=' + continuation_id

        frag_index = offset = 0
        click_tracking_params = None
        while continuation_id is not None:
            frag_index += 1
            request_data = {
                'context': innertube_context,
                'continuation': continuation_id,
            }
            if frag_index > 1:
                # Later fragments go through the API endpoint; the requested
                # player offset is 5000 ms before the last offset, floored at 0.
                request_data['currentPlayerState'] = {'playerOffsetMs': str(max(offset - 5000, 0))}
                if click_tracking_params:
                    request_data['context']['clickTracking'] = {'clickTrackingParams': click_tracking_params}
                headers = ie.generate_api_headers(ytcfg=ytcfg, visitor_data=visitor_data)
                headers.update({'content-type': 'application/json'})
                fragment_request_data = json.dumps(request_data, ensure_ascii=False).encode() + b'\n'
                success, continuation_id, offset, click_tracking_params = download_and_parse_fragment(
                    url, frag_index, fragment_request_data, headers)
            else:
                # The first fragment is fetched from the chat page itself.
                success, continuation_id, offset, click_tracking_params = download_and_parse_fragment(
                    chat_page_url, frag_index)
            if not success:
                return False
            if test:
                break

        self._finish_frag_download(ctx, info_dict)
        return True

    @staticmethod
    def parse_live_timestamp(action):
        """Return the timestamp carried by a live chat ``action``, or None.

        Looks through the known action, item and renderer keys for a renderer
        dict and reads its ``timestampUsec``. Ticker and banner renderers are
        unwrapped to the message renderer they contain, when present.
        Presumably the value is scaled from microseconds to milliseconds by
        ``int_or_none(..., 1000)`` -- confirm against that helper.
        """
        action_content = dict_get(
            action,
            ['addChatItemAction', 'addLiveChatTickerItemAction', 'addBannerToLiveChatCommand'])
        if not isinstance(action_content, dict):
            return None
        item = dict_get(action_content, ['item', 'bannerRenderer'])
        if not isinstance(item, dict):
            return None
        renderer = dict_get(item, [
            # text
            'liveChatTextMessageRenderer', 'liveChatPaidMessageRenderer',
            'liveChatMembershipItemRenderer', 'liveChatPaidStickerRenderer',
            # ticker
            'liveChatTickerPaidMessageItemRenderer',
            'liveChatTickerSponsorItemRenderer',
            # banner
            'liveChatBannerRenderer',
        ])
        if not isinstance(renderer, dict):
            return None
        parent_item_getters = [
            lambda x: x['showItemEndpoint']['showLiveChatItemEndpoint']['renderer'],
            lambda x: x['contents'],
        ]
        parent_item = try_get(renderer, parent_item_getters, dict)
        if parent_item:
            renderer = dict_get(parent_item, [
                'liveChatTextMessageRenderer', 'liveChatPaidMessageRenderer',
                'liveChatMembershipItemRenderer', 'liveChatPaidStickerRenderer',
            ])
        if not isinstance(renderer, dict):
            return None
        return int_or_none(renderer.get('timestampUsec'), 1000)