Example of Message Listener Service (Rust)
After integrating the message contract, both sending a message and confirming a message on-chain emit events. Your service needs to listen for these events in order to drive its subsequent business logic.
Structure of the message payload:
// A message as carried in the event payload: a header followed by the message content.
struct Message {
MessageHeader msg_header; // Message header (see MessageHeader)
bytes msg_body; // Message content, as raw bytes
}
// Header that accompanies every message: type, nonce, source/destination chain and contract, and upload fee.
struct MessageHeader {
uint8 msg_type; // Message type; 0 means bridge message
uint64 nonce; // Message nonce, used to prevent replay attacks
Chain from_chain; // Chain from which the message is sent
bytes32 sender; // Sender contract address, as a 32-byte value
Chain to_chain; // Chain to which the message is sent
bytes32 receiver; // Receiver contract address, as a 32-byte value
uint128 upload_gas_fee; // Gas fee for uploading the message
}
Structure of the execution message payload:
// Execution (confirmation) message payload.
struct ConfirmMsg {
    address executor; // Executor address
    Chain from_chain; // Chain from which the message was sent
    uint64 nonce; // Message nonce (to prevent replay)
    bytes mbody; // Message body (content)
}
Rust Implementation of Block Scanning Logic and On-Chain Event Handling — EVM Chain Listening:
use ethers::{
contract::abigen,
core::k256,
middleware::SignerMiddleware,
providers::{Middleware, Provider},
signers::{LocalWallet, Signer},
};
// Generate the `IMessager` contract bindings at compile time from the ABI file
// `./abi/Messager.json` (path as written here; adjust it to where your ABI lives).
abigen!(IMessager, "./abi/Messager.json");
async fn fetch(
messager_contract: &IMessager<Provider<Http>>,
block_start: String,
block_end: Option<String>,
) -> anyhow::Result<Vec<DbOrder>> {
let start = u64::from_str(&block_start).unwrap();
let end = if block_end.is_some() {
Some(u64::from_str(&block_end.unwrap()).unwrap())
} else {
None
};
let mut builder = messager_contract
.event::<MsgFilter>()
.from_block(start);
match end {
Some(e) => builder = builder.to_block(e),
None => {}
}
let events = builder.query_with_meta().await?;
let mut msgs = vec![];
for (event, meta) in events.iter() {
let head = event.0.msg_header.clone();
let from_chain = Chain {
vm: ChainVM::try_from_u8(head.from_chain.chain_type)?,
id: head.from_chain.chain_id,
};
if from_chain != self.chain {
log::error!("Fetch on {} != {}", self.chain, from_chain);
continue;
}
let to_chain = Chain {
vm: ChainVM::try_from_u8(head.to_chain.chain_type)?,
id: head.to_chain.chain_id,
};
let msg_head = MessageHead {
r#type: head.msg_type,
nonce: head.nonce,
from_addr: from_chain
.address_from_byte32(head.sender)
.unwrap_or_default(),
from_chain,
to_addr: to_chain
.address_from_byte32(head.receiver)
.unwrap_or_default(),
to_chain,
fee_relay: head.upload_gas_fee,
};
let msg = Message {
head: msg_head,
body: event.0.msg_body.to_vec(),
};
// todo! You can determine whether to include a message in msgs based on whether msg_head.from_addr or msg_head.to_addr corresponds to your own business contract as the sender or receiver.
msgs.push(msg);
}
Ok(msgs)
}
/// Starts the background block scanner for the EVM chain.
///
/// The RPC client and contract bindings are built first, so an invalid RPC URL or
/// contract address is returned to the caller as an error. A task is then spawned
/// that loops forever: it asks `get_max_fetch_block` for the highest block to scan,
/// calls `fetch` for the range `block_start..=block_cur`, stores the next start block
/// on success, and sleeps for the polling interval between iterations.
///
/// # Arguments
/// * `first_block_start` - lowest block the scanner may start from; a stored position
///   below this value is raised to it.
///
/// # Errors
/// Returns an error when the RPC URL or the contract address cannot be parsed.
/// Failures inside the spawned loop are only logged and retried.
pub async fn start_fetch_block(first_block_start: u64) -> anyhow::Result<()> {
    // Validate the configuration before spawning, so a bad value is reported here
    // instead of panicking inside the detached task.
    let client = Arc::new(
        Provider::<Http>::try_from("YOUR_RPC_URL")
            .map_err(|err| anyhow::anyhow!("invalid RPC url: {err:?}"))?,
    );
    let messager_contract: IMessager<Provider<Http>> = IMessager::new(
        H160::from_str("MESSAGER_CONTRACT_ADDRESS")
            .map_err(|err| anyhow::anyhow!("invalid messager contract address: {err:?}"))?,
        client.clone(),
    );

    // The task handle is dropped on purpose: the scanner runs detached.
    task::spawn(async move {
        let fetch_msec = 3000; // Query blocks every 3 seconds (value is in milliseconds)
        let fetch_limit = 400; // Query up to 400 blocks each time
        let confirmation_blocks = 12; // Number of confirmation blocks
        let mut block_start = get_next_fetch_block().unwrap_or(first_block_start); //todo! Get the last queried block
        if block_start < first_block_start {
            block_start = first_block_start;
        }
        loop {
            // todo! Get the current latest block
            match get_max_fetch_block(&client, block_start, fetch_limit, confirmation_blocks).await {
                Ok(block_cur) if block_start <= block_cur => {
                    match fetch(
                        &messager_contract,
                        block_start.to_string(),
                        Some(block_cur.to_string()),
                    )
                    .await
                    {
                        Ok(msgs) => {
                            // todo! Process the message
                            log::info!(
                                "Fetch {} tx in block {} - {} successed",
                                msgs.len(),
                                block_start,
                                block_cur,
                            );
                            block_start = block_cur + 1; // Next block to query +1
                            // Persisting the position is best-effort; its result is ignored.
                            let _ = set_next_fetch_block(block_start); // todo! Set the next block to query
                        }
                        Err(err) => {
                            // The interval is in milliseconds, so it is reported as such.
                            log::warn!(
                                "Fetch block {} - {} error {}, retry in {} ms",
                                block_start,
                                block_cur,
                                err,
                                fetch_msec
                            );
                        }
                    }
                }
                // No block beyond the current position is ready to be scanned yet.
                Ok(_) => {}
                Err(err) => {
                    log::error!("bsc get_max_fetch_block {err}");
                }
            }
            // Wait for one query interval before retrying
            sleep(Duration::from_millis(fetch_msec)).await;
        }
    });
    Ok(())
}
Rust Implementation of Block Scanning Logic and On-Chain Event Handling — Endless Chain Listening:
use endless_sdk::crypto::ed25519::PrivateKey as Ed25519PrivateKey;
use endless_sdk::helper_client::Overrides;
use endless_sdk::move_types::identifier::Identifier;
use endless_sdk::move_types::language_storage::{ModuleId, TypeTag};
use endless_sdk::rest_client::endless_api_types::{
Address as EndlessAddress, TransactionOnChainData, ViewFunction,
};
use endless_sdk::rest_client::{
self as endless_rest_client, Client as EndlessClient, PendingTransaction, QueryRange, Response,
};
use endless_sdk::transaction_builder::TransactionBuilder;
use endless_sdk::types::account_address::AccountAddress;
use endless_sdk::types::LocalAccount;
use endless_sdk::types::chain_id::ChainId;
use endless_sdk::types::transaction::{EntryFunction, SignedTransaction, TransactionPayload};
/// Queries the Endless chain for `SendMessage` events in a block range, converts every
/// event emitted on this chain into a [`Message`], then looks up the transactions that
/// produced them.
///
/// # Arguments
/// * `provider` - REST client used for the event and transaction queries.
/// * `start_block` - first block to scan, as a decimal string.
/// * `end_block` - last block to scan, as a decimal string.
///
/// # Errors
/// Returns an error when a block number or the contract address cannot be parsed, when
/// a REST query fails, when an event cannot be BCS-decoded, or when an address in the
/// event is not exactly 32 bytes long.
///
/// NOTE(review): the body reads `self.chain` although this snippet is shown as a free
/// function; presumably it was lifted from an `impl` block — confirm when integrating.
async fn fetch(provider: &Arc<EndlessClient>, start_block: String, end_block: Option<String>) -> anyhow::Result<Vec<Message>> {
    // The structs below are decoded from the event data with BCS.
    #[derive(Clone, Debug, Deserialize)]
    struct ChainResponse {
        r#type: u8,
        id: u64,
    }
    #[derive(Clone, Debug, Deserialize)]
    struct MessageHeadResponse {
        r#type: u8,
        nonce: u64,
        from_chain: ChainResponse,
        from_addr: Vec<u8>,
        to_chain: ChainResponse,
        to_addr: Vec<u8>,
        upload_gas_fee: u128,
    }
    #[derive(Clone, Debug, Deserialize)]
    struct MessageSendResponse {
        head: MessageHeadResponse,
        body: Vec<u8>,
        // Not read below, but kept so the struct matches the original decode layout.
        fee: u128,
    }

    let mut msgs = vec![];
    let mut versions = vec![];

    // Fetch transaction events
    let messager_contract = "MESSAGER_CONTRACT_ADDRESS";
    // Parse the address text; taking the raw ASCII bytes of the string would not
    // produce a valid address.
    // NOTE(review): assumes `AccountAddress` implements `FromStr` for the address
    // format you configure and exposes `to_hex()` — confirm against endless_sdk.
    let module_addr = AccountAddress::from_str(messager_contract)
        .map_err(|err| anyhow::anyhow!("invalid messager contract address: {err:?}"))?;
    let type_tag = format!("0x{}::message::SendMessage", module_addr.to_hex());

    let start_block = u64::from_str(&start_block)?;
    let end_block = end_block.as_deref().map(u64::from_str).transpose()?;
    let first: usize = start_block.try_into()?;
    // NOTE(review): a missing `end_block` becomes an upper bound of 0, which yields an
    // empty range for any positive start; the caller shown always passes `Some`.
    let last: usize = end_block.unwrap_or(0).try_into()?;
    let range = QueryRange::by_block(first..=last);
    let response = provider.get_events_by_type_bcs(&type_tag, range).await?;

    for item in response.inner() {
        let msg_resp: MessageSendResponse = bcs::from_bytes(item.event.event_data())
            .map_err(|err| anyhow::anyhow!("cannot decode SendMessage event: {err:?}"))?;
        let from_chain = Chain {
            vm: ChainVM::try_from_u8(msg_resp.head.from_chain.r#type)?,
            id: msg_resp.head.from_chain.id,
        };
        // Skip events whose source chain is not the chain being scanned.
        if from_chain != self.chain {
            log::error!("Fetch on {} != {}", self.chain, from_chain);
            continue;
        }
        let to_chain = Chain {
            vm: ChainVM::try_from_u8(msg_resp.head.to_chain.r#type)?,
            id: msg_resp.head.to_chain.id,
        };
        // Addresses arrive as byte vectors; reject any that are not 32 bytes long.
        let from_bytes: [u8; 32] = msg_resp
            .head
            .from_addr
            .try_into()
            .map_err(|_| anyhow::anyhow!("SendMessage from_addr is not 32 bytes"))?;
        let to_bytes: [u8; 32] = msg_resp
            .head
            .to_addr
            .try_into()
            .map_err(|_| anyhow::anyhow!("SendMessage to_addr is not 32 bytes"))?;
        let msg = Message {
            head: MessageHead {
                r#type: msg_resp.head.r#type,
                nonce: msg_resp.head.nonce,
                from_addr: from_chain.address_from_byte32(from_bytes)?,
                from_chain,
                to_addr: to_chain.address_from_byte32(to_bytes)?,
                to_chain,
                fee_relay: msg_resp.head.upload_gas_fee,
            },
            body: msg_resp.body,
        };
        // todo! You can determine whether to include a message in msgs based on whether msg_head.from_addr or msg_head.to_addr is your own business contract’s address as the sender or receiver.
        msgs.push(msg);
        versions.push(item.transaction_version);
    }

    // Fetch transaction hash
    if !versions.is_empty() {
        let mut ver_maps = HashMap::new();
        let response = provider.get_transactions_by_version_bcs(versions).await?;
        for tx in response.inner() {
            // Only on-chain transactions are recorded.
            if let endless_rest_client::endless_api_types::TransactionData::OnChain(tx) = tx {
                let hash = TxHash::from_str(self.chain.clone(), &tx.info.transaction_hash())
                    .expect("transaction hash returned by the node should be valid");
                ver_maps.insert(tx.version, hash);
            }
        }
        // NOTE(review): `from_block` and `from_hash` are not set by the
        // `Message { head, body }` literal above; confirm the real `Message`
        // definition and populate `from_block` when building each message.
        for msg in &mut msgs {
            if let Some(tx) = ver_maps.get(&msg.from_block) {
                msg.from_hash = tx.clone();
            }
        }
    }
    Ok(msgs)
}
pub async fn start_fetch_block(first_block_start: u64) -> anyhow::Result<()> {
task::spawn(async move {
let client = Arc::new(EndlessClient::new(Url::parse("YOUR_RPC_URL").unwrap()));
let fetch_msec = 3000; // Query blocks every 3 seconds
let fetch_limit = 400; // Query up to 400 blocks each time
let confmation_block = 12; // Number of confirmation blocks
let mut block_start = get_next_fetch_block().unwrap_or(first_block_start); //todo! Get the last queried block.
if block_start < first_block_start {
block_start = first_block_start;
}
loop {
match get_max_fetch_block(&client, block_start, fetch_limit, confmation_block).await { // todo! Get the current latest block.
Ok(block_number) => {
let block_cur = block_number;
if block_start <= block_cur {
match fetch(
&client,
block_start.to_string(),
Some(block_cur.to_string()),
)
.await
{
Ok(msgs) => {
// todo! Process the message
log::info!(
"Fetch {} tx in block {} - {} successed",
msgs.len(),
block_start,
block_cur,
);
block_start = block_cur + 1; // Next block to query +1
let _ = set_next_fetch_block(block_start); // todo! Set the next block to query
}
Err(err) => {
log::info!(
"Fetch block {} - {} error {}, {} second retry",
block_start,
block_cur,
err.to_string(),
fetch_msec
);
}
}
}
}
Err(err) => {
log::error!("bsc get_max_fetch_block {err}");
}
}
// Wait for one query interval before retrying.
sleep(Duration::from_millis(fetch_msec)).await;
}
});
Ok(())
}
Last updated