// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.

//! Container Chain Spawner
//!
//! Controls the starting and stopping of container chains.
//!
//! For more information about when the database is deleted, check the
//! [Keep db flowchart](https://raw.githubusercontent.com/moondance-labs/tanssi/master/docs/keep_db_flowchart.png)

use {
    crate::{
        cli::ContainerChainCli,
        container_chain_monitor::{SpawnedContainer, SpawnedContainersMonitor},
        service::{start_node_impl_container, NodeConfig, ParachainClient},
    },
    cumulus_primitives_core::ParaId,
    cumulus_relay_chain_interface::RelayChainInterface,
    dancebox_runtime::{opaque::Block as OpaqueBlock, AccountId, Block, BlockNumber},
    dc_orchestrator_chain_interface::OrchestratorChainInterface,
    fs2::FileExt,
    futures::FutureExt,
    node_common::{command::generate_genesis_block, service::NodeBuilderConfig},
    pallet_author_noting_runtime_api::AuthorNotingApi,
    pallet_registrar_runtime_api::RegistrarApi,
    polkadot_primitives::CollatorPair,
    sc_cli::{Database, SyncMode},
    sc_network::config::MultiaddrWithPeerId,
    sc_service::SpawnTaskHandle,
    sc_transaction_pool::FullPool,
    sp_api::{ApiExt, ProvideRuntimeApi},
    sp_keystore::KeystorePtr,
    sp_runtime::traits::Block as BlockT,
    std::{
        collections::{HashMap, HashSet},
        future::Future,
        path::{Path, PathBuf},
        pin::Pin,
        sync::{Arc, Mutex},
        time::Instant,
    },
    tokio::{
        sync::{mpsc, oneshot},
        time::{sleep, Duration},
    },
    tokio_util::sync::CancellationToken,
};

/// Struct with all the params needed to start a container chain node given the CLI arguments,
/// and creating the ChainSpec from on-chain data from the orchestrator chain.
pub struct ContainerChainSpawner {
    // Start container chain params
    pub orchestrator_chain_interface: Arc<dyn OrchestratorChainInterface>,
    pub orchestrator_client: Arc<ParachainClient>,
    pub orchestrator_tx_pool: Arc<FullPool<OpaqueBlock, ParachainClient>>,
    pub container_chain_cli: ContainerChainCli,
    pub tokio_handle: tokio::runtime::Handle,
    pub chain_type: sc_chain_spec::ChainType,
    pub relay_chain: String,
    pub relay_chain_interface: Arc<dyn RelayChainInterface>,
    pub collator_key: Option<CollatorPair>,
    pub sync_keystore: KeystorePtr,
    pub orchestrator_para_id: ParaId,
    pub validator: bool,
    pub spawn_handle: SpawnTaskHandle,

    // State
    pub state: Arc<Mutex<ContainerChainSpawnerState>>,

    // Async callback that enables collation on the orchestrator chain
    pub collate_on_tanssi:
        Arc<dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>) + Send + Sync>,
    // Stores the cancellation token used to stop the orchestrator chain collator process.
    // When this is None, the orchestrator collator is not running.
    pub collation_cancellation_constructs:
        Option<(CancellationToken, futures::channel::oneshot::Receiver<()>)>,
}

#[derive(Default)]
pub struct ContainerChainSpawnerState {
    spawned_container_chains: HashMap<ParaId, ContainerChainState>,
    assigned_para_id: Option<ParaId>,
    next_assigned_para_id: Option<ParaId>,
    failed_para_ids: HashSet<ParaId>,
    // For debugging and detecting errors
    pub spawned_containers_monitor: SpawnedContainersMonitor,
}

pub struct ContainerChainState {
    /// Handle that can be used to stop the container chain
    stop_handle: StopContainerChain,
    /// Database path
    db_path: PathBuf,
}

/// Stops a container chain when the signal is sent. The bool means `keep_db`: whether to keep the
/// container chain database (true) or remove it (false).
pub struct StopContainerChain {
    signal: oneshot::Sender<bool>,
    id: usize,
}

/// Messages used to control the `ContainerChainSpawner`. This is needed because one of the fields
/// of `ContainerChainSpawner` is not `Sync`, so we cannot simply pass an
/// `Arc<ContainerChainSpawner>` to other threads.
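///
/// A minimal sketch (marked `ignore`, so it is not compiled as a doc-test) of how another task
/// could drive the spawner through an unbounded channel; the `cc_spawn_tx` name is illustrative,
/// not part of this API:
///
/// ```ignore
/// let (cc_spawn_tx, cc_spawn_rx) = tokio::sync::mpsc::unbounded_channel::<CcSpawnMsg>();
/// // `cc_spawn_rx` is consumed by `ContainerChainSpawner::rx_loop`.
/// // Collate on container chain 2000 now, and keep 2001 syncing for the next session.
/// cc_spawn_tx
///     .send(CcSpawnMsg::UpdateAssignment {
///         current: Some(2000.into()),
///         next: Some(2001.into()),
///     })
///     .expect("the rx_loop task should still be running");
/// ```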
#[derive(Debug)]
pub enum CcSpawnMsg {
    /// Update container chain assignment
    UpdateAssignment {
        current: Option<ParaId>,
        next: Option<ParaId>,
    },
}

impl ContainerChainSpawner {
    /// Try to start a new container chain. In case of an error, this does not stop the node, and
    /// the container chain will be spawned again when the collator is reassigned to it.
    #[must_use]
    fn spawn(
        &self,
        container_chain_para_id: ParaId,
        start_collation: bool,
    ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
        let (
            orchestrator_chain_interface,
            orchestrator_client,
            orchestrator_tx_pool,
            mut container_chain_cli,
            tokio_handle,
            chain_type,
            relay_chain,
            relay_chain_interface,
            collator_key,
            sync_keystore,
            orchestrator_para_id,
            validator,
            spawn_handle,
            state,
        ) = (
            self.orchestrator_chain_interface.clone(),
            self.orchestrator_client.clone(),
            self.orchestrator_tx_pool.clone(),
            self.container_chain_cli.clone(),
            self.tokio_handle.clone(),
            self.chain_type.clone(),
            self.relay_chain.clone(),
            self.relay_chain_interface.clone(),
            self.collator_key.clone(),
            self.sync_keystore.clone(),
            self.orchestrator_para_id,
            self.validator,
            self.spawn_handle.clone(),
            self.state.clone(),
        );
        let state2 = state.clone();
        // This closure is used to emulate a try block, it enables using the `?` operator inside
        let try_closure = move || async move {
            // Preload genesis data from orchestrator chain storage.
            // The preload must finish before calling create_configuration, so any async operations
            // need to be awaited.

            // TODO: the orchestrator chain node may not be fully synced yet,
            // in that case we will be reading an old state.
            let orchestrator_chain_info = orchestrator_client.chain_info();
            log::info!(
                "Reading container chain genesis data from orchestrator chain at block #{} {}",
                orchestrator_chain_info.best_number,
                orchestrator_chain_info.best_hash,
            );
            let orchestrator_runtime_api = orchestrator_client.runtime_api();

            log::info!(
                "Detected assignment for container chain {}",
                container_chain_para_id
            );

            let genesis_data = orchestrator_runtime_api
                .genesis_data(orchestrator_chain_info.best_hash, container_chain_para_id)
                .map_err(|e| format!("Failed to call genesis_data runtime api: {}", e))?
                .ok_or_else(|| {
                    format!(
                        "No genesis data registered for container chain id {}",
                        container_chain_para_id
                    )
                })?;

            let boot_nodes_raw = orchestrator_runtime_api
                .boot_nodes(orchestrator_chain_info.best_hash, container_chain_para_id)
                .map_err(|e| format!("Failed to call boot_nodes runtime api: {}", e))?;
            if boot_nodes_raw.is_empty() {
                log::warn!(
                    "No boot nodes registered on-chain for container chain {}",
                    container_chain_para_id
                );
            }
            let boot_nodes =
                parse_boot_nodes_ignore_invalid(boot_nodes_raw, container_chain_para_id);
            if boot_nodes.is_empty() {
                log::warn!(
                    "No valid boot nodes for container chain {}",
                    container_chain_para_id
                );
            }

            container_chain_cli
                .preload_chain_spec_from_genesis_data(
                    container_chain_para_id.into(),
                    genesis_data,
                    chain_type.clone(),
                    relay_chain.clone(),
                    boot_nodes,
                )
                .map_err(|e| format!("failed to create container chain chain spec from on chain genesis data: {}", e))?;

            log::info!(
                "Loaded chain spec for container chain {}",
                container_chain_para_id
            );

            if !start_collation {
                log::info!("This is a syncing container chain, using random ports");
                // Use random ports to avoid conflicts with the other running container chain
                let random_ports = [23456, 23457, 23458];
                container_chain_cli
                    .base
                    .base
                    .prometheus_params
                    .prometheus_port = Some(random_ports[0]);
                container_chain_cli.base.base.network_params.port = Some(random_ports[1]);
                container_chain_cli.base.base.rpc_port = Some(random_ports[2]);
            }

            // Update CLI params
            container_chain_cli.base.para_id = Some(container_chain_para_id.into());
            container_chain_cli
                .base
                .base
                .import_params
                .database_params
                .database = Some(Database::ParityDb);

            let create_container_chain_cli_config = || {
                let mut container_chain_cli_config = sc_cli::SubstrateCli::create_configuration(
                    &container_chain_cli,
                    &container_chain_cli,
                    tokio_handle.clone(),
                )
                .map_err(|err| format!("Container chain argument error: {}", err))?;

                // Change database path to make it depend on container chain para id
                // So instead of the usual "db/full" we have "db/full-container-2000"
                let mut db_path = container_chain_cli_config
                    .database
                    .path()
                    .ok_or_else(|| "Failed to get database path".to_string())?
                    .to_owned();
                db_path.set_file_name(format!("full-container-{}", container_chain_para_id));
                container_chain_cli_config.database.set_path(&db_path);

                sc_service::error::Result::Ok((container_chain_cli_config, db_path))
            };

            let (_container_chain_cli_config, db_path) = create_container_chain_cli_config()?;
            let db_exists = db_path.exists();
            let db_exists_but_may_need_removal = db_exists && validator;
            if db_exists_but_may_need_removal {
                // If the database exists it may be invalid (genesis hash mismatch), so check if it is valid
                // and if not, delete it.
                // Create a new cli config because otherwise the tasks spawned in `open_and_maybe_delete_db` don't stop
                let (container_chain_cli_config, db_path) = create_container_chain_cli_config()?;
                open_and_maybe_delete_db(
                    container_chain_cli_config,
                    &db_path,
                    &orchestrator_client,
                    container_chain_para_id,
                    &container_chain_cli,
                    container_chain_cli.base.keep_db,
                )?;
                // Wait here for the partial components created in `open_and_maybe_delete_db` to close.
                // Dropping is not enough because there is some background process that keeps the database open,
                // so we check the paritydb lock file directly.
                log::info!("Restarting container chain {}", container_chain_para_id);
                let max_restart_timeout = Duration::from_secs(60);
                wait_for_paritydb_lock(&db_path, max_restart_timeout)
                    .await
                    .map_err(|e| {
                        log::warn!(
                            "Error waiting for chain {} to release db lock: {:?}",
                            container_chain_para_id,
                            e
                        );

                        e
                    })?;
            }

            // Select the appropriate sync mode. We want to use WarpSync unless the db still exists,
            // or the block number is 0 (because of a warp sync bug in that case).
            let db_still_exists = db_path.exists();
            container_chain_cli.base.base.network_params.sync = select_sync_mode(
                db_still_exists,
                &orchestrator_client,
                container_chain_para_id,
            )?;
            log::info!(
                "Container chain sync mode: {:?}",
                container_chain_cli.base.base.network_params.sync
            );
            let mut container_chain_cli_config = sc_cli::SubstrateCli::create_configuration(
                &container_chain_cli,
                &container_chain_cli,
                tokio_handle.clone(),
            )
            .map_err(|err| format!("Container chain argument error: {}", err))?;
            container_chain_cli_config.database.set_path(&db_path);

            // Start container chain node
            let (mut container_chain_task_manager, container_chain_client, container_chain_db) =
                start_node_impl_container(
                    container_chain_cli_config,
                    orchestrator_client.clone(),
                    orchestrator_tx_pool.clone(),
                    relay_chain_interface.clone(),
                    orchestrator_chain_interface.clone(),
                    collator_key.clone(),
                    sync_keystore.clone(),
                    container_chain_para_id,
                    orchestrator_para_id,
                    validator && start_collation,
                )
                .await?;

            // Signal that allows us to gracefully stop a container chain
            let (signal, on_exit) = oneshot::channel::<bool>();

            let monitor_id;
            {
                let mut state = state.lock().expect("poison error");

                monitor_id = state.spawned_containers_monitor.push(SpawnedContainer {
                    id: 0,
                    para_id: container_chain_para_id,
                    start_time: Instant::now(),
                    stop_signal_time: None,
                    stop_task_manager_time: None,
                    stop_refcount_time: Default::default(),
                    backend: Arc::downgrade(&container_chain_db),
                    client: Arc::downgrade(&container_chain_client),
                });

                state.spawned_container_chains.insert(
                    container_chain_para_id,
                    ContainerChainState {
                        stop_handle: StopContainerChain {
                            signal,
                            id: monitor_id,
                        },
                        db_path: db_path.clone(),
                    },
                );
            }

            // Add the container chain task manager as a child task to the parent task manager.
            // We want to stop the node if this task manager stops, but we also want to allow a
            // graceful shutdown using the `on_exit` future.
            let name = "container-chain-task-manager";
            spawn_handle.spawn(name, None, async move {
                let mut container_chain_task_manager_future =
                    container_chain_task_manager.future().fuse();
                let mut on_exit_future = on_exit.fuse();

                futures::select! {
                    res1 = container_chain_task_manager_future => {
                        // An essential task failed or the task manager was stopped unexpectedly
                        // using `.terminate()`. This should stop the container chain but not the node.
                        if res1.is_err() {
                            log::error!("Essential task failed in container chain {} task manager. Shutting down container chain service", container_chain_para_id);
                        } else {
                            log::error!("Unexpected shutdown in container chain {} task manager. Shutting down container chain service", container_chain_para_id);
                        }
                        // Mark this container chain as "failed to stop" to avoid warning in `self.stop()`
                        let mut state = state.lock().expect("poison error");
                        state.failed_para_ids.insert(container_chain_para_id);
                        // Never delete db in this case because it is not a graceful shutdown
                    }
                    stop_unassigned = on_exit_future => {
                        // Graceful shutdown.
                        // `stop_unassigned` will be `Ok(keep_db)` if `.stop()` has been called, which means that the
                        // container chain has been unassigned, and will be `Err` if the handle has been dropped,
                        // which means that the node is stopping.
                        // Delete existing database if running as collator
                        if validator && stop_unassigned == Ok(false) && !container_chain_cli.base.keep_db {
                            delete_container_chain_db(&db_path);
                        }
                    }
                }

                let mut state = state.lock().expect("poison error");
                state
                    .spawned_containers_monitor
                    .set_stop_task_manager_time(monitor_id, Instant::now());
            });

            sc_service::error::Result::Ok(())
        };

        async move {
            match try_closure().await {
                Ok(()) => {}
                Err(e) => {
                    log::error!(
                        "Failed to start container chain {}: {}",
                        container_chain_para_id,
                        e
                    );
                    // Mark this container chain as "failed to start"
                    let mut state = state2.lock().expect("poison error");
                    state.failed_para_ids.insert(container_chain_para_id);
                }
            }
        }
        .boxed()
    }

    /// Stop a container chain. Prints a warning if the container chain was not running.
    /// Returns the database path for the container chain, which can be used with `wait_for_paritydb_lock`
    /// to ensure that the container chain has fully stopped. The database path can be `None` if the
    /// chain was not running.
    fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option<PathBuf> {
        let mut state = self.state.lock().expect("poison error");
        let stop_handle = state
            .spawned_container_chains
            .remove(&container_chain_para_id);

        match stop_handle {
            Some(stop_handle) => {
                log::info!("Stopping container chain {}", container_chain_para_id);

                let id = stop_handle.stop_handle.id;
                state
                    .spawned_containers_monitor
                    .set_stop_signal_time(id, Instant::now());

                // Send signal to perform graceful shutdown, which will delete the db if needed
                let _ = stop_handle.stop_handle.signal.send(keep_db);

                Some(stop_handle.db_path)
            }
            None => {
                // Do not print the warning message if this is a container chain that has failed to
                // start, because in that case it will not be running
                if !state.failed_para_ids.remove(&container_chain_para_id) {
                    log::warn!(
                        "Tried to stop a container chain that is not running: {}",
                        container_chain_para_id
                    );
                }

                None
            }
        }
    }

    /// Receive and process `CcSpawnMsg`s indefinitely
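    ///
    /// A minimal sketch (marked `ignore`, so it is not compiled as a doc-test) of how the node
    /// could wire this up; `task_manager`, `cc_spawn_rx` and the task name are illustrative
    /// assumptions, not part of this module:
    ///
    /// ```ignore
    /// task_manager.spawn_essential_handle().spawn(
    ///     "container-chain-spawner-rx-loop",
    ///     None,
    ///     container_chain_spawner.rx_loop(cc_spawn_rx, validator),
    /// );
    /// ```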
    pub async fn rx_loop(mut self, mut rx: mpsc::UnboundedReceiver<CcSpawnMsg>, validator: bool) {
        // The node always starts as an orchestrator chain collator.
        // This is because the assignment is detected after importing a new block, so if all
        // collators stop at the same time, when they start again nobody will produce the new block.
        // So all nodes start as orchestrator chain collators, until the first block is imported,
        // then the real assignment is used.
        if validator {
            self.handle_update_assignment(Some(self.orchestrator_para_id), None)
                .await;
        }

        while let Some(msg) = rx.recv().await {
            match msg {
                CcSpawnMsg::UpdateAssignment { current, next } => {
                    self.handle_update_assignment(current, next).await;
                }
            }
        }

        // The while loop can end if all the senders get dropped, but since this is an
        // essential task we don't want it to stop. So await a future that never completes.
        // This should only happen when starting a full node.
        if !validator {
            let () = std::future::pending().await;
        }
    }

    /// Handle `CcSpawnMsg::UpdateAssignment`
    async fn handle_update_assignment(&mut self, current: Option<ParaId>, next: Option<ParaId>) {
        let HandleUpdateAssignmentResult {
            chains_to_stop,
            chains_to_start,
            need_to_restart: _,
        } = handle_update_assignment_state_change(
            &mut self.state.lock().expect("poison error"),
            self.orchestrator_para_id,
            current,
            next,
        );

        if current != Some(self.orchestrator_para_id) {
            // If not assigned to orchestrator chain anymore, we need to stop the collator process
            let maybe_exit_notification_receiver = self
                .collation_cancellation_constructs
                .take()
                .map(|(cancellation_token, exit_notification_receiver)| {
                    cancellation_token.cancel();
                    exit_notification_receiver
                });

            if let Some(exit_notification_receiver) = maybe_exit_notification_receiver {
                let _ = exit_notification_receiver.await;
            }
        } else if self.collation_cancellation_constructs.is_none() {
            // If assigned to orchestrator chain but the collator process is not running, start it
            self.collation_cancellation_constructs = Some((self.collate_on_tanssi)());
        }

        // Stop all container chains that are no longer needed
        let mut db_paths_restart = vec![];
        for para_id in chains_to_stop {
            // Keep db if we are currently assigned to this chain
            let keep_db = Some(para_id) == current;
            let maybe_db_path = self.stop(para_id, keep_db);
            // If we are restarting this chain, save its db_path to check when it actually stopped
            if let Some(db_path) = maybe_db_path {
                if chains_to_start.contains(&para_id) {
                    db_paths_restart.push((para_id, db_path));
                }
            }
        }

        if !db_paths_restart.is_empty() {
            // Ensure the chains we stopped actually stopped by checking if their database is unlocked.
            // Using `join_all` because in one edge case we may be restarting 2 chains,
            // but almost always this will be only one future.
            let max_restart_timeout = Duration::from_secs(60);
            let futs = db_paths_restart
                .into_iter()
                .map(|(para_id, db_path)| async move {
                    wait_for_paritydb_lock(&db_path, max_restart_timeout)
                        .await
                        .map_err(|e| {
                            log::warn!(
                                "Error waiting for chain {} to release db lock: {:?}",
                                para_id,
                                e
                            );
                        })
                });
            futures::future::join_all(futs).await;
        }

        // Start all new container chains (usually 1)
        for para_id in chains_to_start {
            // Edge case: when starting the node it may be assigned to a container chain, so we need to
            // start a container chain already collating.
            let start_collation = Some(para_id) == current;
            self.spawn(para_id, start_collation).await;
        }
    }
}

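/// Result of [`handle_update_assignment_state_change`]: the container chains that must be
/// stopped and started so that the running set matches the new assignment.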
struct HandleUpdateAssignmentResult {
    chains_to_stop: Vec<ParaId>,
    chains_to_start: Vec<ParaId>,
    #[allow(dead_code)] // no longer used except in tests
    need_to_restart: bool,
}

// This is a separate function to allow testing
fn handle_update_assignment_state_change(
    state: &mut ContainerChainSpawnerState,
    orchestrator_para_id: ParaId,
    current: Option<ParaId>,
    next: Option<ParaId>,
) -> HandleUpdateAssignmentResult {
    if (state.assigned_para_id, state.next_assigned_para_id) == (current, next) {
        // If nothing changed there is nothing to update
        return HandleUpdateAssignmentResult {
            chains_to_stop: Default::default(),
            chains_to_start: Default::default(),
            need_to_restart: false,
        };
    }

    // Create a set with the container chains that were running before, and the container
    // chains that should be running after the updated assignment. This is used to calculate
    // the difference, and stop and start the required container chains.
    let mut running_chains_before = HashSet::new();
    let mut running_chains_after = HashSet::new();

    running_chains_before.extend(state.assigned_para_id);
    running_chains_before.extend(state.next_assigned_para_id);
    // Ignore orchestrator_para_id because it is handled in a special way, as it does not need to
    // start one session before in order to sync.
    running_chains_before.remove(&orchestrator_para_id);

    running_chains_after.extend(current);
    running_chains_after.extend(next);
    running_chains_after.remove(&orchestrator_para_id);
    let mut need_to_restart_current = false;
    let mut need_to_restart_next = false;

    if state.assigned_para_id != current {
        if let Some(para_id) = current {
            // If the assigned container chain has changed, we may need to
            // restart it in collation mode, unless it is the orchestrator chain.
            if para_id != orchestrator_para_id {
                need_to_restart_current = true;
            }
        }

        if let Some(para_id) = state.assigned_para_id {
            if para_id != orchestrator_para_id && Some(para_id) == next {
                need_to_restart_next = true;
            }
        }
    }

    state.assigned_para_id = current;
    state.next_assigned_para_id = next;

    let mut chains_to_stop: Vec<_> = running_chains_before
        .difference(&running_chains_after)
        .copied()
        .collect();
    let mut chains_to_start: Vec<_> = running_chains_after
        .difference(&running_chains_before)
        .copied()
        .collect();

    if need_to_restart_current {
        // Force restart of the newly assigned container chain: if it was running before, it was in
        // "syncing mode", which doesn't use the correct ports, so start it in "collation mode".
        let id = current.unwrap();
        if running_chains_before.contains(&id) && !chains_to_stop.contains(&id) {
            chains_to_stop.push(id);
        }
        if !chains_to_start.contains(&id) {
            chains_to_start.push(id);
        }
    }

    if need_to_restart_next {
        // Handle edge case of going from (2000, 2001) to (2001, 2000). In that case we must restart both chains,
        // because previously 2000 was collating and now 2000 will only be syncing.
        let id = next.unwrap();
        if running_chains_before.contains(&id) && !chains_to_stop.contains(&id) {
            chains_to_stop.push(id);
        }
        if !chains_to_start.contains(&id) {
            chains_to_start.push(id);
        }
    }

    HandleUpdateAssignmentResult {
        chains_to_stop,
        chains_to_start,
        need_to_restart: need_to_restart_current || need_to_restart_next,
    }
}

/// Select `SyncMode` to use for a container chain.
/// We want to use warp sync unless the db still exists, or the block number is 0 (because of a warp sync bug in that case).
/// The reason is that warp sync doesn't work if a database already exists, it falls back to full sync instead.
fn select_sync_mode(
    db_exists: bool,
    orchestrator_client: &Arc<ParachainClient>,
    container_chain_para_id: ParaId,
) -> sc_service::error::Result<SyncMode> {
    if db_exists {
        // If the user wants to use warp sync, they should have already removed the database
        return Ok(SyncMode::Full);
    }

    // The following check is only needed because of this bug:
    // https://github.com/paritytech/polkadot-sdk/issues/1930

    let orchestrator_runtime_api = orchestrator_client.runtime_api();
    let orchestrator_chain_info = orchestrator_client.chain_info();

    // Force container chains to use warp sync, unless full sync is needed for some reason
    let full_sync_needed = if !orchestrator_runtime_api
        .has_api::<dyn AuthorNotingApi<Block, AccountId, BlockNumber, ParaId>>(
            orchestrator_chain_info.best_hash,
        )
        .map_err(|e| format!("Failed to check if runtime has AuthorNotingApi: {}", e))?
    {
        // The orchestrator runtime doesn't implement this API yet, so we can't know if the
        // container chain has any blocks; use full sync because that always works
        true
    } else {
        // If the container chain is still at genesis block, use full sync because warp sync is broken
        orchestrator_runtime_api
            .latest_author(orchestrator_chain_info.best_hash, container_chain_para_id)
            .map_err(|e| format!("Failed to read latest author: {}", e))?
            .is_none()
    };

    if full_sync_needed {
        Ok(SyncMode::Full)
    } else {
        Ok(SyncMode::Warp)
    }
}

/// Start a container chain using `new_partial` and check if the database is valid. If not, delete the db.
/// The caller may need to wait a few seconds before trying to start the same container chain again, to
/// give the database enough time to close.
// TODO: instead of waiting, we could also return Weak references to the components `temp_cli.backend`
// and `temp_cli.client`, and then the caller would only need to check if the reference counts are 0.
fn open_and_maybe_delete_db(
    container_chain_cli_config: sc_service::Configuration,
    db_path: &Path,
    orchestrator_client: &Arc<ParachainClient>,
    container_chain_para_id: ParaId,
    container_chain_cli: &ContainerChainCli,
    keep_db: bool,
) -> sc_service::error::Result<()> {
    let temp_cli = NodeConfig::new_builder(&container_chain_cli_config, None)?;

    // Check block diff, only needed if keep-db is false
    if !keep_db {
        // Get latest block number from the container chain client
        let last_container_block_temp = temp_cli.client.chain_info().best_number;

        let orchestrator_runtime_api = orchestrator_client.runtime_api();
        let orchestrator_chain_info = orchestrator_client.chain_info();
        // Get the container chain's latest block from orchestrator chain and compare with client's one
        let last_container_block_from_orchestrator = orchestrator_runtime_api
            .latest_block_number(orchestrator_chain_info.best_hash, container_chain_para_id)
            .unwrap_or_default();

        let max_block_diff_allowed = 100u32;
        if last_container_block_from_orchestrator
            .unwrap_or(0u32)
            .abs_diff(last_container_block_temp)
            > max_block_diff_allowed
        {
            // if the diff is big, delete db and restart using warp sync
            delete_container_chain_db(db_path);
            return Ok(());
        }
    }

    // Generate genesis hash to compare against container client's genesis hash
    let container_preloaded_genesis = container_chain_cli.preloaded_chain_spec.as_ref().unwrap();

    // Check with both state versions
    let block_v0: Block =
        generate_genesis_block(&**container_preloaded_genesis, sp_runtime::StateVersion::V0)
            .map_err(|e| format!("{:?}", e))?;
    let chain_spec_genesis_hash_v0 = block_v0.header().hash();

    let block_v1: Block =
        generate_genesis_block(&**container_preloaded_genesis, sp_runtime::StateVersion::V1)
            .map_err(|e| format!("{:?}", e))?;
    let chain_spec_genesis_hash_v1 = block_v1.header().hash();

    let container_client_genesis_hash = temp_cli.client.chain_info().genesis_hash;

    if container_client_genesis_hash != chain_spec_genesis_hash_v0
        && container_client_genesis_hash != chain_spec_genesis_hash_v1
    {
        log::info!("Container genesis V0: {:?}", chain_spec_genesis_hash_v0);
        log::info!("Container genesis V1: {:?}", chain_spec_genesis_hash_v1);
        log::info!(
            "Chain spec genesis {:?} did not match with any container genesis - Restarting...",
            container_client_genesis_hash
        );
        delete_container_chain_db(db_path);
        return Ok(());
    }

    Ok(())
}

/// Remove the container chain database folder. This is called with db_path:
///     `Collator2002-01/data/containers/chains/simple_container_2002/paritydb/full-container-2002`
/// but we want to delete everything under
///     `Collator2002-01/data/containers/chains/simple_container_2002`
/// So we use `delete_empty_folders_recursive` to try to remove the parent folders as well, but only
/// if they are empty. This is to avoid removing any secret keys or other important data.
fn delete_container_chain_db(db_path: &Path) {
    // Remove folder `full-container-2002`
    let _ = std::fs::remove_dir_all(db_path);
    // Remove all the empty folders inside `simple_container_2002`, including self
    if let Some(parent) = db_path.ancestors().nth(2) {
        delete_empty_folders_recursive(parent);
    }
}

/// Removes all empty folders in `path`, recursively. Then, if `path` is empty, it removes it as well.
/// Ignores any IO errors.
fn delete_empty_folders_recursive(path: &Path) {
    let entry_iter = std::fs::read_dir(path);
    let entry_iter = match entry_iter {
        Ok(x) => x,
        Err(_e) => return,
    };

    for entry in entry_iter {
        let entry = match entry {
            Ok(x) => x,
            Err(_e) => continue,
        };

        let path = entry.path();
        if path.is_dir() {
            delete_empty_folders_recursive(&path);
        }
    }

    // Try to remove dir. Returns an error if the directory is not empty, but we ignore it.
    let _ = std::fs::remove_dir(path);
}

/// Parse a list of boot nodes in `Vec<u8>` format. Invalid boot nodes are filtered out.
fn parse_boot_nodes_ignore_invalid(
    boot_nodes_raw: Vec<Vec<u8>>,
    container_chain_para_id: ParaId,
) -> Vec<MultiaddrWithPeerId> {
    boot_nodes_raw
        .into_iter()
        .filter_map(|x| {
            let x = String::from_utf8(x)
                .map_err(|e| {
                    log::debug!(
                        "Invalid boot node in container chain {}: {}",
                        container_chain_para_id,
                        e
                    );
                })
                .ok()?;

            x.parse::<MultiaddrWithPeerId>()
                .map_err(|e| {
                    log::debug!(
                        "Invalid boot node in container chain {}: {}",
                        container_chain_para_id,
                        e
                    )
                })
                .ok()
        })
        .collect()
}

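/// Wait until the paritydb lock file under `db_path` is released, polling once per second using
/// `check_paritydb_lock_held`, and return an error if the lock is still held after `max_timeout`.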
async fn wait_for_paritydb_lock(db_path: &Path, max_timeout: Duration) -> Result<(), String> {
    let now = Instant::now();

    while now.elapsed() < max_timeout {
        let lock_held = check_paritydb_lock_held(db_path)
            .map_err(|e| format!("Failed to check if lock file is held: {}", e))?;
        if !lock_held {
            return Ok(());
        }
        sleep(Duration::from_secs(1)).await;
    }

    Err("Timeout when waiting for paritydb lock".to_string())
}

/// Given a path to a paritydb database, check if its lock file is held. This indicates that a
/// background process is still using the database, so we should wait before trying to open it.
///
/// This should be kept up to date with the way paritydb handles the lock file:
/// <https://github.com/paritytech/parity-db/blob/2b6820e310a08678d4540c044f41a93d87343ac8/src/db.rs#L215>
fn check_paritydb_lock_held(db_path: &Path) -> Result<bool, std::io::Error> {
    if !db_path.is_dir() {
        // Lock file does not exist, so it is not held
        return Ok(false);
    }

    let mut lock_path: std::path::PathBuf = db_path.to_owned();
    lock_path.push("lock");
    let lock_file = std::fs::OpenOptions::new()
        .create(true)
        .read(true)
        .write(true)
        .truncate(true)
        .open(lock_path.as_path())?;
    // Check if the lock file is busy by trying to lock it.
    // Returns err if we failed to acquire the lock.
    let lock_held = lock_file.try_lock_exclusive().is_err();

    Ok(lock_held)
}

#[cfg(test)]
mod tests {
    use {super::*, std::path::PathBuf};

    // Copy of ContainerChainSpawner with extra assertions for tests, and mocked spawn function.
    struct MockContainerChainSpawner {
        state: Arc<Mutex<ContainerChainSpawnerState>>,
        orchestrator_para_id: ParaId,
        collate_on_tanssi: Arc<
            dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>) + Send + Sync,
        >,
        collation_cancellation_constructs: Option<()>,
        // Keep track of the last CollateOn message, for tests
        currently_collating_on: Arc<Mutex<Option<ParaId>>>,
    }

    impl MockContainerChainSpawner {
        fn new() -> Self {
            let orchestrator_para_id = 1000.into();
            // The node always starts as an orchestrator chain collator
            let currently_collating_on = Arc::new(Mutex::new(Some(orchestrator_para_id)));
            let currently_collating_on2 = currently_collating_on.clone();
            let collate_closure = move || {
                let mut cco = currently_collating_on2.lock().unwrap();
                assert_ne!(
                    *cco,
                    Some(orchestrator_para_id),
                    "Received CollateOn message when we were already collating on this chain: {}",
                    orchestrator_para_id
                );
                *cco = Some(orchestrator_para_id);
                let (_, receiver) = futures::channel::oneshot::channel();
                (CancellationToken::new(), receiver)
            };
            let collate_on_tanssi: Arc<
                dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>)
                    + Send
                    + Sync,
            > = Arc::new(collate_closure);

            Self {
                state: Arc::new(Mutex::new(ContainerChainSpawnerState {
                    spawned_container_chains: Default::default(),
                    assigned_para_id: Some(orchestrator_para_id),
                    next_assigned_para_id: None,
                    failed_para_ids: Default::default(),
                    spawned_containers_monitor: Default::default(),
                })),
                orchestrator_para_id,
                collate_on_tanssi,
                // Some if collator starts on orchestrator chain
                collation_cancellation_constructs: Some(()),
                currently_collating_on,
            }
        }

        fn spawn(&self, container_chain_para_id: ParaId, start_collation: bool) {
            let (signal, _on_exit) = oneshot::channel();
            let currently_collating_on2 = self.currently_collating_on.clone();
            let collate_closure = move || {
                let mut cco = currently_collating_on2.lock().unwrap();
                assert_ne!(
                    *cco,
                    Some(container_chain_para_id),
                    "Received CollateOn message when we were already collating on this chain: {}",
                    container_chain_para_id
                );
                *cco = Some(container_chain_para_id);
                let (_, receiver) = futures::channel::oneshot::channel();
                (CancellationToken::new(), receiver)
            };
            let collate_on: Arc<
                dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>)
                    + Send
                    + Sync,
            > = Arc::new(collate_closure);
            // Dummy db_path for tests, is not actually used
            let db_path = PathBuf::from(format!("/tmp/container-{}/db", container_chain_para_id));

            let old = self
                .state
                .lock()
                .expect("poison error")
                .spawned_container_chains
                .insert(
                    container_chain_para_id,
                    ContainerChainState {
                        stop_handle: StopContainerChain { signal, id: 0 },
                        db_path,
                    },
                );

            assert!(
                old.is_none(),
                "tried to spawn a container chain that was already running: {}",
                container_chain_para_id
            );
            if start_collation {
                let (_cancellation_token, _exit_receiver) = collate_on();
            }
        }

        fn stop(&self, container_chain_para_id: ParaId) {
            let stop_handle = self
                .state
                .lock()
                .expect("poison error")
                .spawned_container_chains
                .remove(&container_chain_para_id);

            match stop_handle {
                Some(_stop_handle) => {
                    log::info!("Stopping container chain {}", container_chain_para_id);
                }
                None => {
                    panic!(
                        "Tried to stop a container chain that is not running: {}",
                        container_chain_para_id
                    );
                }
            }
            // Update currently_collating_on, if we stopped the chain we are no longer collating there
            let mut lco = self.currently_collating_on.lock().unwrap();
            if *lco == Some(container_chain_para_id) {
                *lco = None;
            }
        }

        fn handle_update_assignment(&mut self, current: Option<ParaId>, next: Option<ParaId>) {
            let HandleUpdateAssignmentResult {
                chains_to_stop,
                chains_to_start,
                need_to_restart,
            } = handle_update_assignment_state_change(
                &mut self.state.lock().unwrap(),
                self.orchestrator_para_id,
                current,
                next,
            );

            if current != Some(self.orchestrator_para_id) {
                // If not assigned to orchestrator chain anymore, we need to stop the collator process
                let mut cco = self.currently_collating_on.lock().unwrap();
                if *cco == Some(self.orchestrator_para_id) {
                    *cco = None;
                }
                self.collation_cancellation_constructs = None;
            } else if self.collation_cancellation_constructs.is_none() {
                let (_cancellation_token, _exit_notification_receiver) = (self.collate_on_tanssi)();
                self.collation_cancellation_constructs = Some(());
            }
            // Assert we never start and stop the same container chain
            for para_id in &chains_to_start {
                if !need_to_restart {
                    assert!(
                        !chains_to_stop.contains(para_id),
                        "Tried to start and stop same container chain: {}",
                        para_id
                    );
                } else {
                    // Will try to start and stop container chain with id "current" or "next", so ignore that
                    if Some(*para_id) != current && Some(*para_id) != next {
                        assert!(
                            !chains_to_stop.contains(para_id),
                            "Tried to start and stop same container chain: {}",
                            para_id
                        );
                    }
                }
            }
            // Assert we never start or stop the orchestrator chain
            assert!(!chains_to_start.contains(&self.orchestrator_para_id));
            assert!(!chains_to_stop.contains(&self.orchestrator_para_id));
            // Stop all container chains that are no longer needed
            for para_id in chains_to_stop {
                self.stop(para_id);
            }
            // Start all new container chains (usually 1)
            for para_id in chains_to_start {
                // Edge case: when starting the node it may be assigned to a container chain, so we need to
                // start a container chain already collating.
                let start_collation = Some(para_id) == current;
                self.spawn(para_id, start_collation);
            }
            // Assert that if we are currently assigned to a container chain, we are collating there
            if let Some(para_id) = current {
                self.assert_collating_on(Some(para_id));
            } else {
                self.assert_collating_on(None);
            }
        }

        #[track_caller]
        fn assert_collating_on(&self, para_id: Option<ParaId>) {
            let currently_collating_on = *self.currently_collating_on.lock().unwrap();
            assert_eq!(currently_collating_on, para_id);
        }

        #[track_caller]
        fn assert_running_chains(&self, para_ids: &[ParaId]) {
            let mut actually_running: Vec<ParaId> = self
                .state
                .lock()
                .unwrap()
                .spawned_container_chains
                .keys()
                .cloned()
                .collect();
            actually_running.sort();
            let mut should_be_running = para_ids.to_vec();
            should_be_running.sort();
            assert_eq!(actually_running, should_be_running);
        }
    }

    #[test]
    fn starts_collating_on_tanssi() {
        let mut m = MockContainerChainSpawner::new();
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);

        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);
    }

    #[test]
    fn assigned_to_orchestrator_chain() {
        let mut m = MockContainerChainSpawner::new();

        m.handle_update_assignment(Some(1000.into()), Some(1000.into()));
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);

        m.handle_update_assignment(Some(1000.into()), None);
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);

        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);

        m.handle_update_assignment(None, Some(1000.into()));
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);

        m.handle_update_assignment(Some(1000.into()), Some(1000.into()));
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);
    }

    #[test]
    fn assigned_to_container_chain() {
        let mut m = MockContainerChainSpawner::new();

        m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);

        m.handle_update_assignment(Some(2000.into()), None);
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);

        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);

        m.handle_update_assignment(None, Some(2000.into()));
        m.assert_collating_on(None);
        m.assert_running_chains(&[2000.into()]);

        m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);
    }

    #[test]
    fn spawn_container_chains() {
        let mut m = MockContainerChainSpawner::new();

        m.handle_update_assignment(Some(1000.into()), Some(2000.into()));
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[2000.into()]);

        m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);

        m.handle_update_assignment(Some(2000.into()), Some(2001.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into(), 2001.into()]);

        m.handle_update_assignment(Some(2001.into()), Some(2001.into()));
        m.assert_collating_on(Some(2001.into()));
        m.assert_running_chains(&[2001.into()]);

        m.handle_update_assignment(Some(2001.into()), Some(1000.into()));
        m.assert_collating_on(Some(2001.into()));
        m.assert_running_chains(&[2001.into()]);

        m.handle_update_assignment(Some(1000.into()), Some(1000.into()));
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);
    }

    #[test]
    fn swap_current_next() {
        // Going from (2000, 2001) to (2001, 2000) shouldn't start or stop any container chains
        let mut m: MockContainerChainSpawner = MockContainerChainSpawner::new();

        m.handle_update_assignment(Some(2000.into()), Some(2001.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into(), 2001.into()]);

        m.handle_update_assignment(Some(2001.into()), Some(2000.into()));
        m.assert_collating_on(Some(2001.into()));
        m.assert_running_chains(&[2000.into(), 2001.into()]);
    }

    #[test]
    fn stop_collating_orchestrator() {
        let mut m: MockContainerChainSpawner = MockContainerChainSpawner::new();

        m.handle_update_assignment(Some(1000.into()), Some(1000.into()));
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);

        m.handle_update_assignment(Some(1000.into()), None);
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);

        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);

        m.handle_update_assignment(Some(1000.into()), None);
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);
    }

    #[test]
    fn stop_collating_container() {
        let mut m: MockContainerChainSpawner = MockContainerChainSpawner::new();

        m.handle_update_assignment(Some(2000.into()), None);
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);

        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);

        m.handle_update_assignment(None, Some(2000.into()));
        m.assert_collating_on(None);
        m.assert_running_chains(&[2000.into()]);

        // This will send a CollateOn message to the same chain as the last CollateOn,
        // but this is needed because that chain has been stopped
        m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);
    }

    #[test]
    fn stop_collating_container_start_immediately() {
        let mut m: MockContainerChainSpawner = MockContainerChainSpawner::new();

        m.handle_update_assignment(Some(2000.into()), None);
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);

        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);

        // This will start the chain already collating
        m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);
    }

    #[test]
    fn stop_all_chains() {
        let mut m: MockContainerChainSpawner = MockContainerChainSpawner::new();

        m.handle_update_assignment(Some(2000.into()), Some(2001.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into(), 2001.into()]);

        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);
    }

    #[test]
    fn keep_collating_on_container() {
        let mut m: MockContainerChainSpawner = MockContainerChainSpawner::new();

        m.handle_update_assignment(Some(2000.into()), None);
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);

        m.handle_update_assignment(None, Some(2000.into()));
        m.assert_collating_on(None);
        m.assert_running_chains(&[2000.into()]);

        m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);
    }

    #[test]
    fn invalid_boot_nodes_are_ignored() {
        let para_id = 100.into();
        let bootnode1 =
            b"/ip4/127.0.0.1/tcp/33049/ws/p2p/12D3KooWHVMhQDHBpj9vQmssgyfspYecgV6e3hH1dQVDUkUbCYC9"
                .to_vec();
        assert_eq!(
            parse_boot_nodes_ignore_invalid(vec![b"A".to_vec()], para_id),
            vec![]
        );
        assert_eq!(
            parse_boot_nodes_ignore_invalid(vec![b"\xff".to_vec()], para_id),
            vec![]
        );
        // Valid boot nodes are not ignored
        assert_eq!(
            parse_boot_nodes_ignore_invalid(vec![bootnode1], para_id).len(),
            1
        );
    }

    #[test]
    fn path_ancestors() {
        // Test the implementation of `delete_container_chain_db`
        let db_path = PathBuf::from("/tmp/zombienet/Collator2002-01/data/containers/chains/simple_container_2002/paritydb/full-container-2002");
        let parent = db_path.ancestors().nth(2).unwrap();

        assert_eq!(
            parent,
            PathBuf::from(
                "/tmp/zombienet/Collator2002-01/data/containers/chains/simple_container_2002"
            )
        )
    }
}