1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
//! # Collator Assignment Pallet
18
//!
19
//! This pallet assigns a list of collators to:
20
//!    - the orchestrator chain
21
//!    - a set of container chains
22
//!
23
//! The set of container chains is retrieved thanks to the GetContainerChains trait
24
//! The number of collators to assign to the orchestrator chain and the number
25
//! of collators to assign to each container chain is retrieved through the GetHostConfiguration
26
//! trait.
27
//!  
28
//! The pallet uses the following approach:
29
//!
30
//! - First, it aims at filling the necessary collators to serve the orchestrator chain
31
//! - Second, it aims at filling in-order (FIFO) the existing containerChains
32
//!
33
//! Upon new session, this pallet takes whatever assignation was in the PendingCollatorContainerChain
34
//! storage, and assigns it as the current CollatorContainerChain. In addition, it takes the next
35
//! queued set of parachains and collators and calculates the assignment for the next session, storing
36
//! it in the PendingCollatorContainerChain storage item.
37
//!
38
//! The reason for the collator-assignment pallet to work with a one-session delay assignment is because
39
//! we want collators to know at least one session in advance the container chain/orchestrator that they
40
//! are assigned to.
41

            
42
#![cfg_attr(not(feature = "std"), no_std)]
43

            
44
use {
45
    crate::assignment::{Assignment, ChainNumCollators},
46
    frame_support::{pallet_prelude::*, traits::Currency},
47
    frame_system::pallet_prelude::BlockNumberFor,
48
    rand::{seq::SliceRandom, SeedableRng},
49
    rand_chacha::ChaCha20Rng,
50
    sp_runtime::{
51
        traits::{AtLeast32BitUnsigned, One, Zero},
52
        Saturating,
53
    },
54
    sp_std::{collections::btree_set::BTreeSet, fmt::Debug, prelude::*, vec},
55
    tp_traits::{
56
        CollatorAssignmentHook, CollatorAssignmentTip, GetContainerChainAuthor,
57
        GetHostConfiguration, GetSessionContainerChains, ParaId, RemoveInvulnerables,
58
        RemoveParaIdsWithNoCredits, ShouldRotateAllCollators, Slot,
59
    },
60
};
61
pub use {dp_collator_assignment::AssignedCollators, pallet::*};
62

            
63
mod assignment;
64
#[cfg(feature = "runtime-benchmarks")]
65
mod benchmarking;
66
pub mod weights;
67
pub use weights::WeightInfo;
68

            
69
#[cfg(test)]
70
mod mock;
71

            
72
#[cfg(test)]
73
mod tests;
74

            
75
1584
#[frame_support::pallet]
76
pub mod pallet {
77
    use super::*;
78

            
79
182
    #[pallet::pallet]
80
    #[pallet::without_storage_info]
81
    pub struct Pallet<T>(_);
82

            
83
    /// Configure the pallet by specifying the parameters and types on which it depends.
84
    #[pallet::config]
85
    pub trait Config: frame_system::Config {
86
        /// The overarching event type.
87
        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
88
        type SessionIndex: parity_scale_codec::FullCodec
89
            + TypeInfo
90
            + Copy
91
            + AtLeast32BitUnsigned
92
            + Debug;
93
        // `SESSION_DELAY` is used to delay any changes to Paras registration or configurations.
94
        // Wait until the session index is 2 larger then the current index to apply any changes,
95
        // which guarantees that at least one full session has passed before any changes are applied.
96
        type HostConfiguration: GetHostConfiguration<Self::SessionIndex>;
97
        type ContainerChains: GetSessionContainerChains<Self::SessionIndex>;
98
        type SelfParaId: Get<ParaId>;
99
        type ShouldRotateAllCollators: ShouldRotateAllCollators<Self::SessionIndex>;
100
        type GetRandomnessForNextBlock: GetRandomnessForNextBlock<BlockNumberFor<Self>>;
101
        type RemoveInvulnerables: RemoveInvulnerables<Self::AccountId>;
102
        type RemoveParaIdsWithNoCredits: RemoveParaIdsWithNoCredits;
103
        type CollatorAssignmentHook: CollatorAssignmentHook<BalanceOf<Self>>;
104
        type Currency: Currency<Self::AccountId>;
105
        type CollatorAssignmentTip: CollatorAssignmentTip<BalanceOf<Self>>;
106
        /// The weight information of this pallet.
107
        type WeightInfo: WeightInfo;
108
    }
109

            
110
    #[pallet::event]
111
1911
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
112
    pub enum Event<T: Config> {
113
11
        NewPendingAssignment {
114
            random_seed: [u8; 32],
115
            full_rotation: bool,
116
            target_session: T::SessionIndex,
117
        },
118
    }
119

            
120
86355
    #[pallet::storage]
121
    #[pallet::getter(fn collator_container_chain)]
122
    pub(crate) type CollatorContainerChain<T: Config> =
123
        StorageValue<_, AssignedCollators<T::AccountId>, ValueQuery>;
124

            
125
    /// Pending configuration changes.
126
    ///
127
    /// This is a list of configuration changes, each with a session index at which it should
128
    /// be applied.
129
    ///
130
    /// The list is sorted ascending by session index. Also, this list can only contain at most
131
    /// 2 items: for the next session and for the `scheduled_session`.
132
10200
    #[pallet::storage]
133
    #[pallet::getter(fn pending_collator_container_chain)]
134
    pub(crate) type PendingCollatorContainerChain<T: Config> =
135
        StorageValue<_, Option<AssignedCollators<T::AccountId>>, ValueQuery>;
136

            
137
    /// Randomness from previous block. Used to shuffle collators on session change.
138
    /// Should only be set on the last block of each session and should be killed on the on_initialize of the next block.
139
    /// The default value of [0; 32] disables randomness in the pallet.
140
5776
    #[pallet::storage]
141
    #[pallet::getter(fn randomness)]
142
    pub(crate) type Randomness<T: Config> = StorageValue<_, [u8; 32], ValueQuery>;
143

            
144
176
    #[pallet::call]
145
    impl<T: Config> Pallet<T> {}
146

            
147
    /// A struct that holds the assignment that is active after the session change and optionally
148
    /// the assignment that becomes active after the next session change.
149
    pub struct SessionChangeOutcome<T: Config> {
150
        /// New active assignment.
151
        pub active_assignment: AssignedCollators<T::AccountId>,
152
        /// Next session active assignment.
153
        pub next_assignment: AssignedCollators<T::AccountId>,
154
        /// Total number of registered parachains before filtering them out, used as a weight hint
155
        pub num_total_registered_paras: u32,
156
    }
157

            
158
    impl<T: Config> Pallet<T> {
159
        /// Assign new collators
160
        /// collators should be queued collators
161
1911
        pub fn assign_collators(
162
1911
            current_session_index: &T::SessionIndex,
163
1911
            random_seed: [u8; 32],
164
1911
            collators: Vec<T::AccountId>,
165
1911
        ) -> SessionChangeOutcome<T> {
166
1911
            // We work with one session delay to calculate assignments
167
1911
            let session_delay = T::SessionIndex::one();
168
1911
            let target_session_index = current_session_index.saturating_add(session_delay);
169
1911
            // We get the containerChains that we will have at the target session
170
1911
            let container_chains =
171
1911
                T::ContainerChains::session_container_chains(target_session_index);
172
1911
            let num_total_registered_paras =
173
1911
                (container_chains.parachains.len() + container_chains.parathreads.len()) as u32;
174
1911
            let mut container_chain_ids = container_chains.parachains;
175
1911
            let mut parathreads: Vec<_> = container_chains
176
1911
                .parathreads
177
1911
                .into_iter()
178
1911
                .map(|(para_id, _)| para_id)
179
1911
                .collect();
180
1911

            
181
1911
            // We read current assigned collators
182
1911
            let old_assigned = Self::read_assigned_collators();
183
1911
            let old_assigned_para_ids: BTreeSet<ParaId> =
184
1911
                old_assigned.container_chains.keys().cloned().collect();
185
1911

            
186
1911
            // Remove the containerChains that do not have enough credits for block production
187
1911
            T::RemoveParaIdsWithNoCredits::remove_para_ids_with_no_credits(
188
1911
                &mut container_chain_ids,
189
1911
                &old_assigned_para_ids,
190
1911
            );
191
1911
            // TODO: parathreads should be treated a bit differently, they don't need to have the same amount of credits
192
1911
            // as parathreads because they will not be producing blocks on every slot.
193
1911
            T::RemoveParaIdsWithNoCredits::remove_para_ids_with_no_credits(
194
1911
                &mut parathreads,
195
1911
                &old_assigned_para_ids,
196
1911
            );
197
1911

            
198
1911
            let mut shuffle_collators = None;
199
1911
            // If the random_seed is all zeros, we don't shuffle the list of collators nor the list
200
1911
            // of container chains.
201
1911
            // This should only happen in tests, and in the genesis block.
202
1911
            if random_seed != [0; 32] {
203
74
                let mut rng: ChaCha20Rng = SeedableRng::from_seed(random_seed);
204
74
                container_chain_ids.shuffle(&mut rng);
205
74
                parathreads.shuffle(&mut rng);
206
74
                shuffle_collators = Some(move |collators: &mut Vec<T::AccountId>| {
207
74
                    collators.shuffle(&mut rng);
208
74
                })
209
1837
            }
210

            
211
1911
            let orchestrator_chain = ChainNumCollators {
212
1911
                para_id: T::SelfParaId::get(),
213
1911
                min_collators: T::HostConfiguration::min_collators_for_orchestrator(
214
1911
                    target_session_index,
215
1911
                ),
216
1911
                max_collators: T::HostConfiguration::max_collators_for_orchestrator(
217
1911
                    target_session_index,
218
1911
                ),
219
1911
            };
220
1911
            // Initialize list of chains as `[container1, container2, parathread1, parathread2]`.
221
1911
            // The order means priority: the first chain in the list will be the first one to get assigned collators.
222
1911
            // Chains will not be assigned less than `min_collators`, except the orchestrator chain.
223
1911
            // First all chains will be assigned `min_collators`, and then the first one will be assigned up to `max`,
224
1911
            // then the second one, and so on.
225
1911
            let mut chains = vec![];
226
1911
            let collators_per_container =
227
1911
                T::HostConfiguration::collators_per_container(target_session_index);
228
4460
            for para_id in &container_chain_ids {
229
2549
                chains.push(ChainNumCollators {
230
2549
                    para_id: *para_id,
231
2549
                    min_collators: collators_per_container,
232
2549
                    max_collators: collators_per_container,
233
2549
                });
234
2549
            }
235
1911
            let collators_per_parathread =
236
1911
                T::HostConfiguration::collators_per_parathread(target_session_index);
237
2015
            for para_id in &parathreads {
238
104
                chains.push(ChainNumCollators {
239
104
                    para_id: *para_id,
240
104
                    min_collators: collators_per_parathread,
241
104
                    max_collators: collators_per_parathread,
242
104
                });
243
104
            }
244

            
245
            // Are there enough collators to satisfy the minimum demand?
246
1911
            let enough_collators_for_all_chain = collators.len() as u32
247
1911
                >= T::HostConfiguration::min_collators_for_orchestrator(target_session_index)
248
1911
                    .saturating_add(
249
1911
                        collators_per_container.saturating_mul(container_chain_ids.len() as u32),
250
1911
                    )
251
1911
                    .saturating_add(
252
1911
                        collators_per_parathread.saturating_mul(parathreads.len() as u32),
253
1911
                    );
254
1911

            
255
1911
            // Prioritize paras by tip on congestion
256
1911
            // As of now this doesn't distinguish between parachains and parathreads
257
1911
            // TODO apply different logic to parathreads
258
1911
            if !enough_collators_for_all_chain {
259
1186
                chains.sort_by(|a, b| {
260
1184
                    T::CollatorAssignmentTip::get_para_tip(b.para_id)
261
1184
                        .cmp(&T::CollatorAssignmentTip::get_para_tip(a.para_id))
262
1186
                });
263
1227
            }
264

            
265
            // We assign new collators
266
            // we use the config scheduled at the target_session_index
267
1911
            let new_assigned =
268
1911
                if T::ShouldRotateAllCollators::should_rotate_all_collators(target_session_index) {
269
132
                    log::debug!(
270
                        "Collator assignment: rotating collators. Session {:?}, Seed: {:?}",
271
                        current_session_index.encode(),
272
                        random_seed
273
                    );
274

            
275
132
                    Self::deposit_event(Event::NewPendingAssignment {
276
132
                        random_seed,
277
132
                        full_rotation: true,
278
132
                        target_session: target_session_index,
279
132
                    });
280
132

            
281
132
                    Assignment::<T>::assign_collators_rotate_all(
282
132
                        collators,
283
132
                        orchestrator_chain,
284
132
                        chains,
285
132
                        shuffle_collators,
286
132
                    )
287
                } else {
288
1779
                    log::debug!(
289
                        "Collator assignment: keep old assigned. Session {:?}, Seed: {:?}",
290
                        current_session_index.encode(),
291
                        random_seed
292
                    );
293

            
294
1779
                    Self::deposit_event(Event::NewPendingAssignment {
295
1779
                        random_seed,
296
1779
                        full_rotation: false,
297
1779
                        target_session: target_session_index,
298
1779
                    });
299
1779

            
300
1779
                    Assignment::<T>::assign_collators_always_keep_old(
301
1779
                        collators,
302
1779
                        orchestrator_chain,
303
1779
                        chains,
304
1779
                        old_assigned.clone(),
305
1779
                        shuffle_collators,
306
1779
                    )
307
                };
308

            
309
1911
            let mut new_assigned = match new_assigned {
310
1907
                Ok(x) => x,
311
4
                Err(e) => {
312
4
                    log::error!(
313
4
                        "Error in collator assignment, will keep previous assignment. {:?}",
314
                        e
315
                    );
316

            
317
4
                    old_assigned.clone()
318
                }
319
            };
320

            
321
1911
            let mut assigned_containers = new_assigned.container_chains.clone();
322
2655
            assigned_containers.retain(|_, v| !v.is_empty());
323

            
324
            // On congestion, prioritized chains need to pay the minimum tip of the prioritized chains
325
1911
            let maybe_tip: Option<BalanceOf<T>> = if enough_collators_for_all_chain {
326
803
                None
327
            } else {
328
1108
                assigned_containers
329
1108
                    .into_keys()
330
1108
                    .filter_map(T::CollatorAssignmentTip::get_para_tip)
331
1108
                    .min()
332
            };
333

            
334
            // TODO: this probably is asking for a refactor
335
            // only apply the onCollatorAssignedHook if sufficient collators
336
4460
            for para_id in &container_chain_ids {
337
2549
                if !new_assigned
338
2549
                    .container_chains
339
2549
                    .get(para_id)
340
2549
                    .unwrap_or(&vec![])
341
2549
                    .is_empty()
342
                {
343
1232
                    if let Err(e) = T::CollatorAssignmentHook::on_collators_assigned(
344
1232
                        *para_id,
345
1232
                        maybe_tip.as_ref(),
346
1232
                        false,
347
1232
                    ) {
348
                        // On error remove para from assignment
349
2
                        log::warn!(
350
                            "CollatorAssignmentHook error! Removing para {} from assignment: {:?}",
351
                            u32::from(*para_id),
352
                            e
353
                        );
354
2
                        new_assigned.container_chains.remove(para_id);
355
1230
                    }
356
1317
                }
357
            }
358

            
359
2015
            for para_id in &parathreads {
360
104
                if !new_assigned
361
104
                    .container_chains
362
104
                    .get(para_id)
363
104
                    .unwrap_or(&vec![])
364
104
                    .is_empty()
365
                {
366
93
                    if let Err(e) = T::CollatorAssignmentHook::on_collators_assigned(
367
93
                        *para_id,
368
93
                        maybe_tip.as_ref(),
369
93
                        true,
370
93
                    ) {
371
                        // On error remove para from assignment
372
                        log::warn!(
373
                            "CollatorAssignmentHook error! Removing para {} from assignment: {:?}",
374
                            u32::from(*para_id),
375
                            e
376
                        );
377
                        new_assigned.container_chains.remove(para_id);
378
93
                    }
379
11
                }
380
            }
381

            
382
1911
            let mut pending = PendingCollatorContainerChain::<T>::get();
383
1911

            
384
1911
            let old_assigned_changed = old_assigned != new_assigned;
385
1911
            let mut pending_changed = false;
386
            // Update CollatorContainerChain using last entry of pending, if needed
387
1911
            if let Some(current) = pending.take() {
388
629
                pending_changed = true;
389
629
                CollatorContainerChain::<T>::put(current);
390
1301
            }
391
1911
            if old_assigned_changed {
392
871
                pending = Some(new_assigned.clone());
393
871
                pending_changed = true;
394
1359
            }
395
            // Update PendingCollatorContainerChain, if it changed
396
1911
            if pending_changed {
397
1228
                PendingCollatorContainerChain::<T>::put(pending);
398
1512
            }
399

            
400
            // Only applies to session index 0
401
1911
            if current_session_index == &T::SessionIndex::zero() {
402
532
                CollatorContainerChain::<T>::put(new_assigned.clone());
403
532
                return SessionChangeOutcome {
404
532
                    active_assignment: new_assigned.clone(),
405
532
                    next_assignment: new_assigned,
406
532
                    num_total_registered_paras,
407
532
                };
408
1379
            }
409
1379

            
410
1379
            SessionChangeOutcome {
411
1379
                active_assignment: old_assigned,
412
1379
                next_assignment: new_assigned,
413
1379
                num_total_registered_paras,
414
1379
            }
415
1911
        }
416

            
417
        // Returns the assigned collators as read from storage.
418
        // If there is any item in PendingCollatorContainerChain, returns that element.
419
        // Otherwise, reads and returns the current CollatorContainerChain
420
1911
        fn read_assigned_collators() -> AssignedCollators<T::AccountId> {
421
1911
            let mut pending_collator_list = PendingCollatorContainerChain::<T>::get();
422

            
423
1911
            if let Some(assigned_collators) = pending_collator_list.take() {
424
629
                assigned_collators
425
            } else {
426
                // Read current
427
1282
                CollatorContainerChain::<T>::get()
428
            }
429
1911
        }
430

            
431
1911
        pub fn initializer_on_new_session(
432
1911
            session_index: &T::SessionIndex,
433
1911
            collators: Vec<T::AccountId>,
434
1911
        ) -> SessionChangeOutcome<T> {
435
1911
            let random_seed = Randomness::<T>::take();
436
1911
            let num_collators = collators.len();
437
1911
            let assigned_collators = Self::assign_collators(session_index, random_seed, collators);
438
1911
            let num_total_registered_paras = assigned_collators.num_total_registered_paras;
439
1911

            
440
1911
            frame_system::Pallet::<T>::register_extra_weight_unchecked(
441
1911
                T::WeightInfo::new_session(num_collators as u32, num_total_registered_paras),
442
1911
                DispatchClass::Mandatory,
443
1911
            );
444
1911

            
445
1911
            assigned_collators
446
1911
        }
447
    }
448

            
449
    impl<T: Config> GetContainerChainAuthor<T::AccountId> for Pallet<T> {
450
        // TODO: pending collator container chain if the block is a session change!
451
26864
        fn author_for_slot(slot: Slot, para_id: ParaId) -> Option<T::AccountId> {
452
26864
            let assigned_collators = Pallet::<T>::collator_container_chain();
453
26864
            let collators = if para_id == T::SelfParaId::get() {
454
13404
                Some(&assigned_collators.orchestrator_chain)
455
            } else {
456
13460
                assigned_collators.container_chains.get(&para_id)
457
754
            }?;
458

            
459
26110
            if collators.is_empty() {
460
                // Avoid division by zero below
461
6128
                return None;
462
19982
            }
463
19982
            let author_index = u64::from(slot) % collators.len() as u64;
464
19982
            collators.get(author_index as usize).cloned()
465
26864
        }
466

            
467
        #[cfg(feature = "runtime-benchmarks")]
468
        fn set_authors_for_para_id(para_id: ParaId, authors: Vec<T::AccountId>) {
469
            let mut assigned_collators = Pallet::<T>::collator_container_chain();
470
            assigned_collators.container_chains.insert(para_id, authors);
471
            CollatorContainerChain::<T>::put(assigned_collators);
472
        }
473
    }
474

            
475
20041
    #[pallet::hooks]
476
    impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
477
10708
        fn on_initialize(n: BlockNumberFor<T>) -> Weight {
478
10708
            let mut weight = Weight::zero();
479
10708

            
480
10708
            // Account reads and writes for on_finalize
481
10708
            if T::GetRandomnessForNextBlock::should_end_session(n.saturating_add(One::one())) {
482
980
                weight += T::DbWeight::get().reads_writes(1, 1);
483
9728
            }
484

            
485
10708
            weight
486
10708
        }
487

            
488
10535
        fn on_finalize(n: BlockNumberFor<T>) {
489
10535
            // If the next block is a session change, read randomness and store in pallet storage
490
10535
            if T::GetRandomnessForNextBlock::should_end_session(n.saturating_add(One::one())) {
491
977
                let random_seed = T::GetRandomnessForNextBlock::get_randomness();
492
977
                Randomness::<T>::put(random_seed);
493
9558
            }
494
10535
        }
495
    }
496
}
497

            
498
/// Balance used by this pallet
499
pub type BalanceOf<T> =
500
    <<T as Config>::Currency as Currency<<T as frame_system::Config>::AccountId>>::Balance;
501

            
502
pub struct RotateCollatorsEveryNSessions<Period>(PhantomData<Period>);
503

            
504
impl<Period> ShouldRotateAllCollators<u32> for RotateCollatorsEveryNSessions<Period>
505
where
506
    Period: Get<u32>,
507
{
508
1588
    fn should_rotate_all_collators(session_index: u32) -> bool {
509
1588
        let period = Period::get();
510
1588

            
511
1588
        if period == 0 {
512
            // A period of 0 disables rotation
513
122
            false
514
        } else {
515
1466
            session_index % Period::get() == 0
516
        }
517
1588
    }
518
}
519

            
520
pub trait GetRandomnessForNextBlock<BlockNumber> {
521
    fn should_end_session(block_number: BlockNumber) -> bool;
522
    fn get_randomness() -> [u8; 32];
523
}
524

            
525
impl<BlockNumber> GetRandomnessForNextBlock<BlockNumber> for () {
526
2073
    fn should_end_session(_block_number: BlockNumber) -> bool {
527
2073
        false
528
2073
    }
529

            
530
    fn get_randomness() -> [u8; 32] {
531
        [0; 32]
532
    }
533
}