#!/usr/bin/env python3 """ Synor IBC SDK Examples for Python Demonstrates Inter-Blockchain Communication operations: - Cross-chain token transfers - Channel management - Packet handling - Relayer operations """ import asyncio import os from datetime import datetime from synor_ibc import ( SynorIbc, IbcConfig, Network, ChannelState, ) async def main(): """Main entry point.""" # Initialize client config = IbcConfig( api_key=os.environ.get("SYNOR_API_KEY", "your-api-key"), endpoint="https://ibc.synor.io/v1", timeout=30000, retries=3, debug=False, default_network=Network.MAINNET, ) ibc = SynorIbc(config) try: # Check service health healthy = await ibc.health_check() print(f"Service healthy: {healthy}\n") # Example 1: Connected chains await chains_example(ibc) # Example 2: Channel management await channels_example(ibc) # Example 3: Token transfers await transfer_example(ibc) # Example 4: Packet tracking await packet_example(ibc) # Example 5: Relayer operations await relayer_example(ibc) # Example 6: Connection info await connection_example(ibc) finally: await ibc.close() async def chains_example(ibc: SynorIbc): """Connected chains information.""" print("=== Connected Chains ===") # Get all connected chains chains = await ibc.chains.list() print(f"Connected chains: {len(chains)}") for chain in chains: print(f" {chain.chain_id}:") print(f" Name: {chain.name}") print(f" Status: {chain.status}") print(f" Block height: {chain.latest_height}") print(f" Channels: {chain.channel_count}") # Get specific chain info cosmos = await ibc.chains.get("cosmoshub-4") print(f"\nCosmos Hub details:") print(f" RPC: {cosmos.rpc_endpoint}") print(f" Rest: {cosmos.rest_endpoint}") print(f" Native denom: {cosmos.native_denom}") print(f" Prefix: {cosmos.bech32_prefix}") # Get supported assets on a chain assets = await ibc.chains.get_assets("cosmoshub-4") print(f"\nSupported assets on Cosmos Hub:") for asset in assets[:5]: print(f" {asset.symbol}: {asset.denom}") print(f" Origin: {asset.origin_chain}") print(f" Decimals: {asset.decimals}") # Get chain paths (routes) paths = await ibc.chains.get_paths("synor-1", "cosmoshub-4") print(f"\nPaths from Synor to Cosmos Hub:") for path in paths: print(f" {path.source_channel} -> {path.dest_channel}") print(f" Hops: {path.hops}") print(f" Avg time: {path.avg_transfer_time}s") print() async def channels_example(ibc: SynorIbc): """Channel management.""" print("=== Channel Management ===") # List all channels channels = await ibc.channels.list() print(f"Total channels: {len(channels)}") # Filter by state open_channels = [c for c in channels if c.state == ChannelState.OPEN] print(f"Open channels: {len(open_channels)}") for channel in open_channels[:3]: print(f"\n Channel {channel.channel_id}:") print(f" Port: {channel.port_id}") print(f" Counterparty: {channel.counterparty_channel_id} on {channel.counterparty_chain_id}") print(f" Ordering: {channel.ordering}") print(f" Version: {channel.version}") print(f" State: {channel.state}") # Get specific channel channel = await ibc.channels.get("channel-0") print(f"\nChannel-0 details:") print(f" Connection: {channel.connection_id}") print(f" Counterparty port: {channel.counterparty_port_id}") # Get channel statistics stats = await ibc.channels.get_stats("channel-0") print(f"\nChannel-0 statistics:") print(f" Total packets sent: {stats.packets_sent}") print(f" Total packets received: {stats.packets_received}") print(f" Pending packets: {stats.pending_packets}") print(f" Success rate: {stats.success_rate}%") print(f" Avg relay time: {stats.avg_relay_time}s") # Get channel capacity capacity = await ibc.channels.get_capacity("channel-0") print(f"\nChannel-0 capacity:") print(f" Max throughput: {capacity.max_packets_per_block} packets/block") print(f" Current utilization: {capacity.utilization}%") print() async def transfer_example(ibc: SynorIbc): """Cross-chain token transfers.""" print("=== Cross-Chain Transfers ===") # Estimate transfer fee estimate = await ibc.transfers.estimate_fee( source_chain="synor-1", dest_chain="cosmoshub-4", denom="usyn", amount="1000000", # 1 SYN (6 decimals) ) print("Transfer fee estimate:") print(f" Gas: {estimate.gas}") print(f" Fee: {estimate.fee} {estimate.fee_denom}") print(f" Timeout: {estimate.timeout}s") # Initiate a transfer print("\nInitiating transfer...") transfer = await ibc.transfers.send( source_chain="synor-1", dest_chain="cosmoshub-4", channel="channel-0", sender="synor1abc...", # Your address receiver="cosmos1xyz...", # Recipient address denom="usyn", amount="1000000", # 1 SYN memo="Cross-chain transfer example", timeout_height=0, # Use timestamp instead timeout_timestamp=int(datetime.now().timestamp() * 1000) + 600000, # 10 minutes ) print(f"Transfer initiated:") print(f" TX Hash: {transfer.tx_hash}") print(f" Sequence: {transfer.sequence}") print(f" Status: {transfer.status}") # Track transfer status print("\nTracking transfer...") status = await ibc.transfers.get_status(transfer.tx_hash) print(f"Current status: {status.state}") print(f" Source confirmed: {status.source_confirmed}") print(f" Relayed: {status.relayed}") print(f" Dest confirmed: {status.dest_confirmed}") # Get transfer history history = await ibc.transfers.get_history(address="synor1abc...", limit=5) print(f"\nTransfer history (last 5):") for tx in history: direction = "OUT" if tx.sender == "synor1abc..." else "IN" print(f" {direction} {tx.amount} {tx.denom} ({tx.status})") # Get pending transfers pending = await ibc.transfers.get_pending("synor1abc...") print(f"\nPending transfers: {len(pending)}") print() async def packet_example(ibc: SynorIbc): """Packet handling.""" print("=== Packet Handling ===") # Get pending packets packets = await ibc.packets.get_pending("channel-0") print(f"Pending packets on channel-0: {len(packets)}") for packet in packets[:3]: print(f"\n Packet {packet.sequence}:") print(f" Source: {packet.source_port}/{packet.source_channel}") print(f" Dest: {packet.dest_port}/{packet.dest_channel}") print(f" State: {packet.state}") print(f" Data size: {len(packet.data)} bytes") print(f" Timeout height: {packet.timeout_height}") print(f" Timeout timestamp: {packet.timeout_timestamp}") # Get packet by sequence packet = await ibc.packets.get("channel-0", 1) print(f"\nPacket details:") print(f" Commitment: {packet.commitment}") print(f" Receipt: {packet.receipt}") print(f" Acknowledgement: {packet.acknowledgement}") # Get packet receipts receipts = await ibc.packets.get_receipts("channel-0", [1, 2, 3]) print(f"\nPacket receipts:") for seq, receipt in receipts.items(): print(f" Sequence {seq}: {'received' if receipt else 'not received'}") # Get timed out packets timed_out = await ibc.packets.get_timed_out("channel-0") print(f"\nTimed out packets: {len(timed_out)}") for p in timed_out: print(f" Sequence {p.sequence}: timeout at {p.timeout_timestamp}") # Get unreceived packets unreceived = await ibc.packets.get_unreceived("channel-0") print(f"\nUnreceived packet sequences: {', '.join(map(str, unreceived)) or 'none'}") # Get unacknowledged packets unacked = await ibc.packets.get_unacknowledged("channel-0") print(f"Unacknowledged packet sequences: {', '.join(map(str, unacked)) or 'none'}") print() async def relayer_example(ibc: SynorIbc): """Relayer operations.""" print("=== Relayer Operations ===") # Get active relayers relayers = await ibc.relayers.list() print(f"Active relayers: {len(relayers)}") for relayer in relayers[:3]: print(f"\n {relayer.address}:") print(f" Chains: {', '.join(relayer.chains)}") print(f" Packets relayed: {relayer.packets_relayed}") print(f" Success rate: {relayer.success_rate}%") print(f" Avg latency: {relayer.avg_latency}ms") print(f" Fee rate: {relayer.fee_rate}%") # Get relayer statistics stats = await ibc.relayers.get_stats() print("\nGlobal relayer statistics:") print(f" Total relayers: {stats.total_relayers}") print(f" Active relayers: {stats.active_relayers}") print(f" Packets relayed (24h): {stats.packets_relayed_24h}") print(f" Total fees earned: {stats.total_fees_earned}") # Register as a relayer print("\nRegistering as relayer...") registration = await ibc.relayers.register( chains=["synor-1", "cosmoshub-4"], fee_rate=0.1, # 0.1% fee min_packet_size=0, max_packet_size=1000000, ) print(f"Registered with address: {registration.relayer_address}") # Start relaying (in background) print("\nStarting relay service...") relay_session = await ibc.relayers.start_relay( channels=["channel-0"], auto_ack=True, batch_size=10, poll_interval=5000, ) print(f"Relay session started: {relay_session.session_id}") # Get relay queue queue = await ibc.relayers.get_queue("channel-0") print(f"\nRelay queue for channel-0: {len(queue)} packets") # Manually relay a packet if queue: print("\nRelaying packet...") relay_result = await ibc.relayers.relay_packet(queue[0].sequence, "channel-0") print(f"Relay result: {relay_result.status}") print(f" TX Hash: {relay_result.tx_hash}") print(f" Fee earned: {relay_result.fee_earned}") # Stop relay session await ibc.relayers.stop_relay(relay_session.session_id) print("\nRelay session stopped") print() async def connection_example(ibc: SynorIbc): """Connection information.""" print("=== Connection Information ===") # List connections connections = await ibc.connections.list() print(f"Total connections: {len(connections)}") for conn in connections[:3]: print(f"\n {conn.connection_id}:") print(f" Client: {conn.client_id}") print(f" Counterparty: {conn.counterparty_connection_id}") print(f" State: {conn.state}") print(f" Versions: {', '.join(conn.versions)}") # Get connection details connection = await ibc.connections.get("connection-0") print(f"\nConnection-0 details:") print(f" Delay period: {connection.delay_period}ns") print(f" Counterparty client: {connection.counterparty_client_id}") print(f" Counterparty prefix: {connection.counterparty_prefix}") # Get client state client = await ibc.clients.get(connection.client_id) print(f"\nClient state:") print(f" Chain ID: {client.chain_id}") print(f" Trust level: {client.trust_level}") print(f" Trusting period: {client.trusting_period}") print(f" Unbonding period: {client.unbonding_period}") print(f" Latest height: {client.latest_height}") print(f" Frozen: {client.frozen}") # Get consensus state consensus = await ibc.clients.get_consensus_state(connection.client_id, client.latest_height) print(f"\nConsensus state at height {client.latest_height}:") print(f" Timestamp: {consensus.timestamp}") print(f" Root: {consensus.root[:20]}...") print(f" Next validators hash: {consensus.next_validators_hash[:20]}...") print() if __name__ == "__main__": asyncio.run(main())