title: Distributed Transaction Extensions tags: - AVM - distributed transactions - consensus - snapshot isolation
{-# OPTIONS --exact-split --without-K --type-in-type --guardedness #-} open import Foundation.BasicTypes using (ℕ; List; Maybe; String)
Distributed Transaction Extensions
This module extends the AVM instruction set with distributed transaction primitives for multi-controller coordination. See DistributedTransactions.md for the full proposal.
Module Parameters
module AVM.DistributedTx -- Core types (inherited) (Val : Set) (ObjectId : Set) (freshObjectId : ℕ → ObjectId) (MachineId : Set) (ControllerId : Set) (ControllerVersion : Set) (TxId : Set) (freshTxId : ℕ → TxId) (Object : Set) where -- Import base modules open import Foundation.BasicTypes open import AVM.Instruction Val ObjectId freshObjectId MachineId ControllerId ControllerVersion TxId freshTxId Object
Transaction Types
Distinguish between local (single-controller) and distributed (multi-controller) transactions.
data TxType : Set where Local : TxType Distributed : List ControllerId → TxType
Lamport Timestamps for Version Ordering
Distributed controllers use Lamport clocks to establish version ordering without requiring synchronized clocks.
record LamportVersion : Set where constructor mkLamport field counter : ℕ controllerId : ControllerId -- Comparison for version ordering _<ₗ_ : LamportVersion → LamportVersion → Bool mkLamport c1 id1 <ₗ mkLamport c2 id2 with c1 <? c2 ... | true = true ... | false with c1 ==ℕ c2 ... | true = id1 == id2 -- tie-break by controller ID ... | false = false _≤ₗ_ : LamportVersion → LamportVersion → Bool v1 ≤ₗ v2 = (v1 <ₗ v2) || (v1 == v2)
Snapshots
Transactions operate on a consistent snapshot captured at beginTx time.
record Snapshot : Set where constructor mkSnapshot field timestamp : LamportVersion -- Map from ObjectId to version observed versions : ObjectId → Maybe LamportVersion
Distributed Transaction State
Extended transaction state tracking prepare/vote/finalize phases.
data TxStatus : Set where Active : TxStatus -- Executing operations Preparing : TxStatus -- Prepare phase in progress Prepared : TxStatus -- Prepared, awaiting vote Committing : TxStatus -- Applying changes Committed : TxStatus -- Successfully committed Aborted : TxStatus -- Rolled back record TxState : Set where constructor mkTxState field txid : TxId txType : TxType snapshot : Snapshot readSet : List (ObjectId × LamportVersion) writeSet : List ObjectId status : TxStatus
Prepare Phase Results
Participants return prepare results to coordinator.
data PrepareResult : Set where Prepared : (readSet : List (ObjectId × LamportVersion)) → (writeSet : List ObjectId) → PrepareResult PrepareConflict : ObjectId → PrepareResult PrepareTimeout : PrepareResult
Vote Phase
Coordinator decides commit/abort after collecting prepare results.
data Vote : Set where VoteCommit : Vote VoteAbort : String → Vote -- Abort reason data VoteResult : Set where Unanimous : Vote → VoteResult Split : (commits : ℕ) → (aborts : ℕ) → VoteResult
Transfer Protocol
Two-phase transfer with explicit acceptance from target controller.
-- Transfer identifier TransferId : Set TransferId = ℕ record TransferRequest : Set where constructor mkTransferRequest field transferId : TransferId objectId : ObjectId sourceController : ControllerId targetController : ControllerId requestedAt : LamportVersion data TransferStatus : Set where Pending : TransferStatus Accepted : TransferStatus Rejected : String → TransferStatus Completed : TransferStatus
Extended Instruction Set
New instructions for distributed transactions and transfers.
Distributed Transaction Instructions
data DistributedTxInstruction : Safety → ISA where -- Explicit transaction type selection beginLocalTx : DistributedTxInstruction Safe TxId beginDistributedTx : List ControllerId → DistributedTxInstruction Safe TxId -- Three-phase protocol prepareTx : TxId → DistributedTxInstruction Safe PrepareResult voteTx : TxId → Vote → DistributedTxInstruction Safe ⊤ finalizeTx : TxId → DistributedTxInstruction Safe Bool -- Query transaction state getTxStatus : TxId → DistributedTxInstruction Safe (Maybe TxStatus) getTxType : TxId → DistributedTxInstruction Safe (Maybe TxType)
Transfer Instructions
data TransferInstruction : Safety → ISA where -- Phase 1: Request transfer (from current owner) requestTransfer : ObjectId → ControllerId → TransferInstruction Safe TransferId -- Phase 2: Accept/reject transfer (from target controller) acceptTransfer : TransferId → TransferInstruction Safe Bool rejectTransfer : TransferId → String → TransferInstruction Safe ⊤ -- Query transfer status getTransferStatus : TransferId → TransferInstruction Safe (Maybe TransferStatus) -- List pending transfers for this controller listPendingTransfers : TransferInstruction Safe (List TransferRequest)
Conflict Detection
Detect read-write and write-write conflicts across participants.
-- Check if object was read at an old version hasReadWriteConflict : List (ObjectId × LamportVersion) → List ObjectId → (ObjectId → LamportVersion) -- Current versions → Bool hasReadWriteConflict [] writes currentVer = false hasReadWriteConflict ((oid , readVer) ∷ rest) writes currentVer = if elem oid writes then not (readVer == currentVer oid) -- Conflict if versions differ else hasReadWriteConflict rest writes currentVer -- Check if multiple participants wrote the same object hasWriteWriteConflict : List (List ObjectId) → Bool hasWriteWriteConflict writeSets = let allWrites = concat writeSets in hasDuplicates allWrites -- Detect any conflicts in prepare results detectConflicts : List PrepareResult → (ObjectId → LamportVersion) → Bool detectConflicts results currentVersions = let allReads = concatMap extractReads results allWrites = concatMap extractWrites results in hasReadWriteConflict allReads allWrites currentVersions || hasWriteWriteConflict (map extractWrites results) where extractReads : PrepareResult → List (ObjectId × LamportVersion) extractReads (Prepared reads writes) = reads extractReads _ = [] extractWrites : PrepareResult → List ObjectId extractWrites (Prepared reads writes) = writes extractWrites _ = []
Constraint Integration
Extend transaction state with constraint tracking. The ConstraintId and LinearConstraint types are imported from the AVM.Instruction module.
record TxStateWithConstraints : Set where constructor mkTxStateC field txState : TxState constraints : List (ConstraintId × LinearConstraint) satisfied : List ConstraintId -- Check all constraints are satisfied allConstraintsSatisfied : TxStateWithConstraints → Bool allConstraintsSatisfied txStateC = let constraints = TxStateWithConstraints.constraints txStateC satisfied = TxStateWithConstraints.satisfied txStateC in all (λ { (cid , _) → elem cid satisfied }) constraints
Extended State with Distributed Tracking
record DistributedState : Set where constructor mkDistState field -- Base state (from Runtime module) baseState : State -- Active distributed transaction activeDTx : Maybe TxStateWithConstraints -- Pending transfers pendingTransfers : List (TransferId × TransferRequest × TransferStatus) -- Prepared transactions (coordinator view) preparedTxs : List (TxId × List PrepareResult) -- Pending votes (participant view) pendingVotes : List (TxId × Vote) -- Snapshot registry snapshots : TxId → Maybe Snapshot -- ID counters nextTxId : ℕ nextTransferId : ℕ
Helper Functions (Implementation Stubs)
These functions are postulated for the semantic functions below. Full implementations require integration with the AVM.Interpreter module.
-- Authorization checking postulate allOpsAuthorized : List TxWrite → State → Bool findUnauthorizedOp : List TxWrite → State → Maybe ObjectId -- Transaction state queries postulate lookupVoteDecision : TxId → DistributedState → Vote lookupTxStatus : TxId → DistributedState → TxStatus lookupTransfer : TransferId → DistributedState → Maybe (TransferRequest × TransferStatus) -- State transitions postulate applyTransactionChanges : State → State incrementVersion : ControllerVersion → ControllerVersion updateTransferStatus : TransferId → TransferStatus → DistributedState → DistributedState applyTransferToState : ObjectId → ControllerId → State → State -- Error constructors (not already defined in Runtime) postulate err-constraint-violated : AVMError err-internal : AVMError err-transfer-not-found : TransferId → AVMError err-unauthorized-transfer-id : TransferId → AVMError -- Auxiliary types used in properties postulate ver : LamportVersion modifiedBy : TxId → ObjectId → LamportVersion → Set txStatus : TxId → TxStatus -- Convert ControllerVersion to natural number for Lamport timestamps postulate versionToℕ : ControllerVersion → ℕ -- Event constructors for logging (new ones not in Runtime) postulate VoteCast : TxId → Vote → EventType TransferRequested : TransferId → ObjectId → ControllerId → EventType TransferAccepted : TransferId → ObjectId → EventType -- Helper to create log entries with current timestamp postulate currentTimestamp : State → ℕ
Semantic Functions
Begin Distributed Transaction
beginDistributedTxSem : List ControllerId → DistributedState → AVMResult TxId beginDistributedTxSem participants dState = let st = DistributedState.baseState dState txid = freshTxId (DistributedState.nextTxId dState) currentVer = mkLamport (versionToℕ (State.currentVersion st)) (State.controllerId st) -- Capture snapshot snapshot = mkSnapshot currentVer (snapshotVersions st) -- Initialize transaction state txState = mkTxState txid (Distributed participants) snapshot [] -- empty read set [] -- empty write set Active txStateC = mkTxStateC txState [] [] -- no constraints yet dState' = record dState { activeDTx = just txStateC ; nextTxId = suc (DistributedState.nextTxId dState) } entry = mkLogEntry (currentTimestamp st) (TransactionStarted txid) (State.controllerId st) st' = DistributedState.baseState dState' in success (mkSuccess txid st' (entry ∷ [])) where -- Capture current version of all objects in store snapshotVersions : State → (ObjectId → Maybe LamportVersion) snapshotVersions st oid with Store.metadata (State.store st) oid ... | just meta = just (mkLamport (versionToℕ (ObjectMeta.modifiedAtVersion meta)) (ObjectMeta.currentController meta)) ... | nothing = nothing
Prepare Transaction
prepareTxSem : TxId → DistributedState → AVMResult PrepareResult prepareTxSem txid dState with DistributedState.activeDTx dState ... | nothing = failure err-no-active-tx ... | just txStateC = let txState = TxStateWithConstraints.txState txStateC st = DistributedState.baseState dState in if TxState.txid txState == txid then validateAndPrepare txStateC st dState else failure (err-tx-conflict txid) where validateAndPrepare : TxStateWithConstraints → State → DistributedState → AVMResult PrepareResult validateAndPrepare txStateC st dState with allOpsAuthorized (State.txLog st) st ... | false = handleUnauthorized where handleUnauthorized : AVMResult PrepareResult handleUnauthorized with findUnauthorizedOp (State.txLog st) st ... | just oid = success (mkSuccess (PrepareConflict oid) (DistributedState.baseState dState) []) ... | nothing = failure err-internal ... | true with allConstraintsSatisfied txStateC ... | false = failure err-constraint-violated ... | true = let readSet = TxState.readSet (TxStateWithConstraints.txState txStateC) writeSet = TxState.writeSet (TxStateWithConstraints.txState txStateC) txState' = record (TxStateWithConstraints.txState txStateC) { status = Prepared } txStateC' = record txStateC { txState = txState' } dState' = record dState { activeDTx = just txStateC' } st' = DistributedState.baseState dState' in success (mkSuccess (Prepared readSet writeSet) st' [])
Vote Transaction
voteTxSem : TxId → Vote → DistributedState → AVMResult ⊤ voteTxSem txid vote dState = let st = DistributedState.baseState dState dState' = record dState { pendingVotes = (txid , vote) ∷ DistributedState.pendingVotes dState } entry = mkLogEntry (currentTimestamp st) (VoteCast txid vote) (State.controllerId st) st' = DistributedState.baseState dState' in success (mkSuccess tt st' (entry ∷ []))
Finalize Transaction
finalizeTxSem : TxId → DistributedState → AVMResult Bool finalizeTxSem txid dState with DistributedState.activeDTx dState ... | nothing = failure err-no-active-tx ... | just txStateC = let txState = TxStateWithConstraints.txState txStateC st = DistributedState.baseState dState in if TxState.txid txState == txid then applyCommitOrAbort txid txStateC st dState else failure (err-tx-conflict txid) where applyCommitOrAbort : TxId → TxStateWithConstraints → State → DistributedState → AVMResult Bool applyCommitOrAbort txid txStateC st dState with lookupVoteDecision txid dState ... | VoteCommit = -- Apply all changes atomically let stApplied = applyTransactionChanges st st' = record stApplied { tx = nothing ; currentVersion = incrementVersion (State.currentVersion st) } dState' = record dState { baseState = st' ; activeDTx = nothing } entry = mkLogEntry (currentTimestamp st) (TransactionCommitted txid) (State.controllerId st) st'' = DistributedState.baseState dState' in success (mkSuccess true st'' (entry ∷ [])) ... | VoteAbort reason = -- Discard all changes let st' = record st { tx = nothing ; txLog = [] ; creates = [] ; destroys = [] ; observed = [] } dState' = record dState { baseState = st' ; activeDTx = nothing } entry = mkLogEntry (currentTimestamp st) (TransactionAborted txid) (State.controllerId st) st'' = DistributedState.baseState dState' in success (mkSuccess false st'' (entry ∷ []))
Request Transfer
requestTransferSem : ObjectId → ControllerId → DistributedState → AVMResult TransferId requestTransferSem oid targetCid dState = let st = DistributedState.baseState dState tid = DistributedState.nextTransferId dState currentVer = mkLamport (versionToℕ (State.currentVersion st)) (State.controllerId st) request = mkTransferRequest tid oid (State.controllerId st) targetCid currentVer dState' = record dState { nextTransferId = suc tid ; pendingTransfers = (tid , request , Pending) ∷ DistributedState.pendingTransfers dState } entry = mkLogEntry (currentTimestamp st) (TransferRequested tid oid targetCid) (State.controllerId st) st' = DistributedState.baseState dState' in success (mkSuccess tid st' (entry ∷ []))
Accept Transfer
acceptTransferSem : TransferId → DistributedState → AVMResult Bool acceptTransferSem tid dState with lookupTransfer tid dState ... | nothing = failure (err-transfer-not-found tid) ... | just (request , status) = let st = DistributedState.baseState dState oid = TransferRequest.objectId request sourceCid = TransferRequest.sourceController request targetCid = TransferRequest.targetController request in -- Verify we are the target controller if targetCid == State.controllerId st then ( -- Update transfer status and apply transfer let dState' = updateTransferStatus tid Accepted dState st' = applyTransferToState oid targetCid st dState'' = record dState' { baseState = st' } entry = mkLogEntry (currentTimestamp st) (TransferAccepted tid oid) (State.controllerId st) st'' = DistributedState.baseState dState'' in success (mkSuccess true st'' (entry ∷ []))) else failure (err-unauthorized-transfer-id tid)
Pattern Synonyms for Ergonomics
Pattern synonyms would provide ergonomic names for distributed transaction and transfer instructions once they are integrated into the main AVM ISA:
-- Distributed transaction patterns (future work - requires ISA integration)
-- pattern dtx-begin-local = DistInstr (DTx beginLocalTx)
-- pattern dtx-begin-dist cids = DistInstr (DTx (beginDistributedTx cids))
-- pattern dtx-prepare txid = DistInstr (DTx (prepareTx txid))
-- pattern dtx-vote txid vote = DistInstr (DTx (voteTx txid vote))
-- pattern dtx-finalize txid = DistInstr (DTx (finalizeTx txid))
-- Transfer patterns (future work - requires ISA integration)
-- pattern transfer-request oid cid = DistInstr (Transfer (requestTransfer oid cid))
-- pattern transfer-accept tid = DistInstr (Transfer (acceptTransfer tid))
-- pattern transfer-reject tid reason = DistInstr (Transfer (rejectTransfer tid reason))
Usage Example: Bilateral Trade
Example showing atomic bilateral trade between two controllers (pseudocode - requires monadic operations and ISA integration):
-- Example: Atomic bilateral trade between two controllers
-- bilateralTrade : ObjectId → ObjectId → ControllerId → ControllerId → AVMProgram Bool
-- bilateralTrade obj_a obj_b controller_a controller_b = do
-- -- Begin distributed transaction with both controllers
-- txid ← beginDistributedTx (controller_a ∷ controller_b ∷ [])
--
-- -- Request transfers (each controller owns one object)
-- tid_a ← requestTransfer obj_a controller_b -- A → B
-- tid_b ← requestTransfer obj_b controller_a -- B → A
--
-- -- Prepare phase: each controller validates locally
-- result ← prepareTx txid
--
-- case result of λ where
-- (Prepared readSet writeSet) → do
-- -- Vote to commit
-- voteTx txid VoteCommit
--
-- -- Finalize: transfers happen atomically
-- success ← finalizeTx txid
-- return success
--
-- (PrepareConflict oid) → do
-- abortTx txid
-- return false
--
-- PrepareTimeout → do
-- abortTx txid
-- return false
Formal Properties (Specification)
Property 1: Atomicity
All participants reach the same decision (commit or abort).
postulate atomicity : ∀ (txid : TxId) (participants : List ControllerId) (finalStates : List DistributedState) → let decisions = map (λ st → lookupTxStatus txid st) finalStates in (all (λ status → status == Committed) decisions ≡ true) + (all (λ status → status == Aborted) decisions ≡ true)
Property 2: Isolation (Snapshot Isolation)
No transaction observes partial results from concurrent transactions.
postulate snapshot-isolation : ∀ (tx1 tx2 : TxId) (oid : ObjectId) (snapshot1 : Snapshot) → tx1 ≢ tx2 → Snapshot.versions snapshot1 oid ≡ just ver → txStatus tx2 ≢ Committed → ¬ (modifiedBy tx2 oid ver)
Property 3: Consistency
Committed transactions satisfy all constraints.
postulate consistency : ∀ (txid : TxId) (txStateC : TxStateWithConstraints) (dState : DistributedState) → TxState.status (TxStateWithConstraints.txState txStateC) ≡ Committed → allConstraintsSatisfied txStateC ≡ true
Note: This module provides the formal type signatures and core semantics.
Full interpreter implementation requires integration with the existing
AVM.Interpreter module. See implementation roadmap in
DistributedTransactions.md.
References to other modules
This page references the following modules:
- AVM.Instruction
- Symbols: Safety, ObjInstruction, IntrospectInstruction, Instr₀, TxInstruction, Instr₁, PureInstruction, Instr₂, MachineInstruction, ControllerInstruction, Instruction, NondetInstruction, LinearConstraint, ConstraintInstruction, Instr₄, WeightedVal, ConstraintId, Safe, Unsafe, createObj, destroyObj, call, self, input, getCurrentMachine, reflect, scryMeta, scryDeep, Obj, Introspect, beginTx, commitTx, abortTx, Tx, callPure, registerPure, Pure, getMachine, teleport, moveObject, getCurrentController, getController, transferObject, getCurrentVersion, switchVersion, Machine, Controller, obj-create, obj-destroy, obj-call, get-self, get-input, get-current-machine, obj-scry-meta, obj-scry-deep, obj-reflect, tx-begin, tx-commit, tx-abort, call-pure, register-pure, get-machine, do-teleport, move-object, get-current-controller, get-controller, transfer-object, get-current-version, switch-version, choose, require, mkCid, UseOnce, newConstraint, satisfy, Nondet, Constr, ISA, AVMProgram₀, AVMProgram₁, AVMProgram₂, AVMProgram, AVMProgram₄