1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
use {
18
    async_io::Timer,
19
    core::time::Duration,
20
    core_extensions::TypeIdentity,
21
    cumulus_client_cli::CollatorOptions,
22
    cumulus_client_consensus_common::ParachainConsensus,
23
    cumulus_client_service::{
24
        build_relay_chain_interface, CollatorSybilResistance, ParachainHostFunctions,
25
        StartFullNodeParams,
26
    },
27
    cumulus_primitives_core::ParaId,
28
    cumulus_relay_chain_interface::RelayChainInterface,
29
    frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE,
30
    futures::{channel::mpsc, FutureExt, Stream, StreamExt},
31
    jsonrpsee::RpcModule,
32
    polkadot_primitives::CollatorPair,
33
    sc_client_api::Backend,
34
    sc_consensus::{import_queue::ImportQueueService, BlockImport, ImportQueue},
35
    sc_consensus_manual_seal::{
36
        run_manual_seal, ConsensusDataProvider, EngineCommand, ManualSealParams,
37
    },
38
    sc_executor::{
39
        sp_wasm_interface::{ExtendedHostFunctions, HostFunctions},
40
        HeapAllocStrategy, NativeElseWasmExecutor, NativeExecutionDispatch, RuntimeVersionOf,
41
        WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
42
    },
43
    sc_network::{config::FullNetworkConfiguration, NetworkBlock},
44
    sc_network_sync::SyncingService,
45
    sc_network_transactions::TransactionsHandlerController,
46
    sc_rpc::{DenyUnsafe, SubscriptionTaskExecutor},
47
    sc_service::{
48
        Configuration, KeystoreContainer, NetworkStarter, SpawnTaskHandle, TFullBackend,
49
        TFullClient, TaskManager,
50
    },
51
    sc_telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle},
52
    sc_transaction_pool_api::OffchainTransactionPoolFactory,
53
    sc_utils::mpsc::TracingUnboundedSender,
54
    sp_api::ConstructRuntimeApi,
55
    sp_block_builder::BlockBuilder,
56
    sp_consensus::SelectChain,
57
    sp_core::traits::CodeExecutor,
58
    sp_inherents::CreateInherentDataProviders,
59
    sp_offchain::OffchainWorkerApi,
60
    sp_runtime::Percent,
61
    sp_transaction_pool::runtime_api::TaggedTransactionQueue,
62
    std::{str::FromStr, sync::Arc},
63
};
64

            
65
/// Trait to configure the main types the builder rely on, bundled in a single
66
/// type to reduce verbosity and the amount of type parameters.
67
pub trait NodeBuilderConfig {
68
    type Block;
69
    type RuntimeApi;
70
    type ParachainExecutor;
71

            
72
    /// Create a new `NodeBuilder` using the types of this `Config`, along
73
    /// with the parachain `Configuration` and an optional `HwBench`.
74
350
    fn new_builder(
75
350
        parachain_config: &Configuration,
76
350
        hwbench: Option<sc_sysinfo::HwBench>,
77
350
    ) -> Result<NodeBuilder<Self>, sc_service::Error>
78
350
    where
79
350
        Self: Sized,
80
350
        BlockOf<Self>: cumulus_primitives_core::BlockT,
81
350
        ExecutorOf<Self>:
82
350
            Clone + CodeExecutor + RuntimeVersionOf + TanssiExecutorExt + Sync + Send + 'static,
83
350
        RuntimeApiOf<Self>:
84
350
            ConstructRuntimeApi<BlockOf<Self>, ClientOf<Self>> + Sync + Send + 'static,
85
350
        ConstructedRuntimeApiOf<Self>:
86
350
            TaggedTransactionQueue<BlockOf<Self>> + BlockBuilder<BlockOf<Self>>,
87
350
    {
88
350
        NodeBuilder::<Self>::new(parachain_config, hwbench)
89
350
    }
90
}
91

            
92
pub type BlockOf<T> = <T as NodeBuilderConfig>::Block;
93
pub type BlockHashOf<T> = <BlockOf<T> as cumulus_primitives_core::BlockT>::Hash;
94
pub type BlockHeaderOf<T> = <BlockOf<T> as cumulus_primitives_core::BlockT>::Header;
95
pub type RuntimeApiOf<T> = <T as NodeBuilderConfig>::RuntimeApi;
96
pub type ExecutorOf<T> = <T as NodeBuilderConfig>::ParachainExecutor;
97
pub type ClientOf<T> = TFullClient<BlockOf<T>, RuntimeApiOf<T>, ExecutorOf<T>>;
98
pub type BackendOf<T> = TFullBackend<BlockOf<T>>;
99
pub type ConstructedRuntimeApiOf<T> =
100
    <RuntimeApiOf<T> as ConstructRuntimeApi<BlockOf<T>, ClientOf<T>>>::RuntimeApi;
101
pub type ImportQueueServiceOf<T> = Box<dyn ImportQueueService<BlockOf<T>>>;
102
pub type ParachainConsensusOf<T> = Box<dyn ParachainConsensus<BlockOf<T>>>;
103

            
104
// `Cumulus` and `TxHandler` are types that will change during the life of
105
// a `NodeBuilder` because they are generated and consumed when calling
106
// certain functions, with absence of data represented with `()`. Some
107
// function are implemented only for a given concrete type, which ensure it
108
// can only be called if the required data is available (generated and not yet
109
// consumed).
110
//
111
// While this could be implemented with multiple impl blocks with concrete types,
112
// we use here `core_extensions::TypeIdentity` which allow to express type
113
// identity/equality as a trait bound on each function as it removes the
114
// boilerplate of many impl block with duplicated trait bounds. 2 impl blocks
115
// are still required since Rust can't infer the types in the `new` function
116
// that doesn't take `self`.
117
pub struct NodeBuilder<
118
    T: NodeBuilderConfig,
119
    // `(cumulus_client_service/sc_service)::build_network` returns many important systems,
120
    // but can only be called with an `import_queue` which can be different in
121
    // each node. For that reason it is a `()` when calling `new`, then the
122
    // caller create the `import_queue` using systems contained in `NodeBuilder`,
123
    // then call `build_cumulus_network` with it to generate the cumulus systems.
124
    SNetwork = (),
125
    // The `TxHandler` is constructed in `build_X_network`
126
    // and is then consumed when calling `spawn_common_tasks`.
127
    STxHandler = (),
128
    // The import queue service is obtained from the import queue in
129
    // `build_cumulus_network` or `build_substrate_network`, which also
130
    // consumes the import queue. Neither of them are clonable, so we need to
131
    // to store the service here to be able to consume it later in
132
    // `start_full_node`.
133
    SImportQueueService = (),
134
> where
135
    BlockOf<T>: cumulus_primitives_core::BlockT,
136
    ExecutorOf<T>: Clone + CodeExecutor + RuntimeVersionOf + Sync + Send + 'static,
137
    RuntimeApiOf<T>: ConstructRuntimeApi<BlockOf<T>, ClientOf<T>> + Sync + Send + 'static,
138
    ConstructedRuntimeApiOf<T>: TaggedTransactionQueue<BlockOf<T>> + BlockBuilder<BlockOf<T>>,
139
{
140
    pub client: Arc<ClientOf<T>>,
141
    pub backend: Arc<BackendOf<T>>,
142
    pub task_manager: TaskManager,
143
    pub keystore_container: KeystoreContainer,
144
    pub transaction_pool: Arc<sc_transaction_pool::FullPool<BlockOf<T>, ClientOf<T>>>,
145
    pub telemetry: Option<Telemetry>,
146
    pub telemetry_worker_handle: Option<TelemetryWorkerHandle>,
147

            
148
    pub hwbench: Option<sc_sysinfo::HwBench>,
149
    pub prometheus_registry: Option<substrate_prometheus_endpoint::Registry>,
150

            
151
    pub network: SNetwork,
152
    pub tx_handler_controller: STxHandler,
153
    pub import_queue_service: SImportQueueService,
154
}
155

            
156
pub struct Network<Block: cumulus_primitives_core::BlockT> {
157
    pub network: Arc<dyn sc_network::service::traits::NetworkService>,
158
    pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<Block>>,
159
    pub start_network: NetworkStarter,
160
    pub sync_service: Arc<SyncingService<Block>>,
161
}
162

            
163
/// Allows to create a parachain-defined executor from a `WasmExecutor`
164
pub trait TanssiExecutorExt {
165
    type HostFun: HostFunctions;
166
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self;
167
}
168

            
169
impl TanssiExecutorExt for WasmExecutor<ParachainHostFunctions> {
170
    type HostFun = ParachainHostFunctions;
171

            
172
558
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self {
173
558
        wasm_executor
174
558
    }
175
}
176

            
177
impl<D> TanssiExecutorExt for NativeElseWasmExecutor<D>
178
where
179
    D: NativeExecutionDispatch,
180
{
181
    type HostFun = ExtendedHostFunctions<sp_io::SubstrateHostFunctions, D::ExtendHostFunctions>;
182

            
183
164
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self {
184
164
        NativeElseWasmExecutor::new_with_wasm_executor(wasm_executor)
185
164
    }
186
}
187

            
188
// `new` function doesn't take self, and the Rust compiler cannot infer that
189
// only one type T implements `TypeIdentity`. With thus need a separate impl
190
// block with concrete types `()`.
191
impl<T: NodeBuilderConfig> NodeBuilder<T>
192
where
193
    BlockOf<T>: cumulus_primitives_core::BlockT,
194
    ExecutorOf<T>:
195
        Clone + CodeExecutor + RuntimeVersionOf + TanssiExecutorExt + Sync + Send + 'static,
196
    RuntimeApiOf<T>: ConstructRuntimeApi<BlockOf<T>, ClientOf<T>> + Sync + Send + 'static,
197
    ConstructedRuntimeApiOf<T>: TaggedTransactionQueue<BlockOf<T>> + BlockBuilder<BlockOf<T>>,
198
{
199
    /// Create a new `NodeBuilder` which prepare objects required to launch a
200
    /// node. However it only starts telemetry, and doesn't provide any
201
    /// network-dependent objects (as it requires an import queue, which usually
202
    /// is different for each node).
203
350
    fn new(
204
350
        parachain_config: &Configuration,
205
350
        hwbench: Option<sc_sysinfo::HwBench>,
206
350
    ) -> Result<Self, sc_service::Error> {
207
        // Refactor: old new_partial
208

            
209
350
        let telemetry = parachain_config
210
350
            .telemetry_endpoints
211
350
            .clone()
212
350
            .filter(|x| !x.is_empty())
213
350
            .map(|endpoints| -> Result<_, sc_telemetry::Error> {
214
                let worker = TelemetryWorker::new(16)?;
215
                let telemetry = worker.handle().new_telemetry(endpoints);
216
                Ok((worker, telemetry))
217
350
            })
218
350
            .transpose()?;
219

            
220
350
        let heap_pages =
221
350
            parachain_config
222
350
                .default_heap_pages
223
350
                .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
224
                    extra_pages: h as u32,
225
350
                });
226
350

            
227
350
        // Default runtime_cache_size is 2
228
350
        // For now we can work with this, but it will likely need
229
350
        // to change once we start having runtime_cache_sizes, or
230
350
        // run nodes with the maximum for this value
231
350
        let mut wasm_builder = WasmExecutor::builder()
232
350
            .with_execution_method(parachain_config.wasm_method)
233
350
            .with_onchain_heap_alloc_strategy(heap_pages)
234
350
            .with_offchain_heap_alloc_strategy(heap_pages)
235
350
            .with_max_runtime_instances(parachain_config.max_runtime_instances)
236
350
            .with_runtime_cache_size(parachain_config.runtime_cache_size);
237
350
        if let Some(ref wasmtime_precompiled_path) = parachain_config.wasmtime_precompiled {
238
332
            wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
239
332
        }
240

            
241
350
        let executor = ExecutorOf::<T>::new_with_wasm_executor(wasm_builder.build());
242

            
243
350
        let (client, backend, keystore_container, task_manager) =
244
350
            sc_service::new_full_parts::<BlockOf<T>, RuntimeApiOf<T>, _>(
245
350
                parachain_config,
246
350
                telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
247
350
                executor,
248
350
            )?;
249
350
        let client = Arc::new(client);
250
350

            
251
350
        let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
252
350

            
253
350
        let telemetry = telemetry.map(|(worker, telemetry)| {
254
            task_manager
255
                .spawn_handle()
256
                .spawn("telemetry", None, worker.run());
257
            telemetry
258
350
        });
259
350

            
260
350
        let transaction_pool = sc_transaction_pool::BasicPool::new_full(
261
350
            parachain_config.transaction_pool.clone(),
262
350
            parachain_config.role.is_authority().into(),
263
350
            parachain_config.prometheus_registry(),
264
350
            task_manager.spawn_essential_handle(),
265
350
            client.clone(),
266
350
        );
267
350

            
268
350
        Ok(Self {
269
350
            client,
270
350
            backend,
271
350
            transaction_pool,
272
350
            telemetry,
273
350
            telemetry_worker_handle,
274
350
            task_manager,
275
350
            keystore_container,
276
350
            hwbench,
277
350
            prometheus_registry: parachain_config.prometheus_registry().cloned(),
278
350
            network: TypeIdentity::from_type(()),
279
350
            tx_handler_controller: TypeIdentity::from_type(()),
280
350
            import_queue_service: TypeIdentity::from_type(()),
281
350
        })
282
350
    }
283
}
284

            
285
impl<T: NodeBuilderConfig, SNetwork, STxHandler, SImportQueueService>
286
    NodeBuilder<T, SNetwork, STxHandler, SImportQueueService>
287
where
288
    BlockOf<T>: cumulus_primitives_core::BlockT,
289
    ExecutorOf<T>: Clone + CodeExecutor + RuntimeVersionOf + Sync + Send + 'static,
290
    RuntimeApiOf<T>: ConstructRuntimeApi<BlockOf<T>, ClientOf<T>> + Sync + Send + 'static,
291
    ConstructedRuntimeApiOf<T>: TaggedTransactionQueue<BlockOf<T>>
292
        + BlockBuilder<BlockOf<T>>
293
        + cumulus_primitives_core::CollectCollationInfo<BlockOf<T>>,
294
{
295
    pub async fn build_relay_chain_interface(
296
        &mut self,
297
        parachain_config: &Configuration,
298
        polkadot_config: Configuration,
299
        collator_options: CollatorOptions,
300
    ) -> sc_service::error::Result<(
301
        Arc<(dyn RelayChainInterface + 'static)>,
302
        Option<CollatorPair>,
303
    )> {
304
        build_relay_chain_interface(
305
            polkadot_config,
306
            parachain_config,
307
            self.telemetry_worker_handle.clone(),
308
            &mut self.task_manager,
309
            collator_options.clone(),
310
            self.hwbench.clone(),
311
        )
312
        .await
313
        .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))
314
    }
315

            
316
    /// Given an import queue, calls `cumulus_client_service::build_network` and
317
    /// stores the returned objects in `self.network` and `self.tx_handler_controller`.
318
    ///
319
    /// Can only be called once on a `NodeBuilder` that doesn't have yet network
320
    /// data.
321
    pub async fn build_cumulus_network<RCInterface, Net>(
322
        self,
323
        parachain_config: &Configuration,
324
        para_id: ParaId,
325
        import_queue: impl ImportQueue<BlockOf<T>> + 'static,
326
        relay_chain_interface: RCInterface,
327
    ) -> sc_service::error::Result<
328
        NodeBuilder<
329
            T,
330
            Network<BlockOf<T>>,
331
            TransactionsHandlerController<BlockHashOf<T>>,
332
            ImportQueueServiceOf<T>,
333
        >,
334
    >
335
    where
336
        SNetwork: TypeIdentity<Type = ()>,
337
        STxHandler: TypeIdentity<Type = ()>,
338
        SImportQueueService: TypeIdentity<Type = ()>,
339
        RCInterface: RelayChainInterface + Clone + 'static,
340
        Net: sc_network::service::traits::NetworkBackend<BlockOf<T>, BlockHashOf<T>>,
341
    {
342
        let Self {
343
            client,
344
            backend,
345
            transaction_pool,
346
            telemetry,
347
            telemetry_worker_handle,
348
            task_manager,
349
            keystore_container,
350
            hwbench,
351
            prometheus_registry,
352
            network: _,
353
            tx_handler_controller: _,
354
            import_queue_service: _,
355
        } = self;
356

            
357
        let net_config = FullNetworkConfiguration::<_, _, Net>::new(&parachain_config.network);
358

            
359
        let import_queue_service = import_queue.service();
360
        let spawn_handle = task_manager.spawn_handle();
361

            
362
        let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
363
            cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams {
364
                parachain_config,
365
                client: client.clone(),
366
                transaction_pool: transaction_pool.clone(),
367
                spawn_handle,
368
                import_queue,
369
                para_id,
370
                relay_chain_interface,
371
                net_config,
372
                sybil_resistance_level: CollatorSybilResistance::Resistant,
373
            })
374
            .await?;
375

            
376
        Ok(NodeBuilder {
377
            client,
378
            backend,
379
            transaction_pool,
380
            telemetry,
381
            telemetry_worker_handle,
382
            task_manager,
383
            keystore_container,
384
            hwbench,
385
            prometheus_registry,
386
            network: Network {
387
                network,
388
                system_rpc_tx,
389
                start_network,
390
                sync_service,
391
            },
392
            tx_handler_controller,
393
            import_queue_service,
394
        })
395
    }
396

            
397
    /// Given an import queue, calls `sc_service::build_network` and
398
    /// stores the returned objects in `self.network` and `self.tx_handler_controller`.
399
    ///
400
    /// Can only be called once on a `NodeBuilder` that doesn't have yet network
401
    /// data.
402
344
    pub fn build_substrate_network<Net>(
403
344
        self,
404
344
        parachain_config: &Configuration,
405
344
        import_queue: impl ImportQueue<BlockOf<T>> + 'static,
406
344
    ) -> sc_service::error::Result<
407
344
        NodeBuilder<
408
344
            T,
409
344
            Network<BlockOf<T>>,
410
344
            TransactionsHandlerController<BlockHashOf<T>>,
411
344
            ImportQueueServiceOf<T>,
412
344
        >,
413
344
    >
414
344
    where
415
344
        SNetwork: TypeIdentity<Type = ()>,
416
344
        STxHandler: TypeIdentity<Type = ()>,
417
344
        SImportQueueService: TypeIdentity<Type = ()>,
418
344
        Net: sc_network::service::traits::NetworkBackend<BlockOf<T>, BlockHashOf<T>>,
419
344
    {
420
344
        let Self {
421
344
            client,
422
344
            backend,
423
344
            transaction_pool,
424
344
            telemetry,
425
344
            telemetry_worker_handle,
426
344
            task_manager,
427
344
            keystore_container,
428
344
            hwbench,
429
344
            prometheus_registry,
430
344
            network: _,
431
344
            tx_handler_controller: _,
432
344
            import_queue_service: _,
433
344
        } = self;
434
344

            
435
344
        let net_config = FullNetworkConfiguration::<_, _, Net>::new(&parachain_config.network);
436
344

            
437
344
        let metrics = Net::register_notification_metrics(
438
344
            parachain_config
439
344
                .prometheus_config
440
344
                .as_ref()
441
344
                .map(|cfg| &cfg.registry),
442
344
        );
443
344

            
444
344
        let import_queue_service = import_queue.service();
445

            
446
344
        let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
447
344
            sc_service::build_network(sc_service::BuildNetworkParams {
448
344
                config: parachain_config,
449
344
                client: client.clone(),
450
344
                transaction_pool: transaction_pool.clone(),
451
344
                spawn_handle: task_manager.spawn_handle(),
452
344
                import_queue,
453
344
                warp_sync_params: None,
454
344
                block_announce_validator_builder: None,
455
344
                net_config,
456
344
                block_relay: None,
457
344
                metrics,
458
344
            })?;
459

            
460
344
        Ok(NodeBuilder {
461
344
            client,
462
344
            backend,
463
344
            transaction_pool,
464
344
            telemetry,
465
344
            telemetry_worker_handle,
466
344
            task_manager,
467
344
            keystore_container,
468
344
            hwbench,
469
344
            prometheus_registry,
470
344
            network: Network {
471
344
                network,
472
344
                system_rpc_tx,
473
344
                start_network,
474
344
                sync_service,
475
344
            },
476
344
            tx_handler_controller,
477
344
            import_queue_service,
478
344
        })
479
344
    }
480

            
481
    /// Given an `rpc_builder`, spawns the common tasks of a Substrate
482
    /// node. It consumes `self.tx_handler_controller` in the process, which means
483
    /// it can only be called once, and any other code that would need this
484
    /// controller should interact with it before calling this function.
485
344
    pub fn spawn_common_tasks<TRpc>(
486
344
        self,
487
344
        parachain_config: Configuration,
488
344
        rpc_builder: Box<
489
344
            dyn Fn(
490
344
                DenyUnsafe,
491
344
                SubscriptionTaskExecutor,
492
344
            ) -> Result<RpcModule<TRpc>, sc_service::Error>,
493
344
        >,
494
344
    ) -> sc_service::error::Result<NodeBuilder<T, Network<BlockOf<T>>, (), SImportQueueService>>
495
344
    where
496
344
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
497
344
        STxHandler: TypeIdentity<Type = TransactionsHandlerController<BlockHashOf<T>>>,
498
344
        BlockHashOf<T>: Unpin,
499
344
        BlockHeaderOf<T>: Unpin,
500
344
        ConstructedRuntimeApiOf<T>: TaggedTransactionQueue<BlockOf<T>>
501
344
            + BlockBuilder<BlockOf<T>>
502
344
            + OffchainWorkerApi<BlockOf<T>>
503
344
            + sp_api::Metadata<BlockOf<T>>
504
344
            + sp_session::SessionKeys<BlockOf<T>>,
505
344
    {
506
344
        let NodeBuilder {
507
344
            client,
508
344
            backend,
509
344
            transaction_pool,
510
344
            mut telemetry,
511
344
            telemetry_worker_handle,
512
344
            mut task_manager,
513
344
            keystore_container,
514
344
            hwbench,
515
344
            prometheus_registry,
516
344
            network,
517
344
            tx_handler_controller,
518
344
            import_queue_service,
519
344
        } = self;
520
344

            
521
344
        let network = TypeIdentity::into_type(network);
522
344
        let tx_handler_controller = TypeIdentity::into_type(tx_handler_controller);
523
344

            
524
344
        let collator = parachain_config.role.is_authority();
525
344

            
526
344
        if parachain_config.offchain_worker.enabled {
527
344
            task_manager.spawn_handle().spawn(
528
344
                "offchain-workers-runner",
529
344
                "offchain-work",
530
344
                sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
531
344
                    runtime_api_provider: client.clone(),
532
344
                    keystore: Some(keystore_container.keystore()),
533
344
                    offchain_db: backend.offchain_storage(),
534
344
                    transaction_pool: Some(OffchainTransactionPoolFactory::new(
535
344
                        transaction_pool.clone(),
536
344
                    )),
537
344
                    network_provider: Arc::new(network.network.clone()),
538
344
                    is_validator: parachain_config.role.is_authority(),
539
344
                    enable_http_requests: false,
540
8208
                    custom_extensions: move |_| vec![],
541
344
                })
542
344
                .run(client.clone(), task_manager.spawn_handle())
543
344
                .boxed(),
544
344
            );
545
344
        }
546

            
547
344
        let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
548
344
            rpc_builder,
549
344
            client: client.clone(),
550
344
            transaction_pool: transaction_pool.clone(),
551
344
            task_manager: &mut task_manager,
552
344
            config: parachain_config,
553
344
            keystore: keystore_container.keystore(),
554
344
            backend: backend.clone(),
555
344
            network: network.network.clone(),
556
344
            system_rpc_tx: network.system_rpc_tx.clone(),
557
344
            tx_handler_controller,
558
344
            telemetry: telemetry.as_mut(),
559
344
            sync_service: network.sync_service.clone(),
560
344
        })?;
561

            
562
344
        if let Some(hwbench) = &hwbench {
563
            sc_sysinfo::print_hwbench(hwbench);
564
            // Here you can check whether the hardware meets your chains' requirements. Putting a link
565
            // in there and swapping out the requirements for your own are probably a good idea. The
566
            // requirements for a para-chain are dictated by its relay-chain.
567
            if collator {
568
                if let Err(err) = SUBSTRATE_REFERENCE_HARDWARE.check_hardware(hwbench) {
569
                    log::warn!(
570
                        "⚠️  The hardware does not meet the minimal requirements {} for role 'Authority'.",
571
                        err
572
                    );
573
                }
574
            }
575

            
576
            if let Some(ref mut telemetry) = telemetry {
577
                let telemetry_handle = telemetry.handle();
578
                task_manager.spawn_handle().spawn(
579
                    "telemetry_hwbench",
580
                    None,
581
                    sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench.clone()),
582
                );
583
            }
584
344
        }
585

            
586
344
        Ok(NodeBuilder {
587
344
            client,
588
344
            backend,
589
344
            transaction_pool,
590
344
            telemetry,
591
344
            telemetry_worker_handle,
592
344
            task_manager,
593
344
            keystore_container,
594
344
            hwbench,
595
344
            prometheus_registry,
596
344
            network: TypeIdentity::from_type(network),
597
344
            tx_handler_controller: TypeIdentity::from_type(()),
598
344
            import_queue_service,
599
344
        })
600
344
    }
601

            
602
344
    pub fn install_manual_seal<BI, SC, CIDP>(
603
344
        &mut self,
604
344
        manual_seal_config: ManualSealConfiguration<BlockOf<T>, BI, SC, CIDP>,
605
344
    ) -> sc_service::error::Result<Option<mpsc::Sender<EngineCommand<BlockHashOf<T>>>>>
606
344
    where
607
344
        BI: BlockImport<BlockOf<T>, Error = sp_consensus::Error> + Send + Sync + 'static,
608
344
        SC: SelectChain<BlockOf<T>> + 'static,
609
344
        CIDP: CreateInherentDataProviders<BlockOf<T>, ()> + 'static,
610
344
    {
611
344
        let ManualSealConfiguration {
612
344
            sealing,
613
344
            soft_deadline,
614
344
            block_import,
615
344
            select_chain,
616
344
            consensus_data_provider,
617
344
            create_inherent_data_providers,
618
344
        } = manual_seal_config;
619
344

            
620
344
        let prometheus_registry = self.prometheus_registry.clone();
621
344

            
622
344
        let mut env = sc_basic_authorship::ProposerFactory::new(
623
344
            self.task_manager.spawn_handle(),
624
344
            self.client.clone(),
625
344
            self.transaction_pool.clone(),
626
344
            prometheus_registry.as_ref(),
627
344
            self.telemetry.as_ref().map(|x| x.handle()),
628
344
        );
629
344

            
630
344
        let mut command_sink = None;
631

            
632
344
        if let Some(deadline) = soft_deadline {
633
162
            env.set_soft_deadline(deadline);
634
344
        }
635

            
636
344
        let commands_stream: Box<
637
344
            dyn Stream<Item = EngineCommand<BlockHashOf<T>>> + Send + Sync + Unpin,
638
344
        > = match sealing {
639
            Sealing::Instant => {
640
12
                Box::new(
641
12
                    // This bit cribbed from the implementation of instant seal.
642
12
                    self.transaction_pool
643
12
                        .pool()
644
12
                        .validated_pool()
645
12
                        .import_notification_stream()
646
12
                        .map(|_| EngineCommand::SealNewBlock {
647
                            create_empty: false,
648
                            finalize: false,
649
                            parent_hash: None,
650
                            sender: None,
651
12
                        }),
652
12
                )
653
            }
654
            Sealing::Manual => {
655
332
                let (sink, stream) = futures::channel::mpsc::channel(1000);
656
332
                // Keep a reference to the other end of the channel. It goes to the RPC.
657
332
                command_sink = Some(sink);
658
332
                Box::new(stream)
659
            }
660
            Sealing::Interval(millis) => Box::new(futures::StreamExt::map(
661
                Timer::interval(Duration::from_millis(millis)),
662
                |_| EngineCommand::SealNewBlock {
663
                    create_empty: true,
664
                    finalize: true,
665
                    parent_hash: None,
666
                    sender: None,
667
                },
668
            )),
669
        };
670

            
671
344
        self.task_manager.spawn_essential_handle().spawn_blocking(
672
344
            "authorship_task",
673
344
            Some("block-authoring"),
674
344
            run_manual_seal(ManualSealParams {
675
344
                block_import,
676
344
                env,
677
344
                client: self.client.clone(),
678
344
                pool: self.transaction_pool.clone(),
679
344
                commands_stream,
680
344
                select_chain,
681
344
                consensus_data_provider,
682
344
                create_inherent_data_providers,
683
344
            }),
684
344
        );
685
344

            
686
344
        Ok(command_sink)
687
344
    }
688

            
689
    pub fn start_full_node<RCInterface>(
690
        self,
691
        para_id: ParaId,
692
        relay_chain_interface: RCInterface,
693
        relay_chain_slot_duration: Duration,
694
    ) -> sc_service::error::Result<NodeBuilder<T, SNetwork, STxHandler, ()>>
695
    where
696
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
697
        SImportQueueService: TypeIdentity<Type = ImportQueueServiceOf<T>>,
698
        RCInterface: RelayChainInterface + Clone + 'static,
699
    {
700
        let NodeBuilder {
701
            client,
702
            backend,
703
            transaction_pool,
704
            telemetry,
705
            telemetry_worker_handle,
706
            mut task_manager,
707
            keystore_container,
708
            hwbench,
709
            prometheus_registry,
710
            network,
711
            tx_handler_controller,
712
            import_queue_service,
713
        } = self;
714

            
715
        let network = TypeIdentity::into_type(network);
716
        let import_queue_service = TypeIdentity::into_type(import_queue_service);
717

            
718
        let announce_block = {
719
            let sync_service = network.sync_service.clone();
720
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
721
        };
722

            
723
        let overseer_handle = relay_chain_interface
724
            .overseer_handle()
725
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
726

            
727
        let params = StartFullNodeParams {
728
            client: client.clone(),
729
            announce_block,
730
            task_manager: &mut task_manager,
731
            para_id,
732
            relay_chain_interface,
733
            relay_chain_slot_duration,
734
            import_queue: import_queue_service,
735
            recovery_handle: Box::new(overseer_handle),
736
            sync_service: network.sync_service.clone(),
737
        };
738

            
739
        // TODO: change for async backing
740
        #[allow(deprecated)]
741
        cumulus_client_service::start_full_node(params)?;
742

            
743
        Ok(NodeBuilder {
744
            client,
745
            backend,
746
            transaction_pool,
747
            telemetry,
748
            telemetry_worker_handle,
749
            task_manager,
750
            keystore_container,
751
            hwbench,
752
            prometheus_registry,
753
            network: TypeIdentity::from_type(network),
754
            tx_handler_controller,
755
            import_queue_service: (),
756
        })
757
    }
758

            
759
    pub async fn start_collator<RCInterface>(
760
        self,
761
        para_id: ParaId,
762
        relay_chain_interface: RCInterface,
763
        relay_chain_slot_duration: Duration,
764
        parachain_consensus: ParachainConsensusOf<T>,
765
        collator_key: CollatorPair,
766
    ) -> sc_service::error::Result<NodeBuilder<T, SNetwork, STxHandler, ()>>
767
    where
768
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
769
        SImportQueueService: TypeIdentity<Type = ImportQueueServiceOf<T>>,
770
        RCInterface: RelayChainInterface + Clone + 'static,
771
    {
772
        let NodeBuilder {
773
            client,
774
            backend,
775
            transaction_pool,
776
            telemetry,
777
            telemetry_worker_handle,
778
            mut task_manager,
779
            keystore_container,
780
            hwbench,
781
            prometheus_registry,
782
            network,
783
            tx_handler_controller,
784
            import_queue_service,
785
        } = self;
786

            
787
        let network = TypeIdentity::into_type(network);
788
        let import_queue_service = TypeIdentity::into_type(import_queue_service);
789

            
790
        let spawner = task_manager.spawn_handle();
791
        let announce_block = {
792
            let sync_service = network.sync_service.clone();
793
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
794
        };
795
        let overseer_handle = relay_chain_interface
796
            .overseer_handle()
797
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
798

            
799
        let params = cumulus_client_service::StartCollatorParams {
800
            para_id,
801
            block_status: client.clone(),
802
            announce_block: announce_block.clone(),
803
            client: client.clone(),
804
            task_manager: &mut task_manager,
805
            relay_chain_interface: relay_chain_interface.clone(),
806
            spawner: spawner.clone(),
807
            parachain_consensus,
808
            import_queue: import_queue_service,
809
            collator_key,
810
            relay_chain_slot_duration,
811
            recovery_handle: Box::new(overseer_handle.clone()),
812
            sync_service: network.sync_service.clone(),
813
        };
814

            
815
        // TODO: change for async backing
816
        #[allow(deprecated)]
817
        cumulus_client_service::start_collator(params).await?;
818

            
819
        Ok(NodeBuilder {
820
            client,
821
            backend,
822
            transaction_pool,
823
            telemetry,
824
            telemetry_worker_handle,
825
            task_manager,
826
            keystore_container,
827
            hwbench,
828
            prometheus_registry,
829
            network: TypeIdentity::from_type(network),
830
            tx_handler_controller,
831
            import_queue_service: (),
832
        })
833
    }
834

            
835
    pub fn extract_import_queue_service(
836
        self,
837
    ) -> (
838
        NodeBuilder<T, SNetwork, STxHandler, ()>,
839
        SImportQueueService,
840
    )
841
    where
842
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
843
    {
844
        let NodeBuilder {
845
            client,
846
            backend,
847
            transaction_pool,
848
            telemetry,
849
            telemetry_worker_handle,
850
            task_manager,
851
            keystore_container,
852
            hwbench,
853
            prometheus_registry,
854
            network,
855
            tx_handler_controller,
856
            import_queue_service,
857
        } = self;
858

            
859
        (
860
            NodeBuilder {
861
                client,
862
                backend,
863
                transaction_pool,
864
                telemetry,
865
                telemetry_worker_handle,
866
                task_manager,
867
                keystore_container,
868
                hwbench,
869
                prometheus_registry,
870
                network,
871
                tx_handler_controller,
872
                import_queue_service: (),
873
            },
874
            import_queue_service,
875
        )
876
    }
877

            
878
    pub fn cumulus_client_collator_params_generator(
879
        &self,
880
        para_id: ParaId,
881
        overseer_handle: cumulus_relay_chain_interface::OverseerHandle,
882
        collator_key: CollatorPair,
883
        parachain_consensus: ParachainConsensusOf<T>,
884
    ) -> impl Fn() -> cumulus_client_collator::StartCollatorParams<
885
        BlockOf<T>,
886
        ClientOf<T>,
887
        ClientOf<T>,
888
        SpawnTaskHandle,
889
    > + Send
890
           + Clone
891
           + 'static
892
    where
893
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
894
    {
895
        let network = TypeIdentity::as_type(&self.network);
896

            
897
        let client = self.client.clone();
898
        let announce_block = {
899
            let sync_service = network.sync_service.clone();
900
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
901
        };
902
        let spawner = self.task_manager.spawn_handle();
903

            
904
        move || cumulus_client_collator::StartCollatorParams {
905
            runtime_api: client.clone(),
906
            block_status: client.clone(),
907
            announce_block: announce_block.clone(),
908
            overseer_handle: overseer_handle.clone(),
909
            spawner: spawner.clone(),
910
            para_id,
911
            key: collator_key.clone(),
912
            parachain_consensus: parachain_consensus.clone(),
913
        }
914
    }
915
}
916

            
917
/// Block authoring scheme to be used by the dev service.
918
#[derive(Debug, Copy, Clone)]
919
pub enum Sealing {
920
    /// Author a block immediately upon receiving a transaction into the transaction pool
921
    Instant,
922
    /// Author a block upon receiving an RPC command
923
    Manual,
924
    /// Author blocks at a regular interval specified in milliseconds
925
    Interval(u64),
926
}
927

            
928
impl FromStr for Sealing {
929
    type Err = String;
930

            
931
1092
    fn from_str(s: &str) -> Result<Self, Self::Err> {
932
1092
        Ok(match s {
933
1092
            "instant" => Self::Instant,
934
996
            "manual" => Self::Manual,
935
            s => {
936
                let millis = s
937
                    .parse::<u64>()
938
                    .map_err(|_| "couldn't decode sealing param")?;
939
                Self::Interval(millis)
940
            }
941
        })
942
1092
    }
943
}
944

            
945
pub struct ManualSealConfiguration<B, BI, SC, CIDP> {
946
    pub sealing: Sealing,
947
    pub block_import: BI,
948
    pub soft_deadline: Option<Percent>,
949
    pub select_chain: SC,
950
    pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B, Proof = ()>>>,
951
    pub create_inherent_data_providers: CIDP,
952
}