use super::{AuthorityDiscoveryApi, Block, Error, Hash, IsParachainNode, Registry};
use polkadot_node_subsystem_types::DefaultSubsystemClient;
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sp_core::traits::SpawnNamed;
use polkadot_availability_distribution::IncomingRequestReceivers;
use polkadot_node_core_approval_voting::Config as ApprovalVotingConfig;
use polkadot_node_core_av_store::Config as AvailabilityConfig;
use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig;
use polkadot_node_core_chain_selection::Config as ChainSelectionConfig;
use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig;
use polkadot_node_network_protocol::{
peer_set::PeerSetProtocolNames,
request_response::{
v1 as request_v1, vstaging as request_vstaging, IncomingRequestReceiver, ReqProtocolNames,
},
};
#[cfg(any(feature = "malus", test))]
pub use polkadot_overseer::{
dummy::{dummy_overseer_builder, DummySubsystem},
HeadSupportsParachains,
};
use polkadot_overseer::{
metrics::Metrics as OverseerMetrics, InitializedOverseerBuilder, MetricsTrait, Overseer,
OverseerConnector, OverseerHandle, SpawnGlue,
};
use schnellru::{ByLength, LruMap};
use polkadot_primitives::runtime_api::ParachainHost;
use sc_authority_discovery::Service as AuthorityDiscoveryService;
use sc_client_api::AuxStore;
use sc_keystore::LocalKeystore;
use sc_network::NetworkStateInfo;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus_babe::BabeApi;
use std::sync::Arc;
pub use polkadot_approval_distribution::ApprovalDistribution as ApprovalDistributionSubsystem;
pub use polkadot_availability_bitfield_distribution::BitfieldDistribution as BitfieldDistributionSubsystem;
pub use polkadot_availability_distribution::AvailabilityDistributionSubsystem;
pub use polkadot_availability_recovery::AvailabilityRecoverySubsystem;
pub use polkadot_collator_protocol::{CollatorProtocolSubsystem, ProtocolSide};
pub use polkadot_dispute_distribution::DisputeDistributionSubsystem;
pub use polkadot_gossip_support::GossipSupport as GossipSupportSubsystem;
pub use polkadot_network_bridge::{
Metrics as NetworkBridgeMetrics, NetworkBridgeRx as NetworkBridgeRxSubsystem,
NetworkBridgeTx as NetworkBridgeTxSubsystem,
};
pub use polkadot_node_collation_generation::CollationGenerationSubsystem;
pub use polkadot_node_core_approval_voting::ApprovalVotingSubsystem;
pub use polkadot_node_core_av_store::AvailabilityStoreSubsystem;
pub use polkadot_node_core_backing::CandidateBackingSubsystem;
pub use polkadot_node_core_bitfield_signing::BitfieldSigningSubsystem;
pub use polkadot_node_core_candidate_validation::CandidateValidationSubsystem;
pub use polkadot_node_core_chain_api::ChainApiSubsystem;
pub use polkadot_node_core_chain_selection::ChainSelectionSubsystem;
pub use polkadot_node_core_dispute_coordinator::DisputeCoordinatorSubsystem;
pub use polkadot_node_core_prospective_parachains::ProspectiveParachainsSubsystem;
pub use polkadot_node_core_provisioner::ProvisionerSubsystem;
pub use polkadot_node_core_pvf_checker::PvfCheckerSubsystem;
pub use polkadot_node_core_runtime_api::RuntimeApiSubsystem;
use polkadot_node_subsystem_util::rand::{self, SeedableRng};
pub use polkadot_statement_distribution::StatementDistributionSubsystem;
pub struct OverseerGenArgs<'a, Spawner, RuntimeClient>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
pub keystore: Arc<LocalKeystore>,
pub runtime_client: Arc<RuntimeClient>,
pub parachains_db: Arc<dyn polkadot_node_subsystem_util::database::Database>,
pub network_service: Arc<sc_network::NetworkService<Block, Hash>>,
pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
pub authority_discovery_service: AuthorityDiscoveryService,
pub pov_req_receiver: IncomingRequestReceiver<request_v1::PoVFetchingRequest>,
pub chunk_req_receiver: IncomingRequestReceiver<request_v1::ChunkFetchingRequest>,
pub collation_req_v1_receiver: IncomingRequestReceiver<request_v1::CollationFetchingRequest>,
pub collation_req_vstaging_receiver:
IncomingRequestReceiver<request_vstaging::CollationFetchingRequest>,
pub available_data_req_receiver:
IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
pub statement_req_receiver: IncomingRequestReceiver<request_v1::StatementFetchingRequest>,
pub candidate_req_vstaging_receiver:
IncomingRequestReceiver<request_vstaging::AttestedCandidateRequest>,
pub dispute_req_receiver: IncomingRequestReceiver<request_v1::DisputeRequest>,
pub registry: Option<&'a Registry>,
pub spawner: Spawner,
pub is_parachain_node: IsParachainNode,
pub approval_voting_config: ApprovalVotingConfig,
pub availability_config: AvailabilityConfig,
pub candidate_validation_config: Option<CandidateValidationConfig>,
pub chain_selection_config: ChainSelectionConfig,
pub dispute_coordinator_config: DisputeCoordinatorConfig,
pub pvf_checker_enabled: bool,
pub overseer_message_channel_capacity_override: Option<usize>,
pub req_protocol_names: ReqProtocolNames,
pub peerset_protocol_names: PeerSetProtocolNames,
pub offchain_transaction_pool_factory: OffchainTransactionPoolFactory<Block>,
}
pub fn prepared_overseer_builder<Spawner, RuntimeClient>(
OverseerGenArgs {
keystore,
runtime_client,
parachains_db,
network_service,
sync_service,
authority_discovery_service,
pov_req_receiver,
chunk_req_receiver,
collation_req_v1_receiver,
collation_req_vstaging_receiver,
available_data_req_receiver,
statement_req_receiver,
candidate_req_vstaging_receiver,
dispute_req_receiver,
registry,
spawner,
is_parachain_node,
approval_voting_config,
availability_config,
candidate_validation_config,
chain_selection_config,
dispute_coordinator_config,
pvf_checker_enabled,
overseer_message_channel_capacity_override,
req_protocol_names,
peerset_protocol_names,
offchain_transaction_pool_factory,
}: OverseerGenArgs<Spawner, RuntimeClient>,
) -> Result<
InitializedOverseerBuilder<
SpawnGlue<Spawner>,
Arc<DefaultSubsystemClient<RuntimeClient>>,
CandidateValidationSubsystem,
PvfCheckerSubsystem,
CandidateBackingSubsystem,
StatementDistributionSubsystem<rand::rngs::StdRng>,
AvailabilityDistributionSubsystem,
AvailabilityRecoverySubsystem,
BitfieldSigningSubsystem,
BitfieldDistributionSubsystem,
ProvisionerSubsystem,
RuntimeApiSubsystem<DefaultSubsystemClient<RuntimeClient>>,
AvailabilityStoreSubsystem,
NetworkBridgeRxSubsystem<
Arc<sc_network::NetworkService<Block, Hash>>,
AuthorityDiscoveryService,
>,
NetworkBridgeTxSubsystem<
Arc<sc_network::NetworkService<Block, Hash>>,
AuthorityDiscoveryService,
>,
ChainApiSubsystem<RuntimeClient>,
CollationGenerationSubsystem,
CollatorProtocolSubsystem,
ApprovalDistributionSubsystem,
ApprovalVotingSubsystem,
GossipSupportSubsystem<AuthorityDiscoveryService>,
DisputeCoordinatorSubsystem,
DisputeDistributionSubsystem<AuthorityDiscoveryService>,
ChainSelectionSubsystem,
ProspectiveParachainsSubsystem,
>,
Error,
>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
use polkadot_node_subsystem_util::metrics::Metrics;
let metrics = <OverseerMetrics as MetricsTrait>::register(registry)?;
let spawner = SpawnGlue(spawner);
let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
let runtime_api_client = Arc::new(DefaultSubsystemClient::new(
runtime_client.clone(),
offchain_transaction_pool_factory,
));
let builder = Overseer::builder()
.network_bridge_tx(NetworkBridgeTxSubsystem::new(
network_service.clone(),
authority_discovery_service.clone(),
network_bridge_metrics.clone(),
req_protocol_names,
peerset_protocol_names.clone(),
))
.network_bridge_rx(NetworkBridgeRxSubsystem::new(
network_service.clone(),
authority_discovery_service.clone(),
Box::new(sync_service.clone()),
network_bridge_metrics,
peerset_protocol_names,
))
.availability_distribution(AvailabilityDistributionSubsystem::new(
keystore.clone(),
IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver },
Metrics::register(registry)?,
))
.availability_recovery(AvailabilityRecoverySubsystem::with_chunks_if_pov_large(
available_data_req_receiver,
Metrics::register(registry)?,
))
.availability_store(AvailabilityStoreSubsystem::new(
parachains_db.clone(),
availability_config,
Box::new(sync_service.clone()),
Metrics::register(registry)?,
))
.bitfield_distribution(BitfieldDistributionSubsystem::new(Metrics::register(registry)?))
.bitfield_signing(BitfieldSigningSubsystem::new(
keystore.clone(),
Metrics::register(registry)?,
))
.candidate_backing(CandidateBackingSubsystem::new(
keystore.clone(),
Metrics::register(registry)?,
))
.candidate_validation(CandidateValidationSubsystem::with_config(
candidate_validation_config,
Metrics::register(registry)?, Metrics::register(registry)?, ))
.pvf_checker(PvfCheckerSubsystem::new(
pvf_checker_enabled,
keystore.clone(),
Metrics::register(registry)?,
))
.chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
.collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?))
.collator_protocol({
let side = match is_parachain_node {
IsParachainNode::Collator(collator_pair) => ProtocolSide::Collator {
peer_id: network_service.local_peer_id(),
collator_pair,
request_receiver_v1: collation_req_v1_receiver,
request_receiver_vstaging: collation_req_vstaging_receiver,
metrics: Metrics::register(registry)?,
},
IsParachainNode::FullNode => ProtocolSide::None,
IsParachainNode::No => ProtocolSide::Validator {
keystore: keystore.clone(),
eviction_policy: Default::default(),
metrics: Metrics::register(registry)?,
},
};
CollatorProtocolSubsystem::new(side)
})
.provisioner(ProvisionerSubsystem::new(Metrics::register(registry)?))
.runtime_api(RuntimeApiSubsystem::new(
runtime_api_client.clone(),
Metrics::register(registry)?,
spawner.clone(),
))
.statement_distribution(StatementDistributionSubsystem::new(
keystore.clone(),
statement_req_receiver,
candidate_req_vstaging_receiver,
Metrics::register(registry)?,
rand::rngs::StdRng::from_entropy(),
))
.approval_distribution(ApprovalDistributionSubsystem::new(Metrics::register(registry)?))
.approval_voting(ApprovalVotingSubsystem::with_config(
approval_voting_config,
parachains_db.clone(),
keystore.clone(),
Box::new(sync_service.clone()),
Metrics::register(registry)?,
))
.gossip_support(GossipSupportSubsystem::new(
keystore.clone(),
authority_discovery_service.clone(),
Metrics::register(registry)?,
))
.dispute_coordinator(DisputeCoordinatorSubsystem::new(
parachains_db.clone(),
dispute_coordinator_config,
keystore.clone(),
Metrics::register(registry)?,
))
.dispute_distribution(DisputeDistributionSubsystem::new(
keystore.clone(),
dispute_req_receiver,
authority_discovery_service.clone(),
Metrics::register(registry)?,
))
.chain_selection(ChainSelectionSubsystem::new(chain_selection_config, parachains_db))
.prospective_parachains(ProspectiveParachainsSubsystem::new(Metrics::register(registry)?))
.activation_external_listeners(Default::default())
.span_per_active_leaf(Default::default())
.active_leaves(Default::default())
.supports_parachains(runtime_api_client)
.known_leaves(LruMap::new(ByLength::new(KNOWN_LEAVES_CACHE_SIZE)))
.metrics(metrics)
.spawner(spawner);
if let Some(capacity) = overseer_message_channel_capacity_override {
Ok(builder.message_channel_capacity(capacity))
} else {
Ok(builder)
}
}
pub trait OverseerGen {
fn generate<Spawner, RuntimeClient>(
&self,
connector: OverseerConnector,
args: OverseerGenArgs<Spawner, RuntimeClient>,
) -> Result<
(Overseer<SpawnGlue<Spawner>, Arc<DefaultSubsystemClient<RuntimeClient>>>, OverseerHandle),
Error,
>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
let gen = RealOverseerGen;
RealOverseerGen::generate::<Spawner, RuntimeClient>(&gen, connector, args)
}
}
use polkadot_overseer::KNOWN_LEAVES_CACHE_SIZE;
pub struct RealOverseerGen;
impl OverseerGen for RealOverseerGen {
fn generate<Spawner, RuntimeClient>(
&self,
connector: OverseerConnector,
args: OverseerGenArgs<Spawner, RuntimeClient>,
) -> Result<
(Overseer<SpawnGlue<Spawner>, Arc<DefaultSubsystemClient<RuntimeClient>>>, OverseerHandle),
Error,
>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
prepared_overseer_builder(args)?
.build_with_connector(connector)
.map_err(|e| e.into())
}
}