#![warn(missing_docs)]
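//! Availability Recovery subsystem of Polkadot.
//!
//! Recovers the `AvailableData` of a candidate either by fetching the full data from a
//! validator of the backing group or by fetching enough erasure chunks from the validator
//! set to reconstruct it, verifying the result against the candidate's erasure root.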
use std::{
collections::{HashMap, VecDeque},
iter::Iterator,
num::NonZeroUsize,
pin::Pin,
time::Duration,
};
use futures::{
channel::oneshot::{self, channel},
future::{Future, FutureExt, RemoteHandle},
pin_mut,
prelude::*,
sink::SinkExt,
stream::{FuturesUnordered, StreamExt},
task::{Context, Poll},
};
use rand::seq::SliceRandom;
use schnellru::{ByLength, LruMap};
use fatality::Nested;
use polkadot_erasure_coding::{
branch_hash, branches, obtain_chunks_v1, recovery_threshold, Error as ErasureEncodingError,
};
#[cfg(not(test))]
use polkadot_node_network_protocol::request_response::CHUNK_REQUEST_TIMEOUT;
use polkadot_node_network_protocol::{
request_response::{
self as req_res, outgoing::RequestError, v1 as request_v1, IncomingRequestReceiver,
OutgoingRequest, Recipient, Requests,
},
IfDisconnected, UnifiedReputationChange as Rep,
};
use polkadot_node_primitives::{AvailableData, ErasureChunk};
use polkadot_node_subsystem::{
errors::RecoveryError,
jaeger,
messages::{AvailabilityRecoveryMessage, AvailabilityStoreMessage, NetworkBridgeTxMessage},
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
SubsystemResult,
};
use polkadot_node_subsystem_util::request_session_info;
use polkadot_primitives::{
AuthorityDiscoveryId, BlakeTwo256, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex,
Hash, HashT, IndexedVec, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex,
};
mod error;
mod futures_undead;
mod metrics;
use metrics::Metrics;
use futures_undead::FuturesUndead;
use sc_network::{OutboundFailure, RequestFailure};
#[cfg(test)]
mod tests;
const LOG_TARGET: &str = "parachain::availability-recovery";
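/// Cap on the number of chunk requests a single recovery task keeps in flight at once.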
const N_PARALLEL: usize = 50;
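/// Number of recent recovery results kept in the LRU cache.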
const LRU_SIZE: u32 = 16;
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");
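/// Time after which we stop waiting on pending chunk requests and launch a new batch.
/// Outside of tests this matches the network-level `CHUNK_REQUEST_TIMEOUT`.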
#[cfg(not(test))]
const TIMEOUT_START_NEW_REQUESTS: Duration = CHUNK_REQUEST_TIMEOUT;
#[cfg(test)]
const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100);
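/// PoV size limit in bytes below which `RecoveryStrategy::BackersFirstIfSizeLower` prefers
/// fetching the full data from the backing group.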
const SMALL_POV_LIMIT: usize = 128 * 1024;
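/// The strategy a recovery task uses to obtain the available data.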
#[derive(Clone, PartialEq)]
pub enum RecoveryStrategy {
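/// Always try to request the full data from the backing group first, falling back to
/// chunk recovery if that fails.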
BackersFirstAlways,
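/// Request the full data from the backing group only if the estimated PoV size is below
/// the given limit in bytes; otherwise recover from chunks directly.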
BackersFirstIfSizeLower(usize),
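/// Always recover by fetching erasure chunks from validators.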
ChunksAlways,
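/// Recover from chunks without ever consulting the local availability store.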
BypassAvailabilityStore,
}
impl RecoveryStrategy {
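/// Returns true if the strategy needs to know the backing group of the candidate.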
pub fn needs_backing_group(&self) -> bool {
match self {
RecoveryStrategy::BackersFirstAlways | RecoveryStrategy::BackersFirstIfSizeLower(_) =>
true,
_ => false,
}
}
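/// Returns the PoV size limit in bytes, if the strategy only prefers backers below a
/// certain size.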
pub fn pov_size_limit(&self) -> Option<usize> {
match *self {
RecoveryStrategy::BackersFirstIfSizeLower(limit) => Some(limit),
_ => None,
}
}
}
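/// The Availability Recovery Subsystem: recovers `AvailableData` on request and answers
/// incoming full-data fetching requests from the local availability store.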
pub struct AvailabilityRecoverySubsystem {
recovery_strategy: RecoveryStrategy,
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
}
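/// State of the fast path: requesting the full `AvailableData` from the (shuffled) backing
/// group, one validator at a time.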
struct RequestFromBackers {
shuffled_backers: Vec<ValidatorIndex>,
erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
}
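/// State of the chunk recovery phase: requesting erasure chunks from randomly ordered
/// validators until the reconstruction threshold is reached.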
struct RequestChunksFromValidators {
error_count: usize,
total_received_responses: usize,
shuffling: VecDeque<ValidatorIndex>,
received_chunks: HashMap<ValidatorIndex, ErasureChunk>,
requesting_chunks: FuturesUndead<Result<Option<ErasureChunk>, (ValidatorIndex, RequestError)>>,
erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
}
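/// Parameters of a recovery that are common to both recovery phases.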
struct RecoveryParams {
validator_authority_keys: Vec<AuthorityDiscoveryId>,
validators: IndexedVec<ValidatorIndex, ValidatorId>,
threshold: usize,
candidate_hash: CandidateHash,
erasure_root: Hash,
metrics: Metrics,
bypass_availability_store: bool,
}
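/// The source a `RecoveryTask` is currently recovering from.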
enum Source {
RequestFromBackers(RequestFromBackers),
RequestChunks(RequestChunksFromValidators),
}
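/// A CPU-intensive erasure coding job off-loaded to the blocking worker pool:
/// `Reconstruct` rebuilds the `AvailableData` from chunks, `Reencode` re-encodes recovered
/// data and checks it against the expected erasure root.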
enum ErasureTask {
Reconstruct(
usize,
HashMap<ValidatorIndex, ErasureChunk>,
oneshot::Sender<Result<AvailableData, ErasureEncodingError>>,
),
Reencode(usize, Hash, AvailableData, oneshot::Sender<Option<AvailableData>>),
}
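/// A task recovering a single candidate: consults the availability store first (unless
/// bypassed) and then drives the configured `Source` to completion.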
struct RecoveryTask<Sender> {
sender: Sender,
params: RecoveryParams,
source: Source,
erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
}
impl RequestFromBackers {
fn new(
mut backers: Vec<ValidatorIndex>,
erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
) -> Self {
backers.shuffle(&mut rand::thread_rng());
RequestFromBackers { shuffled_backers: backers, erasure_task_tx }
}
async fn run(
&mut self,
params: &RecoveryParams,
sender: &mut impl overseer::AvailabilityRecoverySenderTrait,
) -> Result<AvailableData, RecoveryError> {
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
"Requesting from backers",
);
loop {
let validator_index =
self.shuffled_backers.pop().ok_or_else(|| RecoveryError::Unavailable)?;
let (req, response) = OutgoingRequest::new(
Recipient::Authority(
params.validator_authority_keys[validator_index.0 as usize].clone(),
),
req_res::v1::AvailableDataFetchingRequest { candidate_hash: params.candidate_hash },
);
sender
.send_message(NetworkBridgeTxMessage::SendRequests(
vec![Requests::AvailableDataFetchingV1(req)],
IfDisconnected::ImmediateError,
))
.await;
match response.await {
Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => {
let (reencode_tx, reencode_rx) = channel();
self.erasure_task_tx
.send(ErasureTask::Reencode(
params.validators.len(),
params.erasure_root,
data,
reencode_tx,
))
.await
.map_err(|_| RecoveryError::ChannelClosed)?;
let reencode_response =
reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
if let Some(data) = reencode_response {
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
"Received full data",
);
return Ok(data)
} else {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
?validator_index,
"Invalid data response",
);
}
},
Ok(req_res::v1::AvailableDataFetchingResponse::NoSuchData) => {},
Err(e) => gum::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
?validator_index,
err = ?e,
"Error fetching full available data."
),
}
}
}
}
impl RequestChunksFromValidators {
fn new(
n_validators: u32,
erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
) -> Self {
let mut shuffling: Vec<_> = (0..n_validators).map(ValidatorIndex).collect();
shuffling.shuffle(&mut rand::thread_rng());
RequestChunksFromValidators {
error_count: 0,
total_received_responses: 0,
shuffling: shuffling.into(),
received_chunks: HashMap::new(),
requesting_chunks: FuturesUndead::new(),
erasure_task_tx,
}
}
fn is_unavailable(&self, params: &RecoveryParams) -> bool {
is_unavailable(
self.chunk_count(),
self.requesting_chunks.total_len(),
self.shuffling.len(),
params.threshold,
)
}
fn chunk_count(&self) -> usize {
self.received_chunks.len()
}
fn insert_chunk(&mut self, validator_index: ValidatorIndex, chunk: ErasureChunk) {
self.received_chunks.insert(validator_index, chunk);
}
fn can_conclude(&self, params: &RecoveryParams) -> bool {
self.chunk_count() >= params.threshold || self.is_unavailable(params)
}
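/// Desired number of parallel chunk requests.
///
/// Starts from the number of chunks still missing and adds extra requests proportional to
/// the observed error rate (`error_count / total_received_responses`), capped by both
/// `N_PARALLEL` and the threshold itself.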
fn get_desired_request_count(&self, threshold: usize) -> usize {
let max_requests_boundary = std::cmp::min(N_PARALLEL, threshold);
let remaining_chunks = threshold.saturating_sub(self.chunk_count());
let inv_error_rate =
self.total_received_responses.checked_div(self.error_count).unwrap_or(0);
std::cmp::min(
max_requests_boundary,
remaining_chunks + remaining_chunks.checked_div(inv_error_rate).unwrap_or(0),
)
}
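/// Issue chunk requests in parallel until the desired request count is reached or there are
/// no untried validators left.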
async fn launch_parallel_requests<Sender>(
&mut self,
params: &RecoveryParams,
sender: &mut Sender,
) where
Sender: overseer::AvailabilityRecoverySenderTrait,
{
let num_requests = self.get_desired_request_count(params.threshold);
let candidate_hash = ¶ms.candidate_hash;
let already_requesting_count = self.requesting_chunks.len();
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
?num_requests,
error_count = ?self.error_count,
total_received = ?self.total_received_responses,
threshold = ?params.threshold,
?already_requesting_count,
"Requesting availability chunks for a candidate",
);
let mut requests = Vec::with_capacity(num_requests.saturating_sub(already_requesting_count));
while self.requesting_chunks.len() < num_requests {
if let Some(validator_index) = self.shuffling.pop_back() {
let validator = params.validator_authority_keys[validator_index.0 as usize].clone();
gum::trace!(
target: LOG_TARGET,
?validator,
?validator_index,
?candidate_hash,
"Requesting chunk",
);
let raw_request = req_res::v1::ChunkFetchingRequest {
candidate_hash: params.candidate_hash,
index: validator_index,
};
let (req, res) = OutgoingRequest::new(Recipient::Authority(validator), raw_request);
requests.push(Requests::ChunkFetchingV1(req));
params.metrics.on_chunk_request_issued();
let timer = params.metrics.time_chunk_request();
self.requesting_chunks.push(Box::pin(async move {
let _timer = timer;
match res.await {
Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk)) =>
Ok(Some(chunk.recombine_into_chunk(&raw_request))),
Ok(req_res::v1::ChunkFetchingResponse::NoSuchChunk) => Ok(None),
Err(e) => Err((validator_index, e)),
}
}));
} else {
break
}
}
sender
.send_message(NetworkBridgeTxMessage::SendRequests(
requests,
IfDisconnected::TryConnect,
))
.await;
}
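/// Wait for chunk responses, validating and storing received chunks, counting errors and
/// re-queueing validators that failed with network errors. Returns once recovery can be
/// concluded or `TIMEOUT_START_NEW_REQUESTS` allows launching a fresh batch of requests.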
async fn wait_for_chunks(&mut self, params: &RecoveryParams) {
let metrics = ¶ms.metrics;
while let Some(request_result) =
self.requesting_chunks.next_with_timeout(TIMEOUT_START_NEW_REQUESTS).await
{
self.total_received_responses += 1;
match request_result {
Ok(Some(chunk)) =>
if is_chunk_valid(params, &chunk) {
metrics.on_chunk_request_succeeded();
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
validator_index = ?chunk.index,
"Received valid chunk",
);
self.insert_chunk(chunk.index, chunk);
} else {
metrics.on_chunk_request_invalid();
self.error_count += 1;
},
Ok(None) => {
metrics.on_chunk_request_no_such_chunk();
self.error_count += 1;
},
Err((validator_index, e)) => {
self.error_count += 1;
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
err = ?e,
?validator_index,
"Failure requesting chunk",
);
match e {
RequestError::InvalidResponse(_) => {
metrics.on_chunk_request_invalid();
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
err = ?e,
?validator_index,
"Chunk fetching response was invalid",
);
},
RequestError::NetworkError(err) => {
if let RequestFailure::Network(OutboundFailure::Timeout) = err {
metrics.on_chunk_request_timeout();
} else {
metrics.on_chunk_request_error();
}
self.shuffling.push_front(validator_index);
},
RequestError::Canceled(_) => {
metrics.on_chunk_request_error();
self.shuffling.push_front(validator_index);
},
}
},
}
if self.can_conclude(params) {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
received_chunks_count = ?self.chunk_count(),
requested_chunks_count = ?self.requesting_chunks.len(),
threshold = ?params.threshold,
"Can conclude availability for a candidate",
);
break
}
}
}
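/// Run the chunk recovery: seed with any valid chunks already in the local availability
/// store (unless bypassed), then request chunks until the threshold is reached and the data
/// can be reconstructed and verified against the erasure root, or until recovery is deemed
/// unavailable.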
async fn run<Sender>(
&mut self,
params: &RecoveryParams,
sender: &mut Sender,
) -> Result<AvailableData, RecoveryError>
where
Sender: overseer::AvailabilityRecoverySenderTrait,
{
let metrics = ¶ms.metrics;
if !params.bypass_availability_store {
let (tx, rx) = oneshot::channel();
sender
.send_message(AvailabilityStoreMessage::QueryAllChunks(params.candidate_hash, tx))
.await;
match rx.await {
Ok(chunks) => {
let chunk_indices: Vec<_> = chunks.iter().map(|c| c.index).collect();
self.shuffling.retain(|i| !chunk_indices.contains(i));
for chunk in chunks {
if is_chunk_valid(params, &chunk) {
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
validator_index = ?chunk.index,
"Found valid chunk on disk"
);
self.insert_chunk(chunk.index, chunk);
} else {
gum::error!(
target: LOG_TARGET,
"Loaded invalid chunk from disk! Disk/Db corruption _very_ likely - please fix ASAP!"
);
};
}
},
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
"Failed to reach the availability store"
);
},
}
}
let _recovery_timer = metrics.time_full_recovery();
loop {
if self.is_unavailable(¶ms) {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
received = %self.chunk_count(),
requesting = %self.requesting_chunks.len(),
total_requesting = %self.requesting_chunks.total_len(),
n_validators = %params.validators.len(),
"Data recovery is not possible",
);
metrics.on_recovery_failed();
return Err(RecoveryError::Unavailable)
}
self.launch_parallel_requests(params, sender).await;
self.wait_for_chunks(params).await;
if self.chunk_count() >= params.threshold {
let recovery_duration = metrics.time_erasure_recovery();
let (available_data_tx, available_data_rx) = channel();
self.erasure_task_tx
.send(ErasureTask::Reconstruct(
params.validators.len(),
std::mem::take(&mut self.received_chunks),
available_data_tx,
))
.await
.map_err(|_| RecoveryError::ChannelClosed)?;
let available_data_response =
available_data_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
return match available_data_response {
Ok(data) => {
let (reencode_tx, reencode_rx) = channel();
self.erasure_task_tx
.send(ErasureTask::Reencode(
params.validators.len(),
params.erasure_root,
data,
reencode_tx,
))
.await
.map_err(|_| RecoveryError::ChannelClosed)?;
let reencode_response =
reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
if let Some(data) = reencode_response {
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
"Data recovery complete",
);
metrics.on_recovery_succeeded();
Ok(data)
} else {
recovery_duration.map(|rd| rd.stop_and_discard());
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
"Data recovery - root mismatch",
);
metrics.on_recovery_invalid();
Err(RecoveryError::Invalid)
}
},
Err(err) => {
recovery_duration.map(|rd| rd.stop_and_discard());
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
?err,
"Data recovery error ",
);
metrics.on_recovery_invalid();
Err(RecoveryError::Invalid)
},
}
}
}
}
}
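/// Returns true when even receiving every in-flight and untried chunk cannot reach the
/// threshold, i.e. the data is unrecoverable.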
const fn is_unavailable(
received_chunks: usize,
requesting_chunks: usize,
unrequested_validators: usize,
threshold: usize,
) -> bool {
received_chunks + requesting_chunks + unrequested_validators < threshold
}
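/// Verify a chunk against the erasure root: check its Merkle proof and compare the branch
/// hash with the hash of the chunk data.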
fn is_chunk_valid(params: &RecoveryParams, chunk: &ErasureChunk) -> bool {
let anticipated_hash =
match branch_hash(¶ms.erasure_root, chunk.proof(), chunk.index.0 as usize) {
Ok(hash) => hash,
Err(e) => {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
validator_index = ?chunk.index,
error = ?e,
"Invalid Merkle proof",
);
return false
},
};
let erasure_chunk_hash = BlakeTwo256::hash(&chunk.chunk);
if anticipated_hash != erasure_chunk_hash {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
validator_index = ?chunk.index,
"Merkle proof mismatch"
);
return false
}
true
}
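/// Re-encode the recovered data into chunks and check that the resulting Merkle root
/// matches the expected erasure root.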
fn reconstructed_data_matches_root(
n_validators: usize,
expected_root: &Hash,
data: &AvailableData,
metrics: &Metrics,
) -> bool {
let _timer = metrics.time_reencode_chunks();
let chunks = match obtain_chunks_v1(n_validators, data) {
Ok(chunks) => chunks,
Err(e) => {
gum::debug!(
target: LOG_TARGET,
err = ?e,
"Failed to obtain chunks",
);
return false
},
};
let branches = branches(&chunks);
branches.root() == *expected_root
}
impl<Sender> RecoveryTask<Sender>
where
Sender: overseer::AvailabilityRecoverySenderTrait,
{
async fn run(mut self) -> Result<AvailableData, RecoveryError> {
if !self.params.bypass_availability_store {
let (tx, rx) = oneshot::channel();
self.sender
.send_message(AvailabilityStoreMessage::QueryAvailableData(
self.params.candidate_hash,
tx,
))
.await;
match rx.await {
Ok(Some(data)) => return Ok(data),
Ok(None) => {},
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?self.params.candidate_hash,
"Failed to reach the availability store",
)
},
}
}
self.params.metrics.on_recovery_started();
loop {
match self.source {
Source::RequestFromBackers(ref mut from_backers) => {
match from_backers.run(&self.params, &mut self.sender).await {
Ok(data) => break Ok(data),
Err(RecoveryError::Invalid) => break Err(RecoveryError::Invalid),
Err(RecoveryError::ChannelClosed) =>
break Err(RecoveryError::ChannelClosed),
Err(RecoveryError::Unavailable) =>
self.source = Source::RequestChunks(RequestChunksFromValidators::new(
self.params.validators.len() as _,
self.erasure_task_tx.clone(),
)),
}
},
Source::RequestChunks(ref mut from_all) =>
break from_all.run(&self.params, &mut self.sender).await,
}
}
}
}
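/// Handle to a spawned recovery task together with the response senders awaiting its
/// result. Resolves to `None` once all receivers have been dropped, allowing the recovery
/// to be abandoned.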
struct RecoveryHandle {
candidate_hash: CandidateHash,
remote: RemoteHandle<Result<AvailableData, RecoveryError>>,
awaiting: Vec<oneshot::Sender<Result<AvailableData, RecoveryError>>>,
}
impl Future for RecoveryHandle {
type Output = Option<(CandidateHash, Result<AvailableData, RecoveryError>)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut indices_to_remove = Vec::new();
for (i, awaiting) in self.awaiting.iter_mut().enumerate().rev() {
if let Poll::Ready(()) = awaiting.poll_canceled(cx) {
indices_to_remove.push(i);
}
}
for index in indices_to_remove {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?self.candidate_hash,
"Receiver for available data dropped.",
);
self.awaiting.swap_remove(index);
}
if self.awaiting.is_empty() {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?self.candidate_hash,
"All receivers for available data dropped.",
);
return Poll::Ready(None)
}
let remote = &mut self.remote;
futures::pin_mut!(remote);
let result = futures::ready!(remote.poll(cx));
for awaiting in self.awaiting.drain(..) {
let _ = awaiting.send(result.clone());
}
Poll::Ready(Some((self.candidate_hash, result)))
}
}
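/// Cached result of an availability recovery. Only definite outcomes are cached: valid data
/// or `Invalid`; `Unavailable` and `ChannelClosed` are not (see the `TryFrom` impl below).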
#[derive(Debug, Clone)]
enum CachedRecovery {
Valid(AvailableData),
Invalid,
}
impl CachedRecovery {
fn into_result(self) -> Result<AvailableData, RecoveryError> {
match self {
Self::Valid(d) => Ok(d),
Self::Invalid => Err(RecoveryError::Invalid),
}
}
}
impl TryFrom<Result<AvailableData, RecoveryError>> for CachedRecovery {
type Error = ();
fn try_from(o: Result<AvailableData, RecoveryError>) -> Result<CachedRecovery, Self::Error> {
match o {
Ok(d) => Ok(Self::Valid(d)),
Err(RecoveryError::Invalid) => Ok(Self::Invalid),
Err(RecoveryError::Unavailable) => Err(()),
Err(RecoveryError::ChannelClosed) => Err(()),
}
}
}
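/// Mutable state of the subsystem: ongoing recoveries, the most recent live block (used for
/// session info queries) and the LRU cache of recent results.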
struct State {
ongoing_recoveries: FuturesUnordered<RecoveryHandle>,
live_block: (BlockNumber, Hash),
availability_lru: LruMap<CandidateHash, CachedRecovery>,
}
impl Default for State {
fn default() -> Self {
Self {
ongoing_recoveries: FuturesUnordered::new(),
live_block: (0, Hash::default()),
availability_lru: LruMap::new(ByLength::new(LRU_SIZE)),
}
}
}
#[overseer::subsystem(AvailabilityRecovery, error=SubsystemError, prefix=self::overseer)]
impl<Context> AvailabilityRecoverySubsystem {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self
.run(ctx)
.map_err(|e| SubsystemError::with_origin("availability-recovery", e))
.boxed();
SpawnedSubsystem { name: "availability-recovery-subsystem", future }
}
}
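/// Handle an overseer signal, updating the live block on new active leaves. Returns `true`
/// when the subsystem should conclude.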
async fn handle_signal(state: &mut State, signal: OverseerSignal) -> SubsystemResult<bool> {
match signal {
OverseerSignal::Conclude => Ok(true),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. }) => {
if let Some(activated) = activated {
if activated.number > state.live_block.0 {
state.live_block = (activated.number, activated.hash)
}
}
Ok(false)
},
OverseerSignal::BlockFinalized(_, _) => Ok(false),
}
}
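/// Launch a `RecoveryTask` for the given candidate: choose the source (backers or chunks,
/// taking the estimated PoV size into account), spawn the task and register a
/// `RecoveryHandle` so that awaiting senders receive the result.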
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
async fn launch_recovery_task<Context>(
state: &mut State,
ctx: &mut Context,
session_info: SessionInfo,
receipt: CandidateReceipt,
mut backing_group: Option<GroupIndex>,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
metrics: &Metrics,
recovery_strategy: &RecoveryStrategy,
erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
) -> error::Result<()> {
let candidate_hash = receipt.hash();
let params = RecoveryParams {
validator_authority_keys: session_info.discovery_keys.clone(),
validators: session_info.validators.clone(),
threshold: recovery_threshold(session_info.validators.len())?,
candidate_hash,
erasure_root: receipt.descriptor.erasure_root,
metrics: metrics.clone(),
bypass_availability_store: recovery_strategy == &RecoveryStrategy::BypassAvailabilityStore,
};
if let Some(small_pov_limit) = recovery_strategy.pov_size_limit() {
let chunk_size: Result<Option<usize>, error::Error> =
query_chunk_size(ctx, candidate_hash).await;
if let Ok(Some(chunk_size)) = chunk_size {
let pov_size_estimate = chunk_size.saturating_mul(session_info.validators.len()) / 3;
let prefer_backing_group = pov_size_estimate < small_pov_limit;
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
pov_size_estimate,
small_pov_limit,
enabled = prefer_backing_group,
"Prefer fetch from backing group",
);
backing_group = backing_group.filter(|_| prefer_backing_group);
}
}
let phase = backing_group
.and_then(|g| session_info.validator_groups.get(g))
.map(|group| {
Source::RequestFromBackers(RequestFromBackers::new(
group.clone(),
erasure_task_tx.clone(),
))
})
.unwrap_or_else(|| {
Source::RequestChunks(RequestChunksFromValidators::new(
params.validators.len() as _,
erasure_task_tx.clone(),
))
});
let recovery_task =
RecoveryTask { sender: ctx.sender().clone(), params, source: phase, erasure_task_tx };
let (remote, remote_handle) = recovery_task.run().remote_handle();
state.ongoing_recoveries.push(RecoveryHandle {
candidate_hash,
remote: remote_handle,
awaiting: vec![response_sender],
});
if let Err(e) = ctx.spawn("recovery-task", Box::pin(remote)) {
gum::warn!(
target: LOG_TARGET,
err = ?e,
"Failed to spawn a recovery task",
);
}
Ok(())
}
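/// Handle an `AvailabilityRecoveryMessage::RecoverAvailableData` request: answer from the
/// LRU cache or attach to an ongoing recovery if possible, otherwise fetch the session info
/// and launch a new recovery task.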
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
async fn handle_recover<Context>(
state: &mut State,
ctx: &mut Context,
receipt: CandidateReceipt,
session_index: SessionIndex,
backing_group: Option<GroupIndex>,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
metrics: &Metrics,
recovery_strategy: &RecoveryStrategy,
erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
) -> error::Result<()> {
let candidate_hash = receipt.hash();
let span = jaeger::Span::new(candidate_hash, "availability-recovery")
.with_stage(jaeger::Stage::AvailabilityRecovery);
if let Some(result) =
state.availability_lru.get(&candidate_hash).cloned().map(|v| v.into_result())
{
if let Err(e) = response_sender.send(result) {
gum::warn!(
target: LOG_TARGET,
err = ?e,
"Error responding with an availability recovery result",
);
}
return Ok(())
}
if let Some(i) =
state.ongoing_recoveries.iter_mut().find(|i| i.candidate_hash == candidate_hash)
{
i.awaiting.push(response_sender);
return Ok(())
}
let _span = span.child("not-cached");
let session_info = request_session_info(state.live_block.1, session_index, ctx.sender())
.await
.await
.map_err(error::Error::CanceledSessionInfo)??;
let _span = span.child("session-info-ctx-received");
match session_info {
Some(session_info) =>
launch_recovery_task(
state,
ctx,
session_info,
receipt,
backing_group,
response_sender,
metrics,
recovery_strategy,
erasure_task_tx,
)
.await,
None => {
gum::warn!(target: LOG_TARGET, "SessionInfo is `None` at {:?}", state.live_block);
response_sender
.send(Err(RecoveryError::Unavailable))
.map_err(|_| error::Error::CanceledResponseSender)?;
Ok(())
},
}
}
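/// Query the local availability store for the full `AvailableData` of a candidate.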
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
async fn query_full_data<Context>(
ctx: &mut Context,
candidate_hash: CandidateHash,
) -> error::Result<Option<AvailableData>> {
let (tx, rx) = oneshot::channel();
ctx.send_message(AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx))
.await;
rx.await.map_err(error::Error::CanceledQueryFullData)
}
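/// Query the local availability store for the size of a stored chunk, used to estimate the
/// PoV size of a candidate.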
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
async fn query_chunk_size<Context>(
ctx: &mut Context,
candidate_hash: CandidateHash,
) -> error::Result<Option<usize>> {
let (tx, rx) = oneshot::channel();
ctx.send_message(AvailabilityStoreMessage::QueryChunkSize(candidate_hash, tx))
.await;
rx.await.map_err(error::Error::CanceledQueryFullData)
}
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
impl AvailabilityRecoverySubsystem {
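/// Create a new instance that recovers from chunks and bypasses the local availability
/// store entirely.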
pub fn with_availability_store_skip(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { recovery_strategy: RecoveryStrategy::BypassAvailabilityStore, req_receiver, metrics }
}
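/// Create a new instance that always tries to fetch the full data from backers first,
/// falling back to chunk recovery.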
pub fn with_fast_path(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { recovery_strategy: RecoveryStrategy::BackersFirstAlways, req_receiver, metrics }
}
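/// Create a new instance that always recovers from chunks.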
pub fn with_chunks_only(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { recovery_strategy: RecoveryStrategy::ChunksAlways, req_receiver, metrics }
}
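/// Create a new instance that fetches from backers only when the estimated PoV size is
/// below `SMALL_POV_LIMIT` and recovers from chunks otherwise.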
pub fn with_chunks_if_pov_large(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self {
recovery_strategy: RecoveryStrategy::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
req_receiver,
metrics,
}
}
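/// Run the subsystem main loop: dispatch erasure-coding jobs to the worker pool, handle
/// overseer signals and recovery requests, answer incoming full-data requests and cache the
/// results of finished recoveries.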
async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()> {
let mut state = State::default();
let Self { recovery_strategy, mut req_receiver, metrics } = self;
let (erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16);
let mut erasure_task_rx = erasure_task_rx.fuse();
let mut to_pool = ThreadPoolBuilder::build(
NonZeroUsize::new(2).expect("There are 2 threads; qed"),
metrics.clone(),
&mut ctx,
)
.into_iter()
.cycle();
loop {
let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
pin_mut!(recv_req);
futures::select! {
erasure_task = erasure_task_rx.next() => {
match erasure_task {
Some(task) => {
let send_result = to_pool
.next()
.expect("Pool size is `NonZeroUsize`; qed")
.send(task)
.await
.map_err(|_| RecoveryError::ChannelClosed);
if let Err(err) = send_result {
gum::warn!(
target: LOG_TARGET,
?err,
"Failed to send erasure coding task",
);
}
},
None => {
gum::debug!(
target: LOG_TARGET,
"Erasure task channel closed",
);
return Err(SubsystemError::with_origin("availability-recovery", RecoveryError::ChannelClosed))
}
}
}
v = ctx.recv().fuse() => {
match v? {
FromOrchestra::Signal(signal) => if handle_signal(
&mut state,
signal,
).await? {
return Ok(());
}
FromOrchestra::Communication { msg } => {
match msg {
AvailabilityRecoveryMessage::RecoverAvailableData(
receipt,
session_index,
maybe_backing_group,
response_sender,
) => {
if let Err(e) = handle_recover(
&mut state,
&mut ctx,
receipt,
session_index,
maybe_backing_group.filter(|_| recovery_strategy.needs_backing_group()),
response_sender,
&metrics,
&recovery_strategy,
erasure_task_tx.clone(),
).await {
gum::warn!(
target: LOG_TARGET,
err = ?e,
"Error handling a recovery request",
);
}
}
}
}
}
}
in_req = recv_req => {
match in_req.into_nested().map_err(|fatal| SubsystemError::with_origin("availability-recovery", fatal))? {
Ok(req) => {
if recovery_strategy == RecoveryStrategy::BypassAvailabilityStore {
gum::debug!(
target: LOG_TARGET,
"Skipping request to availability-store.",
);
let _ = req.send_response(None.into());
continue
}
match query_full_data(&mut ctx, req.payload.candidate_hash).await {
Ok(res) => {
let _ = req.send_response(res.into());
}
Err(e) => {
gum::debug!(
target: LOG_TARGET,
err = ?e,
"Failed to query available data.",
);
let _ = req.send_response(None.into());
}
}
}
Err(jfyi) => {
gum::debug!(
target: LOG_TARGET,
error = ?jfyi,
"Decoding incoming request failed"
);
continue
}
}
}
output = state.ongoing_recoveries.select_next_some() => {
if let Some((candidate_hash, result)) = output {
if let Ok(recovery) = CachedRecovery::try_from(result) {
state.availability_lru.insert(candidate_hash, recovery);
}
}
}
}
}
}
}
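/// Builder for the pool of blocking worker threads executing `ErasureTask`s.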
struct ThreadPoolBuilder;
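/// Hard upper bound on the number of erasure worker threads.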
const MAX_THREADS: NonZeroUsize = match NonZeroUsize::new(4) {
Some(max_threads) => max_threads,
None => panic!("MAX_THREADS must be non-zero"),
};
impl ThreadPoolBuilder {
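/// Spawn up to `MAX_THREADS` blocking tasks running `erasure_task_thread` and return the
/// senders used to dispatch work to them.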
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
pub fn build<Context>(
size: NonZeroUsize,
metrics: Metrics,
ctx: &mut Context,
) -> Vec<futures::channel::mpsc::Sender<ErasureTask>> {
let size = std::cmp::min(size, MAX_THREADS);
let mut senders = Vec::new();
for index in 0..size.into() {
let (tx, rx) = futures::channel::mpsc::channel(8);
senders.push(tx);
if let Err(e) = ctx
.spawn_blocking("erasure-task", Box::pin(erasure_task_thread(metrics.clone(), rx)))
{
gum::warn!(
target: LOG_TARGET,
err = ?e,
index,
"Failed to spawn a erasure task",
);
}
}
senders
}
}
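/// Worker loop of an erasure thread: receives `ErasureTask`s, performs chunk reconstruction
/// or re-encoding with root verification, and replies on the provided oneshot sender.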
async fn erasure_task_thread(
metrics: Metrics,
mut ingress: futures::channel::mpsc::Receiver<ErasureTask>,
) {
loop {
match ingress.next().await {
Some(ErasureTask::Reconstruct(n_validators, chunks, sender)) => {
let _ = sender.send(polkadot_erasure_coding::reconstruct_v1(
n_validators,
chunks.values().map(|c| (&c.chunk[..], c.index.0 as usize)),
));
},
Some(ErasureTask::Reencode(n_validators, root, available_data, sender)) => {
let metrics = metrics.clone();
let maybe_data = if reconstructed_data_matches_root(
n_validators,
&root,
&available_data,
&metrics,
) {
Some(available_data)
} else {
None
};
let _ = sender.send(maybe_data);
},
None => {
gum::debug!(
target: LOG_TARGET,
"Erasure task channel closed. Node shutting down ?",
);
},
}
}
}