您好,欢迎访问一九零五行业门户网

python实现rest请求api示例

该代码参考新浪python api,适用于各个开源api请求
复制代码 代码如下:
# -*- coding: utf-8 -*-
import collections
import gzip
import urllib
import urllib2
from urlparse import urlparsetry:
    from cstringio import stringio
except importerror:
    from stringio import stringio
try:
    import json
except importerror:
    import simplejson as json
__author__ = 'myth'
_http_get = 1
_http_post = 2
# 超时时间(秒)
timeout = 45
return_type = {json: 0, xml: 1, html: 2, text: 3}
_method_map = {'get': _http_get, 'post': _http_post}
class apierror(standarderror):
raise apierror if receiving json message indicating failure.
def __init__(self, error_code, error, request):
        self.error_code = error_code
        self.error = error
        self.request = request
        standarderror.__init__(self, error)
    def __str__(self):
        return 'apierror: %s: %s, request: %s' % (self.error_code, self.error, self.request)
# def callback_type(return_type='json'):
#
#     default_type = json
#     default_value = return_type.get(default_type)
#     if return_type:
#         if isinstance(return_type, (str, unicode)):
#             default_value = return_type.get(return_type.lower(), default_value)
#     return default_value
def _format_params(_pk=none, encode=none, **kw):
:param kw:
    :type kw:
    :param encode:
    :type encode:
    :return:
    :rtype:
__pk = '%s[%%s]' % _pk if _pk else '%s'
    encode = encode if encode else urllib.quote
    for k, v in kw.iteritems():
        _k = __pk % k
        if isinstance(v, basestring):
            qv = v.encode('utf-8') if isinstance(v, unicode) else v
            _v = encode(qv)
            yield _k, _v
        elif isinstance(v, collections.iterable):
            if isinstance(v, dict):
                for _ck, _cv in _format_params(_pk=_k, encode=encode, **v):
                    yield _ck, _cv
            else:
                for i in v:
                    qv = i.encode('utf-8') if isinstance(i, unicode) else str(i)
                    _v = encode(qv)
                    yield _k, _v
        else:
            qv = str(v)
            _v = encode(qv)
            yield _k, _v
def encode_params(**kw):
do url-encode parameters
    >>> encode_params(a=1, b='r&d')
    'a=1&b=r%26d'
    >>> encode_params(a=u'\u4e2d\u6587', b=['a', 'b', 123])
    'a=%e4%b8%ad%e6%96%87&b=a&b=b&b=123'
    >>> encode_params(**{
        'a1': {'aa1': 1, 'aa2': {'aaa1': 11}},
        'b1': [1, 2, 3, 4],
        'c1': {'cc1': 'c', 'cc2': ['q', 1, '@'], 'cc3': {'ccc1': ['s', 2]}}
    })
    'a1[aa1]=1&a1[aa2][aaa1]=11&c1[cc1]=c&c1[cc3][ccc1]=s&c1[cc3][ccc1]=2&c1[cc2]=q&c1[cc2]=1&c1[cc2]=%40&b1=1&b1=2&b1=3&b1=4'
# args = []
    # for k, v in kw.iteritems():
    #     if isinstance(v, basestring):
    #         qv = v.encode('utf-8') if isinstance(v, unicode) else v
    #         args.append('%s=%s' % (k, urllib.quote(qv)))
    #     elif isinstance(v, collections.iterable):
    #         for i in v:
    #             qv = i.encode('utf-8') if isinstance(i, unicode) else str(i)
    #             args.append('%s=%s' % (k, urllib.quote(qv)))
    #     else:
    #         qv = str(v)
    #         args.append('%s=%s' % (k, urllib.quote(qv)))
    # return '&'.join(args)
    args = []
    _urlencode = kw.pop('_urlencode', urllib.quote)
    for k, v in _format_params(_pk=none, encode=_urlencode, **kw):
        args.append('%s=%s' % (k, v))
    args = sorted(args, key=lambda s: s.split(=)[0])
    return '&'.join(args)
def _read_body(obj):
    using_gzip = obj.headers.get('content-encoding', '') == 'gzip'
    body = obj.read()
    if using_gzip:
        gzipper = gzip.gzipfile(fileobj=stringio(body))
        fcontent = gzipper.read()
        gzipper.close()
        return fcontent
    return body
class jsondict(dict):
general json object that allows attributes to be bound to and also behaves like a dict
def __getattr__(self, attr):
        try:
            return self[attr]
        except keyerror:
            raise attributeerror(r'jsondict' object has no attribute '%s' % attr)
    def __setattr__(self, attr, value):
        self[attr] = value
def _parse_json(s):
parse str into jsondict
def _obj_hook(pairs):
convert json object to python object
o = jsondict()
        for k, v in pairs.iteritems():
            o[str(k)] = v
        return o
    return json.loads(s, object_hook=_obj_hook)
def _parse_xml(s):
parse str into xml
raise notimplementederror()
def _parse_html(s):
parse str into html
raise notimplementederror()
def _parse_text(s):
parse str into text
raise notimplementederror()
def _http_call(the_url, method, return_type=json, request_type=none, request_suffix=none, headers={}, _timeout=30, **kwargs):
the_url: 请求地址
    method 请求方法(get,post)
    return_type: 返回格式解析
    request_suffix: 请求地址的后缀,如jsp,net
    _timeout: 超时时间
    kwargs: 请求参数
http_url = %s.%s (the_url, request_suffix) if request_suffix else the_url
    if request_type == 'json':
        headers['content-type'] = 'application/json'
        # method = _http_post
        # json_data = json.dumps(kwargs)
        # # convert str to bytes (ensure encoding is ok)
        # params = json_data.encode('utf-8')
        params = json.dumps(kwargs)
    else:
        params = encode_params(**kwargs)
        http_url = '%s?%s' % (http_url, params) if method == _http_get else http_url
    print http_url
    # u = urlparse(http_url)
    # headers.setdefault(host, u.hostname)
    http_body = none if method == _http_get else params
    req = urllib2.request(http_url, data=http_body, headers=headers)
    callback = globals().get('_parse_{0}'.format(return_type))
    if not hasattr(callback, '__call__'):
        print return '%s' unable to resolve % return_type
        callback = _parse_json
    try:
        resp = urllib2.urlopen(req, timeout=_timeout if _timeout else timeout)
        body = _read_body(resp)
        r = callback(body)
        # if hasattr(r, 'error_code'):
        #     raise apierror(r.error_code, r.get('error', ''), r.get('request', ''))
        return r
    except urllib2.httperror, e:
        try:
            body = _read_body(e)
            r = callback(body)
            return r
        except:
            r = none
        # if hasattr(r, 'error_code'):
        #     raise apierror(r.error_code, r.get('error', ''), r.get('request', ''))
            raise e
class httpobject(object):
    def __init__(self, client, method):
        self.client = client
        self.method = method
    def __getattr__(self, attr):
        def wrap(**kw):
            if attr:
                the_url = '%s/%s' % (self.client.api_url, attr.replace('__', '/'))
            else:
                the_url = self.client.api_url
            return _http_call(the_url, self.method, **kw)
        return wrap
    def __call__(self, **kw):
        return _http_call(self.client.api_url, self.method, **kw)
class apiclient(object):
使用方法:
        比如:api 请求地址为:http://api.open.zbjdev.com/kuaiyinserv/kuaiyin/billaddress
             请求方式为: get
             需要的参数为:user_id 用户的uid
                         is_all 是否查询所有数据,0为默认邮寄地址 1为全部邮寄地址
                         access_token 平台认证
             返回数据为:json
        那么此时使用如下:
        domain = api.open.zbjdev.com
        #如果是https请求,需要将is_https设置为true
        client = apiclient(domain)
        data = {user_id: 14035462, is_all: 1, access_token: xxxxxxxxxx}
        # 如果是post请求,请将get方法改为post方法
        result = client.kuaiyinserv.kuaiyin.billaddress.get(return_type=json, **data)
        #等同于
        # result = client.kuaiyinserv__kuaiyin__billaddress__get(return_type=json, **data)
        # result = client.kuaiyinserv__kuaiyin__billaddress(return_type=json, **data)
def __init__(self, domain, is_https=false):
        http = http
        if domain.startswith(http://) or domain.startswith(https://):
            http, domain = domain.split(://)
        # else:
        if is_https:
            http = https
        self.api_url = ('%s://%s' % (http, domain)).rstrip(/)
        self.get = httpobject(self, _http_get)
        self.post = httpobject(self, _http_post)
    def __getattr__(self, attr):
        if '__' in attr:
            method = self.get
            if attr[-6:] == __post:
                attr = attr[:-6]
                method = self.post
            elif attr[-5:] == __get:
                attr = attr[:-5]
            if attr[:2] == '__':
                attr = attr[2:]
            return getattr(method, attr)
        return _callable(self, attr)
class _executable(object):
    def __init__(self, client, method, path):
        self._client = client
        self._method = method
        self._path = path
    def __call__(self, **kw):
        method = _method_map[self._method]
        return _http_call('%s/%s' % (self._client.api_url, self._path), method, **kw)
    def __str__(self):
        return '_executable (%s %s)' % (self._method, self._path)
    __repr__ = __str__
class _callable(object):
    def __init__(self, client, name):
        self._client = client
        self._name = name
    def __getattr__(self, attr):
        if attr == 'get':
            return _executable(self._client, 'get', self._name)
        if attr == 'post':
            return _executable(self._client, 'post', self._name)
        name = '%s/%s' % (self._name, attr)
        return _callable(self._client, name)
    def __str__(self):
        return '_callable (%s)' % self._name
    __repr__ = __str__
def test_logistics():
    domain = https://api.kuaidi100.com/api
    #如果是https请求,需要将is_https设置为true
    client = apiclient(domain)
    data = {id: 45f2d1f2sds, com: yunda, nu: 1500066330925}
    result = client.__get(_timeout=2, **data)
    print result
    print result[message]
    print result.get(message)
    print result.message
if __name__ == '__main__':
# test_logistics()
    data = {
        data:{
            id: 1,
            pid: 3,
            name: u'中的阿斯顿'
        },
        sign: 'asdfdsdsfsdf',
    }
    domain = kuaiyin.zhubajie.com
    client = apiclient(domain)
    # headers = {'content-type': 'application/json'}
    headers = {host: domain}
    result = client.api.zhubajie.fz.info.post(return_type=json, request_type=json, headers=headers, **data)
    print result
    print result['data']['msg']
    c = apiclient('task.6.zbj.cn')
    r = getattr(c.api, 'kuaiyin-action-delcache').post(request_type=json,
                                                       headers={},
                                                       **{sign: ,
                                                          data: {
                                                              product: pvc,
                                                              category: card,
                                                              req_type: pack_list
                                                          }})
    print r
    print r['data']['msg']
其它类似信息

推荐信息