Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.emergedata.ai/llms.txt

Use this file to discover all available pages before exploring further.

Full Python implementation for integrating with Emerge. Copy these utilities into your project, or use them as a reference.

Installation

pip install requests
For async support:
pip install httpx
import hashlib
import hmac
import secrets
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import quote, urlencode

import requests


@dataclass
class EmergeConfig:
    """Credentials and settings consumed by ``EmergeLinkClient``.

    The two secret fields are excluded from the generated ``repr`` so that
    logging or printing a config object does not expose them. They still
    take part in ``__init__`` and equality comparison as before.
    """
    # Sent as the ``client_id`` parameter of the signed link URL.
    client_id: str
    # HMAC-SHA256 key used to sign link parameters; hidden from repr().
    signing_secret: str = field(repr=False)
    # Bearer token sent in the Authorization header; hidden from repr().
    api_token: str = field(repr=False)
    # Sent as the ``redirect_uri`` parameter of the signed link URL.
    redirect_uri: str


@dataclass
class LinkResult:
    """Return value of ``EmergeLinkClient.create_link_url``."""
    # Full consent-flow URL, including the signed query string.
    url: str
    # Random hex token generated for this link; the same value is sent as the
    # ``state`` query parameter and recorded in the client's state store.
    state: str


@dataclass
class CallbackParams:
    """Query parameters extracted from the consent-flow callback request.

    Built by ``EmergeLinkClient.parse_callback``, which substitutes ``''`` for
    a missing ``status``, ``state`` or ``uid`` and ``None`` for a missing
    ``error_code``.
    """
    status: str  # 'success', 'reauthorized', 'failure'
    state: str  # value to hand to EmergeLinkClient.verify_state
    uid: str
    error_code: Optional[str] = None  # presumably only set on 'failure' — confirm


class EmergeLinkClient:
    """Build signed consent-link URLs and query the Emerge link service.

    Issued ``state`` tokens are held in an in-memory dict on this instance,
    so they can only be verified by the same instance that created them.
    """

    BASE_URL = 'https://link.emergedata.ai'
    # Lifetime of an issued state token, in seconds (1 hour).
    STATE_TTL_SECONDS = 3600
    # requests waits indefinitely when no timeout is given, so always pass one.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, config: EmergeConfig):
        self.config = config
        self._state_store: dict[str, dict] = {}

    def _purge_expired_states(self) -> None:
        """Drop stored states older than ``STATE_TTL_SECONDS``.

        Without this, states for flows the user never completes would stay
        in memory for the lifetime of the client.
        """
        cutoff = time.time() - self.STATE_TTL_SECONDS
        expired = [
            state
            for state, entry in self._state_store.items()
            if entry['created_at'] < cutoff
        ]
        for state in expired:
            del self._state_store[state]

    def _sign(self, params: dict) -> str:
        """Return the hex HMAC-SHA256 of ``params`` using the signing secret.

        The signed string is ``k=v`` pairs sorted by key and joined with
        ``&``, using raw (NOT URL-encoded) values.
        """
        signature_base = '&'.join(
            f'{key}={value}' for key, value in sorted(params.items())
        )
        return hmac.new(
            self.config.signing_secret.encode(),
            signature_base.encode(),
            hashlib.sha256
        ).hexdigest()

    def create_link_url(self, user_id: str) -> LinkResult:
        """Create a signed link URL for the consent flow.

        Args:
            user_id: Identifier sent as the ``uid`` parameter and remembered
                alongside the generated state.

        Returns:
            LinkResult holding the URL and the freshly generated state token.
        """
        # Evict abandoned flows before recording a new one.
        self._purge_expired_states()

        # Use ISO 8601 timestamp
        timestamp = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
        state = secrets.token_hex(16)

        # Store state for verification
        self._state_store[state] = {
            'user_id': user_id,
            'created_at': time.time()
        }

        params = {
            'client_id': self.config.client_id,
            'redirect_uri': self.config.redirect_uri,
            'state': state,
            'timestamp': timestamp,
            'uid': user_id
        }
        # The signature covers every parameter above and is appended last.
        params['signature'] = self._sign(params)

        return LinkResult(
            url=f'{self.BASE_URL}/link/start?{urlencode(params)}',
            state=state
        )

    def verify_state(self, state: str) -> tuple[bool, Optional[str]]:
        """Verify callback state parameter.

        The state is removed from the store on lookup, so each state can be
        verified successfully at most once.

        Returns:
            ``(True, user_id)`` for a known, unexpired state; otherwise
            ``(False, None)``.
        """
        stored = self._state_store.pop(state, None)

        if not stored:
            return False, None

        # Check expiration (1 hour)
        if time.time() - stored['created_at'] > self.STATE_TTL_SECONDS:
            return False, None

        return True, stored['user_id']

    def parse_callback(self, query: dict) -> CallbackParams:
        """Parse callback parameters.

        Missing ``status``/``state``/``uid`` become ``''``; a missing
        ``error_code`` becomes ``None``.
        """
        return CallbackParams(
            status=query.get('status', ''),
            state=query.get('state', ''),
            uid=query.get('uid', ''),
            error_code=query.get('error_code')
        )

    def _get_json(self, resource: str, uid: str) -> dict:
        """GET ``{BASE_URL}/{resource}/{uid}`` with the bearer token.

        Raises:
            requests.HTTPError: on a 4xx/5xx response.
            requests.Timeout: if the server does not answer in time.
        """
        # Percent-encode the uid so '/', '?' or '#' inside it cannot alter
        # the request path.
        safe_uid = quote(uid, safe='')
        response = requests.get(
            f'{self.BASE_URL}/{resource}/{safe_uid}',
            headers={'Authorization': f'Bearer {self.config.api_token}'},
            timeout=self.REQUEST_TIMEOUT_SECONDS
        )
        response.raise_for_status()
        return response.json()

    def get_consent_status(self, uid: str) -> dict:
        """Get consent status for a user.

        Returns:
            dict with 'consents' array containing provider, scopes,
            valid_until, status, issued_at for each consent
        """
        return self._get_json('consent/status', uid)

    def get_export_status(self, uid: str) -> dict:
        """Get export status for a user.

        Returns:
            dict with uid and sources[] where each source contains:
            provider, data_ready, data_landed_at, export_status, export_completed_at
        """
        return self._get_json('export/status', uid)

Query API Client

from dataclasses import dataclass
from typing import Optional, Callable, Any
import requests


@dataclass
class QueryParams:
    """Arguments for one Query API request.

    The query clients always send ``uid``; each of the other fields is added
    to the request's query string only when it is set.
    """
    uid: str
    # Bounds on ingestion time; passed through to the API unchanged.
    ingested_begin: Optional[str] = None
    ingested_end: Optional[str] = None
    # Pagination cursor, taken from a previous QueryResult.next_cursor.
    cursor: Optional[str] = None
    # Converted to a string before being sent.
    limit: Optional[int] = None


@dataclass
class QueryResult:
    """One page of a Query API response, as built by the query clients."""
    # The response's 'data' entry (required in the response JSON).
    data: list[dict]
    # The response's 'count', or len(data) when the response omits it.
    count: int
    # The response's 'has_more', or False when the response omits it.
    has_more: bool
    # Cursor to request the next page with; None when the response omits it.
    next_cursor: Optional[str]
    # The response's 'applied_ingested_end'; None when the response omits it.
    applied_ingested_end: Optional[str]


class EmergeQueryClient:
    """Synchronous client for the Emerge Query API, built on ``requests``."""

    # requests waits indefinitely when no timeout is given, so always pass one.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, api_token: str):
        self.api_token = api_token
        self.base_url = 'https://query.emergedata.ai/v1'

    @staticmethod
    def _build_query(params: QueryParams) -> dict[str, str]:
        """Translate ``params`` into the query-string dict for one request.

        ``uid`` is always present. The string fields are included when
        non-empty; ``limit`` is included whenever it is not ``None`` so an
        explicit ``limit=0`` reaches the API instead of being dropped.
        """
        query = {'uid': params.uid}
        for name in ('ingested_begin', 'ingested_end', 'cursor'):
            value = getattr(params, name)
            if value:
                query[name] = value
        if params.limit is not None:
            query['limit'] = str(params.limit)
        return query

    def _request(self, endpoint: str, params: QueryParams) -> QueryResult:
        """GET ``endpoint`` and wrap the JSON response in a QueryResult.

        Raises:
            requests.HTTPError: on a 4xx/5xx response.
            requests.Timeout: if the server does not answer in time.
            KeyError: if the response JSON has no ``data`` entry.
        """
        response = requests.get(
            f'{self.base_url}{endpoint}',
            params=self._build_query(params),
            headers={'Authorization': f'Bearer {self.api_token}'},
            timeout=self.REQUEST_TIMEOUT_SECONDS
        )
        response.raise_for_status()

        result = response.json()
        data = result['data']
        return QueryResult(
            data=data,
            count=result.get('count', len(data)),
            has_more=result.get('has_more', False),
            next_cursor=result.get('next_cursor'),
            applied_ingested_end=result.get('applied_ingested_end')
        )

    def get_search(self, params: QueryParams) -> QueryResult:
        """Get search history (sync)."""
        return self._request('/sync/get_search', params)

    def get_browsing(self, params: QueryParams) -> QueryResult:
        """Get browsing history (sync)."""
        return self._request('/sync/get_browsing', params)

    def get_youtube(self, params: QueryParams) -> QueryResult:
        """Get YouTube history (sync)."""
        return self._request('/sync/get_youtube', params)

    def get_ads(self, params: QueryParams) -> QueryResult:
        """Get ad interactions (sync)."""
        return self._request('/sync/get_ads', params)

    def get_receipts(self, params: QueryParams) -> QueryResult:
        """Get receipts (sync)."""
        return self._request('/sync/get_receipts', params)

    def fetch_all(
        self,
        fetcher: Callable[[QueryParams], QueryResult],
        uid: str,
        ingested_begin: Optional[str] = None,
        ingested_end: Optional[str] = None
    ) -> tuple[list[dict], Optional[str]]:
        """Fetch all records with automatic pagination.

        Args:
            fetcher: One of this client's ``get_*`` methods (or any callable
                with the same signature).
            uid: User to fetch records for.
            ingested_begin: Optional lower bound, forwarded on every page.
            ingested_end: Optional upper bound, forwarded on every page.

        Returns:
            Tuple of (all_data, applied_ingested_end), where the second item
            is the value reported by the last page fetched.
        """
        all_data = []
        cursor = None
        applied_end = None

        while True:
            result = fetcher(QueryParams(
                uid=uid,
                ingested_begin=ingested_begin,
                ingested_end=ingested_end,
                cursor=cursor
            ))
            all_data.extend(result.data)
            applied_end = result.applied_ingested_end

            # Stop on the last page. Also stop when the server claims more
            # data but gives no cursor, or repeats the one just used:
            # requesting again would return the same page forever.
            next_cursor = result.next_cursor
            if not result.has_more or not next_cursor or next_cursor == cursor:
                break
            cursor = next_cursor

        return all_data, applied_end

Async Query Client

import httpx
from dataclasses import dataclass
from typing import Optional, Callable, Awaitable


class AsyncEmergeQueryClient:
    """Asynchronous client for the Emerge Query API, built on ``httpx``.

    Each request opens and closes its own ``httpx.AsyncClient``.
    """

    def __init__(self, api_token: str):
        self.api_token = api_token
        self.base_url = 'https://query.emergedata.ai/v1'

    @staticmethod
    def _build_query(params: QueryParams) -> dict[str, str]:
        """Translate ``params`` into the query-string dict for one request.

        ``uid`` is always present. The string fields are included when
        non-empty; ``limit`` is included whenever it is not ``None`` so an
        explicit ``limit=0`` reaches the API instead of being dropped.
        """
        query = {'uid': params.uid}
        for name in ('ingested_begin', 'ingested_end', 'cursor'):
            value = getattr(params, name)
            if value:
                query[name] = value
        if params.limit is not None:
            query['limit'] = str(params.limit)
        return query

    async def _request(self, endpoint: str, params: QueryParams) -> QueryResult:
        """GET ``endpoint`` and wrap the JSON response in a QueryResult.

        Raises:
            httpx.HTTPStatusError: on a 4xx/5xx response.
            KeyError: if the response JSON has no ``data`` entry.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f'{self.base_url}{endpoint}',
                params=self._build_query(params),
                headers={'Authorization': f'Bearer {self.api_token}'}
            )
            response.raise_for_status()

            result = response.json()
            data = result['data']
            return QueryResult(
                data=data,
                count=result.get('count', len(data)),
                has_more=result.get('has_more', False),
                next_cursor=result.get('next_cursor'),
                applied_ingested_end=result.get('applied_ingested_end')
            )

    async def get_search(self, params: QueryParams) -> QueryResult:
        """Get search history."""
        return await self._request('/sync/get_search', params)

    async def get_browsing(self, params: QueryParams) -> QueryResult:
        """Get browsing history."""
        return await self._request('/sync/get_browsing', params)

    async def get_youtube(self, params: QueryParams) -> QueryResult:
        """Get YouTube history."""
        return await self._request('/sync/get_youtube', params)

    async def get_ads(self, params: QueryParams) -> QueryResult:
        """Get ad interactions."""
        return await self._request('/sync/get_ads', params)

    async def get_receipts(self, params: QueryParams) -> QueryResult:
        """Get receipts."""
        return await self._request('/sync/get_receipts', params)

    async def fetch_all(
        self,
        fetcher: Callable[[QueryParams], Awaitable[QueryResult]],
        uid: str,
        ingested_begin: Optional[str] = None,
        ingested_end: Optional[str] = None
    ) -> tuple[list[dict], Optional[str]]:
        """Fetch all records with automatic pagination.

        Args:
            fetcher: One of this client's ``get_*`` coroutines (or any
                awaitable-returning callable with the same signature).
            uid: User to fetch records for.
            ingested_begin: Optional lower bound, forwarded on every page.
            ingested_end: Optional upper bound, forwarded on every page
                (matches the synchronous client's ``fetch_all``).

        Returns:
            Tuple of (all_data, applied_ingested_end), where the second item
            is the value reported by the last page fetched.
        """
        all_data = []
        cursor = None
        applied_end = None

        while True:
            result = await fetcher(QueryParams(
                uid=uid,
                ingested_begin=ingested_begin,
                ingested_end=ingested_end,
                cursor=cursor
            ))
            all_data.extend(result.data)
            applied_end = result.applied_ingested_end

            # Stop on the last page. Also stop when the server claims more
            # data but gives no cursor, or repeats the one just used:
            # requesting again would return the same page forever.
            next_cursor = result.next_cursor
            if not result.has_more or not next_cursor or next_cursor == cursor:
                break
            cursor = next_cursor

        return all_data, applied_end

Webhook Handler

import hmac
import hashlib
import json
from dataclasses import dataclass
from typing import Literal


# Event names used as the type of WebhookPayload.event.
# The 'consent.*' names concern a user's consent; the 'data.*' names concern
# data availability (exact trigger conditions are defined by the Emerge
# service, not by this file).
WebhookEvent = Literal[
    'consent.given',
    'consent.revoked',
    'consent.expiring',
    'consent.reauthorized',
    'data.ready',
    'data.failed'
]


@dataclass
class WebhookPayload:
    """Fields extracted from a verified webhook body.

    Built by ``EmergeWebhookHandler.parse_webhook`` from the JSON body after
    its signature has been checked.
    """
    event: WebhookEvent
    timestamp: str  # taken verbatim from the body; format not checked here
    uid: str
    client_id: str
    sources: list[dict]  # empty list when the body has no 'sources' entry


class EmergeWebhookHandler:
    """Verify and parse incoming Emerge webhook requests."""

    def __init__(self, webhook_secret: str):
        self.secret = webhook_secret

    def verify_signature(self, payload: bytes, signature: str) -> bool:
        """Verify webhook signature.

        Args:
            payload: Raw request body, exactly as received.
            signature: Hex HMAC-SHA256 digest supplied by the sender.

        Returns:
            True when ``signature`` matches the digest of ``payload`` under
            the webhook secret; False for any mismatch, including a missing
            or non-string signature.
        """
        if not isinstance(signature, str):
            return False

        expected = hmac.new(
            self.secret.encode(),
            payload,
            hashlib.sha256
        ).hexdigest()

        # Compare as bytes: compare_digest() raises TypeError for str
        # arguments containing non-ASCII characters, and the signature comes
        # from an untrusted request header.
        return hmac.compare_digest(signature.encode('utf-8'), expected.encode('utf-8'))

    def parse_webhook(self, raw_body: bytes, signature: str) -> WebhookPayload:
        """Parse and verify webhook.

        Raises:
            ValueError: if the signature does not match, the body is not
                valid JSON (``json.JSONDecodeError`` is a ``ValueError``), or
                a required field is missing from the body.
        """
        if not self.verify_signature(raw_body, signature):
            raise ValueError('Invalid webhook signature')

        data = json.loads(raw_body)
        try:
            return WebhookPayload(
                event=data['event'],
                timestamp=data['timestamp'],
                uid=data['uid'],
                client_id=data['client_id'],
                sources=data.get('sources', [])
            )
        except (KeyError, TypeError) as exc:
            # KeyError: a required field is absent. TypeError: the body is
            # valid JSON but not an object. Report both the same way as
            # other bad payloads so callers handle a single exception type.
            raise ValueError('Malformed webhook payload') from exc

Usage Example

import os

# Build the shared configuration from environment variables; a missing
# variable raises KeyError here.
config = EmergeConfig(
    client_id=os.environ['EMERGE_CLIENT_ID'],
    signing_secret=os.environ['EMERGE_SIGNING_SECRET'],
    api_token=os.environ['EMERGE_API_TOKEN'],
    redirect_uri='https://yourapp.com/emerge/callback',
)
link_client = EmergeLinkClient(config)
query_client = EmergeQueryClient(config.api_token)

# One example user id, reused by every call below.
example_uid = 'psub_d4e5f6789012345678901234abcdef01'

# 1. Produce the signed URL that starts the consent flow.
result = link_client.create_link_url(user_id=example_uid)
print(f'Consent URL: {result.url}')

# 2. Once consent exists, request a single page of search history.
search_result = query_client.get_search(QueryParams(uid=example_uid))
print(f'Search queries: {search_result.count}')

# 3. Let fetch_all follow the pagination cursors for browsing history.
all_browsing, applied_end = query_client.fetch_all(
    query_client.get_browsing, uid=example_uid
)
print(f'Total browsing entries: {len(all_browsing)}')

# 4. Delta sync: keep applied_end as the starting point of the next sync.
next_sync_start = applied_end

FastAPI Integration

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import RedirectResponse
import os

# Application object the route decorators below attach to.
# NOTE(review): the /connect handler reads and writes request.session, which
# Starlette only provides when SessionMiddleware is installed; no
# add_middleware call is visible in this snippet — confirm it is configured.
app = FastAPI()

# Initialize clients
# Credentials are read from the environment at import time; a missing
# variable raises KeyError before the app starts serving.
config = EmergeConfig(
    client_id=os.environ['EMERGE_CLIENT_ID'],
    signing_secret=os.environ['EMERGE_SIGNING_SECRET'],
    api_token=os.environ['EMERGE_API_TOKEN'],
    redirect_uri='https://yourapp.com/emerge/callback'
)

# Module-level singletons shared by all requests. link_client keeps issued
# states in memory, so /connect and /emerge/callback must be served by the
# same process for state verification to succeed.
link_client = EmergeLinkClient(config)
webhook_handler = EmergeWebhookHandler(os.environ['EMERGE_WEBHOOK_SECRET'])


@app.get('/connect')
async def connect(request: Request):
    """Start the consent flow for the logged-in user.

    Reads ``user_id`` from the session, creates a signed link URL for it,
    remembers the generated state in the session and redirects the browser
    to the link URL.

    Raises:
        HTTPException: 401 when the session holds no ``user_id``.

    NOTE(review): ``request.session`` requires Starlette's SessionMiddleware
    to be installed on the app — confirm it is configured.
    """
    user_id = request.session.get('user_id')
    if not user_id:
        # Without this guard a missing user would be signed into the link
        # as the literal uid "None".
        raise HTTPException(status_code=401, detail='Not authenticated')

    result = link_client.create_link_url(user_id=user_id)

    # Store state in session
    request.session['emerge_state'] = result.state

    return RedirectResponse(result.url)


@app.get('/emerge/callback')
async def callback(request: Request):
    """Handle the redirect back from the consent flow.

    The ``state`` query parameter must be known to ``link_client`` and must
    equal the state stored in this browser's session by ``/connect``.

    Returns:
        A redirect to the dashboard when ``status`` is 'success' or
        'reauthorized', otherwise a redirect to the error page carrying the
        error code.

    Raises:
        HTTPException: 400 when the state check fails.
    """
    from urllib.parse import urlencode

    params = link_client.parse_callback(dict(request.query_params))

    # Verify state
    # verify_state consumes the state, so it can only be used once. The
    # session copy is popped as well so a replayed callback fails.
    valid, _user_id = link_client.verify_state(params.state)
    session_state = request.session.pop('emerge_state', None)
    if not valid or session_state != params.state:
        raise HTTPException(status_code=400, detail='Invalid state')

    if params.status in ('success', 'reauthorized'):
        return RedirectResponse('/dashboard?connected=true')

    # error_code comes from the query string: encode it so it cannot inject
    # extra parameters, and avoid rendering a missing code as "None".
    error_query = urlencode({'code': params.error_code or 'unknown'})
    return RedirectResponse(f'/error?{error_query}')


@app.post('/webhooks/emerge')
async def webhook(request: Request):
    """Receive a webhook, verify its signature and log the event.

    The signature is read from the ``x-signature`` header and checked against
    the raw request body.

    Returns:
        ``{'status': 'ok'}`` once the event has been handled.

    Raises:
        HTTPException: 401 when ``parse_webhook`` raises ValueError; 400 when
            the verified body lacks a required field.
    """
    body = await request.body()
    signature = request.headers.get('x-signature', '')

    try:
        payload = webhook_handler.parse_webhook(body, signature)
    except ValueError as exc:
        raise HTTPException(status_code=401, detail='Invalid signature') from exc
    except KeyError as exc:
        # Signature was valid but a required field is missing: report a bad
        # request instead of letting the KeyError become a 500.
        raise HTTPException(status_code=400, detail='Malformed payload') from exc

    if payload.event == 'consent.given':
        print(f"Consent granted for user {payload.uid}")
    elif payload.event == 'consent.revoked':
        print(f"Consent revoked for user {payload.uid}")
    elif payload.event == 'consent.expiring':
        print(f"Consent expiring for user {payload.uid}")
    elif payload.event == 'consent.reauthorized':
        # Declared in WebhookEvent; previously fell through silently.
        print(f"Consent reauthorized for user {payload.uid}")
    elif payload.event == 'data.ready':
        print(f"Data ready for user {payload.uid}: {payload.sources}")
    elif payload.event == 'data.failed':
        print(f"Data export failed for user {payload.uid}: {payload.sources}")

    return {'status': 'ok'}