1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
//! # Collator Assignment Pallet
18
//!
19
//! This pallet assigns a list of collators to:
20
//!    - the orchestrator chain
21
//!    - a set of container chains
22
//!
23
//! The set of container chains is retrieved thanks to the GetContainerChains trait
24
//! The number of collators to assign to the orchestrator chain and the number
25
//! of collators to assign to each container chain is retrieved through the GetHostConfiguration
26
//! trait.
27
//!  
28
//! The pallet uses the following approach:
29
//!
30
//! - First, it aims at filling the necessary collators to serve the orchestrator chain
31
//! - Second, it aims at filling in-order (FIFO) the existing containerChains
32
//!
33
//! Upon new session, this pallet takes whatever assignation was in the PendingCollatorContainerChain
34
//! storage, and assigns it as the current CollatorContainerChain. In addition, it takes the next
35
//! queued set of parachains and collators and calculates the assignment for the next session, storing
36
//! it in the PendingCollatorContainerChain storage item.
37
//!
38
//! The reason for the collator-assignment pallet to work with a one-session delay assignment is because
39
//! we want collators to know at least one session in advance the container chain/orchestrator that they
40
//! are assigned to.
41

            
42
#![cfg_attr(not(feature = "std"), no_std)]
43

            
44
use {
45
    crate::assignment::{Assignment, ChainNumCollators},
46
    frame_support::{pallet_prelude::*, traits::Currency},
47
    frame_system::pallet_prelude::BlockNumberFor,
48
    rand::{seq::SliceRandom, SeedableRng},
49
    rand_chacha::ChaCha20Rng,
50
    sp_runtime::{
51
        traits::{AtLeast32BitUnsigned, One, Zero},
52
        Saturating,
53
    },
54
    sp_std::{collections::btree_set::BTreeSet, fmt::Debug, prelude::*, vec},
55
    tp_traits::{
56
        CollatorAssignmentHook, CollatorAssignmentTip, GetContainerChainAuthor,
57
        GetHostConfiguration, GetSessionContainerChains, ParaId, RemoveInvulnerables,
58
        RemoveParaIdsWithNoCredits, ShouldRotateAllCollators, Slot,
59
    },
60
};
61
pub use {dp_collator_assignment::AssignedCollators, pallet::*};
62

            
63
mod assignment;
64
#[cfg(feature = "runtime-benchmarks")]
65
mod benchmarking;
66
pub mod weights;
67
pub use weights::WeightInfo;
68

            
69
#[cfg(test)]
70
mod mock;
71

            
72
#[cfg(test)]
73
mod tests;
74

            
75
1585
#[frame_support::pallet]
76
pub mod pallet {
77
    use super::*;
78

            
79
184
    #[pallet::pallet]
80
    #[pallet::without_storage_info]
81
    pub struct Pallet<T>(_);
82

            
83
    /// Configure the pallet by specifying the parameters and types on which it depends.
84
    #[pallet::config]
85
    pub trait Config: frame_system::Config {
86
        /// The overarching event type.
87
        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
88
        type SessionIndex: parity_scale_codec::FullCodec
89
            + TypeInfo
90
            + Copy
91
            + AtLeast32BitUnsigned
92
            + Debug;
93
        // `SESSION_DELAY` is used to delay any changes to Paras registration or configurations.
94
        // Wait until the session index is 2 larger then the current index to apply any changes,
95
        // which guarantees that at least one full session has passed before any changes are applied.
96
        type HostConfiguration: GetHostConfiguration<Self::SessionIndex>;
97
        type ContainerChains: GetSessionContainerChains<Self::SessionIndex>;
98
        type SelfParaId: Get<ParaId>;
99
        type ShouldRotateAllCollators: ShouldRotateAllCollators<Self::SessionIndex>;
100
        type GetRandomnessForNextBlock: GetRandomnessForNextBlock<BlockNumberFor<Self>>;
101
        type RemoveInvulnerables: RemoveInvulnerables<Self::AccountId>;
102
        type RemoveParaIdsWithNoCredits: RemoveParaIdsWithNoCredits;
103
        type CollatorAssignmentHook: CollatorAssignmentHook<BalanceOf<Self>>;
104
        type Currency: Currency<Self::AccountId>;
105
        type CollatorAssignmentTip: CollatorAssignmentTip<BalanceOf<Self>>;
106
        type ForceEmptyOrchestrator: Get<bool>;
107
        /// The weight information of this pallet.
108
        type WeightInfo: WeightInfo;
109
    }
110

            
111
    #[pallet::event]
112
2112
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
113
    pub enum Event<T: Config> {
114
11
        NewPendingAssignment {
115
            random_seed: [u8; 32],
116
            full_rotation: bool,
117
            target_session: T::SessionIndex,
118
        },
119
    }
120

            
121
59368
    #[pallet::storage]
122
    pub(crate) type CollatorContainerChain<T: Config> =
123
        StorageValue<_, AssignedCollators<T::AccountId>, ValueQuery>;
124

            
125
    /// Pending configuration changes.
126
    ///
127
    /// This is a list of configuration changes, each with a session index at which it should
128
    /// be applied.
129
    ///
130
    /// The list is sorted ascending by session index. Also, this list can only contain at most
131
    /// 2 items: for the next session and for the `scheduled_session`.
132
11042
    #[pallet::storage]
133
    pub(crate) type PendingCollatorContainerChain<T: Config> =
134
        StorageValue<_, Option<AssignedCollators<T::AccountId>>, ValueQuery>;
135

            
136
    /// Randomness from previous block. Used to shuffle collators on session change.
137
    /// Should only be set on the last block of each session and should be killed on the on_initialize of the next block.
138
    /// The default value of [0; 32] disables randomness in the pallet.
139
6154
    #[pallet::storage]
140
    pub(crate) type Randomness<T: Config> = StorageValue<_, [u8; 32], ValueQuery>;
141

            
142
176
    #[pallet::call]
143
    impl<T: Config> Pallet<T> {}
144

            
145
    /// A struct that holds the assignment that is active after the session change and optionally
146
    /// the assignment that becomes active after the next session change.
147
    pub struct SessionChangeOutcome<T: Config> {
148
        /// New active assignment.
149
        pub active_assignment: AssignedCollators<T::AccountId>,
150
        /// Next session active assignment.
151
        pub next_assignment: AssignedCollators<T::AccountId>,
152
        /// Total number of registered parachains before filtering them out, used as a weight hint
153
        pub num_total_registered_paras: u32,
154
    }
155

            
156
    impl<T: Config> Pallet<T> {
157
        /// Assign new collators
158
        /// collators should be queued collators
159
2112
        pub fn assign_collators(
160
2112
            current_session_index: &T::SessionIndex,
161
2112
            random_seed: [u8; 32],
162
2112
            collators: Vec<T::AccountId>,
163
2112
        ) -> SessionChangeOutcome<T> {
164
2112
            // We work with one session delay to calculate assignments
165
2112
            let session_delay = T::SessionIndex::one();
166
2112
            let target_session_index = current_session_index.saturating_add(session_delay);
167
2112
            // We get the containerChains that we will have at the target session
168
2112
            let container_chains =
169
2112
                T::ContainerChains::session_container_chains(target_session_index);
170
2112
            let num_total_registered_paras =
171
2112
                (container_chains.parachains.len() + container_chains.parathreads.len()) as u32;
172
2112
            let mut container_chain_ids = container_chains.parachains;
173
2112
            let mut parathreads: Vec<_> = container_chains
174
2112
                .parathreads
175
2112
                .into_iter()
176
2112
                .map(|(para_id, _)| para_id)
177
2112
                .collect();
178
2112

            
179
2112
            // We read current assigned collators
180
2112
            let old_assigned = Self::read_assigned_collators();
181
2112
            let old_assigned_para_ids: BTreeSet<ParaId> =
182
2112
                old_assigned.container_chains.keys().cloned().collect();
183
2112

            
184
2112
            // Remove the containerChains that do not have enough credits for block production
185
2112
            T::RemoveParaIdsWithNoCredits::remove_para_ids_with_no_credits(
186
2112
                &mut container_chain_ids,
187
2112
                &old_assigned_para_ids,
188
2112
            );
189
2112
            // TODO: parathreads should be treated a bit differently, they don't need to have the same amount of credits
190
2112
            // as parathreads because they will not be producing blocks on every slot.
191
2112
            T::RemoveParaIdsWithNoCredits::remove_para_ids_with_no_credits(
192
2112
                &mut parathreads,
193
2112
                &old_assigned_para_ids,
194
2112
            );
195
2112

            
196
2112
            let mut shuffle_collators = None;
197
2112
            // If the random_seed is all zeros, we don't shuffle the list of collators nor the list
198
2112
            // of container chains.
199
2112
            // This should only happen in tests, and in the genesis block.
200
2112
            if random_seed != [0; 32] {
201
74
                let mut rng: ChaCha20Rng = SeedableRng::from_seed(random_seed);
202
74
                container_chain_ids.shuffle(&mut rng);
203
74
                parathreads.shuffle(&mut rng);
204
74
                shuffle_collators = Some(move |collators: &mut Vec<T::AccountId>| {
205
74
                    collators.shuffle(&mut rng);
206
74
                })
207
2038
            }
208

            
209
2112
            let orchestrator_chain: ChainNumCollators = if T::ForceEmptyOrchestrator::get() {
210
249
                ChainNumCollators {
211
249
                    para_id: T::SelfParaId::get(),
212
249
                    min_collators: 0u32,
213
249
                    max_collators: 0u32,
214
249
                }
215
            } else {
216
1863
                ChainNumCollators {
217
1863
                    para_id: T::SelfParaId::get(),
218
1863
                    min_collators: T::HostConfiguration::min_collators_for_orchestrator(
219
1863
                        target_session_index,
220
1863
                    ),
221
1863
                    max_collators: T::HostConfiguration::max_collators_for_orchestrator(
222
1863
                        target_session_index,
223
1863
                    ),
224
1863
                }
225
            };
226

            
227
            // Initialize list of chains as `[container1, container2, parathread1, parathread2]`.
228
            // The order means priority: the first chain in the list will be the first one to get assigned collators.
229
            // Chains will not be assigned less than `min_collators`, except the orchestrator chain.
230
            // First all chains will be assigned `min_collators`, and then the first one will be assigned up to `max`,
231
            // then the second one, and so on.
232
2112
            let mut chains = vec![];
233
2112
            let collators_per_container =
234
2112
                T::HostConfiguration::collators_per_container(target_session_index);
235
4877
            for para_id in &container_chain_ids {
236
2765
                chains.push(ChainNumCollators {
237
2765
                    para_id: *para_id,
238
2765
                    min_collators: collators_per_container,
239
2765
                    max_collators: collators_per_container,
240
2765
                });
241
2765
            }
242
2112
            let collators_per_parathread =
243
2112
                T::HostConfiguration::collators_per_parathread(target_session_index);
244
2192
            for para_id in &parathreads {
245
80
                chains.push(ChainNumCollators {
246
80
                    para_id: *para_id,
247
80
                    min_collators: collators_per_parathread,
248
80
                    max_collators: collators_per_parathread,
249
80
                });
250
80
            }
251

            
252
            // Are there enough collators to satisfy the minimum demand?
253
2112
            let enough_collators_for_all_chain = collators.len() as u32
254
2112
                >= T::HostConfiguration::min_collators_for_orchestrator(target_session_index)
255
2112
                    .saturating_add(
256
2112
                        collators_per_container.saturating_mul(container_chain_ids.len() as u32),
257
2112
                    )
258
2112
                    .saturating_add(
259
2112
                        collators_per_parathread.saturating_mul(parathreads.len() as u32),
260
2112
                    );
261
2112

            
262
2112
            // Prioritize paras by tip on congestion
263
2112
            // As of now this doesn't distinguish between parachains and parathreads
264
2112
            // TODO apply different logic to parathreads
265
2112
            if !enough_collators_for_all_chain {
266
1331
                chains.sort_by(|a, b| {
267
1232
                    T::CollatorAssignmentTip::get_para_tip(b.para_id)
268
1232
                        .cmp(&T::CollatorAssignmentTip::get_para_tip(a.para_id))
269
1331
                });
270
1341
            }
271

            
272
            // We assign new collators
273
            // we use the config scheduled at the target_session_index
274
2112
            let new_assigned =
275
2112
                if T::ShouldRotateAllCollators::should_rotate_all_collators(target_session_index) {
276
132
                    log::debug!(
277
                        "Collator assignment: rotating collators. Session {:?}, Seed: {:?}",
278
                        current_session_index.encode(),
279
                        random_seed
280
                    );
281

            
282
132
                    Self::deposit_event(Event::NewPendingAssignment {
283
132
                        random_seed,
284
132
                        full_rotation: true,
285
132
                        target_session: target_session_index,
286
132
                    });
287
132

            
288
132
                    Assignment::<T>::assign_collators_rotate_all(
289
132
                        collators,
290
132
                        orchestrator_chain,
291
132
                        chains,
292
132
                        shuffle_collators,
293
132
                    )
294
                } else {
295
1980
                    log::debug!(
296
                        "Collator assignment: keep old assigned. Session {:?}, Seed: {:?}",
297
                        current_session_index.encode(),
298
                        random_seed
299
                    );
300

            
301
1980
                    Self::deposit_event(Event::NewPendingAssignment {
302
1980
                        random_seed,
303
1980
                        full_rotation: false,
304
1980
                        target_session: target_session_index,
305
1980
                    });
306
1980

            
307
1980
                    Assignment::<T>::assign_collators_always_keep_old(
308
1980
                        collators,
309
1980
                        orchestrator_chain,
310
1980
                        chains,
311
1980
                        old_assigned.clone(),
312
1980
                        shuffle_collators,
313
1980
                    )
314
                };
315

            
316
2112
            let mut new_assigned = match new_assigned {
317
1979
                Ok(x) => x,
318
133
                Err(e) => {
319
133
                    log::error!(
320
5
                        "Error in collator assignment, will keep previous assignment. {:?}",
321
                        e
322
                    );
323

            
324
133
                    old_assigned.clone()
325
                }
326
            };
327

            
328
2112
            let mut assigned_containers = new_assigned.container_chains.clone();
329
2880
            assigned_containers.retain(|_, v| !v.is_empty());
330

            
331
            // On congestion, prioritized chains need to pay the minimum tip of the prioritized chains
332
2112
            let maybe_tip: Option<BalanceOf<T>> = if enough_collators_for_all_chain {
333
859
                None
334
            } else {
335
1253
                assigned_containers
336
1253
                    .into_keys()
337
1253
                    .filter_map(T::CollatorAssignmentTip::get_para_tip)
338
1253
                    .min()
339
            };
340

            
341
            // TODO: this probably is asking for a refactor
342
            // only apply the onCollatorAssignedHook if sufficient collators
343
4877
            for para_id in &container_chain_ids {
344
2765
                if !new_assigned
345
2765
                    .container_chains
346
2765
                    .get(para_id)
347
2765
                    .unwrap_or(&vec![])
348
2765
                    .is_empty()
349
                {
350
1320
                    if let Err(e) = T::CollatorAssignmentHook::on_collators_assigned(
351
1320
                        *para_id,
352
1320
                        maybe_tip.as_ref(),
353
1320
                        false,
354
1320
                    ) {
355
                        // On error remove para from assignment
356
2
                        log::warn!(
357
                            "CollatorAssignmentHook error! Removing para {} from assignment: {:?}",
358
                            u32::from(*para_id),
359
                            e
360
                        );
361
2
                        new_assigned.container_chains.remove(para_id);
362
1318
                    }
363
1445
                }
364
            }
365

            
366
2192
            for para_id in &parathreads {
367
80
                if !new_assigned
368
80
                    .container_chains
369
80
                    .get(para_id)
370
80
                    .unwrap_or(&vec![])
371
80
                    .is_empty()
372
                {
373
69
                    if let Err(e) = T::CollatorAssignmentHook::on_collators_assigned(
374
69
                        *para_id,
375
69
                        maybe_tip.as_ref(),
376
69
                        true,
377
69
                    ) {
378
                        // On error remove para from assignment
379
                        log::warn!(
380
                            "CollatorAssignmentHook error! Removing para {} from assignment: {:?}",
381
                            u32::from(*para_id),
382
                            e
383
                        );
384
                        new_assigned.container_chains.remove(para_id);
385
69
                    }
386
11
                }
387
            }
388

            
389
2112
            let mut pending = PendingCollatorContainerChain::<T>::get();
390
2112

            
391
2112
            let old_assigned_changed = old_assigned != new_assigned;
392
2112
            let mut pending_changed = false;
393
            // Update CollatorContainerChain using last entry of pending, if needed
394
2112
            if let Some(current) = pending.take() {
395
653
                pending_changed = true;
396
653
                CollatorContainerChain::<T>::put(current);
397
1478
            }
398
2112
            if old_assigned_changed {
399
883
                pending = Some(new_assigned.clone());
400
883
                pending_changed = true;
401
1524
            }
402
            // Update PendingCollatorContainerChain, if it changed
403
2112
            if pending_changed {
404
1260
                PendingCollatorContainerChain::<T>::put(pending);
405
1633
            }
406

            
407
            // Only applies to session index 0
408
2112
            if current_session_index == &T::SessionIndex::zero() {
409
597
                CollatorContainerChain::<T>::put(new_assigned.clone());
410
597
                return SessionChangeOutcome {
411
597
                    active_assignment: new_assigned.clone(),
412
597
                    next_assignment: new_assigned,
413
597
                    num_total_registered_paras,
414
597
                };
415
1515
            }
416
1515

            
417
1515
            SessionChangeOutcome {
418
1515
                active_assignment: old_assigned,
419
1515
                next_assignment: new_assigned,
420
1515
                num_total_registered_paras,
421
1515
            }
422
2112
        }
423

            
424
        // Returns the assigned collators as read from storage.
425
        // If there is any item in PendingCollatorContainerChain, returns that element.
426
        // Otherwise, reads and returns the current CollatorContainerChain
427
2112
        fn read_assigned_collators() -> AssignedCollators<T::AccountId> {
428
2112
            let mut pending_collator_list = PendingCollatorContainerChain::<T>::get();
429

            
430
2112
            if let Some(assigned_collators) = pending_collator_list.take() {
431
653
                assigned_collators
432
            } else {
433
                // Read current
434
1459
                CollatorContainerChain::<T>::get()
435
            }
436
2112
        }
437

            
438
2112
        pub fn initializer_on_new_session(
439
2112
            session_index: &T::SessionIndex,
440
2112
            collators: Vec<T::AccountId>,
441
2112
        ) -> SessionChangeOutcome<T> {
442
2112
            let random_seed = Randomness::<T>::take();
443
2112
            let num_collators = collators.len();
444
2112
            let assigned_collators = Self::assign_collators(session_index, random_seed, collators);
445
2112
            let num_total_registered_paras = assigned_collators.num_total_registered_paras;
446
2112

            
447
2112
            frame_system::Pallet::<T>::register_extra_weight_unchecked(
448
2112
                T::WeightInfo::new_session(num_collators as u32, num_total_registered_paras),
449
2112
                DispatchClass::Mandatory,
450
2112
            );
451
2112

            
452
2112
            assigned_collators
453
2112
        }
454

            
455
26895
        pub fn collator_container_chain() -> AssignedCollators<T::AccountId> {
456
26895
            CollatorContainerChain::<T>::get()
457
26895
        }
458

            
459
35
        pub fn pending_collator_container_chain() -> Option<AssignedCollators<T::AccountId>> {
460
35
            PendingCollatorContainerChain::<T>::get()
461
35
        }
462

            
463
        pub fn randomness() -> [u8; 32] {
464
            Randomness::<T>::get()
465
        }
466
    }
467

            
468
    impl<T: Config> GetContainerChainAuthor<T::AccountId> for Pallet<T> {
469
        // TODO: pending collator container chain if the block is a session change!
470
26662
        fn author_for_slot(slot: Slot, para_id: ParaId) -> Option<T::AccountId> {
471
26662
            let assigned_collators = Pallet::<T>::collator_container_chain();
472
26662
            let collators = if para_id == T::SelfParaId::get() {
473
13202
                Some(&assigned_collators.orchestrator_chain)
474
            } else {
475
13460
                assigned_collators.container_chains.get(&para_id)
476
754
            }?;
477

            
478
25908
            if collators.is_empty() {
479
                // Avoid division by zero below
480
6128
                return None;
481
19780
            }
482
19780
            let author_index = u64::from(slot) % collators.len() as u64;
483
19780
            collators.get(author_index as usize).cloned()
484
26662
        }
485

            
486
        #[cfg(feature = "runtime-benchmarks")]
487
        fn set_authors_for_para_id(para_id: ParaId, authors: Vec<T::AccountId>) {
488
            let mut assigned_collators = Pallet::<T>::collator_container_chain();
489
            assigned_collators.container_chains.insert(para_id, authors);
490
            CollatorContainerChain::<T>::put(assigned_collators);
491
        }
492
    }
493

            
494
19845
    #[pallet::hooks]
495
    impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
496
10607
        fn on_initialize(n: BlockNumberFor<T>) -> Weight {
497
10607
            let mut weight = Weight::zero();
498
10607

            
499
10607
            // Account reads and writes for on_finalize
500
10607
            if T::GetRandomnessForNextBlock::should_end_session(n.saturating_add(One::one())) {
501
968
                weight += T::DbWeight::get().reads_writes(1, 1);
502
9639
            }
503

            
504
10607
            weight
505
10607
        }
506

            
507
10440
        fn on_finalize(n: BlockNumberFor<T>) {
508
10440
            // If the next block is a session change, read randomness and store in pallet storage
509
10440
            if T::GetRandomnessForNextBlock::should_end_session(n.saturating_add(One::one())) {
510
965
                let random_seed = T::GetRandomnessForNextBlock::get_randomness();
511
965
                Randomness::<T>::put(random_seed);
512
9475
            }
513
10440
        }
514
    }
515
}
516

            
517
/// Balance used by this pallet
518
pub type BalanceOf<T> =
519
    <<T as Config>::Currency as Currency<<T as frame_system::Config>::AccountId>>::Balance;
520

            
521
pub struct RotateCollatorsEveryNSessions<Period>(PhantomData<Period>);
522

            
523
impl<Period> ShouldRotateAllCollators<u32> for RotateCollatorsEveryNSessions<Period>
524
where
525
    Period: Get<u32>,
526
{
527
1540
    fn should_rotate_all_collators(session_index: u32) -> bool {
528
1540
        let period = Period::get();
529
1540

            
530
1540
        if period == 0 {
531
            // A period of 0 disables rotation
532
74
            false
533
        } else {
534
1466
            session_index % Period::get() == 0
535
        }
536
1540
    }
537
}
538

            
539
pub trait GetRandomnessForNextBlock<BlockNumber> {
540
    fn should_end_session(block_number: BlockNumber) -> bool;
541
    fn get_randomness() -> [u8; 32];
542
}
543

            
544
impl<BlockNumber> GetRandomnessForNextBlock<BlockNumber> for () {
545
2073
    fn should_end_session(_block_number: BlockNumber) -> bool {
546
2073
        false
547
2073
    }
548

            
549
    fn get_randomness() -> [u8; 32] {
550
        [0; 32]
551
    }
552
}