社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

python实现拼多多订单接口API

马哥Linux运维 • 5 年前 • 802 次点击  

最近工作需要用到拼多多的一些接口,官方竟然没有提供,python的sdk,于是就自己简单的写了一个订单接口的api。

1、代码

#!/usr/bin/python3# -*- coding: utf-8 -*-# @Time    : 2020/3/29 0021 下午 19:40# @Author  : xiaozhi!# @FileName: pdd_api# @Software: PyCharmimport timeimport hashlibimport requestsfrom urllib.parse import urlencode
class PddApi: host = "https://gw-api.pinduoduo.com/api/router?" headers = { "accept": "application/json" }
def __init__(self, id=None, token=None, secret=None): """ 初始化 """ self.client_id = id self.secret = secret self.access_token = token self.data_type = "JSON"
def sign_md5(self, params): """ 对拼接好的字符串进行md5签名 """ hl = hashlib.md5() hl.update(params.encode(encoding='utf-8')) return hl.hexdigest().upper()
def splice_str(self): """ 升序排序请求参数,连接字符串,并在首尾加上client_secret """ self.timestamp = f"{time.time()}".split(".")[0] pdd_dict = self.__dict__.copy() secret = self.secret del pdd_dict["secret"] reverse_list = sorted([(k, str(v)) for k, v in pdd_dict.items()], key=lambda x:x[0]) reverse_list.insert(0, ("", secret)) reverse_list.append(("", secret)) reverse_list_str = list(map(lambda x: "".join(x), reverse_list)) params = "".join(reverse_list_str) return params, pdd_dict
def urlencode_data(self, params, pdd_dict): pdd_dict["sign"] = self.sign_md5(params) result = urlencode(pdd_dict) url = f"{self.host}{result}" return url
def pdd_order_list_get(self, **kwargs): """ 获取订单列表 kwargs: order_status=None, refund_status=None, start_confirm_at=None, end_confirm_at=None, page=None, page_size=None """ res_type = "pdd.order.list.get" self.type = res_type for k, v in kwargs.items(): setattr(self, k, v) params, pdd_dict = self.splice_str()
url = self.urlencode_data(params, pdd_dict) return self.response_json(url=url)
def pdd_order_information_get(self, order_sn=None): """ 获取订单详情 """ res_type = "pdd.order.information.get" self.order_sn = order_sn self.type = res_type params, pdd_dict = self.splice_str() url = self.urlencode_data(params, pdd_dict) return self.response_json(url=url)
def pdd_order_status_get(self, order_sns=None): """ 订单状态 """ res_type = "pdd.order.status.get" self.order_sns = order_sns self.type = res_type params, pdd_dict = self.splice_str() url = self.urlencode_data(params, pdd_dict) return self.response_json(url=url)
def pdd_erp_order_sync(self, **kwargs): """ erp打单信息同步, kwargs: order_sn=None, order_state=None, waybill_no=None, logistics_id=None """ res_type = "pdd.erp.order.sync" self.type = res_type for k, v in kwargs.items(): setattr(self, k, v) params, pdd_dict = self.splice_str() url = self.urlencode_data(params, pdd_dict) return self.response_json(url=url)
def pdd_order_number_list_increment_get(self, **kwargs): """ 订单增量接口 kwargs: is_lucky_flag=None, order_state=None, start_updated_at=None, end_updated_at=None,page_size=None, page=None, refund_status=None """ res_type = "pdd.order.number.list.increment.get" self.type = res_type for k, v in kwargs.items(): setattr(self, k, v) params, pdd_dict = self.splice_str() url = self.urlencode_data(params, pdd_dict) return self.response_json(url=url)
def response_json(self, url): res = requests.post(url=url, headers=self.headers) return res.json()

2、调用:

3、结果:



☆ END ☆


Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/58627
 
802 次点击