Skip to content

---
title: Distributed Transaction Extensions
tags:
  - AVM
  - distributed transactions
  - consensus
  - snapshot isolation
---


{-# OPTIONS --exact-split --without-K --type-in-type --guardedness #-}

open import Foundation.BasicTypes using (; List; Maybe; String)

Distributed Transaction Extensions

This module extends the AVM instruction set with distributed transaction primitives for multi-controller coordination. See DistributedTransactions.md for the full proposal.

Module Parameters

-- Top-level module, parameterised over the abstract types and fresh-identifier
-- generators of the base AVM. The same parameters are forwarded, in the same
-- order, to AVM.Instruction when it is opened below.
module AVM.DistributedTx
  -- Core types (inherited)
  (Val : Set)
  (ObjectId : Set)
  -- NOTE(review): index type restored as ℕ by analogy with freshTxId — confirm.
  (freshObjectId : ℕ → ObjectId)
  (MachineId : Set)
  (ControllerId : Set)
  (ControllerVersion : Set)
  (TxId : Set)
  -- Applied to the ℕ-valued nextTxId counter in beginDistributedTxSem.
  (freshTxId : ℕ → TxId)
  (Object : Set)
  where

-- Import base modules
open import Foundation.BasicTypes
open import AVM.Instruction Val ObjectId freshObjectId MachineId ControllerId
                        ControllerVersion TxId freshTxId Object

Transaction Types

Distinguish between local (single-controller) and distributed (multi-controller) transactions.

-- Kind of transaction: confined to one controller, or spanning several.
data TxType : Set where
  -- Single-controller transaction.
  Local : TxType
  -- Multi-controller transaction; carries the list of controllers involved.
  Distributed : List ControllerId → TxType

Lamport Timestamps for Version Ordering

Distributed controllers use Lamport clocks to establish version ordering without requiring synchronized clocks.

-- A Lamport-style version stamp: a logical counter paired with the
-- controller that produced it.
record LamportVersion : Set where
  constructor mkLamport
  field
    -- Logical clock value (compared with <? and ==ℕ in _<ₗ_).
    counter : ℕ
    -- Controller that issued this version.
    controllerId : ControllerId

-- Comparison for version ordering.
-- Returns true when the left counter is smaller; when the counters are
-- equal the result is decided by the controller ids; otherwise false.
--
-- NOTE(review): the equal-counter case uses `id1 == id2`, so the relation
-- holds between a version and itself and does not hold in either direction
-- for two controllers sharing a counter value. A Lamport tie-break normally
-- needs a strict order on ControllerId, which this module does not have as
-- a parameter; left unchanged pending a decision on how to supply one.
_<ₗ_ : LamportVersion → LamportVersion → Bool
mkLamport c1 id1 <ₗ mkLamport c2 id2 with c1 <? c2
... | true = true
... | false with c1 ==ℕ c2
...   | true = id1 == id2  -- tie-break by controller ID
...   | false = false

-- Non-strict version comparison: strictly earlier, or equal under the
-- boolean equality test `==`.
_≤ₗ_ : LamportVersion → LamportVersion → Bool
v1 ≤ₗ v2 = (v1 <ₗ v2) || (v1 == v2)

Snapshots

Transactions operate on a consistent snapshot captured at beginTx time.

-- A snapshot: a timestamp plus a lookup from object to the version observed.
record Snapshot : Set where
  constructor mkSnapshot
  field
    -- Version stamp at which the snapshot was taken.
    timestamp : LamportVersion
    -- Map from ObjectId to version observed; `nothing` when no version is
    -- recorded for the object.
    versions : ObjectId → Maybe LamportVersion

Distributed Transaction State

Extended transaction state tracking prepare/vote/finalize phases.

-- Transaction lifecycle states:
--   Active      executing operations
--   Preparing   prepare phase in progress
--   Prepared    prepared, awaiting vote
--   Committing  applying changes
--   Committed   successfully committed
--   Aborted     rolled back
data TxStatus : Set where
  Active Preparing Prepared Committing Committed Aborted : TxStatus

-- Bookkeeping for one transaction; built positionally with mkTxState
-- (txid, txType, snapshot, readSet, writeSet, status).
record TxState : Set where
  constructor mkTxState
  field
    -- Identifier of this transaction.
    txid : TxId
    -- Local or Distributed (with its controller list).
    txType : TxType
    -- Snapshot associated with the transaction.
    snapshot : Snapshot
    -- Objects paired with a version; presumably the version seen when the
    -- object was read — TODO confirm against the code that fills this in.
    readSet : List (ObjectId × LamportVersion)
    -- Objects recorded as written.
    writeSet : List ObjectId
    -- Current lifecycle state.
    status : TxStatus

Prepare Phase Results

Participants return their prepare results to the coordinator.

-- Outcome of a prepare request.
data PrepareResult : Set where
  -- Prepared successfully; carries the read and write sets.
  -- (Constructor name is shared with TxStatus.Prepared; Agda resolves the
  -- overload from the expected type.)
  Prepared : (readSet : List (ObjectId × LamportVersion))
           → (writeSet : List ObjectId)
           → PrepareResult
  -- Preparation failed because of the given object.
  PrepareConflict : ObjectId → PrepareResult
  -- No result was obtained in time.
  PrepareTimeout : PrepareResult

Vote Phase

The coordinator decides whether to commit or abort after collecting the prepare results.

-- A single commit/abort vote.
data Vote : Set where
  VoteCommit : Vote
  VoteAbort : String → Vote  -- Abort reason

-- Aggregate of a set of votes.
data VoteResult : Set where
  -- Every vote was the same; carries that vote.
  Unanimous : Vote → VoteResult
  -- Votes disagreed; carries the number of commit and abort votes.
  Split : (commits : ℕ) → (aborts : ℕ) → VoteResult

Transfer Protocol

Two-phase transfer with explicit acceptance from the target controller.

-- Transfer identifier: a natural number. requestTransferSem uses the
-- nextTransferId counter directly as the id and advances it with `suc`.
TransferId : Set
TransferId = ℕ

-- A request to move an object from one controller to another; built
-- positionally with mkTransferRequest.
record TransferRequest : Set where
  constructor mkTransferRequest
  field
    -- Identifier of this transfer.
    transferId : TransferId
    -- Object being transferred.
    objectId : ObjectId
    -- Controller issuing the request (requestTransferSem fills this with
    -- the requesting state's controllerId).
    sourceController : ControllerId
    -- Controller expected to accept the transfer.
    targetController : ControllerId
    -- Version stamp of the requester at request time.
    requestedAt : LamportVersion

-- State of a transfer request.
data TransferStatus : Set where
  -- Requested, not yet answered.
  Pending : TransferStatus
  -- Accepted by the target controller.
  Accepted : TransferStatus
  -- Rejected; carries a textual reason.
  Rejected : String → TransferStatus
  Completed : TransferStatus

Extended Instruction Set

New instructions for distributed transactions and transfers.

Distributed Transaction Instructions

-- Instruction family for distributed transactions, indexed by safety level
-- and by the type of value each instruction yields. Every constructor here
-- is at the `Safe` level.
data DistributedTxInstruction : Safety → ISA where
  -- Explicit transaction type selection
  beginLocalTx : DistributedTxInstruction Safe TxId
  beginDistributedTx : List ControllerId → DistributedTxInstruction Safe TxId

  -- Three-phase protocol: prepare, vote, finalize
  prepareTx : TxId → DistributedTxInstruction Safe PrepareResult
  -- Yields the unit value (voteTxSem returns `tt`).
  voteTx : TxId → Vote → DistributedTxInstruction Safe ⊤
  finalizeTx : TxId → DistributedTxInstruction Safe Bool

  -- Query transaction state
  getTxStatus : TxId → DistributedTxInstruction Safe (Maybe TxStatus)
  getTxType : TxId → DistributedTxInstruction Safe (Maybe TxType)

Transfer Instructions

-- Instruction family for object transfers between controllers, indexed by
-- safety level and result type. Every constructor here is at `Safe`.
data TransferInstruction : Safety → ISA where
  -- Phase 1: Request transfer (from current owner)
  requestTransfer : ObjectId → ControllerId → TransferInstruction Safe TransferId

  -- Phase 2: Accept/reject transfer (from target controller)
  acceptTransfer : TransferId → TransferInstruction Safe Bool
  -- NOTE(review): result type restored as ⊤ by analogy with voteTx — confirm.
  rejectTransfer : TransferId → String → TransferInstruction Safe ⊤

  -- Query transfer status
  getTransferStatus : TransferId → TransferInstruction Safe (Maybe TransferStatus)

  -- List pending transfers for this controller
  listPendingTransfers : TransferInstruction Safe (List TransferRequest)

Conflict Detection

Detect read-write and write-write conflicts across participants.

-- Check whether any object in the read set that is also in the write set
-- was read at a version different from its current one.
--
-- Arguments: the read set (object, version read), the write set, and a
-- function giving each object's current version.
-- Returns true as soon as one stale read of a written object is found,
-- false if the whole read set is clean.
--
-- The scan continues past a written object whose version still matches.
-- The earlier form returned false at the first written object it met,
-- which hid stale reads further down the list.
hasReadWriteConflict : List (ObjectId × LamportVersion)
                     → List ObjectId
                     → (ObjectId → LamportVersion)  -- Current versions
                     → Bool
hasReadWriteConflict [] writes currentVer = false
hasReadWriteConflict ((oid , readVer) ∷ rest) writes currentVer =
  if elem oid writes
  then ((not (readVer == currentVer oid))  -- Conflict if versions differ
        || hasReadWriteConflict rest writes currentVer)
  else hasReadWriteConflict rest writes currentVer

-- Check if multiple participants wrote the same object.
-- Flattens the per-participant write sets into one list and tests it with
-- hasDuplicates.
-- NOTE(review): because the lists are concatenated first, an object listed
-- twice inside a single participant's write set presumably counts as a
-- conflict too — confirm this is intended.
hasWriteWriteConflict : List (List ObjectId) → Bool
hasWriteWriteConflict writeSets =
  let allWrites = concat writeSets
  in hasDuplicates allWrites

-- Detect any conflicts in prepare results.
-- Gathers the read and write sets of every `Prepared` result (other results
-- contribute nothing) and reports true if either a read-write or a
-- write-write conflict is found.
detectConflicts : List PrepareResult
                → (ObjectId → LamportVersion)
                → Bool
detectConflicts results currentVersions =
  let
    allReads = concatMap extractReads results
    allWrites = concatMap extractWrites results
  in
    hasReadWriteConflict allReads allWrites currentVersions
    || hasWriteWriteConflict (map extractWrites results)
  where
    -- Read set of a Prepared result; empty for conflict/timeout results.
    extractReads : PrepareResult → List (ObjectId × LamportVersion)
    extractReads (Prepared reads writes) = reads
    extractReads _ = []

    -- Write set of a Prepared result; empty for conflict/timeout results.
    extractWrites : PrepareResult → List ObjectId
    extractWrites (Prepared reads writes) = writes
    extractWrites _ = []

Constraint Integration

Extend transaction state with constraint tracking. The ConstraintId and LinearConstraint types are imported from the AVM.Instruction module.

-- A TxState together with its constraints; built positionally with
-- mkTxStateC (txState, constraints, satisfied).
record TxStateWithConstraints : Set where
  constructor mkTxStateC
  field
    -- Underlying transaction state.
    txState : TxState
    -- Constraints attached to the transaction, keyed by id.
    constraints : List (ConstraintId × LinearConstraint)
    -- Ids of the constraints recorded as satisfied.
    satisfied : List ConstraintId

-- Check all constraints are satisfied: true exactly when the id of every
-- entry in `constraints` occurs in `satisfied`.
allConstraintsSatisfied : TxStateWithConstraints → Bool
allConstraintsSatisfied txStateC =
  let
    constraints = TxStateWithConstraints.constraints txStateC
    satisfied = TxStateWithConstraints.satisfied txStateC
  in
    all (λ { (cid , _) → elem cid satisfied }) constraints

Extended State with Distributed Tracking

-- Base AVM state extended with distributed-transaction and transfer
-- bookkeeping; built positionally with mkDistState.
record DistributedState : Set where
  constructor mkDistState
  field
    -- Base state (from Runtime module)
    baseState : State

    -- Active distributed transaction
    activeDTx : Maybe TxStateWithConstraints

    -- Pending transfers
    pendingTransfers : List (TransferId × TransferRequest × TransferStatus)

    -- Prepared transactions (coordinator view)
    preparedTxs : List (TxId × List PrepareResult)

    -- Pending votes (participant view)
    pendingVotes : List (TxId × Vote)

    -- Snapshot registry
    snapshots : TxId → Maybe Snapshot

    -- ID counters (advanced with `suc` by the semantic functions)
    nextTxId : ℕ
    nextTransferId : ℕ

Helper Functions (Implementation Stubs)

These functions are postulated for the semantic functions below. Full implementations require integration with the AVM.Interpreter module.

-- Authorization checking (postulated; no implementation in this module).
postulate
  -- Boolean verdict over a list of writes in a given state.
  allOpsAuthorized : List TxWrite → State → Bool
  -- An offending object, if one is found. prepareTxSem calls this after
  -- allOpsAuthorized returned false.
  findUnauthorizedOp : List TxWrite → State → Maybe ObjectId

-- Transaction state queries (postulated).
postulate
  -- NOTE(review): returns a Vote rather than Maybe Vote, so a transaction
  -- with no recorded vote cannot be told apart at this type — confirm.
  lookupVoteDecision : TxId → DistributedState → Vote
  lookupTxStatus : TxId → DistributedState → TxStatus
  -- `nothing` when the transfer id is unknown (see acceptTransferSem).
  lookupTransfer : TransferId → DistributedState
                 → Maybe (TransferRequest × TransferStatus)

-- State transitions (postulated).
postulate
  -- Used by finalizeTxSem on the commit path.
  applyTransactionChanges : State → State
  incrementVersion : ControllerVersion → ControllerVersion
  -- Set the status of one transfer inside the distributed state.
  updateTransferStatus : TransferId → TransferStatus → DistributedState
                       → DistributedState
  -- Used by acceptTransferSem with the accepting (target) controller.
  applyTransferToState : ObjectId → ControllerId → State → State

-- Error constructors (not already defined in Runtime); postulated here.
postulate
  err-constraint-violated : AVMError
  err-internal : AVMError
  -- Raised by acceptTransferSem when the transfer id cannot be looked up.
  err-transfer-not-found : TransferId → AVMError
  -- Raised by acceptTransferSem when the transfer may not be accepted.
  err-unauthorized-transfer-id : TransferId → AVMError

-- Auxiliary names used in the property statements (postulated).
postulate
  -- NOTE(review): a single fixed version, not a bound variable; the
  -- snapshot-isolation property refers to this constant.
  ver : LamportVersion
  -- Proposition relating a transaction, an object and a version.
  modifiedBy : TxId → ObjectId → LamportVersion → Set
  txStatus : TxId → TxStatus

-- Convert ControllerVersion to natural number for Lamport timestamps
-- (the result is used as the `counter` field of mkLamport).
postulate
  versionToℕ : ControllerVersion → ℕ

-- Event constructors for logging (new ones not in Runtime); postulated.
postulate
  VoteCast : TxId → Vote → EventType
  TransferRequested : TransferId → ObjectId → ControllerId → EventType
  TransferAccepted : TransferId → ObjectId → EventType

-- Helper to create log entries with current timestamp; its result is
-- passed as the first argument of mkLogEntry.
-- NOTE(review): result type restored as ℕ — confirm against the type of
-- mkLogEntry's first parameter.
postulate
  currentTimestamp : State → ℕ

Semantic Functions

Begin Distributed Transaction

-- Begin a distributed transaction among `participants`.
--
-- Draws a fresh TxId from the nextTxId counter, captures a snapshot of the
-- versions in the store, and builds an Active transaction with empty
-- read/write sets and no constraints. Always succeeds, returning the new
-- TxId and a single TransactionStarted log entry.
--
-- NOTE(review): the result carries only a base State. `dState'` changes
-- activeDTx and nextTxId but not baseState, so `st'` equals `st` and those
-- two updates are not visible through this result. Confirm how callers are
-- meant to get the updated DistributedState.
beginDistributedTxSem : List ControllerId → DistributedState → AVMResult TxId
beginDistributedTxSem participants dState =
  let
    st = DistributedState.baseState dState
    txid = freshTxId (DistributedState.nextTxId dState)
    currentVer = mkLamport (versionToℕ (State.currentVersion st)) (State.controllerId st)

    -- Capture snapshot
    snapshot = mkSnapshot currentVer (snapshotVersions st)

    -- Initialize transaction state
    txState = mkTxState txid
                        (Distributed participants)
                        snapshot
                        []  -- empty read set
                        []  -- empty write set
                        Active

    txStateC = mkTxStateC txState [] []  -- no constraints yet

    dState' = record dState {
                activeDTx = just txStateC
              ; nextTxId = suc (DistributedState.nextTxId dState)
              }
    entry = mkLogEntry (currentTimestamp st) (TransactionStarted txid) (State.controllerId st)
    st' = DistributedState.baseState dState'
  in
    success (mkSuccess txid st' (entry ∷ []))
  where
    -- Capture current version of all objects in store: for an object with
    -- metadata, its modifiedAtVersion and currentController; otherwise nothing.
    snapshotVersions : State → (ObjectId → Maybe LamportVersion)
    snapshotVersions st oid
      with Store.metadata (State.store st) oid
    ... | just meta = just (mkLamport (versionToℕ (ObjectMeta.modifiedAtVersion meta))
                                      (ObjectMeta.currentController meta))
    ... | nothing = nothing

Prepare Transaction

-- Prepare phase for transaction `txid`.
--
-- Fails with err-no-active-tx when there is no active distributed
-- transaction, and with err-tx-conflict when the active one has a different
-- id. Otherwise:
--   * if some operation in the transaction log is unauthorized, succeeds
--     with PrepareConflict on the offending object (err-internal if none
--     can be identified);
--   * if a constraint is unsatisfied, fails with err-constraint-violated;
--   * else succeeds with Prepared carrying the read and write sets.
--
-- NOTE(review): the result carries only a base State. The status change to
-- Prepared lives in `dState'`, whose baseState is unchanged, so it is not
-- visible through this result. Confirm how the updated DistributedState is
-- meant to reach the caller.
prepareTxSem : TxId → DistributedState → AVMResult PrepareResult
prepareTxSem txid dState
  with DistributedState.activeDTx dState
... | nothing = failure err-no-active-tx
... | just txStateC =
  let
    txState = TxStateWithConstraints.txState txStateC
    st = DistributedState.baseState dState
  in
    if TxState.txid txState == txid
    then validateAndPrepare txStateC st dState
    else failure (err-tx-conflict txid)
  where
    -- Authorization check first, then constraint check, then mark Prepared.
    validateAndPrepare : TxStateWithConstraints → State → DistributedState
                       → AVMResult PrepareResult
    validateAndPrepare txStateC st dState
      with allOpsAuthorized (State.txLog st) st
    ... | false = handleUnauthorized
      where
        -- Report the unauthorized object as a prepare conflict.
        handleUnauthorized : AVMResult PrepareResult
        handleUnauthorized
          with findUnauthorizedOp (State.txLog st) st
        ... | just oid = success (mkSuccess (PrepareConflict oid) (DistributedState.baseState dState) [])
        ... | nothing = failure err-internal
    ... | true
      with allConstraintsSatisfied txStateC
    ... | false = failure err-constraint-violated
    ... | true =
      let
        readSet = TxState.readSet (TxStateWithConstraints.txState txStateC)
        writeSet = TxState.writeSet (TxStateWithConstraints.txState txStateC)
        txState' = record (TxStateWithConstraints.txState txStateC)
                         { status = Prepared }
        txStateC' = record txStateC { txState = txState' }
        dState' = record dState { activeDTx = just txStateC' }
        st' = DistributedState.baseState dState'
      in
        success (mkSuccess (Prepared readSet writeSet) st' [])

Vote Transaction

-- Record `vote` for transaction `txid`.
-- Always succeeds with the unit value and a single VoteCast log entry.
--
-- NOTE(review): the vote is prepended to pendingVotes in `dState'`, but the
-- result carries only the base State (`st'` equals `st`), so the recorded
-- vote is not visible through this result. Confirm how the updated
-- DistributedState is meant to reach the caller.
voteTxSem : TxId → Vote → DistributedState → AVMResult ⊤
voteTxSem txid vote dState =
  let
    st = DistributedState.baseState dState
    dState' = record dState { pendingVotes = (txid , vote) ∷ DistributedState.pendingVotes dState }
    entry = mkLogEntry (currentTimestamp st) (VoteCast txid vote) (State.controllerId st)
    st' = DistributedState.baseState dState'
  in
    success (mkSuccess tt st' (entry ∷ []))

Finalize Transaction

-- Finalize transaction `txid` according to the recorded vote decision.
--
-- Fails with err-no-active-tx when there is no active distributed
-- transaction, and with err-tx-conflict when the active one has a different
-- id. On VoteCommit the pending changes are applied, the controller version
-- is incremented, and the result is `true` with a TransactionCommitted log
-- entry. On VoteAbort the transaction log and the creates/destroys/observed
-- lists are cleared, and the result is `false` with a TransactionAborted
-- log entry.
--
-- NOTE(review): the result carries only a base State, so clearing activeDTx
-- in `dState'` is not visible through it; the baseState update is.
-- The transaction's status (e.g. Prepared) is not checked before applying.
finalizeTxSem : TxId → DistributedState → AVMResult Bool
finalizeTxSem txid dState
  with DistributedState.activeDTx dState
... | nothing = failure err-no-active-tx
... | just txStateC =
  let
    txState = TxStateWithConstraints.txState txStateC
    st = DistributedState.baseState dState
  in
    if TxState.txid txState == txid
    then applyCommitOrAbort txid txStateC st dState
    else failure (err-tx-conflict txid)
  where
    -- Branch on the vote decision looked up for this transaction.
    applyCommitOrAbort : TxId → TxStateWithConstraints → State → DistributedState
                       → AVMResult Bool
    applyCommitOrAbort txid txStateC st dState
      with lookupVoteDecision txid dState
    ... | VoteCommit =
      -- Apply all changes atomically
      let
        stApplied = applyTransactionChanges st
        st' = record stApplied {
                tx = nothing
              ; currentVersion = incrementVersion (State.currentVersion st)
              }
        dState' = record dState {
                    baseState = st'
                  ; activeDTx = nothing
                  }
        entry = mkLogEntry (currentTimestamp st) (TransactionCommitted txid) (State.controllerId st)
        st'' = DistributedState.baseState dState'
      in
        success (mkSuccess true st'' (entry ∷ []))

    ... | VoteAbort reason =
      -- Discard all changes
      let
        st' = record st {
                tx = nothing
              ; txLog = []
              ; creates = []
              ; destroys = []
              ; observed = []
              }
        dState' = record dState {
                    baseState = st'
                  ; activeDTx = nothing
                  }
        entry = mkLogEntry (currentTimestamp st) (TransactionAborted txid) (State.controllerId st)
        st'' = DistributedState.baseState dState'
      in
        success (mkSuccess false st'' (entry ∷ []))

Request Transfer

-- Request a transfer of object `oid` to controller `targetCid`.
--
-- Uses the current nextTransferId as the transfer id, builds a request with
-- this state's controller as source and its current version as timestamp,
-- and always succeeds with that id and a single TransferRequested log entry.
--
-- NOTE(review): the new Pending entry and the advanced counter live in
-- `dState'`, but the result carries only the base State (`st'` equals
-- `st`), so neither is visible through this result. Confirm how the updated
-- DistributedState is meant to reach the caller.
requestTransferSem : ObjectId → ControllerId → DistributedState → AVMResult TransferId
requestTransferSem oid targetCid dState =
  let
    st = DistributedState.baseState dState
    tid = DistributedState.nextTransferId dState
    currentVer = mkLamport (versionToℕ (State.currentVersion st)) (State.controllerId st)

    request = mkTransferRequest tid oid (State.controllerId st) targetCid currentVer

    dState' = record dState {
                nextTransferId = suc tid
              ; pendingTransfers = (tid , request , Pending) ∷ DistributedState.pendingTransfers dState
              }

    entry = mkLogEntry (currentTimestamp st) (TransferRequested tid oid targetCid) (State.controllerId st)
    st' = DistributedState.baseState dState'
  in
    success (mkSuccess tid st' (entry ∷ []))

Accept Transfer

-- Accept the transfer `tid` on behalf of this controller.
--
-- Fails with err-transfer-not-found when the id is unknown. Fails with
-- err-unauthorized-transfer-id when the transfer is not Pending, or when
-- this controller is not the request's target. Otherwise marks the transfer
-- Accepted, applies it to the base state, and succeeds with `true` and a
-- single TransferAccepted log entry.
--
-- Only a Pending transfer may be accepted: the looked-up status used to be
-- ignored, which let an already Accepted, Rejected or Completed transfer be
-- applied to the state again.
-- NOTE(review): no dedicated "not pending" error exists in this module, so
-- err-unauthorized-transfer-id is reused — replace if a better one is added.
-- Statuses are listed one by one rather than with a catch-all clause
-- because the file is checked with --exact-split.
acceptTransferSem : TransferId → DistributedState → AVMResult Bool
acceptTransferSem tid dState
  with lookupTransfer tid dState
... | nothing = failure (err-transfer-not-found tid)
... | just (request , Accepted) = failure (err-unauthorized-transfer-id tid)
... | just (request , Rejected reason) = failure (err-unauthorized-transfer-id tid)
... | just (request , Completed) = failure (err-unauthorized-transfer-id tid)
... | just (request , Pending) =
  let
    st = DistributedState.baseState dState
    oid = TransferRequest.objectId request
    targetCid = TransferRequest.targetController request

    -- Update transfer status and apply transfer
    dState' = updateTransferStatus tid Accepted dState
    st' = applyTransferToState oid targetCid st
    dState'' = record dState' { baseState = st' }
    entry = mkLogEntry (currentTimestamp st) (TransferAccepted tid oid) (State.controllerId st)
    st'' = DistributedState.baseState dState''
  in
    -- Verify we are the target controller
    if targetCid == State.controllerId st
    then success (mkSuccess true st'' (entry ∷ []))
    else failure (err-unauthorized-transfer-id tid)

Pattern Synonyms for Ergonomics

Pattern synonyms would provide ergonomic names for distributed transaction and transfer instructions once they are integrated into the main AVM ISA:

-- Distributed transaction patterns (future work - requires ISA integration)
-- pattern dtx-begin-local = DistInstr (DTx beginLocalTx)
-- pattern dtx-begin-dist cids = DistInstr (DTx (beginDistributedTx cids))
-- pattern dtx-prepare txid = DistInstr (DTx (prepareTx txid))
-- pattern dtx-vote txid vote = DistInstr (DTx (voteTx txid vote))
-- pattern dtx-finalize txid = DistInstr (DTx (finalizeTx txid))

-- Transfer patterns (future work - requires ISA integration)
-- pattern transfer-request oid cid = DistInstr (Transfer (requestTransfer oid cid))
-- pattern transfer-accept tid = DistInstr (Transfer (acceptTransfer tid))
-- pattern transfer-reject tid reason = DistInstr (Transfer (rejectTransfer tid reason))

Usage Example: Bilateral Trade

Example showing atomic bilateral trade between two controllers (pseudocode - requires monadic operations and ISA integration):

-- Example: Atomic bilateral trade between two controllers
-- bilateralTrade : ObjectId → ObjectId → ControllerId → ControllerId → AVMProgram Bool
-- bilateralTrade obj_a obj_b controller_a controller_b = do
--   -- Begin distributed transaction with both controllers
--   txid ← beginDistributedTx (controller_a ∷ controller_b ∷ [])
--
--   -- Request transfers (each controller owns one object)
--   tid_a ← requestTransfer obj_a controller_b  -- A → B
--   tid_b ← requestTransfer obj_b controller_a  -- B → A
--
--   -- Prepare phase: each controller validates locally
--   result ← prepareTx txid
--
--   case result of λ where
--     (Prepared readSet writeSet) → do
--       -- Vote to commit
--       voteTx txid VoteCommit
--
--       -- Finalize: transfers happen atomically
--       success ← finalizeTx txid
--       return success
--
--     (PrepareConflict oid) → do
--       abortTx txid
--       return false
--
--     PrepareTimeout → do
--       abortTx txid
--       return false

Formal Properties (Specification)

Property 1: Atomicity

All participants reach the same decision (commit or abort).

-- Atomicity (postulated, not proved): for the final states of all
-- participants, either every one reports Committed for `txid` or every one
-- reports Aborted. `_+_` is used as the disjoint sum of the two cases.
-- NOTE(review): `participants` is quantified but not related to
-- `finalStates` in the statement — confirm this is intended.
postulate
  atomicity : ∀ (txid : TxId)
                (participants : List ControllerId)
                (finalStates : List DistributedState) →
    let decisions = map (λ st → lookupTxStatus txid st) finalStates
    in
      (all (λ status → status == Committed) decisions ≡ true)
      +
      (all (λ status → status == Aborted) decisions ≡ true)

Property 2: Isolation (Snapshot Isolation)

No transaction observes partial results from concurrent transactions.

-- Snapshot isolation (postulated, not proved): given two distinct
-- transactions, if the snapshot records version `ver` for `oid` and `tx2`
-- is committed, then `tx2` did not modify `oid` at `ver`.
-- NOTE(review): `ver` is the module-level postulated constant, not a bound
-- variable, and nothing in the statement ties `snapshot1` to `tx1`.
-- Confirm whether both should be quantified or linked.
postulate
  snapshot-isolation : ∀ (tx1 tx2 : TxId)
                         (oid : ObjectId)
                         (snapshot1 : Snapshot) →
    tx1 ≢ tx2 →
    Snapshot.versions snapshot1 oid ≡ just ver →
    txStatus tx2 ≡ Committed →
    ¬ (modifiedBy tx2 oid ver)

Property 3: Consistency

Committed transactions satisfy all constraints.

-- Consistency (postulated, not proved): a transaction state whose status is
-- Committed has all of its constraints satisfied.
-- `txid` and `dState` are quantified but not used in the statement.
postulate
  consistency : ∀ (txid : TxId)
                  (txStateC : TxStateWithConstraints)
                  (dState : DistributedState) →
    TxState.status (TxStateWithConstraints.txState txStateC) ≡ Committed →
    allConstraintsSatisfied txStateC ≡ true

Note: This module provides the formal type signatures and core semantics. Full interpreter implementation requires integration with the existing AVM.Interpreter module. See implementation roadmap in DistributedTransactions.md.


References to other modules

This page references the following modules: