
Customer Onboarding with the Journey Engine

The problem with onboarding across systems

A customer walks in — or clicks "Sign Up." What follows is rarely a single system's job.

A bank might touch a CRM, a KYC/AML screening service, a core banking system on a mainframe, a product provisioning platform, a notifications engine, and a relationship manager's task queue. A telco might span a CPQ tool, a network provisioning system, a billing platform, a device activation service, and a field operations team. Each system knows its piece. No system knows the whole story.

When something stalls — the customer calls, the manager escalates, the auditor asks — nobody can answer: where exactly is this customer, who last touched their record, and why did it take three weeks?

Purple8's Journey Engine solves this by making the graph the single source of truth for the entire onboarding lifecycle. Every system — legacy mainframe, modern SaaS, internal microservice — writes one event when it completes its step. The graph records it as an immutable edge. The full story is always queryable.


What gets stored

For every customer, the Journey Engine writes:

```
(JourneyInstance: entity_id="C-8812")

    ├─[:ADVANCED_TO]──────────────────────────────────────────────────────────────────┐
    │   from_stage:   "application_submitted"                                         │
    │   to_stage:     "kyc_screening"                                                 │
    │   actor:        "CRMService"           ← which system advanced it               │
    │   timestamp:    "2026-01-14T09:01:22Z"                                          │
    │   notes:        "Application validated, forwarded to KYC"                       │
    │                                                                                 ▼
    ├─[:ADVANCED_TO]──────────────────────────────────────────────────────────────────┐
    │   from_stage:   "kyc_screening"                                                 │
    │   to_stage:     "account_provisioning"                                          │
    │   actor:        "KYCVendorAPI"                                                  │
    │   timestamp:    "2026-01-14T11:44:07Z"                                          │
    │   notes:        "Clear. Risk score: 0.12"                                       │
    │                                                                                 ▼
    ├─[:AI_ADVISED]───────────────────────────────────────────────────────────────────┐
    │   action_type:  "suggest_advance"                                               │
    │   reasoning:    "KYC clear, no sanctions match, similar profiles approved"      │
    │   confidence:   0.94                                                            │
    │   timestamp:    "2026-01-14T11:44:09Z"                                          │
    │                                                                                 ▼
    └─[:ADVANCED_TO]──────────────────────────────────────────────────────────────────┐
        from_stage:   "account_provisioning"                                         │
        to_stage:     "products_assigned"                                            │
        actor:        "CoreBankingMainframe"  ← yes, the mainframe writes here too   │
        timestamp:    "2026-01-14T14:03:55Z"                                         │
        notes:        "Account 004-882771 opened. Sort code 20-44-18"               ▼
                                                                             (and so on)
```

Every product taken, every department handoff, every AI decision, every human override — immutable edges in the graph, forever queryable.
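To make the storage model concrete, the sketch below keeps one customer's transitions in an append-only list and derives the current stage and the timeline from it. It is an illustration under simple assumptions, not Purple8 code: the `Transition` and `JourneyLog` names are hypothetical, and Purple8 itself stores these records as graph edges rather than Python objects.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)  # frozen: fields cannot be reassigned after creation
class Transition:
    from_stage: str
    to_stage: str
    actor: str
    timestamp: datetime
    notes: str = ""


class JourneyLog:
    """Hypothetical append-only transition log for one journey instance."""

    def __init__(self, entity_id: str, initial_stage: str) -> None:
        self.entity_id = entity_id
        self.initial_stage = initial_stage
        self._edges: list = []

    def advance(self, to_stage: str, actor: str, timestamp: datetime, notes: str = "") -> Transition:
        # New records are only ever appended; earlier ones are never modified
        edge = Transition(self.current_stage, to_stage, actor, timestamp, notes)
        self._edges.append(edge)
        return edge

    @property
    def current_stage(self) -> str:
        # The current stage is simply the target of the latest transition
        return self._edges[-1].to_stage if self._edges else self.initial_stage

    def timeline(self) -> tuple:
        # Immutable, time-ordered copy of the history
        return tuple(sorted(self._edges, key=lambda e: e.timestamp))


def ts(text: str) -> datetime:
    # Parse ISO 8601 timestamps such as "2026-01-14T09:01:22Z"
    return datetime.fromisoformat(text.replace("Z", "+00:00"))


log = JourneyLog("C-8812", "application_submitted")
log.advance("kyc_screening", "CRMService", ts("2026-01-14T09:01:22Z"))
log.advance("account_provisioning", "KYCVendorAPI", ts("2026-01-14T11:44:07Z"), "Clear. Risk score: 0.12")
log.advance("products_assigned", "CoreBankingMainframe", ts("2026-01-14T14:03:55Z"))
print(log.current_stage)  # products_assigned
```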


Defining the journey

```python
from purple8_graph import GraphEngine
from purple8_graph.journey import JourneyEngine, StageSpec, SLAPolicy, AIStepConfig, AIStepTrigger, AIActionType
from purple8_graph.genai import OpenAIProvider

engine = GraphEngine("./data")
je = JourneyEngine(engine)

je.define_journey(
    journey_type="customer_onboarding",
    stages=[
        StageSpec("application_submitted",
                  owner_system="CRMService"),

        StageSpec("kyc_screening",
                  owner_system="KYCVendorAPI",
                  sla=SLAPolicy(warn_after_seconds=3600,    # 1 hour
                                breach_after_seconds=86400), # 24 hours
                  ai_step=AIStepConfig(
                      trigger=AIStepTrigger.ON_ENTER,
                      auto_execute=True,
                      allowed_actions=[AIActionType.SUGGEST_ADVANCE, AIActionType.FLAG],
                      extra_context={"policy": "Flag for manual review if risk score > 0.7"},
                  )),

        StageSpec("manual_review",            # human-in-the-loop stage
                  owner_system="ComplianceTeam",
                  requires_human=True,         # creates a HITLTask node
                  sla=SLAPolicy(warn_after_seconds=7200,
                                breach_after_seconds=172800)),

        StageSpec("account_provisioning",
                  owner_system="CoreBankingMainframe",
                  sla=SLAPolicy(warn_after_seconds=1800,
                                breach_after_seconds=14400)),

        StageSpec("products_assigned",
                  owner_system="ProductCatalogue"),

        StageSpec("welcome_sent",
                  owner_system="NotificationsEngine"),

        StageSpec("relationship_manager_assigned",
                  owner_system="RMAllocationService"),

        StageSpec("onboarding_complete"),
        StageSpec("rejected"),
        StageSpec("abandoned"),
    ],
    description="End-to-end customer onboarding across CRM, KYC, Core Banking, and product systems",
)

# Register the AI advisor
# Assumption: register_ai_advisor accepts the LLM provider object directly
provider = OpenAIProvider(api_key="...")
je.register_ai_advisor(provider)
```

Wiring your systems

Each system, new or legacy, reports exactly once when it completes its step: either it calls Purple8, or Purple8 polls it. That is the entire integration contract.

Modern systems: REST webhook

Any system that can make an outbound HTTP POST calls the /v1/journeys/instances/{id}/advance endpoint:

```bash
# KYC vendor fires this when screening completes
curl -X POST https://your-purple8-server/v1/journeys/instances/inst_C8812/advance \
  -H "X-API-Key: $KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "to_stage": "account_provisioning",
    "actor":    "KYCVendorAPI",
    "notes":    "Clear. Risk score: 0.12. Reference: KYC-99241"
  }'
```

That is all. The graph records the transition as an ADVANCED_TO edge, any AI step configured for the new stage runs, SLAs are checked, and a CDC event notifies downstream subscribers. That one call triggers all of it.
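The same call can be made from any language with an HTTP client. The minimal Python sketch below uses only the standard library to build the request from the curl example (same endpoint, headers, and JSON body) and leaves sending it to the caller. The `build_advance_request` helper is hypothetical, and the base URL, instance ID, and API key are placeholders.

```python
import json
import urllib.request


def build_advance_request(base_url: str, instance_id: str, api_key: str,
                          to_stage: str, actor: str, notes: str = "") -> urllib.request.Request:
    """Hypothetical helper: build POST /v1/journeys/instances/{id}/advance."""
    body = json.dumps({"to_stage": to_stage, "actor": actor, "notes": notes}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/v1/journeys/instances/{instance_id}/advance",
        data=body,
        method="POST",
        headers={"X-API-Key": api_key, "Content-Type": "application/json"},
    )


req = build_advance_request(
    "https://your-purple8-server", "inst_C8812", "my-api-key",  # placeholders
    to_stage="account_provisioning", actor="KYCVendorAPI",
    notes="Clear. Risk score: 0.12. Reference: KYC-99241",
)
print(req.get_method(), req.full_url)
# To send it: urllib.request.urlopen(req, timeout=10)
```

Building the request separately from sending it keeps the payload easy to unit-test without a live server.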

Legacy systems: REST poller (Purple8 polls you)

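For systems that cannot make outbound HTTP calls, such as mainframes, legacy batch systems, and older core banking platforms, Purple8 polls them on a schedule instead. The system, or a thin REST adapter in front of it, only needs to expose its current status: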

```python
from purple8_graph.connectors import RESTPoller, PollerConfig

# Purple8 polls the mainframe's REST adapter every 30 seconds
poller = RESTPoller(
    je,
    PollerConfig(
        name="CoreBankingMainframe",
        url="http://mainframe-adapter.internal/onboarding/status",
        poll_interval_seconds=30,
        headers={"Authorization": "Basic ..."},
        # Map the mainframe's response fields to a journey advance
        field_map={
            "instance_field": "customer_ref",   # maps to journey instance_id
            "stage_field":    "status_code",    # maps to to_stage
            "actor_field":    "system_id",
            "notes_field":    "status_message",
        },
    )
)
poller.start()
```

The poller runs as a background thread. The mainframe never needs to know Purple8 exists.
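Conceptually, each poll turns records from the adapter into advance calls using field_map. The sketch below shows one plausible way that translation could work, including skipping records whose status has not changed since the previous poll so that the same transition is not written twice. This is an assumption about the poller's behavior, not its actual implementation; the `translate` helper and the sample record shape are hypothetical.

```python
FIELD_MAP = {
    "instance_field": "customer_ref",   # -> journey instance_id
    "stage_field":    "status_code",    # -> to_stage
    "actor_field":    "system_id",
    "notes_field":    "status_message",
}


def translate(records: list, field_map: dict, last_seen: dict) -> list:
    """Hypothetical: map adapter records to advance payloads, skipping unchanged stages."""
    advances = []
    for record in records:
        instance_id = record[field_map["instance_field"]]
        to_stage = record[field_map["stage_field"]]
        if last_seen.get(instance_id) == to_stage:
            continue  # already reported on a previous poll
        last_seen[instance_id] = to_stage
        advances.append({
            "instance_id": instance_id,
            "to_stage": to_stage,
            "actor": record.get(field_map["actor_field"], "unknown"),
            "notes": record.get(field_map["notes_field"], ""),
        })
    return advances


seen: dict = {}
batch = [{"customer_ref": "inst_C8812", "status_code": "products_assigned",
          "system_id": "CoreBankingMainframe", "status_message": "Account 004-882771 opened"}]
first = translate(batch, FIELD_MAP, seen)
second = translate(batch, FIELD_MAP, seen)  # same data returned on the next poll
print(len(first), len(second))  # 1 0
```

Deduplicating on the last reported stage matters because a polled system keeps returning the same status until something changes.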

Event-driven systems: CDC subscription

Event-driven systems, such as services that feed Kafka or SQS or fire webhooks, can subscribe to Purple8's event bus and react to journey transitions:

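```python
import asyncio

from purple8_graph.cdc import EventBus

bus = EventBus()


async def notify_customer(entity_id: str) -> None:
    # Hypothetical placeholder: call your notifications engine here
    ...


async def main() -> None:
    # async with / async for must run inside a coroutine
    async with bus.subscribe(tenant_id="acme-bank") as queue:
        async for event in queue:
            if event.event_type == "journey.advanced":
                data = event.payload
                if data["to_stage"] == "welcome_sent":
                    # Trigger the notifications engine
                    await notify_customer(data["entity_id"])


asyncio.run(main())
```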
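To exercise subscriber logic like this without a running server, a small in-memory stand-in can mimic the same shape: an async context manager that yields an async iterator of events. The `InMemoryBus` and `Event` classes below are hypothetical test doubles, not part of purple8_graph.cdc, and they assume the payload fields used above (to_stage, entity_id).

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass, field


@dataclass
class Event:
    event_type: str
    tenant_id: str
    payload: dict = field(default_factory=dict)


class InMemoryBus:
    """Hypothetical test double: fans published events out to per-tenant subscribers."""

    def __init__(self) -> None:
        self._queues: list = []

    def publish(self, event: Event) -> None:
        for tenant_id, queue in self._queues:
            if tenant_id == event.tenant_id:
                queue.put_nowait(event)

    def close(self) -> None:
        # Signal every subscriber that the stream has ended
        for _, queue in self._queues:
            queue.put_nowait(None)

    @asynccontextmanager
    async def subscribe(self, tenant_id: str):
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.append((tenant_id, queue))

        async def events():
            while True:
                item = await queue.get()
                if item is None:  # sentinel from close()
                    return
                yield item

        try:
            yield events()
        finally:
            self._queues.remove((tenant_id, queue))


async def main() -> list:
    bus = InMemoryBus()
    notified = []
    async with bus.subscribe(tenant_id="acme-bank") as queue:
        bus.publish(Event("journey.advanced", "acme-bank", {"to_stage": "welcome_sent", "entity_id": "C-8812"}))
        bus.publish(Event("journey.advanced", "acme-bank", {"to_stage": "kyc_screening", "entity_id": "C-9001"}))
        bus.publish(Event("journey.advanced", "other-bank", {"to_stage": "welcome_sent", "entity_id": "X-1"}))
        bus.close()
        async for event in queue:
            if event.event_type == "journey.advanced" and event.payload["to_stage"] == "welcome_sent":
                notified.append(event.payload["entity_id"])
    return notified


notified = asyncio.run(main())
print(notified)  # ['C-8812']
```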

Querying the full customer story

Because every interaction is a graph edge, any question about any customer at any point in time is a Cypher query — no joins across systems, no API calls to reconstruct history.

Where is this customer right now?

```cypher
MATCH (i:JourneyInstance {entity_id: "C-8812"})
RETURN i.current_stage, i.sla_status, i.started_at, i.updated_at
```

Full interaction timeline

```cypher
MATCH (i:JourneyInstance {entity_id: "C-8812"})-[t:ADVANCED_TO]->()
RETURN t.from_stage, t.to_stage, t.actor, t.timestamp, t.notes
ORDER BY t.timestamp
```

Every product this customer ever took

```cypher
MATCH (i:JourneyInstance {entity_id: "C-8812"})-[t:ADVANCED_TO]->()
WHERE t.to_stage = "products_assigned"
RETURN t.notes, t.actor, t.timestamp
```

Every system or person who ever touched this customer's record

```cypher
MATCH (i:JourneyInstance {entity_id: "C-8812"})-[t:ADVANCED_TO|AI_ADVISED|HITL_RESOLVED]->()
RETURN t.actor AS actor, collect(t.to_stage) AS stages_touched
ORDER BY actor
```

All customers in KYC screening past the 1-hour warning or the 24-hour breach threshold

```cypher
MATCH (i:JourneyInstance {journey_type: "customer_onboarding", current_stage: "kyc_screening"})
WHERE i.sla_status IN ["BREACHED", "AT_RISK"]
RETURN i.entity_id, i.entered_at, i.sla_status
ORDER BY i.entered_at
```
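The two SLA states in this query follow from the SLAPolicy thresholds in the journey definition (kyc_screening warns after 3600 seconds and breaches after 86400). As a sketch, assuming the status is derived from time spent in the current stage, the classification could look like the function below. The `ON_TRACK` label for the pre-warning state is a hypothetical name, and treating each threshold as inclusive is also an assumption.

```python
from datetime import datetime, timedelta, timezone


def sla_status(entered_at: datetime, now: datetime,
               warn_after_seconds: int, breach_after_seconds: int) -> str:
    """Sketch: classify time-in-stage against an SLA's warn and breach thresholds."""
    elapsed = (now - entered_at).total_seconds()
    if elapsed >= breach_after_seconds:
        return "BREACHED"
    if elapsed >= warn_after_seconds:
        return "AT_RISK"
    return "ON_TRACK"  # hypothetical name for "within SLA"


# kyc_screening: warn after 1 hour, breach after 24 hours
entered = datetime(2026, 1, 14, 9, 1, 22, tzinfo=timezone.utc)
for hours in (0.5, 2, 30):
    print(hours, sla_status(entered, entered + timedelta(hours=hours), 3600, 86400))
# 0.5 ON_TRACK
# 2 AT_RISK
# 30 BREACHED
```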

AI flagged for manual review — why?

```cypher
MATCH (i:JourneyInstance)-[a:AI_ADVISED]->()
WHERE a.action_type = "flag" AND i.journey_type = "customer_onboarding"
RETURN i.entity_id, a.reasoning, a.confidence, a.timestamp
ORDER BY a.timestamp DESC
```

Average time per stage across all customers this month

```cypher
MATCH (i:JourneyInstance {journey_type: "customer_onboarding"})
      -[t:ADVANCED_TO]->()
WHERE t.timestamp >= "2026-03-01T00:00:00Z"
// Timestamps are ISO 8601 strings: convert to datetime before computing durations
RETURN t.from_stage,
       avg(duration.inSeconds(datetime(t.entered_at), datetime(t.timestamp)).seconds) AS avg_seconds,
       count(*) AS volume
ORDER BY avg_seconds DESC
```
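Time spent in a stage is the gap between the transition that entered it and the transition that left it. If a stage-entry time is not stored on each edge, the same per-stage averages can be derived from consecutive ADVANCED_TO timestamps of each journey. The minimal Python sketch below works over exported edge rows; the row format is an assumption modeled on the edge properties shown earlier, and the `avg_seconds_per_stage` helper is hypothetical.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean


def parse(ts: str) -> datetime:
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def avg_seconds_per_stage(edges: list) -> dict:
    """Average seconds spent in each from_stage, using consecutive transitions per journey."""
    by_journey = defaultdict(list)
    for e in edges:
        by_journey[e["entity_id"]].append(e)
    durations = defaultdict(list)
    for journey_edges in by_journey.values():
        journey_edges.sort(key=lambda e: parse(e["timestamp"]))
        # Each edge leaves the stage that the previous edge entered
        for prev, cur in zip(journey_edges, journey_edges[1:]):
            elapsed = (parse(cur["timestamp"]) - parse(prev["timestamp"])).total_seconds()
            durations[cur["from_stage"]].append(elapsed)
    return {stage: mean(values) for stage, values in durations.items()}


edges = [
    {"entity_id": "C-8812", "from_stage": "application_submitted", "to_stage": "kyc_screening", "timestamp": "2026-01-14T09:01:22Z"},
    {"entity_id": "C-8812", "from_stage": "kyc_screening", "to_stage": "account_provisioning", "timestamp": "2026-01-14T11:44:07Z"},
    {"entity_id": "C-8812", "from_stage": "account_provisioning", "to_stage": "products_assigned", "timestamp": "2026-01-14T14:03:55Z"},
]
print(avg_seconds_per_stage(edges))  # {'kyc_screening': 9765.0, 'account_provisioning': 8388.0}
```

The first stage of each journey has no entering edge, so this approach cannot time it; that is one reason to record an entry time on the instance or edge.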

Human-in-the-loop: compliance review

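When a journey enters the manual_review stage, for example after the AI flags a customer, Purple8 automatically creates a HITLTask node, because that stage is defined with requires_human=True. The compliance team works through the queue of these tasks via the REST API, with no separate task system needed: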

```bash
# Compliance officer claims the task
curl -X POST http://purple8-server/v1/hitl/tasks/task_88821/claim \
  -H "Authorization: Bearer $JWT"

# After review, approve and advance
curl -X POST http://purple8-server/v1/hitl/tasks/task_88821/resolve \
  -H "Authorization: Bearer $JWT" \
  -H "Content-Type: application/json" \
  -d '{"decision": "approve", "notes": "Manual check complete. Customer verified."}'
```

The decision is written to the graph as a HITL_RESOLVED edge with the officer's identity, timestamp, and notes. The customer's journey automatically advances to account_provisioning.
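The claim and resolve calls suggest a small state machine for each task. The sketch below models one plausible version of that lifecycle in plain Python to make the rules explicit. Everything beyond `claim`, `resolve`, the `approve` decision, and the advance to account_provisioning is an assumption: the state names, the rule that only the claiming officer may resolve, and the `reject` decision routing to the `rejected` stage are all hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class HITLTask:
    """Hypothetical model of a HITL task's lifecycle (not the Purple8 API)."""
    task_id: str
    state: str = "PENDING"  # PENDING -> CLAIMED -> RESOLVED
    claimed_by: Optional[str] = None
    decision: Optional[str] = None

    def claim(self, officer: str) -> None:
        if self.state != "PENDING":
            raise ValueError(f"{self.task_id} is already {self.state.lower()}")
        self.state, self.claimed_by = "CLAIMED", officer

    def resolve(self, officer: str, decision: str) -> str:
        if self.state != "CLAIMED" or officer != self.claimed_by:
            raise PermissionError("only the officer who claimed the task can resolve it")
        if decision not in ("approve", "reject"):
            raise ValueError(f"unknown decision: {decision}")
        self.state, self.decision = "RESOLVED", decision
        # Assumption: approve continues to provisioning, reject ends the journey
        return "account_provisioning" if decision == "approve" else "rejected"


task = HITLTask("task_88821")
task.claim("officer.jones")  # hypothetical officer identity
print(task.resolve("officer.jones", "approve"))  # account_provisioning
```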


Real-time dashboard

Subscribe to the WebSocket stream to power a live onboarding dashboard — no polling:

```python
import asyncio
import json
import os

from websockets.asyncio.client import connect  # requires websockets >= 13

KEY = os.environ["KEY"]  # same API key as $KEY in the curl examples


async def onboarding_feed():
    uri = "wss://purple8-server/ws/changes?event_type=journey.advanced&tenant_id=acme-bank"
    async with connect(uri, additional_headers={"X-API-Key": KEY}) as ws:
        async for message in ws:
            event = json.loads(message)
            payload = event["payload"]
            print(f"[{payload['entity_id']}] "
                  f"{payload['from_stage']} → {payload['to_stage']} "
                  f"by {payload['actor']}")


asyncio.run(onboarding_feed())
```
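A dashboard usually wants counts per stage rather than a raw feed. Because each journey.advanced event carries from_stage and to_stage, a running tally only needs to move one instance from one bucket to the other. A minimal sketch, assuming the payload fields printed by the feed; the `StageTally` class is hypothetical and would be fed from the WebSocket loop instead of printing.

```python
from collections import Counter


class StageTally:
    """Hypothetical live count of journey instances per stage."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()

    def apply(self, payload: dict) -> None:
        # Move one instance out of its previous stage and into the new one.
        # Guard: the tally may have started after this instance entered from_stage.
        from_stage = payload.get("from_stage")
        if from_stage and self.counts[from_stage] > 0:
            self.counts[from_stage] -= 1
        self.counts[payload["to_stage"]] += 1


tally = StageTally()
for p in [
    {"entity_id": "C-8812", "from_stage": "application_submitted", "to_stage": "kyc_screening"},
    {"entity_id": "C-9001", "from_stage": "application_submitted", "to_stage": "kyc_screening"},
    {"entity_id": "C-8812", "from_stage": "kyc_screening", "to_stage": "account_provisioning"},
]:
    tally.apply(p)
print(dict(tally.counts))  # {'kyc_screening': 1, 'account_provisioning': 1}
```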

What you do NOT need

| What you might expect to need | Why you don't need it with Purple8 |
|---|---|
| A separate workflow database (Postgres, Redis) | Journey state is the graph |
| An ETL pipeline to reconstruct history | History is always in the graph, always queryable |
| A separate audit log service | Every transition is an immutable graph edge |
| Custom compliance reporting | Cypher queries against the graph |
| An orchestration layer (Temporal, Airflow) | Journey Engine handles state, SLAs, and retries |
| A middleware bus to coordinate systems | Each system calls one endpoint. That's it. |

What it looks like after 90 days

After 90 days of customer onboarding flowing through Purple8, the graph is your institutional memory:

  • Every customer journey, start to finish, traversable in milliseconds
  • Every AI decision with reasoning and confidence score, immutable
  • Every human override, by whom, when, and why
  • Every SLA breach — which stage, which system, how long
  • Full product history per customer, linked to the journey that created it
  • Cross-department analytics without any ETL

```cypher
// Which department causes the most SLA breaches?
MATCH ()-[b:SLA_BREACHED]->(stage)
WHERE b.journey_type = "customer_onboarding"
RETURN stage.owner_system, count(*) AS breach_count
ORDER BY breach_count DESC;

// Which relationship managers have the fastest onboarding times?
// Assumes the RM's identity is recorded as the actor on this edge
MATCH (i:JourneyInstance {journey_type: "customer_onboarding",
                           current_stage: "onboarding_complete"})
      -[rm:ADVANCED_TO {to_stage: "relationship_manager_assigned"}]->()
RETURN rm.actor AS rm_name,
       avg(duration.inSeconds(datetime(i.started_at), datetime(i.completed_at)).seconds) AS avg_seconds
ORDER BY avg_seconds;
```


Purple8 Graph is proprietary software. All rights reserved.