import json
import time

from .fragment import FragmentFD
from ..compat import compat_urllib_error
from ..extractor.youtube import YoutubeBaseInfoExtractor as YT_BaseIE
from ..utils import RegexNotFoundError, dict_get, int_or_none, try_get
class YoutubeLiveChatFD(FragmentFD):
    """ Downloads YouTube live chats fragment by fragment """

    FD_NAME = 'youtube_live_chat'

    def real_download(self, filename, info_dict):
        """Download a live chat or live chat replay, one JSON action per line.

        The page at ``info_dict['url']`` is fetched first to obtain the
        initial continuation token and the ytcfg (API key and InnerTube
        context).  Every following fragment is requested with the
        continuation token returned by the previous fragment; the loop ends
        when no token is returned.

        ``info_dict['protocol']`` selects the mode and must be either
        ``'youtube_live_chat_replay'`` or ``'youtube_live_chat'``.

        Returns True when the download loop finished, False when the initial
        page, the ytcfg or any fragment could not be obtained.
        """
        video_id = info_dict['video_id']
        self.to_screen('[%s] Downloading live chat' % self.FD_NAME)
        if not self.params.get('skip_download') and info_dict['protocol'] == 'youtube_live_chat':
            self.report_warning('Live chat download runs until the livestream ends. '
                                'If you wish to download the video simultaneously, run a separate yt-dlp instance')

        fragment_retries = self.params.get('fragment_retries', 0)
        test = self.params.get('test', False)

        # Shared download state handed to every FragmentFD helper below.
        # The number of fragments is not known in advance.
        ctx = {
            'filename': filename,
            'live': True,
            'total_frags': None,
        }

        ie = YT_BaseIE(self.ydl)

        # Wall-clock start in milliseconds; live actions are stamped relative to it
        start_time = int(time.time() * 1000)

        def dl_fragment(url, data=None, headers=None):
            """Download one fragment; returns the result of _download_fragment."""
            http_headers = info_dict.get('http_headers', {})
            if headers:
                # Copy first so that info_dict['http_headers'] is not modified
                http_headers = http_headers.copy()
                http_headers.update(headers)
            return self._download_fragment(ctx, url, info_dict, http_headers, data)

        def parse_actions_replay(live_chat_continuation):
            """Append all replay actions and return the next continuation.

            Returns (continuation_id, offset, click_tracking_params).
            continuation_id stays None, which ends the download, when no
            'replayChatItemAction' was seen or no replay continuation exists.
            """
            offset = continuation_id = click_tracking_params = None
            processed_fragment = bytearray()
            for action in live_chat_continuation.get('actions', []):
                if 'replayChatItemAction' in action:
                    replay_chat_item_action = action['replayChatItemAction']
                    offset = int(replay_chat_item_action['videoOffsetTimeMsec'])
                processed_fragment.extend(
                    json.dumps(action, ensure_ascii=False).encode('utf-8') + b'\n')
            if offset is not None:
                continuation = try_get(
                    live_chat_continuation,
                    lambda x: x['continuations'][0]['liveChatReplayContinuationData'], dict)
                if continuation:
                    continuation_id = continuation.get('continuation')
                    click_tracking_params = continuation.get('clickTrackingParams')
            self._append_fragment(ctx, processed_fragment)
            return continuation_id, offset, click_tracking_params

        def try_refresh_replay_beginning(live_chat_continuation):
            """Switch to the second replay view if the header offers one.

            Falls back to parse_actions_replay() when the header does not
            contain the expected reload continuation.
            """
            # choose the second option that contains the unfiltered live chat replay
            refresh_continuation = try_get(
                live_chat_continuation,
                lambda x: x['header']['liveChatHeaderRenderer']['viewSelector']['sortFilterSubMenuRenderer']['subMenuItems'][1]['continuation']['reloadContinuationData'], dict)
            if refresh_continuation:
                # no data yet but required to call _append_fragment
                self._append_fragment(ctx, b'')
                refresh_continuation_id = refresh_continuation.get('continuation')
                # The reloaded view is read from the beginning
                offset = 0
                click_tracking_params = refresh_continuation.get('trackingParams')
                return refresh_continuation_id, offset, click_tracking_params
            return parse_actions_replay(live_chat_continuation)

        # Offset of the most recent timestamped live action; kept across
        # fragments so that actions without a timestamp reuse the last value.
        live_offset = 0

        def parse_actions_live(live_chat_continuation):
            """Append all live actions, wrapped in the replay format.

            Returns (continuation_id, live_offset, click_tracking_params).
            Sleeps for the 'timeoutMs' given by the continuation data, if
            any, before returning.
            """
            nonlocal live_offset
            continuation_id = click_tracking_params = None
            processed_fragment = bytearray()
            for action in live_chat_continuation.get('actions', []):
                timestamp = self.parse_live_timestamp(action)
                if timestamp is not None:
                    live_offset = timestamp - start_time
                # compatibility with replay format
                pseudo_action = {
                    'replayChatItemAction': {'actions': [action]},
                    'videoOffsetTimeMsec': str(live_offset),
                    'isLive': True,
                }
                processed_fragment.extend(
                    json.dumps(pseudo_action, ensure_ascii=False).encode('utf-8') + b'\n')
            continuation_data_getters = [
                lambda x: x['continuations'][0]['invalidationContinuationData'],
                lambda x: x['continuations'][0]['timedContinuationData'],
            ]
            continuation_data = try_get(live_chat_continuation, continuation_data_getters, dict)
            if continuation_data:
                continuation_id = continuation_data.get('continuation')
                click_tracking_params = continuation_data.get('clickTrackingParams')
                timeout_ms = int_or_none(continuation_data.get('timeoutMs'))
                if timeout_ms is not None:
                    time.sleep(timeout_ms / 1000)
            self._append_fragment(ctx, processed_fragment)
            return continuation_id, live_offset, click_tracking_params

        def download_and_parse_fragment(url, frag_index, request_data=None, headers=None):
            """Download one fragment, retrying on HTTP errors, and parse it.

            Returns (success, continuation_id, offset, click_tracking_params);
            the last three are None when success is False.
            """
            count = 0
            while count <= fragment_retries:
                try:
                    success = dl_fragment(url, request_data, headers)
                    if not success:
                        return False, None, None, None
                    raw_fragment = self._read_fragment(ctx)
                    try:
                        data = ie.extract_yt_initial_data(video_id, raw_fragment.decode('utf-8', 'replace'))
                    except RegexNotFoundError:
                        data = None
                    if not data:
                        # Not a page with embedded initial data: parse the body as JSON
                        data = json.loads(raw_fragment)
                    live_chat_continuation = try_get(
                        data,
                        lambda x: x['continuationContents']['liveChatContinuation'], dict) or {}
                    if info_dict['protocol'] == 'youtube_live_chat_replay':
                        if frag_index == 1:
                            continuation_id, offset, click_tracking_params = try_refresh_replay_beginning(live_chat_continuation)
                        else:
                            continuation_id, offset, click_tracking_params = parse_actions_replay(live_chat_continuation)
                    elif info_dict['protocol'] == 'youtube_live_chat':
                        continuation_id, offset, click_tracking_params = parse_actions_live(live_chat_continuation)
                    return True, continuation_id, offset, click_tracking_params
                except compat_urllib_error.HTTPError as err:
                    count += 1
                    if count <= fragment_retries:
                        self.report_retry_fragment(err, frag_index, count, fragment_retries)
            if count > fragment_retries:
                self.report_error('giving up after %s fragment retries' % fragment_retries)
                return False, None, None, None

        self._prepare_and_start_frag_download(ctx, info_dict)

        success = dl_fragment(info_dict['url'])
        if not success:
            return False
        raw_fragment = self._read_fragment(ctx)
        try:
            data = ie.extract_yt_initial_data(video_id, raw_fragment.decode('utf-8', 'replace'))
        except RegexNotFoundError:
            return False
        continuation_id = try_get(
            data,
            lambda x: x['contents']['twoColumnWatchNextResults']['conversationBar']['liveChatRenderer']['continuations'][0]['reloadContinuationData']['continuation'])
        # no data yet but required to call _append_fragment
        self._append_fragment(ctx, b'')

        ytcfg = ie.extract_ytcfg(video_id, raw_fragment.decode('utf-8', 'replace'))

        if not ytcfg:
            return False
        api_key = try_get(ytcfg, lambda x: x['INNERTUBE_API_KEY'])
        innertube_context = try_get(ytcfg, lambda x: x['INNERTUBE_CONTEXT'])
        if not api_key or not innertube_context:
            return False
        visitor_data = try_get(innertube_context, lambda x: x['client']['visitorData'], str)
        if info_dict['protocol'] == 'youtube_live_chat_replay':
            url = 'https://www.youtube.com/youtubei/v1/live_chat/get_live_chat_replay?key=' + api_key
            chat_page_url = 'https://www.youtube.com/live_chat_replay?continuation=' + continuation_id
        elif info_dict['protocol'] == 'youtube_live_chat':
            url = 'https://www.youtube.com/youtubei/v1/live_chat/get_live_chat?key=' + api_key
            chat_page_url = 'https://www.youtube.com/live_chat?continuation=' + continuation_id

        frag_index = offset = 0
        click_tracking_params = None
        while continuation_id is not None:
            frag_index += 1
            request_data = {
                'context': innertube_context,
                'continuation': continuation_id,
            }
            if frag_index > 1:
                # Later fragments are requested from the API endpoint with a
                # JSON body; the player offset is 5000 ms before the last
                # seen offset, clamped at 0.
                request_data['currentPlayerState'] = {'playerOffsetMs': str(max(offset - 5000, 0))}
                if click_tracking_params:
                    # NOTE: request_data['context'] is innertube_context itself,
                    # so this entry persists into the following requests
                    request_data['context']['clickTracking'] = {'clickTrackingParams': click_tracking_params}
                headers = ie.generate_api_headers(ytcfg=ytcfg, visitor_data=visitor_data)
                headers.update({'content-type': 'application/json'})
                fragment_request_data = json.dumps(request_data, ensure_ascii=False).encode('utf-8') + b'\n'
                success, continuation_id, offset, click_tracking_params = download_and_parse_fragment(
                    url, frag_index, fragment_request_data, headers)
            else:
                # The first fragment is the chat page itself, fetched without a request body
                success, continuation_id, offset, click_tracking_params = download_and_parse_fragment(
                    chat_page_url, frag_index)
            if not success:
                return False
            if test:
                # Test mode stops after a single fragment
                break

        self._finish_frag_download(ctx, info_dict)
        return True

    @staticmethod
    def parse_live_timestamp(action):
        """Return the timestamp of a live chat action, or None.

        Looks up the renderer of a chat item, ticker item or banner inside
        ``action`` and returns its 'timestampUsec' passed through
        int_or_none() with 1000 as second argument.  None is returned as
        soon as any expected level is missing or is not a dict.
        """
        action_content = dict_get(
            action,
            ['addChatItemAction', 'addLiveChatTickerItemAction', 'addBannerToLiveChatCommand'])
        if not isinstance(action_content, dict):
            return None
        item = dict_get(action_content, ['item', 'bannerRenderer'])
        if not isinstance(item, dict):
            return None
        renderer = dict_get(item, [
            # text
            'liveChatTextMessageRenderer', 'liveChatPaidMessageRenderer',
            'liveChatMembershipItemRenderer', 'liveChatPaidStickerRenderer',
            # ticker
            'liveChatTickerPaidMessageItemRenderer',
            'liveChatTickerSponsorItemRenderer',
            # banner
            'liveChatBannerRenderer',
        ])
        if not isinstance(renderer, dict):
            return None
        # Ticker and banner renderers wrap the actual message renderer
        parent_item_getters = [
            lambda x: x['showItemEndpoint']['showLiveChatItemEndpoint']['renderer'],
            lambda x: x['contents'],
        ]
        parent_item = try_get(renderer, parent_item_getters, dict)
        if parent_item:
            renderer = dict_get(parent_item, [
                'liveChatTextMessageRenderer', 'liveChatPaidMessageRenderer',
                'liveChatMembershipItemRenderer', 'liveChatPaidStickerRenderer',
            ])
        if not isinstance(renderer, dict):
            return None
        return int_or_none(renderer.get('timestampUsec'), 1000)