Api

Events & Subscriptions

Real-time streaming updates for market data, orders, and swaps

Hydra provides real-time streaming updates through gRPC subscriptions. Subscribe to market events, order updates, balance changes, and swap progress.

Available Subscriptions


Subscribe Market Events

Stream real-time public market updates for a specific trading pair.

Service: OrderbookServiceMethod: SubscribeMarketEvents

Parameters:

NameTypeRequiredDescription
baseOrderbookCurrencyYESBase currency
quoteOrderbookCurrencyYESQuote currency

Response: Stream of MarketEvent

MarketEvent Types

EventDescriptionFields
is_syncedInitial sync completebool - Always true when synced
orderbook_updateOrderbook changedOrderbookUpdate - Added/removed orders
trade_updateTrade executedTrade - Trade details
daily_stats_update24h stats updatedMarketDailyStats - Volume, price stats
candlestick_updateNew candlestick dataCandlestickUpdate - OHLCV data

OrderbookUpdate

FieldTypeDescription
updated_ordersmap<string, LiquidityPosition>Modified orders
removed_ordersstring[]Removed order IDs

Trade

FieldTypeDescription
taker_order_idstringTaker's order ID
base_amountDecimalStringBase amount traded
quote_amountDecimalStringQuote amount traded
priceDecimalStringExecution price
final_priceDecimalStringPrice after fees
timestampTimestampTrade time
maker_order_sideOrderSideBUY or SELL

MarketDailyStats

FieldTypeDescription
volatilityMarketVolatility24h price and volume data

MarketVolatility:

FieldTypeDescription
first_priceDecimalStringOpening price (24h ago)
last_priceDecimalStringCurrent price
high_priceDecimalStringHighest price (24h)
low_priceDecimalStringLowest price (24h)
base_volumeDecimalString24h volume in base
quote_volumeDecimalString24h volume in quote

CandlestickUpdate

FieldTypeDescription
intervalCandlestickIntervalTime interval
candlestickCandlestickOHLCV data

CandlestickInterval Enum:

ONE_MINUTE, THREE_MINUTES, FIVE_MINUTES, FIFTEEN_MINUTES, THIRTY_MINUTES, ONE_HOUR, TWO_HOURS, FOUR_HOURS, SIX_HOURS, EIGHT_HOURS, TWELVE_HOURS, ONE_DAY, THREE_DAYS, ONE_WEEK, ONE_MONTH

Candlestick:

FieldTypeDescription
timestampTimestampCandle timestamp
openDecimalStringOpening price
closeDecimalStringClosing price
highDecimalStringHighest price
lowDecimalStringLowest price
base_volumeDecimalStringVolume in base
quote_volumeDecimalStringVolume in quote

Example Usage

import { OrderbookServiceClient } from './proto/OrderbookServiceClientPb'
import { SubscribeMarketEventsRequest } from './proto/orderbook_pb'

const client = new OrderbookServiceClient('http://localhost:50051')

const request = new SubscribeMarketEventsRequest()
request.setBase({
  network: { protocol: 0, chainId: '0', name: 'Bitcoin' },
  assetId: 'BTC'
})
request.setQuote({
  network: { protocol: 1, chainId: '1', name: 'Ethereum' },
  assetId: '0x0000000000000000000000000000000000000000'
})

const stream = client.subscribeMarketEvents(request, {})

stream.on('data', (event) => {
  if (event.hasIsSynced()) {
    console.log('✓ Market data synced')
  }

  if (event.hasOrderbookUpdate()) {
    const update = event.getOrderbookUpdate()
    const updatedOrders = update.getUpdatedOrdersMap()
    const removedOrders = update.getRemovedOrdersList()

    console.log('Orderbook updated:')
    console.log('  Modified:', updatedOrders.size)
    console.log('  Removed:', removedOrders.length)
  }

  if (event.hasTradeUpdate()) {
    const trade = event.getTradeUpdate()
    console.log('Trade executed:')
    console.log('  Amount:', trade.getBaseAmount())
    console.log('  Price:', trade.getPrice())
    console.log('  Side:', trade.getMakerOrderSide() === 0 ? 'BUY' : 'SELL')
  }

  if (event.hasDailyStatsUpdate()) {
    const stats = event.getDailyStatsUpdate()
    const vol = stats.getVolatility()
    console.log('24h Stats:')
    console.log('  High:', vol.getHighPrice())
    console.log('  Low:', vol.getLowPrice())
    console.log('  Volume:', vol.getBaseVolume())
  }

  if (event.hasCandlestickUpdate()) {
    const candleUpdate = event.getCandlestickUpdate()
    const candle = candleUpdate.getCandlestick()
    console.log('New candle:', candleUpdate.getInterval())
    console.log('  O:', candle.getOpen())
    console.log('  H:', candle.getHigh())
    console.log('  L:', candle.getLow())
    console.log('  C:', candle.getClose())
  }
})

stream.on('error', (err) => {
  console.error('Stream error:', err)
})

stream.on('end', () => {
  console.log('Stream ended')
})
Go Example
package main

import (
    "context"
    "fmt"
    "io"
    "log"

    pb "path/to/proto"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

func main() {
    conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer conn.Close()

    client := pb.NewOrderbookServiceClient(conn)

    request := &pb.SubscribeMarketEventsRequest{
        Base: &pb.OrderbookCurrency{
            Network: &pb.Network{
                Protocol: pb.Protocol_BITCOIN,
                ChainId:  "0",
                Name:     "Bitcoin",
            },
            AssetId: "BTC",
        },
        Quote: &pb.OrderbookCurrency{
            Network: &pb.Network{
                Protocol: pb.Protocol_EVM,
                ChainId:  "1",
                Name:     "Ethereum",
            },
            AssetId: "0x0000000000000000000000000000000000000000",
        },
    }

    stream, err := client.SubscribeMarketEvents(context.Background(), request)
    if err != nil {
        log.Fatalf("Failed to subscribe: %v", err)
    }

    for {
        event, err := stream.Recv()
        if err == io.EOF {
            fmt.Println("Stream ended")
            break
        }
        if err != nil {
            log.Fatalf("Stream error: %v", err)
        }

        if event.GetIsSynced() {
            fmt.Println("✓ Market data synced")
        }

        if update := event.GetOrderbookUpdate(); update != nil {
            updatedOrders := update.GetUpdatedOrders()
            removedOrders := update.GetRemovedOrders()

            fmt.Println("Orderbook updated:")
            fmt.Printf("  Modified: %d\n", len(updatedOrders))
            fmt.Printf("  Removed: %d\n", len(removedOrders))
        }

        if trade := event.GetTradeUpdate(); trade != nil {
            fmt.Println("Trade executed:")
            fmt.Printf("  Amount: %s\n", trade.GetBaseAmount())
            fmt.Printf("  Price: %s\n", trade.GetPrice())
            side := "SELL"
            if trade.GetMakerOrderSide() == pb.OrderSide_BUY {
                side = "BUY"
            }
            fmt.Printf("  Side: %s\n", side)
        }

        if stats := event.GetDailyStatsUpdate(); stats != nil {
            vol := stats.GetVolatility()
            fmt.Println("24h Stats:")
            fmt.Printf("  High: %s\n", vol.GetHighPrice())
            fmt.Printf("  Low: %s\n", vol.GetLowPrice())
            fmt.Printf("  Volume: %s\n", vol.GetBaseVolume())
        }

        if candleUpdate := event.GetCandlestickUpdate(); candleUpdate != nil {
            candle := candleUpdate.GetCandlestick()
            fmt.Printf("New candle: %v\n", candleUpdate.GetInterval())
            fmt.Printf("  O: %s\n", candle.GetOpen())
            fmt.Printf("  H: %s\n", candle.GetHigh())
            fmt.Printf("  L: %s\n", candle.GetLow())
            fmt.Printf("  C: %s\n", candle.GetClose())
        }
    }
}
Rust Example
use futures::stream::StreamExt;
use proto::orderbook_service_client::OrderbookServiceClient;
use proto::{Network, OrderbookCurrency, Protocol, SubscribeMarketEventsRequest};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = OrderbookServiceClient::connect("http://localhost:50051").await?;

    let request = SubscribeMarketEventsRequest {
        base: Some(OrderbookCurrency {
            network: Some(Network {
                protocol: Protocol::Bitcoin as i32,
                chain_id: "0".to_string(),
                name: "Bitcoin".to_string(),
            }),
            asset_id: "BTC".to_string(),
        }),
        quote: Some(OrderbookCurrency {
            network: Some(Network {
                protocol: Protocol::Evm as i32,
                chain_id: "1".to_string(),
                name: "Ethereum".to_string(),
            }),
            asset_id: "0x0000000000000000000000000000000000000000".to_string(),
        }),
    };

    let mut stream = client.subscribe_market_events(request).await?.into_inner();

    while let Some(event) = stream.next().await {
        match event {
            Ok(event) => {
                if event.is_synced {
                    println!("✓ Market data synced");
                }

                if let Some(update) = &event.orderbook_update {
                    println!("Orderbook updated:");
                    println!("  Modified: {}", update.updated_orders.len());
                    println!("  Removed: {}", update.removed_orders.len());
                }

                if let Some(trade) = &event.trade_update {
                    println!("Trade executed:");
                    println!("  Amount: {}", trade.base_amount);
                    println!("  Price: {}", trade.price);
                    let side = if trade.maker_order_side == 0 { "BUY" } else { "SELL" };
                    println!("  Side: {}", side);
                }

                if let Some(stats) = &event.daily_stats_update {
                    if let Some(vol) = &stats.volatility {
                        println!("24h Stats:");
                        println!("  High: {}", vol.high_price);
                        println!("  Low: {}", vol.low_price);
                        println!("  Volume: {}", vol.base_volume);
                    }
                }

                if let Some(candle_update) = &event.candlestick_update {
                    if let Some(candle) = &candle_update.candlestick {
                        println!("New candle: {:?}", candle_update.interval);
                        println!("  O: {}", candle.open);
                        println!("  H: {}", candle.high);
                        println!("  L: {}", candle.low);
                        println!("  C: {}", candle.close);
                    }
                }
            }
            Err(e) => {
                eprintln!("Stream error: {}", e);
                break;
            }
        }
    }

    println!("Stream ended");
    Ok(())
}

Subscribe DEX Events

Stream your personal trading activity including orders, balances, and swaps.

Service: OrderbookServiceMethod: SubscribeDexEvents

Parameters: None

Response: Stream of DexEvent

DexEvent Types

EventDescription
is_syncedInitial sync complete
balance_updateYour balance changed
order_updateYour order created/updated/completed/cancelled
order_matchedYour order matched with counterparty
swap_updateSwap progress update
market_trade_updateYour market trade completed
swap_trade_updateYour swap trade completed

BalanceUpdate

FieldTypeDescription
currencyOrderbookCurrencyCurrency identifier
balanceCurrencyBalanceUpdated balance

CurrencyBalance:

FieldTypeDescription
sendingDecimalStringAvailable to send
receivingDecimalStringAvailable to receive
pending_sendingDecimalStringPending sends
pending_receivingDecimalStringPending receives
unavailable_sendingDecimalStringLocked/unavailable for sending
unavailable_receivingDecimalStringLocked/unavailable for receiving
in_use_sendingDecimalStringIn active orders (sending)
in_use_receivingDecimalStringIn active orders (receiving)

OrderUpdate

Four variants:

OrderCreated:

  • order_id - New order identifier
  • order - Complete order details

OrderUpdated:

  • order_id - Updated order identifier
  • order - Updated order details

OrderCompleted:

  • order_id - Completed order identifier

OrderCanceled:

  • order_id - Cancelled order identifier

MatchedOrder

FieldTypeDescription
own_order_idstringYour order ID
swap_routeSwapPath[]Multi-hop swap route
is_takerboolWhether you're the taker

SwapPath:

FieldTypeDescription
swap_idstringSwap identifier
first_hopSwapHopFirst hop details
next_hopsSwapHop[]Subsequent hops

SwapHop:

FieldTypeDescription
sending_currencyOrderbookCurrencyCurrency sent
receiving_currencyOrderbookCurrencyCurrency received
sending_amountDecimalStringAmount sent
receiving_amountDecimalStringAmount received
receiving_feeDecimalStringFee paid

SwapUpdate

FieldTypeDescription
order_idstringOrder identifier
swap_idstringSwap identifier
progressSwapProgressCurrent progress

SwapProgress:

FieldTypeDescription
receiving_amountDecimalStringAmount receiving
paying_amountDecimalStringAmount paying
statusSwapStatusCurrent status
errorstringError message (optional)

SwapStatus Enum:

ValueDescription
ORDER_MATCHED (0)Order matched
RECEIVING_INVOICE_CREATED (1)Receiving invoice created
PAYING_INVOICE_RECEIVED (2)Payment invoice received
PAYMENT_SENT (3)Payment sent
PAYMENT_RECEIVED (4)Payment received
PAYMENT_CLAIMED (5)You claimed payment
PAYMENT_CLAIMED_BY_COUNTERPARTY (6)Counterparty claimed
SWAP_COMPLETED (7)Swap successful
SWAP_FAILED (8)Swap failed

MarketTradeUpdate

FieldTypeDescription
baseOrderbookCurrencyBase currency
quoteOrderbookCurrencyQuote currency
tradeClientMarketTradeYour trade details

SwapTradeUpdate

FieldTypeDescription
from_currencyOrderbookCurrencySource currency
to_currencyOrderbookCurrencyDestination currency
tradeClientSwapTradeYour swap trade details

Example Usage

const request = new SubscribeDexEventsRequest()
const stream = client.subscribeDexEvents(request, {})

stream.on('data', (event) => {
  const timestamp = event.getTimestamp()

  if (event.hasBalanceUpdate()) {
    const balance = event.getBalanceUpdate()
    console.log('Balance updated:', {
      currency: balance.getCurrency()?.getAssetId(),
      available: balance.getBalance()?.getSending()
    })
  }

  if (event.hasOrderUpdate()) {
    const update = event.getOrderUpdate()

    if (update.hasOrderCreated()) {
      const created = update.getOrderCreated()
      console.log('Order created:', created.getOrderId())
    }

    if (update.hasOrderCompleted()) {
      const completed = update.getOrderCompleted()
      console.log('Order completed:', completed.getOrderId())
    }

    if (update.hasOrderCanceled()) {
      const canceled = update.getOrderCanceled()
      console.log('Order cancelled:', canceled.getOrderId())
    }
  }

  if (event.hasOrderMatched()) {
    const matched = event.getOrderMatched()
    console.log('Order matched:', {
      orderId: matched.getOwnOrderId(),
      isTaker: matched.getIsTaker(),
      routes: matched.getSwapRouteList().length
    })
  }

  if (event.hasSwapUpdate()) {
    const swap = event.getSwapUpdate()
    const progress = swap.getProgress()

    console.log('Swap update:', {
      swapId: swap.getSwapId(),
      status: progress.getStatus(),
      receiving: progress.getReceivingAmount(),
      paying: progress.getPayingAmount()
    })

    if (progress.hasError()) {
      console.error('Swap error:', progress.getError())
    }
  }

  if (event.hasMarketTradeUpdate()) {
    const trade = event.getMarketTradeUpdate()
    console.log('Trade executed:', trade.getTrade())
  }
})
Go Example
package main

import (
    "context"
    "fmt"
    "io"
    "log"

    pb "path/to/proto"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

func main() {
    conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer conn.Close()

    client := pb.NewOrderbookServiceClient(conn)

    request := &pb.SubscribeDexEventsRequest{}
    stream, err := client.SubscribeDexEvents(context.Background(), request)
    if err != nil {
        log.Fatalf("Failed to subscribe: %v", err)
    }

    for {
        event, err := stream.Recv()
        if err == io.EOF {
            fmt.Println("Stream ended")
            break
        }
        if err != nil {
            log.Fatalf("Stream error: %v", err)
        }

        timestamp := event.GetTimestamp()

        if balance := event.GetBalanceUpdate(); balance != nil {
            fmt.Printf("Balance updated:\n")
            fmt.Printf("  Currency: %s\n", balance.GetCurrency().GetAssetId())
            fmt.Printf("  Available: %s\n", balance.GetBalance().GetSending())
        }

        if update := event.GetOrderUpdate(); update != nil {
            if created := update.GetOrderCreated(); created != nil {
                fmt.Printf("Order created: %s\n", created.GetOrderId())
            }

            if completed := update.GetOrderCompleted(); completed != nil {
                fmt.Printf("Order completed: %s\n", completed.GetOrderId())
            }

            if canceled := update.GetOrderCanceled(); canceled != nil {
                fmt.Printf("Order cancelled: %s\n", canceled.GetOrderId())
            }
        }

        if matched := event.GetOrderMatched(); matched != nil {
            fmt.Printf("Order matched:\n")
            fmt.Printf("  Order ID: %s\n", matched.GetOwnOrderId())
            fmt.Printf("  Is Taker: %v\n", matched.GetIsTaker())
            fmt.Printf("  Routes: %d\n", len(matched.GetSwapRoute()))
        }

        if swap := event.GetSwapUpdate(); swap != nil {
            progress := swap.GetProgress()
            fmt.Printf("Swap update:\n")
            fmt.Printf("  Swap ID: %s\n", swap.GetSwapId())
            fmt.Printf("  Status: %v\n", progress.GetStatus())
            fmt.Printf("  Receiving: %s\n", progress.GetReceivingAmount())
            fmt.Printf("  Paying: %s\n", progress.GetPayingAmount())

            if progress.GetError() != "" {
                fmt.Printf("  Error: %s\n", progress.GetError())
            }
        }

        if trade := event.GetMarketTradeUpdate(); trade != nil {
            fmt.Printf("Trade executed: %v\n", trade.GetTrade())
        }

        _ = timestamp // Use timestamp if needed
    }
}
Rust Example
use futures::stream::StreamExt;
use proto::orderbook_service_client::OrderbookServiceClient;
use proto::SubscribeDexEventsRequest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = OrderbookServiceClient::connect("http://localhost:50051").await?;

    let request = SubscribeDexEventsRequest {};
    let mut stream = client.subscribe_dex_events(request).await?.into_inner();

    while let Some(event) = stream.next().await {
        match event {
            Ok(event) => {
                let _timestamp = &event.timestamp;

                if let Some(balance) = &event.balance_update {
                    println!("Balance updated:");
                    if let Some(currency) = &balance.currency {
                        println!("  Currency: {}", currency.asset_id);
                    }
                    if let Some(bal) = &balance.balance {
                        println!("  Available: {}", bal.sending);
                    }
                }

                if let Some(update) = &event.order_update {
                    if let Some(created) = &update.order_created {
                        println!("Order created: {}", created.order_id);
                    }

                    if let Some(completed) = &update.order_completed {
                        println!("Order completed: {}", completed.order_id);
                    }

                    if let Some(canceled) = &update.order_canceled {
                        println!("Order cancelled: {}", canceled.order_id);
                    }
                }

                if let Some(matched) = &event.order_matched {
                    println!("Order matched:");
                    println!("  Order ID: {}", matched.own_order_id);
                    println!("  Is Taker: {}", matched.is_taker);
                    println!("  Routes: {}", matched.swap_route.len());
                }

                if let Some(swap) = &event.swap_update {
                    if let Some(progress) = &swap.progress {
                        println!("Swap update:");
                        println!("  Swap ID: {}", swap.swap_id);
                        println!("  Status: {:?}", progress.status);
                        println!("  Receiving: {}", progress.receiving_amount);
                        println!("  Paying: {}", progress.paying_amount);

                        if !progress.error.is_empty() {
                            println!("  Error: {}", progress.error);
                        }
                    }
                }

                if let Some(trade) = &event.market_trade_update {
                    println!("Trade executed: {:?}", trade.trade);
                }
            }
            Err(e) => {
                eprintln!("Stream error: {}", e);
                break;
            }
        }
    }

    println!("Stream ended");
    Ok(())
}

Subscribe Simple Swaps

Monitor progress of Simple Swap operations with automatic channel setup.

Service: SwapServiceMethod: SubscribeSimpleSwaps

Parameters: None

Response: Stream of SimpleSwapUpdate

SimpleSwapUpdate Structure

FieldTypeDescription
timestampTimestampUpdate time
simple_swap_idstringSwap identifier
update-One of many update types

Update Types

Channel Setup Updates

FundingSendingChannel:

  • txid - Funding transaction ID
  • channel_id - Channel identifier
  • amount - Funding amount
  • is_opening - Whether opening new channel

RentingReceivingChannel:

  • txid - Rental transaction ID
  • channel_id - Rented channel ID
  • amount - Rental amount

DualFundingChannel:

  • txid - Dual-fund transaction ID
  • channel_id - Channel identifier
  • self_amount - Your contribution
  • counterparty_amount - Peer's contribution
  • is_opening - Whether opening new channel

Channel Ready Updates

SendingChannelReady:

  • channel_id - Ready channel ID

ReceivingChannelReady:

  • channel_id - Ready channel ID

DualFundChannelReady:

  • channel_id - Ready channel ID

Balance Updates

WaitingForBalances:

  • needed_sending - Required sending balance
  • needed_receiving - Required receiving balance

BalancesReady:

  • No fields - balances are sufficient

Order Updates

OrderCreated:

  • order_id - Created order identifier

OrderCompleted:

  • order_id - Completed order identifier
  • sent_amount - Amount sent
  • received_amount - Amount received

Withdrawal Updates

WithdrawingSendingFunds:

  • txids - Withdrawal transaction IDs
  • self_amount - Your withdrawal
  • counterparty_amount - Peer's withdrawal

WithdrawingReceivingFunds:

  • txids - Withdrawal transaction IDs
  • self_amount - Your withdrawal
  • counterparty_amount - Peer's withdrawal

WithdrawingDualFundedFunds:

  • txids - Withdrawal transaction IDs
  • sending_self_amount - Your sending withdrawal
  • sending_counterparty_amount - Peer's sending withdrawal
  • receiving_self_amount - Your receiving withdrawal
  • receiving_counterparty_amount - Peer's receiving withdrawal

SendingFundsWithdrawn:

  • No fields - withdrawal complete

ReceivingFundsWithdrawn:

  • No fields - withdrawal complete

DualFundedFundsWithdrawn:

  • No fields - withdrawal complete

Completion Updates

SimpleSwapCompleted:

  • No fields - swap successful

SimpleSwapError:

  • error - Error message

Example Usage

import { SwapServiceClient } from './proto/SwapServiceClientPb'
import { SubscribeSimpleSwapsRequest } from './proto/swap_pb'

const client = new SwapServiceClient('http://localhost:50051')

const request = new SubscribeSimpleSwapsRequest()
const stream = client.subscribeSimpleSwaps(request, {})

stream.on('data', (update) => {
  const swapId = update.getSimpleSwapId()
  const timestamp = update.getTimestamp()

  // Channel setup
  if (update.hasFundingSendingChannel()) {
    const funding = update.getFundingSendingChannel()
    console.log(`[${swapId}] Funding sending channel:`, {
      txid: funding.getTxid(),
      channelId: funding.getChannelId(),
      amount: funding.getAmount(),
      isOpening: funding.getIsOpening()
    })
  }

  if (update.hasRentingReceivingChannel()) {
    const renting = update.getRentingReceivingChannel()
    console.log(`[${swapId}] Renting receiving channel:`, {
      txid: renting.getTxid(),
      channelId: renting.getChannelId(),
      amount: renting.getAmount()
    })
  }

  if (update.hasDualFundingChannel()) {
    const dualFund = update.getDualFundingChannel()
    console.log(`[${swapId}] Dual-funding channel:`, {
      txid: dualFund.getTxid(),
      selfAmount: dualFund.getSelfAmount(),
      counterpartyAmount: dualFund.getCounterpartyAmount()
    })
  }

  // Channel ready
  if (update.hasSendingChannelReady()) {
    console.log(`[${swapId}] ✓ Sending channel ready`)
  }

  if (update.hasReceivingChannelReady()) {
    console.log(`[${swapId}] ✓ Receiving channel ready`)
  }

  // Balance status
  if (update.hasWaitingForBalances()) {
    const waiting = update.getWaitingForBalances()
    console.log(`[${swapId}] Waiting for balances:`, {
      sending: waiting.getNeededSending(),
      receiving: waiting.getNeededReceiving()
    })
  }

  if (update.hasBalancesReady()) {
    console.log(`[${swapId}] ✓ Balances ready`)
  }

  // Order execution
  if (update.hasOrderCreated()) {
    const order = update.getOrderCreated()
    console.log(`[${swapId}] Order created:`, order.getOrderId())
  }

  if (update.hasOrderCompleted()) {
    const completed = update.getOrderCompleted()
    console.log(`[${swapId}] ✓ Order completed:`, {
      sent: completed.getSentAmount(),
      received: completed.getReceivedAmount()
    })
  }

  // Withdrawals
  if (update.hasWithdrawingReceivingFunds()) {
    const withdrawing = update.getWithdrawingReceivingFunds()
    console.log(`[${swapId}] Withdrawing funds:`, {
      txids: withdrawing.getTxidsList(),
      amount: withdrawing.getSelfAmount()
    })
  }

  if (update.hasReceivingFundsWithdrawn()) {
    console.log(`[${swapId}] ✓ Funds withdrawn`)
  }

  // Final status
  if (update.hasSimpleSwapCompleted()) {
    console.log(`[${swapId}] ✓✓✓ SWAP COMPLETED SUCCESSFULLY ✓✓✓`)
  }

  if (update.hasSimpleSwapError()) {
    const error = update.getSimpleSwapError()
    console.error(`[${swapId}] ✗ ERROR:`, error.getError())
  }
})

stream.on('error', (err) => {
  console.error('Stream error:', err)
})
Go Example
package main

import (
    "context"
    "fmt"
    "io"
    "log"

    pb "path/to/proto"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

func main() {
    conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer conn.Close()

    client := pb.NewSwapServiceClient(conn)

    request := &pb.SubscribeSimpleSwapsRequest{}
    stream, err := client.SubscribeSimpleSwaps(context.Background(), request)
    if err != nil {
        log.Fatalf("Failed to subscribe: %v", err)
    }

    for {
        update, err := stream.Recv()
        if err == io.EOF {
            fmt.Println("Stream ended")
            break
        }
        if err != nil {
            log.Fatalf("Stream error: %v", err)
        }

        swapId := update.GetSimpleSwapId()

        // Channel setup
        if funding := update.GetFundingSendingChannel(); funding != nil {
            fmt.Printf("[%s] Funding sending channel:\n", swapId)
            fmt.Printf("  Txid: %s\n", funding.GetTxid())
            fmt.Printf("  Channel ID: %s\n", funding.GetChannelId())
            fmt.Printf("  Amount: %s\n", funding.GetAmount())
            fmt.Printf("  Is Opening: %v\n", funding.GetIsOpening())
        }

        if renting := update.GetRentingReceivingChannel(); renting != nil {
            fmt.Printf("[%s] Renting receiving channel:\n", swapId)
            fmt.Printf("  Txid: %s\n", renting.GetTxid())
            fmt.Printf("  Channel ID: %s\n", renting.GetChannelId())
            fmt.Printf("  Amount: %s\n", renting.GetAmount())
        }

        if dualFund := update.GetDualFundingChannel(); dualFund != nil {
            fmt.Printf("[%s] Dual-funding channel:\n", swapId)
            fmt.Printf("  Txid: %s\n", dualFund.GetTxid())
            fmt.Printf("  Self Amount: %s\n", dualFund.GetSelfAmount())
            fmt.Printf("  Counterparty Amount: %s\n", dualFund.GetCounterpartyAmount())
        }

        // Channel ready
        if update.GetSendingChannelReady() != nil {
            fmt.Printf("[%s] ✓ Sending channel ready\n", swapId)
        }

        if update.GetReceivingChannelReady() != nil {
            fmt.Printf("[%s] ✓ Receiving channel ready\n", swapId)
        }

        // Balance status
        if waiting := update.GetWaitingForBalances(); waiting != nil {
            fmt.Printf("[%s] Waiting for balances:\n", swapId)
            fmt.Printf("  Sending: %s\n", waiting.GetNeededSending())
            fmt.Printf("  Receiving: %s\n", waiting.GetNeededReceiving())
        }

        if update.GetBalancesReady() != nil {
            fmt.Printf("[%s] ✓ Balances ready\n", swapId)
        }

        // Order execution
        if order := update.GetOrderCreated(); order != nil {
            fmt.Printf("[%s] Order created: %s\n", swapId, order.GetOrderId())
        }

        if completed := update.GetOrderCompleted(); completed != nil {
            fmt.Printf("[%s] ✓ Order completed:\n", swapId)
            fmt.Printf("  Sent: %s\n", completed.GetSentAmount())
            fmt.Printf("  Received: %s\n", completed.GetReceivedAmount())
        }

        // Withdrawals
        if withdrawing := update.GetWithdrawingReceivingFunds(); withdrawing != nil {
            fmt.Printf("[%s] Withdrawing funds:\n", swapId)
            fmt.Printf("  Txids: %v\n", withdrawing.GetTxids())
            fmt.Printf("  Amount: %s\n", withdrawing.GetSelfAmount())
        }

        if update.GetReceivingFundsWithdrawn() != nil {
            fmt.Printf("[%s] ✓ Funds withdrawn\n", swapId)
        }

        // Final status
        if update.GetSimpleSwapCompleted() != nil {
            fmt.Printf("[%s] ✓✓✓ SWAP COMPLETED SUCCESSFULLY ✓✓✓\n", swapId)
        }

        if swapError := update.GetSimpleSwapError(); swapError != nil {
            fmt.Printf("[%s] ✗ ERROR: %s\n", swapId, swapError.GetError())
        }
    }
}
Rust Example
use futures::stream::StreamExt;
use proto::swap_service_client::SwapServiceClient;
use proto::SubscribeSimpleSwapsRequest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = SwapServiceClient::connect("http://localhost:50051").await?;

    let request = SubscribeSimpleSwapsRequest {};
    let mut stream = client.subscribe_simple_swaps(request).await?.into_inner();

    while let Some(update) = stream.next().await {
        match update {
            Ok(update) => {
                let swap_id = &update.simple_swap_id;

                // Channel setup
                if let Some(funding) = &update.funding_sending_channel {
                    println!("[{}] Funding sending channel:", swap_id);
                    println!("  Txid: {}", funding.txid);
                    println!("  Channel ID: {}", funding.channel_id);
                    println!("  Amount: {}", funding.amount);
                    println!("  Is Opening: {}", funding.is_opening);
                }

                if let Some(renting) = &update.renting_receiving_channel {
                    println!("[{}] Renting receiving channel:", swap_id);
                    println!("  Txid: {}", renting.txid);
                    println!("  Channel ID: {}", renting.channel_id);
                    println!("  Amount: {}", renting.amount);
                }

                if let Some(dual_fund) = &update.dual_funding_channel {
                    println!("[{}] Dual-funding channel:", swap_id);
                    println!("  Txid: {}", dual_fund.txid);
                    println!("  Self Amount: {}", dual_fund.self_amount);
                    println!("  Counterparty Amount: {}", dual_fund.counterparty_amount);
                }

                // Channel ready
                if update.sending_channel_ready.is_some() {
                    println!("[{}] ✓ Sending channel ready", swap_id);
                }

                if update.receiving_channel_ready.is_some() {
                    println!("[{}] ✓ Receiving channel ready", swap_id);
                }

                // Balance status
                if let Some(waiting) = &update.waiting_for_balances {
                    println!("[{}] Waiting for balances:", swap_id);
                    println!("  Sending: {}", waiting.needed_sending);
                    println!("  Receiving: {}", waiting.needed_receiving);
                }

                if update.balances_ready.is_some() {
                    println!("[{}] ✓ Balances ready", swap_id);
                }

                // Order execution
                if let Some(order) = &update.order_created {
                    println!("[{}] Order created: {}", swap_id, order.order_id);
                }

                if let Some(completed) = &update.order_completed {
                    println!("[{}] ✓ Order completed:", swap_id);
                    println!("  Sent: {}", completed.sent_amount);
                    println!("  Received: {}", completed.received_amount);
                }

                // Withdrawals
                if let Some(withdrawing) = &update.withdrawing_receiving_funds {
                    println!("[{}] Withdrawing funds:", swap_id);
                    println!("  Txids: {:?}", withdrawing.txids);
                    println!("  Amount: {}", withdrawing.self_amount);
                }

                if update.receiving_funds_withdrawn.is_some() {
                    println!("[{}] ✓ Funds withdrawn", swap_id);
                }

                // Final status
                if update.simple_swap_completed.is_some() {
                    println!("[{}] ✓✓✓ SWAP COMPLETED SUCCESSFULLY ✓✓✓", swap_id);
                }

                if let Some(error) = &update.simple_swap_error {
                    eprintln!("[{}] ✗ ERROR: {}", swap_id, error.error);
                }
            }
            Err(e) => {
                eprintln!("Stream error: {}", e);
                break;
            }
        }
    }

    println!("Stream ended");
    Ok(())
}

Common Patterns

Track specific swap

async function trackSwap(swapId: string): Promise<void> {
  return new Promise((resolve, reject) => {
    const stream = client.subscribeSimpleSwaps(new SubscribeSimpleSwapsRequest(), {})

    stream.on('data', (update) => {
      // Filter for our swap
      if (update.getSimpleSwapId() !== swapId) return

      if (update.hasSimpleSwapCompleted()) {
        stream.cancel()
        resolve()
      }

      if (update.hasSimpleSwapError()) {
        stream.cancel()
        reject(new Error(update.getSimpleSwapError()?.getError()))
      }
    })

    stream.on('error', reject)
  })
}
Go Example
func trackSwap(client pb.SwapServiceClient, swapId string) error {
    request := &pb.SubscribeSimpleSwapsRequest{}
    stream, err := client.SubscribeSimpleSwaps(context.Background(), request)
    if err != nil {
        return fmt.Errorf("failed to subscribe: %w", err)
    }

    for {
        update, err := stream.Recv()
        if err == io.EOF {
            return fmt.Errorf("stream ended unexpectedly")
        }
        if err != nil {
            return fmt.Errorf("stream error: %w", err)
        }

        // Filter for our swap
        if update.GetSimpleSwapId() != swapId {
            continue
        }

        if update.GetSimpleSwapCompleted() != nil {
            return nil // Success
        }

        if swapError := update.GetSimpleSwapError(); swapError != nil {
            return fmt.Errorf("swap error: %s", swapError.GetError())
        }
    }
}
Rust Example
async fn track_swap(
    client: &mut SwapServiceClient<tonic::transport::Channel>,
    swap_id: String,
) -> Result<(), Box<dyn std::error::Error>> {
    let request = SubscribeSimpleSwapsRequest {};
    let mut stream = client.subscribe_simple_swaps(request).await?.into_inner();

    while let Some(update) = stream.next().await {
        let update = update?;

        // Filter for our swap
        if update.simple_swap_id != swap_id {
            continue;
        }

        if update.simple_swap_completed.is_some() {
            return Ok(()); // Success
        }

        if let Some(error) = update.simple_swap_error {
            return Err(format!("swap error: {}", error.error).into());
        }
    }

    Err("stream ended unexpectedly".into())
}

Build price chart from market events

const priceHistory: number[] = []

stream.on('data', (event) => {
  if (event.hasTradeUpdate()) {
    const trade = event.getTradeUpdate()
    const price = parseFloat(trade.getPrice())
    priceHistory.push(price)

    // Update chart
    updatePriceChart(priceHistory)
  }
})
Go Example
var priceHistory []float64

stream, err := client.SubscribeMarketEvents(context.Background(), request)
if err != nil {
    log.Fatalf("Failed to subscribe: %v", err)
}

for {
    event, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatalf("Stream error: %v", err)
    }

    if trade := event.GetTradeUpdate(); trade != nil {
        price, err := strconv.ParseFloat(trade.GetPrice(), 64)
        if err != nil {
            log.Printf("Failed to parse price: %v", err)
            continue
        }
        priceHistory = append(priceHistory, price)

        // Update chart
        updatePriceChart(priceHistory)
    }
}
Rust Example
let mut price_history: Vec<f64> = Vec::new();

let mut stream = client.subscribe_market_events(request).await?.into_inner();

while let Some(event) = stream.next().await {
    match event {
        Ok(event) => {
            if let Some(trade) = &event.trade_update {
                if let Ok(price) = trade.price.parse::<f64>() {
                    price_history.push(price);

                    // Update chart
                    update_price_chart(&price_history);
                }
            }
        }
        Err(e) => {
            eprintln!("Stream error: {}", e);
            break;
        }
    }
}

Monitor orderbook depth

const orderbookDepth = new Map<string, LiquidityPosition>()

stream.on('data', (event) => {
  if (event.hasOrderbookUpdate()) {
    const update = event.getOrderbookUpdate()

    // Add/update orders
    update.getUpdatedOrdersMap().forEach((position, orderId) => {
      orderbookDepth.set(orderId, position)
    })

    // Remove orders
    update.getRemovedOrdersList().forEach(orderId => {
      orderbookDepth.delete(orderId)
    })

    console.log('Current depth:', orderbookDepth.size, 'orders')
  }
})
Go Example
orderbookDepth := make(map[string]*pb.LiquidityPosition)

stream, err := client.SubscribeMarketEvents(context.Background(), request)
if err != nil {
    log.Fatalf("Failed to subscribe: %v", err)
}

for {
    event, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatalf("Stream error: %v", err)
    }

    if update := event.GetOrderbookUpdate(); update != nil {
        // Add/update orders
        for orderId, position := range update.GetUpdatedOrders() {
            orderbookDepth[orderId] = position
        }

        // Remove orders
        for _, orderId := range update.GetRemovedOrders() {
            delete(orderbookDepth, orderId)
        }

        fmt.Printf("Current depth: %d orders\n", len(orderbookDepth))
    }
}
Rust Example
use std::collections::HashMap;

let mut orderbook_depth: HashMap<String, LiquidityPosition> = HashMap::new();

let mut stream = client.subscribe_market_events(request).await?.into_inner();

while let Some(event) = stream.next().await {
    match event {
        Ok(event) => {
            if let Some(update) = &event.orderbook_update {
                // Add/update orders
                for (order_id, position) in &update.updated_orders {
                    orderbook_depth.insert(order_id.clone(), position.clone());
                }

                // Remove orders
                for order_id in &update.removed_orders {
                    orderbook_depth.remove(order_id);
                }

                println!("Current depth: {} orders", orderbook_depth.len());
            }
        }
        Err(e) => {
            eprintln!("Stream error: {}", e);
            break;
        }
    }
}

Best Practices

  1. Handle reconnections - Streams can disconnect, implement retry logic
  2. Filter events - Only process events relevant to your use case
  3. Limit subscriptions - Don't subscribe to too many markets simultaneously
  4. Clean up streams - Call stream.cancel() when done
  5. Buffer updates - Rate-limit UI updates to avoid overwhelming the interface
  6. Error handling - Always listen to error and end events

Stream Lifecycle

// 1. Create subscription
const stream = client.subscribeMarketEvents(request, {})

// 2. Listen to events
stream.on('data', (event) => { /* handle */ })
stream.on('error', (err) => { /* handle */ })
stream.on('end', () => { /* reconnect */ })

// 3. Cancel when done
stream.cancel()
Go Example
// 1. Create subscription
stream, err := client.SubscribeMarketEvents(context.Background(), request)
if err != nil {
    log.Fatalf("Failed to subscribe: %v", err)
}

// 2. Listen to events
for {
    event, err := stream.Recv()
    if err == io.EOF {
        // Stream ended - reconnect if needed
        break
    }
    if err != nil {
        // Handle error
        log.Printf("Stream error: %v", err)
        break
    }

    // Handle event
    // ...
}

// 3. Stream automatically closes when loop exits
// For manual cancellation, use context.WithCancel
Rust Example
// 1. Create subscription
let mut stream = client.subscribe_market_events(request).await?.into_inner();

// 2. Listen to events
while let Some(event) = stream.next().await {
    match event {
        Ok(event) => {
            // Handle event
            // ...
        }
        Err(e) => {
            // Handle error
            eprintln!("Stream error: {}", e);
            break;
        }
    }
}

// 3. Stream automatically closes when dropped
// For manual cancellation, drop the stream or use tokio::select!

← Back to API Reference | Next: Error Codes →


Copyright © 2025