use crate::admin::AdminMsg;
use crate::network::{create_iroh_network, NetworkConfig};
use crate::runtime::Runtime;
use crate::shards::{TimeWorker, TimeWorkerParams};
use crate::tasks::TaskParams;
use anyhow::Result;
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};
use gmp::Backend;
use scale_codec::Decode;
use std::path::PathBuf;
use std::sync::Arc;
use time_primitives::admin::Config;
use time_primitives::NetworkId;
use tracing::{span, Level};
use opentelemetry::{trace::TracerProvider as _, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
	trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
	Resource,
};
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{filter::EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
pub mod admin;
#[cfg(test)]
mod mock;
mod network;
mod runtime;
mod shards;
mod tasks;
/// Builds the OpenTelemetry [`Resource`] attached to every exported span.
///
/// The resource carries the `service.name` and `service.version` attributes
/// and is tagged with the schema URL the attribute names follow.
fn resource() -> Resource {
	let attributes = [
		KeyValue::new("service.name", "chronicle"),
		KeyValue::new("service.version", "v1.0"),
	];
	let schema_url = "https://opentelemetry.io/schemas/1.30.0";
	Resource::builder().with_schema_url(attributes, schema_url).build()
}
/// Installs the process-wide `tracing` subscriber and the panic hook.
///
/// Log events are always emitted as flattened JSON including file and line
/// number. The base filter is read from the default `EnvFilter` environment
/// variable and falls back to `debug` when that is unset or invalid.
///
/// * If `TRACING_ENDPOINT` is set, spans are additionally exported over
///   OTLP/gRPC to that endpoint.
/// * Otherwise a second filter is stacked on top that adds `debug`
///   directives for the `chronicle`, `tss` and `peernet` targets.
///
/// Calling this function more than once is safe: if a global subscriber is
/// already installed the new one is silently discarded in both branches.
///
/// # Panics
///
/// Panics if the OTLP span exporter cannot be built for the endpoint given
/// in `TRACING_ENDPOINT`.
pub fn init_opentelemetry() {
	let log_subscriber = json_subscriber::fmt::layer()
		.flatten_event(true)
		.flatten_current_span_on_top_level(true)
		.flatten_span_list_on_top_level(true)
		.with_file(true)
		.with_line_number(true);
	let filter_layer = EnvFilter::try_from_default_env()
		.or_else(|_| EnvFilter::try_new("debug"))
		.expect("\"debug\" is a valid filter directive");
	if let Ok(endpoint) = std::env::var("TRACING_ENDPOINT") {
		let exporter = opentelemetry_otlp::SpanExporter::builder()
			.with_tonic()
			.with_endpoint(endpoint)
			.build()
			.expect("failed to build OTLP span exporter from TRACING_ENDPOINT");
		let tracer_provider = SdkTracerProvider::builder()
			// Ratio 1.0: every root trace is sampled; children follow their parent.
			.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(1.0))))
			.with_id_generator(RandomIdGenerator::default())
			.with_resource(resource())
			.with_batch_exporter(exporter)
			.build();
		let tracer = tracer_provider.tracer("tracing-otel-subscriber");
		// `try_init` instead of `init`: a subscriber may already be installed
		// (e.g. several tests in one process); that must not panic, matching
		// the behaviour of the fallback branch below.
		tracing_subscriber::registry()
			.with(filter_layer)
			.with(log_subscriber)
			.with(OpenTelemetryLayer::new(tracer))
			.try_init()
			.ok();
	} else {
		let filter = EnvFilter::from_default_env()
			.add_directive("chronicle=debug".parse().unwrap())
			.add_directive("tss=debug".parse().unwrap())
			.add_directive("peernet=debug".parse().unwrap());
		tracing_subscriber::registry()
			.with(filter_layer)
			.with(log_subscriber)
			.with(filter)
			.try_init()
			.ok();
	}
	// Route panics through `tracing` so they show up in the structured logs.
	std::panic::set_hook(Box::new(tracing_panic::panic_hook));
}
/// Start-up parameters consumed by [`run_chronicle`].
pub struct ChronicleConfig {
	/// Network this chronicle serves; looked up on the runtime and passed to the backend.
	pub network_id: NetworkId,
	/// Secret handed to the peer-to-peer network as `NetworkConfig::secret`.
	pub network_key: [u8; 32],
	/// URL passed to the chain's `connect` call.
	pub target_url: String,
	/// Mnemonic passed to the backend when creating the chain handle.
	pub target_mnemonic: String,
	/// Path forwarded to the time worker as its TSS keyshare cache.
	pub tss_keyshare_cache: PathBuf,
	/// Backend used to create the chain handle for `network_id`.
	pub backend: Backend,
}
pub async fn run_chronicle(
	config: ChronicleConfig,
	substrate: Arc<dyn Runtime>,
	mut admin: mpsc::Sender<AdminMsg>,
) -> Result<()> {
	let span = tracing::span!(Level::INFO, "run_chronicle");
	let mut ticker = substrate.finality_notification_stream();
	let chain = loop {
		let Some((hash, _)) = ticker.next().await else { continue };
		let name = substrate.network(config.network_id, hash).await?;
		if let Some(name) = name {
			break String::decode(&mut name.0.to_vec().as_slice()).unwrap_or_default();
		}
		tracing::warn!(parent: &span, "network {} isn't registered", config.network_id);
	};
	tracing::info!(parent: &span, "joining network {chain}");
	let (tss_tx, tss_rx) = mpsc::channel(10);
	let chain = config.backend.chain(config.network_id, &config.target_mnemonic)?;
	let connector = loop {
		match chain.connect(config.target_url.clone()).await {
			Ok(connector) => break connector,
			Err(error) => {
				tracing::info!(
					parent: &span,
					"Initializing connector returned an error {:?}, retrying in one second",
					error
				);
				tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
			},
		}
	};
	let (network, network_requests) =
		create_iroh_network(NetworkConfig { secret: config.network_key }, &span).await?;
	let account = time_primitives::format_address(substrate.account_id());
	let address = connector.chain().format_address(connector.chain().address());
	let peer_id = network.format_peer_id(network.peer_id());
	let span = span!(
		parent: &span,
		Level::INFO,
		"chronicle",
		tc_account = account,
		chain_address = address,
		gmp_network_id = config.network_id,
		net_peer_id = peer_id,
	);
	admin
		.send(AdminMsg::SetConfig(Config {
			network: config.network_id,
			account,
			address,
			peer_id,
			peer_id_hex: hex::encode(network.peer_id()),
		}))
		.await?;
	loop {
		let Some((hash, _)) = ticker.next().await else { continue };
		if substrate.is_registered(hash).await? {
			break;
		}
		tracing::warn!(parent: &span, "chronicle isn't registered");
	}
	let task_params = TaskParams::new(substrate.clone(), connector, tss_tx, admin.clone());
	let time_worker = TimeWorker::new(TimeWorkerParams {
		network,
		task_params,
		substrate,
		tss_request: tss_rx,
		net_request: network_requests,
		tss_keyshare_cache: config.tss_keyshare_cache,
		admin_request: admin.clone(),
	});
	time_worker.run(&span).await;
	Ok(())
}
#[cfg(test)]
mod tests {
	use super::*;
	use crate::mock::Mock;
	use futures::{Future, FutureExt, StreamExt};
	use polkadot_sdk::sp_runtime::BoundedVec;
	use scale_codec::Encode;
	use std::time::Duration;
	use time_primitives::{AccountId, BlockHash, ChainName, ShardStatus, Task};

	/// Runs one chronicle on top of `mock` until `run_chronicle` returns or
	/// `exit` resolves, whichever happens first.
	///
	/// A background task drains the admin channel and registers the chronicle
	/// as a member of `network_id` when it publishes its config.
	async fn chronicle(mock: Mock, network_id: NetworkId, exit: impl Future<Output = ()> + Unpin) {
		tracing::info!("running chronicle");
		let network_key = *mock.account_id().as_ref();
		let (admin_tx, mut admin_rx) = mpsc::channel(10);
		// On CI the keyshare cache lives in the working directory, otherwise under /tmp.
		let root = match std::env::var("CI") {
			Ok(_) => ".",
			Err(_) => "/tmp",
		};
		let tss_keyshare_cache =
			PathBuf::from(format!("{root}/chronicles/{}", hex::encode(network_key)));
		std::fs::create_dir_all(&tss_keyshare_cache).unwrap();
		let chronicle_config = ChronicleConfig {
			network_id,
			network_key,
			target_url: "tempfile".to_string(),
			target_mnemonic: "mnemonic".into(),
			tss_keyshare_cache,
			backend: Backend::Rust,
		};
		let handle =
			tokio::task::spawn(run_chronicle(chronicle_config, Arc::new(mock.clone()), admin_tx));
		tokio::spawn(async move {
			while let Some(msg) = admin_rx.next().await {
				let AdminMsg::SetConfig(config) = msg else { continue };
				tracing::info!("received chronicle config");
				mock.register_member(
					network_id,
					config.account.parse().unwrap(),
					hex::decode(&config.peer_id_hex).unwrap().try_into().unwrap(),
				);
				tracing::info!("registered chronicle");
			}
		});
		tracing::info!("registered chronicle");
		futures::future::select(handle, exit).await;
	}

	/// Starts `chronicle` on a dedicated OS thread with its own tokio runtime.
	fn spawn_chronicle(
		instance: Mock,
		network_id: NetworkId,
		exit: impl Future<Output = ()> + Unpin + Send + 'static,
	) {
		std::thread::spawn(move || {
			let rt = tokio::runtime::Runtime::new().unwrap();
			rt.block_on(chronicle(instance, network_id, exit));
		});
	}

	/// Polls once per second until at least `count` members are registered on `network_id`.
	async fn wait_for_members(mock: &Mock, network_id: NetworkId, count: usize) {
		loop {
			tracing::info!("waiting for members to register");
			if mock.members(network_id).len() >= count {
				break;
			}
			tokio::time::sleep(Duration::from_secs(1)).await;
		}
	}

	/// Three chronicles form a 3-of-3 shard and complete a gateway-read task.
	#[tokio::test]
	async fn chronicle_smoke() -> Result<()> {
		let n = 3;
		let t = 3;
		init_opentelemetry();
		let mock = Mock::default().instance(42);
		let block: BlockHash = BlockHash::from([0u8; 32]);
		let network_id = mock.create_network(ChainName(BoundedVec::truncate_from("rust".encode())));
		for id in 0..n {
			spawn_chronicle(mock.instance(id as u8), network_id, futures::future::pending::<()>());
		}
		wait_for_members(&mock, network_id, n).await;
		let members: Vec<AccountId> =
			mock.members(network_id).into_iter().map(|(public, _)| public).collect();
		let shard_id = mock.create_shard(members.clone(), t);
		loop {
			tracing::info!("waiting for shard");
			if mock.shard_status(shard_id, block).await.unwrap() == ShardStatus::Online {
				break;
			}
			tokio::time::sleep(Duration::from_secs(1)).await;
		}
		tracing::info!("creating task");
		let task_id = mock.create_task(Task::ReadGatewayEvents { blocks: 0..1 });
		tracing::info!("assigning task {task_id} {shard_id}");
		mock.assign_task(task_id, shard_id);
		loop {
			tracing::info!("waiting for task {task_id}");
			let task = mock.task(task_id).unwrap();
			if task.result.is_some() {
				break;
			}
			tokio::time::sleep(Duration::from_secs(10)).await;
		}
		Ok(())
	}

	/// Chronicles that are shut down after the shard is online and then
	/// restarted must still be able to complete a task.
	#[tokio::test]
	async fn chronicle_restart() -> Result<()> {
		init_opentelemetry();
		let mock = Mock::default().instance(42);
		let block: BlockHash = BlockHash::from([0u8; 32]);
		let network_id = mock.create_network(ChainName(BoundedVec::truncate_from("rust".encode())));
		// First generation: each chronicle exits when its oneshot fires.
		let mut shutdown = vec![];
		for id in 0..3 {
			let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
			shutdown.push(stop_tx);
			spawn_chronicle(mock.instance(id + 4), network_id, stop_rx.map(|_| ()));
		}
		wait_for_members(&mock, network_id, 3).await;
		let members: Vec<AccountId> =
			mock.members(network_id).into_iter().map(|(public, _)| public).collect();
		let shard_id = mock.create_shard(members.clone(), 2);
		loop {
			tracing::info!("waiting for shard");
			if mock.shard_status(shard_id, block).await.unwrap() == ShardStatus::Online {
				break;
			}
			tokio::time::sleep(Duration::from_secs(1)).await;
		}
		for stop_tx in shutdown {
			stop_tx.send(()).unwrap();
		}
		// Second generation: same instances, running until the process ends.
		for id in 0..3 {
			spawn_chronicle(mock.instance(id + 4), network_id, futures::future::pending());
		}
		tracing::info!("creating task");
		let task_id = mock.create_task(Task::ReadGatewayEvents { blocks: 0..1 });
		tracing::info!("assigning task");
		mock.assign_task(task_id, shard_id);
		loop {
			tracing::info!("waiting for task");
			let task = mock.task(task_id).unwrap();
			if task.result.is_some() {
				break;
			}
			tokio::time::sleep(Duration::from_secs(1)).await;
		}
		Ok(())
	}
}