]> jfr.im git - z_archive/twitter.git/commitdiff
Implementation of multipart/form-data that handles more imports and is more conformant
authorMike Verdone <redacted>
Tue, 21 Oct 2014 12:26:57 +0000 (14:26 +0200)
committerMike Verdone <redacted>
Tue, 21 Oct 2014 12:26:57 +0000 (14:26 +0200)
tests/test_internals.py
tests/test_sanity.py
twitter/api.py
twitter/stream.py
twitter/util.py

index edea203bafac2a6a5aeec3909fdbeb4ef3d74b5f..1942d00c7c7ea58bffca04e0cb1e9583f6c32110 100644 (file)
@@ -2,6 +2,7 @@
 from __future__ import unicode_literals
 
 from twitter.api import method_for_uri, build_uri
+from twitter.util import PY_3_OR_HIGHER, actually_bytes
 
 def test_method_for_uri__lookup():
     assert "POST" == method_for_uri("/1.1/users/lookup")
@@ -20,3 +21,10 @@ def test_build_uri():
     # But only for strings beginning with _.
     uri = build_uri(["1.1", "foo", "bar"], {"foo": "asdf"})
     assert uri == "1.1/foo/bar"
+
+def test_actually_bytes():
+    out_type = str
+    if PY_3_OR_HIGHER:
+        out_type = bytes
+    for inp in [b"asdf", "asdf", u"asdfüü", 1234]:
+        assert type(actually_bytes(inp)) == out_type
index 679098beb908f73bdf2181061b885ad3304c8c06..08158c0b62d23deb5fb96360ebe9a1e2d3326b78 100644 (file)
@@ -28,6 +28,7 @@ twitter11_na = Twitter(domain='api.twitter.com',
 
 AZaz = "abcdefghijklmnopqrstuvwxyz1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ"
 
+b64_image_data = b"iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAIAAACQkWg2AAAAAXNSR0IArs4c6QAAAAlwSFlzAAALEwAACxMBAJqcGAAAAAd0SU1FB94JFhMBAJv5kaUAAAAZdEVYdENvbW1lbnQAQ3JlYXRlZCB3aXRoIEdJTVBXgQ4XAAAA4UlEQVQoz7WSIZLGIAxG6c5OFZjianBcIOfgPkju1DsEBWfAUEcNGGpY8Xe7dDoVFRvHfO8NJGRorZE39UVe1nd/WNfVObcsi3OOEAIASikAmOf5D2q/FWPUWgshKKWfiFIqhNBaxxhPjPQ05/z+Bs557xw9hBC89ymlu5BS8t6HEC5NW2sR8alRRLTWXoRSSinlSejT12M9BAAAgCeoTw9BSimlfBIu6WdYtVZEVErdaaUUItZaL/9wOsaY83YAMMb0dGtt6Jdv3/ec87ZtOWdCCGNsmibG2DiOJzP8+7b+AAOmsiPxyHWCAAAAAElFTkSuQmCC"
 
 def get_random_str():
     return ''.join(choice(AZaz) for _ in range(10))
@@ -61,16 +62,11 @@ __location__ = os.path.realpath(
 def _img_data():
     return open(os.path.join(__location__, "test.png"), "rb").read()
 
-def test_API_set_unicode_twitpic(base64=False):
-    random_tweet = "A random twitpic from %s with unicode üøπ" % \
-                    ("base64" if base64 else "file") + get_random_str()
-    if base64:
-        img = b"iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAIAAACQkWg2AAAAAXNSR0IArs4c6QAAAAlwSFlzAAALEwAACxMBAJqcGAAAAAd0SU1FB94JFhMBAJv5kaUAAAAZdEVYdENvbW1lbnQAQ3JlYXRlZCB3aXRoIEdJTVBXgQ4XAAAA4UlEQVQoz7WSIZLGIAxG6c5OFZjianBcIOfgPkju1DsEBWfAUEcNGGpY8Xe7dDoVFRvHfO8NJGRorZE39UVe1nd/WNfVObcsi3OOEAIASikAmOf5D2q/FWPUWgshKKWfiFIqhNBaxxhPjPQ05/z+Bs557xw9hBC89ymlu5BS8t6HEC5NW2sR8alRRLTWXoRSSinlSejT12M9BAAAgCeoTw9BSimlfBIu6WdYtVZEVErdaaUUItZaL/9wOsaY83YAMMb0dGtt6Jdv3/ec87ZtOWdCCGNsmibG2DiOJzP8+7b+AAOmsiPxyHWCAAAAAElFTkSuQmCC"
-    else:
-        img = _img_data()
-    params = {"status": random_tweet, "media[]": img}
-    if base64:
-        params["_base64"] = True
+def _test_API_old_media(img, _base64):
+    random_tweet = (
+        "A random twitpic with unicode üøπ"
+        + get_random_str())
+    params = {"status": random_tweet, "media[]": img, "_base64": _base64}
     twitter11.statuses.update_with_media(**params)
     time.sleep(5)
     recent = twitter11.statuses.user_timeline()
@@ -79,7 +75,13 @@ def test_API_set_unicode_twitpic(base64=False):
     assert random_tweet in texts
 
 def test_API_set_unicode_twitpic_base64():
-    test_API_set_unicode_twitpic(base64=True)
+    _test_API_old_media(b64_image_data, True)
+
+def test_API_set_unicode_twitpic_base64_string():
+    _test_API_old_media(b64_image_data.decode('utf-8'), True)
+
+def test_API_set_unicode_twitpic_auto_base64_convert():
+    _test_API_old_media(_img_data(), False)
 
 def test_upload_media():
     res = twitter_upl.media.upload(media=_img_data())
index db20e0e94cc334fdb7c19942ae20f5cbe60244c7..8f5f5f9da6e03e79a2c596d9ceb1b67856cb0426 100644 (file)
@@ -1,6 +1,8 @@
 # encoding: utf-8
 from __future__ import unicode_literals, print_function
 
+from .util import PY_3_OR_HIGHER, actually_bytes
+
 try:
     import urllib.request as urllib_request
     import urllib.error as urllib_error
@@ -200,6 +202,7 @@ class TwitterCall(object):
         kwargs = dict(kwargs)
         uri = build_uri(self.uriparts, kwargs)
         method = kwargs.pop('_method', None) or method_for_uri(uri)
+        domain = self.domain
 
         # If an _id kwarg is present, this is treated as id as a CGI
         # param.
@@ -216,8 +219,8 @@ class TwitterCall(object):
         dot = ""
         if self.format:
             dot = "."
-        uriBase = "http%s://%s/%s%s%s" % (
-            secure_str, self.domain, uri, dot, self.format)
+        url_base = "http%s://%s/%s%s%s" % (
+            secure_str, domain, uri, dot, self.format)
 
         # Check if argument tells whether img is already base64 encoded
         b64_convert = not kwargs.pop("_base64", False)
@@ -229,13 +232,13 @@ class TwitterCall(object):
         if 'media' in kwargs:
             mediafield = 'media'
             media = kwargs.pop('media')
+            media_raw = True
         elif 'media[]' in kwargs:
             mediafield = 'media[]'
             media = kwargs.pop('media[]')
             if b64_convert:
                 media = base64.b64encode(media)
-            if sys.version_info >= (3, 0):
-                media = str(media, 'utf8')
+            media_raw = False
 
         # Catch media arguments that are not accepted through multipart
         # and are not yet base64 encoded
@@ -252,11 +255,11 @@ class TwitterCall(object):
             # Use urlencoded oauth args with no params when sending media
             # via multipart and send it directly via uri even for post
             arg_data = self.auth.encode_params(
-                uriBase, method, {} if media else kwargs)
+                url_base, method, {} if media else kwargs)
             if method == 'GET' or media:
-                uriBase += '?' + arg_data
+                url_base += '?' + arg_data
             else:
-                body = arg_data.encode('utf8')
+                body = arg_data.encode('utf-8')
 
         # Handle query as multipart when sending media
         if media:
@@ -264,29 +267,38 @@ class TwitterCall(object):
             bod = []
             bod.append(b'--' + BOUNDARY)
             bod.append(
-                b'Content-Disposition: form-data; name="%s"' % mediafield.encode('utf-8'))
-            bod.append(b'Content-Transfer-Encoding: base64')
+                b'Content-Disposition: form-data; name="'
+                + actually_bytes(mediafield)
+                + b'"')
+            bod.append(b'Content-Type: application/octet-stream')
+            if not media_raw:
+                bod.append(b'Content-Transfer-Encoding: base64')
             bod.append(b'')
-            bod.append(media)
+            bod.append(actually_bytes(media))
             for k, v in kwargs.items():
-                if sys.version_info < (3, 0):
-                    k = k.encode("utf-8")
-                    v = v.encode("utf-8")
+                k = actually_bytes(k)
+                v = actually_bytes(v)
                 bod.append(b'--' + BOUNDARY)
-                bod.append(b'Content-Disposition: form-data; name="%s"' % k)
+                bod.append(b'Content-Disposition: form-data; name="' + k + b'"')
+                bod.append(b'Content-Type: text/plain;charset=utf-8')
                 bod.append(b'')
                 bod.append(v)
             bod.append(b'--' + BOUNDARY + b'--')
+            bod.append(b'')
+            bod.append(b'')
             body = b'\r\n'.join(bod)
-            headers['Content-Type'] = \
-                'multipart/form-data; boundary=%s' % BOUNDARY
+            # print(body.decode('utf-8', errors='ignore'))
+            headers[b'Content-Type'] = \
+                b'multipart/form-data; boundary=' + BOUNDARY
+
+            for k in headers:
+                headers[actually_bytes(k)] = actually_bytes(headers.pop(k))
+            # print(headers)
 
-            if sys.version_info < (3, 0):
-                uriBase = uriBase.encode("utf-8")
-                for k in headers:
-                    headers[k.encode('utf-8')] = headers.pop(k)
+            if not PY_3_OR_HIGHER:
+                url_base = url_base.encode("utf-8")
 
-        req = urllib_request.Request(uriBase, body, headers)
+        req = urllib_request.Request(url_base, data=body, headers=headers)
         if self.retry:
             return self._handle_response_with_retry(req, uri, arg_data, _timeout)
         else:
index 5384d5b865df3f70947a232261aac46815ee6b1c..f7c9419bd406269c0414b210721b20d3c1744a68 100644 (file)
@@ -1,8 +1,7 @@
 # encoding: utf-8
 from __future__ import unicode_literals
 
-import sys
-PY_3_OR_HIGHER = sys.version_info >= (3, 0)
+from .util import PY_3_OR_HIGHER
 
 if PY_3_OR_HIGHER:
     import urllib.request as urllib_request
index 8d3bd8acac7c4b165ad515e840cb49a90d367b83..5a5f50710f4d8ed8d7d5a0c65f4bc97671f5f072 100644 (file)
@@ -14,6 +14,8 @@ import textwrap
 import time
 import socket
 
+PY_3_OR_HIGHER = sys.version_info >= (3, 0)
+
 try:
     from html.entities import name2codepoint
     unichr = chr
@@ -54,11 +56,27 @@ def printNicely(string):
     else:
         print(string.encode('utf8'))
 
-__all__ = ["htmlentitydecode", "smrt_input"]
+def actually_bytes(stringy):
+    if PY_3_OR_HIGHER:
+        if type(stringy) == bytes:
+            pass
+        elif type(stringy) != str:
+            stringy = str(stringy)
+        if type(stringy) == str:
+            stringy = stringy.encode("utf-8")
+    else:
+        if type(stringy) == str:
+            pass
+        elif type(stringy) != unicode:
+            stringy = str(stringy)
+        if type(stringy) == unicode:
+            stringy = stringy.encode("utf-8")
+    return stringy
 
 def err(msg=""):
     print(msg, file=sys.stderr)
 
+
 class Fail(object):
     """A class to count fails during a repetitive task.
 
@@ -154,3 +172,6 @@ def align_text(text, left_margin=17, max_width=160):
         lines.append('\n'.join(temp_lines))
     ret = '\n'.join(lines)
     return ret.lstrip()
+
+
+__all__ = ["htmlentitydecode", "smrt_input"]