4 from .fragment
import FragmentFD
5 from ..compat
import compat_urllib_error
6 from ..utils
import RegexNotFoundError
, dict_get
, int_or_none
, try_get
9 class YoutubeLiveChatFD(FragmentFD
):
10 """ Downloads YouTube live chats fragment by fragment """
12 def real_download(self
, filename
, info_dict
):
13 video_id
= info_dict
['video_id']
14 self
.to_screen('[%s] Downloading live chat' % self
.FD_NAME
)
15 if not self
.params
.get('skip_download') and info_dict
['protocol'] == 'youtube_live_chat':
16 self
.report_warning('Live chat download runs until the livestream ends. '
17 'If you wish to download the video simultaneously, run a separate yt-dlp instance')
19 fragment_retries
= self
.params
.get('fragment_retries', 0)
20 test
= self
.params
.get('test', False)
28 from ..extractor
.youtube
import YoutubeBaseInfoExtractor
30 ie
= YoutubeBaseInfoExtractor(self
.ydl
)
32 start_time
= int(time
.time() * 1000)
34 def dl_fragment(url
, data
=None, headers
=None):
35 http_headers
= info_dict
.get('http_headers', {})
37 http_headers
= http_headers
.copy()
38 http_headers
.update(headers
)
39 return self
._download
_fragment
(ctx
, url
, info_dict
, http_headers
, data
)
41 def parse_actions_replay(live_chat_continuation
):
42 offset
= continuation_id
= click_tracking_params
= None
43 processed_fragment
= bytearray()
44 for action
in live_chat_continuation
.get('actions', []):
45 if 'replayChatItemAction' in action
:
46 replay_chat_item_action
= action
['replayChatItemAction']
47 offset
= int(replay_chat_item_action
['videoOffsetTimeMsec'])
48 processed_fragment
.extend(
49 json
.dumps(action
, ensure_ascii
=False).encode() + b
'\n')
50 if offset
is not None:
51 continuation
= try_get(
52 live_chat_continuation
,
53 lambda x
: x
['continuations'][0]['liveChatReplayContinuationData'], dict)
55 continuation_id
= continuation
.get('continuation')
56 click_tracking_params
= continuation
.get('clickTrackingParams')
57 self
._append
_fragment
(ctx
, processed_fragment
)
58 return continuation_id
, offset
, click_tracking_params
60 def try_refresh_replay_beginning(live_chat_continuation
):
61 # choose the second option that contains the unfiltered live chat replay
62 refresh_continuation
= try_get(
63 live_chat_continuation
,
64 lambda x
: x
['header']['liveChatHeaderRenderer']['viewSelector']['sortFilterSubMenuRenderer']['subMenuItems'][1]['continuation']['reloadContinuationData'], dict)
65 if refresh_continuation
:
66 # no data yet but required to call _append_fragment
67 self
._append
_fragment
(ctx
, b
'')
68 refresh_continuation_id
= refresh_continuation
.get('continuation')
70 click_tracking_params
= refresh_continuation
.get('trackingParams')
71 return refresh_continuation_id
, offset
, click_tracking_params
72 return parse_actions_replay(live_chat_continuation
)
76 def parse_actions_live(live_chat_continuation
):
78 continuation_id
= click_tracking_params
= None
79 processed_fragment
= bytearray()
80 for action
in live_chat_continuation
.get('actions', []):
81 timestamp
= self
.parse_live_timestamp(action
)
82 if timestamp
is not None:
83 live_offset
= timestamp
- start_time
84 # compatibility with replay format
86 'replayChatItemAction': {'actions': [action]}
,
87 'videoOffsetTimeMsec': str(live_offset
),
90 processed_fragment
.extend(
91 json
.dumps(pseudo_action
, ensure_ascii
=False).encode() + b
'\n')
92 continuation_data_getters
= [
93 lambda x
: x
['continuations'][0]['invalidationContinuationData'],
94 lambda x
: x
['continuations'][0]['timedContinuationData'],
96 continuation_data
= try_get(live_chat_continuation
, continuation_data_getters
, dict)
98 continuation_id
= continuation_data
.get('continuation')
99 click_tracking_params
= continuation_data
.get('clickTrackingParams')
100 timeout_ms
= int_or_none(continuation_data
.get('timeoutMs'))
101 if timeout_ms
is not None:
102 time
.sleep(timeout_ms
/ 1000)
103 self
._append
_fragment
(ctx
, processed_fragment
)
104 return continuation_id
, live_offset
, click_tracking_params
106 def download_and_parse_fragment(url
, frag_index
, request_data
=None, headers
=None):
108 while count
<= fragment_retries
:
110 success
= dl_fragment(url
, request_data
, headers
)
112 return False, None, None, None
113 raw_fragment
= self
._read
_fragment
(ctx
)
115 data
= ie
.extract_yt_initial_data(video_id
, raw_fragment
.decode('utf-8', 'replace'))
116 except RegexNotFoundError
:
119 data
= json
.loads(raw_fragment
)
120 live_chat_continuation
= try_get(
122 lambda x
: x
['continuationContents']['liveChatContinuation'], dict) or {}
123 if info_dict
['protocol'] == 'youtube_live_chat_replay':
125 continuation_id
, offset
, click_tracking_params
= try_refresh_replay_beginning(live_chat_continuation
)
127 continuation_id
, offset
, click_tracking_params
= parse_actions_replay(live_chat_continuation
)
128 elif info_dict
['protocol'] == 'youtube_live_chat':
129 continuation_id
, offset
, click_tracking_params
= parse_actions_live(live_chat_continuation
)
130 return True, continuation_id
, offset
, click_tracking_params
131 except compat_urllib_error
.HTTPError
as err
:
133 if count
<= fragment_retries
:
134 self
.report_retry_fragment(err
, frag_index
, count
, fragment_retries
)
135 if count
> fragment_retries
:
136 self
.report_error('giving up after %s fragment retries' % fragment_retries
)
137 return False, None, None, None
139 self
._prepare
_and
_start
_frag
_download
(ctx
, info_dict
)
141 success
= dl_fragment(info_dict
['url'])
144 raw_fragment
= self
._read
_fragment
(ctx
)
146 data
= ie
.extract_yt_initial_data(video_id
, raw_fragment
.decode('utf-8', 'replace'))
147 except RegexNotFoundError
:
149 continuation_id
= try_get(
151 lambda x
: x
['contents']['twoColumnWatchNextResults']['conversationBar']['liveChatRenderer']['continuations'][0]['reloadContinuationData']['continuation'])
152 # no data yet but required to call _append_fragment
153 self
._append
_fragment
(ctx
, b
'')
155 ytcfg
= ie
.extract_ytcfg(video_id
, raw_fragment
.decode('utf-8', 'replace'))
159 api_key
= try_get(ytcfg
, lambda x
: x
['INNERTUBE_API_KEY'])
160 innertube_context
= try_get(ytcfg
, lambda x
: x
['INNERTUBE_CONTEXT'])
161 if not api_key
or not innertube_context
:
163 visitor_data
= try_get(innertube_context
, lambda x
: x
['client']['visitorData'], str)
164 if info_dict
['protocol'] == 'youtube_live_chat_replay':
165 url
= 'https://www.youtube.com/youtubei/v1/live_chat/get_live_chat_replay?key=' + api_key
166 chat_page_url
= 'https://www.youtube.com/live_chat_replay?continuation=' + continuation_id
167 elif info_dict
['protocol'] == 'youtube_live_chat':
168 url
= 'https://www.youtube.com/youtubei/v1/live_chat/get_live_chat?key=' + api_key
169 chat_page_url
= 'https://www.youtube.com/live_chat?continuation=' + continuation_id
171 frag_index
= offset
= 0
172 click_tracking_params
= None
173 while continuation_id
is not None:
176 'context': innertube_context
,
177 'continuation': continuation_id
,
180 request_data
['currentPlayerState'] = {'playerOffsetMs': str(max(offset - 5000, 0))}
181 if click_tracking_params
:
182 request_data
['context']['clickTracking'] = {'clickTrackingParams': click_tracking_params}
183 headers
= ie
.generate_api_headers(ytcfg
=ytcfg
, visitor_data
=visitor_data
)
184 headers
.update({'content-type': 'application/json'}
)
185 fragment_request_data
= json
.dumps(request_data
, ensure_ascii
=False).encode() + b
'\n'
186 success
, continuation_id
, offset
, click_tracking_params
= download_and_parse_fragment(
187 url
, frag_index
, fragment_request_data
, headers
)
189 success
, continuation_id
, offset
, click_tracking_params
= download_and_parse_fragment(
190 chat_page_url
, frag_index
)
196 self
._finish
_frag
_download
(ctx
, info_dict
)
200 def parse_live_timestamp(action
):
201 action_content
= dict_get(
203 ['addChatItemAction', 'addLiveChatTickerItemAction', 'addBannerToLiveChatCommand'])
204 if not isinstance(action_content
, dict):
206 item
= dict_get(action_content
, ['item', 'bannerRenderer'])
207 if not isinstance(item
, dict):
209 renderer
= dict_get(item
, [
211 'liveChatTextMessageRenderer', 'liveChatPaidMessageRenderer',
212 'liveChatMembershipItemRenderer', 'liveChatPaidStickerRenderer',
214 'liveChatTickerPaidMessageItemRenderer',
215 'liveChatTickerSponsorItemRenderer',
217 'liveChatBannerRenderer',
219 if not isinstance(renderer
, dict):
221 parent_item_getters
= [
222 lambda x
: x
['showItemEndpoint']['showLiveChatItemEndpoint']['renderer'],
223 lambda x
: x
['contents'],
225 parent_item
= try_get(renderer
, parent_item_getters
, dict)
227 renderer
= dict_get(parent_item
, [
228 'liveChatTextMessageRenderer', 'liveChatPaidMessageRenderer',
229 'liveChatMembershipItemRenderer', 'liveChatPaidStickerRenderer',
231 if not isinstance(renderer
, dict):
233 return int_or_none(renderer
.get('timestampUsec'), 1000)