Events & Subscriptions
Hydra provides real-time streaming updates through gRPC subscriptions. Subscribe to market events, order updates, balance changes, and swap progress.
Available Subscriptions
- Subscribe Market Events - Public market data and orderbook updates
- Subscribe DEX Events - Your personal trading activity
- Subscribe Simple Swaps - Simple swap progress tracking
Subscribe Market Events
Stream real-time public market updates for a specific trading pair.
Service: OrderbookServiceMethod: SubscribeMarketEvents
Parameters:
| Name | Type | Required | Description |
|---|---|---|---|
base | OrderbookCurrency | YES | Base currency |
quote | OrderbookCurrency | YES | Quote currency |
Response: Stream of MarketEvent
MarketEvent Types
| Event | Description | Fields |
|---|---|---|
is_synced | Initial sync complete | bool - Always true when synced |
orderbook_update | Orderbook changed | OrderbookUpdate - Added/removed orders |
trade_update | Trade executed | Trade - Trade details |
daily_stats_update | 24h stats updated | MarketDailyStats - Volume, price stats |
candlestick_update | New candlestick data | CandlestickUpdate - OHLCV data |
OrderbookUpdate
| Field | Type | Description |
|---|---|---|
updated_orders | map<string, LiquidityPosition> | Modified orders |
removed_orders | string[] | Removed order IDs |
Trade
| Field | Type | Description |
|---|---|---|
taker_order_id | string | Taker's order ID |
base_amount | DecimalString | Base amount traded |
quote_amount | DecimalString | Quote amount traded |
price | DecimalString | Execution price |
final_price | DecimalString | Price after fees |
timestamp | Timestamp | Trade time |
maker_order_side | OrderSide | BUY or SELL |
MarketDailyStats
| Field | Type | Description |
|---|---|---|
volatility | MarketVolatility | 24h price and volume data |
MarketVolatility:
| Field | Type | Description |
|---|---|---|
first_price | DecimalString | Opening price (24h ago) |
last_price | DecimalString | Current price |
high_price | DecimalString | Highest price (24h) |
low_price | DecimalString | Lowest price (24h) |
base_volume | DecimalString | 24h volume in base |
quote_volume | DecimalString | 24h volume in quote |
CandlestickUpdate
| Field | Type | Description |
|---|---|---|
interval | CandlestickInterval | Time interval |
candlestick | Candlestick | OHLCV data |
CandlestickInterval Enum:
ONE_MINUTE, THREE_MINUTES, FIVE_MINUTES, FIFTEEN_MINUTES, THIRTY_MINUTES, ONE_HOUR, TWO_HOURS, FOUR_HOURS, SIX_HOURS, EIGHT_HOURS, TWELVE_HOURS, ONE_DAY, THREE_DAYS, ONE_WEEK, ONE_MONTH
Candlestick:
| Field | Type | Description |
|---|---|---|
timestamp | Timestamp | Candle timestamp |
open | DecimalString | Opening price |
close | DecimalString | Closing price |
high | DecimalString | Highest price |
low | DecimalString | Lowest price |
base_volume | DecimalString | Volume in base |
quote_volume | DecimalString | Volume in quote |
Example Usage
import { OrderbookServiceClient } from './proto/OrderbookServiceClientPb'
import { SubscribeMarketEventsRequest } from './proto/orderbook_pb'
const client = new OrderbookServiceClient('http://localhost:50051')
const request = new SubscribeMarketEventsRequest()
request.setBase({
network: { protocol: 0, chainId: '0', name: 'Bitcoin' },
assetId: 'BTC'
})
request.setQuote({
network: { protocol: 1, chainId: '1', name: 'Ethereum' },
assetId: '0x0000000000000000000000000000000000000000'
})
const stream = client.subscribeMarketEvents(request, {})
stream.on('data', (event) => {
if (event.hasIsSynced()) {
console.log('✓ Market data synced')
}
if (event.hasOrderbookUpdate()) {
const update = event.getOrderbookUpdate()
const updatedOrders = update.getUpdatedOrdersMap()
const removedOrders = update.getRemovedOrdersList()
console.log('Orderbook updated:')
console.log(' Modified:', updatedOrders.size)
console.log(' Removed:', removedOrders.length)
}
if (event.hasTradeUpdate()) {
const trade = event.getTradeUpdate()
console.log('Trade executed:')
console.log(' Amount:', trade.getBaseAmount())
console.log(' Price:', trade.getPrice())
console.log(' Side:', trade.getMakerOrderSide() === 0 ? 'BUY' : 'SELL')
}
if (event.hasDailyStatsUpdate()) {
const stats = event.getDailyStatsUpdate()
const vol = stats.getVolatility()
console.log('24h Stats:')
console.log(' High:', vol.getHighPrice())
console.log(' Low:', vol.getLowPrice())
console.log(' Volume:', vol.getBaseVolume())
}
if (event.hasCandlestickUpdate()) {
const candleUpdate = event.getCandlestickUpdate()
const candle = candleUpdate.getCandlestick()
console.log('New candle:', candleUpdate.getInterval())
console.log(' O:', candle.getOpen())
console.log(' H:', candle.getHigh())
console.log(' L:', candle.getLow())
console.log(' C:', candle.getClose())
}
})
stream.on('error', (err) => {
console.error('Stream error:', err)
})
stream.on('end', () => {
console.log('Stream ended')
})
Go Example
package main
import (
"context"
"fmt"
"io"
"log"
pb "path/to/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func main() {
conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
client := pb.NewOrderbookServiceClient(conn)
request := &pb.SubscribeMarketEventsRequest{
Base: &pb.OrderbookCurrency{
Network: &pb.Network{
Protocol: pb.Protocol_BITCOIN,
ChainId: "0",
Name: "Bitcoin",
},
AssetId: "BTC",
},
Quote: &pb.OrderbookCurrency{
Network: &pb.Network{
Protocol: pb.Protocol_EVM,
ChainId: "1",
Name: "Ethereum",
},
AssetId: "0x0000000000000000000000000000000000000000",
},
}
stream, err := client.SubscribeMarketEvents(context.Background(), request)
if err != nil {
log.Fatalf("Failed to subscribe: %v", err)
}
for {
event, err := stream.Recv()
if err == io.EOF {
fmt.Println("Stream ended")
break
}
if err != nil {
log.Fatalf("Stream error: %v", err)
}
if event.GetIsSynced() {
fmt.Println("✓ Market data synced")
}
if update := event.GetOrderbookUpdate(); update != nil {
updatedOrders := update.GetUpdatedOrders()
removedOrders := update.GetRemovedOrders()
fmt.Println("Orderbook updated:")
fmt.Printf(" Modified: %d\n", len(updatedOrders))
fmt.Printf(" Removed: %d\n", len(removedOrders))
}
if trade := event.GetTradeUpdate(); trade != nil {
fmt.Println("Trade executed:")
fmt.Printf(" Amount: %s\n", trade.GetBaseAmount())
fmt.Printf(" Price: %s\n", trade.GetPrice())
side := "SELL"
if trade.GetMakerOrderSide() == pb.OrderSide_BUY {
side = "BUY"
}
fmt.Printf(" Side: %s\n", side)
}
if stats := event.GetDailyStatsUpdate(); stats != nil {
vol := stats.GetVolatility()
fmt.Println("24h Stats:")
fmt.Printf(" High: %s\n", vol.GetHighPrice())
fmt.Printf(" Low: %s\n", vol.GetLowPrice())
fmt.Printf(" Volume: %s\n", vol.GetBaseVolume())
}
if candleUpdate := event.GetCandlestickUpdate(); candleUpdate != nil {
candle := candleUpdate.GetCandlestick()
fmt.Printf("New candle: %v\n", candleUpdate.GetInterval())
fmt.Printf(" O: %s\n", candle.GetOpen())
fmt.Printf(" H: %s\n", candle.GetHigh())
fmt.Printf(" L: %s\n", candle.GetLow())
fmt.Printf(" C: %s\n", candle.GetClose())
}
}
}
Rust Example
use futures::stream::StreamExt;
use proto::orderbook_service_client::OrderbookServiceClient;
use proto::{Network, OrderbookCurrency, Protocol, SubscribeMarketEventsRequest};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = OrderbookServiceClient::connect("http://localhost:50051").await?;
let request = SubscribeMarketEventsRequest {
base: Some(OrderbookCurrency {
network: Some(Network {
protocol: Protocol::Bitcoin as i32,
chain_id: "0".to_string(),
name: "Bitcoin".to_string(),
}),
asset_id: "BTC".to_string(),
}),
quote: Some(OrderbookCurrency {
network: Some(Network {
protocol: Protocol::Evm as i32,
chain_id: "1".to_string(),
name: "Ethereum".to_string(),
}),
asset_id: "0x0000000000000000000000000000000000000000".to_string(),
}),
};
let mut stream = client.subscribe_market_events(request).await?.into_inner();
while let Some(event) = stream.next().await {
match event {
Ok(event) => {
if event.is_synced {
println!("✓ Market data synced");
}
if let Some(update) = &event.orderbook_update {
println!("Orderbook updated:");
println!(" Modified: {}", update.updated_orders.len());
println!(" Removed: {}", update.removed_orders.len());
}
if let Some(trade) = &event.trade_update {
println!("Trade executed:");
println!(" Amount: {}", trade.base_amount);
println!(" Price: {}", trade.price);
let side = if trade.maker_order_side == 0 { "BUY" } else { "SELL" };
println!(" Side: {}", side);
}
if let Some(stats) = &event.daily_stats_update {
if let Some(vol) = &stats.volatility {
println!("24h Stats:");
println!(" High: {}", vol.high_price);
println!(" Low: {}", vol.low_price);
println!(" Volume: {}", vol.base_volume);
}
}
if let Some(candle_update) = &event.candlestick_update {
if let Some(candle) = &candle_update.candlestick {
println!("New candle: {:?}", candle_update.interval);
println!(" O: {}", candle.open);
println!(" H: {}", candle.high);
println!(" L: {}", candle.low);
println!(" C: {}", candle.close);
}
}
}
Err(e) => {
eprintln!("Stream error: {}", e);
break;
}
}
}
println!("Stream ended");
Ok(())
}
Subscribe DEX Events
Stream your personal trading activity including orders, balances, and swaps.
Service: OrderbookServiceMethod: SubscribeDexEvents
Parameters: None
Response: Stream of DexEvent
DexEvent Types
| Event | Description |
|---|---|
is_synced | Initial sync complete |
balance_update | Your balance changed |
order_update | Your order created/updated/completed/cancelled |
order_matched | Your order matched with counterparty |
swap_update | Swap progress update |
market_trade_update | Your market trade completed |
swap_trade_update | Your swap trade completed |
BalanceUpdate
| Field | Type | Description |
|---|---|---|
currency | OrderbookCurrency | Currency identifier |
balance | CurrencyBalance | Updated balance |
CurrencyBalance:
| Field | Type | Description |
|---|---|---|
sending | DecimalString | Available to send |
receiving | DecimalString | Available to receive |
pending_sending | DecimalString | Pending sends |
pending_receiving | DecimalString | Pending receives |
unavailable_sending | DecimalString | Locked/unavailable for sending |
unavailable_receiving | DecimalString | Locked/unavailable for receiving |
in_use_sending | DecimalString | In active orders (sending) |
in_use_receiving | DecimalString | In active orders (receiving) |
OrderUpdate
Four variants:
OrderCreated:
order_id- New order identifierorder- Complete order details
OrderUpdated:
order_id- Updated order identifierorder- Updated order details
OrderCompleted:
order_id- Completed order identifier
OrderCanceled:
order_id- Cancelled order identifier
MatchedOrder
| Field | Type | Description |
|---|---|---|
own_order_id | string | Your order ID |
swap_route | SwapPath[] | Multi-hop swap route |
is_taker | bool | Whether you're the taker |
SwapPath:
| Field | Type | Description |
|---|---|---|
swap_id | string | Swap identifier |
first_hop | SwapHop | First hop details |
next_hops | SwapHop[] | Subsequent hops |
SwapHop:
| Field | Type | Description |
|---|---|---|
sending_currency | OrderbookCurrency | Currency sent |
receiving_currency | OrderbookCurrency | Currency received |
sending_amount | DecimalString | Amount sent |
receiving_amount | DecimalString | Amount received |
receiving_fee | DecimalString | Fee paid |
SwapUpdate
| Field | Type | Description |
|---|---|---|
order_id | string | Order identifier |
swap_id | string | Swap identifier |
progress | SwapProgress | Current progress |
SwapProgress:
| Field | Type | Description |
|---|---|---|
receiving_amount | DecimalString | Amount receiving |
paying_amount | DecimalString | Amount paying |
status | SwapStatus | Current status |
error | string | Error message (optional) |
SwapStatus Enum:
| Value | Description |
|---|---|
ORDER_MATCHED (0) | Order matched |
RECEIVING_INVOICE_CREATED (1) | Receiving invoice created |
PAYING_INVOICE_RECEIVED (2) | Payment invoice received |
PAYMENT_SENT (3) | Payment sent |
PAYMENT_RECEIVED (4) | Payment received |
PAYMENT_CLAIMED (5) | You claimed payment |
PAYMENT_CLAIMED_BY_COUNTERPARTY (6) | Counterparty claimed |
SWAP_COMPLETED (7) | Swap successful |
SWAP_FAILED (8) | Swap failed |
MarketTradeUpdate
| Field | Type | Description |
|---|---|---|
base | OrderbookCurrency | Base currency |
quote | OrderbookCurrency | Quote currency |
trade | ClientMarketTrade | Your trade details |
SwapTradeUpdate
| Field | Type | Description |
|---|---|---|
from_currency | OrderbookCurrency | Source currency |
to_currency | OrderbookCurrency | Destination currency |
trade | ClientSwapTrade | Your swap trade details |
Example Usage
const request = new SubscribeDexEventsRequest()
const stream = client.subscribeDexEvents(request, {})
stream.on('data', (event) => {
const timestamp = event.getTimestamp()
if (event.hasBalanceUpdate()) {
const balance = event.getBalanceUpdate()
console.log('Balance updated:', {
currency: balance.getCurrency()?.getAssetId(),
available: balance.getBalance()?.getSending()
})
}
if (event.hasOrderUpdate()) {
const update = event.getOrderUpdate()
if (update.hasOrderCreated()) {
const created = update.getOrderCreated()
console.log('Order created:', created.getOrderId())
}
if (update.hasOrderCompleted()) {
const completed = update.getOrderCompleted()
console.log('Order completed:', completed.getOrderId())
}
if (update.hasOrderCanceled()) {
const canceled = update.getOrderCanceled()
console.log('Order cancelled:', canceled.getOrderId())
}
}
if (event.hasOrderMatched()) {
const matched = event.getOrderMatched()
console.log('Order matched:', {
orderId: matched.getOwnOrderId(),
isTaker: matched.getIsTaker(),
routes: matched.getSwapRouteList().length
})
}
if (event.hasSwapUpdate()) {
const swap = event.getSwapUpdate()
const progress = swap.getProgress()
console.log('Swap update:', {
swapId: swap.getSwapId(),
status: progress.getStatus(),
receiving: progress.getReceivingAmount(),
paying: progress.getPayingAmount()
})
if (progress.hasError()) {
console.error('Swap error:', progress.getError())
}
}
if (event.hasMarketTradeUpdate()) {
const trade = event.getMarketTradeUpdate()
console.log('Trade executed:', trade.getTrade())
}
})
Go Example
package main
import (
"context"
"fmt"
"io"
"log"
pb "path/to/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func main() {
conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
client := pb.NewOrderbookServiceClient(conn)
request := &pb.SubscribeDexEventsRequest{}
stream, err := client.SubscribeDexEvents(context.Background(), request)
if err != nil {
log.Fatalf("Failed to subscribe: %v", err)
}
for {
event, err := stream.Recv()
if err == io.EOF {
fmt.Println("Stream ended")
break
}
if err != nil {
log.Fatalf("Stream error: %v", err)
}
timestamp := event.GetTimestamp()
if balance := event.GetBalanceUpdate(); balance != nil {
fmt.Printf("Balance updated:\n")
fmt.Printf(" Currency: %s\n", balance.GetCurrency().GetAssetId())
fmt.Printf(" Available: %s\n", balance.GetBalance().GetSending())
}
if update := event.GetOrderUpdate(); update != nil {
if created := update.GetOrderCreated(); created != nil {
fmt.Printf("Order created: %s\n", created.GetOrderId())
}
if completed := update.GetOrderCompleted(); completed != nil {
fmt.Printf("Order completed: %s\n", completed.GetOrderId())
}
if canceled := update.GetOrderCanceled(); canceled != nil {
fmt.Printf("Order cancelled: %s\n", canceled.GetOrderId())
}
}
if matched := event.GetOrderMatched(); matched != nil {
fmt.Printf("Order matched:\n")
fmt.Printf(" Order ID: %s\n", matched.GetOwnOrderId())
fmt.Printf(" Is Taker: %v\n", matched.GetIsTaker())
fmt.Printf(" Routes: %d\n", len(matched.GetSwapRoute()))
}
if swap := event.GetSwapUpdate(); swap != nil {
progress := swap.GetProgress()
fmt.Printf("Swap update:\n")
fmt.Printf(" Swap ID: %s\n", swap.GetSwapId())
fmt.Printf(" Status: %v\n", progress.GetStatus())
fmt.Printf(" Receiving: %s\n", progress.GetReceivingAmount())
fmt.Printf(" Paying: %s\n", progress.GetPayingAmount())
if progress.GetError() != "" {
fmt.Printf(" Error: %s\n", progress.GetError())
}
}
if trade := event.GetMarketTradeUpdate(); trade != nil {
fmt.Printf("Trade executed: %v\n", trade.GetTrade())
}
_ = timestamp // Use timestamp if needed
}
}
Rust Example
use futures::stream::StreamExt;
use proto::orderbook_service_client::OrderbookServiceClient;
use proto::SubscribeDexEventsRequest;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = OrderbookServiceClient::connect("http://localhost:50051").await?;
let request = SubscribeDexEventsRequest {};
let mut stream = client.subscribe_dex_events(request).await?.into_inner();
while let Some(event) = stream.next().await {
match event {
Ok(event) => {
let _timestamp = &event.timestamp;
if let Some(balance) = &event.balance_update {
println!("Balance updated:");
if let Some(currency) = &balance.currency {
println!(" Currency: {}", currency.asset_id);
}
if let Some(bal) = &balance.balance {
println!(" Available: {}", bal.sending);
}
}
if let Some(update) = &event.order_update {
if let Some(created) = &update.order_created {
println!("Order created: {}", created.order_id);
}
if let Some(completed) = &update.order_completed {
println!("Order completed: {}", completed.order_id);
}
if let Some(canceled) = &update.order_canceled {
println!("Order cancelled: {}", canceled.order_id);
}
}
if let Some(matched) = &event.order_matched {
println!("Order matched:");
println!(" Order ID: {}", matched.own_order_id);
println!(" Is Taker: {}", matched.is_taker);
println!(" Routes: {}", matched.swap_route.len());
}
if let Some(swap) = &event.swap_update {
if let Some(progress) = &swap.progress {
println!("Swap update:");
println!(" Swap ID: {}", swap.swap_id);
println!(" Status: {:?}", progress.status);
println!(" Receiving: {}", progress.receiving_amount);
println!(" Paying: {}", progress.paying_amount);
if !progress.error.is_empty() {
println!(" Error: {}", progress.error);
}
}
}
if let Some(trade) = &event.market_trade_update {
println!("Trade executed: {:?}", trade.trade);
}
}
Err(e) => {
eprintln!("Stream error: {}", e);
break;
}
}
}
println!("Stream ended");
Ok(())
}
Subscribe Simple Swaps
Monitor progress of Simple Swap operations with automatic channel setup.
Service: SwapServiceMethod: SubscribeSimpleSwaps
Parameters: None
Response: Stream of SimpleSwapUpdate
SimpleSwapUpdate Structure
| Field | Type | Description |
|---|---|---|
timestamp | Timestamp | Update time |
simple_swap_id | string | Swap identifier |
update | - | One of many update types |
Update Types
Channel Setup Updates
FundingSendingChannel:
txid- Funding transaction IDchannel_id- Channel identifieramount- Funding amountis_opening- Whether opening new channel
RentingReceivingChannel:
txid- Rental transaction IDchannel_id- Rented channel IDamount- Rental amount
DualFundingChannel:
txid- Dual-fund transaction IDchannel_id- Channel identifierself_amount- Your contributioncounterparty_amount- Peer's contributionis_opening- Whether opening new channel
Channel Ready Updates
SendingChannelReady:
channel_id- Ready channel ID
ReceivingChannelReady:
channel_id- Ready channel ID
DualFundChannelReady:
channel_id- Ready channel ID
Balance Updates
WaitingForBalances:
needed_sending- Required sending balanceneeded_receiving- Required receiving balance
BalancesReady:
- No fields - balances are sufficient
Order Updates
OrderCreated:
order_id- Created order identifier
OrderCompleted:
order_id- Completed order identifiersent_amount- Amount sentreceived_amount- Amount received
Withdrawal Updates
WithdrawingSendingFunds:
txids- Withdrawal transaction IDsself_amount- Your withdrawalcounterparty_amount- Peer's withdrawal
WithdrawingReceivingFunds:
txids- Withdrawal transaction IDsself_amount- Your withdrawalcounterparty_amount- Peer's withdrawal
WithdrawingDualFundedFunds:
txids- Withdrawal transaction IDssending_self_amount- Your sending withdrawalsending_counterparty_amount- Peer's sending withdrawalreceiving_self_amount- Your receiving withdrawalreceiving_counterparty_amount- Peer's receiving withdrawal
SendingFundsWithdrawn:
- No fields - withdrawal complete
ReceivingFundsWithdrawn:
- No fields - withdrawal complete
DualFundedFundsWithdrawn:
- No fields - withdrawal complete
Completion Updates
SimpleSwapCompleted:
- No fields - swap successful
SimpleSwapError:
error- Error message
Example Usage
import { SwapServiceClient } from './proto/SwapServiceClientPb'
import { SubscribeSimpleSwapsRequest } from './proto/swap_pb'
const client = new SwapServiceClient('http://localhost:50051')
const request = new SubscribeSimpleSwapsRequest()
const stream = client.subscribeSimpleSwaps(request, {})
stream.on('data', (update) => {
const swapId = update.getSimpleSwapId()
const timestamp = update.getTimestamp()
// Channel setup
if (update.hasFundingSendingChannel()) {
const funding = update.getFundingSendingChannel()
console.log(`[${swapId}] Funding sending channel:`, {
txid: funding.getTxid(),
channelId: funding.getChannelId(),
amount: funding.getAmount(),
isOpening: funding.getIsOpening()
})
}
if (update.hasRentingReceivingChannel()) {
const renting = update.getRentingReceivingChannel()
console.log(`[${swapId}] Renting receiving channel:`, {
txid: renting.getTxid(),
channelId: renting.getChannelId(),
amount: renting.getAmount()
})
}
if (update.hasDualFundingChannel()) {
const dualFund = update.getDualFundingChannel()
console.log(`[${swapId}] Dual-funding channel:`, {
txid: dualFund.getTxid(),
selfAmount: dualFund.getSelfAmount(),
counterpartyAmount: dualFund.getCounterpartyAmount()
})
}
// Channel ready
if (update.hasSendingChannelReady()) {
console.log(`[${swapId}] ✓ Sending channel ready`)
}
if (update.hasReceivingChannelReady()) {
console.log(`[${swapId}] ✓ Receiving channel ready`)
}
// Balance status
if (update.hasWaitingForBalances()) {
const waiting = update.getWaitingForBalances()
console.log(`[${swapId}] Waiting for balances:`, {
sending: waiting.getNeededSending(),
receiving: waiting.getNeededReceiving()
})
}
if (update.hasBalancesReady()) {
console.log(`[${swapId}] ✓ Balances ready`)
}
// Order execution
if (update.hasOrderCreated()) {
const order = update.getOrderCreated()
console.log(`[${swapId}] Order created:`, order.getOrderId())
}
if (update.hasOrderCompleted()) {
const completed = update.getOrderCompleted()
console.log(`[${swapId}] ✓ Order completed:`, {
sent: completed.getSentAmount(),
received: completed.getReceivedAmount()
})
}
// Withdrawals
if (update.hasWithdrawingReceivingFunds()) {
const withdrawing = update.getWithdrawingReceivingFunds()
console.log(`[${swapId}] Withdrawing funds:`, {
txids: withdrawing.getTxidsList(),
amount: withdrawing.getSelfAmount()
})
}
if (update.hasReceivingFundsWithdrawn()) {
console.log(`[${swapId}] ✓ Funds withdrawn`)
}
// Final status
if (update.hasSimpleSwapCompleted()) {
console.log(`[${swapId}] ✓✓✓ SWAP COMPLETED SUCCESSFULLY ✓✓✓`)
}
if (update.hasSimpleSwapError()) {
const error = update.getSimpleSwapError()
console.error(`[${swapId}] ✗ ERROR:`, error.getError())
}
})
stream.on('error', (err) => {
console.error('Stream error:', err)
})
Go Example
package main
import (
"context"
"fmt"
"io"
"log"
pb "path/to/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func main() {
conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
client := pb.NewSwapServiceClient(conn)
request := &pb.SubscribeSimpleSwapsRequest{}
stream, err := client.SubscribeSimpleSwaps(context.Background(), request)
if err != nil {
log.Fatalf("Failed to subscribe: %v", err)
}
for {
update, err := stream.Recv()
if err == io.EOF {
fmt.Println("Stream ended")
break
}
if err != nil {
log.Fatalf("Stream error: %v", err)
}
swapId := update.GetSimpleSwapId()
// Channel setup
if funding := update.GetFundingSendingChannel(); funding != nil {
fmt.Printf("[%s] Funding sending channel:\n", swapId)
fmt.Printf(" Txid: %s\n", funding.GetTxid())
fmt.Printf(" Channel ID: %s\n", funding.GetChannelId())
fmt.Printf(" Amount: %s\n", funding.GetAmount())
fmt.Printf(" Is Opening: %v\n", funding.GetIsOpening())
}
if renting := update.GetRentingReceivingChannel(); renting != nil {
fmt.Printf("[%s] Renting receiving channel:\n", swapId)
fmt.Printf(" Txid: %s\n", renting.GetTxid())
fmt.Printf(" Channel ID: %s\n", renting.GetChannelId())
fmt.Printf(" Amount: %s\n", renting.GetAmount())
}
if dualFund := update.GetDualFundingChannel(); dualFund != nil {
fmt.Printf("[%s] Dual-funding channel:\n", swapId)
fmt.Printf(" Txid: %s\n", dualFund.GetTxid())
fmt.Printf(" Self Amount: %s\n", dualFund.GetSelfAmount())
fmt.Printf(" Counterparty Amount: %s\n", dualFund.GetCounterpartyAmount())
}
// Channel ready
if update.GetSendingChannelReady() != nil {
fmt.Printf("[%s] ✓ Sending channel ready\n", swapId)
}
if update.GetReceivingChannelReady() != nil {
fmt.Printf("[%s] ✓ Receiving channel ready\n", swapId)
}
// Balance status
if waiting := update.GetWaitingForBalances(); waiting != nil {
fmt.Printf("[%s] Waiting for balances:\n", swapId)
fmt.Printf(" Sending: %s\n", waiting.GetNeededSending())
fmt.Printf(" Receiving: %s\n", waiting.GetNeededReceiving())
}
if update.GetBalancesReady() != nil {
fmt.Printf("[%s] ✓ Balances ready\n", swapId)
}
// Order execution
if order := update.GetOrderCreated(); order != nil {
fmt.Printf("[%s] Order created: %s\n", swapId, order.GetOrderId())
}
if completed := update.GetOrderCompleted(); completed != nil {
fmt.Printf("[%s] ✓ Order completed:\n", swapId)
fmt.Printf(" Sent: %s\n", completed.GetSentAmount())
fmt.Printf(" Received: %s\n", completed.GetReceivedAmount())
}
// Withdrawals
if withdrawing := update.GetWithdrawingReceivingFunds(); withdrawing != nil {
fmt.Printf("[%s] Withdrawing funds:\n", swapId)
fmt.Printf(" Txids: %v\n", withdrawing.GetTxids())
fmt.Printf(" Amount: %s\n", withdrawing.GetSelfAmount())
}
if update.GetReceivingFundsWithdrawn() != nil {
fmt.Printf("[%s] ✓ Funds withdrawn\n", swapId)
}
// Final status
if update.GetSimpleSwapCompleted() != nil {
fmt.Printf("[%s] ✓✓✓ SWAP COMPLETED SUCCESSFULLY ✓✓✓\n", swapId)
}
if swapError := update.GetSimpleSwapError(); swapError != nil {
fmt.Printf("[%s] ✗ ERROR: %s\n", swapId, swapError.GetError())
}
}
}
Rust Example
use futures::stream::StreamExt;
use proto::swap_service_client::SwapServiceClient;
use proto::SubscribeSimpleSwapsRequest;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = SwapServiceClient::connect("http://localhost:50051").await?;
let request = SubscribeSimpleSwapsRequest {};
let mut stream = client.subscribe_simple_swaps(request).await?.into_inner();
while let Some(update) = stream.next().await {
match update {
Ok(update) => {
let swap_id = &update.simple_swap_id;
// Channel setup
if let Some(funding) = &update.funding_sending_channel {
println!("[{}] Funding sending channel:", swap_id);
println!(" Txid: {}", funding.txid);
println!(" Channel ID: {}", funding.channel_id);
println!(" Amount: {}", funding.amount);
println!(" Is Opening: {}", funding.is_opening);
}
if let Some(renting) = &update.renting_receiving_channel {
println!("[{}] Renting receiving channel:", swap_id);
println!(" Txid: {}", renting.txid);
println!(" Channel ID: {}", renting.channel_id);
println!(" Amount: {}", renting.amount);
}
if let Some(dual_fund) = &update.dual_funding_channel {
println!("[{}] Dual-funding channel:", swap_id);
println!(" Txid: {}", dual_fund.txid);
println!(" Self Amount: {}", dual_fund.self_amount);
println!(" Counterparty Amount: {}", dual_fund.counterparty_amount);
}
// Channel ready
if update.sending_channel_ready.is_some() {
println!("[{}] ✓ Sending channel ready", swap_id);
}
if update.receiving_channel_ready.is_some() {
println!("[{}] ✓ Receiving channel ready", swap_id);
}
// Balance status
if let Some(waiting) = &update.waiting_for_balances {
println!("[{}] Waiting for balances:", swap_id);
println!(" Sending: {}", waiting.needed_sending);
println!(" Receiving: {}", waiting.needed_receiving);
}
if update.balances_ready.is_some() {
println!("[{}] ✓ Balances ready", swap_id);
}
// Order execution
if let Some(order) = &update.order_created {
println!("[{}] Order created: {}", swap_id, order.order_id);
}
if let Some(completed) = &update.order_completed {
println!("[{}] ✓ Order completed:", swap_id);
println!(" Sent: {}", completed.sent_amount);
println!(" Received: {}", completed.received_amount);
}
// Withdrawals
if let Some(withdrawing) = &update.withdrawing_receiving_funds {
println!("[{}] Withdrawing funds:", swap_id);
println!(" Txids: {:?}", withdrawing.txids);
println!(" Amount: {}", withdrawing.self_amount);
}
if update.receiving_funds_withdrawn.is_some() {
println!("[{}] ✓ Funds withdrawn", swap_id);
}
// Final status
if update.simple_swap_completed.is_some() {
println!("[{}] ✓✓✓ SWAP COMPLETED SUCCESSFULLY ✓✓✓", swap_id);
}
if let Some(error) = &update.simple_swap_error {
eprintln!("[{}] ✗ ERROR: {}", swap_id, error.error);
}
}
Err(e) => {
eprintln!("Stream error: {}", e);
break;
}
}
}
println!("Stream ended");
Ok(())
}
Common Patterns
Track specific swap
async function trackSwap(swapId: string): Promise<void> {
return new Promise((resolve, reject) => {
const stream = client.subscribeSimpleSwaps(new SubscribeSimpleSwapsRequest(), {})
stream.on('data', (update) => {
// Filter for our swap
if (update.getSimpleSwapId() !== swapId) return
if (update.hasSimpleSwapCompleted()) {
stream.cancel()
resolve()
}
if (update.hasSimpleSwapError()) {
stream.cancel()
reject(new Error(update.getSimpleSwapError()?.getError()))
}
})
stream.on('error', reject)
})
}
Go Example
func trackSwap(client pb.SwapServiceClient, swapId string) error {
request := &pb.SubscribeSimpleSwapsRequest{}
stream, err := client.SubscribeSimpleSwaps(context.Background(), request)
if err != nil {
return fmt.Errorf("failed to subscribe: %w", err)
}
for {
update, err := stream.Recv()
if err == io.EOF {
return fmt.Errorf("stream ended unexpectedly")
}
if err != nil {
return fmt.Errorf("stream error: %w", err)
}
// Filter for our swap
if update.GetSimpleSwapId() != swapId {
continue
}
if update.GetSimpleSwapCompleted() != nil {
return nil // Success
}
if swapError := update.GetSimpleSwapError(); swapError != nil {
return fmt.Errorf("swap error: %s", swapError.GetError())
}
}
}
Rust Example
async fn track_swap(
client: &mut SwapServiceClient<tonic::transport::Channel>,
swap_id: String,
) -> Result<(), Box<dyn std::error::Error>> {
let request = SubscribeSimpleSwapsRequest {};
let mut stream = client.subscribe_simple_swaps(request).await?.into_inner();
while let Some(update) = stream.next().await {
let update = update?;
// Filter for our swap
if update.simple_swap_id != swap_id {
continue;
}
if update.simple_swap_completed.is_some() {
return Ok(()); // Success
}
if let Some(error) = update.simple_swap_error {
return Err(format!("swap error: {}", error.error).into());
}
}
Err("stream ended unexpectedly".into())
}
Build price chart from market events
const priceHistory: number[] = []
stream.on('data', (event) => {
if (event.hasTradeUpdate()) {
const trade = event.getTradeUpdate()
const price = parseFloat(trade.getPrice())
priceHistory.push(price)
// Update chart
updatePriceChart(priceHistory)
}
})
Go Example
var priceHistory []float64
stream, err := client.SubscribeMarketEvents(context.Background(), request)
if err != nil {
log.Fatalf("Failed to subscribe: %v", err)
}
for {
event, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("Stream error: %v", err)
}
if trade := event.GetTradeUpdate(); trade != nil {
price, err := strconv.ParseFloat(trade.GetPrice(), 64)
if err != nil {
log.Printf("Failed to parse price: %v", err)
continue
}
priceHistory = append(priceHistory, price)
// Update chart
updatePriceChart(priceHistory)
}
}
Rust Example
let mut price_history: Vec<f64> = Vec::new();
let mut stream = client.subscribe_market_events(request).await?.into_inner();
while let Some(event) = stream.next().await {
match event {
Ok(event) => {
if let Some(trade) = &event.trade_update {
if let Ok(price) = trade.price.parse::<f64>() {
price_history.push(price);
// Update chart
update_price_chart(&price_history);
}
}
}
Err(e) => {
eprintln!("Stream error: {}", e);
break;
}
}
}
Monitor orderbook depth
const orderbookDepth = new Map<string, LiquidityPosition>()
stream.on('data', (event) => {
if (event.hasOrderbookUpdate()) {
const update = event.getOrderbookUpdate()
// Add/update orders
update.getUpdatedOrdersMap().forEach((position, orderId) => {
orderbookDepth.set(orderId, position)
})
// Remove orders
update.getRemovedOrdersList().forEach(orderId => {
orderbookDepth.delete(orderId)
})
console.log('Current depth:', orderbookDepth.size, 'orders')
}
})
Go Example
orderbookDepth := make(map[string]*pb.LiquidityPosition)
stream, err := client.SubscribeMarketEvents(context.Background(), request)
if err != nil {
log.Fatalf("Failed to subscribe: %v", err)
}
for {
event, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("Stream error: %v", err)
}
if update := event.GetOrderbookUpdate(); update != nil {
// Add/update orders
for orderId, position := range update.GetUpdatedOrders() {
orderbookDepth[orderId] = position
}
// Remove orders
for _, orderId := range update.GetRemovedOrders() {
delete(orderbookDepth, orderId)
}
fmt.Printf("Current depth: %d orders\n", len(orderbookDepth))
}
}
Rust Example
use std::collections::HashMap;
let mut orderbook_depth: HashMap<String, LiquidityPosition> = HashMap::new();
let mut stream = client.subscribe_market_events(request).await?.into_inner();
while let Some(event) = stream.next().await {
match event {
Ok(event) => {
if let Some(update) = &event.orderbook_update {
// Add/update orders
for (order_id, position) in &update.updated_orders {
orderbook_depth.insert(order_id.clone(), position.clone());
}
// Remove orders
for order_id in &update.removed_orders {
orderbook_depth.remove(order_id);
}
println!("Current depth: {} orders", orderbook_depth.len());
}
}
Err(e) => {
eprintln!("Stream error: {}", e);
break;
}
}
}
Best Practices
- Handle reconnections - Streams can disconnect, implement retry logic
- Filter events - Only process events relevant to your use case
- Limit subscriptions - Don't subscribe to too many markets simultaneously
- Clean up streams - Call
stream.cancel()when done - Buffer updates - Rate-limit UI updates to avoid overwhelming the interface
- Error handling - Always listen to
errorandendevents
Stream Lifecycle
// 1. Create subscription
const stream = client.subscribeMarketEvents(request, {})
// 2. Listen to events
stream.on('data', (event) => { /* handle */ })
stream.on('error', (err) => { /* handle */ })
stream.on('end', () => { /* reconnect */ })
// 3. Cancel when done
stream.cancel()
Go Example
// 1. Create subscription
stream, err := client.SubscribeMarketEvents(context.Background(), request)
if err != nil {
log.Fatalf("Failed to subscribe: %v", err)
}
// 2. Listen to events
for {
event, err := stream.Recv()
if err == io.EOF {
// Stream ended - reconnect if needed
break
}
if err != nil {
// Handle error
log.Printf("Stream error: %v", err)
break
}
// Handle event
// ...
}
// 3. Stream automatically closes when loop exits
// For manual cancellation, use context.WithCancel
Rust Example
// 1. Create subscription
let mut stream = client.subscribe_market_events(request).await?.into_inner();
// 2. Listen to events
while let Some(event) = stream.next().await {
match event {
Ok(event) => {
// Handle event
// ...
}
Err(e) => {
// Handle error
eprintln!("Stream error: {}", e);
break;
}
}
}
// 3. Stream automatically closes when dropped
// For manual cancellation, drop the stream or use tokio::select!