use crate::admin::AdminMsg;
use crate::network::{create_iroh_network, NetworkConfig};
use crate::runtime::Runtime;
use crate::shards::{TimeWorker, TimeWorkerParams};
use crate::tasks::TaskParams;
use anyhow::Result;
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};
use gmp::Backend;
use scale_codec::Decode;
use std::path::PathBuf;
use std::sync::Arc;
use time_primitives::admin::Config;
use time_primitives::NetworkId;
use tracing::{span, Level};
use opentelemetry::{trace::TracerProvider as _, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
Resource,
};
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{filter::EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
pub mod admin;
#[cfg(test)]
mod mock;
mod network;
mod runtime;
mod shards;
mod tasks;
fn resource() -> Resource {
Resource::builder()
.with_schema_url(
[KeyValue::new("service.name", "chronicle"), KeyValue::new("service.version", "v1.0")],
"https://opentelemetry.io/schemas/1.30.0",
)
.build()
}
pub fn init_opentelemetry() {
let log_subscriber = json_subscriber::fmt::layer()
.flatten_event(true)
.flatten_current_span_on_top_level(true)
.flatten_span_list_on_top_level(true)
.with_file(true)
.with_line_number(true);
let filter_layer = EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new("debug"))
.unwrap();
if let Ok(endpoint) = std::env::var("TRACING_ENDPOINT") {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.build()
.unwrap();
let tracer_provider = SdkTracerProvider::builder()
.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(1.0))))
.with_id_generator(RandomIdGenerator::default())
.with_resource(resource())
.with_batch_exporter(exporter)
.build();
let tracer = tracer_provider.tracer("tracing-otel-subscriber");
tracing_subscriber::registry()
.with(filter_layer)
.with(log_subscriber)
.with(OpenTelemetryLayer::new(tracer))
.init();
} else {
let filter = EnvFilter::from_default_env()
.add_directive("chronicle=debug".parse().unwrap())
.add_directive("tss=debug".parse().unwrap())
.add_directive("peernet=debug".parse().unwrap());
tracing_subscriber::registry()
.with(filter_layer)
.with(log_subscriber)
.with(filter)
.try_init()
.ok();
}
std::panic::set_hook(Box::new(tracing_panic::panic_hook));
}
pub struct ChronicleConfig {
pub network_id: NetworkId,
pub network_key: [u8; 32],
pub target_url: String,
pub target_mnemonic: String,
pub tss_keyshare_cache: PathBuf,
pub backend: Backend,
}
pub async fn run_chronicle(
config: ChronicleConfig,
substrate: Arc<dyn Runtime>,
mut admin: mpsc::Sender<AdminMsg>,
) -> Result<()> {
let span = tracing::span!(Level::INFO, "run_chronicle");
let mut ticker = substrate.finality_notification_stream();
let chain = loop {
let Some((hash, _)) = ticker.next().await else { continue };
let name = substrate.network(config.network_id, hash).await?;
if let Some(name) = name {
break String::decode(&mut name.0.to_vec().as_slice()).unwrap_or_default();
}
tracing::warn!(parent: &span, "network {} isn't registered", config.network_id);
};
tracing::info!(parent: &span, "joining network {chain}");
let (tss_tx, tss_rx) = mpsc::channel(10);
let chain = config.backend.chain(config.network_id, &config.target_mnemonic)?;
let connector = loop {
match chain.connect(config.target_url.clone()).await {
Ok(connector) => break connector,
Err(error) => {
tracing::info!(
parent: &span,
"Initializing connector returned an error {:?}, retrying in one second",
error
);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
},
}
};
let (network, network_requests) =
create_iroh_network(NetworkConfig { secret: config.network_key }, &span).await?;
let account = time_primitives::format_address(substrate.account_id());
let address = connector.chain().format_address(connector.chain().address());
let peer_id = network.format_peer_id(network.peer_id());
let span = span!(
parent: &span,
Level::INFO,
"chronicle",
tc_account = account,
chain_address = address,
gmp_network_id = config.network_id,
net_peer_id = peer_id,
);
admin
.send(AdminMsg::SetConfig(Config {
network: config.network_id,
account,
address,
peer_id,
peer_id_hex: hex::encode(network.peer_id()),
}))
.await?;
loop {
let Some((hash, _)) = ticker.next().await else { continue };
if substrate.is_registered(hash).await? {
break;
}
tracing::warn!(parent: &span, "chronicle isn't registered");
}
let task_params = TaskParams::new(substrate.clone(), connector, tss_tx, admin.clone());
let time_worker = TimeWorker::new(TimeWorkerParams {
network,
task_params,
substrate,
tss_request: tss_rx,
net_request: network_requests,
tss_keyshare_cache: config.tss_keyshare_cache,
admin_request: admin.clone(),
});
time_worker.run(&span).await;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::mock::Mock;
use futures::{Future, FutureExt, StreamExt};
use polkadot_sdk::sp_runtime::BoundedVec;
use scale_codec::Encode;
use std::time::Duration;
use time_primitives::{AccountId, BlockHash, ChainName, ShardStatus, Task};
async fn chronicle(mock: Mock, network_id: NetworkId, exit: impl Future<Output = ()> + Unpin) {
tracing::info!("running chronicle");
let network_key = *mock.account_id().as_ref();
let (tx, mut rx) = mpsc::channel(10);
let root = if std::env::var("CI").is_ok() { "." } else { "/tmp" };
let tss_keyshare_cache = format!("{root}/chronicles/{}", hex::encode(network_key)).into();
std::fs::create_dir_all(&tss_keyshare_cache).unwrap();
let handle = tokio::task::spawn(run_chronicle(
ChronicleConfig {
network_id,
network_key,
target_url: "tempfile".to_string(),
target_mnemonic: "mnemonic".into(),
tss_keyshare_cache,
backend: Backend::Rust,
},
Arc::new(mock.clone()),
tx,
));
tokio::spawn(async move {
while let Some(msg) = rx.next().await {
if let AdminMsg::SetConfig(config) = msg {
tracing::info!("received chronicle config");
mock.register_member(
network_id,
config.account.parse().unwrap(),
hex::decode(&config.peer_id_hex).unwrap().try_into().unwrap(),
);
tracing::info!("registered chronicle");
}
}
});
tracing::info!("registered chronicle");
futures::future::select(handle, exit).await;
}
#[tokio::test]
async fn chronicle_smoke() -> Result<()> {
let (n, t) = (3, 3);
init_opentelemetry();
let mock = Mock::default().instance(42);
let block: BlockHash = BlockHash::from([0u8; 32]);
let network_id = mock.create_network(ChainName(BoundedVec::truncate_from("rust".encode())));
for id in 0..n {
let instance = mock.instance(id as u8);
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(chronicle(instance, network_id, futures::future::pending::<()>()));
});
}
loop {
tracing::info!("waiting for members to register");
if mock.members(network_id).len() < n {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
break;
}
let members: Vec<AccountId> =
mock.members(network_id).into_iter().map(|(public, _)| public).collect();
let shard_id = mock.create_shard(members.clone(), t);
loop {
tracing::info!("waiting for shard");
if mock.shard_status(shard_id, block).await.unwrap() != ShardStatus::Online {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
break;
}
tracing::info!("creating task");
let task_id = mock.create_task(Task::ReadGatewayEvents { blocks: 0..1 });
tracing::info!("assigning task {task_id} {shard_id}");
mock.assign_task(task_id, shard_id);
loop {
tracing::info!("waiting for task {task_id}");
let task = mock.task(task_id).unwrap();
if task.result.is_none() {
tokio::time::sleep(Duration::from_secs(10)).await;
continue;
}
break;
}
Ok(())
}
#[tokio::test]
async fn chronicle_restart() -> Result<()> {
init_opentelemetry();
let mock = Mock::default().instance(42);
let block: BlockHash = BlockHash::from([0u8; 32]);
let network_id = mock.create_network(ChainName(BoundedVec::truncate_from("rust".encode())));
let mut shutdown = vec![];
for id in 0..3 {
let instance = mock.instance(id + 4);
let (tx, rx) = futures::channel::oneshot::channel();
shutdown.push(tx);
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(chronicle(instance, network_id, rx.map(|_| ())));
});
}
loop {
tracing::info!("waiting for members to register");
if mock.members(network_id).len() < 3 {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
break;
}
let members: Vec<AccountId> =
mock.members(network_id).into_iter().map(|(public, _)| public).collect();
let shard_id = mock.create_shard(members.clone(), 2);
loop {
tracing::info!("waiting for shard");
if mock.shard_status(shard_id, block).await.unwrap() != ShardStatus::Online {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
break;
}
for tx in shutdown {
tx.send(()).unwrap();
}
for id in 0..3 {
let instance = mock.instance(id + 4);
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(chronicle(instance, network_id, futures::future::pending()));
});
}
tracing::info!("creating task");
let task_id = mock.create_task(Task::ReadGatewayEvents { blocks: 0..1 });
tracing::info!("assigning task");
mock.assign_task(task_id, shard_id);
loop {
tracing::info!("waiting for task");
let task = mock.task(task_id).unwrap();
if task.result.is_none() {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
break;
}
Ok(())
}
}