class AdCPOrchestrator:
def __init__(self):
self.adcp = AdCPClient()
self.tracker = OperationTracker(db)
self.handler = AsyncOperationHandler(self.adcp, self.tracker, UserNotifier())
self.webhook_base_url = "https://orchestrator.com/webhooks"
async def create_campaign(self, user_id, request, enable_webhook=True):
"""Create a campaign with governance validation and full async handling.
Plans must already be synced via sync_plans before calling this method.
Plan creation happens during the planning phase, not at campaign creation time.
"""
# 1. Run intent check (plan must already exist)
if request.get("governance_context"):
gov_check = await self.adcp.call("check_governance", {
"plan_id": request["governance_context"]["plan_id"],
"caller": request["governance_context"]["caller"],
"tool": "create_media_buy",
"payload": request
})
if gov_check["status"] == "denied":
raise GovernanceDeniedError(gov_check["explanation"])
if gov_check["status"] == "conditions":
raise GovernanceConditionsError(gov_check["conditions"])
# If check_governance needs human review internally, it returns
# async task status (submitted/working) and resolves to
# approved or denied — standard task lifecycle.
# 2. Create the media buy
await self._create_media_buy(user_id, request, enable_webhook)
async def _create_media_buy(self, user_id, request, enable_webhook=True):
"""Create a media buy with full async handling."""
# 1. Prepare webhook configuration
webhook_config = None
if enable_webhook:
webhook_config = {
"webhook_url": f"{self.webhook_base_url}/adcp/{user_id}",
"webhook_auth": {
"type": "bearer",
"credentials": await self.get_webhook_token(user_id)
}
}
# 2. Create operation record
operation_id = await self.tracker.create_operation(
"create_media_buy",
request,
webhook_config=webhook_config
)
try:
# 3. Call AdCP
response = await self.adcp.call("create_media_buy", request, webhook_config)
# 4. Handle response
await self.handler.handle_operation_response(operation_id, response)
# 5. Return appropriate response to user
return self._format_user_response(operation_id, response)
except Exception as e:
await self.tracker.update_status(operation_id, "failed", error=str(e))
raise
async def reconcile_state_on_startup(self):
"""Recover from orchestrator restart"""
reconciliation = await self.tracker.reconcile_with_server(self.adcp)
logger.info(f"State reconciliation: {reconciliation}")
for task_id in reconciliation["orphaned_on_server"]:
# Resume monitoring orphaned tasks
operation_id = await self.tracker.create_operation(
"unknown",
{},
status="submitted"
)
await self.tracker.update_status(operation_id, "submitted", task_id=task_id)
asyncio.create_task(
self.handler._poll_for_completion(operation_id, task_id)
)