Skip to content
Open
24 changes: 24 additions & 0 deletions docs/AGENT-PIPE-TEST-LOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Agent Pipe Test Log (Warm Idris)

Purpose: capture real-world StackFlow pipe test outcomes so onboarding for other agents is based on observed behavior, not assumptions.

## Per-test template

- Timestamp (UTC):
- Counterparty:
- Pipe identifier / contract:
- Scenario:
- Preconditions:
- Action executed:
- Expected result:
- Observed result:
- Artifacts (txid, signatures, nonce, logs):
- Pass/Fail:
- Root cause (if fail):
- Fix / mitigation:
- Process improvement for future agents:

---

## Run log

60 changes: 59 additions & 1 deletion packages/stackflow-agent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,69 @@ watcher.start();
8. `disputeClosure(...)`
9. `watcher.runOnce()` or `watcher.start()` for hourly checks

## Quick workflow (setup pipe + send + receive)

1. Track pipe locally:

```js
const tracked = agent.trackPipe({
contractId: "ST...stackflow-0-6-0",
pipeKey: { "principal-1": "SP...ME", "principal-2": "SP...THEM", token: null },
localPrincipal: "SP...ME",
counterpartyPrincipal: "SP...THEM",
});
```

2. Open/fund the pipe on-chain:

```js
await agent.openPipe({
contractId: tracked.contractId,
token: null,
amount: "1000",
counterpartyPrincipal: tracked.counterpartyPrincipal,
nonce: "0",
});
```

3. Build an outgoing state update to send to counterparty:

```js
const outgoing = agent.buildOutgoingTransfer({
pipeId: tracked.pipeId,
amount: "25",
// actor defaults to tracked.localPrincipal
});
```

4. Validate + accept incoming counterparty update:

```js
const result = await agent.acceptIncomingTransfer({
pipeId: tracked.pipeId,
payload: {
...outgoing,
actor: tracked.counterpartyPrincipal,
theirSignature: "0x...",
},
});
```

5. Persisted local latest state is now available via `getPipeLatestState(...)`.

## Notes

1. This scaffold intentionally avoids observer endpoints and local chain node.
2. The watcher interval defaults to one hour; dispute window is still 144 BTC blocks.
3. `HourlyClosureWatcher` supports two sources:
- `getPipeState` (recommended): per-pipe read-only polling (`get-pipe`)
- `listClosureEvents`: event scan mode
4. For production hardening, add alerting, signer balance checks, and idempotency audit logs.
4. Watcher retries are idempotent for already-disputed closures (same closure txid is skipped on later polls).
5. Read-only polling isolates per-pipe failures (`getPipeState` errors on one pipe do not stop others).
6. Event scan mode intentionally holds the cursor when any dispute submission errors occur, so failed disputes are retried on next run.
7. Event scan mode now reports `listErrors` (event source/indexer failures) and keeps the watcher cursor unchanged on those failures.
8. Invalid closure event payloads are skipped and counted in `invalidEvents` so one malformed record does not abort a full scan.
9. `buildOutgoingTransfer(...)` defaults `actor` to the tracked local principal and rejects mismatched actor values.
10. Incoming transfer validation enforces tracked contract/pipe/principals/token consistency; mismatched `pipeId`, `pipeKey`, `actor`, or token payloads are rejected.
11. Incoming transfer validation also enforces sequential nonces and balance invariants against the latest stored local state (same total balance, and counterparty-actor updates must not reduce local balance).
12. For production hardening, add alerting, signer balance checks, and idempotency audit logs.
107 changes: 104 additions & 3 deletions packages/stackflow-agent/src/agent-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export class StackflowAgentService {
buildOutgoingTransfer({
pipeId,
amount,
actor,
actor = null,
action = "1",
secret = null,
validAfter = null,
Expand Down Expand Up @@ -134,6 +134,14 @@ export class StackflowAgentService {
const nextTheir = currentTheir + transferAmount;
const nextNonce = currentNonce + 1n;

const normalizedActor =
actor == null || String(actor).trim() === ""
? tracked.localPrincipal
: assertNonEmptyString(actor, "actor");
if (normalizedActor !== tracked.localPrincipal) {
throw new Error("actor must match tracked local principal");
}

return {
contractId: tracked.contractId,
pipeKey: tracked.pipeKey,
Expand All @@ -144,7 +152,7 @@ export class StackflowAgentService {
theirBalance: nextTheir.toString(10),
nonce: nextNonce.toString(10),
action: toUnsignedString(action, "action"),
actor: assertNonEmptyString(actor, "actor"),
actor: normalizedActor,
secret,
validAfter,
beneficialOnly: beneficialOnly === true,
Expand Down Expand Up @@ -173,6 +181,41 @@ export class StackflowAgentService {
reason: "contract-mismatch",
};
}

if (data.pipeId != null && String(data.pipeId).trim() !== tracked.pipeId) {
return {
valid: false,
reason: "pipe-id-mismatch",
};
}

if (data.pipeKey != null) {
if (!data.pipeKey || typeof data.pipeKey !== "object" || Array.isArray(data.pipeKey)) {
return {
valid: false,
reason: "pipe-key-invalid",
};
}
let incomingPipeId;
try {
incomingPipeId = buildPipeId({
contractId,
pipeKey: data.pipeKey,
});
} catch {
return {
valid: false,
reason: "pipe-key-invalid",
};
}
if (incomingPipeId !== tracked.pipeId) {
return {
valid: false,
reason: "pipe-key-mismatch",
};
}
}

const forPrincipal = String(data.forPrincipal ?? "").trim();
if (forPrincipal !== tracked.localPrincipal) {
return {
Expand All @@ -187,6 +230,17 @@ export class StackflowAgentService {
reason: "with-principal-mismatch",
};
}

const trackedToken =
tracked.token == null ? null : String(tracked.token).trim();
const payloadToken =
data.token == null ? trackedToken : String(data.token).trim();
if (payloadToken !== trackedToken) {
return {
valid: false,
reason: "token-mismatch",
};
}
const theirSignature = (() => {
try {
return normalizeHex(data.theirSignature, "theirSignature");
Expand Down Expand Up @@ -222,6 +276,12 @@ export class StackflowAgentService {
reason: "actor-missing",
};
}
if (actor !== tracked.counterpartyPrincipal) {
return {
valid: false,
reason: "actor-mismatch",
};
}
const latest = this.stateStore.getLatestSignatureState(
tracked.pipeId,
tracked.localPrincipal,
Expand All @@ -236,6 +296,47 @@ export class StackflowAgentService {
existingNonce: latest.nonce,
};
}
if (incomingNonce !== existingNonce + 1n) {
return {
valid: false,
reason: "nonce-not-sequential",
existingNonce: latest.nonce,
};
}

const existingMyBalance = parseUnsignedBigInt(
latest.myBalance,
"existing myBalance",
);
const existingTheirBalance = parseUnsignedBigInt(
latest.theirBalance,
"existing theirBalance",
);
const incomingMyBalance = parseUnsignedBigInt(myBalance, "incoming myBalance");
const incomingTheirBalance = parseUnsignedBigInt(
theirBalance,
"incoming theirBalance",
);

if (
incomingMyBalance + incomingTheirBalance !==
existingMyBalance + existingTheirBalance
) {
return {
valid: false,
reason: "balance-sum-mismatch",
};
}

if (
incomingMyBalance < existingMyBalance ||
incomingTheirBalance > existingTheirBalance
) {
return {
valid: false,
reason: "balance-direction-invalid",
};
}
}

let secret = null;
Expand All @@ -256,7 +357,7 @@ export class StackflowAgentService {
pipeKey: tracked.pipeKey,
forPrincipal,
withPrincipal,
token: data.token == null ? tracked.token : String(data.token).trim(),
token: trackedToken,
myBalance,
theirBalance,
nonce,
Expand Down
26 changes: 26 additions & 0 deletions packages/stackflow-agent/src/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ export class AgentStateStore {
dispute_txid = ?
WHERE txid = ?
`);
this.getClosureStmt = this.db.prepare(`
SELECT * FROM closures WHERE txid = ?
`);

this.getCursorStmt = this.db.prepare(`
SELECT last_block_height FROM watcher_cursor WHERE id = 1
Expand Down Expand Up @@ -386,6 +389,29 @@ export class AgentStateStore {
);
}

getClosure(txid) {
this.assertOpen();
const row = this.getClosureStmt.get(assertNonEmptyString(txid, "txid"));
if (!row) {
return null;
}
return {
txid: row.txid,
contractId: row.contract_id,
pipeId: row.pipe_id,
pipeKey: JSON.parse(row.pipe_key_json),
eventName: row.event_name,
nonce: row.nonce,
closer: row.closer,
blockHeight: row.block_height,
expiresAt: row.expires_at,
closureMyBalance: row.closure_my_balance,
disputed: row.disputed === 1,
disputeTxid: row.dispute_txid,
createdAt: row.created_at,
};
}

getWatcherCursor() {
this.assertOpen();
const row = this.getCursorStmt.get();
Expand Down
Loading