How to Set Up Asyncio for Bulk Geocoding
To set up asyncio for bulk geocoding, initialize an asyncio.Semaphore to enforce strict concurrency limits, route all HTTP traffic through a single aiohttp.ClientSession for connection pooling, and process your address batch using asyncio.gather() with built-in retry and fallback logic. This pattern transforms sequential, latency-bound API calls into a high-throughput pipeline that respects provider quotas, handles transient network failures, and automatically routes to secondary geocoding endpoints when primary services degrade.
Core Architecture Components
Bulk geocoding is fundamentally I/O-bound. Synchronous requests loops waste CPU cycles waiting for DNS resolution, TLS handshakes, and remote server processing. Asyncio solves this by yielding control back to the event loop during network waits, allowing dozens of concurrent requests to share a single thread. A production-ready implementation requires four tightly coupled components:
- Concurrency Limiter:
asyncio.Semaphore(n)prevents overwhelming your API provider or exhausting local file descriptors. It acts as a gatekeeper, queuing excess tasks until active slots free up. - Session Reuse: A single
aiohttp.ClientSessionreuses underlying TCP connections and manages keep-alive headers. Creating a new session per request adds 50–150ms of handshake overhead per address. - Backpressure & Rate Alignment: Fixed-delay or token-bucket logic ensures your concurrency ceiling matches provider quotas (e.g., 50 req/sec). Without this, you trigger
429 Too Many Requestsresponses that stall the entire batch. - Structured Fallback Routing: When a primary API returns
5xxerrors, timeouts, or empty geometries, the pipeline silently routes to a secondary provider without dropping the batch or halting execution.
For a deeper dive into request structuring and provider-specific parameter mapping, see Building Async Geocoding Requests in Python. The implementation below provides a complete, copy-paste-ready foundation that integrates all four components.
Production-Ready Implementation
The following class handles session lifecycle management, concurrency throttling, exponential backoff retries, and automatic fallback routing. It returns a list of GeocodeResult objects, preserving input order and capturing provider metadata for auditing.
import asyncio
import aiohttp
import logging
from typing import List, Dict, Optional
from dataclasses import dataclass
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
@dataclass
class GeocodeResult:
address: str
lat: Optional[float] = None
lon: Optional[float] = None
provider: Optional[str] = None
success: bool = False
class AsyncGeocoder:
def __init__(self, max_concurrency: int = 15, timeout: float = 10.0, max_retries: int = 3):
self.semaphore = asyncio.Semaphore(max_concurrency)
self.timeout = aiohttp.ClientTimeout(total=timeout)
self.max_retries = max_retries
self.session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
self.session = aiohttp.ClientSession(timeout=self.timeout)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def _geocode_single(self, address: str) -> GeocodeResult:
# Primary provider
primary_url = "https://api.primary-geocoder.com/v1/geocode"
primary_params = {"q": address, "format": "json"}
# Secondary provider
fallback_url = "https://api.secondary-geocoder.com/lookup"
fallback_params = {"address": address}
for attempt in range(self.max_retries):
try:
async with self.semaphore:
# Try primary
async with self.session.get(primary_url, params=primary_params) as resp:
if resp.status == 200:
data = await resp.json()
if data.get("results"):
return GeocodeResult(
address=address,
lat=data["results"][0]["lat"],
lon=data["results"][0]["lon"],
provider="primary",
success=True
)
elif resp.status == 429:
wait = min(2 ** attempt + 1, 10)
logging.warning(f"Rate limited. Retrying in {wait}s...")
await asyncio.sleep(wait)
continue
# Fallback to secondary if primary fails/empty
async with self.session.get(fallback_url, params=fallback_params) as resp:
if resp.status == 200:
data = await resp.json()
if data.get("location"):
return GeocodeResult(
address=address,
lat=data["location"]["lat"],
lon=data["location"]["lon"],
provider="fallback",
success=True
)
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
logging.debug(f"Network error for '{address}': {e}")
await asyncio.sleep(2 ** attempt)
continue
return GeocodeResult(address=address, success=False)
async def geocode_batch(self, addresses: List[str]) -> List[GeocodeResult]:
tasks = [self._geocode_single(addr) for addr in addresses]
return await asyncio.gather(*tasks)
Usage:
async def main():
addresses = ["1600 Amphitheatre Pkwy, Mountain View, CA", "350 5th Ave, New York, NY"]
async with AsyncGeocoder(max_concurrency=10) as geocoder:
results = await geocoder.geocode_batch(addresses)
for r in results:
print(f"{r.address} -> {r.lat}, {r.lon} [{r.provider}]")
asyncio.run(main())
Production Scaling & Error Handling
While the core pattern handles concurrency and retries, scaling to tens of thousands of addresses requires additional safeguards:
Memory-Safe Chunking
Loading 100,000 addresses into memory and spawning 100,000 coroutines simultaneously will exhaust system resources. Process batches in chunks of 1,000–5,000, writing results to disk or a database after each chunk completes. This keeps memory footprint predictable and allows graceful recovery if the process crashes mid-run.
Rate Limiting vs. Concurrency
A semaphore limits in-flight requests, but it does not enforce requests-per-second quotas. If your provider caps throughput at 50 req/sec, pair asyncio.Semaphore(50) with a fixed await asyncio.sleep(0.02) or implement a token bucket. The official asyncio documentation recommends combining semaphores with explicit delays when provider SLAs are strict.
Connection Pool Tuning
By default, aiohttp limits the connection pool to 100 open sockets. For high-throughput geocoding, increase limit and limit_per_host in aiohttp.TCPConnector to match your concurrency ceiling. Refer to aiohttp connection pooling docs for socket lifecycle tuning.
Fallback Chain Design
Hardcoding two providers works for small scripts, but enterprise pipelines benefit from configurable chains. Implement Multi-API Routing & Fallback Chains to dynamically swap providers based on real-time latency, cost, or geographic coverage. Store provider configs in YAML or environment variables so routing logic remains decoupled from execution code.
Observability
Log provider response codes, retry counts, and fallback triggers. Track success=False results separately for manual review or secondary batch processing. Geocoding APIs frequently return 200 OK with empty result sets for ambiguous addresses; treat these as soft failures rather than hard network errors.
Next Steps
Once your async pipeline is stable, focus on data validation (standardizing input addresses before submission), caching previously resolved coordinates to reduce API spend, and implementing webhook or queue-based processing for continuous geocoding workloads. The pattern outlined here scales cleanly from hundreds to millions of records when paired with chunked execution and proper rate alignment.