from __future__ import unicode_literals
from twitter.api import method_for_uri, build_uri
+from twitter.util import PY_3_OR_HIGHER, actually_bytes
def test_method_for_uri__lookup():
assert "POST" == method_for_uri("/1.1/users/lookup")
# But only for strings beginning with _.
uri = build_uri(["1.1", "foo", "bar"], {"foo": "asdf"})
assert uri == "1.1/foo/bar"
+
def test_actually_bytes():
    """Check that actually_bytes yields the native byte-string type."""
    # The expected type is ``bytes`` on Python 3 and ``str`` otherwise.
    out_type = bytes if PY_3_OR_HIGHER else str
    samples = [b"asdf", "asdf", u"asdfüü", 1234]
    for inp in samples:
        assert type(actually_bytes(inp)) == out_type
# Alphabet sampled by get_random_str(): lowercase letters, the ten digits,
# then uppercase letters (so, despite the name, not letters only).
AZaz = "abcdefghijklmnopqrstuvwxyz1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+b64_image_data = b"iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAIAAACQkWg2AAAAAXNSR0IArs4c6QAAAAlwSFlzAAALEwAACxMBAJqcGAAAAAd0SU1FB94JFhMBAJv5kaUAAAAZdEVYdENvbW1lbnQAQ3JlYXRlZCB3aXRoIEdJTVBXgQ4XAAAA4UlEQVQoz7WSIZLGIAxG6c5OFZjianBcIOfgPkju1DsEBWfAUEcNGGpY8Xe7dDoVFRvHfO8NJGRorZE39UVe1nd/WNfVObcsi3OOEAIASikAmOf5D2q/FWPUWgshKKWfiFIqhNBaxxhPjPQ05/z+Bs557xw9hBC89ymlu5BS8t6HEC5NW2sR8alRRLTWXoRSSinlSejT12M9BAAAgCeoTw9BSimlfBIu6WdYtVZEVErdaaUUItZaL/9wOsaY83YAMMb0dGtt6Jdv3/ec87ZtOWdCCGNsmibG2DiOJzP8+7b+AAOmsiPxyHWCAAAAAElFTkSuQmCC"
def get_random_str():
    """Return a 10-character string of symbols picked at random from AZaz."""
    picked = []
    for _ in range(10):
        picked.append(choice(AZaz))
    return ''.join(picked)
def _img_data():
    """Return the raw bytes of the ``test.png`` file found in ``__location__``.

    The file is opened in binary mode inside a ``with`` block so the handle
    is closed as soon as the contents have been read, instead of being left
    for the garbage collector.
    """
    with open(os.path.join(__location__, "test.png"), "rb") as img_file:
        return img_file.read()
-def test_API_set_unicode_twitpic(base64=False):
- random_tweet = "A random twitpic from %s with unicode üøπ" % \
- ("base64" if base64 else "file") + get_random_str()
- if base64:
- img = b"iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAIAAACQkWg2AAAAAXNSR0IArs4c6QAAAAlwSFlzAAALEwAACxMBAJqcGAAAAAd0SU1FB94JFhMBAJv5kaUAAAAZdEVYdENvbW1lbnQAQ3JlYXRlZCB3aXRoIEdJTVBXgQ4XAAAA4UlEQVQoz7WSIZLGIAxG6c5OFZjianBcIOfgPkju1DsEBWfAUEcNGGpY8Xe7dDoVFRvHfO8NJGRorZE39UVe1nd/WNfVObcsi3OOEAIASikAmOf5D2q/FWPUWgshKKWfiFIqhNBaxxhPjPQ05/z+Bs557xw9hBC89ymlu5BS8t6HEC5NW2sR8alRRLTWXoRSSinlSejT12M9BAAAgCeoTw9BSimlfBIu6WdYtVZEVErdaaUUItZaL/9wOsaY83YAMMb0dGtt6Jdv3/ec87ZtOWdCCGNsmibG2DiOJzP8+7b+AAOmsiPxyHWCAAAAAElFTkSuQmCC"
- else:
- img = _img_data()
- params = {"status": random_tweet, "media[]": img}
- if base64:
- params["_base64"] = True
+def _test_API_old_media(img, _base64):
+ random_tweet = (
+ "A random twitpic with unicode üøπ"
+ + get_random_str())
+ params = {"status": random_tweet, "media[]": img, "_base64": _base64}
twitter11.statuses.update_with_media(**params)
time.sleep(5)
recent = twitter11.statuses.user_timeline()
assert random_tweet in texts
def test_API_set_unicode_twitpic_base64():
    """Run the old-media upload check with the base64 image as bytes."""
    _test_API_old_media(img=b64_image_data, _base64=True)
+
def test_API_set_unicode_twitpic_base64_string():
    """Run the old-media upload check with the base64 image as text."""
    # Same payload as the bytes variant, decoded to a text string first.
    b64_text = b64_image_data.decode('utf-8')
    _test_API_old_media(img=b64_text, _base64=True)
+
def test_API_set_unicode_twitpic_auto_base64_convert():
    """Run the old-media upload check with raw image bytes and _base64 False."""
    raw_image = _img_data()
    _test_API_old_media(img=raw_image, _base64=False)
def test_upload_media():
res = twitter_upl.media.upload(media=_img_data())
# encoding: utf-8
from __future__ import unicode_literals, print_function
+from .util import PY_3_OR_HIGHER, actually_bytes
+
try:
import urllib.request as urllib_request
import urllib.error as urllib_error
kwargs = dict(kwargs)
uri = build_uri(self.uriparts, kwargs)
method = kwargs.pop('_method', None) or method_for_uri(uri)
+ domain = self.domain
# If an _id kwarg is present, this is treated as id as a CGI
# param.
dot = ""
if self.format:
dot = "."
- uriBase = "http%s://%s/%s%s%s" % (
- secure_str, self.domain, uri, dot, self.format)
+ url_base = "http%s://%s/%s%s%s" % (
+ secure_str, domain, uri, dot, self.format)
# Check if argument tells whether img is already base64 encoded
b64_convert = not kwargs.pop("_base64", False)
if 'media' in kwargs:
mediafield = 'media'
media = kwargs.pop('media')
+ media_raw = True
elif 'media[]' in kwargs:
mediafield = 'media[]'
media = kwargs.pop('media[]')
if b64_convert:
media = base64.b64encode(media)
- if sys.version_info >= (3, 0):
- media = str(media, 'utf8')
+ media_raw = False
# Catch media arguments that are not accepted through multipart
# and are not yet base64 encoded
# Use urlencoded oauth args with no params when sending media
# via multipart and send it directly via uri even for post
arg_data = self.auth.encode_params(
- uriBase, method, {} if media else kwargs)
+ url_base, method, {} if media else kwargs)
if method == 'GET' or media:
- uriBase += '?' + arg_data
+ url_base += '?' + arg_data
else:
- body = arg_data.encode('utf8')
+ body = arg_data.encode('utf-8')
# Handle query as multipart when sending media
if media:
bod = []
bod.append(b'--' + BOUNDARY)
bod.append(
- b'Content-Disposition: form-data; name="%s"' % mediafield.encode('utf-8'))
- bod.append(b'Content-Transfer-Encoding: base64')
+ b'Content-Disposition: form-data; name="'
+ + actually_bytes(mediafield)
+ + b'"')
+ bod.append(b'Content-Type: application/octet-stream')
+ if not media_raw:
+ bod.append(b'Content-Transfer-Encoding: base64')
bod.append(b'')
- bod.append(media)
+ bod.append(actually_bytes(media))
for k, v in kwargs.items():
- if sys.version_info < (3, 0):
- k = k.encode("utf-8")
- v = v.encode("utf-8")
+ k = actually_bytes(k)
+ v = actually_bytes(v)
bod.append(b'--' + BOUNDARY)
- bod.append(b'Content-Disposition: form-data; name="%s"' % k)
+ bod.append(b'Content-Disposition: form-data; name="' + k + b'"')
+ bod.append(b'Content-Type: text/plain;charset=utf-8')
bod.append(b'')
bod.append(v)
bod.append(b'--' + BOUNDARY + b'--')
+ bod.append(b'')
+ bod.append(b'')
body = b'\r\n'.join(bod)
- headers['Content-Type'] = \
- 'multipart/form-data; boundary=%s' % BOUNDARY
+ # print(body.decode('utf-8', errors='ignore'))
+ headers[b'Content-Type'] = \
+ b'multipart/form-data; boundary=' + BOUNDARY
+
+ for k in headers:
+ headers[actually_bytes(k)] = actually_bytes(headers.pop(k))
+ # print(headers)
- if sys.version_info < (3, 0):
- uriBase = uriBase.encode("utf-8")
- for k in headers:
- headers[k.encode('utf-8')] = headers.pop(k)
+ if not PY_3_OR_HIGHER:
+ url_base = url_base.encode("utf-8")
- req = urllib_request.Request(uriBase, body, headers)
+ req = urllib_request.Request(url_base, data=body, headers=headers)
if self.retry:
return self._handle_response_with_retry(req, uri, arg_data, _timeout)
else:
import time
import socket
+PY_3_OR_HIGHER = sys.version_info >= (3, 0)
+
try:
from html.entities import name2codepoint
unichr = chr
else:
print(string.encode('utf8'))
-__all__ = ["htmlentitydecode", "smrt_input"]
def actually_bytes(stringy):
    """Coerce *stringy* to a byte string, encoding text as UTF-8.

    Works the same way on Python 2 and Python 3:

    * a byte string (``bytes``, which is ``str`` on Python 2) is returned
      unchanged, including instances of subclasses;
    * ``bytearray`` and ``memoryview`` objects are copied into a byte string
      rather than having their ``repr`` encoded;
    * a text string is encoded as UTF-8;
    * any other object is converted with ``str()`` first and then, if that
      produced text, encoded as UTF-8.

    Returns:
        The byte-string form of *stringy*.
    """
    # ``bytes`` is an alias of ``str`` on Python 2, so this single check
    # recognises the native byte-string type on both major versions.
    if isinstance(stringy, bytes):
        return stringy
    if isinstance(stringy, bytearray):
        return bytes(stringy)
    if isinstance(stringy, memoryview):
        return stringy.tobytes()

    # ``type(u"")`` is ``unicode`` on Python 2 and ``str`` on Python 3, which
    # avoids naming ``unicode`` (undefined on Python 3) directly.
    text_type = type(u"")
    if not isinstance(stringy, text_type):
        stringy = str(stringy)
    # On Python 2 ``str()`` already yields bytes; only text needs encoding.
    if isinstance(stringy, text_type):
        stringy = stringy.encode("utf-8")
    return stringy
def err(msg=""):
    """Print *msg* (an empty line by default) to standard error."""
    stream = sys.stderr
    print(msg, file=stream)
+
class Fail(object):
"""A class to count fails during a repetitive task.
lines.append('\n'.join(temp_lines))
ret = '\n'.join(lines)
return ret.lstrip()
+
+
+__all__ = ["htmlentitydecode", "smrt_input"]