use crate::{
communication::{
notification::{
BeefyBestBlockSender, BeefyBestBlockStream, BeefyVersionedFinalityProofSender,
BeefyVersionedFinalityProofStream,
},
peers::KnownPeers,
request_response::{
outgoing_requests_engine::OnDemandJustificationsEngine, BeefyJustifsRequestHandler,
},
},
import::BeefyBlockImport,
metrics::register_metrics,
round::Rounds,
worker::PersistedState,
};
use futures::{stream::Fuse, StreamExt};
use log::{error, info};
use parking_lot::Mutex;
use prometheus::Registry;
use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotifications, Finalizer};
use sc_consensus::BlockImport;
use sc_network::{NetworkRequest, ProtocolName};
use sc_network_gossip::{GossipEngine, Network as GossipNetwork, Syncing as GossipSyncing};
use sp_api::{HeaderT, NumberFor, ProvideRuntimeApi};
use sp_blockchain::{
Backend as BlockchainBackend, Error as ClientError, HeaderBackend, Result as ClientResult,
};
use sp_consensus::{Error as ConsensusError, SyncOracle};
use sp_consensus_beefy::{
ecdsa_crypto::AuthorityId, BeefyApi, MmrRootHash, PayloadProvider, ValidatorSet,
BEEFY_ENGINE_ID,
};
use sp_keystore::KeystorePtr;
use sp_mmr_primitives::MmrApi;
use sp_runtime::traits::{Block, Zero};
use std::{
collections::{BTreeMap, VecDeque},
marker::PhantomData,
sync::Arc,
};
mod aux_schema;
mod error;
mod keystore;
mod metrics;
mod round;
mod worker;
pub mod communication;
pub mod import;
pub mod justification;
pub use communication::beefy_protocol_name::{
gossip_protocol_name, justifications_protocol_name as justifs_protocol_name,
};
#[cfg(test)]
mod tests;
const LOG_TARGET: &str = "beefy";
pub trait Client<B, BE>:
BlockchainEvents<B> + HeaderBackend<B> + Finalizer<B, BE> + Send + Sync
where
B: Block,
BE: Backend<B>,
{
}
impl<B, BE, T> Client<B, BE> for T
where
B: Block,
BE: Backend<B>,
T: BlockchainEvents<B>
+ HeaderBackend<B>
+ Finalizer<B, BE>
+ ProvideRuntimeApi<B>
+ Send
+ Sync,
{
}
#[derive(Clone)]
pub struct BeefyVoterLinks<B: Block> {
pub from_block_import_justif_stream: BeefyVersionedFinalityProofStream<B>,
pub to_rpc_justif_sender: BeefyVersionedFinalityProofSender<B>,
pub to_rpc_best_block_sender: BeefyBestBlockSender<B>,
}
#[derive(Clone)]
pub struct BeefyRPCLinks<B: Block> {
pub from_voter_justif_stream: BeefyVersionedFinalityProofStream<B>,
pub from_voter_best_beefy_stream: BeefyBestBlockStream<B>,
}
pub fn beefy_block_import_and_links<B, BE, RuntimeApi, I>(
wrapped_block_import: I,
backend: Arc<BE>,
runtime: Arc<RuntimeApi>,
prometheus_registry: Option<Registry>,
) -> (BeefyBlockImport<B, BE, RuntimeApi, I>, BeefyVoterLinks<B>, BeefyRPCLinks<B>)
where
B: Block,
BE: Backend<B>,
I: BlockImport<B, Error = ConsensusError> + Send + Sync,
RuntimeApi: ProvideRuntimeApi<B> + Send + Sync,
RuntimeApi::Api: BeefyApi<B, AuthorityId>,
{
let (to_rpc_justif_sender, from_voter_justif_stream) =
BeefyVersionedFinalityProofStream::<B>::channel();
let (to_rpc_best_block_sender, from_voter_best_beefy_stream) =
BeefyBestBlockStream::<B>::channel();
let (to_voter_justif_sender, from_block_import_justif_stream) =
BeefyVersionedFinalityProofStream::<B>::channel();
let metrics = register_metrics(prometheus_registry);
let import = BeefyBlockImport::new(
backend,
runtime,
wrapped_block_import,
to_voter_justif_sender,
metrics,
);
let voter_links = BeefyVoterLinks {
from_block_import_justif_stream,
to_rpc_justif_sender,
to_rpc_best_block_sender,
};
let rpc_links = BeefyRPCLinks { from_voter_best_beefy_stream, from_voter_justif_stream };
(import, voter_links, rpc_links)
}
pub struct BeefyNetworkParams<B: Block, N, S> {
pub network: Arc<N>,
pub sync: Arc<S>,
pub gossip_protocol_name: ProtocolName,
pub justifications_protocol_name: ProtocolName,
pub _phantom: PhantomData<B>,
}
pub struct BeefyParams<B: Block, BE, C, N, P, R, S> {
pub client: Arc<C>,
pub backend: Arc<BE>,
pub payload_provider: P,
pub runtime: Arc<R>,
pub key_store: Option<KeystorePtr>,
pub network_params: BeefyNetworkParams<B, N, S>,
pub min_block_delta: u32,
pub prometheus_registry: Option<Registry>,
pub links: BeefyVoterLinks<B>,
pub on_demand_justifications_handler: BeefyJustifsRequestHandler<B, C>,
}
pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
beefy_params: BeefyParams<B, BE, C, N, P, R, S>,
) where
B: Block,
BE: Backend<B>,
C: Client<B, BE> + BlockBackend<B>,
P: PayloadProvider<B> + Clone,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B, AuthorityId> + MmrApi<B, MmrRootHash, NumberFor<B>>,
N: GossipNetwork<B> + NetworkRequest + Send + Sync + 'static,
S: GossipSyncing<B> + SyncOracle + 'static,
{
let BeefyParams {
client,
backend,
payload_provider,
runtime,
key_store,
network_params,
min_block_delta,
prometheus_registry,
links,
mut on_demand_justifications_handler,
} = beefy_params;
let BeefyNetworkParams {
network,
sync,
gossip_protocol_name,
justifications_protocol_name,
..
} = network_params;
let metrics = register_metrics(prometheus_registry.clone());
let mut finality_notifications = client.finality_notification_stream().fuse();
let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
let (gossip_validator, gossip_report_stream) =
communication::gossip::GossipValidator::new(known_peers.clone());
let gossip_validator = Arc::new(gossip_validator);
let gossip_engine = GossipEngine::new(
network.clone(),
sync.clone(),
gossip_protocol_name.clone(),
gossip_validator.clone(),
None,
);
let on_demand_justifications = OnDemandJustificationsEngine::new(
network.clone(),
justifications_protocol_name.clone(),
known_peers,
prometheus_registry.clone(),
);
let mut beefy_comms = worker::BeefyComms {
gossip_engine,
gossip_validator,
gossip_report_stream,
on_demand_justifications,
};
loop {
let persisted_state = match wait_for_runtime_pallet(
&*runtime,
&mut beefy_comms.gossip_engine,
&mut finality_notifications,
)
.await
.and_then(|(beefy_genesis, best_grandpa)| {
load_or_init_voter_state(
&*backend,
&*runtime,
beefy_genesis,
best_grandpa,
min_block_delta,
)
}) {
Ok(state) => state,
Err(e) => {
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
return
},
};
if let Err(e) = persisted_state
.gossip_filter_config()
.map(|f| beefy_comms.gossip_validator.update_filter(f))
{
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
return
}
let worker = worker::BeefyWorker {
backend: backend.clone(),
payload_provider: payload_provider.clone(),
runtime: runtime.clone(),
sync: sync.clone(),
key_store: key_store.clone().into(),
comms: beefy_comms,
links: links.clone(),
metrics: metrics.clone(),
pending_justifications: BTreeMap::new(),
persisted_state,
};
match futures::future::select(
Box::pin(worker.run(&mut block_import_justif, &mut finality_notifications)),
Box::pin(on_demand_justifications_handler.run()),
)
.await
{
futures::future::Either::Left(((error::Error::ConsensusReset, reuse_comms), _)) => {
error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset);
beefy_comms = reuse_comms;
continue
},
futures::future::Either::Left(((worker_err, _), _)) =>
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", worker_err),
futures::future::Either::Right((odj_handler_err, _)) =>
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_err),
};
return
}
}
fn load_or_init_voter_state<B, BE, R>(
backend: &BE,
runtime: &R,
beefy_genesis: NumberFor<B>,
best_grandpa: <B as Block>::Header,
min_block_delta: u32,
) -> ClientResult<PersistedState<B>>
where
B: Block,
BE: Backend<B>,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B, AuthorityId>,
{
crate::aux_schema::load_persistent(backend)?
.filter(|state| state.pallet_genesis() == beefy_genesis)
.and_then(|mut state| {
state.set_best_grandpa(best_grandpa.clone());
state.set_min_block_delta(min_block_delta);
info!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state);
Some(Ok(state))
})
.unwrap_or_else(|| {
initialize_voter_state(backend, runtime, beefy_genesis, best_grandpa, min_block_delta)
})
}
fn initialize_voter_state<B, BE, R>(
backend: &BE,
runtime: &R,
beefy_genesis: NumberFor<B>,
best_grandpa: <B as Block>::Header,
min_block_delta: u32,
) -> ClientResult<PersistedState<B>>
where
B: Block,
BE: Backend<B>,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B, AuthorityId>,
{
let beefy_genesis = runtime
.runtime_api()
.beefy_genesis(best_grandpa.hash())
.ok()
.flatten()
.filter(|genesis| *genesis == beefy_genesis)
.ok_or_else(|| ClientError::Backend("BEEFY pallet expected to be active.".into()))?;
let blockchain = backend.blockchain();
let mut sessions = VecDeque::new();
let mut header = best_grandpa.clone();
let state = loop {
if let Some(true) = blockchain
.justifications(header.hash())
.ok()
.flatten()
.map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some())
{
info!(
target: LOG_TARGET,
"🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.",
*header.number()
);
let best_beefy = *header.number();
if sessions.is_empty() {
let active_set = expect_validator_set(runtime, backend, &header, beefy_genesis)?;
let mut rounds = Rounds::new(best_beefy, active_set);
rounds.conclude(best_beefy);
sessions.push_front(rounds);
}
let state = PersistedState::checked_new(
best_grandpa,
best_beefy,
sessions,
min_block_delta,
beefy_genesis,
)
.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?;
break state
}
if *header.number() == beefy_genesis {
let genesis_set = expect_validator_set(runtime, backend, &header, beefy_genesis)?;
info!(
target: LOG_TARGET,
"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
Starting voting rounds at block {:?}, genesis validator set {:?}.",
beefy_genesis,
genesis_set,
);
sessions.push_front(Rounds::new(beefy_genesis, genesis_set));
break PersistedState::checked_new(
best_grandpa,
Zero::zero(),
sessions,
min_block_delta,
beefy_genesis,
)
.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?
}
if let Some(active) = worker::find_authorities_change::<B>(&header) {
info!(
target: LOG_TARGET,
"🥩 Marking block {:?} as BEEFY Mandatory.",
*header.number()
);
sessions.push_front(Rounds::new(*header.number(), active));
}
header = blockchain.expect_header(*header.parent_hash())?;
};
aux_schema::write_current_version(backend)?;
aux_schema::write_voter_state(backend, &state)?;
Ok(state)
}
async fn wait_for_runtime_pallet<B, R>(
runtime: &R,
mut gossip_engine: &mut GossipEngine<B>,
finality: &mut Fuse<FinalityNotifications<B>>,
) -> ClientResult<(NumberFor<B>, <B as Block>::Header)>
where
B: Block,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B, AuthorityId>,
{
info!(target: LOG_TARGET, "🥩 BEEFY gadget waiting for BEEFY pallet to become available...");
loop {
futures::select! {
notif = finality.next() => {
let notif = match notif {
Some(notif) => notif,
None => break
};
let at = notif.header.hash();
if let Some(start) = runtime.runtime_api().beefy_genesis(at).ok().flatten() {
if *notif.header.number() >= start {
info!(
target: LOG_TARGET,
"🥩 BEEFY pallet available: block {:?} beefy genesis {:?}",
notif.header.number(), start
);
return Ok((start, notif.header))
}
}
},
_ = gossip_engine => {
break
}
}
}
let err_msg = "🥩 Gossip engine has unexpectedly terminated.".into();
error!(target: LOG_TARGET, "{}", err_msg);
Err(ClientError::Backend(err_msg))
}
fn expect_validator_set<B, BE, R>(
runtime: &R,
backend: &BE,
at_header: &B::Header,
beefy_genesis: NumberFor<B>,
) -> ClientResult<ValidatorSet<AuthorityId>>
where
B: Block,
BE: Backend<B>,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B, AuthorityId>,
{
runtime
.runtime_api()
.validator_set(at_header.hash())
.ok()
.flatten()
.or_else(|| {
let blockchain = backend.blockchain();
let mut header = at_header.clone();
while *header.number() >= beefy_genesis {
match worker::find_authorities_change::<B>(&header) {
Some(active) => return Some(active),
None => header = blockchain.expect_header(*header.parent_hash()).ok()?,
}
}
None
})
.ok_or_else(|| ClientError::Backend("Could not find initial validator set".into()))
}