]>
jfr.im git - erebus.git/blob - modules/contrib/sseclient.py
"""Client library for iterating over HTTP Server-Sent Event (SSE) streams."""
4 # Distributed under the terms of the MIT license.
6 from __future__
import unicode_literals
__version__ = '0.0.27'

# Matches the blank line that terminates one event in the stream: two
# consecutive line endings of the same kind (CRLF CRLF, CR CR or LF LF).
#
# Technically, we should support streams that mix line endings. This regex,
# however, assumes that a system will provide consistent line endings.
end_of_field = re.compile(r'\r\n\r\n|\r\r|\n\n')
24 class SSEClient(object):
25 def __init__(self
, url
, last_id
=None, retry
=3000, session
=None, chunk_size
=1024, **kwargs
):
27 self
.last_id
= last_id
29 self
.chunk_size
= chunk_size
31 # Optional support for passing in a requests.Session()
32 self
.session
= session
34 # Any extra kwargs will be fed into the requests.get call later.
35 self
.requests_kwargs
= kwargs
# The SSE spec requires making requests with Cache-Control: no-cache
38 if 'headers' not in self
.requests_kwargs
:
39 self
.requests_kwargs
['headers'] = {}
40 self
.requests_kwargs
['headers']['Cache-Control'] = 'no-cache'
42 # The 'Accept' header is not required, but explicit > implicit
43 self
.requests_kwargs
['headers']['Accept'] = 'text/event-stream'
45 # Keep data here as it streams in
52 self
.requests_kwargs
['headers']['Last-Event-ID'] = self
.last_id
54 # Use session if set. Otherwise fall back to requests module.
55 requester
= self
.session
or requests
56 self
.resp
= requester
.get(self
.url
, stream
=True, **self
.requests_kwargs
)
57 self
.resp_iterator
= self
.resp
.iter_content(chunk_size
=None)
58 encoding
= self
.resp
.encoding
or self
.resp
.apparent_encoding
59 self
.decoder
= codecs
.getincrementaldecoder(encoding
)(errors
='replace')
61 # TODO: Ensure we're handling redirects. Might also stick the 'origin'
62 # attribute on Events like the Javascript spec requires.
63 self
.resp
.raise_for_status()
65 def iter_content(self
):
68 if hasattr(self
.resp
.raw
, '_fp') and \
69 hasattr(self
.resp
.raw
._fp
, 'fp') and \
70 hasattr(self
.resp
.raw
._fp
.fp
, 'read1'):
71 chunk
= self
.resp
.raw
._fp
.fp
.read1(self
.chunk_size
)
73 # _fp is not available, this means that we cannot use short
74 # reads and this will block until the full chunk size is
76 chunk
= self
.resp
.raw
.read(self
.chunk_size
)
def _event_complete(self):
    """Report whether the buffer holds at least one complete event.

    Returns:
        bool: True when ``self.buf`` contains an ``end_of_field``
        terminator, False otherwise.
    """
    # end_of_field is already a compiled pattern, so use its own search()
    # instead of passing it back through re.search().
    return end_of_field.search(self.buf) is not None
90 while not self
._event
_complete
():
92 next_chunk
= next(self
.resp_iterator
)
95 self
.buf
+= self
.decoder
.decode(next_chunk
)
97 except (StopIteration, requests
.RequestException
, EOFError, six
.moves
.http_client
.IncompleteRead
) as e
:
99 time
.sleep(self
.retry
/ 1000.0)
102 # The SSE spec only supports resuming from a whole message, so
103 # if we have half a message we should throw it out.
104 head
, sep
, tail
= self
.buf
.rpartition('\n')
105 self
.buf
= head
+ sep
108 # Split the complete event (up to the end_of_field) into event_string,
109 # and retain anything after the current complete event in self.buf
111 (event_string
, self
.buf
) = re
.split(end_of_field
, self
.buf
, maxsplit
=1)
112 msg
= Event
.parse(event_string
)
114 # If the server requests a specific retry delay, we need to honor it.
116 self
.retry
= msg
.retry
118 # last_id should only be set if included in the message. It's not
119 # forgotten if a message omits it.
121 self
.last_id
= msg
.id
# Splits one line of an event into a field name (everything before the first
# colon) and an optional value (everything after it). A single space directly
# after the colon is not part of the value, and a line that starts with ":"
# yields an empty name.
sse_line_pattern = re.compile(r'(?P<name>[^:]*):?( ?(?P<value>.*))?')
133 def __init__(self
, data
='', event
='message', id=None, retry
=None):
134 assert isinstance(data
, six
.string_types
), "Data must be text"
143 lines
.append('id: %s' % self
.id)
145 # Only include an event line if it's not the default already.
146 if self
.event
!= 'message':
147 lines
.append('event: %s' % self
.event
)
150 lines
.append('retry: %s' % self
.retry
)
152 lines
.extend('data: %s' % d
for d
in self
.data
.split('\n'))
153 return '\n'.join(lines
) + '\n\n'
Given a possibly-multiline string representing an SSE message, parse it
and return an Event object.
162 for line
in raw
.splitlines():
163 m
= cls
.sse_line_pattern
.match(line
)
165 # Malformed line. Discard but warn.
166 warnings
.warn('Invalid SSE line: "%s"' % line
, SyntaxWarning)
169 name
= m
.group('name')
171 # line began with a ":", so is a comment. Ignore
173 value
= m
.group('value')
176 # If we already have some data, then join to it with a newline.
179 msg
.data
= '%s\n%s' % (msg
.data
, value
)
182 elif name
== 'event':
186 elif name
== 'retry':
187 msg
.retry
= int(value
)