- Introduced `ibc_example.rs` demonstrating Inter-Blockchain Communication operations including cross-chain transfers, channel management, packet handling, and relayer operations. - Introduced `zk_example.rs` showcasing Zero-Knowledge proof operations such as circuit compilation, proof generation and verification, and on-chain verification with multiple proving systems.
521 lines
15 KiB
Python
521 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Synor ZK SDK Examples for Python
|
|
|
|
Demonstrates Zero-Knowledge proof operations:
|
|
- Circuit compilation
|
|
- Proof generation and verification
|
|
- Groth16, PLONK, and STARK proving systems
|
|
- Recursive proofs
|
|
- On-chain verification
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
import time
|
|
from synor_zk import (
|
|
SynorZk,
|
|
ZkConfig,
|
|
ProvingSystem,
|
|
CircuitFormat,
|
|
)
|
|
|
|
|
|
async def main():
|
|
"""Main entry point."""
|
|
# Initialize client
|
|
config = ZkConfig(
|
|
api_key=os.environ.get("SYNOR_API_KEY", "your-api-key"),
|
|
endpoint="https://zk.synor.io/v1",
|
|
timeout=120000, # ZK ops can be slow
|
|
retries=3,
|
|
debug=False,
|
|
default_proving_system=ProvingSystem.GROTH16,
|
|
)
|
|
|
|
zk = SynorZk(config)
|
|
|
|
try:
|
|
# Check service health
|
|
healthy = await zk.health_check()
|
|
print(f"Service healthy: {healthy}\n")
|
|
|
|
# Example 1: Circuit compilation
|
|
await circuit_example(zk)
|
|
|
|
# Example 2: Proof generation
|
|
await proof_example(zk)
|
|
|
|
# Example 3: Different proving systems
|
|
await proving_systems_example(zk)
|
|
|
|
# Example 4: Recursive proofs
|
|
await recursive_proof_example(zk)
|
|
|
|
# Example 5: On-chain verification
|
|
await on_chain_verification_example(zk)
|
|
|
|
# Example 6: Trusted setup
|
|
await setup_example(zk)
|
|
finally:
|
|
await zk.close()
|
|
|
|
|
|
async def circuit_example(zk: SynorZk):
|
|
"""Circuit compilation."""
|
|
print("=== Circuit Compilation ===")
|
|
|
|
# Circom circuit example: prove knowledge of preimage
|
|
circom_circuit = """
|
|
pragma circom 2.1.0;
|
|
|
|
template HashPreimage() {
|
|
signal input preimage;
|
|
signal input hash;
|
|
|
|
// Simplified hash computation (in reality, use proper hash)
|
|
signal preimageSquared;
|
|
preimageSquared <== preimage * preimage;
|
|
|
|
// Constrain that hash matches
|
|
hash === preimageSquared;
|
|
}
|
|
|
|
component main {public [hash]} = HashPreimage();
|
|
"""
|
|
|
|
# Compile circuit
|
|
print("Compiling Circom circuit...")
|
|
compiled = await zk.circuits.compile(
|
|
code=circom_circuit,
|
|
format=CircuitFormat.CIRCOM,
|
|
name="hash_preimage",
|
|
proving_system=ProvingSystem.GROTH16,
|
|
)
|
|
|
|
print(f"Circuit compiled:")
|
|
print(f" Circuit ID: {compiled.circuit_id}")
|
|
print(f" Constraints: {compiled.constraint_count}")
|
|
print(f" Public inputs: {compiled.public_input_count}")
|
|
print(f" Private inputs: {compiled.private_input_count}")
|
|
print(f" Proving key size: {compiled.proving_key_size} bytes")
|
|
print(f" Verification key size: {compiled.verification_key_size} bytes")
|
|
|
|
# List circuits
|
|
circuits = await zk.circuits.list()
|
|
print(f"\nYour circuits: {len(circuits)}")
|
|
for circuit in circuits:
|
|
print(f" {circuit.name} ({circuit.circuit_id})")
|
|
|
|
# Get circuit details
|
|
details = await zk.circuits.get(compiled.circuit_id)
|
|
print(f"\nCircuit details:")
|
|
print(f" Format: {details.format}")
|
|
print(f" Proving system: {details.proving_system}")
|
|
print(f" Created: {details.created_at}")
|
|
|
|
# Download proving key
|
|
proving_key = await zk.circuits.get_proving_key(compiled.circuit_id)
|
|
print(f"\nProving key downloaded: {len(proving_key)} bytes")
|
|
|
|
# Download verification key
|
|
verification_key = await zk.circuits.get_verification_key(compiled.circuit_id)
|
|
print(f"Verification key downloaded: {len(verification_key)} bytes")
|
|
|
|
print()
|
|
|
|
|
|
async def proof_example(zk: SynorZk):
|
|
"""Proof generation and verification."""
|
|
print("=== Proof Generation ===")
|
|
|
|
# First, compile a simple circuit
|
|
circuit = """
|
|
pragma circom 2.1.0;
|
|
|
|
template Multiplier() {
|
|
signal input a;
|
|
signal input b;
|
|
signal output c;
|
|
|
|
c <== a * b;
|
|
}
|
|
|
|
component main {public [c]} = Multiplier();
|
|
"""
|
|
|
|
compiled = await zk.circuits.compile(
|
|
code=circuit,
|
|
format=CircuitFormat.CIRCOM,
|
|
name="multiplier",
|
|
proving_system=ProvingSystem.GROTH16,
|
|
)
|
|
|
|
# Generate proof
|
|
print("Generating proof...")
|
|
start_time = time.time()
|
|
|
|
proof = await zk.proofs.generate(
|
|
circuit_id=compiled.circuit_id,
|
|
inputs={"a": "3", "b": "7"},
|
|
)
|
|
|
|
proof_time = (time.time() - start_time) * 1000
|
|
print(f"Proof generated in {proof_time:.0f}ms")
|
|
print(f" Proof ID: {proof.proof_id}")
|
|
print(f" Proof size: {len(proof.proof)} bytes")
|
|
print(f" Public signals: {proof.public_signals}")
|
|
|
|
# Verify proof
|
|
print("\nVerifying proof...")
|
|
verify_start = time.time()
|
|
|
|
is_valid = await zk.proofs.verify(
|
|
circuit_id=compiled.circuit_id,
|
|
proof=proof.proof,
|
|
public_signals=proof.public_signals,
|
|
)
|
|
|
|
verify_time = (time.time() - verify_start) * 1000
|
|
print(f"Verification completed in {verify_time:.0f}ms")
|
|
print(f" Valid: {is_valid}")
|
|
|
|
# Verify with wrong public signals (should fail)
|
|
print("\nVerifying with wrong signals...")
|
|
invalid_result = await zk.proofs.verify(
|
|
circuit_id=compiled.circuit_id,
|
|
proof=proof.proof,
|
|
public_signals=["42"], # Wrong answer
|
|
)
|
|
print(f" Valid: {invalid_result} (expected False)")
|
|
|
|
# Get proof status
|
|
status = await zk.proofs.get_status(proof.proof_id)
|
|
print(f"\nProof status:")
|
|
print(f" State: {status.state}")
|
|
print(f" Verified: {status.verified}")
|
|
print(f" Created: {status.created_at}")
|
|
|
|
# List proofs
|
|
proofs = await zk.proofs.list(circuit_id=compiled.circuit_id)
|
|
print(f"\nProofs for circuit: {len(proofs)}")
|
|
|
|
print()
|
|
|
|
|
|
async def proving_systems_example(zk: SynorZk):
|
|
"""Different proving systems comparison."""
|
|
print("=== Proving Systems Comparison ===")
|
|
|
|
# Simple circuit for comparison
|
|
circuit = """
|
|
pragma circom 2.1.0;
|
|
|
|
template Comparison() {
|
|
signal input x;
|
|
signal input y;
|
|
signal output sum;
|
|
|
|
sum <== x + y;
|
|
}
|
|
|
|
component main {public [sum]} = Comparison();
|
|
"""
|
|
|
|
systems = [
|
|
ProvingSystem.GROTH16,
|
|
ProvingSystem.PLONK,
|
|
ProvingSystem.STARK,
|
|
]
|
|
|
|
print("Comparing proving systems:\n")
|
|
|
|
for system in systems:
|
|
print(f"{system.value}:")
|
|
|
|
# Compile for this system
|
|
compiled = await zk.circuits.compile(
|
|
code=circuit,
|
|
format=CircuitFormat.CIRCOM,
|
|
name=f"comparison_{system.value.lower()}",
|
|
proving_system=system,
|
|
)
|
|
|
|
# Generate proof
|
|
proof_start = time.time()
|
|
proof = await zk.proofs.generate(
|
|
circuit_id=compiled.circuit_id,
|
|
inputs={"x": "10", "y": "20"},
|
|
)
|
|
proof_time = (time.time() - proof_start) * 1000
|
|
|
|
# Verify proof
|
|
verify_start = time.time()
|
|
await zk.proofs.verify(
|
|
circuit_id=compiled.circuit_id,
|
|
proof=proof.proof,
|
|
public_signals=proof.public_signals,
|
|
)
|
|
verify_time = (time.time() - verify_start) * 1000
|
|
|
|
print(f" Setup: {compiled.setup_time}ms")
|
|
print(f" Proof time: {proof_time:.0f}ms")
|
|
print(f" Verify time: {verify_time:.0f}ms")
|
|
print(f" Proof size: {len(proof.proof)} bytes")
|
|
print(f" Verification key: {compiled.verification_key_size} bytes")
|
|
print()
|
|
|
|
print("Summary:")
|
|
print(" Groth16: Smallest proofs, fast verification, trusted setup required")
|
|
print(" PLONK: Universal setup, flexible, moderate proof size")
|
|
print(" STARK: No trusted setup, largest proofs, quantum resistant")
|
|
|
|
print()
|
|
|
|
|
|
async def recursive_proof_example(zk: SynorZk):
|
|
"""Recursive proof aggregation."""
|
|
print("=== Recursive Proofs ===")
|
|
|
|
# Inner circuit
|
|
inner_circuit = """
|
|
pragma circom 2.1.0;
|
|
|
|
template Inner() {
|
|
signal input x;
|
|
signal output y;
|
|
y <== x * x;
|
|
}
|
|
|
|
component main {public [y]} = Inner();
|
|
"""
|
|
|
|
# Compile inner circuit
|
|
inner = await zk.circuits.compile(
|
|
code=inner_circuit,
|
|
format=CircuitFormat.CIRCOM,
|
|
name="inner_circuit",
|
|
proving_system=ProvingSystem.GROTH16,
|
|
)
|
|
|
|
# Generate multiple proofs to aggregate
|
|
print("Generating proofs to aggregate...")
|
|
proofs_to_aggregate = []
|
|
for i in range(1, 5):
|
|
proof = await zk.proofs.generate(
|
|
circuit_id=inner.circuit_id,
|
|
inputs={"x": str(i)},
|
|
)
|
|
proofs_to_aggregate.append({
|
|
"proof": proof.proof,
|
|
"public_signals": proof.public_signals,
|
|
})
|
|
print(f" Proof {i}: y = {proof.public_signals[0]}")
|
|
|
|
# Aggregate proofs recursively
|
|
print("\nAggregating proofs...")
|
|
aggregated = await zk.proofs.aggregate(
|
|
circuit_id=inner.circuit_id,
|
|
proofs=proofs_to_aggregate,
|
|
aggregation_type="recursive",
|
|
)
|
|
|
|
original_size = sum(len(p["proof"]) for p in proofs_to_aggregate)
|
|
print(f"Aggregated proof:")
|
|
print(f" Proof ID: {aggregated.proof_id}")
|
|
print(f" Aggregated count: {aggregated.aggregated_count}")
|
|
print(f" Proof size: {len(aggregated.proof)} bytes")
|
|
print(f" Size reduction: {(1 - len(aggregated.proof) / original_size) * 100:.1f}%")
|
|
|
|
# Verify aggregated proof
|
|
is_valid = await zk.proofs.verify_aggregated(
|
|
circuit_id=inner.circuit_id,
|
|
proof=aggregated.proof,
|
|
public_signals_list=[p["public_signals"] for p in proofs_to_aggregate],
|
|
)
|
|
print(f"\nAggregated proof valid: {is_valid}")
|
|
|
|
# Batch verification (verify multiple proofs in one operation)
|
|
print("\nBatch verification...")
|
|
batch_result = await zk.proofs.batch_verify(
|
|
circuit_id=inner.circuit_id,
|
|
proofs=proofs_to_aggregate,
|
|
)
|
|
print(f" All valid: {batch_result.all_valid}")
|
|
print(f" Results: {', '.join(str(r) for r in batch_result.results)}")
|
|
|
|
print()
|
|
|
|
|
|
async def on_chain_verification_example(zk: SynorZk):
|
|
"""On-chain verification."""
|
|
print("=== On-Chain Verification ===")
|
|
|
|
# Compile circuit
|
|
circuit = """
|
|
pragma circom 2.1.0;
|
|
|
|
template VoteCommitment() {
|
|
signal input vote; // Private: actual vote
|
|
signal input nullifier; // Private: unique identifier
|
|
signal input commitment; // Public: commitment to verify
|
|
|
|
// Simplified commitment (in practice, use Poseidon hash)
|
|
signal computed;
|
|
computed <== vote * nullifier;
|
|
commitment === computed;
|
|
}
|
|
|
|
component main {public [commitment]} = VoteCommitment();
|
|
"""
|
|
|
|
compiled = await zk.circuits.compile(
|
|
code=circuit,
|
|
format=CircuitFormat.CIRCOM,
|
|
name="vote_commitment",
|
|
proving_system=ProvingSystem.GROTH16,
|
|
)
|
|
|
|
# Generate Solidity verifier
|
|
print("Generating Solidity verifier...")
|
|
solidity_verifier = await zk.contracts.generate_verifier(
|
|
circuit_id=compiled.circuit_id,
|
|
language="solidity",
|
|
optimized=True,
|
|
)
|
|
print(f"Solidity verifier generated: {len(solidity_verifier.code)} bytes")
|
|
print(f" Contract name: {solidity_verifier.contract_name}")
|
|
print(f" Gas estimate: {solidity_verifier.gas_estimate}")
|
|
|
|
# Generate proof
|
|
proof = await zk.proofs.generate(
|
|
circuit_id=compiled.circuit_id,
|
|
inputs={
|
|
"vote": "1", # Vote YES (1) or NO (0)
|
|
"nullifier": "12345",
|
|
"commitment": "12345", # 1 * 12345
|
|
},
|
|
)
|
|
|
|
# Format proof for on-chain verification
|
|
print("\nFormatting proof for on-chain...")
|
|
on_chain_proof = await zk.contracts.format_proof(
|
|
circuit_id=compiled.circuit_id,
|
|
proof=proof.proof,
|
|
public_signals=proof.public_signals,
|
|
format="calldata",
|
|
)
|
|
print(f" Calldata: {on_chain_proof.calldata[:100]}...")
|
|
print(f" Estimated gas: {on_chain_proof.gas_estimate}")
|
|
|
|
# Deploy verifier contract (simulation)
|
|
print("\nDeploying verifier contract...")
|
|
deployment = await zk.contracts.deploy_verifier(
|
|
circuit_id=compiled.circuit_id,
|
|
network="synor-testnet",
|
|
)
|
|
print(f" Contract address: {deployment.address}")
|
|
print(f" TX hash: {deployment.tx_hash}")
|
|
print(f" Gas used: {deployment.gas_used}")
|
|
|
|
# Verify on-chain
|
|
print("\nVerifying on-chain...")
|
|
on_chain_result = await zk.contracts.verify_on_chain(
|
|
contract_address=deployment.address,
|
|
proof=proof.proof,
|
|
public_signals=proof.public_signals,
|
|
network="synor-testnet",
|
|
)
|
|
print(f" TX hash: {on_chain_result.tx_hash}")
|
|
print(f" Verified: {on_chain_result.verified}")
|
|
print(f" Gas used: {on_chain_result.gas_used}")
|
|
|
|
# Generate verifier for other targets
|
|
print("\nGenerating verifiers for other targets:")
|
|
targets = ["cairo", "noir", "ink"]
|
|
for target in targets:
|
|
verifier = await zk.contracts.generate_verifier(
|
|
circuit_id=compiled.circuit_id,
|
|
language=target,
|
|
)
|
|
print(f" {target}: {len(verifier.code)} bytes")
|
|
|
|
print()
|
|
|
|
|
|
async def setup_example(zk: SynorZk):
|
|
"""Trusted setup ceremonies."""
|
|
print("=== Trusted Setup ===")
|
|
|
|
# Get available ceremonies
|
|
ceremonies = await zk.setup.list_ceremonies()
|
|
print(f"Active ceremonies: {len(ceremonies)}")
|
|
for ceremony in ceremonies:
|
|
print(f" {ceremony.name}:")
|
|
print(f" Status: {ceremony.status}")
|
|
print(f" Participants: {ceremony.participant_count}")
|
|
print(f" Current round: {ceremony.current_round}")
|
|
|
|
# Create a new circuit setup
|
|
circuit = """
|
|
pragma circom 2.1.0;
|
|
|
|
template NewCircuit() {
|
|
signal input a;
|
|
signal output b;
|
|
b <== a + 1;
|
|
}
|
|
|
|
component main {public [b]} = NewCircuit();
|
|
"""
|
|
|
|
print("\nInitializing setup for new circuit...")
|
|
setup = await zk.setup.initialize(
|
|
circuit=circuit,
|
|
format=CircuitFormat.CIRCOM,
|
|
name="new_circuit_setup",
|
|
proving_system=ProvingSystem.GROTH16,
|
|
ceremony_type="powers_of_tau", # or 'phase2'
|
|
)
|
|
print(f"Setup initialized:")
|
|
print(f" Ceremony ID: {setup.ceremony_id}")
|
|
print(f" Powers of Tau required: {setup.powers_required}")
|
|
print(f" Current phase: {setup.phase}")
|
|
|
|
# Contribute to ceremony (in practice, generates random entropy)
|
|
print("\nContributing to ceremony...")
|
|
contribution = await zk.setup.contribute(
|
|
ceremony_id=setup.ceremony_id,
|
|
entropy=b"random-entropy-from-user".hex(),
|
|
)
|
|
print(f"Contribution submitted:")
|
|
print(f" Participant: {contribution.participant_id}")
|
|
print(f" Contribution hash: {contribution.hash}")
|
|
print(f" Verification status: {contribution.verified}")
|
|
|
|
# Get ceremony status
|
|
status = await zk.setup.get_status(setup.ceremony_id)
|
|
print(f"\nCeremony status:")
|
|
print(f" Phase: {status.phase}")
|
|
print(f" Total contributions: {status.total_contributions}")
|
|
print(f" Verified contributions: {status.verified_contributions}")
|
|
print(f" Ready for finalization: {status.ready_for_finalization}")
|
|
|
|
# Finalize setup (when enough contributions)
|
|
if status.ready_for_finalization:
|
|
print("\nFinalizing setup...")
|
|
finalized = await zk.setup.finalize(setup.ceremony_id)
|
|
print(f"Setup finalized:")
|
|
print(f" Proving key hash: {finalized.proving_key_hash}")
|
|
print(f" Verification key hash: {finalized.verification_key_hash}")
|
|
print(f" Contribution transcript: {finalized.transcript_cid}")
|
|
|
|
# Download ceremony transcript
|
|
transcript = await zk.setup.get_transcript(setup.ceremony_id)
|
|
print(f"\nCeremony transcript: {len(transcript)} bytes")
|
|
|
|
print()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|