Journey Engine
The Journey engine is Purple8's built-in AI workflow orchestration layer. It is not a database feature — it is a complete framework for stateful, auditable, AI-driven workflows backed by the graph.
The key insight
Journey is not a plugin for LangChain or CrewAI. It is the alternative. Every stage transition is a graph edge. Every AI decision is an immutable audit entry. Every workflow has SLA enforcement built in.
"Journey IS your agent framework — with graph memory, hybrid search, audit trails, human-in-the-loop oversight, and SLA enforcement built in."
Core concepts
| Concept | Description |
|---|---|
| JourneyDefinition | A template for a repeatable process — a list of stages, each with an owner, optional SLA, and optional AI step config |
| JourneyInstance | A running instance of a definition tied to a specific entity (a customer, a document, a case) |
| StageSpec | One stage in a journey — with SLAPolicy and optional AIStepConfig |
| SLAPolicy | Warn after N seconds, breach after M seconds — enforced by a background thread |
| AIStepConfig | When to invoke the LLM, which actions it can take, whether to auto-execute |
| JourneyAIAdvisor | The LLM decision-maker — fires on every configured trigger with full audit context |
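To make the split between a definition (the template) and an instance (one run of it) concrete, here is a minimal sketch in plain Python. It does not import purple8_graph. The classes Stage, Definition, and Instance, and all of their fields and methods, are hypothetical stand-ins that mirror the table above, not the library's actual API.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class Stage:
    """Hypothetical stand-in for StageSpec: a name plus optional owner and SLA."""
    name: str
    owner_system: Optional[str] = None
    warn_after_seconds: Optional[int] = None
    breach_after_seconds: Optional[int] = None


@dataclass(frozen=True)
class Definition:
    """Hypothetical stand-in for JourneyDefinition: a reusable template."""
    journey_type: str
    stages: tuple

    def stage_names(self) -> list:
        return [s.name for s in self.stages]


@dataclass
class Instance:
    """Hypothetical stand-in for JourneyInstance: one run tied to one entity."""
    definition: Definition
    entity_id: str
    current_stage: str
    history: list = field(default_factory=list)  # (from_stage, to_stage, actor)

    def advance(self, to_stage: str, actor: str) -> None:
        # Only stages named in the definition are valid targets.
        if to_stage not in self.definition.stage_names():
            raise ValueError(f"unknown stage: {to_stage}")
        # Record the transition before moving, so history is append-only.
        self.history.append((self.current_stage, to_stage, actor))
        self.current_stage = to_stage


definition = Definition(
    journey_type="document_review",
    stages=(Stage("submitted"), Stage("classified", owner_system="ClassifierService")),
)
instance = Instance(definition, entity_id="doc_abc123", current_stage="submitted")
instance.advance("classified", actor="ClassifierService")
```

One definition can back many instances. Each instance carries only its own entity, current stage, and transition history.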
Define a journey
from purple8_graph import GraphEngine
from purple8_graph.journey import (
    JourneyEngine, JourneyAIAdvisor,
    StageSpec, SLAPolicy, AIStepConfig, AIStepTrigger, AIActionType,
)
from purple8_graph.genai import OpenAIProvider

engine = GraphEngine("./data")
je = JourneyEngine(engine)

je.define_journey(
    journey_type="document_review",
    stages=[
        StageSpec("submitted"),
        StageSpec(
            "classified",
            owner_system="ClassifierService",
            sla=SLAPolicy(warn_after_seconds=1800, breach_after_seconds=3600),
            ai_step=AIStepConfig(
                trigger=AIStepTrigger.ON_ENTER,  # fires immediately on stage entry
                auto_execute=True,               # LLM decision runs automatically
                allowed_actions=[
                    AIActionType.SUGGEST_ADVANCE,
                    AIActionType.FLAG,
                    AIActionType.ANNOTATE,
                ],
                extra_context={"policy": "Flag documents containing PII for legal review"},
            ),
        ),
        StageSpec(
            "reviewed",
            owner_system="ReviewTeam",
            sla=SLAPolicy(warn_after_seconds=7200, breach_after_seconds=14400),
            ai_step=AIStepConfig(
                trigger=AIStepTrigger.ON_SLA_WARN,  # fires when SLA is at risk
                auto_execute=False,                 # human must confirm
            ),
        ),
        StageSpec("approved"),
        StageSpec("rejected"),
    ],
    description="End-to-end document review with classification and legal flag",
)

Register the AI advisor
provider = OpenAIProvider(api_key="...")
advisor = JourneyAIAdvisor(je, provider)
je.register_ai_advisor(advisor)

Start and advance an instance
# Start a journey for a specific document
instance = je.start(
    journey_type="document_review",
    entity_id="doc_abc123",
    metadata={"filename": "contract_2026_q1.pdf", "submitter": "legal@acme.com"},
)

# Advance to the next stage (called by whatever system completes this step)
je.advance(
    instance_id=instance.instance_id,
    to_stage="classified",
    actor="ClassifierService",
    notes="Classified as: Contract. PII detected: Yes.",
)
# → AIStepConfig.trigger=ON_ENTER fires here automatically
# → JourneyAIAdvisor receives full context, decides to FLAG
# → AI_ADVISED edge written to graph with reasoning

What JourneyAIAdvisor receives
On every advise() call, the advisor receives:
- Full audit history: every prior stage transition, every prior AI decision, every annotation
- Current stage: where the instance is now, who owns it
- SLA health: on-time, at-risk, or in breach
- Business metadata: anything passed at start() or advance()
- Workflow definition: the complete stage graph with names, SLAs, owners
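As a rough illustration of how the five inputs listed above could be bundled before being handed to an LLM, the sketch below assembles them into one JSON-serializable dict. The function build_advisor_context, the key names, and the "ON_TIME" status string are assumptions made for this example. The real advisor's internal format is not documented here.

```python
import json


def build_advisor_context(audit_history, current_stage, owner, sla_status,
                          metadata, stage_names):
    """Bundle the five advisor inputs into one JSON-serializable dict."""
    return {
        "audit_history": list(audit_history),
        "current_stage": {"name": current_stage, "owner": owner},
        "sla_status": sla_status,
        "metadata": dict(metadata),
        "workflow": {"stages": list(stage_names)},
    }


context = build_advisor_context(
    audit_history=[
        {"from_stage": "submitted", "to_stage": "classified",
         "actor": "ClassifierService",
         "notes": "Classified as: Contract. PII detected: Yes."},
    ],
    current_stage="classified",
    owner="ClassifierService",
    sla_status="ON_TIME",  # assumed label for the on-time state
    metadata={"filename": "contract_2026_q1.pdf"},
    stage_names=["submitted", "classified", "reviewed", "approved", "rejected"],
)

# A prompt builder could serialize the bundle for the LLM.
prompt_payload = json.dumps(context, indent=2, sort_keys=True)
```

Keeping the bundle as plain data makes it easy to log alongside the decision it produced.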
The LLM returns one of:
| Action | Effect |
|---|---|
| suggest_advance | Recommends moving to a named stage (human confirms unless auto_execute=True) |
| force_advance | Advances immediately (only when auto_execute=True) |
| cancel | Cancels the instance |
| flag | Annotates with an alert and holds for human review |
| hold | Pauses the instance (status → STALLED) without a breach |
| annotate | Writes a note to the audit trail; no state change |
| no_action | LLM determined no action is warranted |
Every action, including no_action, is written to the audit trail as an AI_ADVISED edge with the LLM's reasoning.
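The interaction between auto_execute and the audit trail can be sketched in a few lines of plain Python. This is an illustrative model only: ToyInstance, apply_decision, the "ACTIVE" status name, and the audit entry fields are hypothetical. It covers just four of the actions in the table, and it assumes that hold takes effect regardless of auto_execute.

```python
from dataclasses import dataclass, field


@dataclass
class ToyInstance:
    """Hypothetical in-memory instance; not the library's JourneyInstance."""
    current_stage: str
    status: str = "ACTIVE"  # assumed status name
    audit: list = field(default_factory=list)  # append-only list of dicts


def apply_decision(instance, action, reasoning, target_stage=None,
                   auto_execute=False):
    """Apply one advisor decision and always append an AI_ADVISED entry."""
    if action not in {"suggest_advance", "hold", "annotate", "no_action"}:
        raise ValueError(f"action not modeled in this sketch: {action}")

    state_changed = False
    if action == "suggest_advance" and auto_execute:
        # Without auto_execute the suggestion is only recorded for a human.
        instance.current_stage = target_stage
        state_changed = True
    elif action == "hold":
        instance.status = "STALLED"
        state_changed = True

    # Every decision, including no_action, is appended; nothing is rewritten.
    instance.audit.append({
        "edge": "AI_ADVISED",
        "action": action,
        "target_stage": target_stage,
        "reasoning": reasoning,
        "state_changed": state_changed,
    })
    return state_changed


doc = ToyInstance(current_stage="classified")
apply_decision(doc, "no_action", "Nothing to do yet.")
apply_decision(doc, "suggest_advance", "Looks complete.", target_stage="reviewed")
apply_decision(doc, "suggest_advance", "Looks complete.", target_stage="reviewed",
               auto_execute=True)
```

In this model the second call leaves the instance in "classified" and only the third moves it to "reviewed", yet all three decisions appear in doc.audit.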
Read the audit trail
# Full immutable audit history: graph edges, append-only
for event in je.audit(instance.instance_id):
    print(f"[{event.timestamp}] {event.from_stage} → {event.to_stage}")
    print(f"  actor: {event.actor}  notes: {event.notes}")

# Current status
status = je.status(instance.instance_id)
print(status.current_stage, status.sla_status)  # e.g. "classified", "AT_RISK"

SLA enforcement
The SLAMonitor background thread wakes every poll_interval seconds and checks all active instances. When an SLA fires:
- An SLA_BREACHED edge is written to the graph (append-only; existing nodes are not modified)
- A CDC ChangeEvent is published on the EventBus (if configured)
- If the stage has AIStepConfig(trigger=ON_SLA_WARN), the advisor is invoked
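The check a monitor performs on each poll can be approximated as a pure function of elapsed time in the stage. The sketch below is an assumption about the logic, not the SLAMonitor implementation. The function name sla_status, the "ON_TIME" and "BREACHED" labels, and the inclusive (>=) thresholds are all chosen for illustration. Only "AT_RISK" appears elsewhere in this document.

```python
from datetime import datetime, timedelta, timezone


def sla_status(entered_at, now, warn_after_seconds, breach_after_seconds):
    """Classify a stage by how long the instance has been in it."""
    elapsed = (now - entered_at).total_seconds()
    if elapsed >= breach_after_seconds:
        return "BREACHED"  # assumed label
    if elapsed >= warn_after_seconds:
        return "AT_RISK"
    return "ON_TIME"       # assumed label


entered = datetime(2026, 1, 5, 9, 0, tzinfo=timezone.utc)

# Thresholds match the "reviewed" stage: warn at 2 hours, breach at 4 hours.
results = [
    sla_status(entered, entered + timedelta(hours=h), 7200, 14400)
    for h in (1, 3, 5)
]
# results == ["ON_TIME", "AT_RISK", "BREACHED"]
```

Passing the current time in as an argument, rather than reading the clock inside the function, keeps the classification easy to test.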
# SLA configuration
StageSpec(
    "reviewed",
    sla=SLAPolicy(
        warn_after_seconds=7200,     # 2 hours: advisor fires, human notified
        breach_after_seconds=14400,  # 4 hours: instance marked STALLED
    ),
)

Query journey data in Cypher
Because everything is stored as graph nodes and edges, journey data is fully queryable:
-- Find all instances currently in the "classified" stage with SLA at risk
MATCH (i:JourneyInstance {current_stage: "classified", sla_status: "AT_RISK"})
RETURN i.instance_id, i.entity_id, i.entered_at
ORDER BY i.entered_at

-- Full audit trail for a specific instance
MATCH (i:JourneyInstance {instance_id: $id})-[t:ADVANCED_TO]->(stage)
RETURN t.from_stage, t.to_stage, t.actor, t.timestamp, t.notes
ORDER BY t.timestamp

-- AI decisions across all instances
MATCH ()-[a:AI_ADVISED]->()
WHERE a.action_type = "flag"
RETURN a.instance_id, a.reasoning, a.confidence, a.timestamp
ORDER BY a.timestamp DESC
LIMIT 20

Journey vs LangChain / CrewAI / AutoGen
| Capability | LangChain / CrewAI / AutoGen | Purple8 Journey |
|---|---|---|
| Workflow state storage | External (Redis, Postgres, in-memory) | Graph-native — every transition = graph edge |
| AI decision context | Whatever you wire up | Full audit history + SLA health + metadata |
| Audit trail | DIY | ✅ Immutable AI_ADVISED edges, append-only |
| Human-in-the-loop | Requires custom implementation | ✅ auto_execute=False → human confirms |
| SLA enforcement | DIY | ✅ SLAPolicy(warn_after_seconds, breach_after_seconds) |
| Real-time events | DIY | ✅ EventBus → WebSocket CDC on every transition |
| Graph retrieval at decision time | Separate tool call | ✅ Same engine, no separate tool call |
| Multi-tenant isolation | DIY | ✅ Per-tenant JourneyEngine with JWT RBAC |
See Also
- Customer Onboarding guide — end-to-end walkthrough: KYC, mainframe wiring, HITL compliance review, 360° customer queries
- Memory & Learning guide — using closed journey outcomes as LLM few-shot context
- MCP Integration guide — query any customer's journey in natural language via Claude or Cursor
- Real-time Augmented AI guide — combining CDC events with graph context