Installation
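pip install requests
pip install httpx
pip install fastapi itsdangerous  # only for the FastAPI integration example (itsdangerous backs Starlette's SessionMiddleware)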
Link API Client
import hashlib
import hmac
import secrets
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import quote, urlencode

import requests
@dataclass
class EmergeConfig:
    """Credentials and redirect settings for the Emerge Link and Query APIs.

    ``signing_secret`` and ``api_token`` are excluded from the generated
    ``repr`` so that printing or logging a config object does not leak them.
    They still participate in ``__eq__`` and are normal attributes.
    """

    # Client identifier sent as ``client_id`` in consent link URLs.
    client_id: str
    # HMAC-SHA256 key used to sign consent link URLs.
    signing_secret: str = field(repr=False)
    # Bearer token for the Link status endpoints and the Query API.
    api_token: str = field(repr=False)
    # URL the consent flow redirects back to.
    redirect_uri: str
@dataclass()
class LinkResult:
    """Outcome of building a consent link.

    Attributes are the full signed URL to send the user to, plus the random
    state token embedded in it (needed later to validate the callback).
    """

    url: str  # complete https URL including query string and signature
    state: str  # opaque token echoed back on the redirect callback
@dataclass()
class CallbackParams:
    """Query parameters delivered to the consent-flow redirect URI."""

    # One of 'success', 'reauthorized', 'failure'.
    status: str
    # State token originally issued with the consent link.
    state: str
    # User identifier associated with the consent flow.
    uid: str
    # Failure reason, when provided; None otherwise.
    error_code: Optional[str] = None
class EmergeLinkClient:
    """Client for Emerge Link: signed consent URLs and consent/export status.

    State tokens issued by :meth:`create_link_url` live in an in-process dict,
    so only the same process (and worker) that issued a token can verify it.
    Multi-worker deployments need a shared store instead.
    """

    # Base URL of the Link service.
    BASE_URL = 'https://link.emergedata.ai'

    def __init__(
        self,
        config: EmergeConfig,
        timeout: float = 30.0,
        state_ttl: float = 3600,
    ):
        """Create a Link client.

        Args:
            config: Client credentials and redirect URI.
            timeout: Seconds to wait for the status endpoints before failing
                (requests otherwise waits indefinitely).
            state_ttl: Seconds a state token stays valid after issuance.
        """
        self.config = config
        self.timeout = timeout
        self.state_ttl = state_ttl
        # state token -> {'user_id': str, 'created_at': epoch seconds}
        self._state_store: dict[str, dict] = {}

    def _prune_expired_states(self) -> None:
        """Drop stored states older than ``state_ttl`` (abandoned flows)."""
        cutoff = time.time() - self.state_ttl
        expired = [
            token for token, entry in self._state_store.items()
            if entry['created_at'] < cutoff
        ]
        for token in expired:
            del self._state_store[token]

    def create_link_url(self, user_id: str) -> LinkResult:
        """Create a signed link URL for the consent flow.

        Args:
            user_id: Identifier of the user giving consent; sent as ``uid``.

        Returns:
            LinkResult with the URL to redirect the user to and its state token.

        Raises:
            ValueError: If ``user_id`` is empty or None.
        """
        if not user_id:
            # Otherwise the link would be signed for the literal uid "None"/"".
            raise ValueError('user_id is required')

        # Opportunistic cleanup keeps the in-memory store bounded.
        self._prune_expired_states()

        # ISO 8601 UTC timestamp with a 'Z' suffix.
        timestamp = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
        state = secrets.token_hex(16)

        # Remember the state so verify_state() can validate the callback.
        self._state_store[state] = {
            'user_id': user_id,
            'created_at': time.time(),
        }

        params = {
            'client_id': self.config.client_id,
            'redirect_uri': self.config.redirect_uri,
            'state': state,
            'timestamp': timestamp,
            'uid': user_id,
        }

        # Signature base: keys sorted, raw values (NOT URL-encoded), joined by '&'.
        signature_base = '&'.join(f'{k}={v}' for k, v in sorted(params.items()))
        params['signature'] = hmac.new(
            self.config.signing_secret.encode(),
            signature_base.encode(),
            hashlib.sha256,
        ).hexdigest()

        return LinkResult(
            url=f'{self.BASE_URL}/link/start?{urlencode(params)}',
            state=state,
        )

    def verify_state(self, state: str) -> tuple[bool, Optional[str]]:
        """Verify (and consume) a callback state parameter.

        Each state is single-use: it is removed from the store whether or not
        it turns out to be expired.

        Returns:
            ``(True, user_id)`` for a known, unexpired state, else ``(False, None)``.
        """
        stored = self._state_store.pop(state, None)
        if stored is None:
            return False, None
        if time.time() - stored['created_at'] > self.state_ttl:
            return False, None
        return True, stored['user_id']

    def parse_callback(self, query: dict) -> CallbackParams:
        """Build CallbackParams from the redirect's query parameters.

        Missing ``status``/``state``/``uid`` become empty strings; a missing
        ``error_code`` becomes None.
        """
        return CallbackParams(
            status=query.get('status', ''),
            state=query.get('state', ''),
            uid=query.get('uid', ''),
            error_code=query.get('error_code'),
        )

    def _get_json(self, path: str, uid: str) -> dict:
        """GET ``{BASE_URL}/{path}/{uid}`` with bearer auth and return the JSON body.

        ``uid`` is percent-encoded (including '/') so it always occupies a
        single path segment.

        Raises:
            requests.HTTPError: On a non-2xx response.
        """
        response = requests.get(
            f'{self.BASE_URL}/{path}/{quote(uid, safe="")}',
            headers={'Authorization': f'Bearer {self.config.api_token}'},
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def get_consent_status(self, uid: str) -> dict:
        """Get consent status for a user.

        Returns:
            dict with 'consents' array containing provider, scopes,
            valid_until, status, issued_at for each consent
        """
        return self._get_json('consent/status', uid)

    def get_export_status(self, uid: str) -> dict:
        """Get export status for a user.

        Returns:
            dict with uid and sources[] where each source contains:
            provider, data_ready, data_landed_at, export_status, export_completed_at
        """
        return self._get_json('export/status', uid)
Query API Client
from dataclasses import dataclass
from typing import Optional, Callable, Any
import requests
@dataclass()
class QueryParams:
    """Parameters accepted by the Query API endpoints.

    Only ``uid`` is mandatory; every filter defaults to None.
    """

    uid: str  # user whose data is queried
    ingested_begin: Optional[str] = None  # lower bound on ingestion time
    ingested_end: Optional[str] = None  # upper bound on ingestion time
    cursor: Optional[str] = None  # pagination cursor from a previous page
    limit: Optional[int] = None  # requested page size
@dataclass()
class QueryResult:
    """One page of records returned by a Query API endpoint."""

    data: list[dict]  # records on this page
    count: int  # number of records reported for this page
    has_more: bool  # whether another page can be requested
    next_cursor: Optional[str]  # cursor for the next page, if any
    applied_ingested_end: Optional[str]  # ingestion upper bound reported by the server
class EmergeQueryClient:
    """Synchronous client for the Emerge Query API (``/v1/sync/*`` endpoints)."""

    def __init__(self, api_token: str, timeout: float = 30.0):
        """Create a Query client.

        Args:
            api_token: Bearer token sent in the Authorization header.
            timeout: Seconds to wait for a response before failing
                (requests otherwise waits indefinitely).
        """
        self.api_token = api_token
        self.base_url = 'https://query.emergedata.ai/v1'
        self.timeout = timeout

    @staticmethod
    def _to_result(result: dict) -> QueryResult:
        """Convert a decoded JSON page into a QueryResult (``data`` is required)."""
        return QueryResult(
            data=result['data'],
            count=result.get('count', len(result['data'])),
            has_more=result.get('has_more', False),
            next_cursor=result.get('next_cursor'),
            applied_ingested_end=result.get('applied_ingested_end'),
        )

    def _request(self, endpoint: str, params: QueryParams) -> QueryResult:
        """GET one page from ``endpoint``; falsy optional params are not sent.

        Raises:
            requests.HTTPError: On a non-2xx response.
        """
        optional = {
            'ingested_begin': params.ingested_begin,
            'ingested_end': params.ingested_end,
            'cursor': params.cursor,
            'limit': str(params.limit) if params.limit else None,
        }
        url_params = {'uid': params.uid}
        url_params.update({key: value for key, value in optional.items() if value})

        response = requests.get(
            f'{self.base_url}{endpoint}',
            params=url_params,
            headers={'Authorization': f'Bearer {self.api_token}'},
            timeout=self.timeout,
        )
        response.raise_for_status()
        return self._to_result(response.json())

    def get_search(self, params: QueryParams) -> QueryResult:
        """Get search history (sync)."""
        return self._request('/sync/get_search', params)

    def get_browsing(self, params: QueryParams) -> QueryResult:
        """Get browsing history (sync)."""
        return self._request('/sync/get_browsing', params)

    def get_youtube(self, params: QueryParams) -> QueryResult:
        """Get YouTube history (sync)."""
        return self._request('/sync/get_youtube', params)

    def get_ads(self, params: QueryParams) -> QueryResult:
        """Get ad interactions (sync)."""
        return self._request('/sync/get_ads', params)

    def get_receipts(self, params: QueryParams) -> QueryResult:
        """Get receipts (sync)."""
        return self._request('/sync/get_receipts', params)

    def fetch_all(
        self,
        fetcher: Callable[[QueryParams], QueryResult],
        uid: str,
        ingested_begin: Optional[str] = None,
        ingested_end: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> tuple[list[dict], Optional[str]]:
        """Fetch all records with automatic pagination.

        Args:
            fetcher: One of the ``get_*`` methods (or any compatible callable).
            uid: User whose data is fetched.
            ingested_begin: Optional lower bound on ingestion time.
            ingested_end: Optional upper bound on ingestion time.
            limit: Optional page size forwarded on every request.

        Returns:
            Tuple of (all_data, applied_ingested_end), where the latter is the
            value reported on the last page fetched.

        Raises:
            RuntimeError: If the server reports ``has_more`` but returns no new
                cursor; continuing would re-request the same page forever.
        """
        all_data: list[dict] = []
        cursor: Optional[str] = None
        applied_end: Optional[str] = None
        while True:
            result = fetcher(QueryParams(
                uid=uid,
                ingested_begin=ingested_begin,
                ingested_end=ingested_end,
                cursor=cursor,
                limit=limit,
            ))
            all_data.extend(result.data)
            applied_end = result.applied_ingested_end
            if not result.has_more:
                break
            if not result.next_cursor or result.next_cursor == cursor:
                raise RuntimeError(
                    'Pagination stalled: has_more is true but no new next_cursor was returned'
                )
            cursor = result.next_cursor
        return all_data, applied_end
Async Query Client
import httpx
from dataclasses import dataclass
from typing import Optional, Callable, Awaitable
class AsyncEmergeQueryClient:
    """Asynchronous (httpx) counterpart of the synchronous Query client.

    Each request opens and closes its own ``httpx.AsyncClient``, so
    connections are not reused between calls.
    """

    def __init__(self, api_token: str, timeout: float = 30.0):
        """Create an async Query client.

        Args:
            api_token: Bearer token sent in the Authorization header.
            timeout: Seconds passed to httpx as the request timeout.
        """
        self.api_token = api_token
        self.base_url = 'https://query.emergedata.ai/v1'
        self.timeout = timeout

    @staticmethod
    def _to_result(result: dict) -> QueryResult:
        """Convert a decoded JSON page into a QueryResult (``data`` is required)."""
        return QueryResult(
            data=result['data'],
            count=result.get('count', len(result['data'])),
            has_more=result.get('has_more', False),
            next_cursor=result.get('next_cursor'),
            applied_ingested_end=result.get('applied_ingested_end'),
        )

    async def _request(self, endpoint: str, params: QueryParams) -> QueryResult:
        """GET one page from ``endpoint``; falsy optional params are not sent.

        Raises:
            httpx.HTTPStatusError: On a non-2xx response.
        """
        optional = {
            'ingested_begin': params.ingested_begin,
            'ingested_end': params.ingested_end,
            'cursor': params.cursor,
            'limit': str(params.limit) if params.limit else None,
        }
        url_params = {'uid': params.uid}
        url_params.update({key: value for key, value in optional.items() if value})

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                f'{self.base_url}{endpoint}',
                params=url_params,
                headers={'Authorization': f'Bearer {self.api_token}'},
            )
            response.raise_for_status()
            return self._to_result(response.json())

    async def get_search(self, params: QueryParams) -> QueryResult:
        """Get search history."""
        return await self._request('/sync/get_search', params)

    async def get_browsing(self, params: QueryParams) -> QueryResult:
        """Get browsing history."""
        return await self._request('/sync/get_browsing', params)

    async def get_youtube(self, params: QueryParams) -> QueryResult:
        """Get YouTube history."""
        return await self._request('/sync/get_youtube', params)

    async def get_ads(self, params: QueryParams) -> QueryResult:
        """Get ad interactions."""
        return await self._request('/sync/get_ads', params)

    async def get_receipts(self, params: QueryParams) -> QueryResult:
        """Get receipts."""
        return await self._request('/sync/get_receipts', params)

    async def fetch_all(
        self,
        fetcher: Callable[[QueryParams], Awaitable[QueryResult]],
        uid: str,
        ingested_begin: Optional[str] = None,
        ingested_end: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> tuple[list[dict], Optional[str]]:
        """Fetch all records with automatic pagination.

        Args:
            fetcher: One of the async ``get_*`` methods (or a compatible coroutine function).
            uid: User whose data is fetched.
            ingested_begin: Optional lower bound on ingestion time.
            ingested_end: Optional upper bound on ingestion time.
            limit: Optional page size forwarded on every request.

        Returns:
            Tuple of (all_data, applied_ingested_end), where the latter is the
            value reported on the last page fetched.

        Raises:
            RuntimeError: If the server reports ``has_more`` but returns no new
                cursor; continuing would re-request the same page forever.
        """
        all_data: list[dict] = []
        cursor: Optional[str] = None
        applied_end: Optional[str] = None
        while True:
            result = await fetcher(QueryParams(
                uid=uid,
                ingested_begin=ingested_begin,
                ingested_end=ingested_end,
                cursor=cursor,
                limit=limit,
            ))
            all_data.extend(result.data)
            applied_end = result.applied_ingested_end
            if not result.has_more:
                break
            if not result.next_cursor or result.next_cursor == cursor:
                raise RuntimeError(
                    'Pagination stalled: has_more is true but no new next_cursor was returned'
                )
            cursor = result.next_cursor
        return all_data, applied_end
Webhook Handler
import hmac
import hashlib
import json
from dataclasses import dataclass
from typing import Literal
# The webhook event names recognized by this module.
WebhookEvent = Literal[
    # Consent lifecycle.
    'consent.given',
    'consent.revoked',
    'consent.expiring',
    'consent.reauthorized',
    # Data export lifecycle.
    'data.ready',
    'data.failed',
]
@dataclass()
class WebhookPayload:
    """Decoded body of a webhook delivery."""

    event: WebhookEvent  # which event occurred
    timestamp: str  # timestamp string as sent in the payload
    uid: str  # user the event concerns
    client_id: str  # client the event was delivered for
    sources: list[dict]  # per-source details; schema not defined here
class EmergeWebhookHandler:
def __init__(self, webhook_secret: str):
self.secret = webhook_secret
def verify_signature(self, payload: bytes, signature: str) -> bool:
"""Verify webhook signature."""
expected = hmac.new(
self.secret.encode(),
payload,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, expected)
def parse_webhook(self, raw_body: bytes, signature: str) -> WebhookPayload:
"""Parse and verify webhook."""
if not self.verify_signature(raw_body, signature):
raise ValueError('Invalid webhook signature')
data = json.loads(raw_body)
return WebhookPayload(
event=data['event'],
timestamp=data['timestamp'],
uid=data['uid'],
client_id=data['client_id'],
sources=data.get('sources', [])
)
Usage Example
import os
# Example user identifier reused by every call below.
demo_uid = 'psub_d4e5f6789012345678901234abcdef01'

# Build both clients from credentials supplied via environment variables.
config = EmergeConfig(
    client_id=os.environ['EMERGE_CLIENT_ID'],
    signing_secret=os.environ['EMERGE_SIGNING_SECRET'],
    api_token=os.environ['EMERGE_API_TOKEN'],
    redirect_uri='https://yourapp.com/emerge/callback',
)
link_client = EmergeLinkClient(config)
query_client = EmergeQueryClient(os.environ['EMERGE_API_TOKEN'])

# Step 1: produce the URL that starts the consent flow.
result = link_client.create_link_url(user_id=demo_uid)
print(f'Consent URL: {result.url}')

# Step 2 (once consent is granted): read a single page of search history.
search_result = query_client.get_search(QueryParams(uid=demo_uid))
print(f'Search queries: {search_result.count}')

# Step 3: collect every browsing record, following cursors until exhausted.
all_browsing, applied_end = query_client.fetch_all(query_client.get_browsing, uid=demo_uid)
print(f'Total browsing entries: {len(all_browsing)}')

# Delta sync: keep applied_end as the starting point for the next sync
# (presumably passed as ingested_begin — confirm against the API docs).
next_sync_start = applied_end
FastAPI Integration
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import RedirectResponse
import os
app = FastAPI()

# NOTE(review): request.session (read by the /connect handler) only works when
# Starlette's SessionMiddleware is installed on this app, e.g.:
#     from starlette.middleware.sessions import SessionMiddleware
#     app.add_middleware(SessionMiddleware, secret_key=os.environ['SESSION_SECRET'])
# Without it, accessing request.session fails at request time.

# Where Emerge sends users back after the consent flow.
EMERGE_REDIRECT_URI = 'https://yourapp.com/emerge/callback'

# Module-level clients shared by the route handlers below.
config = EmergeConfig(
    client_id=os.environ['EMERGE_CLIENT_ID'],
    signing_secret=os.environ['EMERGE_SIGNING_SECRET'],
    api_token=os.environ['EMERGE_API_TOKEN'],
    redirect_uri=EMERGE_REDIRECT_URI,
)
link_client = EmergeLinkClient(config)
webhook_handler = EmergeWebhookHandler(os.environ['EMERGE_WEBHOOK_SECRET'])
@app.get('/connect')
async def connect(request: Request):
    """Start the consent flow for the logged-in user.

    Builds a signed Link URL, stores its state token in the session so the
    callback can be tied to this browser, and redirects to the URL.

    Raises:
        HTTPException(401): If the session has no ``user_id``.
    """
    user_id = request.session.get('user_id')
    if not user_id:
        # Without this guard the consent link would be signed for uid=None.
        raise HTTPException(status_code=401, detail='Not authenticated')

    result = link_client.create_link_url(user_id=user_id)
    # Remember the state so /emerge/callback can match it to this session.
    request.session['emerge_state'] = result.state
    return RedirectResponse(result.url)
@app.get('/emerge/callback')
async def callback(request: Request):
    """Handle the redirect back from the consent flow.

    The returned ``state`` must equal the state that /connect stored in this
    browser's session (single use) AND be a live state known to the link
    client. Otherwise a state issued to a different browser could be replayed.

    Raises:
        HTTPException(400): If the state is missing, mismatched, unknown or expired.
    """
    params = link_client.parse_callback(dict(request.query_params))

    # Bind the callback to the browser that started the flow.
    expected_state = request.session.pop('emerge_state', None)
    if not expected_state or not secrets.compare_digest(
        str(expected_state).encode('utf-8'), params.state.encode('utf-8')
    ):
        raise HTTPException(status_code=400, detail='Invalid state')

    # Also consume the server-side record (rejects unknown or expired states).
    valid, _user_id = link_client.verify_state(params.state)
    if not valid:
        raise HTTPException(status_code=400, detail='Invalid state')

    if params.status in ('success', 'reauthorized'):
        return RedirectResponse('/dashboard?connected=true')

    # error_code comes from the query string: encode it rather than splice it raw.
    return RedirectResponse('/error?' + urlencode({'code': params.error_code or ''}))
@app.post('/webhooks/emerge')
async def webhook(request: Request):
    """Receive a webhook, verify its signature, and log the event.

    The signature is read from the ``x-signature`` header and verified against
    the raw request body before any JSON decoding.

    Raises:
        HTTPException(401): If verification or parsing fails.
    """
    body = await request.body()
    signature = request.headers.get('x-signature', '')
    try:
        payload = webhook_handler.parse_webhook(body, signature)
    except ValueError:
        raise HTTPException(status_code=401, detail='Invalid signature')

    uid = payload.uid
    if payload.event == 'consent.given':
        print(f"Consent granted for user {uid}")
    elif payload.event == 'consent.revoked':
        print(f"Consent revoked for user {uid}")
    elif payload.event == 'consent.expiring':
        print(f"Consent expiring for user {uid}")
    elif payload.event == 'consent.reauthorized':
        print(f"Consent reauthorized for user {uid}")
    elif payload.event == 'data.ready':
        print(f"Data ready for user {uid}: {payload.sources}")
    elif payload.event == 'data.failed':
        print(f"Data export failed for user {uid}: {payload.sources}")
    else:
        # Still acknowledge event types this example does not handle.
        print(f"Unhandled webhook event {payload.event!r} for user {uid}")
    return {'status': 'ok'}