import requests
import json
import os
import math
from concurrent.futures import ThreadPoolExecutor
KEYS = ["your_api_key1", "your_api_key2"]
SAVE_DIR = r"F:\data\gaode\data"
GRID_SIZE = 0.005
KEYWORD = "餐厅"
polygon_coords = [[113.0, 23.0], [113.5, 23.0], [113.5, 22.5], [113.0, 22.5]]
NUM_THREADS = 4
def file_exists(polygon, keyword, page):
"""检查文件是否已经存在"""
lng_min, lat_min = polygon[0][0], polygon[2][1]
filename = f"{SAVE_DIR}/poi_{lng_min}_{lat_min}_{keyword}_page{page}.geojson"
return os.path.exists(filename)
def save_poi_data(polygon, keyword, page, data):
"""保存POI数据到GeoJSON文件"""
if not os.path.exists(SAVE_DIR):
os.makedirs(SAVE_DIR)
lng_min, lat_min = polygon[0][0], polygon[2][1]
filename = f"{SAVE_DIR}/poi_{lng_min}_{lat_min}_{keyword}_page{page}.geojson"
with open(filename, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
return True
def fetch_poi_data(polygon, key, keyword):
"""获取POI数据,支持分页"""
lng_min, lat_min = polygon[0][0], polygon[2][1]
page = 1
while True:
if file_exists(polygon, keyword, page):
print(f"文件已存在,跳过:网格: {polygon},页数: {page}")
page += 1
continue
polygon_str = f"{lng_min},{lat_min}|{polygon[1][0]},{polygon[1][1]}"
api_url = f"https://restapi.amap.com/v3/place/polygon?polygon={polygon_str}&keywords={keyword}&key={key}&page={page}"
try:
response = requests.get(api_url, timeout=10)
response.raise_for_status()
data = response.json()
infocode = data.get("infocode")
if infocode == "10000" and data.get("pois"):
save_poi_data(polygon, keyword, page, data)
print(
f"下载成功!网格: {polygon_str},页数: {page}")
page += 1
if len(data.get("pois")) < 20:
break
elif infocode in ["10001", "10003", "10004"]:
print(f"Key 出现问题,infocode: {infocode},切换到下一个Key进行重试...")
return False
else:
print(f"请求失败或无数据,infocode: {infocode},信息: {data.get('info')}")
break
except Exception as e:
print(f"请求异常,跳过此Key:{str(e)}")
break
return True
def generate_grids(polygon_coords):
"""生成网格"""
min_lng = min([coord[0] for coord in polygon_coords])
max_lng = max([coord[0] for coord in polygon_coords])
min_lat = min([coord[1] for coord in polygon_coords])
max_lat = max([coord[1] for coord in polygon_coords])
grids = []
lng_steps = math.ceil((max_lng - min_lng) / GRID_SIZE)
lat_steps = math.ceil((max_lat - min_lat) / GRID_SIZE)
for i in range(lng_steps):
for j in range(lat_steps):
grid_min_lng = min_lng + i * GRID_SIZE
grid_max_lng = min(grid_min_lng + GRID_SIZE, max_lng)
grid_min_lat = min_lat + j * GRID_SIZE
grid_max_lat = min(grid_min_lat + GRID_SIZE, max_lat)
grid_polygon = [[grid_min_lng, grid_max_lat], [grid_max_lng, grid_max_lat], [grid_max_lng, grid_min_lat], [grid_min_lng, grid_min_lat]]
grids.append(grid_polygon)
return grids
def download_poi_for_grid(polygon, key, keyword):
"""下载单个网格的POI数据"""
success = fetch_poi_data(polygon, key, keyword)
if not success:
return False
return True
def main():
grids = generate_grids(polygon_coords)
with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
for idx, grid in enumerate(grids):
key_idx = 0
while key_idx < len(KEYS):
key = KEYS[key_idx]
future = executor.submit(download_poi_for_grid, grid, key, KEYWORD)
if future.result():
break
key_idx += 1
if key_idx == len(KEYS):
print(f"所有Key均不可用,跳过此网格: {grid}")
if __name__ == "__main__":
main()