]>
Commit | Line | Data |
---|---|---|
d2646ec9 JR |
1 | #!/usr/bin/env python |
2 | """client library for iterating over http Server Sent Event (SSE) streams""" | |
3 | # | |
4 | # Distributed under the terms of the MIT license. | |
5 | # | |
from __future__ import unicode_literals

import codecs
import logging
import re
import time
import warnings

import six

import requests
16 | ||
17 | __version__ = '0.0.27' | |
18 | ||
19 | # Technically, we should support streams that mix line endings. This regex, | |
20 | # however, assumes that a system will provide consistent line endings. | |
21 | end_of_field = re.compile(r'\r\n\r\n|\r\r|\n\n') | |
22 | ||
23 | ||
class SSEClient(object):
    """Iterate over the events of an HTTP Server-Sent Events (SSE) stream.

    Each ``next()`` call blocks until a complete event has arrived and returns
    it as an :class:`Event`.  When the connection ends or fails, the client
    sleeps ``retry`` milliseconds, reconnects (sending ``Last-Event-ID`` once
    an event id has been seen) and keeps reading.
    """

    def __init__(self, url, last_id=None, retry=3000, session=None, chunk_size=1024, **kwargs):
        """Open the event stream at ``url``.

        :param url: URL of the event stream.
        :param last_id: event id to resume after; sent as ``Last-Event-ID``.
        :param retry: reconnection delay in milliseconds.  A ``retry:`` field
            received from the server replaces it.
        :param session: optional ``requests.Session`` used for the requests;
            the ``requests`` module itself is used when it is falsy.
        :param chunk_size: read size used by :meth:`iter_content` only.
        :param kwargs: extra keyword arguments passed to every ``get`` call.
        :raises requests.RequestException: if the initial request fails or
            returns an error status.
        """
        self.url = url
        self.last_id = last_id
        self.retry = retry
        self.chunk_size = chunk_size

        # Optional support for passing in a requests.Session()
        self.session = session

        # Any extra kwargs will be fed into the requests.get call later.
        self.requests_kwargs = kwargs

        # Work on a copy of the caller's headers: the SSE headers below and
        # Last-Event-ID (added on reconnect) must not leak into a dict the
        # caller may reuse.  An explicit ``headers=None`` is also tolerated.
        headers = kwargs.get('headers')
        headers = {} if headers is None else headers.copy()
        # The SSE spec requires making requests with Cache-Control: no-cache
        headers['Cache-Control'] = 'no-cache'
        # The 'Accept' header is not required, but explicit > implicit
        headers['Accept'] = 'text/event-stream'
        self.requests_kwargs['headers'] = headers

        # Decoded text received so far that has not been returned as an Event.
        self.buf = ''

        self._connect()

    def _connect(self):
        """(Re)open the HTTP stream, resuming after ``self.last_id`` if set.

        Sets ``self.resp``, ``self.resp_iterator`` and ``self.decoder``.

        :raises requests.RequestException: if the request fails or the
            response has an error status.
        """
        if self.last_id:
            self.requests_kwargs['headers']['Last-Event-ID'] = self.last_id

        # Release the previous connection (if any) before replacing it;
        # otherwise every reconnect leaves the old response open.
        previous = getattr(self, 'resp', None)
        if previous is not None:
            previous.close()

        # Use session if set. Otherwise fall back to requests module.
        requester = self.session or requests
        self.resp = requester.get(self.url, stream=True, **self.requests_kwargs)
        # With stream=True, chunk_size=None yields data as it arrives instead
        # of waiting for a fixed-size block to fill.
        self.resp_iterator = self.resp.iter_content(chunk_size=None)

        # Event streams are always UTF-8 encoded (WHATWG HTML, Server-sent
        # events).  resp.encoding is ISO-8859-1 for text/* responses without
        # a charset, and resp.apparent_encoding would read the whole (never
        # ending) body to guess, so neither is usable here.
        self.decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')

        # TODO: Ensure we're handling redirects. Might also stick the 'origin'
        # attribute on Events like the Javascript spec requires.
        self.resp.raise_for_status()

    def iter_content(self):
        """Return a generator over raw byte chunks of the current response.

        Each chunk is at most ``self.chunk_size`` bytes.  When the underlying
        file object exposes ``read1`` a short read is used, so a chunk is
        produced without waiting for the full size.  Note that
        :meth:`_connect` does not use this generator; it reads through
        ``requests``' own ``Response.iter_content``.
        """
        def generate():
            while True:
                if hasattr(self.resp.raw, '_fp') and \
                        hasattr(self.resp.raw._fp, 'fp') and \
                        hasattr(self.resp.raw._fp.fp, 'read1'):
                    chunk = self.resp.raw._fp.fp.read1(self.chunk_size)
                else:
                    # _fp is not available, this means that we cannot use short
                    # reads and this will block until the full chunk size is
                    # actually read
                    chunk = self.resp.raw.read(self.chunk_size)
                if not chunk:
                    break
                yield chunk

        return generate()

    def _event_complete(self):
        """Return True if ``self.buf`` contains at least one whole event."""
        return end_of_field.search(self.buf) is not None

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next complete :class:`Event`, reconnecting as needed.

        :raises requests.RequestException: if a reconnection attempt fails.
        """
        while not self._event_complete():
            try:
                next_chunk = next(self.resp_iterator)
                if not next_chunk:
                    raise EOFError()
                self.buf += self.decoder.decode(next_chunk)

            except (StopIteration, requests.RequestException, EOFError, six.moves.http_client.IncompleteRead) as e:
                logging.getLogger(__name__).warning(
                    'SSE connection to %s lost (%r); reconnecting in %s ms',
                    self.url, e, self.retry)
                time.sleep(self.retry / 1000.0)
                self._connect()

                # The SSE spec only supports resuming from a whole message.
                # The loop condition guarantees the buffer holds no complete
                # event here, so everything in it belongs to an interrupted
                # event: drop it rather than prefixing it to the first event
                # of the new connection (the server resends from last_id).
                self.buf = ''
                continue

        # Split the complete event (up to the end_of_field) into event_string,
        # and retain anything after the current complete event in self.buf
        # for next time.
        event_string, self.buf = end_of_field.split(self.buf, maxsplit=1)
        msg = Event.parse(event_string)

        # If the server requests a specific retry delay, we need to honor it
        # (0 is a valid delay, hence the explicit None check).
        if msg.retry is not None:
            self.retry = msg.retry

        # last_id should only be set if included in the message. It's not
        # forgotten if a message omits it.
        if msg.id:
            self.last_id = msg.id

        return msg

    if six.PY2:
        next = __next__
127 | ||
128 | ||
class Event(object):
    """A single Server-Sent Event.

    ``data`` is the text payload, ``event`` the event type (``'message'`` by
    default), ``id`` the optional event id and ``retry`` the optional
    reconnection delay, in milliseconds, requested by the server.
    """

    sse_line_pattern = re.compile('(?P<name>[^:]*):?( ?(?P<value>.*))?')

    # Only CRLF, CR and LF terminate lines in an event stream.  str.splitlines
    # would also split on characters such as U+2028, \x0b or \x85, which may
    # legitimately appear inside a field value.
    _line_separator = re.compile(r'\r\n|\r|\n')

    def __init__(self, data='', event='message', id=None, retry=None):
        assert isinstance(data, six.string_types), "Data must be text"
        self.data = data
        self.event = event
        self.id = id
        self.retry = retry

    def dump(self):
        """Serialize the event to SSE wire format, ending with a blank line."""
        lines = []
        if self.id:
            lines.append('id: %s' % self.id)

        # Only include an event line if it's not the default already.
        if self.event != 'message':
            lines.append('event: %s' % self.event)

        # 0 is a valid retry delay, so only omit the field when it is unset.
        if self.retry is not None:
            lines.append('retry: %s' % self.retry)

        # Multi-line data is sent as one "data:" line per line of text.
        lines.extend('data: %s' % d for d in self.data.split('\n'))
        return '\n'.join(lines) + '\n\n'

    @classmethod
    def parse(cls, raw):
        """
        Given a possibly-multiline string representing an SSE message, parse it
        and return a Event object.

        Successive ``data`` lines are joined with newlines (empty ones
        included), a ``retry`` value that is not all ASCII digits is ignored,
        comment lines (starting with ``:``) and unknown fields are skipped.
        """
        msg = cls()
        data_lines = []
        for line in cls._line_separator.split(raw):
            m = cls.sse_line_pattern.match(line)
            if m is None:
                # Malformed line. Discard but warn. (Defensive: the pattern's
                # parts are all optional, so it matches any line.)
                warnings.warn('Invalid SSE line: "%s"' % line, SyntaxWarning)
                continue

            name = m.group('name')
            if name == '':
                # line began with a ":" (a comment) or is empty. Ignore
                continue
            value = m.group('value') or ''

            if name == 'data':
                # Collect every data line, including empty ones, so that e.g.
                # "data:\ndata: x" yields "\nx" rather than "x".
                data_lines.append(value)
            elif name == 'event':
                msg.event = value
            elif name == 'id':
                msg.id = value
            elif name == 'retry':
                # The spec says to ignore retry values that are not made of
                # ASCII digits only; int() would raise on them instead.
                if value and all(c in '0123456789' for c in value):
                    msg.retry = int(value)

        if data_lines:
            msg.data = '\n'.join(data_lines)

        return msg

    def __str__(self):
        return self.data