Py学习  »  Python

Python使用高德API批量下载POI数据

小猿猴GISer • 2 月前 • 80 次点击  

· 地理、遥感数据代下载/处理 ·

联系我们微信号






xiaoguotmd


POI (Point of Interest) 是地图和地理信息系统(GIS)中的一个重要概念,指的是在地图上标注的具有特定功能或意义的地点。POI可以是商店、餐厅、学校、医院、景点等各类地理实体。它们通常包含位置信息(经纬度)以及相关属性(如名称、地址、类型等)。


我们一般下载POI最容易的方法就是从直接下载OSM开源数据,但是这个数据由于是开源就有两个问题,一是数据精度无法保证,二是相对于高德、百度等地图厂商数据缺失的实在太多。


不过从高德、百度这些地图厂商下载一般会限制每天一定的额度,如果是大批量的下载就需要收费购买他们的额度了。今天我们介绍下如果使用高德API下载POI数据。

1

获取高德的key


首先要在高德开放平台注册(amap.com)注册账号,进行个人开发者的认证,如果能完成企业认证更好(请求额度更高)。


然后在控制台管理Key,创建一个应用并对应着创建一个Key(服务平台选择Web端即可)。


2

代码


批量下载POI代码如下:


import requestsimport jsonimport osimport mathfrom concurrent.futures import ThreadPoolExecutor
# 全局变量KEYS = ["your_api_key1", "your_api_key2"] # 这里放置多个API KeySAVE_DIR = r"F:\data\gaode\data" # 数据保存目录GRID_SIZE = 0.005 # 网格大小为0.005度KEYWORD = "餐厅" # 查询关键字polygon_coords = [[113.0, 23.0], [113.5, 23.0], [113.5, 22.5], [113.0, 22.5]] # 查询的多边形区域NUM_THREADS = 4 # 线程数量,可以根据需求调整
def file_exists(polygon, keyword, page): """检查文件是否已经存在""" lng_min, lat_min = polygon[0][0], polygon[2][1] filename = f"{SAVE_DIR}/poi_{lng_min}_{lat_min}_{keyword}_page{page}.geojson" return os.path.exists(filename)
def save_poi_data(polygon, keyword, page, data): """保存POI数据到GeoJSON文件""" if not os.path.exists(SAVE_DIR): os.makedirs(SAVE_DIR) lng_min, lat_min = polygon[0][0], polygon[2][1] filename = f"{SAVE_DIR}/poi_{lng_min}_{lat_min}_{keyword}_page{page}.geojson" with open(filename, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=4) return True
def fetch_poi_data(polygon, key, keyword): """获取POI数据,支持分页""" lng_min, lat_min = polygon[0][0], polygon[2][1] page = 1 while True: if file_exists(polygon, keyword, page): print(f"文件已存在,跳过:网格: {polygon},页数: {page}") page += 1 continue
polygon_str = f"{lng_min},{lat_min}|{polygon[1][0]},{polygon[1][1]}" api_url = f"https://restapi.amap.com/v3/place/polygon?polygon={polygon_str}&keywords={keyword}&key={key}&page={page}"
try: response = requests.get(api_url, timeout=10) response.raise_for_status() data = response.json()
infocode = data.get("infocode") if infocode == "10000" and data.get("pois"): save_poi_data(polygon, keyword, page, data) print( f"下载成功!网格: {polygon_str},页数: {page}") page += 1 if len(data.get("pois")) < 20: # 当POI数据少于20时,说明已经是最后一页 break elif infocode in ["10001", "10003", "10004"]: print(f"Key 出现问题,infocode: {infocode},切换到下一个Key进行重试...") return False # 返回False以便切换Key进行重试 else: print(f"请求失败或无数据,infocode: {infocode},信息: {data.get('info')}") break except Exception as e: print(f"请求异常,跳过此Key:{str(e)}") break return True # 下载成功或完成所有页数时返回True
def generate_grids(polygon_coords): """生成网格""" min_lng = min([coord[0] for coord in polygon_coords]) max_lng = max([coord[0] for coord in polygon_coords]) min_lat = min([coord[1] for coord in polygon_coords]) max_lat = max([coord[1] for coord in polygon_coords])
grids = [] lng_steps = math.ceil((max_lng - min_lng) / GRID_SIZE) lat_steps = math.ceil((max_lat - min_lat) / GRID_SIZE)
for i in range(lng_steps): for j in range(lat_steps): grid_min_lng = min_lng + i * GRID_SIZE grid_max_lng = min(grid_min_lng + GRID_SIZE, max_lng) grid_min_lat = min_lat + j * GRID_SIZE grid_max_lat = min(grid_min_lat + GRID_SIZE, max_lat) grid_polygon = [[grid_min_lng, grid_max_lat], [grid_max_lng, grid_max_lat], [grid_max_lng, grid_min_lat], [grid_min_lng, grid_min_lat]] grids.append(grid_polygon) return grids
def download_poi_for_grid(polygon, key, keyword): """下载单个网格的POI数据""" success = fetch_poi_data(polygon, key, keyword) if not success: return False # 返回False以便线程外层处理Key切换 return True
def main(): grids = generate_grids(polygon_coords) with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor: for idx, grid in enumerate(grids): key_idx = 0 while key_idx < len(KEYS): key = KEYS[key_idx] future = executor.submit(download_poi_for_grid, grid, key, KEYWORD) if future.result(): break # 如果下载成功或完成,则跳出循环 key_idx += 1 if key_idx == len(KEYS): print(f"所有Key均不可用,跳过此网格: {grid}")
if __name__ == "__main__": main()

3

代码解释


这里我们通过矩形经纬度设置下载范围,同时为了避免下载遗漏将这个这个矩形划分为多个小的网格分别进行下载保存。


一是可以把区域内的POI下载完,如果是区域较大,我们只进行一次请求很多POI下载不了。二是避免下载中断需要重新下载,中断后可以接着下载。


同时代码使用多进程下载,高德API对并发量有限制,我们按照其文档设置进程即可。


代码中下载区域、划分的网格大小还有需要的POI类型都可以设置。示例代码中划分的网格大小为0.005度(大约为500m)。


将这些划分好的网格数据下载好后,我们还需要将其进行合并。合并后的数据如下:



这里是0.5*0.5度的范围,总共有5万多个点(餐厅),把我一天的额度就给用完了。如果用企业认证的应该可以下载更多。我看淘宝可以租用企业认证的账号,如果感兴趣可以自己尝试一下。


这里下载的数据链接我放到原文链接中了。


合并的代码如下:

import osimport json
SAVE_DIR = r"F:\data\gaode\data" # 数据保存目录OUTPUT_FILE = "merged_poi_data3.geojson" # 合并后的输出文件
def merge_geojson_files(): merged_data = { "type": "FeatureCollection", "features": [] }
for filename in os.listdir(SAVE_DIR): if filename.endswith(".geojson"): file_path = os.path.join(SAVE_DIR, filename) with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) if "pois" in data: for poi in data["pois"]: # 将POI数据转换为GeoJSON格式的Feature feature = { "type": "Feature", "geometry": { "type": "Point", "coordinates": [float(coord) for coord in poi["location"].split(",")] }, "properties": { "name": poi.get("name"), "address": poi.get("address"), "cityname": poi.get("cityname"), "adname": poi.get("adname"), "type": poi.get("type"), "typecode": poi.get("typecode"), "tel": poi.get("tel") } } merged_data["features"].append(feature)
with open(OUTPUT_FILE, "w", encoding="utf-8") as f: json.dump(merged_data, f, ensure_ascii=False, indent=4)
print(f"所有数据已合并并保存为 {OUTPUT_FILE}")
if __name__ == "__main__": merge_geojson_files()




如果大家对某些区域有POI需求,从高德下载数据是一个选择。不过这里下载的数据注意不是WGS84坐标,这里我们在之前的推文中有说过,并且转换的方法也有。


大家有处理数据的需要可以随时找我。

微信号xiaoguotmd

微信公众号|智行探幽

ChatGPT

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/173626
 
80 次点击