1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
#![doc = include_str!("../README.md")]
18
#![cfg_attr(not(feature = "std"), no_std)]
19

            
20
#[cfg(test)]
21
mod mock;
22

            
23
#[cfg(test)]
24
mod tests;
25

            
26
#[cfg(feature = "runtime-benchmarks")]
27
mod benchmarking;
28

            
29
pub mod weights;
30
pub use weights::WeightInfo;
31

            
32
#[cfg(feature = "std")]
33
use serde::{Deserialize, Serialize};
34

            
35
use {
36
    core::cmp::min,
37
    frame_support::{
38
        dispatch::DispatchErrorWithPostInfo,
39
        pallet,
40
        pallet_prelude::*,
41
        storage::types::{StorageDoubleMap, StorageMap},
42
        traits::{
43
            fungible::{Inspect, MutateHold},
44
            tokens::{Balance, Precision},
45
        },
46
        Blake2_128Concat,
47
    },
48
    frame_system::pallet_prelude::*,
49
    parity_scale_codec::{FullCodec, MaxEncodedLen},
50
    scale_info::TypeInfo,
51
    sp_runtime::{
52
        traits::{AtLeast32BitUnsigned, CheckedAdd, CheckedSub, One, Saturating, Zero},
53
        ArithmeticError,
54
    },
55
    sp_std::{fmt::Debug, marker::PhantomData},
56
};
57

            
58
pub use pallet::*;
59

            
60
/// Type able to provide the current time for given unit.
61
/// For each unit the returned number should monotonically increase and not
62
/// overflow.
63
pub trait TimeProvider<Unit, Number> {
64
    fn now(unit: &Unit) -> Option<Number>;
65

            
66
    /// Benchmarks: should return the time unit which has the worst performance calling
67
    /// `TimeProvider::now(unit)` with.
68
    #[cfg(feature = "runtime-benchmarks")]
69
    fn bench_worst_case_time_unit() -> Unit;
70

            
71
    /// Benchmarks: sets the "now" time for time unit returned by `bench_worst_case_time_unit`.
72
    #[cfg(feature = "runtime-benchmarks")]
73
    fn bench_set_now(instant: Number);
74
}
75

            
76
/// Interactions the pallet needs with assets.
77
pub trait Assets<AccountId, AssetId, Balance> {
78
    /// Transfer assets deposited by an account to another account.
79
    /// Those assets should not be considered deposited in the target account.
80
    fn transfer_deposit(
81
        asset_id: &AssetId,
82
        from: &AccountId,
83
        to: &AccountId,
84
        amount: Balance,
85
    ) -> DispatchResult;
86

            
87
    /// Increase the deposit for an account and asset id. Should fail if account doesn't have
88
    /// enough of that asset. Funds should be safe and not slashable.
89
    fn increase_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
90
        -> DispatchResult;
91

            
92
    /// Decrease the deposit for an account and asset id. Should fail on underflow.
93
    fn decrease_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
94
        -> DispatchResult;
95

            
96
    /// Return the deposit for given asset and account.
97
    fn get_deposit(asset_id: &AssetId, account: &AccountId) -> Balance;
98

            
99
    /// Benchmarks: should return the asset id which has the worst performance when interacting
100
    /// with it.
101
    #[cfg(feature = "runtime-benchmarks")]
102
    fn bench_worst_case_asset_id() -> AssetId;
103

            
104
    /// Benchmarks: should return the another asset id which has the worst performance when interacting
105
    /// with it afther `bench_worst_case_asset_id`. This is to benchmark the worst case when changing config
106
    /// from one asset to another. If there is only one asset id it is fine to return it in both
107
    /// `bench_worst_case_asset_id` and `bench_worst_case_asset_id2`.
108
    #[cfg(feature = "runtime-benchmarks")]
109
    fn bench_worst_case_asset_id2() -> AssetId;
110

            
111
    /// Benchmarks: should set the balance.
112
    #[cfg(feature = "runtime-benchmarks")]
113
    fn bench_set_balance(asset_id: &AssetId, account: &AccountId, amount: Balance);
114
}
115

            
116
#[pallet]
117
pub mod pallet {
118
    use super::*;
119

            
120
    /// Pooled Staking pallet.
121
13612
    #[pallet::pallet]
122
    #[pallet::without_storage_info]
123
    pub struct Pallet<T>(PhantomData<T>);
124

            
125
    #[pallet::config]
126
    pub trait Config: frame_system::Config {
127
        /// Overarching event type
128
        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
129

            
130
        /// Type used to represent stream ids. Should be large enough to not overflow.
131
        type StreamId: AtLeast32BitUnsigned
132
            + Default
133
            + Debug
134
            + Copy
135
            + Clone
136
            + FullCodec
137
            + TypeInfo
138
            + MaxEncodedLen;
139

            
140
        /// The balance type, which is also the type representing time (as this
141
        /// pallet will do math with both time and balances to compute how
142
        /// much should be paid).
143
        type Balance: Balance;
144

            
145
        /// Type representing an asset id, a identifier allowing distinguishing assets.
146
        type AssetId: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + PartialEq + Eq;
147

            
148
        /// Provide interaction with assets.
149
        type Assets: Assets<Self::AccountId, Self::AssetId, Self::Balance>;
150

            
151
        /// Currency for the opening balance hold for the storage used by the Stream.
152
        /// NOT to be confused with Assets.
153
        type Currency: Inspect<Self::AccountId, Balance = Self::Balance>
154
            + MutateHold<Self::AccountId, Reason = Self::RuntimeHoldReason>;
155

            
156
        type RuntimeHoldReason: From<HoldReason>;
157

            
158
        #[pallet::constant]
159
        type OpenStreamHoldAmount: Get<Self::Balance>;
160

            
161
        /// Represents which units of time can be used. Designed to be an enum
162
        /// with a variant for each kind of time source/scale supported.
163
        type TimeUnit: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + Eq;
164

            
165
        /// Provide the current time in given unit.
166
        type TimeProvider: TimeProvider<Self::TimeUnit, Self::Balance>;
167

            
168
        type WeightInfo: weights::WeightInfo;
169
    }
170

            
171
    type AccountIdOf<T> = <T as frame_system::Config>::AccountId;
172
    type AssetIdOf<T> = <T as Config>::AssetId;
173

            
174
    pub type RequestNonce = u32;
175

            
176
    /// A stream payment from source to target.
177
    /// Stores the last time the stream was updated, which allows to compute
178
    /// elapsed time and perform payment.
179
    #[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
180
1530
    #[derive(RuntimeDebug, PartialEq, Eq, Encode, Decode, Clone, TypeInfo)]
181
    pub struct Stream<AccountId, Unit, AssetId, Balance> {
182
        /// Payer, source of the stream.
183
        pub source: AccountId,
184
        /// Payee, target of the stream.
185
        pub target: AccountId,
186
        /// Steam config (time unit, asset id, rate)
187
        pub config: StreamConfig<Unit, AssetId, Balance>,
188
        /// How much is deposited to fund this stream.
189
        pub deposit: Balance,
190
        /// Last time the stream was updated in `config.time_unit`.
191
        pub last_time_updated: Balance,
192
        /// Nonce for requests. This prevents a request to make a first request
193
        /// then change it to another request to frontrun the other party
194
        /// accepting.
195
        pub request_nonce: RequestNonce,
196
        /// A pending change request if any.
197
        pub pending_request: Option<ChangeRequest<Unit, AssetId, Balance>>,
198
        /// One-time opening deposit. Will be released on close.
199
        pub opening_deposit: Balance,
200
    }
201

            
202
    impl<AccountId: PartialEq, Unit, AssetId, Balance> Stream<AccountId, Unit, AssetId, Balance> {
203
70
        pub fn account_to_party(&self, account: AccountId) -> Option<Party> {
204
70
            match account {
205
70
                a if a == self.source => Some(Party::Source),
206
27
                a if a == self.target => Some(Party::Target),
207
3
                _ => None,
208
            }
209
70
        }
210
    }
211

            
212
    /// Stream configuration.
213
    #[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
214
680
    #[derive(RuntimeDebug, PartialEq, Eq, Encode, Decode, Copy, Clone, TypeInfo)]
215
    pub struct StreamConfig<Unit, AssetId, Balance> {
216
        /// Unit in which time is measured using a `TimeProvider`.
217
        pub time_unit: Unit,
218
        /// Asset used for payment.
219
        pub asset_id: AssetId,
220
        /// Amount of asset / unit.
221
        pub rate: Balance,
222
    }
223

            
224
    /// Origin of a change request.
225
    #[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
226
510
    #[derive(RuntimeDebug, PartialEq, Eq, Encode, Decode, Copy, Clone, TypeInfo)]
227
    pub enum Party {
228
30
        Source,
229
29
        Target,
230
    }
231

            
232
    impl Party {
233
4
        pub fn inverse(self) -> Self {
234
4
            match self {
235
2
                Party::Source => Party::Target,
236
2
                Party::Target => Party::Source,
237
            }
238
4
        }
239
    }
240

            
241
    /// Kind of change requested.
242
    #[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
243
680
    #[derive(RuntimeDebug, PartialEq, Eq, Encode, Decode, Copy, Clone, TypeInfo)]
244
    pub enum ChangeKind<Time> {
245
23
        /// The requested change is a suggestion, and the other party doesn't
246
        /// need to accept it.
247
        Suggestion,
248
31
        /// The requested change is mandatory, and the other party must either
249
        /// accept the change or close the stream. Reaching the deadline will
250
        /// close the stream too.
251
        Mandatory { deadline: Time },
252
    }
253

            
254
    /// Describe how the deposit should change.
255
    #[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
256
1190
    #[derive(RuntimeDebug, PartialEq, Eq, Encode, Decode, Copy, Clone, TypeInfo)]
257
    pub enum DepositChange<Balance> {
258
17
        /// Increase deposit by given amount.
259
        Increase(Balance),
260
1
        /// Decrease deposit by given amount.
261
        Decrease(Balance),
262
1
        /// Set deposit to given amount.
263
        Absolute(Balance),
264
    }
265

            
266
    /// A request to change a stream config.
267
    #[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
268
850
    #[derive(RuntimeDebug, PartialEq, Eq, Encode, Decode, Clone, TypeInfo)]
269
    pub struct ChangeRequest<Unit, AssetId, Balance> {
270
        pub requester: Party,
271
        pub kind: ChangeKind<Balance>,
272
        pub new_config: StreamConfig<Unit, AssetId, Balance>,
273
        pub deposit_change: Option<DepositChange<Balance>>,
274
    }
275

            
276
    pub type StreamOf<T> =
277
        Stream<AccountIdOf<T>, <T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
278

            
279
    pub type StreamConfigOf<T> =
280
        StreamConfig<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
281

            
282
    pub type ChangeRequestOf<T> =
283
        ChangeRequest<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
284

            
285
    #[derive(Debug, Copy, Clone, PartialEq, Eq)]
286
    pub struct StreamPaymentStatus<Balance> {
287
        pub payment: Balance,
288
        pub deposit_left: Balance,
289
        /// Whenever the stream is stalled, which can occur either when no funds are left or
290
        /// if the time is past a mandatory request deadline.
291
        pub stalled: bool,
292
    }
293

            
294
    /// Store the next available stream id.
295
248
    #[pallet::storage]
296
    pub type NextStreamId<T: Config> = StorageValue<Value = T::StreamId, QueryKind = ValueQuery>;
297

            
298
    /// Store each stream indexed by an Id.
299
288
    #[pallet::storage]
300
    pub type Streams<T: Config> = StorageMap<
301
        Hasher = Blake2_128Concat,
302
        Key = T::StreamId,
303
        Value = StreamOf<T>,
304
        QueryKind = OptionQuery,
305
    >;
306

            
307
    /// Lookup for all streams with given source.
308
    /// To avoid maintaining a growing list of stream ids, they are stored in
309
    /// the form of an entry (AccountId, StreamId). If such entry exists then
310
    /// this AccountId is a source in StreamId. One can iterate over all storage
311
    /// keys starting with the AccountId to find all StreamIds.
312
72
    #[pallet::storage]
313
    pub type LookupStreamsWithSource<T: Config> = StorageDoubleMap<
314
        Key1 = AccountIdOf<T>,
315
        Hasher1 = Blake2_128Concat,
316
        Key2 = T::StreamId,
317
        Hasher2 = Blake2_128Concat,
318
        Value = (),
319
        QueryKind = OptionQuery,
320
    >;
321

            
322
    /// Lookup for all streams with given target.
323
    /// To avoid maintaining a growing list of stream ids, they are stored in
324
    /// the form of an entry (AccountId, StreamId). If such entry exists then
325
    /// this AccountId is a target in StreamId. One can iterate over all storage
326
    /// keys starting with the AccountId to find all StreamIds.
327
72
    #[pallet::storage]
328
    pub type LookupStreamsWithTarget<T: Config> = StorageDoubleMap<
329
        Key1 = AccountIdOf<T>,
330
        Hasher1 = Blake2_128Concat,
331
        Key2 = T::StreamId,
332
        Hasher2 = Blake2_128Concat,
333
        Value = (),
334
        QueryKind = OptionQuery,
335
    >;
336

            
337
60
    #[pallet::error]
338
    #[derive(Clone, PartialEq, Eq)]
339
    pub enum Error<T> {
340
        UnknownStreamId,
341
        StreamIdOverflow,
342
        UnauthorizedOrigin,
343
        CantBeBothSourceAndTarget,
344
        CantFetchCurrentTime,
345
        SourceCantDecreaseRate,
346
        TargetCantIncreaseRate,
347
        CantOverrideMandatoryChange,
348
        NoPendingRequest,
349
        CantAcceptOwnRequest,
350
        CanOnlyCancelOwnRequest,
351
        WrongRequestNonce,
352
        ChangingAssetRequiresAbsoluteDepositChange,
353
        TargetCantChangeDeposit,
354
        ImmediateDepositChangeRequiresSameAssetId,
355
    }
356

            
357
    #[pallet::event]
358
138
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
359
    pub enum Event<T: Config> {
360
44
        StreamOpened {
361
            stream_id: T::StreamId,
362
        },
363
4
        StreamClosed {
364
            stream_id: T::StreamId,
365
            refunded: T::Balance,
366
        },
367
10
        StreamPayment {
368
            stream_id: T::StreamId,
369
            source: AccountIdOf<T>,
370
            target: AccountIdOf<T>,
371
            amount: T::Balance,
372
            stalled: bool,
373
        },
374
13
        StreamConfigChangeRequested {
375
            stream_id: T::StreamId,
376
            request_nonce: RequestNonce,
377
            requester: Party,
378
            old_config: StreamConfigOf<T>,
379
            new_config: StreamConfigOf<T>,
380
        },
381
5
        StreamConfigChanged {
382
            stream_id: T::StreamId,
383
            old_config: StreamConfigOf<T>,
384
            new_config: StreamConfigOf<T>,
385
            deposit_change: Option<DepositChange<T::Balance>>,
386
        },
387
    }
388

            
389
    /// Freeze reason to use if needed.
390
    #[pallet::composite_enum]
391
    pub enum FreezeReason {
392
        StreamPayment,
393
    }
394

            
395
    /// Hold reason to use if needed.
396
    #[pallet::composite_enum]
397
    pub enum HoldReason {
398
129
        StreamPayment,
399
327
        StreamOpened,
400
    }
401

            
402
120
    #[pallet::call]
403
    impl<T: Config> Pallet<T> {
404
        /// Create a payment stream from the origin to the target with provided config
405
        /// and initial deposit (in the asset defined in the config).
406
        #[pallet::call_index(0)]
407
        #[pallet::weight(T::WeightInfo::open_stream())]
408
        pub fn open_stream(
409
            origin: OriginFor<T>,
410
            target: AccountIdOf<T>,
411
            config: StreamConfigOf<T>,
412
            initial_deposit: T::Balance,
413
63
        ) -> DispatchResultWithPostInfo {
414
63
            let origin = ensure_signed(origin)?;
415
63
            let opening_deposit = T::OpenStreamHoldAmount::get();
416

            
417
63
            let _stream_id = Self::open_stream_returns_id(
418
63
                origin,
419
63
                target,
420
63
                config,
421
63
                initial_deposit,
422
63
                opening_deposit,
423
63
            )?;
424

            
425
58
            Ok(().into())
426
        }
427

            
428
        /// Close a given stream in which the origin is involved. It performs the pending payment
429
        /// before closing the stream.
430
        #[pallet::call_index(1)]
431
        #[pallet::weight(T::WeightInfo::close_stream())]
432
        pub fn close_stream(
433
            origin: OriginFor<T>,
434
            stream_id: T::StreamId,
435
11
        ) -> DispatchResultWithPostInfo {
436
11
            let origin = ensure_signed(origin)?;
437
11
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
438

            
439
            // Only source or target can close a stream.
440
10
            ensure!(
441
10
                origin == stream.source || origin == stream.target,
442
1
                Error::<T>::UnauthorizedOrigin
443
            );
444

            
445
            // Update stream before closing it to ensure fair payment.
446
9
            Self::perform_stream_payment(stream_id, &mut stream)?;
447

            
448
            // Unfreeze funds left in the stream.
449
9
            T::Assets::decrease_deposit(&stream.config.asset_id, &stream.source, stream.deposit)?;
450

            
451
            // Release opening deposit
452
9
            if stream.opening_deposit > 0u32.into() {
453
9
                T::Currency::release(
454
9
                    &HoldReason::StreamOpened.into(),
455
9
                    &stream.source,
456
9
                    stream.opening_deposit,
457
9
                    Precision::Exact,
458
9
                )?;
459
            }
460

            
461
            // Remove stream from storage.
462
9
            Streams::<T>::remove(stream_id);
463
9
            LookupStreamsWithSource::<T>::remove(stream.source, stream_id);
464
9
            LookupStreamsWithTarget::<T>::remove(stream.target, stream_id);
465
9

            
466
9
            // Emit event.
467
9
            Pallet::<T>::deposit_event(Event::<T>::StreamClosed {
468
9
                stream_id,
469
9
                refunded: stream.deposit.saturating_add(stream.opening_deposit),
470
9
            });
471
9

            
472
9
            Ok(().into())
473
        }
474

            
475
        /// Perform the pending payment of a stream. Anyone can call this.
476
        #[pallet::call_index(2)]
477
        #[pallet::weight(T::WeightInfo::perform_payment())]
478
        pub fn perform_payment(
479
            origin: OriginFor<T>,
480
            stream_id: T::StreamId,
481
19
        ) -> DispatchResultWithPostInfo {
482
19
            // No problem with anyone updating any stream.
483
19
            let _ = ensure_signed(origin)?;
484

            
485
19
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
486
18
            Self::perform_stream_payment(stream_id, &mut stream)?;
487
18
            Streams::<T>::insert(stream_id, stream);
488
18

            
489
18
            Ok(().into())
490
        }
491

            
492
        /// Requests a change to a stream config or deposit.
493
        ///
494
        /// If the new config don't change the time unit and asset id, the change will be applied
495
        /// immediately if it is at the desadvantage of the caller. Otherwise, the request is stored
496
        /// in the stream and will have to be approved by the other party.
497
        ///
498
        /// This call accepts a deposit change, which can only be provided by the source of the
499
        /// stream. An absolute change is required when changing asset id, as the current deposit
500
        /// will be released and a new deposit is required in the new asset.
501
        #[pallet::call_index(3)]
502
        #[pallet::weight(
503
            T::WeightInfo::request_change_immediate()
504
            .max(T::WeightInfo::request_change_delayed())
505
        )]
506
        pub fn request_change(
507
            origin: OriginFor<T>,
508
            stream_id: T::StreamId,
509
            kind: ChangeKind<T::Balance>,
510
            new_config: StreamConfigOf<T>,
511
            deposit_change: Option<DepositChange<T::Balance>>,
512
50
        ) -> DispatchResultWithPostInfo {
513
50
            let origin = ensure_signed(origin)?;
514
50
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
515

            
516
49
            let requester = stream
517
49
                .account_to_party(origin)
518
49
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
519

            
520
48
            ensure!(
521
48
                requester == Party::Source || deposit_change.is_none(),
522
1
                Error::<T>::TargetCantChangeDeposit
523
            );
524

            
525
47
            if stream.config == new_config && deposit_change.is_none() {
526
1
                return Ok(().into());
527
46
            }
528
46

            
529
46
            // If asset id and time unit are the same, we allow to make the change
530
46
            // immediatly if the origin is at a disadvantage.
531
46
            // We allow this even if there is already a pending request.
532
46
            if Self::maybe_immediate_change(
533
46
                stream_id,
534
46
                &mut stream,
535
46
                &new_config,
536
46
                deposit_change,
537
46
                requester,
538
46
            )? {
539
5
                return Ok(().into());
540
39
            }
541
39

            
542
39
            // If the source is requesting a change of asset, they must provide an absolute change.
543
39
            if requester == Party::Source
544
27
                && new_config.asset_id != stream.config.asset_id
545
3
                && !matches!(deposit_change, Some(DepositChange::Absolute(_)))
546
            {
547
3
                Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?;
548
36
            }
549

            
550
            // If there is already a mandatory change request, only the origin
551
            // of this request can change it.
552
            if let Some(ChangeRequest {
553
                kind: ChangeKind::Mandatory { .. },
554
4
                requester: pending_requester,
555
                ..
556
36
            }) = &stream.pending_request
557
            {
558
4
                ensure!(
559
4
                    &requester == pending_requester,
560
2
                    Error::<T>::CantOverrideMandatoryChange
561
                );
562
32
            }
563

            
564
34
            stream.request_nonce = stream.request_nonce.wrapping_add(1);
565
34
            stream.pending_request = Some(ChangeRequest {
566
34
                requester,
567
34
                kind,
568
34
                new_config: new_config.clone(),
569
34
                deposit_change,
570
34
            });
571
34

            
572
34
            // Emit event.
573
34
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChangeRequested {
574
34
                stream_id,
575
34
                request_nonce: stream.request_nonce,
576
34
                requester,
577
34
                old_config: stream.config.clone(),
578
34
                new_config,
579
34
            });
580
34

            
581
34
            // Update storage.
582
34
            Streams::<T>::insert(stream_id, stream);
583
34

            
584
34
            Ok(().into())
585
        }
586

            
587
        /// Accepts a change requested before by the other party. Takes a nonce to prevent
588
        /// frontrunning attacks. If the target made a request, the source is able to change their
589
        /// deposit.
590
        #[pallet::call_index(4)]
591
        #[pallet::weight(T::WeightInfo::accept_requested_change())]
592
        pub fn accept_requested_change(
593
            origin: OriginFor<T>,
594
            stream_id: T::StreamId,
595
            request_nonce: RequestNonce,
596
            deposit_change: Option<DepositChange<T::Balance>>,
597
18
        ) -> DispatchResultWithPostInfo {
598
18
            let origin = ensure_signed(origin)?;
599
18
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
600

            
601
17
            let accepter = stream
602
17
                .account_to_party(origin)
603
17
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
604

            
605
16
            let Some(request) = stream.pending_request.clone() else {
606
                return Err(Error::<T>::NoPendingRequest.into());
607
            };
608

            
609
16
            ensure!(
610
16
                request_nonce == stream.request_nonce,
611
1
                Error::<T>::WrongRequestNonce
612
            );
613
15
            ensure!(
614
15
                accepter != request.requester,
615
1
                Error::<T>::CantAcceptOwnRequest
616
            );
617

            
618
14
            ensure!(
619
14
                accepter == Party::Source || deposit_change.is_none(),
620
1
                Error::<T>::TargetCantChangeDeposit
621
            );
622

            
623
            // Perform pending payment before changing config.
624
13
            Self::perform_stream_payment(stream_id, &mut stream)?;
625

            
626
            // Apply change.
627
13
            let deposit_change = deposit_change.or(request.deposit_change);
628
13
            match (
629
13
                stream.config.asset_id == request.new_config.asset_id,
630
13
                deposit_change,
631
            ) {
632
                // Same asset and a change, we apply it like in `change_deposit` call.
633
5
                (true, Some(change)) => {
634
5
                    Self::apply_deposit_change(&mut stream, change)?;
635
                }
636
                // Same asset and no change, no problem.
637
4
                (true, None) => (),
638
                // Change in asset with absolute new amount
639
1
                (false, Some(DepositChange::Absolute(amount))) => {
640
1
                    // Release deposit in old asset.
641
1
                    T::Assets::decrease_deposit(
642
1
                        &stream.config.asset_id,
643
1
                        &stream.source,
644
1
                        stream.deposit,
645
1
                    )?;
646

            
647
                    // Make deposit in new asset.
648
1
                    T::Assets::increase_deposit(
649
1
                        &request.new_config.asset_id,
650
1
                        &stream.source,
651
1
                        amount,
652
1
                    )?;
653
1
                    stream.deposit = amount;
654
                }
655
                // It doesn't make sense to change asset while not providing an absolute new
656
                // amount.
657
3
                (false, _) => Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?,
658
            }
659

            
660
            // If time unit changes we need to update `last_time_updated` to be in the
661
            // new unit.
662
10
            if stream.config.time_unit != request.new_config.time_unit {
663
2
                stream.last_time_updated = T::TimeProvider::now(&request.new_config.time_unit)
664
2
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
665
8
            }
666

            
667
            // Event
668
10
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
669
10
                stream_id,
670
10
                old_config: stream.config,
671
10
                new_config: request.new_config.clone(),
672
10
                deposit_change,
673
10
            });
674
10

            
675
10
            // Update config in storage.
676
10
            stream.config = request.new_config;
677
10
            stream.pending_request = None;
678
10
            Streams::<T>::insert(stream_id, stream);
679
10

            
680
10
            Ok(().into())
681
        }
682

            
683
        #[pallet::call_index(5)]
684
        #[pallet::weight(T::WeightInfo::cancel_change_request())]
685
        pub fn cancel_change_request(
686
            origin: OriginFor<T>,
687
            stream_id: T::StreamId,
688
5
        ) -> DispatchResultWithPostInfo {
689
5
            let origin = ensure_signed(origin)?;
690
5
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
691

            
692
4
            let accepter = stream
693
4
                .account_to_party(origin)
694
4
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
695

            
696
3
            let Some(request) = stream.pending_request.take() else {
697
1
                return Err(Error::<T>::NoPendingRequest.into());
698
            };
699

            
700
2
            ensure!(
701
2
                accepter == request.requester,
702
2
                Error::<T>::CanOnlyCancelOwnRequest
703
            );
704

            
705
            // Update storage.
706
            // Pending request is removed by calling `.take()`.
707
            Streams::<T>::insert(stream_id, stream);
708

            
709
            Ok(().into())
710
        }
711

            
712
        /// Allows immediately changing the deposit for a stream, which is simpler than
713
        /// calling `request_change` with the proper parameters.
714
        /// The call takes an asset id to ensure it has not changed (by an accepted request) before
715
        /// the call is included in a block, in which case the unit is no longer the same and quantities
716
        /// will not have the same scale/value.
717
        #[pallet::call_index(6)]
718
        #[pallet::weight(T::WeightInfo::immediately_change_deposit())]
719
        pub fn immediately_change_deposit(
720
            origin: OriginFor<T>,
721
            stream_id: T::StreamId,
722
            asset_id: T::AssetId,
723
            change: DepositChange<T::Balance>,
724
7
        ) -> DispatchResultWithPostInfo {
725
7
            let origin = ensure_signed(origin)?;
726
7
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
727

            
728
6
            ensure!(stream.source == origin, Error::<T>::UnauthorizedOrigin);
729
4
            ensure!(
730
4
                stream.config.asset_id == asset_id,
731
                Error::<T>::ImmediateDepositChangeRequiresSameAssetId
732
            );
733

            
734
            // Perform pending payment before changing deposit.
735
4
            Self::perform_stream_payment(stream_id, &mut stream)?;
736

            
737
            // Apply change.
738
4
            Self::apply_deposit_change(&mut stream, change)?;
739

            
740
            // Event
741
1
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
742
1
                stream_id,
743
1
                old_config: stream.config.clone(),
744
1
                new_config: stream.config.clone(),
745
1
                deposit_change: Some(change),
746
1
            });
747
1

            
748
1
            // Update stream in storage.
749
1
            Streams::<T>::insert(stream_id, stream);
750
1

            
751
1
            Ok(().into())
752
        }
753
    }
754

            
755
    impl<T: Config> Pallet<T> {
756
        /// Try to open a stream and returns its id.
757
        /// Prefers calling this function from other pallets instead of `open_stream` as the
758
        /// latter can't return the id.
759
63
        pub fn open_stream_returns_id(
760
63
            origin: AccountIdOf<T>,
761
63
            target: AccountIdOf<T>,
762
63
            config: StreamConfigOf<T>,
763
63
            initial_deposit: T::Balance,
764
63
            opening_deposit: T::Balance,
765
63
        ) -> Result<T::StreamId, DispatchErrorWithPostInfo> {
766
63
            ensure!(origin != target, Error::<T>::CantBeBothSourceAndTarget);
767

            
768
            // Generate a new stream id.
769
62
            let stream_id = NextStreamId::<T>::get();
770
62
            let next_stream_id = stream_id
771
62
                .checked_add(&One::one())
772
62
                .ok_or(Error::<T>::StreamIdOverflow)?;
773
61
            NextStreamId::<T>::set(next_stream_id);
774
61

            
775
61
            // Hold opening deposit for the storage used by Stream
776
61
            if opening_deposit > 0u32.into() {
777
61
                T::Currency::hold(&HoldReason::StreamOpened.into(), &origin, opening_deposit)?;
778
            }
779

            
780
            // Freeze initial deposit.
781
61
            T::Assets::increase_deposit(&config.asset_id, &origin, initial_deposit)?;
782

            
783
            // Create stream data.
784
58
            let now =
785
59
                T::TimeProvider::now(&config.time_unit).ok_or(Error::<T>::CantFetchCurrentTime)?;
786
58
            let stream = Stream {
787
58
                source: origin.clone(),
788
58
                target: target.clone(),
789
58
                config,
790
58
                deposit: initial_deposit,
791
58
                last_time_updated: now,
792
58
                request_nonce: 0,
793
58
                pending_request: None,
794
58
                opening_deposit,
795
58
            };
796
58

            
797
58
            // Insert stream in storage.
798
58
            Streams::<T>::insert(stream_id, stream);
799
58
            LookupStreamsWithSource::<T>::insert(origin, stream_id, ());
800
58
            LookupStreamsWithTarget::<T>::insert(target, stream_id, ());
801
58

            
802
58
            // Emit event.
803
58
            Pallet::<T>::deposit_event(Event::<T>::StreamOpened { stream_id });
804
58

            
805
58
            Ok(stream_id)
806
63
        }
807

            
808
        /// Get the stream payment current status, telling how much payment is
809
        /// pending, how much deposit will be left and whenever the stream is stalled.
810
        /// The stream is considered stalled if no funds are left or if the provided
811
        /// time is past a mandatory request deadline. If the provided `now` is `None`
812
        /// then the current time will be fetched. Being able to provide a custom `now`
813
        /// allows to check the status in the future.
814
16
        pub fn stream_payment_status(
815
16
            stream_id: T::StreamId,
816
16
            now: Option<T::Balance>,
817
16
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
818
16
            let stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
819
12
            let now = match now {
820
                Some(v) => v,
821
12
                None => T::TimeProvider::now(&stream.config.time_unit)
822
12
                    .ok_or(Error::<T>::CantFetchCurrentTime)?,
823
            };
824

            
825
12
            let last_time_updated = stream.last_time_updated;
826
12
            Self::stream_payment_status_by_ref(&stream, last_time_updated, now)
827
16
        }
828

            
829
63
        fn stream_payment_status_by_ref(
830
63
            stream: &StreamOf<T>,
831
63
            last_time_updated: T::Balance,
832
63
            mut now: T::Balance,
833
63
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
834
            // Take into account mandatory change request deadline. Note that
835
            // while it'll perform payment up to deadline,
836
            // `stream.last_time_updated` is still the "real now" to avoid
837
            // retroactive payment in case the deadline changes.
838
            if let Some(ChangeRequest {
839
14
                kind: ChangeKind::Mandatory { deadline },
840
                ..
841
63
            }) = &stream.pending_request
842
14
            {
843
14
                now = min(now, *deadline);
844
49
            }
845

            
846
            // If deposit is zero the stream is fully drained and there is nothing to transfer.
847
63
            if stream.deposit.is_zero() {
848
                return Ok(StreamPaymentStatus {
849
                    payment: 0u32.into(),
850
                    deposit_left: stream.deposit,
851
                    stalled: true,
852
                });
853
63
            }
854

            
855
            // Dont perform payment if now is before or equal to `last_time_updated`.
856
            // It can be before due to the deadline adjustment.
857
63
            let Some(delta) = now.checked_sub(&last_time_updated) else {
858
11
                return Ok(StreamPaymentStatus {
859
11
                    payment: 0u32.into(),
860
11
                    deposit_left: stream.deposit,
861
11
                    stalled: true,
862
11
                });
863
            };
864

            
865
            // We compute the amount due to the target according to the rate, which may be
866
            // lowered if the stream deposit is lower.
867
            // Saturating is fine as it'll be clamped to the source deposit. It is also safer as
868
            // considering it an error can make a stream un-updatable if too much time has passed
869
            // without updates.
870
52
            let mut payment = delta.saturating_mul(stream.config.rate);
871

            
872
            // We compute the new amount of locked funds. If it underflows it
873
            // means that there is more to pay that what is left, in which case
874
            // we pay all that is left.
875
52
            let (deposit_left, stalled) = match stream.deposit.checked_sub(&payment) {
876
50
                Some(v) if v.is_zero() => (v, true),
877
49
                Some(v) => (v, false),
878
                None => {
879
2
                    payment = stream.deposit;
880
2
                    (Zero::zero(), true)
881
                }
882
            };
883

            
884
52
            Ok(StreamPaymentStatus {
885
52
                payment,
886
52
                deposit_left,
887
52
                stalled,
888
52
            })
889
63
        }
890

            
891
        /// Behavior:
892
        /// A stream payment consist of a locked deposit, a rate per unit of time and the
893
        /// last time the stream was updated. When updating the stream, **at most**
894
        /// `elapsed_time * rate` is unlocked from the source account and transfered to the target
895
        /// account. If this amount is greater than the left deposit, the stream is considered
896
        /// drained **but not closed**. The source can come back later and refill the stream,
897
        /// however there will be no retroactive payment for the time spent as drained.
898
        /// If the stream payment is used to rent a service, the target should pause the service
899
        /// while the stream is drained, and resume it once it is refilled.
900
51
        fn perform_stream_payment(
901
51
            stream_id: T::StreamId,
902
51
            stream: &mut StreamOf<T>,
903
51
        ) -> Result<T::Balance, DispatchErrorWithPostInfo> {
904
51
            let now = T::TimeProvider::now(&stream.config.time_unit)
905
51
                .ok_or(Error::<T>::CantFetchCurrentTime)?;
906

            
907
            // We want to update `stream.last_time_updated` to `now` as soon
908
            // as possible to avoid forgetting to do it. We copy the old value
909
            // for payment computation.
910
51
            let last_time_updated = stream.last_time_updated;
911
51
            stream.last_time_updated = now;
912

            
913
            let StreamPaymentStatus {
914
51
                payment,
915
51
                deposit_left,
916
51
                stalled,
917
51
            } = Self::stream_payment_status_by_ref(stream, last_time_updated, now)?;
918

            
919
51
            if payment.is_zero() {
920
30
                return Ok(0u32.into());
921
21
            }
922
21

            
923
21
            // Transfer from the source to target.
924
21
            T::Assets::transfer_deposit(
925
21
                &stream.config.asset_id,
926
21
                &stream.source,
927
21
                &stream.target,
928
21
                payment,
929
21
            )?;
930

            
931
            // Update stream info.
932
21
            stream.deposit = deposit_left;
933
21

            
934
21
            // Emit event.
935
21
            Pallet::<T>::deposit_event(Event::<T>::StreamPayment {
936
21
                stream_id,
937
21
                source: stream.source.clone(),
938
21
                target: stream.target.clone(),
939
21
                amount: payment,
940
21
                stalled,
941
21
            });
942
21

            
943
21
            Ok(payment)
944
51
        }
945

            
946
14
        fn apply_deposit_change(
947
14
            stream: &mut StreamOf<T>,
948
14
            change: DepositChange<T::Balance>,
949
14
        ) -> DispatchResultWithPostInfo {
950
14
            match change {
951
3
                DepositChange::Absolute(amount) => {
952
3
                    if let Some(increase) = amount.checked_sub(&stream.deposit) {
953
1
                        T::Assets::increase_deposit(
954
1
                            &stream.config.asset_id,
955
1
                            &stream.source,
956
1
                            increase,
957
1
                        )?;
958
2
                    } else if let Some(decrease) = stream.deposit.checked_sub(&amount) {
959
2
                        T::Assets::decrease_deposit(
960
2
                            &stream.config.asset_id,
961
2
                            &stream.source,
962
2
                            decrease,
963
2
                        )?;
964
                    }
965
2
                    stream.deposit = amount;
966
                }
967
8
                DepositChange::Increase(increase) => {
968
8
                    stream.deposit = stream
969
8
                        .deposit
970
8
                        .checked_add(&increase)
971
8
                        .ok_or(ArithmeticError::Overflow)?;
972
6
                    T::Assets::increase_deposit(&stream.config.asset_id, &stream.source, increase)?;
973
                }
974
3
                DepositChange::Decrease(decrease) => {
975
3
                    stream.deposit = stream
976
3
                        .deposit
977
3
                        .checked_sub(&decrease)
978
3
                        .ok_or(ArithmeticError::Underflow)?;
979
1
                    T::Assets::decrease_deposit(&stream.config.asset_id, &stream.source, decrease)?;
980
                }
981
            }
982

            
983
9
            Ok(().into())
984
14
        }
985

            
986
        /// Tries to apply a possibly immediate change. Return if that change was immediate and
987
        /// applied or not.
988
        ///
989
        /// If asset id and time unit are the same, we allow to make the change
990
        /// immediatly if the origin is at a disadvantage.
991
        /// We allow this even if there is already a pending request.
992
46
        fn maybe_immediate_change(
993
46
            stream_id: T::StreamId,
994
46
            stream: &mut StreamOf<T>,
995
46
            new_config: &StreamConfigOf<T>,
996
46
            deposit_change: Option<DepositChange<T::Balance>>,
997
46
            requester: Party,
998
46
        ) -> Result<bool, DispatchErrorWithPostInfo> {
999
46
            if new_config.time_unit != stream.config.time_unit
23
                || new_config.asset_id != stream.config.asset_id
            {
28
                return Ok(false);
18
            }
18

            
18
            if requester == Party::Source && new_config.rate < stream.config.rate {
6
                return Ok(false);
12
            }
12

            
12
            if requester == Party::Target && new_config.rate > stream.config.rate {
5
                return Ok(false);
7
            }
7

            
7
            // Perform pending payment before changing config.
7
            Self::perform_stream_payment(stream_id, stream)?;
            // We apply the requested deposit change.
7
            if let Some(change) = deposit_change {
5
                Self::apply_deposit_change(stream, change)?;
2
            }
            // Emit event.
5
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
5
                stream_id,
5
                old_config: stream.config.clone(),
5
                new_config: new_config.clone(),
5
                deposit_change,
5
            });
5

            
5
            // Update storage.
5
            stream.config = new_config.clone();
5
            Streams::<T>::insert(stream_id, stream);
5

            
5
            Ok(true)
46
        }
    }
}