#![cfg_attr(not(feature = "std"), no_std)]
#![allow(clippy::manual_inspect)]
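//! # Tasks pallet
//!
//! Creates tasks, assigns them to shards, batches queued gateway operations
//! into GMP messages, and records the results reported back by shards.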
#![doc = simple_mermaid::mermaid!("../docs/tasks_extrinsics.mmd")]
#![doc = simple_mermaid::mermaid!("../docs/tasks_lr.mmd")]
#![doc = simple_mermaid::mermaid!("../docs/unregister_gateways.mmd")]
#![doc = simple_mermaid::mermaid!("../docs/reset_tasks.mmd")]
#[cfg(feature = "runtime-benchmarks")]
mod benchmarking;
pub use pallet::*;
#[cfg(test)]
mod mock;
pub mod queue;
#[cfg(test)]
mod tests;
#[polkadot_sdk::frame_support::pallet]
pub mod pallet {
use crate::queue::*;
use polkadot_sdk::{
frame_support::{self, Blake2_128Concat},
frame_system, pallet_balances, sp_runtime, sp_std,
};
use frame_support::pallet_prelude::*;
use frame_system::pallet_prelude::*;
use sp_runtime::Saturating;
use sp_std::boxed::Box;
use sp_std::vec;
use sp_std::vec::Vec;
use time_primitives::{
AccountId, Balance, BatchBuilder, BatchId, ErrorMsg, GatewayMessage, GatewayOp, GmpEvent,
GmpEvents, Hash as TxHash, MessageId, NetworkId, NetworksInterface, ShardId,
ShardsInterface, Task, TaskId, TaskResult, TasksInterface, TssPublicKey, TssSignature,
MAX_GMP_EVENTS,
};
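/// Weight functions for this pallet's dispatchables and hooks.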
pub trait WeightInfo {
fn submit_task_result() -> Weight;
fn prepare_batches(n: u32) -> Weight;
fn schedule_tasks(n: u32) -> Weight;
fn submit_gmp_events() -> Weight;
fn sync_network() -> Weight;
fn stop_network() -> Weight;
fn remove_task() -> Weight;
fn restart_batch() -> Weight;
}
impl WeightInfo for () {
fn submit_task_result() -> Weight {
Weight::default()
}
fn prepare_batches(_: u32) -> Weight {
Weight::default()
}
fn schedule_tasks(_: u32) -> Weight {
Weight::default()
}
fn submit_gmp_events() -> Weight {
Weight::default()
}
fn sync_network() -> Weight {
Weight::default()
}
fn stop_network() -> Weight {
Weight::default()
}
fn remove_task() -> Weight {
Weight::default()
}
fn restart_batch() -> Weight {
Weight::default()
}
}
#[pallet::pallet]
#[pallet::without_storage_info]
pub struct Pallet<T>(_);
#[pallet::config]
pub trait Config:
polkadot_sdk::frame_system::Config<AccountId = AccountId>
+ pallet_balances::Config<Balance = Balance>
{
type RuntimeEvent: From<Event<Self>>
+ IsType<<Self as polkadot_sdk::frame_system::Config>::RuntimeEvent>;
type AdminOrigin: EnsureOrigin<Self::RuntimeOrigin>;
type WeightInfo: WeightInfo;
type Shards: ShardsInterface;
type Networks: NetworksInterface;
type MaxTasksPerBlock: Get<u32>;
type MaxBatchesPerBlock: Get<u32>;
}
#[pallet::storage]
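/// Per-network queue of tasks waiting to be assigned to a shard.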
pub type UATasks<T: Config> = StorageDoubleMap<
_,
Blake2_128Concat,
NetworkId,
Blake2_128Concat,
Index,
TaskId,
OptionQuery,
>;
#[pallet::storage]
pub type UATasksInsertIndex<T: Config> =
StorageMap<_, Blake2_128Concat, NetworkId, Index, OptionQuery>;
#[pallet::storage]
pub type UATasksRemoveIndex<T: Config> =
StorageMap<_, Blake2_128Concat, NetworkId, Index, OptionQuery>;
#[pallet::storage]
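/// Per-network queue of gateway operations waiting to be batched.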
pub type Ops<T: Config> = StorageDoubleMap<
_,
Blake2_128Concat,
NetworkId,
Blake2_128Concat,
Index,
GatewayOp,
OptionQuery,
>;
#[pallet::storage]
pub type OpsInsertIndex<T: Config> =
StorageMap<_, Blake2_128Concat, NetworkId, Index, OptionQuery>;
#[pallet::storage]
pub type OpsRemoveIndex<T: Config> =
StorageMap<_, Blake2_128Concat, NetworkId, Index, OptionQuery>;
#[pallet::storage]
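/// Tasks currently assigned to each shard.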
pub type ShardTasks<T: Config> =
StorageDoubleMap<_, Blake2_128Concat, ShardId, Blake2_128Concat, TaskId, (), OptionQuery>;
#[pallet::storage]
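/// Shard a task is currently assigned to.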
pub type TaskShard<T: Config> = StorageMap<_, Blake2_128Concat, TaskId, ShardId, OptionQuery>;
#[pallet::storage]
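/// Shards serving each network.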
pub type NetworkShards<T: Config> = StorageDoubleMap<
_,
Blake2_128Concat,
NetworkId,
Blake2_128Concat,
ShardId,
(),
OptionQuery,
>;
#[pallet::storage]
pub type TaskIdCounter<T: Config> = StorageValue<_, u64, ValueQuery>;
#[pallet::storage]
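/// Number of tasks created per network.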
pub type TaskCount<T: Config> = StorageMap<_, Blake2_128Concat, NetworkId, u64, ValueQuery>;
#[pallet::storage]
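/// Number of finished tasks per network.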
pub type ExecutedTaskCount<T: Config> =
StorageMap<_, Blake2_128Concat, NetworkId, u64, ValueQuery>;
#[pallet::storage]
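/// Number of tasks currently assigned to each shard.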
pub type ShardTaskCount<T: Config> = StorageMap<_, Blake2_128Concat, ShardId, u32, ValueQuery>;
#[pallet::storage]
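/// Task descriptor by task id.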
#[pallet::getter(fn tasks)]
pub type Tasks<T: Config> = StorageMap<_, Blake2_128Concat, TaskId, Task, OptionQuery>;
#[pallet::storage]
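/// Result of a finished task.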
pub type TaskOutput<T: Config> =
StorageMap<_, Blake2_128Concat, TaskId, Result<(), ErrorMsg>, OptionQuery>;
#[pallet::storage]
pub type TaskNetwork<T: Config> =
StorageMap<_, Blake2_128Concat, TaskId, NetworkId, OptionQuery>;
#[pallet::storage]
pub type ShardRegisterBatchId<T: Config> =
StorageMap<_, Blake2_128Concat, TssPublicKey, BatchId, OptionQuery>;
#[pallet::storage]
pub type ShardUnregisterBatchId<T: Config> =
StorageMap<_, Blake2_128Concat, TssPublicKey, BatchId, OptionQuery>;
#[pallet::storage]
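/// TSS public keys of shards registered on a gateway.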
pub type ShardRegistered<T: Config> =
StorageMap<_, Blake2_128Concat, TssPublicKey, (), OptionQuery>;
#[pallet::storage]
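/// Active read-gateway-events task for each network.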
pub type ReadEventsTask<T: Config> =
StorageMap<_, Blake2_128Concat, NetworkId, TaskId, OptionQuery>;
#[pallet::storage]
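/// Block height up to which gateway events have been read for each network.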
pub type SyncHeight<T: Config> = StorageMap<_, Blake2_128Concat, NetworkId, u64, ValueQuery>;
#[pallet::storage]
pub type MessageReceivedTaskId<T: Config> =
StorageMap<_, Blake2_128Concat, MessageId, TaskId, OptionQuery>;
#[pallet::storage]
pub type MessageExecutedTaskId<T: Config> =
StorageMap<_, Blake2_128Concat, MessageId, TaskId, OptionQuery>;
#[pallet::storage]
pub type MessageBatchId<T: Config> =
StorageMap<_, Blake2_128Concat, MessageId, BatchId, OptionQuery>;
#[pallet::storage]
pub type BatchIdCounter<T: Config> = StorageValue<_, u64, ValueQuery>;
#[pallet::storage]
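/// Gateway message contained in a batch.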
pub type Batch<T: Config> =
StorageMap<_, Blake2_128Concat, BatchId, GatewayMessage, OptionQuery>;
#[pallet::storage]
pub type BatchTaskId<T: Config> = StorageMap<_, Blake2_128Concat, BatchId, TaskId, OptionQuery>;
#[pallet::storage]
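/// Batches submitted to a gateway but not yet reported as executed.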
pub type PendingBatches<T: Config> = StorageMap<_, Blake2_128Concat, BatchId, (), OptionQuery>;
#[pallet::storage]
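/// Batches whose gateway submission failed.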
pub type FailedBatches<T: Config> = StorageMap<_, Blake2_128Concat, BatchId, (), OptionQuery>;
#[pallet::storage]
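/// Transaction hash in which a batch was executed on the gateway.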
pub type BatchTxHash<T: Config> = StorageMap<_, Blake2_128Concat, BatchId, TxHash, OptionQuery>;
#[pallet::event]
#[pallet::generate_deposit(pub(super) fn deposit_event)]
pub enum Event<T: Config> {
TaskCreated(TaskId),
TaskResult(TaskId, Result<(), ErrorMsg>),
ShardTaskLimitSet(NetworkId, u32),
BatchSizeSet(NetworkId, u64, u64),
BatchRestarted(TaskId, TaskId),
MessageReceived(MessageId),
MessageExecuted(MessageId),
}
#[pallet::error]
pub enum Error<T> {
UnknownTask,
UnknownShard,
InvalidSignature,
InvalidTaskResult,
InvalidSigner,
UnassignedTask,
TaskSigned,
GatewayNotRegistered,
InvalidBatchId,
CannotRemoveTask,
}
#[pallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
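/// Build GMP batches from queued gateway operations and assign
/// unassigned tasks to shards at the start of every block.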
fn on_initialize(_: BlockNumberFor<T>) -> Weight {
log::info!("on_initialize begin");
let weight = Self::prepare_batches().saturating_add(Self::schedule_tasks());
log::info!("on_initialize end");
weight
}
}
#[pallet::call]
impl<T: Config> Pallet<T> {
#[pallet::call_index(1)]
#[pallet::weight((
<T as Config>::WeightInfo::submit_task_result(),
DispatchClass::Normal,
Pays::No
))]
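/// Submit the result of a task. Read-events results are authenticated by the
/// shard's TSS signature; batch-submission errors must be reported by a shard member.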
pub fn submit_task_result(
origin: OriginFor<T>,
task_id: TaskId,
result: TaskResult,
) -> DispatchResult {
let signer = ensure_signed(origin)?;
let task = Tasks::<T>::get(task_id).ok_or(Error::<T>::UnknownTask)?;
if TaskOutput::<T>::contains_key(task_id) {
return Ok(());
}
let shard = TaskShard::<T>::get(task_id).ok_or(Error::<T>::UnassignedTask)?;
let network = T::Shards::shard_network(shard).ok_or(Error::<T>::UnknownShard)?;
let result = match (task, result) {
(
Task::ReadGatewayEvents { blocks },
TaskResult::ReadGatewayEvents { events, signature },
) => {
let bytes = time_primitives::encode_gmp_events(task_id, &events.0);
Self::verify_signature(shard, &bytes, signature)?;
let curr = SyncHeight::<T>::get(network);
if curr == blocks.start {
SyncHeight::<T>::insert(network, blocks.end);
}
if ReadEventsTask::<T>::contains_key(network) {
Self::read_gateway_events(network);
}
// A full page of events means more may still be waiting in this block
// range, so return before `finish_task` and leave the task open.
let more_events_expected = events.0.len() == MAX_GMP_EVENTS as usize;
Self::process_events(network, task_id, events);
if more_events_expected {
return Ok(());
}
Ok(())
},
(
Task::SubmitGatewayMessage { batch_id },
TaskResult::SubmitGatewayMessage { error },
) => {
let members = T::Shards::shard_members(shard);
ensure!(members.contains(&signer), Error::<T>::InvalidSigner);
PendingBatches::<T>::remove(batch_id);
FailedBatches::<T>::insert(batch_id, ());
Err(error)
},
(_, _) => return Err(Error::<T>::InvalidTaskResult.into()),
};
Self::finish_task(network, task_id, result);
Ok(())
}
#[pallet::call_index(10)]
#[pallet::weight(<T as Config>::WeightInfo::submit_gmp_events())]
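/// Admin call that injects gateway events for a network and clears the failed
/// state of any batch reported as executed.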
pub fn submit_gmp_events(
origin: OriginFor<T>,
network: NetworkId,
events: GmpEvents,
) -> DispatchResult {
T::AdminOrigin::ensure_origin(origin)?;
for event in events.0.iter() {
if let GmpEvent::BatchExecuted { batch_id, .. } = event {
FailedBatches::<T>::remove(batch_id);
if let Some(task_id) = BatchTaskId::<T>::get(batch_id) {
TaskOutput::<T>::remove(task_id);
}
}
}
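// Admin-injected events are not tied to a real read task; task id 0 is used as a placeholder.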
Self::process_events(network, 0, events);
Ok(())
}
#[pallet::call_index(11)]
#[pallet::weight(<T as Config>::WeightInfo::sync_network())]
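/// Admin call that overrides the gateway sync height of a network.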
pub fn sync_network(
origin: OriginFor<T>,
network: NetworkId,
block: u64,
) -> DispatchResult {
T::AdminOrigin::ensure_origin(origin)?;
SyncHeight::<T>::insert(network, block);
Ok(())
}
#[pallet::call_index(12)]
#[pallet::weight(<T as Config>::WeightInfo::stop_network())]
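/// Admin call that stops reading gateway events for a network.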
pub fn stop_network(origin: OriginFor<T>, network: NetworkId) -> DispatchResult {
T::AdminOrigin::ensure_origin(origin)?;
ReadEventsTask::<T>::remove(network);
Ok(())
}
#[pallet::call_index(13)]
#[pallet::weight(<T as Config>::WeightInfo::remove_task())]
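/// Admin call that removes a finished task together with its batch bookkeeping.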
pub fn remove_task(origin: OriginFor<T>, task: TaskId) -> DispatchResult {
T::AdminOrigin::ensure_origin(origin)?;
if TaskOutput::<T>::take(task).is_none() {
return Err(Error::<T>::CannotRemoveTask.into());
}
if let Some(Task::SubmitGatewayMessage { batch_id }) = Tasks::<T>::take(task) {
if let Some(msg) = Batch::<T>::take(batch_id) {
for op in msg.ops {
if let GatewayOp::SendMessage(msg) = op {
let message = msg.message_id();
MessageReceivedTaskId::<T>::remove(message);
MessageBatchId::<T>::remove(message);
}
}
}
BatchTaskId::<T>::remove(batch_id);
}
TaskNetwork::<T>::remove(task);
Ok(())
}
#[pallet::call_index(14)]
#[pallet::weight(<T as Config>::WeightInfo::restart_batch())]
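/// Admin call that creates a new submission task for a batch and marks it pending again.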
pub fn restart_batch(origin: OriginFor<T>, batch_id: BatchId) -> DispatchResult {
T::AdminOrigin::ensure_origin(origin)?;
let old_task_id = BatchTaskId::<T>::get(batch_id).ok_or(Error::<T>::InvalidBatchId)?;
let network = TaskNetwork::<T>::get(old_task_id).ok_or(Error::<T>::UnknownTask)?;
let new_task_id = Self::create_task(network, Task::SubmitGatewayMessage { batch_id });
BatchTaskId::<T>::insert(batch_id, new_task_id);
PendingBatches::<T>::insert(batch_id, ());
FailedBatches::<T>::remove(batch_id);
Self::deposit_event(Event::BatchRestarted(old_task_id, new_task_id));
Ok(())
}
}
impl<T: Config> Pallet<T> {
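/// Apply gateway events observed for `network`: track shard (un)registrations,
/// queue received messages for forwarding, and settle executed batches.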
pub(crate) fn process_events(network: NetworkId, task_id: TaskId, events: GmpEvents) {
for event in events.0 {
match event {
GmpEvent::ShardRegistered(pubkey) => {
ShardRegistered::<T>::insert(pubkey, ());
},
GmpEvent::ShardUnregistered(pubkey) => {
ShardRegistered::<T>::remove(pubkey);
},
GmpEvent::MessageReceived(msg) => {
let msg_id = msg.message_id();
Self::ops_queue(msg.dest_network).push(GatewayOp::SendMessage(msg));
MessageReceivedTaskId::<T>::insert(msg_id, task_id);
Self::deposit_event(Event::<T>::MessageReceived(msg_id));
},
GmpEvent::MessageExecuted(msg_id) => {
MessageExecutedTaskId::<T>::insert(msg_id, task_id);
Self::deposit_event(Event::<T>::MessageExecuted(msg_id));
},
GmpEvent::BatchExecuted { batch_id, tx_hash } => {
PendingBatches::<T>::remove(batch_id);
if let Some(task_id) = BatchTaskId::<T>::get(batch_id) {
Self::finish_task(network, task_id, Ok(()));
}
if let Some(hash) = tx_hash {
BatchTxHash::<T>::insert(batch_id, hash);
}
},
}
}
}
fn verify_signature(
shard_id: ShardId,
data: &[u8],
signature: TssSignature,
) -> DispatchResult {
let public_key = T::Shards::tss_public_key(shard_id).ok_or(Error::<T>::UnknownShard)?;
if time_primitives::verify_signature(public_key, data, signature).is_err() {
log::error!("invalid tss signature shard_id={shard_id} public_key={public_key:?} data={data:?} sig={signature:?}");
return Err(Error::<T>::InvalidSignature.into());
}
Ok(())
}
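/// Create a task for `network`. Tasks that need no shard registration become the
/// network's read-events task; all others join the unassigned-task queue.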
pub(crate) fn create_task(network: NetworkId, task: Task) -> TaskId {
let task_id = TaskIdCounter::<T>::get().saturating_plus_one();
let needs_registration = task.needs_registration();
Tasks::<T>::insert(task_id, task);
TaskNetwork::<T>::insert(task_id, network);
TaskIdCounter::<T>::put(task_id);
if !needs_registration {
ReadEventsTask::<T>::insert(network, task_id);
} else {
Self::ua_task_queue(network).push(task_id);
}
TaskCount::<T>::insert(network, TaskCount::<T>::get(network).saturating_add(1));
Self::deposit_event(Event::TaskCreated(task_id));
log::debug!("create task {task_id} (network {network})");
task_id
}
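/// Record a task's result, release it from its shard, and emit `TaskResult`.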
pub(crate) fn finish_task(
network: NetworkId,
task_id: TaskId,
result: Result<(), ErrorMsg>,
) {
if TaskOutput::<T>::contains_key(task_id) {
return;
}
TaskOutput::<T>::insert(task_id, result.clone());
if let Some(shard) = TaskShard::<T>::take(task_id) {
log::debug!("finish task {task_id} on {shard}");
ShardTasks::<T>::remove(shard, task_id);
ShardTaskCount::<T>::insert(
shard,
ShardTaskCount::<T>::get(shard).saturating_sub(1),
);
ExecutedTaskCount::<T>::insert(
network,
ExecutedTaskCount::<T>::get(network).saturating_add(1),
);
}
Self::deposit_event(Event::TaskResult(task_id, result));
}
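/// Create a task that reads the next range of gateway events for `network`,
/// starting at the current sync height.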
pub(crate) fn read_gateway_events(network: NetworkId) -> TaskId {
let block = SyncHeight::<T>::get(network);
let size = T::Networks::next_batch_size(network, block) as u64;
let end = block.saturating_add(size);
Self::create_task(network, Task::ReadGatewayEvents { blocks: block..end })
}
fn ua_task_queue(network: NetworkId) -> Box<dyn QueueT<T, TaskId>> {
Box::new(
QueueImpl::<T, TaskId, UATasksInsertIndex<T>, UATasksRemoveIndex<T>, UATasks<T>>::new(
network,
),
)
}
pub(crate) fn ops_queue(network: NetworkId) -> Box<dyn QueueT<T, GatewayOp>> {
Box::new(QueueImpl::<T, GatewayOp, OpsInsertIndex<T>, OpsRemoveIndex<T>, Ops<T>>::new(
network,
))
}
pub(crate) fn is_shard_registered(shard: ShardId) -> bool {
let Some(pubkey) = T::Shards::tss_public_key(shard) else {
return false;
};
ShardRegistered::<T>::contains_key(pubkey)
}
pub(crate) fn assign_task(shard: ShardId, task_id: TaskId) {
log::debug!("assigned task {task_id} to {shard}");
ShardTasks::<T>::insert(shard, task_id, ());
TaskShard::<T>::insert(task_id, shard);
ShardTaskCount::<T>::insert(shard, ShardTaskCount::<T>::get(shard).saturating_add(1));
}
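/// Pop up to `capacity` tasks from the network's queue and assign them to `shard_id`.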
fn schedule_tasks_shard(network: NetworkId, shard_id: ShardId, capacity: u32) -> u32 {
let mut num_tasks_assigned = 0u32;
let queue = Self::ua_task_queue(network);
for _ in 0..capacity {
let Some(task) = queue.pop() else {
break;
};
Self::assign_task(shard_id, task);
num_tasks_assigned = num_tasks_assigned.saturating_plus_one();
}
num_tasks_assigned
}
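/// Assign queued tasks across registered, online shards, bounded by
/// `MaxTasksPerBlock` and the network's per-shard task limit.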
pub(crate) fn schedule_tasks() -> Weight {
let mut num_tasks_assigned: u32 = 0u32;
for (network, task_id) in ReadEventsTask::<T>::iter() {
let max_assignable_tasks = T::Networks::shard_task_limit(network);
if TaskShard::<T>::get(task_id).is_none() {
for (shard, _) in NetworkShards::<T>::iter_prefix(network) {
if ShardTaskCount::<T>::get(shard) < max_assignable_tasks {
if num_tasks_assigned == T::MaxTasksPerBlock::get() {
return <T as Config>::WeightInfo::schedule_tasks(
T::MaxTasksPerBlock::get(),
);
}
Self::assign_task(shard, task_id);
num_tasks_assigned = num_tasks_assigned.saturating_plus_one();
break;
}
}
}
let registered_shards: Vec<ShardId> = NetworkShards::<T>::iter_prefix(network)
.map(|(shard, _)| shard)
.filter(|shard| {
Self::is_shard_registered(*shard) && T::Shards::is_shard_online(*shard)
})
.collect();
if registered_shards.is_empty() {
continue;
}
let task_count = TaskCount::<T>::get(network);
let executed_task_count = ExecutedTaskCount::<T>::get(network);
let assignable_task_count = task_count.saturating_sub(executed_task_count);
log::debug!("assignable tasks: {task_count} - {executed_task_count} = {assignable_task_count}");
// Ceiling division: spread the assignable tasks evenly across the registered shards.
let tasks_per_shard =
(assignable_task_count as u32).saturating_sub(1) / registered_shards.len() as u32 + 1;
let tasks_per_shard = core::cmp::min(tasks_per_shard, max_assignable_tasks);
log::debug!("task_per_shard: {tasks_per_shard}");
for shard in registered_shards {
let shard_task_count = ShardTaskCount::<T>::get(shard);
let capacity = tasks_per_shard.saturating_sub(shard_task_count);
log::debug!(
"{shard} shard_task_count: {shard_task_count} shard_capacity: {capacity}",
);
if T::MaxTasksPerBlock::get() > num_tasks_assigned.saturating_add(capacity) {
num_tasks_assigned = num_tasks_assigned
.saturating_add(Self::schedule_tasks_shard(network, shard, capacity));
} else {
Self::schedule_tasks_shard(
network,
shard,
T::MaxTasksPerBlock::get().saturating_sub(num_tasks_assigned),
);
return <T as Config>::WeightInfo::schedule_tasks(
T::MaxTasksPerBlock::get(),
);
}
}
}
<T as Config>::WeightInfo::schedule_tasks(num_tasks_assigned)
}
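/// Drain each network's gateway-operation queue into batches, bounded by
/// `MaxBatchesPerBlock`.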
pub(crate) fn prepare_batches() -> Weight {
let mut num_batches_started = 0u32;
for (network, _) in ReadEventsTask::<T>::iter() {
let batch_gas_params = T::Networks::batch_gas_params(network);
let mut batcher = BatchBuilder::new(batch_gas_params);
let queue = Self::ops_queue(network);
while let Some(op) = queue.pop() {
if let Some(msg) = batcher.push(op) {
Self::start_batch(network, msg);
num_batches_started = num_batches_started.saturating_plus_one();
}
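// Stop one short of the limit so the final `take_batch` flush below stays within it.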
if num_batches_started == T::MaxBatchesPerBlock::get().saturating_less_one() {
break;
}
}
if let Some(msg) = batcher.take_batch() {
Self::start_batch(network, msg);
num_batches_started = num_batches_started.saturating_plus_one();
}
}
<T as Config>::WeightInfo::prepare_batches(num_batches_started)
}
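/// Allocate a batch id, index its operations, and create the task that submits
/// the batch to the gateway.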
fn start_batch(network: NetworkId, msg: GatewayMessage) {
let batch_id = BatchIdCounter::<T>::get();
BatchIdCounter::<T>::put(batch_id.saturating_add(1));
for op in &msg.ops {
match op {
GatewayOp::SendMessage(msg) => {
let msg_id = msg.message_id();
MessageBatchId::<T>::insert(msg_id, batch_id);
},
GatewayOp::RegisterShard(key, _) => {
ShardRegisterBatchId::<T>::insert(key, batch_id);
},
GatewayOp::UnregisterShard(key, _) => {
ShardUnregisterBatchId::<T>::insert(key, batch_id);
},
}
}
Batch::<T>::insert(batch_id, msg);
let task_id = Self::create_task(network, Task::SubmitGatewayMessage { batch_id });
PendingBatches::<T>::insert(batch_id, ());
BatchTaskId::<T>::insert(batch_id, task_id);
}
}
impl<T: Config> Pallet<T> {
pub fn shard_tasks(shard_id: ShardId) -> Vec<TaskId> {
ShardTasks::<T>::iter_prefix(shard_id).map(|(task_id, _)| task_id).collect()
}
pub fn task(task_id: TaskId) -> Option<Task> {
Tasks::<T>::get(task_id)
}
pub fn task_shard(task_id: TaskId) -> Option<ShardId> {
TaskShard::<T>::get(task_id)
}
pub fn task_result(task_id: TaskId) -> Option<Result<(), ErrorMsg>> {
TaskOutput::<T>::get(task_id)
}
pub fn batch_message(batch: BatchId) -> Option<GatewayMessage> {
Batch::<T>::get(batch)
}
pub fn failed_batches() -> Vec<BatchId> {
FailedBatches::<T>::iter_keys().collect()
}
pub fn pending_batches() -> Vec<BatchId> {
PendingBatches::<T>::iter_keys().collect()
}
}
impl<T: Config> TasksInterface for Pallet<T> {
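/// Track a shard that came online for `network` and, if the network has a
/// gateway, queue a `RegisterShard` operation for it.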
fn shard_online(shard_id: ShardId, network: NetworkId) {
NetworkShards::<T>::insert(network, shard_id, ());
if T::Networks::gateway(network).is_some() {
let Some(key) = T::Shards::tss_public_key(shard_id) else {
return;
};
let Some(sessions) = T::Shards::num_sessions(shard_id) else {
return;
};
Self::ops_queue(network).push(GatewayOp::RegisterShard(key, sessions));
}
}
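/// Unassign the shard's tasks (re-queueing all but the read-events task) and
/// queue an `UnregisterShard` operation for it.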
fn shard_offline(shard_id: ShardId, network: NetworkId) {
NetworkShards::<T>::remove(network, shard_id);
let read_events_task_id = ReadEventsTask::<T>::get(network);
ShardTasks::<T>::drain_prefix(shard_id).for_each(|(task_id, _)| {
TaskShard::<T>::remove(task_id);
if Some(task_id) != read_events_task_id {
Self::ua_task_queue(network).push(task_id);
}
});
log::info!("shard {shard_id} offline");
ShardTaskCount::<T>::insert(shard_id, 0);
let Some(key) = T::Shards::tss_public_key(shard_id) else {
return;
};
let Some(sessions) = T::Shards::num_sessions(shard_id) else {
return;
};
Self::ops_queue(network).push(GatewayOp::UnregisterShard(key, sessions));
}
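/// Start reading gateway events from `block` once a gateway is registered for the network.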
fn gateway_registered(network: NetworkId, block: u64) {
SyncHeight::<T>::insert(network, block);
Self::read_gateway_events(network);
}
fn network_removed(network: NetworkId) {
SyncHeight::<T>::remove(network);
ReadEventsTask::<T>::remove(network);
for (shard, _) in NetworkShards::<T>::drain_prefix(network) {
T::Shards::force_shard_offline(shard);
}
}
}
}