Skip to content

Journey Engine

The Journey engine is Purple8's built-in AI workflow orchestration layer. It is not a database feature — it is a complete framework for stateful, auditable, AI-driven workflows backed by the graph.

The key insight

Journey is not a plugin for LangChain or CrewAI. It is the alternative. Every stage transition is a graph edge. Every AI decision is an immutable audit entry. Every workflow has SLA enforcement built in.

"Journey IS your agent framework — with graph memory, hybrid search, audit trails, human-in-the-loop oversight, and SLA enforcement built in."

Core concepts

ConceptDescription
JourneyDefinitionA template for a repeatable process — a list of stages, each with an owner, optional SLA, and optional AI step config
JourneyInstanceA running instance of a definition tied to a specific entity (a customer, a document, a case)
StageSpecOne stage in a journey — with SLAPolicy and optional AIStepConfig
SLAPolicyWarn after N seconds, breach after M seconds — enforced by a background thread
AIStepConfigWhen to invoke the LLM, which actions it can take, whether to auto-execute
JourneyAIAdvisorThe LLM decision-maker — fires on every configured trigger with full audit context

Define a journey

python
from purple8_graph import GraphEngine
from purple8_graph.journey import (
    JourneyEngine, JourneyAIAdvisor,
    StageSpec, SLAPolicy, AIStepConfig, AIStepTrigger, AIActionType,
)
from purple8_graph.genai import OpenAIProvider

engine = GraphEngine("./data")
je = JourneyEngine(engine)

je.define_journey(
    journey_type="document_review",
    stages=[
        StageSpec("submitted"),
        StageSpec(
            "classified",
            owner_system="ClassifierService",
            sla=SLAPolicy(warn_after_seconds=1800, breach_after_seconds=3600),
            ai_step=AIStepConfig(
                trigger=AIStepTrigger.ON_ENTER,   # fires immediately on stage entry
                auto_execute=True,                 # LLM decision runs automatically
                allowed_actions=[
                    AIActionType.SUGGEST_ADVANCE,
                    AIActionType.FLAG,
                    AIActionType.ANNOTATE,
                ],
                extra_context={"policy": "Flag documents containing PII for legal review"},
            ),
        ),
        StageSpec(
            "reviewed",
            owner_system="ReviewTeam",
            sla=SLAPolicy(warn_after_seconds=7200, breach_after_seconds=14400),
            ai_step=AIStepConfig(
                trigger=AIStepTrigger.ON_SLA_WARN,  # fires when SLA is at risk
                auto_execute=False,                  # human must confirm
            ),
        ),
        StageSpec("approved"),
        StageSpec("rejected"),
    ],
    description="End-to-end document review with classification and legal flag",
)

Register the AI advisor

python
provider = OpenAIProvider(api_key="...")
advisor = JourneyAIAdvisor(je, provider)
je.register_ai_advisor(advisor)

Start and advance an instance

python
# Start a journey for a specific document
instance = je.start(
    journey_type="document_review",
    entity_id="doc_abc123",
    metadata={"filename": "contract_2026_q1.pdf", "submitter": "legal@acme.com"},
)

# Advance to the next stage (called by whatever system completes this step)
je.advance(
    instance_id=instance.instance_id,
    to_stage="classified",
    actor="ClassifierService",
    notes="Classified as: Contract. PII detected: Yes.",
)
# → AIStepConfig.trigger=ON_ENTER fires here automatically
# → JourneyAIAdvisor receives full context, decides to FLAG
# → AI_ADVISED edge written to graph with reasoning

What JourneyAIAdvisor receives

On every advise() call, the advisor receives:

  • Full audit history — every prior stage transition, every prior AI decision, every annotation
  • Current stage — where the instance is now, who owns it
  • SLA health — on-time, at-risk, or in breach
  • Business metadata — anything passed at start() or advance()
  • Workflow definition — the complete stage graph with names, SLAs, owners

The LLM returns one of:

ActionEffect
suggest_advanceRecommends moving to a named stage (human confirms unless auto_execute=True)
force_advanceAdvances immediately (only when auto_execute=True)
cancelCancels the instance
flagAnnotates with an alert and holds for human review
holdPauses the instance (status → STALLED) without a breach
annotateWrites a note to the audit trail — no state change
no_actionLLM determined no action is warranted

Every action, including no_action, is written to the audit trail as an AI_ADVISED edge with the LLM's reasoning.

Read the audit trail

python
# Full immutable audit history — graph edges, append-only
for event in je.audit(instance.instance_id):
    print(f"[{event.timestamp}] {event.from_stage}{event.to_stage}")
    print(f"  actor: {event.actor}  notes: {event.notes}")

# Current status
status = je.status(instance.instance_id)
print(status.current_stage, status.sla_status)  # e.g. "classified", "AT_RISK"

SLA enforcement

The SLAMonitor background thread wakes every poll_interval seconds and checks all active instances. When an SLA fires:

  • A SLA_BREACHED edge is written to the graph (append-only — existing nodes are not modified)
  • A CDC ChangeEvent is published on the EventBus (if configured)
  • If the stage has AIStepConfig(trigger=ON_SLA_WARN), the advisor is invoked
python
# SLA configuration
StageSpec(
    "reviewed",
    sla=SLAPolicy(
        warn_after_seconds=7200,    # 2 hours — advisor fires, human notified
        breach_after_seconds=14400, # 4 hours — instance marked STALLED
    ),
)

Query journey data in Cypher

Because everything is stored as graph nodes and edges, journey data is fully queryable:

cypher
-- Find all instances currently in the "classified" stage with SLA at risk
MATCH (i:JourneyInstance {current_stage: "classified", sla_status: "AT_RISK"})
RETURN i.instance_id, i.entity_id, i.entered_at
ORDER BY i.entered_at

-- Full audit trail for a specific instance
MATCH (i:JourneyInstance {instance_id: $id})-[t:ADVANCED_TO]->(stage)
RETURN t.from_stage, t.to_stage, t.actor, t.timestamp, t.notes
ORDER BY t.timestamp

-- AI decisions across all instances
MATCH ()-[a:AI_ADVISED]->()
WHERE a.action_type = "flag"
RETURN a.instance_id, a.reasoning, a.confidence, a.timestamp
ORDER BY a.timestamp DESC
LIMIT 20

Journey vs LangChain / CrewAI / AutoGen

CapabilityLangChain / CrewAI / AutoGenPurple8 Journey
Workflow state storageExternal (Redis, Postgres, in-memory)Graph-native — every transition = graph edge
AI decision contextWhatever you wire upFull audit history + SLA health + metadata
Audit trailDIY✅ Immutable AI_ADVISED edges, append-only
Human-in-the-loopRequires custom implementationauto_execute=False → human confirms
SLA enforcementDIYSLAPolicy(warn_after_seconds, breach_after_seconds)
Real-time eventsDIYEventBus → WebSocket CDC on every transition
Graph retrieval at decision timeSeparate tool call✅ Same engine — zero latency
Multi-tenant isolationDIY✅ Per-tenant JourneyEngine with JWT RBAC

See Also

Purple8 Graph is proprietary software. All rights reserved.