// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>

//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.

use tokio_util::sync::CancellationToken;
#[allow(deprecated)]
use {
    crate::{
        cli::ContainerChainCli,
        container_chain_spawner::{CcSpawnMsg, ContainerChainSpawner},
    },
    cumulus_client_cli::CollatorOptions,
    cumulus_client_collator::service::CollatorService,
    cumulus_client_consensus_common::{
        ParachainBlockImport as TParachainBlockImport, ParachainBlockImportMarker,
    },
    cumulus_client_consensus_proposer::Proposer,
    cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig},
    cumulus_client_service::{
        prepare_node_config, start_relay_chain_tasks, DARecoveryProfile, ParachainHostFunctions,
        StartRelayChainTasksParams,
    },
    cumulus_primitives_core::{
        relay_chain::{well_known_keys as RelayWellKnownKeys, CollatorPair},
        ParaId,
    },
    cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface},
    dancebox_runtime::{
        opaque::{Block, Hash},
        RuntimeApi,
    },
    dc_orchestrator_chain_interface::{
        OrchestratorChainError, OrchestratorChainInterface, OrchestratorChainResult, PHash, PHeader,
    },
    dp_slot_duration_runtime_api::TanssiSlotDurationApi,
    futures::{Stream, StreamExt},
    nimbus_primitives::NimbusPair,
    node_common::service::NodeBuilderConfig,
    node_common::service::{ManualSealConfiguration, NodeBuilder, Sealing},
    pallet_registrar_runtime_api::RegistrarApi,
    parity_scale_codec::Encode,
    polkadot_cli::ProvideRuntimeApi,
    polkadot_parachain_primitives::primitives::HeadData,
    polkadot_service::Handle,
    sc_basic_authorship::ProposerFactory,
    sc_client_api::{
        AuxStore, Backend as BackendT, BlockchainEvents, HeaderBackend, UsageProvider,
    },
    sc_consensus::{BasicQueue, BlockImport, ImportQueue},
    sc_executor::{NativeElseWasmExecutor, WasmExecutor},
    sc_network::NetworkBlock,
    sc_network_sync::SyncingService,
    sc_service::{Configuration, SpawnTaskHandle, TFullBackend, TFullClient, TaskManager},
    sc_telemetry::TelemetryHandle,
    sc_transaction_pool::FullPool,
    sp_api::StorageProof,
    sp_consensus::{EnableProofRecording, SyncOracle},
    sp_consensus_slots::{Slot, SlotDuration},
    sp_core::{traits::SpawnEssentialNamed, H256},
    sp_keystore::KeystorePtr,
    sp_state_machine::{Backend as StateBackend, StorageValue},
    std::{pin::Pin, sync::Arc, time::Duration},
    substrate_prometheus_endpoint::Registry,
    tc_consensus::{
        collators::lookahead::{
            self as lookahead_tanssi_aura, Params as LookaheadTanssiAuraParams,
        },
        OrchestratorAuraWorkerAuxData,
    },
    tokio::sync::mpsc::{unbounded_channel, UnboundedSender},
};

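/// Mocked relay chain storage keys, used by the dev node to simulate relay chain state.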
mod mocked_relay_keys;

type FullBackend = TFullBackend<Block>;

/// Native executor type.
pub struct ParachainNativeExecutor;

impl sc_executor::NativeExecutionDispatch for ParachainNativeExecutor {
    type ExtendHostFunctions = ParachainHostFunctions;

    fn dispatch(method: &str, data: &[u8]) -> Option<Vec<u8>> {
        dancebox_runtime::api::dispatch(method, data)
    }

    fn native_version() -> sc_executor::NativeVersion {
        dancebox_runtime::native_version()
    }
}

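/// [`NodeBuilderConfig`] for the orchestrator chain node.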
pub struct NodeConfig;
impl NodeBuilderConfig for NodeConfig {
    type Block = Block;
    type RuntimeApi = RuntimeApi;
    type ParachainExecutor = ParachainExecutor;
}

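/// [`NodeBuilderConfig`] for container chain nodes.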
pub struct ContainerChainNodeConfig;
impl NodeBuilderConfig for ContainerChainNodeConfig {
    type Block = Block;
    // TODO: RuntimeApi here should be the subset of runtime apis available for all containers
    // Currently we are using the orchestrator runtime apis
    type RuntimeApi = RuntimeApi;
    type ParachainExecutor = ContainerChainExecutor;
}

// Orchestrator chain types
type ParachainExecutor = NativeElseWasmExecutor<ParachainNativeExecutor>;
pub type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>;
pub type ParachainBackend = TFullBackend<Block>;
type DevParachainBlockImport = OrchestratorParachainBlockImport<Arc<ParachainClient>>;
type ParachainBlockImport = TParachainBlockImport<Block, Arc<ParachainClient>, ParachainBackend>;
type ParachainProposerFactory =
    ProposerFactory<FullPool<Block, ParachainClient>, ParachainClient, EnableProofRecording>;

// Container chains types
type ContainerChainExecutor = WasmExecutor<ParachainHostFunctions>;
pub type ContainerChainClient = TFullClient<Block, RuntimeApi, ContainerChainExecutor>;
pub type ContainerChainBackend = ParachainBackend;
type ContainerChainBlockImport =
    TParachainBlockImport<Block, Arc<ContainerChainClient>, ContainerChainBackend>;

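// Timestamp of the current dev block, in milliseconds. `MockTimestampInherentDataProvider`
// below advances it by one slot duration on every manually sealed block.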
thread_local!(static TIMESTAMP: std::cell::RefCell<u64> = const { std::cell::RefCell::new(0) });

/// Provides a mock timestamp, in milliseconds and starting at 0, for the timestamp inherent.
/// Each call increments the timestamp by `SLOT_DURATION`, making Aura think time has passed.
struct MockTimestampInherentDataProvider;
#[async_trait::async_trait]
impl sp_inherents::InherentDataProvider for MockTimestampInherentDataProvider {
    async fn provide_inherent_data(
        &self,
        inherent_data: &mut sp_inherents::InherentData,
    ) -> Result<(), sp_inherents::Error> {
        TIMESTAMP.with(|x| {
            *x.borrow_mut() += dancebox_runtime::SLOT_DURATION;
            inherent_data.put_data(sp_timestamp::INHERENT_IDENTIFIER, &*x.borrow())
        })
    }

    async fn try_handle_error(
        &self,
        _identifier: &sp_inherents::InherentIdentifier,
        _error: &[u8],
    ) -> Option<Result<(), sp_inherents::Error>> {
        // The pallet never reports error.
        None
    }
}

/// Background task used to detect changes to container chain assignment,
/// and start/stop container chains on demand. The check runs on every finalized block.
pub fn build_check_assigned_para_id(
    client: Arc<ParachainClient>,
    sync_keystore: KeystorePtr,
    cc_spawn_tx: UnboundedSender<CcSpawnMsg>,
    spawner: impl SpawnEssentialNamed,
) {
    // Subscribe to new blocks in order to react to para id assignment
    // This must be the stream of finalized blocks, otherwise the collators may rotate to a
    // different chain before the block is finalized, and that could lead to a stalled chain
    let mut import_notifications = client.finality_notification_stream();

    let check_assigned_para_id_task = async move {
        while let Some(msg) = import_notifications.next().await {
            let block_hash = msg.hash;
            let client_set_aside_for_cidp = client.clone();
            let sync_keystore = sync_keystore.clone();
            let cc_spawn_tx = cc_spawn_tx.clone();

            check_assigned_para_id(
                cc_spawn_tx,
                sync_keystore,
                client_set_aside_for_cidp,
                block_hash,
            )
            .unwrap();
        }
    };

    spawner.spawn_essential(
        "check-assigned-para-id",
        None,
        Box::pin(check_assigned_para_id_task),
    );
}

/// Check the parachain assignment using the orchestrator chain client, and send a `CcSpawnMsg` to
/// start or stop the required container chains.
///
/// Checks the assignment for the next block, so if there is a session change on block 15, this will
/// detect the assignment change after importing block 14.
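///
/// A minimal sketch of the message this ends up sending (illustrative para ids only;
/// `CcSpawnMsg` is defined in `container_chain_spawner`):
/// ```ignore
/// cc_spawn_tx.send(CcSpawnMsg::UpdateAssignment {
///     current: Some(2000u32.into()),
///     next: Some(2001u32.into()),
/// })?;
/// ```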
fn check_assigned_para_id(
    cc_spawn_tx: UnboundedSender<CcSpawnMsg>,
    sync_keystore: KeystorePtr,
    client_set_aside_for_cidp: Arc<ParachainClient>,
    block_hash: H256,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Check current assignment
    let current_container_chain_para_id =
        tc_consensus::first_eligible_key::<Block, ParachainClient, NimbusPair>(
            client_set_aside_for_cidp.as_ref(),
            &block_hash,
            sync_keystore.clone(),
        )
        .map(|(_nimbus_key, para_id)| para_id);

    // Check assignment in the next session
    let next_container_chain_para_id =
        tc_consensus::first_eligible_key_next_session::<Block, ParachainClient, NimbusPair>(
            client_set_aside_for_cidp.as_ref(),
            &block_hash,
            sync_keystore,
        )
        .map(|(_nimbus_key, para_id)| para_id);

    cc_spawn_tx.send(CcSpawnMsg::UpdateAssignment {
        current: current_container_chain_para_id,
        next: next_container_chain_para_id,
    })?;

    Ok(())
}

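/// Build the nimbus import queue for the orchestrator chain.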
pub fn import_queue(
    parachain_config: &Configuration,
    node_builder: &NodeBuilder<NodeConfig>,
) -> (ParachainBlockImport, BasicQueue<Block>) {
    // The nimbus import queue ONLY checks the signature correctness
    // Any other checks corresponding to the author-correctness should be done
    // in the runtime
    let block_import =
        ParachainBlockImport::new(node_builder.client.clone(), node_builder.backend.clone());

    let import_queue = nimbus_consensus::import_queue(
        node_builder.client.clone(),
        block_import.clone(),
        move |_, _| async move {
            let time = sp_timestamp::InherentDataProvider::from_system_time();

            Ok((time,))
        },
        &node_builder.task_manager.spawn_essential_handle(),
        parachain_config.prometheus_registry(),
        false,
    )
    .expect("function never fails");

    (block_import, import_queue)
}

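/// Build the nimbus import queue for a container chain.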
pub fn container_chain_import_queue(
    parachain_config: &Configuration,
    node_builder: &NodeBuilder<ContainerChainNodeConfig>,
) -> (ContainerChainBlockImport, BasicQueue<Block>) {
    // The nimbus import queue ONLY checks the signature correctness
    // Any other checks corresponding to the author-correctness should be done
    // in the runtime
    let block_import =
        ContainerChainBlockImport::new(node_builder.client.clone(), node_builder.backend.clone());

    let import_queue = nimbus_consensus::import_queue(
        node_builder.client.clone(),
        block_import.clone(),
        move |_, _| async move {
            let time = sp_timestamp::InherentDataProvider::from_system_time();

            Ok((time,))
        },
        &node_builder.task_manager.spawn_essential_handle(),
        parachain_config.prometheus_registry(),
        false,
    )
    .expect("function never fails");

    (block_import, import_queue)
}

/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
///
/// This is the actual implementation that is abstract over the executor and the runtime api.
#[sc_tracing::logging::prefix_logs_with("Orchestrator")]
async fn start_node_impl(
    orchestrator_config: Configuration,
    polkadot_config: Configuration,
    mut container_chain_config: Option<(ContainerChainCli, tokio::runtime::Handle)>,
    collator_options: CollatorOptions,
    para_id: ParaId,
    hwbench: Option<sc_sysinfo::HwBench>,
) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)> {
    let parachain_config = prepare_node_config(orchestrator_config);
    if let Some((container_chain_cli, _)) = &mut container_chain_config {
        // If the container chain args have no --wasmtime-precompiled flag, use the same as the orchestrator
        if container_chain_cli
            .base
            .base
            .import_params
            .wasmtime_precompiled
            .is_none()
        {
            container_chain_cli
                .base
                .base
                .import_params
                .wasmtime_precompiled
                .clone_from(&parachain_config.wasmtime_precompiled);
        }
    }

    let chain_type: sc_chain_spec::ChainType = parachain_config.chain_spec.chain_type();
    let relay_chain = crate::chain_spec::Extensions::try_get(&*parachain_config.chain_spec)
        .map(|e| e.relay_chain.clone())
        .ok_or("Could not find relay_chain extension in chain-spec.")?;

    // Channel to send messages to start/stop container chains
    let (cc_spawn_tx, cc_spawn_rx) = unbounded_channel();

    // Create a `NodeBuilder` which helps setup parachain nodes common systems.
    let mut node_builder = NodeConfig::new_builder(&parachain_config, hwbench.clone())?;

    let (block_import, import_queue) = import_queue(&parachain_config, &node_builder);

    let (relay_chain_interface, collator_key) = node_builder
        .build_relay_chain_interface(&parachain_config, polkadot_config, collator_options.clone())
        .await?;

    let validator = parachain_config.role.is_authority();
    let force_authoring = parachain_config.force_authoring;

    let node_builder = node_builder
        .build_cumulus_network::<_, sc_network::NetworkWorker<_, _>>(
            &parachain_config,
            para_id,
            import_queue,
            relay_chain_interface.clone(),
        )
        .await?;

    let rpc_builder = {
        let client = node_builder.client.clone();
        let transaction_pool = node_builder.transaction_pool.clone();

        Box::new(move |deny_unsafe, _| {
            let deps = crate::rpc::FullDeps {
                client: client.clone(),
                pool: transaction_pool.clone(),
                deny_unsafe,
                command_sink: None,
                xcm_senders: None,
            };

            crate::rpc::create_full(deps).map_err(Into::into)
        })
    };

    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;

    let relay_chain_slot_duration = Duration::from_secs(6);
    let overseer_handle = relay_chain_interface
        .overseer_handle()
        .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
    let sync_keystore = node_builder.keystore_container.keystore();
    let mut collate_on_tanssi: Arc<
        dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>) + Send + Sync,
    > = Arc::new(move || {
        if validator {
            panic!("Called uninitialized collate_on_tanssi");
        } else {
            panic!("Called collate_on_tanssi when node is not running as a validator");
        }
    });

    let announce_block = {
        let sync_service = node_builder.network.sync_service.clone();
        Arc::new(move |hash, data| sync_service.announce_block(hash, data))
    };

    let (mut node_builder, import_queue_service) = node_builder.extract_import_queue_service();

    start_relay_chain_tasks(StartRelayChainTasksParams {
        client: node_builder.client.clone(),
        announce_block: announce_block.clone(),
        para_id,
        relay_chain_interface: relay_chain_interface.clone(),
        task_manager: &mut node_builder.task_manager,
        da_recovery_profile: if validator {
            DARecoveryProfile::Collator
        } else {
            DARecoveryProfile::FullNode
        },
        import_queue: import_queue_service,
        relay_chain_slot_duration,
        recovery_handle: Box::new(overseer_handle.clone()),
        sync_service: node_builder.network.sync_service.clone(),
    })?;

    if validator {
        let collator_key = collator_key
            .clone()
            .expect("Command line arguments do not allow this. qed");

        // Start task which detects para id assignment, and starts/stops container chains.
        // Note that if this node was started without a `container_chain_config`, we don't
        // support collation on container chains, so there is no need to detect changes to assignment
        if container_chain_config.is_some() {
            build_check_assigned_para_id(
                node_builder.client.clone(),
                sync_keystore.clone(),
                cc_spawn_tx.clone(),
                node_builder.task_manager.spawn_essential_handle(),
            );
        }

        let start_collation = {
            // Params for collate_on_tanssi closure
            let node_spawn_handle = node_builder.task_manager.spawn_handle().clone();
            let node_keystore = node_builder.keystore_container.keystore().clone();
            let node_telemetry_handle = node_builder.telemetry.as_ref().map(|t| t.handle()).clone();
            let node_client = node_builder.client.clone();
            let node_backend = node_builder.backend.clone();
            let relay_interface = relay_chain_interface.clone();
            let node_sync_service = node_builder.network.sync_service.clone();
            let orchestrator_tx_pool = node_builder.transaction_pool.clone();
            let overseer = overseer_handle.clone();
            let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
                node_spawn_handle.clone(),
                node_client.clone(),
                node_builder.transaction_pool.clone(),
                node_builder.prometheus_registry.as_ref(),
                node_telemetry_handle.clone(),
            );

            move || {
                start_consensus_orchestrator(
                    node_client.clone(),
                    node_backend.clone(),
                    block_import.clone(),
                    node_spawn_handle.clone(),
                    relay_interface.clone(),
                    node_sync_service.clone(),
                    node_keystore.clone(),
                    force_authoring,
                    relay_chain_slot_duration,
                    para_id,
                    collator_key.clone(),
                    overseer.clone(),
                    announce_block.clone(),
                    proposer_factory.clone(),
                    orchestrator_tx_pool.clone(),
                )
            }
        };
        // Save callback for later, used when collator rotates from container chain back to orchestrator chain
        collate_on_tanssi = Arc::new(start_collation);
    }

    node_builder.network.start_network.start_network();

    let sync_keystore = node_builder.keystore_container.keystore();
    let orchestrator_chain_interface_builder = OrchestratorChainInProcessInterfaceBuilder {
        client: node_builder.client.clone(),
        backend: node_builder.backend.clone(),
        sync_oracle: node_builder.network.sync_service.clone(),
        overseer_handle: overseer_handle.clone(),
    };

    if let Some((container_chain_cli, tokio_handle)) = container_chain_config {
        // If the orchestrator chain is running as a full node, we start a full node for the
        // container chain immediately, because only collator nodes detect their container chain
        // assignment, so otherwise it would never start.
        if !validator {
            if let Some(container_chain_para_id) = container_chain_cli.base.para_id {
                // Spawn new container chain node
                cc_spawn_tx
                    .send(CcSpawnMsg::UpdateAssignment {
                        current: Some(container_chain_para_id.into()),
                        next: Some(container_chain_para_id.into()),
                    })
                    .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
            }
        }

        // Start container chain spawner task. This will start and stop container chains on demand.
        let orchestrator_client = node_builder.client.clone();
        let spawn_handle = node_builder.task_manager.spawn_handle();
        let container_chain_spawner = ContainerChainSpawner {
            orchestrator_chain_interface: orchestrator_chain_interface_builder.build(),
            orchestrator_client,
            orchestrator_tx_pool: node_builder.transaction_pool.clone(),
            container_chain_cli,
            tokio_handle,
            chain_type,
            relay_chain,
            relay_chain_interface,
            collator_key,
            sync_keystore,
            orchestrator_para_id: para_id,
            validator,
            spawn_handle,
            state: Default::default(),
            collate_on_tanssi,
            collation_cancellation_constructs: None,
        };
        let state = container_chain_spawner.state.clone();

        node_builder.task_manager.spawn_essential_handle().spawn(
            "container-chain-spawner-rx-loop",
            None,
            container_chain_spawner.rx_loop(cc_spawn_rx, validator),
        );

        node_builder.task_manager.spawn_essential_handle().spawn(
            "container-chain-spawner-debug-state",
            None,
            crate::container_chain_monitor::monitor_task(state),
        )
    }

    Ok((node_builder.task_manager, node_builder.client))
}

// Log string that will be shown for the container chain: `[Container-2000]`.
// This needs to be a separate function because the `prefix_logs_with` macro
// has trouble parsing expressions.
fn container_log_str(para_id: ParaId) -> String {
    format!("Container-{}", para_id)
}

/// Start a container chain node with the given parachain `Configuration`, reusing the
/// relay chain and orchestrator chain interfaces of the already-running orchestrator node.
///
/// This is the actual implementation that is abstract over the executor and the runtime api.
#[sc_tracing::logging::prefix_logs_with(container_log_str(para_id))]
pub async fn start_node_impl_container(
    parachain_config: Configuration,
    orchestrator_client: Arc<ParachainClient>,
    orchestrator_tx_pool: Arc<FullPool<Block, ParachainClient>>,
    relay_chain_interface: Arc<dyn RelayChainInterface>,
    orchestrator_chain_interface: Arc<dyn OrchestratorChainInterface>,
    collator_key: Option<CollatorPair>,
    keystore: KeystorePtr,
    para_id: ParaId,
    orchestrator_para_id: ParaId,
    collator: bool,
) -> sc_service::error::Result<(
    TaskManager,
    Arc<ContainerChainClient>,
    Arc<ParachainBackend>,
)> {
    let parachain_config = prepare_node_config(parachain_config);

    // Create a `NodeBuilder` which helps setup parachain nodes common systems.
    let node_builder = ContainerChainNodeConfig::new_builder(&parachain_config, None)?;

    let (block_import, import_queue) =
        container_chain_import_queue(&parachain_config, &node_builder);
    let import_queue_service = import_queue.service();

    log::info!("Container chain node running as collator: {:?}", collator);
    let node_builder = node_builder
        .build_cumulus_network::<_, sc_network::NetworkWorker<_, _>>(
            &parachain_config,
            para_id,
            import_queue,
            relay_chain_interface.clone(),
        )
        .await?;

    let force_authoring = parachain_config.force_authoring;
    let prometheus_registry = parachain_config.prometheus_registry().cloned();

    let rpc_builder = {
        let client = node_builder.client.clone();
        let transaction_pool = node_builder.transaction_pool.clone();

        Box::new(move |deny_unsafe, _| {
            let deps = crate::rpc::FullDeps {
                client: client.clone(),
                pool: transaction_pool.clone(),
                deny_unsafe,
                command_sink: None,
                xcm_senders: None,
            };

            crate::rpc::create_full(deps).map_err(Into::into)
        })
    };

    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;

    let announce_block = {
        let sync_service = node_builder.network.sync_service.clone();
        Arc::new(move |hash, data| sync_service.announce_block(hash, data))
    };

    let relay_chain_slot_duration = Duration::from_secs(6);

    let overseer_handle = relay_chain_interface
        .overseer_handle()
        .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
    let (mut node_builder, _) = node_builder.extract_import_queue_service();

    start_relay_chain_tasks(StartRelayChainTasksParams {
        client: node_builder.client.clone(),
        announce_block: announce_block.clone(),
        para_id,
        relay_chain_interface: relay_chain_interface.clone(),
        task_manager: &mut node_builder.task_manager,
        da_recovery_profile: if collator {
            DARecoveryProfile::Collator
        } else {
            DARecoveryProfile::FullNode
        },
        import_queue: import_queue_service,
        relay_chain_slot_duration,
        recovery_handle: Box::new(overseer_handle.clone()),
        sync_service: node_builder.network.sync_service.clone(),
    })?;

    if collator {
        let collator_key = collator_key
            .clone()
            .expect("Command line arguments do not allow this. qed");

        let node_spawn_handle = node_builder.task_manager.spawn_handle().clone();
        let node_client = node_builder.client.clone();
        let node_backend = node_builder.backend.clone();

        start_consensus_container(
            node_client.clone(),
            node_backend.clone(),
            orchestrator_client.clone(),
            orchestrator_tx_pool.clone(),
            block_import.clone(),
            prometheus_registry.clone(),
            node_builder.telemetry.as_ref().map(|t| t.handle()).clone(),
            node_spawn_handle.clone(),
            relay_chain_interface.clone(),
            orchestrator_chain_interface.clone(),
            node_builder.transaction_pool.clone(),
            node_builder.network.sync_service.clone(),
            keystore.clone(),
            force_authoring,
            relay_chain_slot_duration,
            para_id,
            orchestrator_para_id,
            collator_key.clone(),
            overseer_handle.clone(),
            announce_block.clone(),
        );
    }

    node_builder.network.start_network.start_network();

    Ok((
        node_builder.task_manager,
        node_builder.client,
        node_builder.backend,
    ))
}

/// Build the import queue for the parachain runtime (manual seal).
fn build_manual_seal_import_queue(
    _client: Arc<ParachainClient>,
    block_import: DevParachainBlockImport,
    config: &Configuration,
    _telemetry: Option<TelemetryHandle>,
    task_manager: &TaskManager,
) -> Result<sc_consensus::DefaultImportQueue<Block>, sc_service::Error> {
    Ok(sc_consensus_manual_seal::import_queue(
        Box::new(block_import),
        &task_manager.spawn_essential_handle(),
        config.prometheus_registry(),
    ))
}

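/// Start the collation task for a container chain, using the lookahead Tanssi Aura
/// collator. Consensus parameters (slot duration, authorities, slot frequency) are
/// read from the orchestrator chain.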
#[sc_tracing::logging::prefix_logs_with(container_log_str(para_id))]
fn start_consensus_container(
    client: Arc<ContainerChainClient>,
    backend: Arc<FullBackend>,
    orchestrator_client: Arc<ParachainClient>,
    orchestrator_tx_pool: Arc<FullPool<Block, ParachainClient>>,
    block_import: ContainerChainBlockImport,
    prometheus_registry: Option<Registry>,
    telemetry: Option<TelemetryHandle>,
    spawner: SpawnTaskHandle,
    relay_chain_interface: Arc<dyn RelayChainInterface>,
    orchestrator_chain_interface: Arc<dyn OrchestratorChainInterface>,
    transaction_pool: Arc<sc_transaction_pool::FullPool<Block, ContainerChainClient>>,
    sync_oracle: Arc<SyncingService<Block>>,
    keystore: KeystorePtr,
    force_authoring: bool,
    relay_chain_slot_duration: Duration,
    para_id: ParaId,
    orchestrator_para_id: ParaId,
    collator_key: CollatorPair,
    overseer_handle: OverseerHandle,
    announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
) {
    let slot_duration = cumulus_client_consensus_aura::slot_duration(&*orchestrator_client)
        .expect("start_consensus_container: slot duration should exist");

    let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
        spawner.clone(),
        client.clone(),
        transaction_pool,
        prometheus_registry.as_ref(),
        telemetry.clone(),
    );

    let proposer = Proposer::new(proposer_factory);

    let collator_service = CollatorService::new(
        client.clone(),
        Arc::new(spawner.clone()),
        announce_block,
        client.clone(),
    );

    let relay_chain_interface_for_cidp = relay_chain_interface.clone();
    let relay_chain_interface_for_orch = relay_chain_interface.clone();
    let orchestrator_client_for_cidp = orchestrator_client.clone();
    let client_for_cidp = client.clone();
    let client_for_hash_provider = client.clone();
    let client_for_slot_duration = client.clone();

    let code_hash_provider = move |block_hash| {
        client_for_hash_provider
            .code_at(block_hash)
            .ok()
            .map(polkadot_primitives::ValidationCode)
            .map(|c| c.hash())
    };

    let params = LookaheadTanssiAuraParams {
        get_current_slot_duration: move |block_hash| {
            // Default to 12s if runtime API does not exist
            let slot_duration_ms = client_for_slot_duration
                .runtime_api()
                .slot_duration(block_hash)
                .unwrap_or(12_000);

            SlotDuration::from_millis(slot_duration_ms)
        },
        create_inherent_data_providers: move |block_hash, (relay_parent, _validation_data)| {
            let relay_chain_interface = relay_chain_interface_for_cidp.clone();
            let orchestrator_chain_interface = orchestrator_chain_interface.clone();
            let client = client_for_cidp.clone();

            async move {
                let authorities_noting_inherent =
                    ccp_authorities_noting_inherent::ContainerChainAuthoritiesInherentData::create_at(
                        relay_parent,
                        &relay_chain_interface,
                        &orchestrator_chain_interface,
                        orchestrator_para_id,
                    )
                    .await;

                let slot_duration = {
                    // Default to 12s if runtime API does not exist
                    let slot_duration_ms = client
                        .runtime_api()
                        .slot_duration(block_hash)
                        .unwrap_or(12_000);

                    SlotDuration::from_millis(slot_duration_ms)
                };

                let timestamp = sp_timestamp::InherentDataProvider::from_system_time();

                let slot =
                    sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
                        *timestamp,
                        slot_duration,
                    );

                let authorities_noting_inherent = authorities_noting_inherent.ok_or_else(|| {
                    Box::<dyn std::error::Error + Send + Sync>::from(
                        "Failed to create authorities noting inherent",
                    )
                })?;

                Ok((slot, timestamp, authorities_noting_inherent))
            }
        },
        get_orchestrator_aux_data: move |_block_hash, (relay_parent, _validation_data)| {
            let relay_chain_interface_for_orch = relay_chain_interface_for_orch.clone();
            let orchestrator_client_for_cidp = orchestrator_client_for_cidp.clone();

            async move {
                let latest_header =
                    ccp_authorities_noting_inherent::ContainerChainAuthoritiesInherentData::get_latest_orchestrator_head_info(
                        relay_parent,
                        &relay_chain_interface_for_orch,
                        orchestrator_para_id,
                    )
                    .await;

                let latest_header = latest_header.ok_or_else(|| {
                    Box::<dyn std::error::Error + Send + Sync>::from(
                        "Failed to fetch latest header",
                    )
                })?;

                let authorities = tc_consensus::authorities::<Block, ParachainClient, NimbusPair>(
                    orchestrator_client_for_cidp.as_ref(),
                    &latest_header.hash(),
                    para_id,
                );

                let authorities = authorities.ok_or_else(|| {
                    Box::<dyn std::error::Error + Send + Sync>::from(
                        "Failed to fetch authorities",
                    )
                })?;

                log::info!(
                    "Authorities {:?} found for header {:?}",
                    authorities,
                    latest_header
                );

                let slot_freq = tc_consensus::min_slot_freq::<Block, ParachainClient, NimbusPair>(
                    orchestrator_client_for_cidp.as_ref(),
                    &latest_header.hash(),
                    para_id,
                );

                let aux_data = OrchestratorAuraWorkerAuxData {
                    authorities,
                    slot_freq,
                };

                Ok(aux_data)
            }
        },
        block_import,
        para_client: client,
        relay_client: relay_chain_interface,
        sync_oracle,
        keystore,
        collator_key,
        para_id,
        overseer_handle,
        orchestrator_slot_duration: slot_duration,
        force_authoring,
        relay_chain_slot_duration,
        proposer,
        collator_service,
        // Very limited proposal time.
        authoring_duration: Duration::from_millis(500),
        para_backend: backend,
        code_hash_provider,
        // This cancellation token is no-op as it is not shared outside.
        cancellation_token: CancellationToken::new(),
        orchestrator_tx_pool,
        orchestrator_client,
    };

    let (fut, _exit_notification_receiver) =
        lookahead_tanssi_aura::run::<_, Block, NimbusPair, _, _, _, _, _, _, _, _, _, _, _, _, _>(
            params,
        );
    spawner.spawn("tanssi-aura-container", None, fut);
}

/// Start collator task for orchestrator chain.
/// Returns a `CancellationToken` that can be used to cancel the collator task,
/// and a `oneshot::Receiver<()>` that can be used to wait until the task has ended.
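///
/// A minimal sketch of how the returned handles can be used when rotating away from the
/// orchestrator chain (illustrative only; the real call sites live in the container
/// chain spawner, via the `collate_on_tanssi` callback):
/// ```ignore
/// let (cancellation_token, exit_receiver) = collate_on_tanssi();
/// // Stop the collation task...
/// cancellation_token.cancel();
/// // ...and wait until it has fully stopped before collating elsewhere.
/// let _ = exit_receiver.await;
/// ```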
fn start_consensus_orchestrator(
    client: Arc<ParachainClient>,
    backend: Arc<FullBackend>,
    block_import: ParachainBlockImport,
    spawner: SpawnTaskHandle,
    relay_chain_interface: Arc<dyn RelayChainInterface>,
    sync_oracle: Arc<SyncingService<Block>>,
    keystore: KeystorePtr,
    force_authoring: bool,
    relay_chain_slot_duration: Duration,
    para_id: ParaId,
    collator_key: CollatorPair,
    overseer_handle: OverseerHandle,
    announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
    proposer_factory: ParachainProposerFactory,
    orchestrator_tx_pool: Arc<FullPool<Block, ParachainClient>>,
) -> (CancellationToken, futures::channel::oneshot::Receiver<()>) {
    let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)
        .expect("start_consensus_orchestrator: slot duration should exist");

    let proposer = Proposer::new(proposer_factory);

    let collator_service = CollatorService::new(
        client.clone(),
        Arc::new(spawner.clone()),
        announce_block,
        client.clone(),
    );

    let relay_chain_interface_for_cidp = relay_chain_interface.clone();
    let client_set_aside_for_cidp = client.clone();
    let client_set_aside_for_orch = client.clone();
    let client_for_hash_provider = client.clone();
    let client_for_slot_duration_provider = client.clone();

    let code_hash_provider = move |block_hash| {
        client_for_hash_provider
            .code_at(block_hash)
            .ok()
            .map(polkadot_primitives::ValidationCode)
            .map(|c| c.hash())
    };

    let cancellation_token = CancellationToken::new();

    let params = LookaheadTanssiAuraParams {
        get_current_slot_duration: move |block_hash| {
            sc_consensus_aura::standalone::slot_duration_at(
                &*client_for_slot_duration_provider,
                block_hash,
            )
            .expect("Slot duration should be set")
        },
        create_inherent_data_providers: move |block_hash, (relay_parent, _validation_data)| {
            let relay_chain_interface = relay_chain_interface_for_cidp.clone();
            let client_set_aside_for_cidp = client_set_aside_for_cidp.clone();
            async move {
                let para_ids = client_set_aside_for_cidp
                    .runtime_api()
                    .registered_paras(block_hash)?;
                let para_ids: Vec<_> = para_ids.into_iter().collect();
                let author_noting_inherent =
                    tp_author_noting_inherent::OwnParachainInherentData::create_at(
                        relay_parent,
                        &relay_chain_interface,
                        &para_ids,
                    )
                    .await;

                // Fetch the slot duration at every block, to avoid downtime when the chain
                // transitions from 12s to 6s slots
                let slot_duration = sc_consensus_aura::standalone::slot_duration_at(
                    &*client_set_aside_for_cidp,
                    block_hash,
                )
                .expect("Slot duration should be set");

                let timestamp = sp_timestamp::InherentDataProvider::from_system_time();

                let slot =
                    sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
                        *timestamp,
                        slot_duration,
                    );

                let author_noting_inherent = author_noting_inherent.ok_or_else(|| {
                    Box::<dyn std::error::Error + Send + Sync>::from(
                        "Failed to create author noting inherent",
                    )
                })?;

                Ok((slot, timestamp, author_noting_inherent))
            }
        },
        get_orchestrator_aux_data: move |block_hash: H256, (_relay_parent, _validation_data)| {
            let client_set_aside_for_orch = client_set_aside_for_orch.clone();

            async move {
                let authorities = tc_consensus::authorities::<Block, ParachainClient, NimbusPair>(
                    client_set_aside_for_orch.as_ref(),
                    &block_hash,
                    para_id,
                );

                let authorities = authorities.ok_or_else(|| {
                    Box::<dyn std::error::Error + Send + Sync>::from(
                        "Failed to fetch authorities",
                    )
                })?;

                log::info!(
                    "Authorities {:?} found for header {:?}",
                    authorities,
                    block_hash
                );

                let aux_data = OrchestratorAuraWorkerAuxData {
                    authorities,
                    // This is the orchestrator consensus, it does not have a slot frequency
                    slot_freq: None,
                };
                Ok(aux_data)
            }
        },
        block_import,
        para_client: client.clone(),
        relay_client: relay_chain_interface,
        sync_oracle,
        keystore,
        collator_key,
        para_id,
        overseer_handle,
        orchestrator_slot_duration: slot_duration,
        relay_chain_slot_duration,
        force_authoring,
        proposer,
        collator_service,
        // Very limited proposal time.
        authoring_duration: Duration::from_millis(500),
        code_hash_provider,
        para_backend: backend,
        cancellation_token: cancellation_token.clone(),
        orchestrator_tx_pool,
        orchestrator_client: client,
    };

    let (fut, exit_notification_receiver) =
        lookahead_tanssi_aura::run::<_, Block, NimbusPair, _, _, _, _, _, _, _, _, _, _, _, _, _>(
            params,
        );
    spawner.spawn("tanssi-aura", None, fut);

    (cancellation_token, exit_notification_receiver)
}

/// Start a parachain node.
pub async fn start_parachain_node(
    parachain_config: Configuration,
    polkadot_config: Configuration,
    container_config: Option<(ContainerChainCli, tokio::runtime::Handle)>,
    collator_options: CollatorOptions,
    para_id: ParaId,
    hwbench: Option<sc_sysinfo::HwBench>,
) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)> {
    start_node_impl(
        parachain_config,
        polkadot_config,
        container_config,
        collator_options,
        para_id,
        hwbench,
    )
    .await
}

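/// Soft deadline percent for the dev node's manual seal block proposer. At 100%, the
/// proposer may use the entire proposal duration before it stops trying to fit in
/// more transactions (see `sc_basic_authorship` for the exact semantics).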
pub const SOFT_DEADLINE_PERCENT: sp_runtime::Percent = sp_runtime::Percent::from_percent(100);

/// Start a dev node that produces blocks with manual seal, mocking the relay chain.
#[sc_tracing::logging::prefix_logs_with("Orchestrator Dev Node")]
pub fn start_dev_node(
    orchestrator_config: Configuration,
    sealing: Sealing,
    hwbench: Option<sc_sysinfo::HwBench>,
    para_id: ParaId,
) -> sc_service::error::Result<TaskManager> {
    let parachain_config = prepare_node_config(orchestrator_config);

    // Create a `NodeBuilder` which helps setup parachain nodes common systems.
    let node_builder = NodeConfig::new_builder(&parachain_config, hwbench)?;

    // This node block import.
    let block_import = DevParachainBlockImport::new(node_builder.client.clone());
    let import_queue = build_manual_seal_import_queue(
        node_builder.client.clone(),
        block_import.clone(),
        &parachain_config,
        node_builder
            .telemetry
            .as_ref()
            .map(|telemetry| telemetry.handle()),
        &node_builder.task_manager,
    )?;

    // Build a Substrate network (not a cumulus network, since this is a dev node: it mocks
    // the relay chain).
    let mut node_builder = node_builder
        .build_substrate_network::<sc_network::NetworkWorker<_, _>>(
            &parachain_config,
            import_queue,
        )?;

    // If we're running a collator dev node we must install manual seal block
    // production.
    let mut command_sink = None;
    let mut xcm_senders = None;
    if parachain_config.role.is_authority() {
        let client = node_builder.client.clone();
        let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
        let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
        xcm_senders = Some((downward_xcm_sender, hrmp_xcm_sender));

        command_sink = node_builder.install_manual_seal(ManualSealConfiguration {
            block_import,
            sealing,
            soft_deadline: Some(SOFT_DEADLINE_PERCENT),
            select_chain: sc_consensus::LongestChain::new(node_builder.backend.clone()),
            consensus_data_provider: Some(Box::new(
                tc_consensus::OrchestratorManualSealAuraConsensusDataProvider::new(
                    node_builder.client.clone(),
                    node_builder.keystore_container.keystore(),
                    para_id,
                ),
            )),
            create_inherent_data_providers: move |block: H256, ()| {
                let current_para_block = client
                    .number(block)
                    .expect("Header lookup should succeed")
                    .expect("Header passed in as parent should be present in backend.");

                let para_ids = client
                    .runtime_api()
                    .registered_paras(block)
                    .expect("registered_paras runtime API should exist")
                    .into_iter()
                    .collect();

                let hash = client
                    .hash(current_para_block.saturating_sub(1))
                    .expect("Hash of the desired block must be present")
                    .expect("Hash of the desired block should exist");

                let para_header = client
                    .expect_header(hash)
                    .expect("Expected parachain header should exist")
                    .encode();

                let para_head_data = HeadData(para_header).encode();
                let para_head_key = RelayWellKnownKeys::para_head(para_id);
                let relay_slot_key = RelayWellKnownKeys::CURRENT_SLOT.to_vec();

                let slot_duration = sc_consensus_aura::standalone::slot_duration_at(&*client, block)
                    .expect("Slot duration should be set");

                let mut timestamp = 0u64;
                TIMESTAMP.with(|x| {
                    timestamp = x.clone().take();
                });

                timestamp += dancebox_runtime::SLOT_DURATION;
                let relay_slot = sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
                    timestamp.into(),
                    slot_duration,
                );
                let relay_slot = u64::from(*relay_slot);

                let downward_xcm_receiver = downward_xcm_receiver.clone();
                let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();

                let client_for_xcm = client.clone();
                async move {
                    let mocked_author_noting =
                        tp_author_noting_inherent::MockAuthorNotingInherentDataProvider {
                            current_para_block,
                            relay_offset: 1000,
                            relay_blocks_per_para_block: 2,
                            para_ids,
                            slots_per_para_block: 1,
                        };
                    let mut additional_keys = mocked_author_noting.get_key_values();
                    // Mock only chain 2002 in relay.
                    // This will allow any signed origin to deregister chains 2000 and 2001, and register 2002.
                    let (registrar_paras_key_2002, para_info_2002) = mocked_relay_keys::get_mocked_registrar_paras(2002.into());
                    additional_keys.extend([(para_head_key, para_head_data), (relay_slot_key, Slot::from(relay_slot).encode()), (registrar_paras_key_2002, para_info_2002)]);

                    let time = MockTimestampInherentDataProvider;
                    let mocked_parachain = MockValidationDataInherentDataProvider {
                        current_para_block,
                        relay_offset: 1000,
                        relay_blocks_per_para_block: 2,
                        // TODO: Recheck
                        para_blocks_per_relay_epoch: 10,
                        relay_randomness_config: (),
                        xcm_config: MockXcmConfig::new(
                            &*client_for_xcm,
                            block,
                            para_id,
                            Default::default(),
                        ),
                        raw_downward_messages: downward_xcm_receiver.drain().collect(),
                        raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
                        additional_key_values: Some(additional_keys),
                    };

                    Ok((time, mocked_parachain, mocked_author_noting))
                }
            },
        })?;
    }

    // This node RPC builder.
    let rpc_builder = {
        let client = node_builder.client.clone();
        let transaction_pool = node_builder.transaction_pool.clone();

        Box::new(move |deny_unsafe, _| {
            let deps = crate::rpc::FullDeps {
                client: client.clone(),
                pool: transaction_pool.clone(),
                deny_unsafe,
                command_sink: command_sink.clone(),
                xcm_senders: xcm_senders.clone(),
            };

            crate::rpc::create_full(deps).map_err(Into::into)
        })
    };

    // We spawn all the common substrate tasks to properly run a node.
    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;
    log::info!("Development Service Ready");

    // We start the networking part.
    node_builder.network.start_network.start_network();

    Ok(node_builder.task_manager)
}

/// Can be called for a `Configuration` to check if it is a configuration for
/// the orchestrator network.
pub trait IdentifyVariant {
    /// Returns `true` if this is a configuration for a dev network.
    fn is_dev(&self) -> bool;
}

impl IdentifyVariant for Box<dyn sc_service::ChainSpec> {
    fn is_dev(&self) -> bool {
        self.chain_type() == sc_chain_spec::ChainType::Development
    }
}

/// Orchestrator Parachain Block import. We cannot use the one in cumulus as it overrides the
/// best chain selection rule.
#[derive(Clone)]
pub struct OrchestratorParachainBlockImport<BI> {
    inner: BI,
}

impl<BI> OrchestratorParachainBlockImport<BI> {
    /// Create a new instance.
    pub fn new(inner: BI) -> Self {
        Self { inner }
    }
}

/// We simply delegate to the inner block import.
#[async_trait::async_trait]
impl<BI> BlockImport<Block> for OrchestratorParachainBlockImport<BI>
where
    BI: BlockImport<Block> + Send,
{
    type Error = BI::Error;

    async fn check_block(
        &mut self,
        block: sc_consensus::BlockCheckParams<Block>,
    ) -> Result<sc_consensus::ImportResult, Self::Error> {
        self.inner.check_block(block).await
    }

    async fn import_block(
        &mut self,
        params: sc_consensus::BlockImportParams<Block>,
    ) -> Result<sc_consensus::ImportResult, Self::Error> {
        let res = self.inner.import_block(params).await?;

        Ok(res)
    }
}

/// We also need to implement the `ParachainBlockImportMarker` trait to fulfill the trait
/// bounds required by the parachain consensus code.
impl<BI> ParachainBlockImportMarker for OrchestratorParachainBlockImport<BI> {}

/// Builder for a concrete orchestrator chain interface, created from a full node. Builds
/// an [`OrchestratorChainInProcessInterface`] to access orchestrator chain data
/// necessary for container chain operation.
struct OrchestratorChainInProcessInterfaceBuilder {
    client: Arc<ParachainClient>,
    backend: Arc<FullBackend>,
    sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
    overseer_handle: Handle,
}

impl OrchestratorChainInProcessInterfaceBuilder {
    pub fn build(self) -> Arc<dyn OrchestratorChainInterface> {
        Arc::new(OrchestratorChainInProcessInterface::new(
            self.client,
            self.backend,
            self.sync_oracle,
            self.overseer_handle,
        ))
    }
}

/// Provides an implementation of the [`OrchestratorChainInterface`] using a local
/// in-process orchestrator chain node.
pub struct OrchestratorChainInProcessInterface<Client> {
    pub full_client: Arc<Client>,
    pub backend: Arc<FullBackend>,
    pub sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
    pub overseer_handle: Handle,
}

impl<Client> OrchestratorChainInProcessInterface<Client> {
    /// Create a new instance of [`OrchestratorChainInProcessInterface`].
    pub fn new(
        full_client: Arc<Client>,
        backend: Arc<FullBackend>,
        sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
        overseer_handle: Handle,
    ) -> Self {
        Self {
            full_client,
            backend,
            sync_oracle,
            overseer_handle,
        }
    }
}

impl<T> Clone for OrchestratorChainInProcessInterface<T> {
    fn clone(&self) -> Self {
        Self {
            full_client: self.full_client.clone(),
            backend: self.backend.clone(),
            sync_oracle: self.sync_oracle.clone(),
            overseer_handle: self.overseer_handle.clone(),
        }
    }
}

#[async_trait::async_trait]
impl<Client> OrchestratorChainInterface for OrchestratorChainInProcessInterface<Client>
where
    Client: ProvideRuntimeApi<Block>
        + BlockchainEvents<Block>
        + AuxStore
        + UsageProvider<Block>
        + Sync
        + Send,
{
    async fn get_storage_by_key(
        &self,
        orchestrator_parent: PHash,
        key: &[u8],
    ) -> OrchestratorChainResult<Option<StorageValue>> {
        let state = self.backend.state_at(orchestrator_parent)?;
        state
            .storage(key)
            .map_err(OrchestratorChainError::GenericError)
    }

    async fn prove_read(
        &self,
        orchestrator_parent: PHash,
        relevant_keys: &[Vec<u8>],
    ) -> OrchestratorChainResult<StorageProof> {
        let state_backend = self.backend.state_at(orchestrator_parent)?;
        sp_state_machine::prove_read(state_backend, relevant_keys)
            .map_err(OrchestratorChainError::StateMachineError)
    }

    fn overseer_handle(&self) -> OrchestratorChainResult<Handle> {
        Ok(self.overseer_handle.clone())
    }

    /// Get a stream of import block notifications.
    async fn import_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        let notification_stream = self
            .full_client
            .import_notification_stream()
            .map(|notification| notification.header);
        Ok(Box::pin(notification_stream))
    }

    /// Get a stream of new best block notifications.
    async fn new_best_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        let notifications_stream =
            self.full_client
                .import_notification_stream()
                .filter_map(|notification| async move {
                    notification.is_new_best.then_some(notification.header)
                });
        Ok(Box::pin(notifications_stream))
    }

    /// Get a stream of finality notifications.
    async fn finality_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        let notification_stream = self
            .full_client
            .finality_notification_stream()
            .map(|notification| notification.header);
        Ok(Box::pin(notification_stream))
    }
}