]> jfr.im git - erebus.git/blob - modules/contrib/sseclient.py
urls - allow port in url
[erebus.git] / modules / contrib / sseclient.py
1 #!/usr/bin/env python
2 """client library for iterating over http Server Sent Event (SSE) streams"""
3 #
4 # Distributed under the terms of the MIT license.
5 #
6 from __future__ import unicode_literals
7
8 import codecs
9 import re
10 import time
11 import warnings
12
13 import six
14
15 import requests
16
__version__ = '0.0.27'

# An event is terminated by a blank line, i.e. two consecutive line
# terminators. A terminator is CRLF, LF or CR, and the two terminators do not
# have to be of the same kind (e.g. "\r\n\n"). The negative lookahead stops the
# CR of a CRLF pair from being counted as a terminator on its own; without it
# a single "\r\n" would be mistaken for a blank line.
# The pattern deliberately has no capturing groups so that
# split(..., maxsplit=1) always yields exactly (event, rest).
end_of_field = re.compile(r'(?:\r\n|\n|\r(?!\n))(?:\r\n|\r|\n)')
22
23
class SSEClient(object):
    """Iterator over the events of an HTTP Server-Sent Events (SSE) stream.

    The connection is opened when the object is constructed. Iterating
    yields one ``Event`` per complete message. When the stream ends or a
    read fails, the client sleeps for ``retry`` milliseconds and reconnects,
    sending ``Last-Event-ID`` if an event id has been seen.
    """

    def __init__(self, url, last_id=None, retry=3000, session=None, chunk_size=1024, **kwargs):
        """Open the event stream at *url*.

        :param url: address requested with ``get``.
        :param last_id: if truthy, sent as the ``Last-Event-ID`` header.
        :param retry: milliseconds to wait before reconnecting; replaced by
            the value of a ``retry`` field received from the server.
        :param session: optional object providing ``get`` (for example a
            ``requests.Session``); the ``requests`` module is used otherwise.
        :param chunk_size: read size used by :meth:`iter_content`.
        :param kwargs: forwarded unchanged to ``get`` on every (re)connect.
        :raises: whatever ``get`` or ``raise_for_status`` raise on the
            initial connection.
        """
        self.url = url
        self.last_id = last_id
        self.retry = retry
        self.chunk_size = chunk_size

        # Optional support for passing in a requests.Session()
        self.session = session

        # Any extra kwargs will be fed into the requests.get call later.
        self.requests_kwargs = kwargs

        # Work on a copy of the caller's headers: we add entries below (and
        # Last-Event-ID on reconnect) and must not modify a dict the caller
        # may be sharing with other requests. ``or {}`` also tolerates an
        # explicit headers=None.
        headers = dict(kwargs.get('headers') or {})

        # The SSE spec requires making requests with Cache-Control: nocache
        headers['Cache-Control'] = 'no-cache'

        # The 'Accept' header is not required, but explicit > implicit
        headers['Accept'] = 'text/event-stream'
        self.requests_kwargs['headers'] = headers

        # Keep data here as it streams in
        self.buf = ''

        # No response yet; _connect() closes whatever is stored here.
        self.resp = None

        self._connect()

    def _connect(self):
        """(Re)open the stream and reset the response iterator and decoder.

        If the new request fails or returns an HTTP error status, the
        exception propagates and the previous iterator/decoder are left in
        place, so an error body is never fed into the event parser.
        """
        if self.last_id:
            self.requests_kwargs['headers']['Last-Event-ID'] = self.last_id

        # Release the connection held by the previous streaming response
        # before opening a new one; a stream=True response keeps its
        # connection until it is fully read or closed.
        previous = getattr(self, 'resp', None)
        if previous is not None:
            previous.close()

        # Use session if set.  Otherwise fall back to requests module.
        requester = self.session or requests
        self.resp = requester.get(self.url, stream=True, **self.requests_kwargs)

        # TODO: Ensure we're handling redirects.  Might also stick the 'origin'
        # attribute on Events like the Javascript spec requires.
        self.resp.raise_for_status()

        self.resp_iterator = self.resp.iter_content(chunk_size=None)

        # Do not fall back to resp.apparent_encoding: it inspects the full
        # response content, which never completes on an open-ended stream.
        # Event streams are defined to be UTF-8, so use that when the server
        # does not declare a charset.
        encoding = self.resp.encoding or 'utf-8'
        self.decoder = codecs.getincrementaldecoder(encoding)(errors='replace')

    def iter_content(self):
        """Return a generator of raw chunks read from the underlying response.

        Prefers ``read1`` on the wrapped file object so that a read returns
        as soon as some data is available. Note that ``_connect`` does not
        use this helper; it iterates with the response's own
        ``iter_content``.
        """
        def generate():
            while True:
                # Reaches into private attributes of the raw response; every
                # level is checked so a different layout falls through to the
                # plain read() below.
                if hasattr(self.resp.raw, '_fp') and \
                        hasattr(self.resp.raw._fp, 'fp') and \
                        hasattr(self.resp.raw._fp.fp, 'read1'):
                    chunk = self.resp.raw._fp.fp.read1(self.chunk_size)
                else:
                    # _fp is not available, this means that we cannot use short
                    # reads and this will block until the full chunk size is
                    # actually read
                    chunk = self.resp.raw.read(self.chunk_size)
                if not chunk:
                    break
                yield chunk

        return generate()

    def _event_complete(self):
        """Return True when the buffer holds at least one whole event."""
        return end_of_field.search(self.buf) is not None

    def __iter__(self):
        return self

    def __next__(self):
        """Block until a complete event is buffered, then return it.

        :returns: the next ``Event`` parsed from the stream.
        :raises: whatever ``_connect`` raises if a reconnect attempt fails.
        """
        while not self._event_complete():
            try:
                next_chunk = next(self.resp_iterator)
                if not next_chunk:
                    raise EOFError()
                self.buf += self.decoder.decode(next_chunk)

            except (StopIteration, requests.RequestException, EOFError,
                    six.moves.http_client.IncompleteRead) as e:
                print(e)
                time.sleep(self.retry / 1000.0)
                self._connect()

                # The SSE spec only supports resuming from a whole message, so
                # if we have half a message we should throw it out.
                head, sep, _ = self.buf.rpartition('\n')
                self.buf = head + sep
                continue

        # Split the complete event (up to the end_of_field) into event_string,
        # and retain anything after the current complete event in self.buf
        # for next time.
        event_string, self.buf = end_of_field.split(self.buf, maxsplit=1)
        msg = Event.parse(event_string)

        # If the server requests a specific retry delay, we need to honor it.
        if msg.retry:
            self.retry = msg.retry

        # last_id should only be set if included in the message.  It's not
        # forgotten if a message omits it.
        if msg.id:
            self.last_id = msg.id

        return msg

    if six.PY2:
        next = __next__
127
128
class Event(object):
    """A single Server-Sent Events message.

    Attributes:
        data: the message payload (text); multiple ``data`` lines are joined
            with newlines.
        event: the event type, ``'message'`` unless the stream says otherwise.
        id: the event id, or None when the message carried none.
        retry: reconnection delay requested by the server (int), or None.
    """

    sse_line_pattern = re.compile('(?P<name>[^:]*):?( ?(?P<value>.*))?')

    # Lines of an event stream end with CRLF, CR or LF only. str.splitlines()
    # is not used because it also breaks on characters such as U+2028, U+2029,
    # U+0085, VT and FF, which are legal inside a field value.
    _line_break = re.compile(r'\r\n|\r|\n')

    # Text types accepted for ``data``: ``str`` on Python 3, ``str`` and
    # ``unicode`` on Python 2 (the same set as six.string_types).
    _text_types = (str, type(u''))

    def __init__(self, data='', event='message', id=None, retry=None):
        """Create an event.

        :param data: payload text.
        :param event: event type name.
        :param id: event id, or None.
        :param retry: reconnection delay, or None.
        :raises AssertionError: if *data* is not text.
        """
        assert isinstance(data, self._text_types), "Data must be text"
        self.data = data
        self.event = event
        self.id = id
        self.retry = retry

    def dump(self):
        """Serialize the event to its wire format, ending with a blank line."""
        lines = []
        if self.id:
            lines.append('id: %s' % self.id)

        # Only include an event line if it's not the default already.
        if self.event != 'message':
            lines.append('event: %s' % self.event)

        if self.retry:
            lines.append('retry: %s' % self.retry)

        # Each line of the payload needs its own "data:" field.
        lines.extend('data: %s' % d for d in self.data.split('\n'))
        return '\n'.join(lines) + '\n\n'

    @classmethod
    def parse(cls, raw):
        """
        Given a possibly-multiline string representing an SSE message, parse it
        and return a Event object.

        Comment lines (starting with ":") and unknown field names are ignored.
        A ``retry`` field whose value is not an integer is ignored with a
        warning instead of aborting the parse.
        """
        msg = cls()
        data_lines = []
        for line in cls._line_break.split(raw):
            m = cls.sse_line_pattern.match(line)
            if m is None:
                # Malformed line.  Discard but warn. (Defensive: the pattern
                # can match the empty string, so this is not expected to
                # trigger.)
                warnings.warn('Invalid SSE line: "%s"' % line, SyntaxWarning)
                continue

            name = m.group('name')
            if name == '':
                # line began with a ":", so is a comment (or the line is
                # empty).  Ignore
                continue
            value = m.group('value')

            if name == 'data':
                # Collected and joined at the end so that empty data lines
                # are preserved as empty lines in the payload.
                data_lines.append(value)
            elif name == 'event':
                msg.event = value
            elif name == 'id':
                msg.id = value
            elif name == 'retry':
                try:
                    msg.retry = int(value)
                except (TypeError, ValueError):
                    # A bad retry value must not break the stream; drop the
                    # field and keep the rest of the message.
                    warnings.warn('Invalid SSE retry value: "%s"' % value,
                                  SyntaxWarning)

        if data_lines:
            msg.data = '\n'.join(data_lines)

        return msg

    def __str__(self):
        return self.data