Customer Onboarding with the Journey Engine
The problem with onboarding across systems
A customer walks in — or clicks "Sign Up." What follows is rarely a single system's job.
A bank might touch a CRM, a KYC/AML screening service, a core banking system on a mainframe, a product provisioning platform, a notifications engine, and a relationship manager's task queue. A telco might span a CPQ tool, a network provisioning system, a billing platform, a device activation service, and a field operations team. Each system knows its piece. No system knows the whole story.
When something stalls — the customer calls, the manager escalates, the auditor asks — nobody can answer: where exactly is this customer, who last touched their record, and why did it take three weeks?
Purple8's Journey Engine solves this by making the graph the single source of truth for the entire onboarding lifecycle. Every system — legacy mainframe, modern SaaS, internal microservice — writes one event when it completes its step. The graph records it as an immutable edge. The full story is always queryable.
What gets stored
For every customer, the Journey Engine writes:
```text
(JourneyInstance: entity_id="C-8812")
 │
 ├─[:ADVANCED_TO]
 │    from_stage: "application_submitted"
 │    to_stage:   "kyc_screening"
 │    actor:      "CRMService"              ← which system advanced it
 │    timestamp:  "2026-01-14T09:01:22Z"
 │    notes:      "Application validated, forwarded to KYC"
 │
 ├─[:ADVANCED_TO]
 │    from_stage: "kyc_screening"
 │    to_stage:   "account_provisioning"
 │    actor:      "KYCVendorAPI"
 │    timestamp:  "2026-01-14T11:44:07Z"
 │    notes:      "Clear. Risk score: 0.12"
 │
 ├─[:AI_ADVISED]
 │    action_type: "suggest_advance"
 │    reasoning:   "KYC clear, no sanctions match, similar profiles approved"
 │    confidence:  0.94
 │    timestamp:   "2026-01-14T11:44:09Z"
 │
 └─[:ADVANCED_TO]
      from_stage: "account_provisioning"
      to_stage:   "products_assigned"
      actor:      "CoreBankingMainframe"    ← yes, the mainframe writes here too
      timestamp:  "2026-01-14T14:03:55Z"
      notes:      "Account 004-882771 opened. Sort code 20-44-18"

      (and so on)
```

Every product taken, every department handoff, every AI decision, and every human override is recorded as an immutable edge in the graph and stays queryable.
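The immutability guarantee is easiest to see in a toy model. The plain-Python sketch below is not Purple8's storage engine or API; TransitionEdge and JourneyLog are hypothetical names that mirror the ADVANCED_TO properties in the diagram. Each edge is a frozen dataclass, the log offers only an append operation, and reads return a tuple, so recorded history cannot be edited in place.

```python
import dataclasses
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class TransitionEdge:
    """Hypothetical stand-in for one ADVANCED_TO edge; frozen, so fields cannot be reassigned."""
    from_stage: str
    to_stage: str
    actor: str
    timestamp: str  # ISO 8601 string, as in the diagram
    notes: str = ""


class JourneyLog:
    """Append-only transition history for one journey instance (illustrative only)."""

    def __init__(self, entity_id: str) -> None:
        self.entity_id = entity_id
        self._edges: List[TransitionEdge] = []

    def append(self, edge: TransitionEdge) -> None:
        # The only write operation: new facts are added, existing ones are never changed.
        self._edges.append(edge)

    def timeline(self) -> Tuple[TransitionEdge, ...]:
        # A tuple of frozen edges: callers can read history but not rewrite it.
        return tuple(self._edges)

    def current_stage(self) -> Optional[str]:
        # The most recently appended edge's target is where the customer is now.
        return self._edges[-1].to_stage if self._edges else None


log = JourneyLog("C-8812")
log.append(TransitionEdge("application_submitted", "kyc_screening", "CRMService",
                          "2026-01-14T09:01:22Z", "Application validated, forwarded to KYC"))
log.append(TransitionEdge("kyc_screening", "account_provisioning", "KYCVendorAPI",
                          "2026-01-14T11:44:07Z", "Clear. Risk score: 0.12"))
print(log.current_stage())  # account_provisioning

try:
    log.timeline()[0].actor = "SomeoneElse"  # attempt to rewrite history
except dataclasses.FrozenInstanceError:
    print("edges are immutable")
```

In this model a correction is a new edge rather than an edit, which is what keeps the audit trail intact.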
Defining the journey
```python
from purple8_graph import GraphEngine
from purple8_graph.journey import (
    JourneyEngine, StageSpec, SLAPolicy, AIStepConfig, AIStepTrigger, AIActionType,
)
from purple8_graph.genai import OpenAIProvider

engine = GraphEngine("./data")
je = JourneyEngine(engine)

je.define_journey(
    journey_type="customer_onboarding",
    stages=[
        StageSpec("application_submitted",
                  owner_system="CRMService"),
        StageSpec("kyc_screening",
                  owner_system="KYCVendorAPI",
                  sla=SLAPolicy(warn_after_seconds=3600,       # 1 hour
                                breach_after_seconds=86400),   # 24 hours
                  ai_step=AIStepConfig(
                      trigger=AIStepTrigger.ON_ENTER,
                      auto_execute=True,
                      allowed_actions=[AIActionType.SUGGEST_ADVANCE, AIActionType.FLAG],
                      extra_context={"policy": "Flag for manual review if risk score > 0.7"},
                  )),
        StageSpec("manual_review",                   # human-in-the-loop stage
                  owner_system="ComplianceTeam",
                  requires_human=True,               # creates a HITLTask node
                  sla=SLAPolicy(warn_after_seconds=7200,       # 2 hours
                                breach_after_seconds=172800)), # 48 hours
        StageSpec("account_provisioning",
                  owner_system="CoreBankingMainframe",
                  sla=SLAPolicy(warn_after_seconds=1800,       # 30 minutes
                                breach_after_seconds=14400)),  # 4 hours
        StageSpec("products_assigned",
                  owner_system="ProductCatalogue"),
        StageSpec("welcome_sent",
                  owner_system="NotificationsEngine"),
        StageSpec("relationship_manager_assigned",
                  owner_system="RMAllocationService"),
        StageSpec("onboarding_complete"),
        StageSpec("rejected"),
        StageSpec("abandoned"),
    ],
    description="End-to-end customer onboarding across CRM, KYC, Core Banking, and product systems",
)

# Register the AI advisor. The advisor is built from `provider`; its class and
# constructor are covered under AI advisor setup in the Journey Engine guide and
# are not reproduced here, so `advisor` below is a placeholder.
provider = OpenAIProvider(api_key="...")
advisor = ...  # placeholder: construct the advisor from `provider`
je.register_ai_advisor(advisor)
```

Wiring your systems
Each system, new or legacy, produces one event when it completes its step: modern systems call Purple8 directly, and legacy systems are polled by it. That is the entire integration contract.
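Because the contract is a single HTTP POST, a Python service can meet it with the standard library alone. The sketch below builds the advance request for the /v1/journeys/instances/{id}/advance endpoint with the X-API-Key header used in the webhook example. The PURPLE8_URL and PURPLE8_API_KEY environment variable names and the helper names are hypothetical, and only request construction is exercised here, so nothing is sent.

```python
import json
import os
import urllib.request

# Hypothetical settings: these variable names are placeholders, not Purple8 configuration.
PURPLE8_URL = os.environ.get("PURPLE8_URL", "https://your-purple8-server")
API_KEY = os.environ.get("PURPLE8_API_KEY", "")


def build_advance_request(instance_id: str, to_stage: str, actor: str,
                          notes: str = "") -> urllib.request.Request:
    """Build the POST .../advance request carrying the three contract fields."""
    body = json.dumps({"to_stage": to_stage, "actor": actor, "notes": notes}).encode("utf-8")
    return urllib.request.Request(
        f"{PURPLE8_URL}/v1/journeys/instances/{instance_id}/advance",
        data=body,
        headers={"X-API-Key": API_KEY, "Content-Type": "application/json"},
        method="POST",
    )


def advance(instance_id: str, to_stage: str, actor: str, notes: str = "") -> int:
    """Send the request and return the HTTP status; urlopen raises HTTPError on non-2xx."""
    request = build_advance_request(instance_id, to_stage, actor, notes)
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status


req = build_advance_request("inst_C8812", "account_provisioning", "KYCVendorAPI",
                            "Clear. Risk score: 0.12. Reference: KYC-99241")
print(req.get_method(), req.full_url)
```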
Modern systems: REST webhook
Any system that can send an HTTP POST request calls the /v1/journeys/instances/{id}/advance endpoint:
```bash
# KYC vendor fires this when screening completes
curl -X POST https://your-purple8-server/v1/journeys/instances/inst_C8812/advance \
  -H "X-API-Key: $KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "to_stage": "account_provisioning",
    "actor": "KYCVendorAPI",
    "notes": "Clear. Risk score: 0.12. Reference: KYC-99241"
  }'
```

Done. From that one call, the graph records the transition, the AI advisor runs if the new stage has an AI step configured, SLAs are checked, and a CDC event notifies downstream subscribers.
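Each stage's SLAPolicy sets two thresholds, warn_after_seconds and breach_after_seconds, and the queries later in this guide filter on the AT_RISK and BREACHED statuses. The sketch below shows one plausible way an SLA check could map time spent in a stage onto those statuses. It is an assumption, not Purple8's implementation: SLAThresholds and sla_status are hypothetical names, the OK label is a placeholder, and both boundaries are treated as inclusive.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class SLAThresholds:
    """Hypothetical stand-in for the two SLAPolicy fields."""
    warn_after_seconds: int
    breach_after_seconds: int


def sla_status(entered_at: datetime, policy: Optional[SLAThresholds], now: datetime) -> str:
    """Classify time spent in the current stage ("OK" is a placeholder label)."""
    if policy is None:  # a stage declared without an SLA is never at risk
        return "OK"
    elapsed = (now - entered_at).total_seconds()
    if elapsed >= policy.breach_after_seconds:
        return "BREACHED"
    if elapsed >= policy.warn_after_seconds:
        return "AT_RISK"
    return "OK"


# kyc_screening thresholds from the journey definition: warn at 1 hour, breach at 24 hours.
kyc = SLAThresholds(warn_after_seconds=3600, breach_after_seconds=86400)
entered = datetime(2026, 1, 14, 9, 1, 22, tzinfo=timezone.utc)
for now in (datetime(2026, 1, 14, 9, 30, tzinfo=timezone.utc),   # about 29 minutes in
            datetime(2026, 1, 14, 11, 0, tzinfo=timezone.utc),   # about 2 hours in
            datetime(2026, 1, 15, 10, 0, tzinfo=timezone.utc)):  # about 25 hours in
    print(now.isoformat(), sla_status(entered, kyc, now))
```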
Legacy systems: REST poller (Purple8 polls you)
For systems that cannot make outbound HTTP calls — mainframes, legacy batch systems, older core banking platforms — Purple8 polls them on a schedule instead:
```python
from purple8_graph.connectors import RESTPoller, PollerConfig

# Purple8 polls the mainframe's REST adapter every 30 seconds
poller = RESTPoller(
    je,
    PollerConfig(
        name="CoreBankingMainframe",
        url="http://mainframe-adapter.internal/onboarding/status",
        poll_interval_seconds=30,
        headers={"Authorization": "Basic ..."},
        # Map the mainframe's response fields to a journey advance
        field_map={
            "instance_field": "customer_ref",   # maps to journey instance_id
            "stage_field": "status_code",       # maps to to_stage
            "actor_field": "system_id",
            "notes_field": "status_message",
        },
    ),
)
poller.start()
```

The poller runs as a background thread. The mainframe never needs to know Purple8 exists.
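The field_map tells the poller which field of each polled record names the instance, the target stage, the actor, and the notes. The sketch below models that translation in plain Python; it is not the RESTPoller internals. It assumes the adapter returns a list of JSON records and, as a further assumption, remembers the last status seen per instance so that a record left unchanged between polls does not trigger a second advance. records_to_advances is a hypothetical name.

```python
from typing import Any, Dict, Iterable, List

# Same field_map as the PollerConfig example.
FIELD_MAP = {
    "instance_field": "customer_ref",
    "stage_field": "status_code",
    "actor_field": "system_id",
    "notes_field": "status_message",
}


def records_to_advances(records: Iterable[Dict[str, Any]], field_map: Dict[str, str],
                        last_seen: Dict[str, str]) -> List[Dict[str, str]]:
    """Translate polled records into advance payloads, skipping unchanged statuses."""
    advances = []
    for record in records:
        instance_id = record[field_map["instance_field"]]
        to_stage = record[field_map["stage_field"]]
        if last_seen.get(instance_id) == to_stage:
            continue  # same status as the previous poll: nothing new to record
        last_seen[instance_id] = to_stage
        advances.append({
            "instance_id": instance_id,
            "to_stage": to_stage,
            "actor": record.get(field_map["actor_field"], ""),
            "notes": record.get(field_map["notes_field"], ""),
        })
    return advances


# Hypothetical adapter response for one polling cycle.
polled = [{"customer_ref": "inst_C8812", "status_code": "products_assigned",
           "system_id": "CoreBankingMainframe",
           "status_message": "Account 004-882771 opened. Sort code 20-44-18"}]
seen: Dict[str, str] = {}
print(records_to_advances(polled, FIELD_MAP, seen))  # one advance
print(records_to_advances(polled, FIELD_MAP, seen))  # [] on the next, unchanged poll
```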
Event-driven systems: CDC subscription
Event-driven systems (for example, services built around Kafka, SQS, or webhooks) can subscribe to Purple8's event bus to react to journey transitions:
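```python
import asyncio
from purple8_graph.cdc import EventBus

async def notify_customer(customer_id: str) -> None:
    ...  # placeholder: call your notifications engine here

async def main() -> None:
    bus = EventBus()
    async with bus.subscribe(tenant_id="acme-bank") as queue:
        async for event in queue:
            if event.event_type == "journey.advanced":
                data = event.payload
                if data["to_stage"] == "welcome_sent":
                    # Trigger the notifications engine
                    await notify_customer(data["entity_id"])

asyncio.run(main())
```

Querying the full customer story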
Because every interaction is a graph edge, any question about any customer at any point in time is a Cypher query — no joins across systems, no API calls to reconstruct history.
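Point-in-time questions reduce to replaying transitions up to a cutoff. The plain-Python sketch below does that over a list of ADVANCED_TO edge properties, using the sample timestamps from the diagram. It illustrates the logic a time-bounded Cypher query expresses and is not a Purple8 API; stage_at is a hypothetical helper.

```python
from datetime import datetime
from typing import Dict, List, Optional


def parse_ts(value: str) -> datetime:
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11, so normalise it.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def stage_at(edges: List[Dict[str, str]], when: str) -> Optional[str]:
    """Stage the instance was in at `when`; None if no transition had happened by then."""
    cutoff = parse_ts(when)
    stage = None
    for edge in sorted(edges, key=lambda e: parse_ts(e["timestamp"])):
        if parse_ts(edge["timestamp"]) > cutoff:
            break  # later transitions had not happened yet
        stage = edge["to_stage"]
    return stage


edges = [
    {"from_stage": "application_submitted", "to_stage": "kyc_screening",
     "timestamp": "2026-01-14T09:01:22Z"},
    {"from_stage": "kyc_screening", "to_stage": "account_provisioning",
     "timestamp": "2026-01-14T11:44:07Z"},
    {"from_stage": "account_provisioning", "to_stage": "products_assigned",
     "timestamp": "2026-01-14T14:03:55Z"},
]
print(stage_at(edges, "2026-01-14T12:00:00Z"))  # account_provisioning
```

Sorting by parsed timestamp rather than by read order keeps the answer correct even if edges come back unordered.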
Where is this customer right now?
```cypher
MATCH (i:JourneyInstance {entity_id: "C-8812"})
RETURN i.current_stage, i.sla_status, i.started_at, i.updated_at
```

Full interaction timeline
```cypher
MATCH (i:JourneyInstance {entity_id: "C-8812"})-[t:ADVANCED_TO]->()
RETURN t.from_stage, t.to_stage, t.actor, t.timestamp, t.notes
ORDER BY t.timestamp
```

Every product this customer ever took
```cypher
MATCH (i:JourneyInstance {entity_id: "C-8812"})-[t:ADVANCED_TO]->()
WHERE t.to_stage = "products_assigned"
RETURN t.notes, t.actor, t.timestamp
```

Every system or person that ever touched this customer's record
```cypher
MATCH (i:JourneyInstance {entity_id: "C-8812"})-[t:ADVANCED_TO|AI_ADVISED|HITL_RESOLVED]->()
RETURN t.actor AS actor, collect(DISTINCT t.to_stage) AS stages_touched
ORDER BY actor
```

All customers stalled in KYC screening for more than 24 hours
```cypher
// BREACHED means past breach_after_seconds, which is 86400 s (24 hours) for kyc_screening
MATCH (i:JourneyInstance {journey_type: "customer_onboarding", current_stage: "kyc_screening"})
WHERE i.sla_status = "BREACHED"
RETURN i.entity_id, i.entered_at, i.sla_status
ORDER BY i.entered_at
```

Why did the AI flag customers for manual review?
```cypher
MATCH (i:JourneyInstance)-[a:AI_ADVISED]->()
WHERE a.action_type = "flag" AND i.journey_type = "customer_onboarding"
RETURN i.entity_id, a.reasoning, a.confidence, a.timestamp
ORDER BY a.timestamp DESC
```

Average time per stage across all customers this month
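```cypher
// Time in a stage = from the transition into it (prev) to the transition out of it (t).
// Timestamps are ISO 8601 strings, so they are converted with datetime() first.
MATCH (i:JourneyInstance {journey_type: "customer_onboarding"})-[t:ADVANCED_TO]->(),
      (i)-[prev:ADVANCED_TO]->()
WHERE prev.to_stage = t.from_stage
  AND prev.timestamp <= t.timestamp
  AND t.timestamp >= "2026-03-01T00:00:00Z"   // first day of the reporting month
RETURN t.from_stage AS stage,
       avg(duration.inSeconds(datetime(prev.timestamp), datetime(t.timestamp)).seconds) AS avg_seconds,
       count(*) AS volume
ORDER BY avg_seconds DESC
```

Human-in-the-loop: compliance review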
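When a customer is routed to the manual_review stage, for example after the AI flags them, Purple8 automatically creates a HITLTask node, because that stage is declared with requires_human=True. The compliance team works through a queue of these tasks via the REST API, so no separate task system is needed: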
```bash
# Compliance officer claims the task
curl -X POST https://your-purple8-server/v1/hitl/tasks/task_88821/claim \
  -H "Authorization: Bearer $JWT"

# After review, approve and advance
curl -X POST https://your-purple8-server/v1/hitl/tasks/task_88821/resolve \
  -H "Authorization: Bearer $JWT" \
  -H "Content-Type: application/json" \
  -d '{"decision": "approve", "notes": "Manual check complete. Customer verified."}'
```

The decision is written to the graph as a HITL_RESOLVED edge carrying the officer's identity, timestamp, and notes, and the customer's journey automatically advances to account_provisioning.
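The claim-then-resolve sequence behaves like a small state machine. The sketch below models it in memory so the ordering rules are explicit. The HITLTaskModel class, the OPEN, CLAIMED, and RESOLVED states, and the rule that only the claimant may resolve are illustrative assumptions, not documented Purple8 behaviour. The record returned by resolve carries the fields this guide says a HITL_RESOLVED edge holds: the officer's identity, a timestamp, and notes.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Optional


@dataclass
class HITLTaskModel:
    """Hypothetical in-memory model of a HITLTask lifecycle: OPEN -> CLAIMED -> RESOLVED."""
    task_id: str
    state: str = "OPEN"
    claimed_by: Optional[str] = None

    def claim(self, officer: str) -> None:
        if self.state != "OPEN":
            raise RuntimeError(f"task {self.task_id} is {self.state}; only OPEN tasks can be claimed")
        self.state, self.claimed_by = "CLAIMED", officer

    def resolve(self, officer: str, decision: str, notes: str) -> Dict[str, str]:
        # Assumed rule: only the officer holding the claim may resolve the task.
        if self.state != "CLAIMED" or officer != self.claimed_by:
            raise RuntimeError(f"task {self.task_id} must be claimed by {officer} before resolving")
        self.state = "RESOLVED"
        # Properties mirror what this guide says a HITL_RESOLVED edge records.
        return {
            "actor": officer,
            "decision": decision,
            "notes": notes,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }


task = HITLTaskModel("task_88821")
task.claim("officer.jane")  # hypothetical officer identity
edge = task.resolve("officer.jane", "approve", "Manual check complete. Customer verified.")
print(task.state, edge["decision"])  # RESOLVED approve
```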
Real-time dashboard
Subscribe to the WebSocket stream to power a live onboarding dashboard without polling:
```python
import asyncio
import json
import os
from websockets.asyncio.client import connect  # asyncio client, websockets >= 13

KEY = os.environ["PURPLE8_API_KEY"]  # hypothetical variable name; load the key however you prefer
URI = "wss://your-purple8-server/ws/changes?event_type=journey.advanced&tenant_id=acme-bank"

async def onboarding_feed() -> None:
    async with connect(URI, additional_headers={"X-API-Key": KEY}) as ws:
        async for message in ws:
            payload = json.loads(message)["payload"]
            print(f"[{payload['entity_id']}] "
                  f"{payload['from_stage']} → {payload['to_stage']} "
                  f"by {payload['actor']}")

asyncio.run(onboarding_feed())
```

What you do NOT need
| What you might expect to need | Why you don't need it with Purple8 |
|---|---|
| A separate workflow database (Postgres, Redis) | Journey state is the graph |
| An ETL pipeline to reconstruct history | History is always in the graph, always queryable |
| A separate audit log service | Every transition is an immutable graph edge |
| Custom compliance reporting | Cypher queries against the graph |
| An orchestration layer (Temporal, Airflow) | Journey Engine handles state, SLAs, and retries |
| A middleware bus to coordinate systems | Each system calls one endpoint. That's it. |
What it looks like after 90 days
After 90 days of customer onboarding flowing through Purple8, the graph is your institutional memory:
- Every customer journey, start to finish, traversable in milliseconds
- Every AI decision with reasoning and confidence score, immutable
- Every human override, by whom, when, and why
- Every SLA breach — which stage, which system, how long
- Full product history per customer, linked to the journey that created it
- Cross-department analytics without any ETL
```cypher
// Which department causes the most SLA breaches?
MATCH ()-[b:SLA_BREACHED]->(stage)
WHERE b.journey_type = "customer_onboarding"
RETURN stage.owner_system, count(*) AS breach_count
ORDER BY breach_count DESC;

// Which relationship managers have the fastest onboarding times?
MATCH (i:JourneyInstance {journey_type: "customer_onboarding",
                          current_stage: "onboarding_complete"})
      -[rm:ADVANCED_TO {to_stage: "relationship_manager_assigned"}]->()
RETURN rm.actor AS rm_name,
       avg(duration.inSeconds(i.started_at, i.completed_at).seconds) AS avg_seconds
ORDER BY avg_seconds;
```

See Also
- Journey Engine guide — full API reference, SLA configuration, AI advisor setup
- Graph as Memory — how patterns from past journeys feed future AI decisions
- MCP Integration — let Claude or Cursor query any customer's journey in natural language
- Real-time Augmented AI — combining live CDC events with graph context