# Sharding & Clustering
Purple8 Graph scales horizontally with a sharding layer built on top of the single-node GraphEngine. You choose a partitioning strategy — hash, label, or range — and the ShardedGraphEngine coordinates across shards transparently.
## Architecture overview
```
┌──────────────────────────────┐
│      ShardedGraphEngine      │
│   ┌──────────────────────┐   │
│   │ FederatedQueryEngine │   │
│   └──────────────────────┘   │
└──────────────┬───────────────┘
       ┌───────┴────────┬────────────────┐
   Shard 0          Shard 1          Shard 2
   GraphEngine()    GraphEngine()    GraphEngine()
   ./data/s0        ./data/s1        ./data/s2
```

Each shard is a full GraphEngine with its own WAL, HNSW index, and KMS config. The FederatedQueryEngine merges results from all shards (scatter-gather) for `MATCH` and `CALL db.vector.search`.
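The scatter-gather step can be pictured as fanning the same query out to every shard in parallel and concatenating the per-shard row lists. A minimal sketch, assuming nothing about the real engine internals (`FakeShard` is a stand-in for a per-shard GraphEngine, not part of the library):

```python
from concurrent.futures import ThreadPoolExecutor

class FakeShard:
    """Stand-in for a per-shard GraphEngine in this sketch."""
    def __init__(self, rows):
        self._rows = rows

    def query(self, cypher, **params):
        return list(self._rows)

def scatter_gather(shards, cypher, **params):
    """Run the same query on every shard in parallel; concatenate the rows."""
    with ThreadPoolExecutor(max_workers=len(shards)) as pool:
        futures = [pool.submit(s.query, cypher, **params) for s in shards]
        rows = []
        for f in futures:
            rows.extend(f.result())  # collected in shard order, not arrival order
    return rows

shards = [FakeShard(["a", "b"]), FakeShard(["c"]), FakeShard(["d", "e"])]
rows = scatter_gather(shards, "MATCH (n) RETURN n")  # ["a", "b", "c", "d", "e"]
```

Collecting futures in shard order keeps the merged result deterministic even though the shards answer in arbitrary order.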
## Partitioning strategies
### Hash partitioning (default)
Entities are distributed by consistent hashing on the node ID, which balances data evenly across shards. Best for general-purpose workloads.
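HashPartitioner's actual implementation isn't shown here; as a rough illustration of the idea behind consistent hashing, a toy ring might look like this (all names below are illustrative, not library API):

```python
import bisect
import hashlib

def _ring_hash(key: str) -> int:
    """Stable 64-bit hash (Python's built-in hash() is salted per process)."""
    return int.from_bytes(hashlib.sha256(key.encode()).digest()[:8], "big")

class ConsistentHashRing:
    """Toy consistent-hash ring: each shard owns many virtual points,
    so adding or removing a shard remaps only a small slice of keys."""

    def __init__(self, num_shards: int, vnodes: int = 64):
        self._points = sorted(
            (_ring_hash(f"shard-{s}#{v}"), s)
            for s in range(num_shards)
            for v in range(vnodes)
        )
        self._hashes = [h for h, _ in self._points]

    def shard_for(self, node_id: str) -> int:
        """Walk clockwise from the key's position to the next virtual point."""
        i = bisect.bisect(self._hashes, _ring_hash(node_id)) % len(self._points)
        return self._points[i][1]

ring = ConsistentHashRing(num_shards=3)
shard = ring.shard_for("node-42")  # deterministic value in {0, 1, 2}
```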
```python
from purple8_graph.distributed import ShardedGraphEngine, HashPartitioner

engine = ShardedGraphEngine(
    shard_configs=[
        {"path": "./data/shard-0"},
        {"path": "./data/shard-1"},
        {"path": "./data/shard-2"},
    ],
    partitioner=HashPartitioner(),
)

# Usage is identical to a single-node GraphEngine
engine.add_node("Person", {"name": "Alice", "role": "engineer"})
results = engine.query("MATCH (p:Person) RETURN p.name LIMIT 10")
```

### Label partitioning
Assigns node labels to specific shards. Best when workloads split clearly along data type lines (e.g., hot financial data on Shard 0, cold archive on Shard 2).
```python
from purple8_graph.distributed import ShardedGraphEngine, LabelPartitioner

engine = ShardedGraphEngine(
    shard_configs=[
        {"path": "./data/shard-0"},
        {"path": "./data/shard-1"},
        {"path": "./data/shard-2"},
    ],
    partitioner=LabelPartitioner({
        "Transaction": 0,  # all Transaction nodes → shard 0
        "Account": 0,
        "Customer": 1,
        "Document": 2,
        "Attachment": 2,
    }),
)
```

Cross-shard edges are resolved at query time by the FederatedQueryEngine.
### Range partitioning
Partitions by a numeric property value range. Best for time-series or ordered data.
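The routing decision behind range partitioning amounts to a sorted-range lookup; a sketch of the idea (illustrative only, not the shipped RangePartitioner):

```python
import bisect

def shard_for_value(ranges: list[tuple[int, int]], value: int) -> int:
    """Return the index of the (lo, hi) range containing value (bounds inclusive)."""
    lows = [lo for lo, _ in ranges]
    i = bisect.bisect_right(lows, value) - 1
    if i < 0 or value > ranges[i][1]:
        raise ValueError(f"no shard covers {value!r}")
    return i

ranges = [(2022, 2022), (2023, 2023), (2024, 2025)]
shard_for_value(ranges, 2024)  # → 2
```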
```python
from purple8_graph.distributed import ShardedGraphEngine, RangePartitioner

engine = ShardedGraphEngine(
    shard_configs=[
        {"path": "./data/2022"},
        {"path": "./data/2023"},
        {"path": "./data/2024"},
    ],
    partitioner=RangePartitioner(
        property="year",
        ranges=[(2022, 2022), (2023, 2023), (2024, 2025)],  # one (min, max) per shard
    ),
)
```

## Federated queries
`MATCH` and `CALL db.vector.search` are automatically federated across all shards. The FederatedQueryEngine issues parallel queries and merges the results:
```python
# This Cypher runs on all 3 shards and merges the results
results = engine.query("""
    CALL db.vector.search('Document', $vec, 10)
    YIELD node, score
    MATCH (node)-[:AUTHORED_BY]->(author:Person)
    RETURN node.title, author.name, score
    ORDER BY score DESC
    LIMIT 5
""", vec=embedding)
```

Merge semantics:
- `RETURN` results are concatenated, then any `ORDER BY`/`LIMIT` is applied globally
- `COUNT(*)` is summed across shards
- `AVG()` is weighted by per-shard row count
- Vector search: top-k results are collected from each shard, merged, and the global top-k re-selected
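The vector-search and `AVG()` merge rules above can be sketched as follows (per-shard hit lists and `(avg, row_count)` pairs are assumed inputs; the data is made up for illustration):

```python
import heapq

def merge_topk(per_shard_hits, k):
    """Merge per-shard top-k hit lists into a global top-k by score."""
    all_hits = [hit for hits in per_shard_hits for hit in hits]
    return heapq.nlargest(k, all_hits, key=lambda hit: hit[1])

def merge_avg(per_shard_stats):
    """Combine per-shard (avg, row_count) pairs into a global average,
    weighting each shard's average by its row count."""
    total = sum(avg * n for avg, n in per_shard_stats)
    rows = sum(n for _, n in per_shard_stats)
    return total / rows

shard_hits = [
    [("doc-3", 0.91), ("doc-7", 0.55)],  # shard 0's local top-k
    [("doc-1", 0.88)],                   # shard 1's local top-k
    [("doc-9", 0.74), ("doc-2", 0.40)],  # shard 2's local top-k
]
merge_topk(shard_hits, 3)  # [('doc-3', 0.91), ('doc-1', 0.88), ('doc-9', 0.74)]
merge_avg([(2.0, 10), (4.0, 30)])  # 3.5, not the naive 3.0
```

The `merge_avg` weighting is why per-shard row counts must travel with the partial aggregates: averaging the averages directly would skew toward small shards.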
## High availability with Raft
The RaftShardedEngine wraps each shard in a Raft consensus group for leader election and automatic failover. Each shard can have 1, 3, or 5 replicas.
```python
from purple8_graph.distributed import RaftShardedEngine, HashPartitioner

engine = RaftShardedEngine(
    shard_groups=[
        {
            # Shard 0 — 3 Raft replicas
            "replicas": [
                {"host": "node-0a.internal", "port": 9010, "path": "/data/s0"},
                {"host": "node-0b.internal", "port": 9010, "path": "/data/s0"},
                {"host": "node-0c.internal", "port": 9010, "path": "/data/s0"},
            ]
        },
        {
            # Shard 1 — 3 Raft replicas
            "replicas": [
                {"host": "node-1a.internal", "port": 9010, "path": "/data/s1"},
                {"host": "node-1b.internal", "port": 9010, "path": "/data/s1"},
                {"host": "node-1c.internal", "port": 9010, "path": "/data/s1"},
            ]
        },
    ],
    partitioner=HashPartitioner(),
    election_timeout_ms=300,
)
```

Leader election happens within `election_timeout_ms` of a leader failure. Writes are replicated synchronously (quorum-based). Reads are served from the leader by default; pass `read_consistency="eventual"` to serve them from any replica.
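Quorum-based commit means a write is durable once a majority of the shard's replicas acknowledge it, so for replica counts of 1, 3, and 5 the write must reach 1, 2, and 3 nodes respectively. As a one-line sketch:

```python
def quorum(replicas: int) -> int:
    """Smallest majority of a Raft group: floor(n / 2) + 1 acknowledgements."""
    return replicas // 2 + 1

[quorum(n) for n in (1, 3, 5)]  # [1, 2, 3]
```

This is also why even replica counts buy nothing: a 4-node group still needs 3 acknowledgements and still tolerates only one failure.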
## Read replicas (async)
For read-heavy workloads, you can add async read replicas without the Raft coordination overhead:
```python
from purple8_graph.distributed import GraphEngine, ReplicaEngine

primary = GraphEngine("./data")
replica = ReplicaEngine(primary, replica_path="./data-replica", replication_lag_ms=50)

# Writes go to primary, reads can go to replica
replica.query("MATCH (n:Document) RETURN n.title LIMIT 100")
```

Replication is WAL-based. The replica replays WAL entries from the primary, with a configurable lag tolerance.
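WAL replay can be pictured as the replica tailing the primary's log from its last applied sequence number. A schematic sketch (the `(seq, entry)` format and the apply step are illustrative, not Purple8's actual wire format):

```python
def replay_wal(wal, applied_upto, apply):
    """Apply every WAL entry after `applied_upto`, in order;
    return the new high-water mark."""
    for seq, entry in wal:
        if seq <= applied_upto:
            continue  # already applied on the replica
        apply(entry)
        applied_upto = seq
    return applied_upto

# Illustrative log: (sequence number, operation) pairs
wal = [(1, "add_node A"), (2, "add_node B"), (3, "add_edge A->B")]
applied = []
high = replay_wal(wal, applied_upto=1, apply=applied.append)
# high == 3; applied == ["add_node B", "add_edge A->B"]
```

Tracking the high-water mark makes replay idempotent: re-reading the same log segment after a reconnect applies nothing twice.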
## Docker Compose: 3-shard cluster
```yaml
services:
  shard-0:
    image: purple8/graph:latest
    environment:
      SHARD_ID: "0"
      SHARD_PEERS: "shard-1:9010,shard-2:9010"
      P8G_CLUSTER_MODE: "true"
      JWT_SECRET: "${JWT_SECRET}"
    volumes:
      - shard0_data:/data
    ports:
      - "8010:8010"

  shard-1:
    image: purple8/graph:latest
    environment:
      SHARD_ID: "1"
      SHARD_PEERS: "shard-0:9010,shard-2:9010"
      P8G_CLUSTER_MODE: "true"
      JWT_SECRET: "${JWT_SECRET}"
    volumes:
      - shard1_data:/data

  shard-2:
    image: purple8/graph:latest
    environment:
      SHARD_ID: "2"
      SHARD_PEERS: "shard-0:9010,shard-1:9010"
      P8G_CLUSTER_MODE: "true"
      JWT_SECRET: "${JWT_SECRET}"
    volumes:
      - shard2_data:/data

volumes:
  shard0_data:
  shard1_data:
  shard2_data:
```

## Consistency model
| Operation | Guarantee |
|---|---|
| Single-node write | Synchronous WAL commit + HNSW update |
| Raft write | Quorum commit (majority of replicas) |
| Read from leader | Read-your-writes |
| Read from replica | Eventual consistency (configurable lag) |
| Federated MATCH | Eventual consistency across shards |
| Federated vector search | Approximate global top-k |