Skip to main content
Full Python implementation for integrating with Emerge. Copy these utilities into your project or use them as a reference.

Installation

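pip install requests
For async support:
pip install httpx
For the FastAPI integration example (`request.session` requires Starlette's SessionMiddleware, which depends on itsdangerous):
pip install fastapi itsdangerous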
import hashlib
import hmac
import secrets
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import quote, urlencode

import requests


@dataclass
class EmergeConfig:
    """Credentials and settings for the Emerge link/consent client.

    ``signing_secret`` and ``api_token`` are excluded from ``repr()`` so a
    config object can appear in logs or tracebacks without leaking
    credentials. They still take part in equality and are set normally.

    Attributes:
        client_id: Your Emerge client identifier (sent as ``client_id``).
        signing_secret: HMAC-SHA256 key used to sign consent links.
        api_token: Bearer token sent to the Emerge HTTP APIs.
        redirect_uri: URL Emerge redirects the user to after consent.
    """

    client_id: str
    signing_secret: str = field(repr=False)
    api_token: str = field(repr=False)
    redirect_uri: str


@dataclass
class LinkResult:
    """Outcome of building a consent link.

    Attributes:
        url: The fully signed consent URL to send the user to.
        state: The random anti-forgery token embedded in ``url``; keep it so
            the callback can be checked against it.
    """

    url: str
    state: str


@dataclass
class CallbackParams:
    """Query parameters appended to the redirect URI by Emerge.

    Attributes:
        status: Flow outcome: 'success', 'reauthorized' or 'failure'.
        state: The anti-forgery token that was issued with the link.
        uid: User identifier echoed back by Emerge.
        error_code: Error code reported by Emerge, or None when absent.
    """

    status: str
    state: str
    uid: str
    error_code: Optional[str] = None


class EmergeLinkClient:
    """Create signed consent links and query consent/export status.

    Issued ``state`` tokens are kept in an in-memory dict on this instance.
    They do not survive a restart and are not shared between processes, so
    use a shared store if you run several workers.
    """

    # Lifetime of an issued ``state`` token, in seconds.
    STATE_TTL_SECONDS = 3600
    # Host serving both the consent flow and the status endpoints.
    BASE_URL = 'https://link.emergedata.ai'

    def __init__(self, config: EmergeConfig, request_timeout: float = 30.0):
        """Create a client.

        Args:
            config: Client credentials and redirect URI.
            request_timeout: Seconds ``requests`` waits for the Emerge API
                before raising ``requests.Timeout``. Without a timeout, a
                stalled connection would block the caller indefinitely.
        """
        self.config = config
        self.request_timeout = request_timeout
        # state token -> {'user_id': ..., 'created_at': epoch seconds}
        self._state_store: dict[str, dict] = {}

    def _prune_expired_states(self) -> None:
        """Remove states older than STATE_TTL_SECONDS from the store."""
        cutoff = time.time() - self.STATE_TTL_SECONDS
        expired = [
            token
            for token, entry in self._state_store.items()
            if entry['created_at'] < cutoff
        ]
        for token in expired:
            del self._state_store[token]

    def create_link_url(self, user_id: str) -> LinkResult:
        """Create a signed link URL for the consent flow.

        Args:
            user_id: Identifier sent to Emerge as ``uid``.

        Returns:
            LinkResult holding the signed URL and the generated ``state``
            token. The token is also remembered for ``verify_state``.
        """
        # Flows the user abandons never reach verify_state. Dropping stale
        # entries here keeps the store from growing without bound.
        self._prune_expired_states()

        # ISO 8601 UTC timestamp with a 'Z' suffix
        timestamp = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
        state = secrets.token_hex(16)

        self._state_store[state] = {
            'user_id': user_id,
            'created_at': time.time(),
        }

        params = {
            'client_id': self.config.client_id,
            'redirect_uri': self.config.redirect_uri,
            'state': state,
            'timestamp': timestamp,
            'uid': user_id,
        }

        # The signature covers the key-sorted params joined as k=v with '&',
        # using raw values (NOT URL-encoded).
        signature_base = '&'.join(f'{k}={v}' for k, v in sorted(params.items()))
        params['signature'] = hmac.new(
            self.config.signing_secret.encode(),
            signature_base.encode(),
            hashlib.sha256,
        ).hexdigest()

        return LinkResult(
            url=f'{self.BASE_URL}/link/start?{urlencode(params)}',
            state=state,
        )

    def verify_state(self, state: str) -> tuple[bool, Optional[str]]:
        """Consume a callback ``state`` token.

        Tokens are single-use: the token is removed from the store whether
        or not it is still valid.

        Returns:
            ``(True, user_id)`` if the token was issued by this instance and
            is at most STATE_TTL_SECONDS old, otherwise ``(False, None)``.
        """
        stored = self._state_store.pop(state, None)
        if not stored:
            return False, None

        if time.time() - stored['created_at'] > self.STATE_TTL_SECONDS:
            return False, None

        return True, stored['user_id']

    def parse_callback(self, query: dict) -> CallbackParams:
        """Build CallbackParams from the callback query dict.

        Missing ``status``/``state``/``uid`` become ``''``; a missing
        ``error_code`` becomes ``None``.
        """
        return CallbackParams(
            status=query.get('status', ''),
            state=query.get('state', ''),
            uid=query.get('uid', ''),
            error_code=query.get('error_code'),
        )

    def _get_json(self, path: str) -> dict:
        """GET ``BASE_URL + path`` with the bearer token and return the JSON body.

        Raises:
            requests.HTTPError: on a non-2xx response.
            requests.Timeout: if the API does not answer within request_timeout.
        """
        response = requests.get(
            f'{self.BASE_URL}{path}',
            headers={'Authorization': f'Bearer {self.config.api_token}'},
            timeout=self.request_timeout,
        )
        response.raise_for_status()
        return response.json()

    def get_consent_status(self, uid: str) -> dict:
        """Get consent status for a user.

        ``uid`` is percent-encoded as a single path segment, so characters
        such as ``/`` or ``?`` cannot change the requested endpoint.

        Returns:
            dict with 'consents' array containing provider, scopes,
            valid_until, status, issued_at for each consent
        """
        return self._get_json(f'/consent/status/{quote(uid, safe="")}')

    def get_export_status(self, uid: str) -> dict:
        """Get export status for a user.

        ``uid`` is percent-encoded as a single path segment.

        Returns:
            dict with uid and sources[] where each source contains:
            provider, data_ready, data_landed_at, export_status, export_completed_at
        """
        return self._get_json(f'/export/status/{quote(uid, safe="")}')

Query API Client

from dataclasses import dataclass
from typing import Optional, Callable, Any
import requests


@dataclass
class QueryParams:
    """Filters and paging controls for one Query API request.

    Only ``uid`` is required. Every other field is left out of the request
    when it is unset (falsy).
    """

    uid: str
    # Ingestion-time window filters (passed through as strings)
    ingested_begin: Optional[str] = None
    ingested_end: Optional[str] = None
    # Opaque pagination cursor returned by a previous page
    cursor: Optional[str] = None
    # Maximum number of records requested per page
    limit: Optional[int] = None


@dataclass
class QueryResult:
    """One page of records returned by the Query API.

    Attributes:
        data: The records in this page.
        count: Record count reported by the API (falls back to len(data)).
        has_more: Whether another page is available.
        next_cursor: Cursor to request the next page, if any.
        applied_ingested_end: Ingestion upper bound the server applied,
            suitable as the starting point of a later delta sync.
    """

    data: list[dict]
    count: int
    has_more: bool
    next_cursor: Optional[str]
    applied_ingested_end: Optional[str]


class EmergeQueryClient:
    """Synchronous client for the Emerge Query API (``/v1/sync/*`` endpoints)."""

    def __init__(self, api_token: str, request_timeout: float = 30.0):
        """Create a client.

        Args:
            api_token: Bearer token for the Query API.
            request_timeout: Seconds ``requests`` waits before raising
                ``requests.Timeout``. Without a timeout, a stalled
                connection would block forever.
        """
        self.api_token = api_token
        self.base_url = 'https://query.emergedata.ai/v1'
        self.request_timeout = request_timeout

    def _request(self, endpoint: str, params: QueryParams) -> QueryResult:
        """GET one page from ``endpoint`` and wrap it in a QueryResult.

        Raises:
            requests.HTTPError: on a non-2xx response.
            requests.Timeout: if no answer arrives within request_timeout.
        """
        url_params = {'uid': params.uid}

        # Optional filters are sent only when set (falsy values are skipped).
        if params.ingested_begin:
            url_params['ingested_begin'] = params.ingested_begin
        if params.ingested_end:
            url_params['ingested_end'] = params.ingested_end
        if params.cursor:
            url_params['cursor'] = params.cursor
        if params.limit:
            url_params['limit'] = str(params.limit)

        response = requests.get(
            f'{self.base_url}{endpoint}',
            params=url_params,
            headers={'Authorization': f'Bearer {self.api_token}'},
            timeout=self.request_timeout,
        )
        response.raise_for_status()

        result = response.json()
        data = result['data']
        return QueryResult(
            data=data,
            count=result.get('count', len(data)),
            has_more=result.get('has_more', False),
            next_cursor=result.get('next_cursor'),
            applied_ingested_end=result.get('applied_ingested_end'),
        )

    def get_search(self, params: QueryParams) -> QueryResult:
        """Get search history (sync)."""
        return self._request('/sync/get_search', params)

    def get_browsing(self, params: QueryParams) -> QueryResult:
        """Get browsing history (sync)."""
        return self._request('/sync/get_browsing', params)

    def get_youtube(self, params: QueryParams) -> QueryResult:
        """Get YouTube history (sync)."""
        return self._request('/sync/get_youtube', params)

    def get_ads(self, params: QueryParams) -> QueryResult:
        """Get ad interactions (sync)."""
        return self._request('/sync/get_ads', params)

    def get_receipts(self, params: QueryParams) -> QueryResult:
        """Get receipts (sync)."""
        return self._request('/sync/get_receipts', params)

    def fetch_all(
        self,
        fetcher: Callable[[QueryParams], QueryResult],
        uid: str,
        ingested_begin: Optional[str] = None,
        ingested_end: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> tuple[list[dict], Optional[str]]:
        """Fetch all records with automatic pagination.

        Args:
            fetcher: One of the ``get_*`` methods (or a compatible callable).
            uid: User to query.
            ingested_begin: Optional lower ingestion bound.
            ingested_end: Optional upper ingestion bound.
            limit: Optional page size forwarded to every request.

        Returns:
            Tuple of (all_data, applied_ingested_end). ``applied_ingested_end``
            is taken from the last page fetched.

        Raises:
            RuntimeError: if the API reports ``has_more`` without a new
                ``next_cursor``. Continuing would refetch the same page
                forever.
        """
        all_data: list[dict] = []
        cursor: Optional[str] = None
        applied_end: Optional[str] = None

        while True:
            result = fetcher(QueryParams(
                uid=uid,
                ingested_begin=ingested_begin,
                ingested_end=ingested_end,
                cursor=cursor,
                limit=limit,
            ))
            all_data.extend(result.data)
            applied_end = result.applied_ingested_end

            if not result.has_more:
                break
            if not result.next_cursor or result.next_cursor == cursor:
                raise RuntimeError(
                    'Query API reported has_more=True without a new next_cursor; '
                    'aborting pagination to avoid an infinite loop'
                )
            cursor = result.next_cursor

        return all_data, applied_end

Async Query Client

import httpx
from dataclasses import dataclass
from typing import Optional, Callable, Awaitable


class AsyncEmergeQueryClient:
    """Asynchronous (httpx) client for the Emerge Query API.

    Each request opens and closes its own ``httpx.AsyncClient``, so
    connections are not reused between calls.
    """

    def __init__(self, api_token: str):
        self.api_token = api_token
        self.base_url = 'https://query.emergedata.ai/v1'

    async def _request(self, endpoint: str, params: QueryParams) -> QueryResult:
        """GET one page from ``endpoint`` and wrap it in a QueryResult.

        Raises:
            httpx.HTTPStatusError: on a non-2xx response.
        """
        url_params = {'uid': params.uid}

        # Optional filters are sent only when set (falsy values are skipped).
        if params.ingested_begin:
            url_params['ingested_begin'] = params.ingested_begin
        if params.ingested_end:
            url_params['ingested_end'] = params.ingested_end
        if params.cursor:
            url_params['cursor'] = params.cursor
        if params.limit:
            url_params['limit'] = str(params.limit)

        async with httpx.AsyncClient() as client:
            response = await client.get(
                f'{self.base_url}{endpoint}',
                params=url_params,
                headers={'Authorization': f'Bearer {self.api_token}'},
            )
            response.raise_for_status()
            result = response.json()

        data = result['data']
        return QueryResult(
            data=data,
            count=result.get('count', len(data)),
            has_more=result.get('has_more', False),
            next_cursor=result.get('next_cursor'),
            applied_ingested_end=result.get('applied_ingested_end'),
        )

    async def get_search(self, params: QueryParams) -> QueryResult:
        """Get search history."""
        return await self._request('/sync/get_search', params)

    async def get_browsing(self, params: QueryParams) -> QueryResult:
        """Get browsing history."""
        return await self._request('/sync/get_browsing', params)

    async def get_youtube(self, params: QueryParams) -> QueryResult:
        """Get YouTube history."""
        return await self._request('/sync/get_youtube', params)

    async def get_ads(self, params: QueryParams) -> QueryResult:
        """Get ad interactions."""
        return await self._request('/sync/get_ads', params)

    async def get_receipts(self, params: QueryParams) -> QueryResult:
        """Get receipts."""
        return await self._request('/sync/get_receipts', params)

    async def fetch_all(
        self,
        fetcher: Callable[[QueryParams], Awaitable[QueryResult]],
        uid: str,
        ingested_begin: Optional[str] = None,
        ingested_end: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> tuple[list[dict], Optional[str]]:
        """Fetch all records with automatic pagination.

        Mirrors ``EmergeQueryClient.fetch_all``. ``ingested_end`` and
        ``limit`` are optional and forwarded to every page request.

        Returns:
            Tuple of (all_data, applied_ingested_end), where the latter is
            taken from the last page fetched.

        Raises:
            RuntimeError: if the API reports ``has_more`` without a new
                ``next_cursor``. Continuing would refetch the same page
                forever.
        """
        all_data: list[dict] = []
        cursor: Optional[str] = None
        applied_end: Optional[str] = None

        while True:
            result = await fetcher(QueryParams(
                uid=uid,
                ingested_begin=ingested_begin,
                ingested_end=ingested_end,
                cursor=cursor,
                limit=limit,
            ))
            all_data.extend(result.data)
            applied_end = result.applied_ingested_end

            if not result.has_more:
                break
            if not result.next_cursor or result.next_cursor == cursor:
                raise RuntimeError(
                    'Query API reported has_more=True without a new next_cursor; '
                    'aborting pagination to avoid an infinite loop'
                )
            cursor = result.next_cursor

        return all_data, applied_end

Webhook Handler

import hmac
import hashlib
import json
from dataclasses import dataclass
from typing import Literal


# Every event name the webhook endpoint may receive from Emerge.
WebhookEvent = Literal[
    # consent lifecycle
    'consent.given',
    'consent.revoked',
    'consent.expiring',
    'consent.reauthorized',
    # data export lifecycle
    'data.ready',
    'data.failed',
]


@dataclass
class WebhookPayload:
    """A verified webhook delivery from Emerge.

    Attributes:
        event: Which event occurred (see WebhookEvent).
        timestamp: Event timestamp, as sent by Emerge.
        uid: User the event concerns.
        client_id: Emerge client the event was sent for.
        sources: Per-source details; empty when the payload has none.
    """

    event: WebhookEvent
    timestamp: str
    uid: str
    client_id: str
    sources: list[dict]


class EmergeWebhookHandler:
    """Verify and parse signed webhook deliveries from Emerge.

    NOTE(review): the payload carries a ``timestamp`` but this handler does
    not reject old deliveries, so replay protection (if required) must be
    added by the caller.
    """

    # Top-level keys that must be present in every webhook body.
    _REQUIRED_FIELDS = ('event', 'timestamp', 'uid', 'client_id')

    def __init__(self, webhook_secret: str):
        self.secret = webhook_secret

    def verify_signature(self, payload: bytes, signature: str) -> bool:
        """Return True if *signature* is the hex HMAC-SHA256 of *payload*.

        The comparison is constant-time and done on bytes.
        ``hmac.compare_digest`` raises TypeError for non-ASCII ``str``
        input, which an attacker-controlled header could otherwise trigger.
        """
        if not signature:
            return False

        expected = hmac.new(
            self.secret.encode(),
            payload,
            hashlib.sha256,
        ).hexdigest()

        return hmac.compare_digest(signature.encode(), expected.encode())

    def parse_webhook(self, raw_body: bytes, signature: str) -> WebhookPayload:
        """Verify the signature of *raw_body*, then parse it.

        Raises:
            ValueError: if the signature is invalid, the body is not a JSON
                object, or a required field is missing. (Malformed JSON
                raises json.JSONDecodeError, itself a ValueError subclass.)
        """
        if not self.verify_signature(raw_body, signature):
            raise ValueError('Invalid webhook signature')

        data = json.loads(raw_body)
        if not isinstance(data, dict):
            raise ValueError('Webhook payload must be a JSON object')

        missing = [key for key in self._REQUIRED_FIELDS if key not in data]
        if missing:
            raise ValueError(
                f'Webhook payload missing required field(s): {", ".join(missing)}'
            )

        return WebhookPayload(
            event=data['event'],
            timestamp=data['timestamp'],
            uid=data['uid'],
            client_id=data['client_id'],
            # Treat an explicit null the same as an absent key.
            sources=data.get('sources') or [],
        )

Usage Example

import os

# Initialize clients
config = EmergeConfig(
    client_id=os.environ['EMERGE_CLIENT_ID'],
    signing_secret=os.environ['EMERGE_SIGNING_SECRET'],
    api_token=os.environ['EMERGE_API_TOKEN'],
    redirect_uri='https://yourapp.com/emerge/callback'
)

link_client = EmergeLinkClient(config)
query_client = EmergeQueryClient(config.api_token)

# The same identifier is used as the link `uid` and for every query below.
user_uid = 'psub_d4e5f6789012345678901234abcdef01'

# Create consent link. Keep result.state (and this link_client instance,
# which remembers it) so the callback can be verified.
result = link_client.create_link_url(user_id=user_uid)
print(f'Consent URL: {result.url}')

# After consent, query data. Results are paginated, so `count` covers only
# this first page; use fetch_all (below) to get every record.
search_result = query_client.get_search(QueryParams(uid=user_uid))
print(f'Search queries (first page): {search_result.count}')

# Fetch all with pagination
all_browsing, applied_end = query_client.fetch_all(
    query_client.get_browsing,
    uid=user_uid
)
print(f'Total browsing entries: {len(all_browsing)}')

# Delta sync: persist applied_end and pass it as `ingested_begin` on the
# next fetch_all call. NOTE: confirm with the Emerge docs whether the bound
# is inclusive, to avoid duplicate records.
next_sync_start = applied_end

FastAPI Integration

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import RedirectResponse
import os

# NOTE: the routes below use request.session, which only works when
# Starlette's SessionMiddleware is installed, e.g.:
#   from starlette.middleware.sessions import SessionMiddleware
#   app.add_middleware(SessionMiddleware, secret_key=os.environ['SESSION_SECRET'])
app = FastAPI()

# Shared clients, built once at import time from environment variables
# (a missing variable raises KeyError on startup).
_env = os.environ
config = EmergeConfig(
    client_id=_env['EMERGE_CLIENT_ID'],
    signing_secret=_env['EMERGE_SIGNING_SECRET'],
    api_token=_env['EMERGE_API_TOKEN'],
    redirect_uri='https://yourapp.com/emerge/callback',
)
link_client = EmergeLinkClient(config)
webhook_handler = EmergeWebhookHandler(_env['EMERGE_WEBHOOK_SECRET'])


@app.get('/connect')
async def connect(request: Request):
    """Start the Emerge consent flow for the logged-in user.

    Raises:
        HTTPException(401): if the session has no ``user_id``.
    """
    user_id = request.session.get('user_id')
    if not user_id:
        # Otherwise a link would be signed and sent with uid "None".
        raise HTTPException(status_code=401, detail='Not authenticated')

    result = link_client.create_link_url(user_id=user_id)

    # Remember which state this browser session was issued, so the callback
    # can confirm it is completing the flow this session started.
    request.session['emerge_state'] = result.state

    return RedirectResponse(result.url)


@app.get('/emerge/callback')
async def callback(request: Request):
    """Handle the redirect back from Emerge after the consent flow.

    Redirects to the dashboard on success/reauthorization and to the error
    page otherwise.

    Raises:
        HTTPException(400): if ``state`` does not match the one issued to
            this session, or is unknown/expired on the server.
    """
    params = link_client.parse_callback(dict(request.query_params))

    # Bind the callback to the session that started the flow. A state that
    # the server merely knows about (e.g. one issued to another browser)
    # must not be accepted here. pop() makes the session state single-use.
    expected_state = request.session.pop('emerge_state', None)
    if not expected_state or not hmac.compare_digest(
        expected_state.encode(), params.state.encode()
    ):
        raise HTTPException(status_code=400, detail='Invalid state')

    # Also consume the server-side record (enforces the expiry window).
    valid, user_id = link_client.verify_state(params.state)
    if not valid:
        raise HTTPException(status_code=400, detail='Invalid state')

    if params.status in ('success', 'reauthorized'):
        return RedirectResponse('/dashboard?connected=true')

    # error_code comes straight from the query string. Encode it so it
    # cannot inject extra parameters into our own redirect.
    error_query = urlencode({'code': params.error_code or 'unknown'})
    return RedirectResponse(f'/error?{error_query}')


@app.post('/webhooks/emerge')
async def webhook(request: Request):
    """Receive, authenticate and dispatch an Emerge webhook delivery.

    Raises:
        HTTPException(401): if the delivery fails signature verification or
            cannot be parsed.
    """
    # The signature is computed over the exact raw bytes, so read the body
    # before any JSON decoding. The header name is the one used by this
    # example; confirm it against the Emerge webhook docs.
    body = await request.body()
    signature = request.headers.get('x-signature', '')

    try:
        payload = webhook_handler.parse_webhook(body, signature)
    except ValueError:
        raise HTTPException(status_code=401, detail='Invalid signature') from None

    event = payload.event
    uid = payload.uid

    if event == 'consent.given':
        print(f"Consent granted for user {uid}")
    elif event == 'consent.reauthorized':
        print(f"Consent reauthorized for user {uid}")
    elif event == 'consent.revoked':
        print(f"Consent revoked for user {uid}")
    elif event == 'consent.expiring':
        print(f"Consent expiring for user {uid}")
    elif event == 'data.ready':
        print(f"Data ready for user {uid}: {payload.sources}")
    elif event == 'data.failed':
        print(f"Data export failed for user {uid}: {payload.sources}")
    else:
        # Acknowledge unknown events so the sender does not keep retrying,
        # but make them visible.
        print(f"Unhandled Emerge webhook event {event!r} for user {uid}")

    return {'status': 'ok'}