// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.

//! Container Chain Spawner
//!
//! Controls the starting and stopping of container chains.
//!
//! For more information about when the database is deleted, check the
//! [Keep db flowchart](https://raw.githubusercontent.com/moondance-labs/tanssi/master/docs/keep_db_flowchart.png)

use {
    crate::{
        cli::ContainerChainCli,
        container_chain_monitor::{SpawnedContainer, SpawnedContainersMonitor},
        service::{start_node_impl_container, ContainerChainClient, ParachainClient},
    },
    cumulus_primitives_core::ParaId,
    cumulus_relay_chain_interface::RelayChainInterface,
    dancebox_runtime::{opaque::Block as OpaqueBlock, Block},
    dc_orchestrator_chain_interface::OrchestratorChainInterface,
    fs2::FileExt,
    futures::FutureExt,
    node_common::command::generate_genesis_block,
    pallet_author_noting_runtime_api::AuthorNotingApi,
    pallet_registrar_runtime_api::RegistrarApi,
    polkadot_primitives::CollatorPair,
    sc_cli::{Database, SyncMode},
    sc_network::config::MultiaddrWithPeerId,
    sc_service::SpawnTaskHandle,
    sc_transaction_pool::FullPool,
    sp_api::ProvideRuntimeApi,
    sp_core::H256,
    sp_keystore::KeystorePtr,
    sp_runtime::traits::Block as BlockT,
    std::{
        collections::{HashMap, HashSet},
        path::{Path, PathBuf},
        sync::{Arc, Mutex},
        time::Instant,
    },
    tokio::{
        sync::{mpsc, oneshot},
        time::{sleep, Duration},
    },
    tokio_util::sync::CancellationToken,
};

/// Timeout to wait for the database to close before starting it again, used in `wait_for_paritydb_lock`.
/// This is the max timeout; if the db is closed in 1 second, then that function will only wait 1 second.
const MAX_DB_RESTART_TIMEOUT: Duration = Duration::from_secs(60);

/// Block diff threshold above which we decide it will be faster to delete the database and
/// use warp sync, rather than using full sync to download a large number of blocks.
/// This is only needed because warp sync does not support syncing from a state that is not
/// genesis; it falls back to full sync in that case.
/// 30_000 blocks = 50 hours at 6s/block.
/// Assuming a syncing speed of 100 blocks per second, this will take 5 minutes to sync.
const MAX_BLOCK_DIFF_FOR_FULL_SYNC: u32 = 30_000;

/// Task that handles spawning and stopping container chains based on assignment.
/// The main loop is [rx_loop](ContainerChainSpawner::rx_loop).
pub struct ContainerChainSpawner {
    /// Start container chain params
    pub params: ContainerChainSpawnParams,

    /// State
    pub state: Arc<Mutex<ContainerChainSpawnerState>>,

    /// Async callback that enables collation on the orchestrator chain
    pub collate_on_tanssi:
        Arc<dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>) + Send + Sync>,
    /// Stores the cancellation token used to stop the orchestrator chain collator process.
    /// When this is None, the orchestrator collator is not running.
    pub collation_cancellation_constructs:
        Option<(CancellationToken, futures::channel::oneshot::Receiver<()>)>,
}

/// Struct with all the params needed to start a container chain node given the CLI arguments,
/// creating the ChainSpec from on-chain data from the orchestrator chain.
/// These params must be the same for all container chains; params that change, such as the
/// `container_chain_para_id`, should be passed as separate arguments to the [try_spawn] function.
#[derive(Clone)]
pub struct ContainerChainSpawnParams {
    pub orchestrator_chain_interface: Arc<dyn OrchestratorChainInterface>,
    pub orchestrator_client: Arc<ParachainClient>,
    pub orchestrator_tx_pool: Arc<FullPool<OpaqueBlock, ParachainClient>>,
    pub container_chain_cli: ContainerChainCli,
    pub tokio_handle: tokio::runtime::Handle,
    pub chain_type: sc_chain_spec::ChainType,
    pub relay_chain: String,
    pub relay_chain_interface: Arc<dyn RelayChainInterface>,
    pub collator_key: Option<CollatorPair>,
    pub sync_keystore: KeystorePtr,
    pub orchestrator_para_id: ParaId,
    pub validator: bool,
    pub spawn_handle: SpawnTaskHandle,
}

/// Mutable state for the container chain spawner. Keeps track of running chains.
#[derive(Default)]
pub struct ContainerChainSpawnerState {
    spawned_container_chains: HashMap<ParaId, ContainerChainState>,
    assigned_para_id: Option<ParaId>,
    next_assigned_para_id: Option<ParaId>,
    failed_para_ids: HashSet<ParaId>,
    // For debugging and detecting errors
    pub spawned_containers_monitor: SpawnedContainersMonitor,
}

pub struct ContainerChainState {
    /// Handle that can be used to stop the container chain
    stop_handle: StopContainerChain,
    /// Database path
    db_path: PathBuf,
}

/// Stops a container chain when a signal is sent. The `bool` is `keep_db`: whether to keep the
/// container chain database (true) or remove it (false).
pub struct StopContainerChain {
    signal: oneshot::Sender<bool>,
    id: usize,
}

/// Messages used to control the `ContainerChainSpawner`. This is needed because one of the fields
/// of `ContainerChainSpawner` is not `Sync`, so we cannot simply pass an
/// `Arc<ContainerChainSpawner>` to other threads.
#[derive(Debug)]
141
pub enum CcSpawnMsg {
142
    /// Update container chain assignment
143
    UpdateAssignment {
144
        current: Option<ParaId>,
145
        next: Option<ParaId>,
146
    },
147
}
148

            
149
// Separate function to allow using `?` to return a result, and also to avoid using `self` in an
150
// async function. Mutable state should be written by locking `state`.
151
// TODO: `state` should be an async mutex
152
async fn try_spawn(
153
    try_spawn_params: ContainerChainSpawnParams,
154
    state: Arc<Mutex<ContainerChainSpawnerState>>,
155
    container_chain_para_id: ParaId,
156
    start_collation: bool,
157
) -> sc_service::error::Result<()> {
158
    let ContainerChainSpawnParams {
159
        orchestrator_chain_interface,
160
        orchestrator_client,
161
        mut container_chain_cli,
162
        tokio_handle,
163
        chain_type,
164
        relay_chain,
165
        relay_chain_interface,
166
        collator_key,
167
        sync_keystore,
168
        orchestrator_para_id,
169
        validator,
170
        spawn_handle,
171
        orchestrator_tx_pool,
172
    } = try_spawn_params;
173
    // Preload genesis data from orchestrator chain storage.
174

            
175
    // TODO: the orchestrator chain node may not be fully synced yet,
176
    // in that case we will be reading an old state.
177
    let orchestrator_chain_info = orchestrator_client.chain_info();
178
    log::info!(
179
        "Reading container chain genesis data from orchestrator chain at block #{} {}",
180
        orchestrator_chain_info.best_number,
181
        orchestrator_chain_info.best_hash,
182
    );
183
    let orchestrator_runtime_api = orchestrator_client.runtime_api();
184

            
185
    log::info!(
186
        "Detected assignment for container chain {}",
187
        container_chain_para_id
188
    );
189

            
190
    let genesis_data = orchestrator_runtime_api
191
        .genesis_data(orchestrator_chain_info.best_hash, container_chain_para_id)
192
        .map_err(|e| format!("Failed to call genesis_data runtime api: {}", e))?
193
        .ok_or_else(|| {
194
            format!(
195
                "No genesis data registered for container chain id {}",
196
                container_chain_para_id
197
            )
198
        })?;
199

            
200
    let boot_nodes_raw = orchestrator_runtime_api
201
        .boot_nodes(orchestrator_chain_info.best_hash, container_chain_para_id)
202
        .map_err(|e| format!("Failed to call boot_nodes runtime api: {}", e))?;
203
    if boot_nodes_raw.is_empty() {
204
        log::warn!(
205
            "No boot nodes registered on-chain for container chain {}",
206
            container_chain_para_id
207
        );
208
    }
209
    let boot_nodes = parse_boot_nodes_ignore_invalid(boot_nodes_raw, container_chain_para_id);
210
    if boot_nodes.is_empty() {
211
        log::warn!(
212
            "No valid boot nodes for container chain {}",
213
            container_chain_para_id
214
        );
215
    }
216

            
217
    container_chain_cli
218
        .preload_chain_spec_from_genesis_data(
219
            container_chain_para_id.into(),
220
            genesis_data,
221
            chain_type.clone(),
222
            relay_chain.clone(),
223
            boot_nodes,
224
        )
225
        .map_err(|e| {
226
            format!(
227
                "failed to create container chain chain spec from on chain genesis data: {}",
228
                e
229
            )
230
        })?;
231

            
232
    log::info!(
233
        "Loaded chain spec for container chain {}",
234
        container_chain_para_id
235
    );
236

            
237
    if !start_collation {
238
        log::info!("This is a syncing container chain, using random ports");
239
        // Use random ports to avoid conflicts with the other running container chain
240
        let random_ports = [23456, 23457, 23458];
241
        container_chain_cli
242
            .base
243
            .base
244
            .prometheus_params
245
            .prometheus_port = Some(random_ports[0]);
246
        container_chain_cli.base.base.network_params.port = Some(random_ports[1]);
247
        container_chain_cli.base.base.rpc_port = Some(random_ports[2]);
248
    }
249

            
250
    // Update CLI params
251
    container_chain_cli.base.para_id = Some(container_chain_para_id.into());
252
    container_chain_cli
253
        .base
254
        .base
255
        .import_params
256
        .database_params
257
        .database = Some(Database::ParityDb);
258

            
259
    let keep_db = container_chain_cli.base.keep_db;
260

            
    // Get a closure that checks if db_path exists. We need this to know when to use full sync instead of warp sync.
    let check_db_exists = {
        // Get db_path from config
        let mut container_chain_cli_config = sc_cli::SubstrateCli::create_configuration(
            &container_chain_cli,
            &container_chain_cli,
            tokio_handle.clone(),
        )
        .map_err(|err| format!("Container chain argument error: {}", err))?;

        // Change database path to make it depend on container chain para id
        // So instead of the usual "db/full" we have "db/full-container-2000"
        let mut db_path = container_chain_cli_config
            .database
            .path()
            .ok_or_else(|| "Failed to get database path".to_string())?
            .to_owned();
        db_path.set_file_name(format!("full-container-{}", container_chain_para_id));
        container_chain_cli_config.database.set_path(&db_path);

        // Return a closure because we may need to check if the db exists multiple times
        move || db_path.exists()
    };

    // Start container chain node. After starting, check if the database is good or needs to
    // be removed. If the db needs to be removed, this function will handle the node restart, and
    // return the components of a running container chain node.
    // This should be a separate function, but it has so many arguments that I prefer to have it as a closure for now
    let start_node_impl_container_with_restart = || async move {
        // Loop will run at most 2 times: 1 time if the db is good and 2 times if the db needs to be removed
        for _ in 0..2 {
            let db_existed_before = check_db_exists();
            container_chain_cli.base.base.network_params.sync = select_sync_mode(
                db_existed_before,
                &orchestrator_client,
                container_chain_para_id,
            )?;
            log::info!(
                "Container chain sync mode: {:?}",
                container_chain_cli.base.base.network_params.sync
            );

            let mut container_chain_cli_config = sc_cli::SubstrateCli::create_configuration(
                &container_chain_cli,
                &container_chain_cli,
                tokio_handle.clone(),
            )
            .map_err(|err| format!("Container chain argument error: {}", err))?;

            // Change database path to make it depend on container chain para id
            // So instead of the usual "db/full" we have "db/full-container-2000"
            let mut db_path = container_chain_cli_config
                .database
                .path()
                .ok_or_else(|| "Failed to get database path".to_string())?
                .to_owned();
            db_path.set_file_name(format!("full-container-{}", container_chain_para_id));
            container_chain_cli_config.database.set_path(&db_path);

            let (container_chain_task_manager, container_chain_client, container_chain_db) =
                start_node_impl_container(
                    container_chain_cli_config,
                    orchestrator_client.clone(),
                    orchestrator_tx_pool.clone(),
                    relay_chain_interface.clone(),
                    orchestrator_chain_interface.clone(),
                    collator_key.clone(),
                    sync_keystore.clone(),
                    container_chain_para_id,
                    orchestrator_para_id,
                    validator && start_collation,
                )
                .await?;

            // Keep all node parts in one variable to make them easier to drop
            let node_parts = (
                container_chain_task_manager,
                container_chain_client,
                container_chain_db,
                db_path,
            );

            if db_existed_before {
                // If the database already existed before, check if it can be used or needs to be removed.
                // To remove the database, we restart the node, wait for the db to close to avoid a
                // "shutdown error" log, and then remove it.
                if let Some(db_removal_reason) = db_needs_removal(
                    &node_parts.1,
                    &orchestrator_client,
                    container_chain_para_id,
                    &container_chain_cli,
                    container_chain_cli.base.keep_db,
                )? {
                    let db_path = node_parts.3.clone();
                    // Important: drop `node_parts` before trying to `wait_for_paritydb_lock`
                    drop(node_parts);
                    // Wait here for the database created in the previous loop iteration to close.
                    // Dropping is not enough because there is some background process that keeps the database open,
                    // so we check the paritydb lock file directly.
                    log::info!(
                        "Restarting container chain {} after db deletion. Reason: {:?}",
                        container_chain_para_id,
                        db_removal_reason,
                    );
                    wait_for_paritydb_lock(&db_path, MAX_DB_RESTART_TIMEOUT)
                        .await
                        .map_err(|e| {
                            log::warn!(
                                "Error waiting for chain {} to release db lock: {:?}",
                                container_chain_para_id,
                                e
                            );

                            e
                        })?;
                    delete_container_chain_db(&db_path);

                    // Recursion: this will only happen once because `db_existed_before` will be false after
                    // removing the db. Apparently closures cannot be recursive, so fake recursion by
                    // using a loop + continue
                    continue;
                }
            }

            // If using full sync, print a warning if the local db is at block 0 and the chain has thousands of blocks
386
            if container_chain_cli.base.base.network_params.sync == SyncMode::Full {
387
                let last_container_block_temp = node_parts.1.chain_info().best_number;
388
                let cc_block_num = get_latest_container_block_number_from_orchestrator(
389
                    &orchestrator_client,
390
                    container_chain_para_id,
391
                )
392
                .unwrap_or(0);
393
                if last_container_block_temp == 0 && cc_block_num > MAX_BLOCK_DIFF_FOR_FULL_SYNC {
394
                    let db_folder = format!("full-container-{}", container_chain_para_id);
395
                    log::error!("\
396
                        Existing database for container chain {} is at block 0, assuming that warp sync failed.\n\
397
                        The node will now use full sync, which has to download {} blocks.\n\
398
                        If running as collator, it may not finish syncing on time and miss block rewards.\n\
399
                        To force using warp sync, stop tanssi-node and manually remove the db folder: {:?}\n\
400
                        ", container_chain_para_id, cc_block_num, db_folder)
401
                }
402
            }
403

            
404
            return sc_service::error::Result::Ok(node_parts);
405
        }
406

            
407
        unreachable!("Above loop can run at most 2 times, and in the second iteration it is guaranteed to return")
408
    };
409

            
    let (mut container_chain_task_manager, container_chain_client, container_chain_db, db_path) =
        start_node_impl_container_with_restart().await?;

    // Signal that allows us to gracefully stop a container chain
    let (signal, on_exit) = oneshot::channel::<bool>();

    let monitor_id;
    {
        let mut state = state.lock().expect("poison error");

        monitor_id = state.spawned_containers_monitor.push(SpawnedContainer {
            id: 0,
            para_id: container_chain_para_id,
            start_time: Instant::now(),
            stop_signal_time: None,
            stop_task_manager_time: None,
            stop_refcount_time: Default::default(),
            backend: Arc::downgrade(&container_chain_db),
            client: Arc::downgrade(&container_chain_client),
        });

        if state
            .spawned_container_chains
            .contains_key(&container_chain_para_id)
        {
            return Err(format!("Tried to spawn a container chain when another container chain with the same para id was already running: {:?}", container_chain_para_id).into());
        }
        state.spawned_container_chains.insert(
            container_chain_para_id,
            ContainerChainState {
                stop_handle: StopContainerChain {
                    signal,
                    id: monitor_id,
                },
                db_path: db_path.clone(),
            },
        );
    }

    // Add the container chain task manager as a child task to the parent task manager.
    // We want to stop the node if this task manager stops, but we also want to allow a
    // graceful shutdown using the `on_exit` future.
    let name = "container-chain-task-manager";
    spawn_handle.spawn(name, None, async move {
        let mut container_chain_task_manager_future =
            container_chain_task_manager.future().fuse();
        let mut on_exit_future = on_exit.fuse();

        futures::select! {
            res1 = container_chain_task_manager_future => {
                // An essential task failed or the task manager was stopped unexpectedly
                // using `.terminate()`. This should stop the container chain but not the node.
                if res1.is_err() {
                    log::error!("Essential task failed in container chain {} task manager. Shutting down container chain service", container_chain_para_id);
                } else {
                    log::error!("Unexpected shutdown in container chain {} task manager. Shutting down container chain service", container_chain_para_id);
                }
                // Mark this container chain as "failed to stop" to avoid warning in `self.stop()`
                let mut state = state.lock().expect("poison error");
                state.failed_para_ids.insert(container_chain_para_id);
                // Never delete db in this case because it is not a graceful shutdown
            }
            stop_unassigned = on_exit_future => {
                // Graceful shutdown.
                // `stop_unassigned` will be `Ok(keep_db)` if `.stop()` has been called, which means that the
                // container chain has been unassigned, and will be `Err` if the handle has been dropped,
                // which means that the node is stopping.
                // Delete existing database if running as collator
                if validator && stop_unassigned == Ok(false) && !keep_db {
                    // If this breaks after a code change, make sure that all the variables that
                    // may keep the chain alive are dropped before the call to `wait_for_paritydb_lock`.
                    drop(container_chain_task_manager_future);
                    drop(container_chain_task_manager);
                    let db_closed = wait_for_paritydb_lock(&db_path, MAX_DB_RESTART_TIMEOUT)
                        .await
                        .map_err(|e| {
                            log::warn!(
                                "Error waiting for chain {} to release db lock: {:?}",
                                container_chain_para_id,
                                e
                            );
                        }).is_ok();
                    // If db has not closed in 60 seconds we do not delete it.
                    if db_closed {
                        delete_container_chain_db(&db_path);
                    }
                }
            }
        }

        let mut state = state.lock().expect("poison error");
        state
            .spawned_containers_monitor
            .set_stop_task_manager_time(monitor_id, Instant::now());
    });

    Ok(())
}

impl ContainerChainSpawner {
    /// Try to start a new container chain. In case of an error, this does not stop the node; we
    /// will try to spawn the container chain again when the collator is reassigned to it.
    ///
    /// It is possible that we try to spawn-stop-spawn the same chain, and the second spawn fails
    /// because the chain has not stopped yet, since `stop` does not wait for the chain to stop.
    /// So before calling `spawn`, make sure to call `wait_for_paritydb_lock` first, like we do in
    /// `handle_update_assignment`.
    async fn spawn(&self, container_chain_para_id: ParaId, start_collation: bool) {
        let try_spawn_params = self.params.clone();
        let state = self.state.clone();
        let state2 = state.clone();

        match try_spawn(
            try_spawn_params,
            state,
            container_chain_para_id,
            start_collation,
        )
        .await
        {
            Ok(()) => {}
            Err(e) => {
                log::error!(
                    "Failed to start container chain {}: {}",
                    container_chain_para_id,
                    e
                );
                // Mark this container chain as "failed to start"
                let mut state = state2.lock().expect("poison error");
                state.failed_para_ids.insert(container_chain_para_id);
            }
        }
    }

    /// Stop a container chain. Prints a warning if the container chain was not running.
    /// Returns the database path for the container chain, which can be used with `wait_for_paritydb_lock`
    /// to ensure that the container chain has fully stopped. The database path can be `None` if the
    /// chain was not running.
    fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option<PathBuf> {
        let mut state = self.state.lock().expect("poison error");
        let stop_handle = state
            .spawned_container_chains
            .remove(&container_chain_para_id);

        match stop_handle {
            Some(stop_handle) => {
                log::info!("Stopping container chain {}", container_chain_para_id);

                let id = stop_handle.stop_handle.id;
                state
                    .spawned_containers_monitor
                    .set_stop_signal_time(id, Instant::now());

                // Send signal to perform graceful shutdown, which will delete the db if needed
                let _ = stop_handle.stop_handle.signal.send(keep_db);

                Some(stop_handle.db_path)
            }
            None => {
                // Do not print the warning message if this is a container chain that has failed to
                // start, because in that case it will not be running
                if !state.failed_para_ids.remove(&container_chain_para_id) {
                    log::warn!(
                        "Tried to stop a container chain that is not running: {}",
                        container_chain_para_id
                    );
                }

                None
            }
        }
    }

    /// Receive and process `CcSpawnMsg`s indefinitely
    pub async fn rx_loop(mut self, mut rx: mpsc::UnboundedReceiver<CcSpawnMsg>, validator: bool) {
        // The node always starts as an orchestrator chain collator.
        // This is because the assignment is detected after importing a new block, so if all
        // collators stop at the same time, when they start again nobody will produce the new block.
        // So all nodes start as orchestrator chain collators until the first block is imported,
        // and then the real assignment is used.
        if validator {
            self.handle_update_assignment(Some(self.params.orchestrator_para_id), None)
                .await;
        }

        while let Some(msg) = rx.recv().await {
            match msg {
                CcSpawnMsg::UpdateAssignment { current, next } => {
                    self.handle_update_assignment(current, next).await;
                }
            }
        }

        // The while loop can end if all the senders get dropped, but since this is an
        // essential task we don't want it to stop. So await a future that never completes.
        // This should only happen when starting a full node.
        if !validator {
            let () = std::future::pending().await;
        }
    }

    /// Handle `CcSpawnMsg::UpdateAssignment`
    async fn handle_update_assignment(&mut self, current: Option<ParaId>, next: Option<ParaId>) {
        let HandleUpdateAssignmentResult {
            chains_to_stop,
            chains_to_start,
            need_to_restart: _,
        } = handle_update_assignment_state_change(
            &mut self.state.lock().expect("poison error"),
            self.params.orchestrator_para_id,
            current,
            next,
        );

        if current != Some(self.params.orchestrator_para_id) {
            // If not assigned to orchestrator chain anymore, we need to stop the collator process
            let maybe_exit_notification_receiver = self
                .collation_cancellation_constructs
                .take()
                .map(|(cancellation_token, exit_notification_receiver)| {
                    cancellation_token.cancel();
                    exit_notification_receiver
                });

            if let Some(exit_notification_receiver) = maybe_exit_notification_receiver {
                let _ = exit_notification_receiver.await;
            }
        } else if self.collation_cancellation_constructs.is_none() {
            // If assigned to orchestrator chain but the collator process is not running, start it
            self.collation_cancellation_constructs = Some((self.collate_on_tanssi)());
        }

        // Stop all container chains that are no longer needed
        let mut db_paths_restart = vec![];
        for para_id in chains_to_stop {
            // Keep db if we are currently assigned to this chain
            let keep_db = Some(para_id) == current;
            let maybe_db_path = self.stop(para_id, keep_db);
            // If we are restarting this chain, save its db_path to check when it actually stopped
            if let Some(db_path) = maybe_db_path {
                if chains_to_start.contains(&para_id) {
                    db_paths_restart.push((para_id, db_path));
                }
            }
        }

        if !db_paths_restart.is_empty() {
            // Ensure the chains we stopped actually stopped by checking if their database is unlocked.
            // Using `join_all` because in one edge case we may be restarting 2 chains,
            // but almost always this will be only one future.
            let futs = db_paths_restart
                .into_iter()
                .map(|(para_id, db_path)| async move {
                    wait_for_paritydb_lock(&db_path, MAX_DB_RESTART_TIMEOUT)
                        .await
                        .map_err(|e| {
                            log::warn!(
                                "Error waiting for chain {} to release db lock: {:?}",
                                para_id,
                                e
                            );
                        })
                });
            futures::future::join_all(futs).await;
        }

        // Start all new container chains (usually 1)
        for para_id in chains_to_start {
            // Edge case: when starting the node it may be assigned to a container chain, so we need to
            // start a container chain already collating.
            // TODO: another edge case: if current == None, and running_chains == 0,
            // and chains_to_start == 1, we can start this chain as collating, and we won't need
            // to restart it on the next session. We need to add some extra state somewhere to
            // implement this properly.
            let start_collation = Some(para_id) == current;
            self.spawn(para_id, start_collation).await;
        }
    }
}

struct HandleUpdateAssignmentResult {
    chains_to_stop: Vec<ParaId>,
    chains_to_start: Vec<ParaId>,
    #[allow(dead_code)] // no longer used except in tests
    need_to_restart: bool,
}

// This is a separate function to allow testing
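/// A worked example (assuming orchestrator para id 1000): moving the assignment from
/// `(current, next) = (2000, 2001)` to `(2001, 2002)` stops chain 2000, starts chain 2002, and
/// also stops and restarts chain 2001, because it has to switch from syncing to collating:
///
/// ```ignore
/// let mut state = ContainerChainSpawnerState::default();
/// state.assigned_para_id = Some(2000.into());
/// state.next_assigned_para_id = Some(2001.into());
///
/// let res = handle_update_assignment_state_change(
///     &mut state,
///     1000.into(),
///     Some(2001.into()),
///     Some(2002.into()),
/// );
/// // `res.chains_to_stop` contains 2000 and 2001, `res.chains_to_start` contains 2001 and 2002,
/// // and `res.need_to_restart` is true.
/// ```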
fn handle_update_assignment_state_change(
    state: &mut ContainerChainSpawnerState,
    orchestrator_para_id: ParaId,
    current: Option<ParaId>,
    next: Option<ParaId>,
) -> HandleUpdateAssignmentResult {
    if (state.assigned_para_id, state.next_assigned_para_id) == (current, next) {
        // If nothing changed there is nothing to update
        return HandleUpdateAssignmentResult {
            chains_to_stop: Default::default(),
            chains_to_start: Default::default(),
            need_to_restart: false,
        };
    }

    // Create a set with the container chains that were running before, and the container
    // chains that should be running after the updated assignment. This is used to calculate
    // the difference, and stop and start the required container chains.
    let mut running_chains_before = HashSet::new();
    let mut running_chains_after = HashSet::new();

    running_chains_before.extend(state.assigned_para_id);
    running_chains_before.extend(state.next_assigned_para_id);
    // Ignore orchestrator_para_id because it is handled in a special way, as it does not need to
    // start one session before in order to sync.
    running_chains_before.remove(&orchestrator_para_id);

    running_chains_after.extend(current);
    running_chains_after.extend(next);
    running_chains_after.remove(&orchestrator_para_id);
    let mut need_to_restart_current = false;
    let mut need_to_restart_next = false;

    if state.assigned_para_id != current {
        if let Some(para_id) = current {
            // If the assigned container chain has changed, we may need to
            // restart it in collation mode, unless it is the orchestrator chain.
            if para_id != orchestrator_para_id {
                need_to_restart_current = true;
            }
        }

        if let Some(para_id) = state.assigned_para_id {
            if para_id != orchestrator_para_id && Some(para_id) == next {
                need_to_restart_next = true;
            }
        }
    }

    state.assigned_para_id = current;
    state.next_assigned_para_id = next;

    let mut chains_to_stop: Vec<_> = running_chains_before
        .difference(&running_chains_after)
        .copied()
        .collect();
    let mut chains_to_start: Vec<_> = running_chains_after
        .difference(&running_chains_before)
        .copied()
        .collect();

    if need_to_restart_current {
        // Force restart of the newly assigned container chain: if it was running before, it was in
        // "syncing mode", which doesn't use the correct ports, so restart it in "collation mode".
        let id = current.unwrap();
        if running_chains_before.contains(&id) && !chains_to_stop.contains(&id) {
            chains_to_stop.push(id);
        }
        if !chains_to_start.contains(&id) {
            chains_to_start.push(id);
        }
    }

    if need_to_restart_next {
        // Handle edge case of going from (2000, 2001) to (2001, 2000). In that case we must restart both chains,
        // because previously 2000 was collating and now 2000 will only be syncing.
        let id = next.unwrap();
        if running_chains_before.contains(&id) && !chains_to_stop.contains(&id) {
            chains_to_stop.push(id);
        }
        if !chains_to_start.contains(&id) {
            chains_to_start.push(id);
        }
    }

    HandleUpdateAssignmentResult {
        chains_to_stop,
        chains_to_start,
        need_to_restart: need_to_restart_current || need_to_restart_next,
    }
}

/// Select [SyncMode] to use for a container chain.
/// We want to use warp sync unless the db still exists, or the container chain is
/// still at genesis block (because of a warp sync bug in that case).
///
/// Remember that warp sync doesn't work if a partially synced database already exists; it falls
/// back to full sync instead. The only exception is if the previous instance of the database was
/// interrupted before it finished downloading the state; in that case the node will use warp sync.
/// If it was interrupted during the block history download, the node will use full sync but also
/// finish the block history download in the background, even if sync mode is set to full sync.
fn select_sync_mode(
    db_exists: bool,
    orchestrator_client: &Arc<ParachainClient>,
    container_chain_para_id: ParaId,
) -> sc_service::error::Result<SyncMode> {
    if db_exists {
        // If the user wants to use warp sync, they should have already removed the database
        return Ok(SyncMode::Full);
    }

    // The following check is only needed because of this bug:
    // https://github.com/paritytech/polkadot-sdk/issues/1930

    let orchestrator_runtime_api = orchestrator_client.runtime_api();
    let orchestrator_chain_info = orchestrator_client.chain_info();

    // If the container chain is still at genesis block, use full sync because warp sync is broken
    let full_sync_needed = orchestrator_runtime_api
        .latest_author(orchestrator_chain_info.best_hash, container_chain_para_id)
        .map_err(|e| format!("Failed to read latest author: {}", e))?
        .is_none();

    if full_sync_needed {
        Ok(SyncMode::Full)
    } else {
        Ok(SyncMode::Warp)
    }
}

fn get_latest_container_block_number_from_orchestrator(
    orchestrator_client: &Arc<ParachainClient>,
    container_chain_para_id: ParaId,
) -> Option<u32> {
    let orchestrator_runtime_api = orchestrator_client.runtime_api();
    let orchestrator_chain_info = orchestrator_client.chain_info();
    // Get the container chain's latest block from orchestrator chain and compare with client's one
    let last_container_block_from_orchestrator = orchestrator_runtime_api
        .latest_block_number(orchestrator_chain_info.best_hash, container_chain_para_id)
        .unwrap_or_default();

    last_container_block_from_orchestrator
}

#[derive(Debug)]
#[allow(dead_code)]
enum DbRemovalReason {
    HighBlockDiff {
        best_block_number_db: u32,
        best_block_number_onchain: u32,
    },
    GenesisHashMismatch {
        container_client_genesis_hash: H256,
        chain_spec_genesis_hash_v0: H256,
        chain_spec_genesis_hash_v1: H256,
    },
}

/// Given a container chain client, check if the database is valid. If not, returns `Some` with the
/// reason for db removal.
/// Reasons may be:
/// * High block diff: when the local db is outdated and it would take a long time to sync using full sync, we remove it to be able to use warp sync.
/// * Genesis hash mismatch: when the chain was deregistered and a different chain with the same para id was registered.
fn db_needs_removal(
    container_chain_client: &Arc<ContainerChainClient>,
    orchestrator_client: &Arc<ParachainClient>,
    container_chain_para_id: ParaId,
    container_chain_cli: &ContainerChainCli,
    keep_db: bool,
) -> sc_service::error::Result<Option<DbRemovalReason>> {
    // Check block diff, only needed if keep-db is false
    if !keep_db {
        // Get latest block number from the container chain client
        let last_container_block_temp = container_chain_client.chain_info().best_number;
        if last_container_block_temp == 0 {
            // Don't remove an empty database, as it may be in the process of a warp sync
        } else {
            let last_container_block_onchain =
                get_latest_container_block_number_from_orchestrator(
                    orchestrator_client,
                    container_chain_para_id,
                )
                .unwrap_or(0);
            if last_container_block_onchain.abs_diff(last_container_block_temp)
                > MAX_BLOCK_DIFF_FOR_FULL_SYNC
            {
                // If the diff is big, delete the db and restart using warp sync
                return Ok(Some(DbRemovalReason::HighBlockDiff {
                    best_block_number_db: last_container_block_temp,
                    best_block_number_onchain: last_container_block_onchain,
                }));
            }
        }
    }

    // Generate genesis hash to compare against container client's genesis hash
    let container_preloaded_genesis = container_chain_cli.preloaded_chain_spec.as_ref().unwrap();

    // Check with both state versions, but first v1 which is the latest
    let block_v1: Block =
        generate_genesis_block(&**container_preloaded_genesis, sp_runtime::StateVersion::V1)
            .map_err(|e| format!("{:?}", e))?;
    let chain_spec_genesis_hash_v1 = block_v1.header().hash();

    let container_client_genesis_hash = container_chain_client.chain_info().genesis_hash;

    if container_client_genesis_hash != chain_spec_genesis_hash_v1 {
        let block_v0: Block =
            generate_genesis_block(&**container_preloaded_genesis, sp_runtime::StateVersion::V0)
                .map_err(|e| format!("{:?}", e))?;
        let chain_spec_genesis_hash_v0 = block_v0.header().hash();

        if container_client_genesis_hash != chain_spec_genesis_hash_v0 {
            log::info!("Container genesis V0: {:?}", chain_spec_genesis_hash_v0);
            log::info!("Container genesis V1: {:?}", chain_spec_genesis_hash_v1);
            log::info!(
                "Chain spec genesis {:?} did not match with any container genesis - Restarting...",
                container_client_genesis_hash
            );
            return Ok(Some(DbRemovalReason::GenesisHashMismatch {
                container_client_genesis_hash,
                chain_spec_genesis_hash_v0,
                chain_spec_genesis_hash_v1,
            }));
        }
    }

    Ok(None)
}

/// Remove the container chain database folder. This is called with db_path:
///     `Collator2002-01/data/containers/chains/simple_container_2002/paritydb/full-container-2002`
/// but we want to delete everything under
///     `Collator2002-01/data/containers/chains/simple_container_2002`
/// So we use `delete_empty_folders_recursive` to try to remove the parent folders as well, but only
/// if they are empty. This is to avoid removing any secret keys or other important data.
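///
/// A minimal sketch of the intended call, using the example path above:
///
/// ```ignore
/// delete_container_chain_db(Path::new(
///     "Collator2002-01/data/containers/chains/simple_container_2002/paritydb/full-container-2002",
/// ));
/// // Afterwards, the `paritydb` and `simple_container_2002` folders are also removed, but only if
/// // they ended up empty.
/// ```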
fn delete_container_chain_db(db_path: &Path) {
    // Remove folder `full-container-2002`
    let _ = std::fs::remove_dir_all(db_path);
    // Remove all the empty folders inside `simple_container_2002`, including self
    if let Some(parent) = db_path.ancestors().nth(2) {
        delete_empty_folders_recursive(parent);
    }
}

/// Removes all empty folders in `path`, recursively. Then, if `path` is empty, it removes it as well.
/// Ignores any IO errors.
fn delete_empty_folders_recursive(path: &Path) {
    let entry_iter = std::fs::read_dir(path);
    let entry_iter = match entry_iter {
        Ok(x) => x,
        Err(_e) => return,
    };

    for entry in entry_iter {
        let entry = match entry {
            Ok(x) => x,
            Err(_e) => continue,
        };

        let path = entry.path();
        if path.is_dir() {
            delete_empty_folders_recursive(&path);
        }
    }

    // Try to remove dir. Returns an error if the directory is not empty, but we ignore it.
    let _ = std::fs::remove_dir(path);
}

/// Parse a list of boot nodes in `Vec<u8>` format. Invalid boot nodes are filtered out.
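///
/// A small sketch of the filtering behaviour (entries that are not valid UTF-8 or not a valid
/// multiaddress with peer id are silently dropped, except for a debug log):
///
/// ```ignore
/// let nodes = parse_boot_nodes_ignore_invalid(vec![b"not a multiaddr".to_vec()], 2000.into());
/// assert!(nodes.is_empty());
/// ```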
fn parse_boot_nodes_ignore_invalid(
    boot_nodes_raw: Vec<Vec<u8>>,
    container_chain_para_id: ParaId,
) -> Vec<MultiaddrWithPeerId> {
    boot_nodes_raw
        .into_iter()
        .filter_map(|x| {
            let x = String::from_utf8(x)
                .map_err(|e| {
                    log::debug!(
                        "Invalid boot node in container chain {}: {}",
                        container_chain_para_id,
                        e
                    );
                })
                .ok()?;

            x.parse::<MultiaddrWithPeerId>()
                .map_err(|e| {
                    log::debug!(
                        "Invalid boot node in container chain {}: {}",
                        container_chain_para_id,
                        e
                    )
                })
                .ok()
        })
        .collect()
}
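/// Wait for the paritydb database at `db_path` to be released, up to `max_timeout`.
/// Returns `Ok(())` as soon as the lock is free, and `Err` if the timeout is reached or the lock
/// file cannot be checked.
///
/// A minimal usage sketch, assuming `db_path` points to a paritydb folder such as
/// `.../paritydb/full-container-2000`:
///
/// ```ignore
/// wait_for_paritydb_lock(&db_path, Duration::from_secs(10))
///     .await
///     .expect("container chain did not release its database in time");
/// ```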
async fn wait_for_paritydb_lock(db_path: &Path, max_timeout: Duration) -> Result<(), String> {
    let now = Instant::now();
    while now.elapsed() < max_timeout {
        let lock_held = check_paritydb_lock_held(db_path)
            .map_err(|e| format!("Failed to check if lock file is held: {}", e))?;
        if !lock_held {
            return Ok(());
        }
        sleep(Duration::from_secs(1)).await;
    }
    Err("Timeout when waiting for paritydb lock".to_string())
}

/// Given a path to a paritydb database, check if its lock file is held. This indicates that a
/// background process is still using the database, so we should wait before trying to open it.
///
/// This should be kept up to date with the way paritydb handles the lock file:
/// <https://github.com/paritytech/parity-db/blob/2b6820e310a08678d4540c044f41a93d87343ac8/src/db.rs#L215>
fn check_paritydb_lock_held(db_path: &Path) -> Result<bool, std::io::Error> {
    if !db_path.is_dir() {
        // Lock file does not exist, so it is not held
        return Ok(false);
    }
    let mut lock_path: std::path::PathBuf = db_path.to_owned();
    lock_path.push("lock");
    let lock_file = std::fs::OpenOptions::new()
        .create(true)
        .read(true)
        .write(true)
        .truncate(true)
        .open(lock_path.as_path())?;
    // Check if the lock file is busy by trying to lock it.
    // `try_lock_exclusive` returns an error if it fails to acquire the lock.
    let lock_held = lock_file.try_lock_exclusive().is_err();
    Ok(lock_held)
}

#[cfg(test)]
mod tests {
    use {super::*, std::path::PathBuf};

    // Copy of ContainerChainSpawner with extra assertions for tests, and mocked spawn function.
    struct MockContainerChainSpawner {
        state: Arc<Mutex<ContainerChainSpawnerState>>,
        orchestrator_para_id: ParaId,
        collate_on_tanssi: Arc<
            dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>) + Send + Sync,
        >,
        collation_cancellation_constructs: Option<()>,
        // Keep track of the last CollateOn message, for tests
        currently_collating_on: Arc<Mutex<Option<ParaId>>>,
    }

    impl MockContainerChainSpawner {
        fn new() -> Self {
            let orchestrator_para_id = 1000.into();
            // The node always starts as an orchestrator chain collator
            let currently_collating_on = Arc::new(Mutex::new(Some(orchestrator_para_id)));
            let currently_collating_on2 = currently_collating_on.clone();
            let collate_closure = move || {
                let mut cco = currently_collating_on2.lock().unwrap();
                assert_ne!(
                    *cco,
                    Some(orchestrator_para_id),
                    "Received CollateOn message when we were already collating on this chain: {}",
                    orchestrator_para_id
                );
                *cco = Some(orchestrator_para_id);
                let (_, receiver) = futures::channel::oneshot::channel();
                (CancellationToken::new(), receiver)
            };
            let collate_on_tanssi: Arc<
                dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>)
                    + Send
                    + Sync,
            > = Arc::new(collate_closure);

            Self {
                state: Arc::new(Mutex::new(ContainerChainSpawnerState {
                    spawned_container_chains: Default::default(),
                    assigned_para_id: Some(orchestrator_para_id),
                    next_assigned_para_id: None,
                    failed_para_ids: Default::default(),
                    spawned_containers_monitor: Default::default(),
                })),
                orchestrator_para_id,
                collate_on_tanssi,
                // Some if collator starts on orchestrator chain
                collation_cancellation_constructs: Some(()),
                currently_collating_on,
            }
        }

        fn spawn(&self, container_chain_para_id: ParaId, start_collation: bool) {
            let (signal, _on_exit) = oneshot::channel();
            let currently_collating_on2 = self.currently_collating_on.clone();
            let collate_closure = move || {
                let mut cco = currently_collating_on2.lock().unwrap();
                assert_ne!(
                    *cco,
                    Some(container_chain_para_id),
                    "Received CollateOn message when we were already collating on this chain: {}",
                    container_chain_para_id
                );
                *cco = Some(container_chain_para_id);
                let (_, receiver) = futures::channel::oneshot::channel();
                (CancellationToken::new(), receiver)
            };
            let collate_on: Arc<
                dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>)
                    + Send
                    + Sync,
            > = Arc::new(collate_closure);
            // Dummy db_path for tests; it is not actually used
            let db_path = PathBuf::from(format!("/tmp/container-{}/db", container_chain_para_id));

            let old = self
                .state
                .lock()
                .expect("poison error")
                .spawned_container_chains
                .insert(
                    container_chain_para_id,
                    ContainerChainState {
                        stop_handle: StopContainerChain { signal, id: 0 },
                        db_path,
                    },
                );

            assert!(
                old.is_none(),
                "tried to spawn a container chain that was already running: {}",
                container_chain_para_id
            );
            if start_collation {
                let (_cancellation_token, _exit_receiver) = collate_on();
            }
        }

        fn stop(&self, container_chain_para_id: ParaId) {
            let stop_handle = self
                .state
                .lock()
                .expect("poison error")
                .spawned_container_chains
                .remove(&container_chain_para_id);

            match stop_handle {
                Some(_stop_handle) => {
                    log::info!("Stopping container chain {}", container_chain_para_id);
                }
                None => {
                    panic!(
                        "Tried to stop a container chain that is not running: {}",
                        container_chain_para_id
                    );
                }
            }
            // Update currently_collating_on: if we stopped the chain, we are no longer collating there
            let mut lco = self.currently_collating_on.lock().unwrap();
            if *lco == Some(container_chain_para_id) {
                *lco = None;
            }
        }

        fn handle_update_assignment(&mut self, current: Option<ParaId>, next: Option<ParaId>) {
            let HandleUpdateAssignmentResult {
                chains_to_stop,
                chains_to_start,
                need_to_restart,
            } = handle_update_assignment_state_change(
                &mut self.state.lock().unwrap(),
                self.orchestrator_para_id,
                current,
                next,
            );

            if current != Some(self.orchestrator_para_id) {
                // If no longer assigned to the orchestrator chain, we need to stop the collator process
                let mut cco = self.currently_collating_on.lock().unwrap();
                if *cco == Some(self.orchestrator_para_id) {
                    *cco = None;
                }
                self.collation_cancellation_constructs = None;
            } else if self.collation_cancellation_constructs.is_none() {
                let (_cancellation_token, _exit_notification_receiver) = (self.collate_on_tanssi)();
                self.collation_cancellation_constructs = Some(());
            }

            // Assert we never start and stop the same container chain
            for para_id in &chains_to_start {
                if !need_to_restart {
                    assert!(
                        !chains_to_stop.contains(para_id),
                        "Tried to start and stop same container chain: {}",
                        para_id
                    );
                } else {
                    // A restart stops and starts the chain with id "current" or "next", so skip those here
                    if Some(*para_id) != current && Some(*para_id) != next {
                        assert!(
                            !chains_to_stop.contains(para_id),
                            "Tried to start and stop same container chain: {}",
                            para_id
                        );
                    }
                }
            }

            // Assert we never start or stop the orchestrator chain
            assert!(!chains_to_start.contains(&self.orchestrator_para_id));
            assert!(!chains_to_stop.contains(&self.orchestrator_para_id));

            // Stop all container chains that are no longer needed
            for para_id in chains_to_stop {
                self.stop(para_id);
            }

            // Start all new container chains (usually 1)
            for para_id in chains_to_start {
                // Edge case: when starting the node it may already be assigned to a container chain,
                // so we need to start that container chain already collating.
                let start_collation = Some(para_id) == current;
                self.spawn(para_id, start_collation);
            }

            // Assert that if we are currently assigned to a container chain, we are collating there
            if let Some(para_id) = current {
                self.assert_collating_on(Some(para_id));
            } else {
                self.assert_collating_on(None);
            }
        }

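        // Assert that we are currently collating on exactly `para_id` (`None` means not collating).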
        #[track_caller]
        fn assert_collating_on(&self, para_id: Option<ParaId>) {
            let currently_collating_on = *self.currently_collating_on.lock().unwrap();
            assert_eq!(currently_collating_on, para_id);
        }

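        // Assert that the set of running container chains matches `para_ids`, ignoring order.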
        #[track_caller]
        fn assert_running_chains(&self, para_ids: &[ParaId]) {
            let mut actually_running: Vec<ParaId> = self
                .state
                .lock()
                .unwrap()
                .spawned_container_chains
                .keys()
                .cloned()
                .collect();
            actually_running.sort();
            let mut should_be_running = para_ids.to_vec();
            should_be_running.sort();
            assert_eq!(actually_running, should_be_running);
        }
    }

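    // On startup the mock is already collating on the orchestrator chain (1000) with no container
    // chains running; an empty assignment stops collation without spawning anything.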
    #[test]
    fn starts_collating_on_tanssi() {
        let mut m = MockContainerChainSpawner::new();
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);

        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);
    }

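    // Assignments that only involve the orchestrator chain never spawn a container chain;
    // collation stops as soon as the current assignment is lost.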
    #[test]
    fn assigned_to_orchestrator_chain() {
        let mut m = MockContainerChainSpawner::new();

        m.handle_update_assignment(Some(1000.into()), Some(1000.into()));
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);

        m.handle_update_assignment(Some(1000.into()), None);
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);

        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);

        m.handle_update_assignment(None, Some(1000.into()));
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);

        m.handle_update_assignment(Some(1000.into()), Some(1000.into()));
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);
    }

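    // A container chain keeps running while it is in either the current or the next assignment,
    // and is stopped once it is in neither.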
    #[test]
    fn assigned_to_container_chain() {
        let mut m = MockContainerChainSpawner::new();

        m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);

        m.handle_update_assignment(Some(2000.into()), None);
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);

        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);

        m.handle_update_assignment(None, Some(2000.into()));
        m.assert_collating_on(None);
        m.assert_running_chains(&[2000.into()]);

        m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);
    }

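    // Rotate the assignment 1000 -> 2000 -> 2001 -> 1000: a container chain is spawned as soon
    // as it appears in the next assignment and stopped once it is no longer assigned.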
    #[test]
    fn spawn_container_chains() {
        let mut m = MockContainerChainSpawner::new();

        m.handle_update_assignment(Some(1000.into()), Some(2000.into()));
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[2000.into()]);

        m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);

        m.handle_update_assignment(Some(2000.into()), Some(2001.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into(), 2001.into()]);

        m.handle_update_assignment(Some(2001.into()), Some(2001.into()));
        m.assert_collating_on(Some(2001.into()));
        m.assert_running_chains(&[2001.into()]);

        m.handle_update_assignment(Some(2001.into()), Some(1000.into()));
        m.assert_collating_on(Some(2001.into()));
        m.assert_running_chains(&[2001.into()]);

        m.handle_update_assignment(Some(1000.into()), Some(1000.into()));
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);
    }

    #[test]
    fn swap_current_next() {
        // Going from (2000, 2001) to (2001, 2000) shouldn't start or stop any container chains
        let mut m: MockContainerChainSpawner = MockContainerChainSpawner::new();

        m.handle_update_assignment(Some(2000.into()), Some(2001.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into(), 2001.into()]);

        m.handle_update_assignment(Some(2001.into()), Some(2000.into()));
        m.assert_collating_on(Some(2001.into()));
        m.assert_running_chains(&[2000.into(), 2001.into()]);
    }

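    // Losing the orchestrator assignment stops collation, and being reassigned starts it again;
    // no container chains are spawned along the way.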
    #[test]
    fn stop_collating_orchestrator() {
        let mut m: MockContainerChainSpawner = MockContainerChainSpawner::new();

        m.handle_update_assignment(Some(1000.into()), Some(1000.into()));
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);

        m.handle_update_assignment(Some(1000.into()), None);
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);

        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);

        m.handle_update_assignment(Some(1000.into()), None);
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);
    }

    #[test]
    fn stop_collating_container() {
        let mut m: MockContainerChainSpawner = MockContainerChainSpawner::new();

        m.handle_update_assignment(Some(2000.into()), None);
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);

        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);

        m.handle_update_assignment(None, Some(2000.into()));
        m.assert_collating_on(None);
        m.assert_running_chains(&[2000.into()]);

        // This will send a CollateOn message to the same chain as the last CollateOn,
        // but this is needed because that chain has been stopped
        m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);
    }

    #[test]
    fn stop_collating_container_start_immediately() {
        let mut m: MockContainerChainSpawner = MockContainerChainSpawner::new();

        m.handle_update_assignment(Some(2000.into()), None);
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);

        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);

        // This will start the chain already collating
        m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);
    }

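    // Clearing the assignment stops every running container chain at once.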
    #[test]
    fn stop_all_chains() {
        let mut m: MockContainerChainSpawner = MockContainerChainSpawner::new();

        m.handle_update_assignment(Some(2000.into()), Some(2001.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into(), 2001.into()]);

        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);
    }

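    // Moving the container chain from current to next keeps it running but pauses collation
    // until it becomes current again.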
    #[test]
    fn keep_collating_on_container() {
        let mut m: MockContainerChainSpawner = MockContainerChainSpawner::new();

        m.handle_update_assignment(Some(2000.into()), None);
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);

        m.handle_update_assignment(None, Some(2000.into()));
        m.assert_collating_on(None);
        m.assert_running_chains(&[2000.into()]);

        m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);
    }

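    // `parse_boot_nodes_ignore_invalid` drops entries that are not valid UTF-8 or not valid
    // multiaddresses, and keeps well-formed ones.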
    #[test]
    fn invalid_boot_nodes_are_ignored() {
        let para_id = 100.into();
        let bootnode1 =
            b"/ip4/127.0.0.1/tcp/33049/ws/p2p/12D3KooWHVMhQDHBpj9vQmssgyfspYecgV6e3hH1dQVDUkUbCYC9"
                .to_vec();
        assert_eq!(
            parse_boot_nodes_ignore_invalid(vec![b"A".to_vec()], para_id),
            vec![]
        );
        assert_eq!(
            parse_boot_nodes_ignore_invalid(vec![b"\xff".to_vec()], para_id),
            vec![]
        );
        // Valid boot nodes are not ignored
        assert_eq!(
            parse_boot_nodes_ignore_invalid(vec![bootnode1], para_id).len(),
            1
        );
    }

    #[test]
    fn path_ancestors() {
        // Test the implementation of `delete_container_chain_db`
        let db_path = PathBuf::from("/tmp/zombienet/Collator2002-01/data/containers/chains/simple_container_2002/paritydb/full-container-2002");
        let parent = db_path.ancestors().nth(2).unwrap();

        assert_eq!(
            parent,
            PathBuf::from(
                "/tmp/zombienet/Collator2002-01/data/containers/chains/simple_container_2002"
            )
        )
    }
}