Skip to main content
This guide covers best practices for building AdCP orchestrators that handle asynchronous operations, pending states, and human-in-the-loop workflows.

Core Design Principles

1. Asynchronous First

The AdCP protocol is inherently asynchronous. Operations may take seconds, hours, or even days to complete. DO:
  • Design all operations as async/await
  • Store operation state persistently
  • Handle orchestrator restarts gracefully
  • Implement proper timeout handling
DON’T:
  • Assume immediate completion
  • Use synchronous blocking calls
  • Store state only in memory
  • Retry indefinitely without backoff

2. Status-Driven Logic

Operations progress through standardized status values:
TASK_STATUSES = {
    "submitted",      # Long-running (hours to days) - provide webhook or poll
    "working",        # Processing (< 120 seconds) - poll frequently
    "input-required", # Need user input/approval - continue conversation
    "completed",      # Success - process results
    "failed",         # Error - handle appropriately
    "canceled",       # User canceled
    "auth-required"   # Need authentication
}

3. State Machine Design

Implement proper state machines aligned with AdCP task statuses:
class OperationState(Enum):
    # Local orchestrator states
    REQUESTED = "requested"
    CALLING_ADCP = "calling_adcp"

    # AdCP task states (match server responses)
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input_required"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"

# Valid state transitions
VALID_TRANSITIONS = {
    "requested": ["calling_adcp"],
    "calling_adcp": ["submitted", "working", "input_required", "completed", "failed"],
    "submitted": ["working", "completed", "failed", "canceled"],
    "working": ["completed", "failed", "input_required"],
    "input_required": ["submitted", "working", "completed", "failed"]
}

Operation Tracking

Persistent Storage

Store all operations with comprehensive tracking:
class OperationTracker:
    def __init__(self, db):
        self.db = db

    async def create_operation(self, operation_type, request_data, webhook_config=None):
        operation = {
            "id": str(uuid.uuid4()),
            "type": operation_type,
            "status": "requested",
            "request": request_data,
            "webhook_config": webhook_config,
            "created_at": datetime.now(),
            "updated_at": datetime.now(),
            "task_id": None,
            "context_id": None,
            "result": None,
            "error": None
        }
        await self.db.operations.insert_one(operation)
        return operation["id"]

    async def update_status(self, operation_id, status, **kwargs):
        update = {
            "status": status,
            "updated_at": datetime.now()
        }
        update.update(kwargs)

        await self.db.operations.update_one(
            {"id": operation_id},
            {"$set": update}
        )

    async def get_pending_operations(self):
        """Get all operations that need monitoring"""
        return await self.db.operations.find({
            "status": {"$in": ["submitted", "working", "input_required"]}
        }).to_list(length=None)

State Reconciliation

Sync local state with server on startup:
async def reconcile_with_server(self, adcp_client):
    """Sync local state with server using tasks/list"""
    server_tasks = await adcp_client.call('tasks/list', {
        'filters': {'statuses': ['submitted', 'working', 'input_required']}
    })

    server_task_ids = {task['task_id'] for task in server_tasks['tasks']}
    local_operations = await self.get_pending_operations()
    local_task_ids = {op['task_id'] for op in local_operations if op['task_id']}

    return {
        'orphaned_on_server': server_task_ids - local_task_ids,
        'missing_from_server': local_task_ids - server_task_ids,
        'total_pending_server': len(server_tasks['tasks']),
        'total_pending_local': len(local_operations)
    }

Async Operation Handler

Response Routing

Handle responses based on status:
class AsyncOperationHandler:
    def __init__(self, adcp_client, tracker, notifier):
        self.adcp = adcp_client
        self.tracker = tracker
        self.notifier = notifier
        self.polling_tasks = {}

    async def handle_operation_response(self, operation_id, response):
        """Handle any AdCP response with proper status routing"""
        status = response.get("status")

        # Update operation with response details
        await self.tracker.update_status(
            operation_id,
            status,
            task_id=response.get("task_id"),
            context_id=response.get("context_id"),
            result=response.get("result") if status == "completed" else None,
            error=response.get("error") if status == "failed" else None
        )

        # Route based on status
        if status == "completed":
            await self._handle_completed(operation_id, response)
        elif status == "failed":
            await self._handle_failed(operation_id, response)
        elif status == "submitted":
            await self._handle_submitted(operation_id, response)
        elif status == "working":
            await self._handle_working(operation_id, response)
        elif status == "input_required":
            await self._handle_input_required(operation_id, response)

Submitted Operations

Handle long-running operations:
async def _handle_submitted(self, operation_id, response):
    """Handle long-running operations"""
    task_id = response["task_id"]

    # Check if webhook is configured
    operation = await self.tracker.get_operation(operation_id)
    webhook_config = operation.get("webhook_config")

    if webhook_config:
        # Webhook will handle completion notification
        await self.notifier.notify_submitted_with_webhook(operation_id, task_id)
    else:
        # Start polling for completion
        polling_task = asyncio.create_task(
            self._poll_for_completion(operation_id, task_id, interval=60)
        )
        self.polling_tasks[task_id] = polling_task

Polling with Backoff

Implement efficient polling:
async def _poll_for_completion(self, operation_id, task_id, interval=60):
    """Poll task status until completion"""
    max_polls = 1440 if interval == 60 else 24  # 24 hours or 2 minutes
    poll_count = 0

    while poll_count < max_polls:
        try:
            await asyncio.sleep(interval)
            poll_count += 1

            task_response = await self.adcp.call('tasks/get', {
                'task_id': task_id,
                'include_result': True
            })

            await self.handle_operation_response(operation_id, task_response)

            if task_response["status"] in ["completed", "failed", "canceled"]:
                break

        except Exception as e:
            await self.tracker.update_status(
                operation_id,
                "failed",
                error=f"Polling error: {str(e)}"
            )
            break

    self.polling_tasks.pop(task_id, None)

    if poll_count >= max_polls:
        await self.tracker.update_status(
            operation_id,
            "failed",
            error="Task polling timeout"
        )

Webhook Support

Reliable Webhook Handler

Implement webhooks with reliability patterns:
class WebhookHandler:
    def __init__(self, tracker, notifier, secret_key):
        self.tracker = tracker
        self.notifier = notifier
        self.secret_key = secret_key
        self.processed_events = {}

    def verify_webhook_signature(self, payload: bytes, signature: str) -> bool:
        """Verify webhook authenticity"""
        expected_signature = hmac.new(
            self.secret_key.encode(),
            payload,
            hashlib.sha256
        ).hexdigest()
        return signature == f"sha256={expected_signature}"

    async def is_replay_attack(self, timestamp: str, event_id: str) -> bool:
        """Prevent replay attacks using timestamp and event ID"""
        event_time = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
        now = datetime.now()

        if now - event_time > timedelta(minutes=5):
            return True

        return event_id in self.processed_events

Webhook + Polling Backup

Never rely solely on webhooks:
class ReliableWebhookOrchestrator:
    def __init__(self):
        self.webhook_timeout = timedelta(minutes=10)
        self.backup_polling_delay = timedelta(minutes=2)

    async def _handle_submitted_with_webhook(self, operation_id, task_id):
        """Handle submitted task with webhook + backup polling"""

        async def backup_polling():
            await asyncio.sleep(self.backup_polling_delay.total_seconds())

            operation = await tracker.get_operation(operation_id)
            if operation["status"] not in ["completed", "failed", "canceled"]:
                logger.info(f"Starting backup polling for task {task_id}")
                await self._poll_for_completion(operation_id, task_id, interval=60)

        asyncio.create_task(backup_polling())

Example Orchestrator

Complete orchestrator implementation:
class AdCPOrchestrator:
    def __init__(self):
        self.adcp = AdCPClient()
        self.tracker = OperationTracker(db)
        self.handler = AsyncOperationHandler(self.adcp, self.tracker, UserNotifier())
        self.webhook_base_url = "https://orchestrator.com/webhooks"

    async def create_campaign(self, user_id, request, enable_webhook=True):
        """Create a campaign with governance validation and full async handling.

        Plans must already be synced via sync_plans before calling this method.
        Plan creation happens during the planning phase, not at campaign creation time.
        """

        # 1. Run intent check (plan must already exist)
        if request.get("governance_context"):
            gov_check = await self.adcp.call("check_governance", {
                "plan_id": request["governance_context"]["plan_id"],
                "caller": request["governance_context"]["caller"],
                "tool": "create_media_buy",
                "payload": request
            })
            if gov_check["status"] == "denied":
                raise GovernanceDeniedError(gov_check["explanation"])
            if gov_check["status"] == "conditions":
                raise GovernanceConditionsError(gov_check["conditions"])
            # If check_governance needs human review internally, it returns
            # async task status (submitted/working) and resolves to
            # approved or denied — standard task lifecycle.

        # 2. Create the media buy
        await self._create_media_buy(user_id, request, enable_webhook)

    async def _create_media_buy(self, user_id, request, enable_webhook=True):
        """Create a media buy with full async handling."""

        # 1. Prepare webhook configuration
        webhook_config = None
        if enable_webhook:
            webhook_config = {
                "webhook_url": f"{self.webhook_base_url}/adcp/{user_id}",
                "webhook_auth": {
                    "type": "bearer",
                    "credentials": await self.get_webhook_token(user_id)
                }
            }

        # 2. Create operation record
        operation_id = await self.tracker.create_operation(
            "create_media_buy",
            request,
            webhook_config=webhook_config
        )

        try:
            # 3. Call AdCP
            response = await self.adcp.call("create_media_buy", request, webhook_config)

            # 4. Handle response
            await self.handler.handle_operation_response(operation_id, response)

            # 5. Return appropriate response to user
            return self._format_user_response(operation_id, response)

        except Exception as e:
            await self.tracker.update_status(operation_id, "failed", error=str(e))
            raise

    async def reconcile_state_on_startup(self):
        """Recover from orchestrator restart"""
        reconciliation = await self.tracker.reconcile_with_server(self.adcp)
        logger.info(f"State reconciliation: {reconciliation}")

        for task_id in reconciliation["orphaned_on_server"]:
            # Resume monitoring orphaned tasks
            operation_id = await self.tracker.create_operation(
                "unknown",
                {},
                status="submitted"
            )
            await self.tracker.update_status(operation_id, "submitted", task_id=task_id)
            asyncio.create_task(
                self.handler._poll_for_completion(operation_id, task_id)
            )

Governance in the Campaign Lifecycle

Plan creation (sync_plans) happens during the planning phase — before any campaigns exist. Governance checks happen during campaign execution. These are separate concerns. Planning phase (once per media plan):
sync_plans — orchestrator pushes the plan to the governance agent
Campaign execution (per media buy):
check_governance(tool + payload) → create_media_buy → check_governance(media_buy_id + planned_delivery) → delivery → report_plan_outcome
PhaseWho callsTaskWhat happens on failure
Intent checkOrchestratorcheck_governance (tool + payload)Campaign violates buyer’s plan — denied or conditioned before any spend. If the governance agent needs human review, the task goes async and resolves to approved or denied.
Execution checkSellercheck_governance (media_buy_id + planned_delivery)Seller’s delivery plan doesn’t match buyer’s expectations — purchase blocked
Delivery checkSellercheck_governance (phase: delivery + delivery_metrics)Drift detected — pacing, geo, or channel distribution deviates from plan
Plan outcomeOrchestratorreport_plan_outcomeNo feedback loop — governance agent cannot improve future recommendations
See the media buy governance workflow for the complete sequence with code examples, and the seller integration guide for the seller’s execution check obligations.

Best Practices

1. Persistent Storage

Always use persistent storage for operation state:
  • Database (PostgreSQL, MongoDB)
  • Message queue (Redis, RabbitMQ)
  • Distributed cache (Redis Cluster)

2. Idempotency

Make all operations idempotent:
async def create_media_buy_idempotent(self, request):
    existing = await self.db.operations.find_one({
        "type": "create_media_buy",
        "request.po_number": request["po_number"],
        "status": {"$in": ["created", "active"]}
    })

    if existing:
        return existing["result"]

    return await self.create_media_buy(request)

3. Timeout Handling

Implement reasonable timeouts:
OPERATION_TIMEOUTS = {
    "create_media_buy": timedelta(hours=24),
    "update_media_buy": timedelta(hours=12),
    "creative_approval": timedelta(hours=48)
}

4. Error Recovery

Implement retry logic with circuit breakers:
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(min=1, max=60),
    retry=retry_if_exception_type(TransientError)
)
async def call_adcp_api(self, tool, params):
    try:
        return await self.adcp.call(tool, params)
    except RateLimitError:
        raise TransientError("Rate limited")
    except NetworkError:
        raise TransientError("Network error")

5. Monitoring and Alerting

Track key metrics:
  • Pending operation count by type
  • Average approval time
  • Rejection rate
  • Task timeout rate
  • API error rate

User Communication

Keep users informed about pending operations:
class UserNotifier:
    async def notify_pending_approval(self, user_id, operation):
        message = {
            "type": "pending_approval",
            "operation_id": operation["id"],
            "message": "Your media buy requires publisher approval",
            "estimated_time": "2-4 hours"
        }
        await self.send_notification(user_id, message)

    async def notify_approval(self, user_id, operation):
        message = {
            "type": "operation_approved",
            "operation_id": operation["id"],
            "message": "Your media buy has been approved",
            "media_buy_id": operation["result"]["media_buy_id"]
        }
        await self.send_notification(user_id, message)

Summary

Building a robust AdCP orchestrator requires:
  1. Asynchronous design throughout
  2. Proper state management with persistence
  3. Graceful handling of pending states
  4. User communication for long-running operations
  5. Monitoring and observability
Remember: Pending states are not errors - they’re a normal part of the advertising workflow.

Next Steps

  • Task Lifecycle: See Task Lifecycle for status handling
  • Webhooks: See Webhooks for push notifications
  • Security: See Security for multi-tenant security