1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
pub mod basic;
18
pub mod lookahead;
19

            
20
use {
21
    crate::{find_pre_digest, AuthorityId, OrchestratorAuraWorkerAuxData},
22
    cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface,
23
    cumulus_client_consensus_common::ParachainCandidate,
24
    cumulus_client_consensus_proposer::ProposerInterface,
25
    cumulus_client_parachain_inherent::{ParachainInherentData, ParachainInherentDataProvider},
26
    cumulus_primitives_core::{
27
        relay_chain::Hash as PHash, DigestItem, ParachainBlockData, PersistedValidationData,
28
    },
29
    cumulus_relay_chain_interface::RelayChainInterface,
30
    futures::prelude::*,
31
    nimbus_primitives::{CompatibleDigestItem as NimbusCompatibleDigestItem, NIMBUS_KEY_ID},
32
    parity_scale_codec::{Codec, Encode},
33
    polkadot_node_primitives::{Collation, MaybeCompressedPoV},
34
    polkadot_primitives::Id as ParaId,
35
    sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy, StateAction},
36
    sp_application_crypto::{AppCrypto, AppPublic},
37
    sp_consensus::BlockOrigin,
38
    sp_consensus_aura::{digests::CompatibleDigestItem, Slot},
39
    sp_core::crypto::{ByteArray, Pair},
40
    sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider},
41
    sp_keystore::{Keystore, KeystorePtr},
42
    sp_runtime::{
43
        generic::Digest,
44
        traits::{Block as BlockT, HashingFor, Header as HeaderT, Member, Zero},
45
    },
46
    sp_state_machine::StorageChanges,
47
    sp_timestamp::Timestamp,
48
    std::{convert::TryFrom, error::Error, time::Duration},
49
};
50

            
51
/// Parameters for instantiating a [`Collator`].
52
pub struct Params<BI, CIDP, RClient, Proposer, CS> {
53
    /// A builder for inherent data builders.
54
    pub create_inherent_data_providers: CIDP,
55
    /// The block import handle.
56
    pub block_import: BI,
57
    /// An interface to the relay-chain client.
58
    pub relay_client: RClient,
59
    /// The keystore handle used for accessing parachain key material.
60
    pub keystore: KeystorePtr,
61
    /// The identifier of the parachain within the relay-chain.
62
    pub para_id: ParaId,
63
    /// The block proposer used for building blocks.
64
    pub proposer: Proposer,
65
    /// The collator service used for bundling proposals into collations and announcing
66
    /// to the network.
67
    pub collator_service: CS,
68
}
69

            
70
/// A utility struct for writing collation logic that makes use of
71
/// Tanssi Aura entirely or in part.
72
pub struct Collator<Block, P, BI, CIDP, RClient, Proposer, CS> {
73
    create_inherent_data_providers: CIDP,
74
    block_import: BI,
75
    relay_client: RClient,
76
    keystore: KeystorePtr,
77
    para_id: ParaId,
78
    proposer: Proposer,
79
    collator_service: CS,
80
    _marker: std::marker::PhantomData<(Block, Box<dyn Fn(P) + Send + Sync + 'static>)>,
81
}
82

            
83
impl<Block, P, BI, CIDP, RClient, Proposer, CS> Collator<Block, P, BI, CIDP, RClient, Proposer, CS>
84
where
85
    Block: BlockT,
86
    RClient: RelayChainInterface,
87
    CIDP: CreateInherentDataProviders<Block, (PHash, PersistedValidationData)> + 'static,
88
    BI: BlockImport<Block> + Send + Sync + 'static,
89
    Proposer: ProposerInterface<Block>,
90
    CS: CollatorServiceInterface<Block>,
91
    P: Pair + Send + Sync + 'static,
92
    P::Public: AppPublic + Member,
93
    P::Signature: TryFrom<Vec<u8>> + Member + Codec,
94
{
95
    /// Instantiate a new instance of the `Tanssi Aura` manager.
96
1
    pub fn new(params: Params<BI, CIDP, RClient, Proposer, CS>) -> Self {
97
1
        Collator {
98
1
            create_inherent_data_providers: params.create_inherent_data_providers,
99
1
            block_import: params.block_import,
100
1
            relay_client: params.relay_client,
101
1
            keystore: params.keystore,
102
1
            para_id: params.para_id,
103
1
            proposer: params.proposer,
104
1
            collator_service: params.collator_service,
105
1
            _marker: std::marker::PhantomData,
106
1
        }
107
1
    }
108

            
109
    /// Explicitly creates the inherent data for parachain block authoring.
110
1
    pub async fn create_inherent_data(
111
1
        &self,
112
1
        relay_parent: PHash,
113
1
        validation_data: &PersistedValidationData,
114
1
        parent_hash: Block::Hash,
115
1
        _timestamp: impl Into<Option<Timestamp>>,
116
1
    ) -> Result<(ParachainInherentData, InherentData), Box<dyn Error + Send + Sync + 'static>> {
117
1
        let paras_inherent_data = ParachainInherentDataProvider::create_at(
118
1
            relay_parent,
119
1
            &self.relay_client,
120
1
            validation_data,
121
1
            self.para_id,
122
1
        )
123
        .await;
124

            
125
1
        let paras_inherent_data = match paras_inherent_data {
126
1
            Some(p) => p,
127
            None => {
128
                return Err(
129
                    format!("Could not create paras inherent data at {:?}", relay_parent).into(),
130
                )
131
            }
132
        };
133

            
134
1
        let other_inherent_data = self
135
1
            .create_inherent_data_providers
136
1
            .create_inherent_data_providers(parent_hash, (relay_parent, validation_data.clone()))
137
1
            .map_err(|e| e as Box<dyn Error + Send + Sync + 'static>)
138
            .await?
139
1
            .create_inherent_data()
140
            .await
141
1
            .map_err(Box::new)?;
142

            
143
1
        Ok((paras_inherent_data, other_inherent_data))
144
1
    }
145

            
146
    /// Propose, seal, and import a block, packaging it into a collation.
147
    ///
148
    /// Provide the slot to build at as well as any other necessary pre-digest logs,
149
    /// the inherent data, and the proposal duration and PoV size limits.
150
    ///
151
    /// The Tanssi Aura pre-digest is set internally.
152
    ///
153
    /// This does not announce the collation to the parachain network or the relay chain.
154
    #[allow(clippy::cast_precision_loss)]
155
1
    pub async fn collate(
156
1
        &mut self,
157
1
        parent_header: &Block::Header,
158
1
        slot_claim: &mut SlotClaim<P::Public>,
159
1
        additional_pre_digest: impl Into<Option<Vec<DigestItem>>>,
160
1
        inherent_data: (ParachainInherentData, InherentData),
161
1
        proposal_duration: Duration,
162
1
        max_pov_size: usize,
163
1
    ) -> Result<
164
1
        Option<(Collation, ParachainBlockData<Block>, Block::Hash)>,
165
1
        Box<dyn Error + Send + 'static>,
166
1
    > {
167
1
        let mut digest = additional_pre_digest.into().unwrap_or_default();
168
1
        digest.append(&mut slot_claim.pre_digest);
169

            
170
1
        let maybe_proposal = self
171
1
            .proposer
172
1
            .propose(
173
1
                parent_header,
174
1
                &inherent_data.0,
175
1
                inherent_data.1,
176
1
                Digest { logs: digest },
177
1
                proposal_duration,
178
1
                Some(max_pov_size),
179
1
            )
180
            .await
181
1
            .map_err(|e| Box::new(e) as Box<dyn Error + Send>)?;
182

            
183
1
        let proposal = match maybe_proposal {
184
            None => return Ok(None),
185
1
            Some(p) => p,
186
        };
187

            
188
1
        let sealed_importable = seal_tanssi::<_, P>(
189
1
            proposal.block,
190
1
            proposal.storage_changes,
191
1
            &slot_claim.author_pub,
192
1
            &self.keystore,
193
1
        )
194
1
        .map_err(|e| e as Box<dyn Error + Send>)?;
195

            
196
1
        let post_hash = sealed_importable.post_hash();
197
1
        let block = Block::new(
198
1
            sealed_importable.post_header(),
199
1
            sealed_importable
200
1
                .body
201
1
                .as_ref()
202
1
                .expect("body always created with this `propose` fn; qed")
203
1
                .clone(),
204
1
        );
205
1

            
206
1
        self.block_import
207
1
            .import_block(sealed_importable)
208
1
            .map_err(|e| Box::new(e) as Box<dyn Error + Send>)
209
            .await?;
210

            
211
1
        if let Some((collation, block_data)) = self.collator_service.build_collation(
212
1
            parent_header,
213
1
            post_hash,
214
1
            ParachainCandidate {
215
1
                block,
216
1
                proof: proposal.proof,
217
1
            },
218
1
        ) {
219
1
            tracing::info!(
220
                target: crate::LOG_TARGET,
221
                "PoV size {{ header: {}kb, extrinsics: {}kb, storage_proof: {}kb }}",
222
                block_data.header().encoded_size() as f64 / 1024f64,
223
                block_data.extrinsics().encoded_size() as f64 / 1024f64,
224
                block_data.storage_proof().encoded_size() as f64 / 1024f64,
225
            );
226

            
227
1
            if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity {
228
1
                tracing::info!(
229
                    target: crate::LOG_TARGET,
230
                    "Compressed PoV size: {}kb",
231
                    pov.block_data.0.len() as f64 / 1024f64,
232
                );
233
            }
234

            
235
1
            Ok(Some((collation, block_data, post_hash)))
236
        } else {
237
            Err(
238
                Box::<dyn Error + Send + Sync>::from("Unable to produce collation")
239
                    as Box<dyn Error + Send>,
240
            )
241
        }
242
1
    }
243

            
244
    /// Get the underlying collator service.
245
    pub fn collator_service(&self) -> &CS {
246
        &self.collator_service
247
    }
248
}
249

            
250
6
fn pre_digest_data<P: Pair>(slot: Slot, claim: P::Public) -> Vec<sp_runtime::DigestItem>
251
6
where
252
6
    P::Public: Codec,
253
6
    P::Signature: Codec,
254
6
{
255
6
    vec![
256
6
        <DigestItem as CompatibleDigestItem<P::Signature>>::aura_pre_digest(slot),
257
6
        // We inject the nimbus digest as well. Crutial to be able to verify signatures
258
6
        <DigestItem as NimbusCompatibleDigestItem>::nimbus_pre_digest(
259
6
            // TODO remove this unwrap through trait reqs
260
6
            nimbus_primitives::NimbusId::from_slice(claim.as_ref()).unwrap(),
261
6
        ),
262
6
    ]
263
6
}
264

            
265
#[derive(Debug)]
266
pub struct SlotClaim<Pub> {
267
    author_pub: Pub,
268
    pre_digest: Vec<DigestItem>,
269
    slot: Slot,
270
}
271

            
272
impl<Pub: Clone> SlotClaim<Pub> {
273
6
    pub fn unchecked<P>(author_pub: Pub, slot: Slot) -> Self
274
6
    where
275
6
        P: Pair<Public = Pub>,
276
6
        P::Public: Codec,
277
6
        P::Signature: Codec,
278
6
    {
279
6
        SlotClaim {
280
6
            author_pub: author_pub.clone(),
281
6
            pre_digest: pre_digest_data::<P>(slot, author_pub),
282
6
            slot,
283
6
        }
284
6
    }
285

            
286
    /// Get the author's public key.
287
    pub fn author_pub(&self) -> &Pub {
288
        &self.author_pub
289
    }
290

            
291
    /// Get the pre-digest.
292
    pub fn pre_digest(&self) -> &Vec<DigestItem> {
293
        &self.pre_digest
294
    }
295

            
296
    /// Get the slot assigned to this claim.
297
    pub fn slot(&self) -> Slot {
298
        self.slot
299
    }
300
}
301

            
302
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
303
pub enum ClaimMode {
304
    ForceAuthoring,
305
    NormalAuthoring,
306
    ParathreadCoreBuying { drift_permitted: Slot },
307
}
308

            
309
/// Attempt to claim a slot locally.
310
19
pub fn tanssi_claim_slot<P, B>(
311
19
    aux_data: OrchestratorAuraWorkerAuxData<P>,
312
19
    chain_head: &B::Header,
313
19
    slot: Slot,
314
19
    claim_mode: ClaimMode,
315
19
    keystore: &KeystorePtr,
316
19
) -> Option<SlotClaim<P::Public>>
317
19
where
318
19
    P: Pair + Send + Sync + 'static,
319
19
    P::Public: Codec + std::fmt::Debug,
320
19
    P::Signature: Codec,
321
19
    B: BlockT,
322
19
{
323
19
    let author_pub = claim_slot_inner::<P>(slot, &aux_data.authorities, keystore, claim_mode)?;
324

            
325
13
    if is_parathread_and_should_skip_slot::<P, B>(&aux_data, chain_head, slot, claim_mode) {
326
7
        return None;
327
6
    }
328
6

            
329
6
    Some(SlotClaim::unchecked::<P>(author_pub, slot))
330
19
}
331

            
332
/// Returns true if this container chain is a parathread and the collator should skip this slot and not produce a block
333
13
pub fn is_parathread_and_should_skip_slot<P, B>(
334
13
    aux_data: &OrchestratorAuraWorkerAuxData<P>,
335
13
    chain_head: &B::Header,
336
13
    slot: Slot,
337
13
    claim_mode: ClaimMode,
338
13
) -> bool
339
13
where
340
13
    P: Pair + Send + Sync + 'static,
341
13
    P::Public: Codec + std::fmt::Debug,
342
13
    P::Signature: Codec,
343
13
    B: BlockT,
344
13
{
345
13
    if slot.is_zero() {
346
        // Always produce on slot 0 (for tests)
347
1
        return false;
348
12
    }
349
12
    if let Some(slot_freq) = &aux_data.slot_freq {
350
9
        if let Ok(chain_head_slot) = find_pre_digest::<B, P::Signature>(chain_head) {
351
            // TODO: this doesn't take into account force authoring.
352
            // So a node with `force_authoring = true` will not propose a block for a parathread until the
353
            // `min_slot_freq` has elapsed.
354
9
            match claim_mode {
355
                ClaimMode::NormalAuthoring | ClaimMode::ForceAuthoring => {
356
9
                    !slot_freq.should_parathread_author_block(slot, chain_head_slot)
357
                }
358
                ClaimMode::ParathreadCoreBuying { drift_permitted } => {
359
                    !slot_freq.should_parathread_buy_core(slot, drift_permitted, chain_head_slot)
360
                }
361
            }
362
        } else {
363
            // In case of error always propose
364
            false
365
        }
366
    } else {
367
        // Not a parathread: always propose
368
3
        false
369
    }
370
13
}
371

            
372
/// Attempt to claim a slot using a keystore.
373
19
pub fn claim_slot_inner<P>(
374
19
    slot: Slot,
375
19
    authorities: &Vec<AuthorityId<P>>,
376
19
    keystore: &KeystorePtr,
377
19
    claim_mode: ClaimMode,
378
19
) -> Option<P::Public>
379
19
where
380
19
    P: Pair,
381
19
    P::Public: Codec + std::fmt::Debug,
382
19
    P::Signature: Codec,
383
19
{
384
19
    let expected_author = crate::slot_author::<P>(slot, authorities.as_slice());
385
19
    // if running with force-authoring, as long as you are in the authority set, propose
386
19
    if claim_mode == ClaimMode::ForceAuthoring {
387
        authorities
388
            .iter()
389
            .find(|key| keystore.has_keys(&[(key.to_raw_vec(), NIMBUS_KEY_ID)]))
390
            .cloned()
391
    }
392
    // if not running with force-authoring, just do the usual slot check
393
    else {
394
19
        expected_author.and_then(|p| {
395
19
            if keystore.has_keys(&[(p.to_raw_vec(), NIMBUS_KEY_ID)]) {
396
13
                Some(p.clone())
397
            } else {
398
6
                None
399
            }
400
19
        })
401
    }
402
19
}
403

            
404
/// Seal a block with a signature in the header.
405
1
pub fn seal_tanssi<B: BlockT, P>(
406
1
    pre_sealed: B,
407
1
    storage_changes: StorageChanges<HashingFor<B>>,
408
1
    author_pub: &P::Public,
409
1
    keystore: &KeystorePtr,
410
1
) -> Result<BlockImportParams<B>, Box<dyn Error + Send + Sync + 'static>>
411
1
where
412
1
    P: Pair,
413
1
    P::Signature: Codec + TryFrom<Vec<u8>>,
414
1
    P::Public: AppPublic,
415
1
{
416
1
    let (pre_header, body) = pre_sealed.deconstruct();
417
1
    let pre_hash = pre_header.hash();
418
1
    let block_number = *pre_header.number();
419

            
420
    // sign the pre-sealed hash of the block and then
421
    // add it to a digest item.
422
1
    let signature = Keystore::sign_with(
423
1
        keystore,
424
1
        <AuthorityId<P> as AppCrypto>::ID,
425
1
        <AuthorityId<P> as AppCrypto>::CRYPTO_ID,
426
1
        author_pub.as_slice(),
427
1
        pre_hash.as_ref(),
428
1
    )
429
1
    .map_err(|e| sp_consensus::Error::CannotSign(format!("{}. Key: {:?}", e, author_pub)))?
430
1
    .ok_or_else(|| {
431
        sp_consensus::Error::CannotSign(format!(
432
            "Could not find key in keystore. Key: {:?}",
433
            author_pub
434
        ))
435
1
    })?;
436
1
    let signature = signature
437
1
        .clone()
438
1
        .try_into()
439
1
        .map_err(|_| sp_consensus::Error::InvalidSignature(signature, author_pub.to_raw_vec()))?;
440

            
441
1
    let signature_digest_item = <DigestItem as NimbusCompatibleDigestItem>::nimbus_seal(signature);
442
1

            
443
1
    // seal the block.
444
1
    let block_import_params = {
445
1
        let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, pre_header);
446
1
        block_import_params.post_digests.push(signature_digest_item);
447
1
        block_import_params.body = Some(body.clone());
448
1
        block_import_params.state_action =
449
1
            StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(storage_changes));
450
1
        block_import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
451
1
        block_import_params
452
1
    };
453
1
    let post_hash = block_import_params.post_hash();
454
1

            
455
1
    tracing::info!(
456
        target: crate::LOG_TARGET,
457
        "🔖 Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.",
458
        block_number,
459
        post_hash,
460
        pre_hash,
461
    );
462

            
463
1
    Ok(block_import_params)
464
1
}