]> jfr.im git - yt-dlp.git/blob - yt_dlp/postprocessor/sponsorblock.py
[patreon] Fix vimeo player regex (#1332)
[yt-dlp.git] / yt_dlp / postprocessor / sponsorblock.py
1 from hashlib import sha256
2 import itertools
3 import json
4 import re
5 import time
6
7 from .ffmpeg import FFmpegPostProcessor
8 from ..compat import compat_urllib_parse_urlencode, compat_HTTPError
9 from ..utils import PostProcessingError, network_exceptions, sanitized_Request
10
11
class SponsorBlockPP(FFmpegPostProcessor):
    """Post-processor that queries the SponsorBlock API for a video's segments.

    The segments that survive the duration sanity check are converted to
    chapter dicts and stored under ``info['sponsorblock_chapters']``.
    """

    # Extractor key -> service name sent in the SponsorBlock query.
    EXTRACTORS = {
        'Youtube': 'YouTube',
    }
    # SponsorBlock category id -> human-readable chapter title.
    CATEGORIES = {
        'sponsor': 'Sponsor',
        'intro': 'Intermission/Intro Animation',
        'outro': 'Endcards/Credits',
        'selfpromo': 'Unpaid/Self Promotion',
        'interaction': 'Interaction Reminder',
        'preview': 'Preview/Recap',
        'music_offtopic': 'Non-Music Section'
    }

    def __init__(self, downloader, categories=None, api='https://sponsor.ajay.app'):
        """Set up the post-processor.

        downloader -- passed through to FFmpegPostProcessor.__init__
        categories -- iterable of category ids to request; every key of
                      CATEGORIES is used when it is empty or None
        api        -- base URL of the SponsorBlock API; 'https://' is
                      prepended when no http(s) scheme is given
        """
        FFmpegPostProcessor.__init__(self, downloader)
        self._categories = tuple(categories or self.CATEGORIES.keys())
        self._API_URL = api if re.match(r'^https?://', api) else 'https://' + api

    def run(self, info):
        """Fetch the segments for ``info`` and attach them as chapters.

        Returns the tuple ``([], info)``. ``info`` is left unchanged when its
        extractor is not listed in EXTRACTORS.
        """
        extractor = info['extractor_key']
        if extractor not in self.EXTRACTORS:
            self.to_screen(f'SponsorBlock is not supported for {extractor}')
            return [], info

        self.to_screen('Fetching SponsorBlock segments')
        # The duration may be missing from info; _get_sponsor_chapters treats
        # a falsy duration as "unknown", so do not fail on an absent key.
        info['sponsorblock_chapters'] = self._get_sponsor_chapters(info, info.get('duration'))
        return [], info

    def _get_sponsor_chapters(self, info, duration):
        """Return the list of chapter dicts built from the fetched segments.

        info     -- info dict; 'id' and 'extractor_key' are read
        duration -- duration of the video, or a falsy value when unknown

        Segments whose reported video duration differs from ``duration`` by
        more than one second are dropped, and a warning is emitted.
        """
        segments = self._get_sponsor_segments(info['id'], self.EXTRACTORS[info['extractor_key']])

        def duration_filter(s):
            # NOTE: adjusts s['segment'] in place before deciding whether to keep it.
            start_end = s['segment']
            # Ignore milliseconds difference at the start.
            if start_end[0] <= 1:
                start_end[0] = 0
            # Ignore milliseconds difference at the end.
            # Never allow the segment to exceed the video.
            if duration and duration - start_end[1] <= 1:
                start_end[1] = duration
            # SponsorBlock duration may be absent or it may deviate from the real one.
            # A missing, None or zero value is treated as "unknown" and accepted.
            reported_duration = s.get('videoDuration')
            return not reported_duration or not duration or abs(duration - reported_duration) <= 1

        duration_match = [s for s in segments if duration_filter(s)]
        if len(duration_match) != len(segments):
            self.report_warning('Some SponsorBlock segments are from a video of different duration, maybe from an old version of this video')

        def to_chapter(s):
            (start, end), cat = s['segment'], s['category']
            return {
                'start_time': start,
                'end_time': end,
                'category': cat,
                'title': self.CATEGORIES[cat],
                '_categories': [(cat, start, end)]
            }

        sponsor_chapters = [to_chapter(s) for s in duration_match]
        if not sponsor_chapters:
            self.to_screen('No segments were found in the SponsorBlock database')
        else:
            self.to_screen(f'Found {len(sponsor_chapters)} segments in the SponsorBlock database')
        return sponsor_chapters

    def _get_sponsor_segments(self, video_id, service):
        """Return the raw segment list for ``video_id``, or [] when none match.

        Only the first four hex characters of the SHA-256 of the video id are
        sent; the matching entry is then picked from the response by its
        'videoID' field.
        """
        video_hash = sha256(video_id.encode('ascii')).hexdigest()
        # SponsorBlock API recommends using first 4 hash characters.
        url = f'{self._API_URL}/api/skipSegments/{video_hash[:4]}?' + compat_urllib_parse_urlencode({
            'service': service,
            'categories': json.dumps(self._categories),
        })
        self.write_debug(f'SponsorBlock query: {url}')
        # Guard against a JSON `null` body, which would not be iterable.
        for d in self._get_json(url) or []:
            if d['videoID'] == video_id:
                return d['segments']
        return []

    def _get_json(self, url):
        """Download ``url`` and return the decoded JSON document.

        Returns [] on HTTP 404. Other network errors are retried up to the
        'extractor_retries' param (default 3), sleeping for
        'sleep_interval_requests' seconds between attempts when that is set.

        Raises PostProcessingError once the retries are exhausted.
        """
        # While this is not an extractor, it behaves similar to one and
        # so obey extractor_retries and sleep_interval_requests
        max_retries = self.get_param('extractor_retries', 3)
        sleep_interval = self.get_param('sleep_interval_requests') or 0
        for retries in itertools.count():
            try:
                rsp = self._downloader.urlopen(sanitized_Request(url))
                return json.loads(rsp.read().decode(rsp.info().get_param('charset') or 'utf-8'))
            except network_exceptions as e:
                if isinstance(e, compat_HTTPError) and e.code == 404:
                    return []
                if retries < max_retries:
                    self.report_warning(f'{e}. Retrying...')
                    if sleep_interval > 0:
                        self.to_screen(f'Sleeping {sleep_interval} seconds ...')
                        time.sleep(sleep_interval)
                    continue
                raise PostProcessingError(f'Unable to communicate with SponsorBlock API: {e}') from e