Copy use endless_sdk::crypto::ed25519::PrivateKey as Ed25519PrivateKey;
use endless_sdk::helper_client::Overrides;
use endless_sdk::move_types::identifier::Identifier;
use endless_sdk::move_types::language_storage::{ModuleId, TypeTag};
use endless_sdk::rest_client::endless_api_types::{
Address as EndlessAddress, TransactionOnChainData, ViewFunction,
};
use endless_sdk::rest_client::{
self as endless_rest_client, Client as EndlessClient, PendingTransaction, QueryRange, Response,
};
use endless_sdk::transaction_builder::TransactionBuilder;
use endless_sdk::types::account_address::AccountAddress;
use endless_sdk::types::LocalAccount;
use endless_sdk::types::chain_id::ChainId;
use endless_sdk::types::transaction::{EntryFunction, SignedTransaction, TransactionPayload};
async fn fetch(provider: &Arc<EndlessClient>, start_block: String, end_block: Option<String>) -> anyhow::Result<Vec<Message>> {
#[derive(Clone, Debug, Deserialize)]
struct ChainResponse {
r#type: u8,
id: u64,
}
#[derive(Clone, Debug, Deserialize)]
struct MessageHeadResponse {
r#type: u8,
nonce: u64,
from_chain: ChainResponse,
from_addr: Vec<u8>,
to_chain: ChainResponse,
to_addr: Vec<u8>,
upload_gas_fee: u128,
}
#[derive(Clone, Debug, Deserialize)]
struct MessageSendResponse {
head: MessageHeadResponse,
body: Vec<u8>,
fee: u128,
}
let mut msgs = vec![];
let mut versions = vec![];
// Fetch transaction events
let messager_contract: "MESSAGER_CONTRACT_ADDRESS";
let module_addr = AccountAddress::from_bytes(messager_contract.as_bytes()).unwrap();
let type_tag = format!("0x{}::message::SendMessage", address_hex);
let start_block = u64::from_str(&start_block).unwrap();
let end_block = if end_block.is_some() {
Some(u64::from_str(&end_block.unwrap()).unwrap())
} else {
None
};
let range = QueryRange::by_block(start_block as usize..=end_block.unwrap_or(0) as usize);
let response = provider.get_events_by_type_bcs(&type_tag, range)
.await?;
for item in response.inner() {
let msg_resp: MessageSendResponse = bcs::from_bytes(item.event.event_data()).unwrap();
let from_chain = Chain {
vm: ChainVM::try_from_u8(msg_resp.head.from_chain.r#type)?,
id: msg_resp.head.from_chain.id,
};
if from_chain != self.chain {
log::error!("Fetch on {} != {}", self.chain, from_chain);
continue;
}
let to_chain = Chain {
vm: ChainVM::try_from_u8(msg_resp.head.to_chain.r#type)?,
id: msg_resp.head.to_chain.id,
};
let msg = Message {
head: MessageHead {
r#type: msg_resp.head.r#type,
nonce: msg_resp.head.nonce,
from_addr: from_chain
.address_from_byte32(msg_resp.head.from_addr.try_into().unwrap())?,
from_chain: from_chain,
to_addr: to_chain
.address_from_byte32(msg_resp.head.to_addr.try_into().unwrap())?,
to_chain: to_chain,
fee_relay: msg_resp.head.upload_gas_fee,
},
body: msg_resp.body
};
// todo! You can determine whether to include a message in msgs based on whether msg_head.from_addr or msg_head.to_addr is your own business contract’s address as the sender or receiver.
msgs.push(msg);
versions.push(item.transaction_version);
}
// Fetch transaction hash
let len = versions.len();
if len > 0 {
let mut ver_maps = HashMap::new();
let response = provider
.get_transactions_by_version_bcs(versions)
.await?;
for tx in response.inner() {
match tx {
endless_rest_client::endless_api_types::TransactionData::OnChain(tx) => {
let hash =
TxHash::from_str(self.chain.clone(), &tx.info.transaction_hash())
.unwrap();
ver_maps.insert(tx.version, hash);
}
_ => {}
}
}
for msg in &mut msgs {
if let Some(tx) = ver_maps.get(&msg.from_block) {
msg.from_hash = tx.clone();
}
}
}
Ok(msgs)
}
pub async fn start_fetch_block(first_block_start: u64) -> anyhow::Result<()> {
task::spawn(async move {
let client = Arc::new(EndlessClient::new(Url::parse("YOUR_RPC_URL").unwrap()));
let fetch_msec = 3000; // Query blocks every 3 seconds
let fetch_limit = 400; // Query up to 400 blocks each time
let confmation_block = 12; // Number of confirmation blocks
let mut block_start = get_next_fetch_block().unwrap_or(first_block_start); //todo! Get the last queried block.
if block_start < first_block_start {
block_start = first_block_start;
}
loop {
match get_max_fetch_block(&client, block_start, fetch_limit, confmation_block).await { // todo! Get the current latest block.
Ok(block_number) => {
let block_cur = block_number;
if block_start <= block_cur {
match fetch(
&client,
block_start.to_string(),
Some(block_cur.to_string()),
)
.await
{
Ok(msgs) => {
// todo! Process the message
log::info!(
"Fetch {} tx in block {} - {} successed",
msgs.len(),
block_start,
block_cur,
);
block_start = block_cur + 1; // Next block to query +1
let _ = set_next_fetch_block(block_start); // todo! Set the next block to query
}
Err(err) => {
log::info!(
"Fetch block {} - {} error {}, {} second retry",
block_start,
block_cur,
err.to_string(),
fetch_msec
);
}
}
}
}
Err(err) => {
log::error!("bsc get_max_fetch_block {err}");
}
}
// Wait for one query interval before retrying.
sleep(Duration::from_millis(fetch_msec)).await;
}
});
Ok(())
}