]>
Commit | Line | Data |
---|---|---|
7a340e0d NA |
1 | import json |
2 | import re | |
3 | from hashlib import sha256 | |
4 | ||
5 | from .ffmpeg import FFmpegPostProcessor | |
6 | from ..compat import compat_urllib_parse_urlencode, compat_HTTPError | |
e6f21b3d | 7 | from ..utils import PostProcessingError, network_exceptions, sanitized_Request |
7a340e0d NA |
8 | |
9 | ||
class SponsorBlockPP(FFmpegPostProcessor):
    """Post-processor that fetches SponsorBlock segments for a video.

    The segments are converted to chapter-like dicts and stored in the info
    dict under the ``sponsorblock_chapters`` key. No files are modified.
    """

    # Maps supported extractor keys to the service name sent to the API.
    EXTRACTORS = {
        'Youtube': 'YouTube',
    }
    # Maps SponsorBlock category identifiers to the chapter title used for them.
    CATEGORIES = {
        'sponsor': 'Sponsor',
        'intro': 'Intermission/Intro Animation',
        'outro': 'Endcards/Credits',
        'selfpromo': 'Unpaid/Self Promotion',
        'interaction': 'Interaction Reminder',
        'preview': 'Preview/Recap',
        'music_offtopic': 'Non-Music Section',
    }

    def __init__(self, downloader, categories=None, api='https://sponsor.ajay.app'):
        """Store the categories to query and the API base URL.

        categories -- iterable of category identifiers; all of CATEGORIES
                      are used when it is empty or None.
        api        -- API base URL; 'https://' is prepended when no
                      http(s) scheme is given.
        """
        FFmpegPostProcessor.__init__(self, downloader)
        self._categories = tuple(categories or self.CATEGORIES.keys())
        self._API_URL = api if re.match(r'^https?://', api) else 'https://' + api

    def run(self, info):
        """Attach 'sponsorblock_chapters' to info and return ([], info).

        Unsupported extractors are reported on screen and info is returned
        unchanged.
        """
        extractor = info['extractor_key']
        if extractor not in self.EXTRACTORS:
            self.to_screen(f'SponsorBlock is not supported for {extractor}')
            return [], info

        # The duration may be missing from the info dict; the chapter builder
        # already copes with a falsy duration, so do not raise KeyError here.
        info['sponsorblock_chapters'] = self._get_sponsor_chapters(info, info.get('duration'))
        return [], info

    def _get_sponsor_chapters(self, info, duration):
        """Return the list of chapter dicts built from the fetched segments.

        Segments whose recorded video duration deviates from ``duration`` by
        more than one second are dropped (with a warning). ``duration`` may
        be falsy, in which case no duration based filtering is applied.
        """
        segments = self._get_sponsor_segments(info['id'], self.EXTRACTORS[info['extractor_key']])

        def duration_filter(s):
            # NOTE: adjusts s['segment'] in place; to_chapter relies on that.
            start_end = s['segment']
            # Ignore milliseconds difference at the start.
            if start_end[0] <= 1:
                start_end[0] = 0
            # Ignore milliseconds difference at the end.
            # Never allow the segment to exceed the video.
            if duration and duration - start_end[1] <= 1:
                start_end[1] = duration
            # SponsorBlock duration may be absent or it may deviate from the real one.
            # A missing key is treated the same way as a duration of 0.
            video_duration = s.get('videoDuration')
            return not video_duration or not duration or abs(duration - video_duration) <= 1

        duration_match = [s for s in segments if duration_filter(s)]
        if len(duration_match) != len(segments):
            self.report_warning('Some SponsorBlock segments are from a video of different duration, maybe from an old version of this video')

        def to_chapter(s):
            (start, end), cat = s['segment'], s['category']
            return {
                'start_time': start,
                'end_time': end,
                'category': cat,
                'title': self.CATEGORIES[cat],
                '_categories': [(cat, start, end)],
            }

        sponsor_chapters = [to_chapter(s) for s in duration_match]
        if not sponsor_chapters:
            self.to_screen('No segments were found in the SponsorBlock database')
        else:
            self.to_screen(f'Found {len(sponsor_chapters)} segments in the SponsorBlock database')
        return sponsor_chapters

    def _get_sponsor_segments(self, video_id, service):
        """Return the segments the API lists for video_id, or [] if none."""
        id_hash = sha256(video_id.encode('ascii')).hexdigest()
        # SponsorBlock API recommends using first 4 hash characters.
        url = f'{self._API_URL}/api/skipSegments/{id_hash[:4]}?' + compat_urllib_parse_urlencode({
            'service': service,
            'categories': json.dumps(self._categories),
        })
        # The response is keyed by hash prefix, so it can contain other videos.
        for d in self._get_json(url):
            if d['videoID'] == video_id:
                return d['segments']
        return []

    def _get_json(self, url):
        """Fetch url and return the decoded JSON body.

        An HTTP 404 response yields an empty list. Any other network error
        is re-raised as PostProcessingError.
        """
        self.write_debug(f'SponsorBlock query: {url}')
        try:
            rsp = self._downloader.urlopen(sanitized_Request(url))
        except network_exceptions as e:
            if isinstance(e, compat_HTTPError) and e.code == 404:
                return []
            raise PostProcessingError(f'Unable to communicate with SponsorBlock API - {e}') from e

        return json.loads(rsp.read().decode(rsp.info().get_param('charset') or 'utf-8'))