import requestsimport jsonimport osimport mathfrom concurrent.futures import ThreadPoolExecutor
# API keys; main() tries them in list order for every grid cell.
KEYS = [
    "your_api_key1",
    "your_api_key2",
]

# Directory that receives the downloaded result files.
SAVE_DIR = r"F:\data\gaode\data"

# Edge length of one grid cell, in the same units as the coordinates below.
GRID_SIZE = 0.005

# Search keyword sent with every request.
KEYWORD = "餐厅"

# Vertices of the area to cover, as [lng, lat] pairs; only their bounding
# box is used when the grid is generated.
polygon_coords = [
    [113.0, 23.0],
    [113.5, 23.0],
    [113.5, 22.5],
    [113.0, 22.5],
]

# Maximum number of worker threads given to the ThreadPoolExecutor.
NUM_THREADS = 4
def file_exists(polygon, keyword, page):
    """Tell whether the result file for this grid cell, keyword and page exists.

    The file name is built from the longitude of the polygon's first vertex,
    the latitude of its third vertex, ``keyword`` and ``page``, and is looked
    up inside ``SAVE_DIR``.

    Args:
        polygon: Sequence of ``[lng, lat]`` vertices describing the grid cell.
        keyword: Search keyword embedded in the file name.
        page: Page number embedded in the file name.

    Returns:
        True if a file system entry with that name exists, False otherwise.
    """
    lng = polygon[0][0]
    lat = polygon[2][1]
    target = f"{SAVE_DIR}/poi_{lng}_{lat}_{keyword}_page{page}.geojson"
    return os.path.exists(target)
def save_poi_data(polygon, keyword, page, data):
    """Write one page of POI results to a ``.geojson`` file under ``SAVE_DIR``.

    The object is dumped as indented UTF-8 JSON exactly as received; no
    conversion is applied to it.

    Args:
        polygon: Sequence of ``[lng, lat]`` vertices of the grid cell. The
            longitude of the first vertex and the latitude of the third
            vertex become part of the file name.
        keyword: Search keyword, embedded in the file name.
        page: Page number, embedded in the file name.
        data: JSON-serialisable object to store.

    Returns:
        True once the file has been written.

    Raises:
        OSError: If the directory or the file cannot be created or written.
    """
    # exist_ok=True removes the check-then-create race that occurs when
    # several worker threads save their first file at the same moment.
    os.makedirs(SAVE_DIR, exist_ok=True)

    lng_min, lat_min = polygon[0][0], polygon[2][1]
    filename = f"{SAVE_DIR}/poi_{lng_min}_{lat_min}_{keyword}_page{page}.geojson"

    # Write to a temporary sibling and rename it into place. An interrupted
    # run then never leaves a truncated file under the final name, which
    # file_exists() would otherwise treat as "already downloaded" forever.
    tmp_name = f"{filename}.tmp"
    try:
        with open(tmp_name, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=4)
        os.replace(tmp_name, filename)
    finally:
        # After a successful replace the temp file is gone; this only
        # cleans up after a failed write.
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
    return True
def fetch_poi_data(polygon, key, keyword):
    """Download every result page of a polygon POI search for one grid cell.

    Pages whose file is already on disk (per ``file_exists``) are skipped;
    each newly fetched page is stored with ``save_poi_data``. Paging stops
    at the first page that holds fewer than 20 POIs, or when the API returns
    no data.

    Args:
        polygon: Grid cell as four ``[lng, lat]`` vertices in the order
            produced by ``generate_grids``: top-left, top-right,
            bottom-right, bottom-left.
        key: API key sent with the requests.
        keyword: Search keyword.

    Returns:
        False when the caller should retry this grid with another key (the
        API reported a key problem, or the request itself failed); True
        otherwise, including when the grid has no (more) results.
    """
    page_size = 20  # sent as ``offset`` and used for the last-page check

    # The Amap polygon-search documentation describes a rectangle as its
    # top-left and bottom-right corners and allows at most 6 decimal places
    # per coordinate. The grid arithmetic yields values such as
    # 113.00500000000001, so the coordinates are formatted explicitly.
    top_left, bottom_right = polygon[0], polygon[2]
    polygon_str = (
        f"{top_left[0]:.6f},{top_left[1]:.6f}"
        f"|{bottom_right[0]:.6f},{bottom_right[1]:.6f}"
    )
    api_url = "https://restapi.amap.com/v3/place/polygon"

    page = 1
    while True:
        if file_exists(polygon, keyword, page):
            print(f"文件已存在,跳过:网格: {polygon},页数: {page}")
            page += 1
            continue

        # Let requests build the query string so the keyword and the "|"
        # separator are escaped correctly.
        params = {
            "polygon": polygon_str,
            "keywords": keyword,
            "key": key,
            "offset": page_size,
            "page": page,
        }
        try:
            response = requests.get(api_url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # The message promises to skip this key, so report failure and
            # let the caller move on to the next one instead of silently
            # treating the grid as done.
            print(f"请求异常,跳过此Key:{str(e)}")
            return False

        infocode = data.get("infocode")
        pois = data.get("pois")
        if infocode == "10000" and pois:
            try:
                save_poi_data(polygon, keyword, page, data)
            except OSError as e:
                # A disk problem is not a key problem: stop this grid
                # without asking the caller to retry with other keys.
                print(f"保存失败,跳过此网格:{e}")
                return True
            print(f"下载成功!网格: {polygon_str},页数: {page}")
            page += 1
            if len(pois) < page_size:
                break
        elif infocode in ("10001", "10003", "10004"):
            print(f"Key 出现问题,infocode: {infocode},切换到下一个Key进行重试...")
            return False
        else:
            print(f"请求失败或无数据,infocode: {infocode},信息: {data.get('info')}")
            break
    return True
def generate_grids(polygon_coords, grid_size=None):
    """Split the bounding box of ``polygon_coords`` into rectangular cells.

    Cells are generated column by column (longitude outer loop, latitude
    inner loop), starting at the minimum longitude/latitude. Cells on the
    far edges are clipped to the bounding box.

    Args:
        polygon_coords: Sequence of ``[lng, lat]`` pairs; only their
            bounding box is used.
        grid_size: Edge length of one cell. Defaults to the module-level
            ``GRID_SIZE`` when omitted.

    Returns:
        A list of cells, each a list of four ``[lng, lat]`` vertices in the
        order top-left, top-right, bottom-right, bottom-left. Empty when
        ``polygon_coords`` is empty or its bounding box has no area.

    Raises:
        ValueError: If the effective grid size is not positive.
    """
    if not polygon_coords:
        return []
    if grid_size is None:
        grid_size = GRID_SIZE
    if grid_size <= 0:
        raise ValueError("grid_size must be positive")

    lngs = [coord[0] for coord in polygon_coords]
    lats = [coord[1] for coord in polygon_coords]
    min_lng, max_lng = min(lngs), max(lngs)
    min_lat, max_lat = min(lats), max(lats)

    lng_steps = math.ceil((max_lng - min_lng) / grid_size)
    lat_steps = math.ceil((max_lat - min_lat) / grid_size)

    grids = []
    for i in range(lng_steps):
        # Longitude bounds are constant for the whole column.
        grid_min_lng = min_lng + i * grid_size
        grid_max_lng = min(grid_min_lng + grid_size, max_lng)
        for j in range(lat_steps):
            grid_min_lat = min_lat + j * grid_size
            grid_max_lat = min(grid_min_lat + grid_size, max_lat)
            grids.append(
                [
                    [grid_min_lng, grid_max_lat],
                    [grid_max_lng, grid_max_lat],
                    [grid_max_lng, grid_min_lat],
                    [grid_min_lng, grid_min_lat],
                ]
            )
    return grids
def download_poi_for_grid(polygon, key, keyword):
    """Fetch the POI data of a single grid cell and report the outcome.

    Args:
        polygon: Grid cell vertices, passed through to ``fetch_poi_data``.
        key: API key to use.
        keyword: Search keyword.

    Returns:
        True if ``fetch_poi_data`` returned a truthy value, False otherwise.
    """
    return True if fetch_poi_data(polygon, key, keyword) else False
def _download_grid_with_keys(grid):
    """Try each key in ``KEYS`` in order for one grid; True once one succeeds."""
    return any(download_poi_for_grid(grid, key, KEYWORD) for key in KEYS)


def main():
    """Download POI data for every grid cell covering ``polygon_coords``.

    One task per grid cell is handed to a thread pool of ``NUM_THREADS``
    workers. Within a task the keys in ``KEYS`` are tried in order until
    one succeeds; a grid for which every key fails is reported and skipped.
    """
    grids = generate_grids(polygon_coords)

    # Create the output directory once, before any worker starts, so the
    # workers do not race each other to create it.
    os.makedirs(SAVE_DIR, exist_ok=True)

    with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        # Submitting a future and immediately waiting on .result() runs the
        # pool one task at a time; map() keeps all workers busy and still
        # yields results in grid order.
        # NOTE(review): concurrent requests share the same key; lower
        # NUM_THREADS if the API starts reporting rate limiting.
        outcomes = executor.map(_download_grid_with_keys, grids)
        for grid, succeeded in zip(grids, outcomes):
            if not succeeded:
                print(f"所有Key均不可用,跳过此网格: {grid}")
# Start the download only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()