+import functools
import json
import re
from .common import InfoExtractor
from ..utils import (
+ OnDemandPagedList,
clean_podcast_url,
float_or_none,
int_or_none,
strip_or_none,
+ traverse_obj,
try_get,
unified_strdate,
)
class SpotifyBaseIE(InfoExtractor):
+ _WORKING = False
_ACCESS_TOKEN = None
_OPERATION_HASHES = {
'Episode': '8276d4423d709ae9b68ec1b74cc047ba0f7479059a37820be730f125189ac2bf',
'ShowEpisodes': 'e0e5ce27bd7748d2c59b4d44ba245a8992a05be75d6fabc3b20753fc8857444d',
}
_VALID_URL_TEMPL = r'https?://open\.spotify\.com/(?:embed-podcast/|embed/|)%s/(?P<id>[^/?&#]+)'
+ _EMBED_REGEX = [r'<iframe[^>]+src="(?P<url>https?://open\.spotify.com/embed/[^"]+)"']
def _real_initialize(self):
self._ACCESS_TOKEN = self._download_json(
'https://open.spotify.com/get_access_token', None)['accessToken']
- def _call_api(self, operation, video_id, variables):
+ def _call_api(self, operation, video_id, variables, **kwargs):
return self._download_json(
'https://api-partner.spotify.com/pathfinder/v1/query', video_id, query={
'operationName': 'query' + operation,
'sha256Hash': self._OPERATION_HASHES[operation],
},
})
- }, headers={'authorization': 'Bearer ' + self._ACCESS_TOKEN})['data']
+ }, headers={'authorization': 'Bearer ' + self._ACCESS_TOKEN},
+ **kwargs)['data']
def _extract_episode(self, episode, series):
episode_id = episode['id']
'series': series,
}
- @classmethod
- def _extract_embed_urls(cls, webpage):
- return re.findall(
- r'<iframe[^>]+src="(https?://open\.spotify.com/embed/[^"]+)"',
- webpage)
-
class SpotifyIE(SpotifyBaseIE):
IE_NAME = 'spotify'
+ IE_DESC = 'Spotify episodes'
_VALID_URL = SpotifyBaseIE._VALID_URL_TEMPL % 'episode'
_TESTS = [{
'url': 'https://open.spotify.com/episode/4Z7GAJ50bgctf6uclHlWKo',
class SpotifyShowIE(SpotifyBaseIE):
IE_NAME = 'spotify:show'
+ IE_DESC = 'Spotify shows'
_VALID_URL = SpotifyBaseIE._VALID_URL_TEMPL % 'show'
_TEST = {
'url': 'https://open.spotify.com/show/4PM9Ke6l66IRNpottHKV9M',
},
'playlist_mincount': 36,
}
+ _PER_PAGE = 100
+
+ def _fetch_page(self, show_id, page=0):
+ return self._call_api('ShowEpisodes', show_id, {
+ 'limit': 100,
+ 'offset': page * self._PER_PAGE,
+ 'uri': f'spotify:show:{show_id}',
+ }, note=f'Downloading page {page + 1} JSON metadata')['podcast']
def _real_extract(self, url):
show_id = self._match_id(url)
- podcast = self._call_api('ShowEpisodes', show_id, {
- 'limit': 1000000000,
- 'offset': 0,
- 'uri': 'spotify:show:' + show_id,
- })['podcast']
- podcast_name = podcast.get('name')
-
- entries = []
- for item in (try_get(podcast, lambda x: x['episodes']['items']) or []):
- episode = item.get('episode')
- if not episode:
- continue
- entries.append(self._extract_episode(episode, podcast_name))
+ first_page = self._fetch_page(show_id)
+
+ def _entries(page):
+ podcast = self._fetch_page(show_id, page) if page else first_page
+ yield from map(
+ functools.partial(self._extract_episode, series=podcast.get('name')),
+ traverse_obj(podcast, ('episodes', 'items', ..., 'episode')))
return self.playlist_result(
- entries, show_id, podcast_name, podcast.get('description'))
+ OnDemandPagedList(_entries, self._PER_PAGE),
+ show_id, first_page.get('name'), first_page.get('description'))