1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
#![doc = include_str!("../README.md")]
18
#![cfg_attr(not(feature = "std"), no_std)]
19

            
20
#[cfg(test)]
21
mod mock;
22

            
23
#[cfg(test)]
24
mod tests;
25

            
26
#[cfg(feature = "runtime-benchmarks")]
27
mod benchmarking;
28

            
29
pub mod weights;
30
pub use weights::WeightInfo;
31

            
32
#[cfg(feature = "std")]
33
use serde::{Deserialize, Serialize};
34

            
35
use {
36
    core::cmp::min,
37
    frame_support::{
38
        dispatch::DispatchErrorWithPostInfo,
39
        pallet,
40
        pallet_prelude::*,
41
        storage::types::{StorageDoubleMap, StorageMap},
42
        traits::{
43
            fungible::{Inspect, MutateHold},
44
            tokens::{Balance, Precision},
45
        },
46
        Blake2_128Concat,
47
    },
48
    frame_system::pallet_prelude::*,
49
    parity_scale_codec::{FullCodec, MaxEncodedLen},
50
    scale_info::TypeInfo,
51
    sp_runtime::{
52
        traits::{AtLeast32BitUnsigned, CheckedAdd, CheckedSub, One, Saturating, Zero},
53
        ArithmeticError,
54
    },
55
    sp_std::{fmt::Debug, marker::PhantomData},
56
};
57

            
58
pub use pallet::*;
59

            
60
/// Type able to provide the current time for given unit.
61
/// For each unit the returned number should monotonically increase and not
62
/// overflow.
63
pub trait TimeProvider<Unit, Number> {
64
    fn now(unit: &Unit) -> Option<Number>;
65

            
66
    /// Benchmarks: should return the time unit which has the worst performance calling
67
    /// `TimeProvider::now(unit)` with.
68
    #[cfg(feature = "runtime-benchmarks")]
69
    fn bench_worst_case_time_unit() -> Unit;
70

            
71
    /// Benchmarks: sets the "now" time for time unit returned by `bench_worst_case_time_unit`.
72
    #[cfg(feature = "runtime-benchmarks")]
73
    fn bench_set_now(instant: Number);
74
}
75

            
76
/// Interactions the pallet needs with assets.
77
pub trait Assets<AccountId, AssetId, Balance> {
78
    /// Transfer assets deposited by an account to another account.
79
    /// Those assets should not be considered deposited in the target account.
80
    fn transfer_deposit(
81
        asset_id: &AssetId,
82
        from: &AccountId,
83
        to: &AccountId,
84
        amount: Balance,
85
    ) -> DispatchResult;
86

            
87
    /// Increase the deposit for an account and asset id. Should fail if account doesn't have
88
    /// enough of that asset. Funds should be safe and not slashable.
89
    fn increase_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
90
        -> DispatchResult;
91

            
92
    /// Decrease the deposit for an account and asset id. Should fail on underflow.
93
    fn decrease_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
94
        -> DispatchResult;
95

            
96
    /// Return the deposit for given asset and account.
97
    fn get_deposit(asset_id: &AssetId, account: &AccountId) -> Balance;
98

            
99
    /// Benchmarks: should return the asset id which has the worst performance when interacting
100
    /// with it.
101
    #[cfg(feature = "runtime-benchmarks")]
102
    fn bench_worst_case_asset_id() -> AssetId;
103

            
104
    /// Benchmarks: should return the another asset id which has the worst performance when interacting
105
    /// with it afther `bench_worst_case_asset_id`. This is to benchmark the worst case when changing config
106
    /// from one asset to another. If there is only one asset id it is fine to return it in both
107
    /// `bench_worst_case_asset_id` and `bench_worst_case_asset_id2`.
108
    #[cfg(feature = "runtime-benchmarks")]
109
    fn bench_worst_case_asset_id2() -> AssetId;
110

            
111
    /// Benchmarks: should set the balance.
112
    #[cfg(feature = "runtime-benchmarks")]
113
    fn bench_set_balance(asset_id: &AssetId, account: &AccountId, amount: Balance);
114
}
115

            
116
#[pallet]
117
pub mod pallet {
118
    use super::*;
119

            
120
    /// Pooled Staking pallet.
121
13620
    #[pallet::pallet]
122
    #[pallet::without_storage_info]
123
    pub struct Pallet<T>(PhantomData<T>);
124

            
125
    #[pallet::config]
126
    pub trait Config: frame_system::Config {
127
        /// Overarching event type
128
        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
129

            
130
        /// Type used to represent stream ids. Should be large enough to not overflow.
131
        type StreamId: AtLeast32BitUnsigned
132
            + Default
133
            + Debug
134
            + Copy
135
            + Clone
136
            + FullCodec
137
            + TypeInfo
138
            + MaxEncodedLen;
139

            
140
        /// The balance type, which is also the type representing time (as this
141
        /// pallet will do math with both time and balances to compute how
142
        /// much should be paid).
143
        type Balance: Balance;
144

            
145
        /// Type representing an asset id, a identifier allowing distinguishing assets.
146
        type AssetId: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + PartialEq + Eq;
147

            
148
        /// Provide interaction with assets.
149
        type Assets: Assets<Self::AccountId, Self::AssetId, Self::Balance>;
150

            
151
        /// Currency for the opening balance hold for the storage used by the Stream.
152
        /// NOT to be confused with Assets.
153
        type Currency: Inspect<Self::AccountId, Balance = Self::Balance>
154
            + MutateHold<Self::AccountId, Reason = Self::RuntimeHoldReason>;
155

            
156
        type RuntimeHoldReason: From<HoldReason>;
157

            
158
        #[pallet::constant]
159
        type OpenStreamHoldAmount: Get<Self::Balance>;
160

            
161
        /// Represents which units of time can be used. Designed to be an enum
162
        /// with a variant for each kind of time source/scale supported.
163
        type TimeUnit: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + Eq;
164

            
165
        /// Provide the current time in given unit.
166
        type TimeProvider: TimeProvider<Self::TimeUnit, Self::Balance>;
167

            
168
        type WeightInfo: weights::WeightInfo;
169
    }
170

            
171
    type AccountIdOf<T> = <T as frame_system::Config>::AccountId;
172
    type AssetIdOf<T> = <T as Config>::AssetId;
173

            
174
    pub type RequestNonce = u32;
175

            
176
    /// A stream payment from source to target.
177
    /// Stores the last time the stream was updated, which allows to compute
178
    /// elapsed time and perform payment.
179
    #[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
180
1584
    #[derive(RuntimeDebug, PartialEq, Eq, Encode, Decode, Clone, TypeInfo)]
181
    pub struct Stream<AccountId, Unit, AssetId, Balance> {
182
        /// Payer, source of the stream.
183
        pub source: AccountId,
184
        /// Payee, target of the stream.
185
        pub target: AccountId,
186
        /// Steam config (time unit, asset id, rate)
187
        pub config: StreamConfig<Unit, AssetId, Balance>,
188
        /// How much is deposited to fund this stream.
189
        pub deposit: Balance,
190
        /// Last time the stream was updated in `config.time_unit`.
191
        pub last_time_updated: Balance,
192
        /// Nonce for requests. This prevents a request to make a first request
193
        /// then change it to another request to frontrun the other party
194
        /// accepting.
195
        pub request_nonce: RequestNonce,
196
        /// A pending change request if any.
197
        pub pending_request: Option<ChangeRequest<Unit, AssetId, Balance>>,
198
        /// One-time opening deposit. Will be released on close.
199
        pub opening_deposit: Balance,
200
    }
201

            
202
    impl<AccountId: PartialEq, Unit, AssetId, Balance> Stream<AccountId, Unit, AssetId, Balance> {
203
69
        pub fn account_to_party(&self, account: AccountId) -> Option<Party> {
204
69
            match account {
205
69
                a if a == self.source => Some(Party::Source),
206
26
                a if a == self.target => Some(Party::Target),
207
3
                _ => None,
208
            }
209
69
        }
210
    }
211

            
212
    /// Stream configuration.
213
    #[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
214
704
    #[derive(RuntimeDebug, PartialEq, Eq, Encode, Decode, Copy, Clone, TypeInfo)]
215
    pub struct StreamConfig<Unit, AssetId, Balance> {
216
        /// Unit in which time is measured using a `TimeProvider`.
217
        pub time_unit: Unit,
218
        /// Asset used for payment.
219
        pub asset_id: AssetId,
220
        /// Amount of asset / unit.
221
        pub rate: Balance,
222
    }
223

            
224
    /// Origin of a change request.
225
    #[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
226
528
    #[derive(RuntimeDebug, PartialEq, Eq, Encode, Decode, Copy, Clone, TypeInfo)]
227
    pub enum Party {
228
32
        Source,
229
23
        Target,
230
    }
231

            
232
    impl Party {
233
4
        pub fn inverse(self) -> Self {
234
4
            match self {
235
2
                Party::Source => Party::Target,
236
2
                Party::Target => Party::Source,
237
            }
238
4
        }
239
    }
240

            
241
    /// Kind of change requested.
242
    #[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
243
704
    #[derive(RuntimeDebug, PartialEq, Eq, Encode, Decode, Copy, Clone, TypeInfo)]
244
    pub enum ChangeKind<Time> {
245
23
        /// The requested change is a suggestion, and the other party doesn't
246
        /// need to accept it.
247
        Suggestion,
248
28
        /// The requested change is mandatory, and the other party must either
249
        /// accept the change or close the stream. Reaching the deadline will
250
        /// close the stream too.
251
        Mandatory { deadline: Time },
252
    }
253

            
254
    /// Describe how the deposit should change.
255
    #[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
256
1232
    #[derive(RuntimeDebug, PartialEq, Eq, Encode, Decode, Copy, Clone, TypeInfo)]
257
    pub enum DepositChange<Balance> {
258
19
        /// Increase deposit by given amount.
259
        Increase(Balance),
260
1
        /// Decrease deposit by given amount.
261
        Decrease(Balance),
262
1
        /// Set deposit to given amount.
263
        Absolute(Balance),
264
    }
265

            
266
    /// A request to change a stream config.
267
    #[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
268
880
    #[derive(RuntimeDebug, PartialEq, Eq, Encode, Decode, Clone, TypeInfo)]
269
    pub struct ChangeRequest<Unit, AssetId, Balance> {
270
        pub requester: Party,
271
        pub kind: ChangeKind<Balance>,
272
        pub new_config: StreamConfig<Unit, AssetId, Balance>,
273
        pub deposit_change: Option<DepositChange<Balance>>,
274
    }
275

            
276
    pub type StreamOf<T> =
277
        Stream<AccountIdOf<T>, <T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
278

            
279
    pub type StreamConfigOf<T> =
280
        StreamConfig<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
281

            
282
    pub type ChangeRequestOf<T> =
283
        ChangeRequest<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
284

            
285
    #[derive(Debug, Copy, Clone, PartialEq, Eq)]
286
    pub struct StreamPaymentStatus<Balance> {
287
        pub payment: Balance,
288
        pub deposit_left: Balance,
289
        /// Whenever the stream is stalled, which can occur either when no funds are left or
290
        /// if the time is past a mandatory request deadline.
291
        pub stalled: bool,
292
    }
293

            
294
    /// Store the next available stream id.
295
248
    #[pallet::storage]
296
    pub type NextStreamId<T: Config> = StorageValue<Value = T::StreamId, QueryKind = ValueQuery>;
297

            
298
    /// Store each stream indexed by an Id.
299
281
    #[pallet::storage]
300
    pub type Streams<T: Config> = StorageMap<
301
        Hasher = Blake2_128Concat,
302
        Key = T::StreamId,
303
        Value = StreamOf<T>,
304
        QueryKind = OptionQuery,
305
    >;
306

            
307
    /// Lookup for all streams with given source.
308
    /// To avoid maintaining a growing list of stream ids, they are stored in
309
    /// the form of an entry (AccountId, StreamId). If such entry exists then
310
    /// this AccountId is a source in StreamId. One can iterate over all storage
311
    /// keys starting with the AccountId to find all StreamIds.
312
72
    #[pallet::storage]
313
    pub type LookupStreamsWithSource<T: Config> = StorageDoubleMap<
314
        Key1 = AccountIdOf<T>,
315
        Hasher1 = Blake2_128Concat,
316
        Key2 = T::StreamId,
317
        Hasher2 = Blake2_128Concat,
318
        Value = (),
319
        QueryKind = OptionQuery,
320
    >;
321

            
322
    /// Lookup for all streams with given target.
323
    /// To avoid maintaining a growing list of stream ids, they are stored in
324
    /// the form of an entry (AccountId, StreamId). If such entry exists then
325
    /// this AccountId is a target in StreamId. One can iterate over all storage
326
    /// keys starting with the AccountId to find all StreamIds.
327
72
    #[pallet::storage]
328
    pub type LookupStreamsWithTarget<T: Config> = StorageDoubleMap<
329
        Key1 = AccountIdOf<T>,
330
        Hasher1 = Blake2_128Concat,
331
        Key2 = T::StreamId,
332
        Hasher2 = Blake2_128Concat,
333
        Value = (),
334
        QueryKind = OptionQuery,
335
    >;
336

            
337
62
    #[pallet::error]
338
    #[derive(Clone, PartialEq, Eq)]
339
    pub enum Error<T> {
340
        UnknownStreamId,
341
        StreamIdOverflow,
342
        UnauthorizedOrigin,
343
        CantBeBothSourceAndTarget,
344
        CantFetchCurrentTime,
345
        SourceCantDecreaseRate,
346
        TargetCantIncreaseRate,
347
        CantOverrideMandatoryChange,
348
        NoPendingRequest,
349
        CantAcceptOwnRequest,
350
        CanOnlyCancelOwnRequest,
351
        WrongRequestNonce,
352
        ChangingAssetRequiresAbsoluteDepositChange,
353
        TargetCantChangeDeposit,
354
        ImmediateDepositChangeRequiresSameAssetId,
355
        DeadlineCantBeInPast,
356
        CantFetchStatusBeforeLastTimeUpdated,
357
    }
358

            
359
    #[pallet::event]
360
139
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
361
    pub enum Event<T: Config> {
362
43
        StreamOpened {
363
            stream_id: T::StreamId,
364
        },
365
4
        StreamClosed {
366
            stream_id: T::StreamId,
367
            refunded: T::Balance,
368
        },
369
9
        StreamPayment {
370
            stream_id: T::StreamId,
371
            source: AccountIdOf<T>,
372
            target: AccountIdOf<T>,
373
            amount: T::Balance,
374
            stalled: bool,
375
        },
376
12
        StreamConfigChangeRequested {
377
            stream_id: T::StreamId,
378
            request_nonce: RequestNonce,
379
            requester: Party,
380
            old_config: StreamConfigOf<T>,
381
            new_config: StreamConfigOf<T>,
382
        },
383
5
        StreamConfigChanged {
384
            stream_id: T::StreamId,
385
            old_config: StreamConfigOf<T>,
386
            new_config: StreamConfigOf<T>,
387
            deposit_change: Option<DepositChange<T::Balance>>,
388
        },
389
    }
390

            
391
    /// Freeze reason to use if needed.
392
    #[pallet::composite_enum]
393
    pub enum FreezeReason {
394
        StreamPayment,
395
    }
396

            
397
    /// Hold reason to use if needed.
398
    #[pallet::composite_enum]
399
    pub enum HoldReason {
400
135
        StreamPayment,
401
333
        StreamOpened,
402
    }
403

            
404
120
    #[pallet::call]
405
    impl<T: Config> Pallet<T> {
406
        /// Create a payment stream from the origin to the target with provided config
407
        /// and initial deposit (in the asset defined in the config).
408
        #[pallet::call_index(0)]
409
        #[pallet::weight(T::WeightInfo::open_stream())]
410
        pub fn open_stream(
411
            origin: OriginFor<T>,
412
            target: AccountIdOf<T>,
413
            config: StreamConfigOf<T>,
414
            initial_deposit: T::Balance,
415
63
        ) -> DispatchResultWithPostInfo {
416
63
            let origin = ensure_signed(origin)?;
417
63
            let opening_deposit = T::OpenStreamHoldAmount::get();
418

            
419
63
            let _stream_id = Self::open_stream_returns_id(
420
63
                origin,
421
63
                target,
422
63
                config,
423
63
                initial_deposit,
424
63
                opening_deposit,
425
63
            )?;
426

            
427
58
            Ok(().into())
428
        }
429

            
430
        /// Close a given stream in which the origin is involved. It performs the pending payment
431
        /// before closing the stream.
432
        #[pallet::call_index(1)]
433
        #[pallet::weight(T::WeightInfo::close_stream())]
434
        pub fn close_stream(
435
            origin: OriginFor<T>,
436
            stream_id: T::StreamId,
437
11
        ) -> DispatchResultWithPostInfo {
438
11
            let origin = ensure_signed(origin)?;
439
11
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
440

            
441
            // Only source or target can close a stream.
442
10
            ensure!(
443
10
                origin == stream.source || origin == stream.target,
444
1
                Error::<T>::UnauthorizedOrigin
445
            );
446

            
447
            // Update stream before closing it to ensure fair payment.
448
9
            Self::perform_stream_payment(stream_id, &mut stream)?;
449

            
450
            // Unfreeze funds left in the stream.
451
9
            T::Assets::decrease_deposit(&stream.config.asset_id, &stream.source, stream.deposit)?;
452

            
453
            // Release opening deposit
454
9
            if stream.opening_deposit > 0u32.into() {
455
9
                T::Currency::release(
456
9
                    &HoldReason::StreamOpened.into(),
457
9
                    &stream.source,
458
9
                    stream.opening_deposit,
459
9
                    Precision::Exact,
460
9
                )?;
461
            }
462

            
463
            // Remove stream from storage.
464
9
            Streams::<T>::remove(stream_id);
465
9
            LookupStreamsWithSource::<T>::remove(stream.source, stream_id);
466
9
            LookupStreamsWithTarget::<T>::remove(stream.target, stream_id);
467
9

            
468
9
            // Emit event.
469
9
            Pallet::<T>::deposit_event(Event::<T>::StreamClosed {
470
9
                stream_id,
471
9
                refunded: stream.deposit.saturating_add(stream.opening_deposit),
472
9
            });
473
9

            
474
9
            Ok(().into())
475
        }
476

            
477
        /// Perform the pending payment of a stream. Anyone can call this.
478
        #[pallet::call_index(2)]
479
        #[pallet::weight(T::WeightInfo::perform_payment())]
480
        pub fn perform_payment(
481
            origin: OriginFor<T>,
482
            stream_id: T::StreamId,
483
17
        ) -> DispatchResultWithPostInfo {
484
17
            // No problem with anyone updating any stream.
485
17
            let _ = ensure_signed(origin)?;
486

            
487
17
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
488
16
            Self::perform_stream_payment(stream_id, &mut stream)?;
489
16
            Streams::<T>::insert(stream_id, stream);
490
16

            
491
16
            Ok(().into())
492
        }
493

            
494
        /// Requests a change to a stream config or deposit.
495
        ///
496
        /// If the new config don't change the time unit and asset id, the change will be applied
497
        /// immediately if it is at the desadvantage of the caller. Otherwise, the request is stored
498
        /// in the stream and will have to be approved by the other party.
499
        ///
500
        /// This call accepts a deposit change, which can only be provided by the source of the
501
        /// stream. An absolute change is required when changing asset id, as the current deposit
502
        /// will be released and a new deposit is required in the new asset.
503
        #[pallet::call_index(3)]
504
        #[pallet::weight(
505
            T::WeightInfo::request_change_immediate()
506
            .max(T::WeightInfo::request_change_delayed())
507
        )]
508
        pub fn request_change(
509
            origin: OriginFor<T>,
510
            stream_id: T::StreamId,
511
            kind: ChangeKind<T::Balance>,
512
            new_config: StreamConfigOf<T>,
513
            deposit_change: Option<DepositChange<T::Balance>>,
514
49
        ) -> DispatchResultWithPostInfo {
515
49
            let origin = ensure_signed(origin)?;
516
49
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
517

            
518
48
            let requester = stream
519
48
                .account_to_party(origin)
520
48
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
521

            
522
47
            ensure!(
523
47
                requester == Party::Source || deposit_change.is_none(),
524
1
                Error::<T>::TargetCantChangeDeposit
525
            );
526

            
527
46
            if stream.config == new_config && deposit_change.is_none() {
528
1
                return Ok(().into());
529
45
            }
530

            
531
45
            if let ChangeKind::Mandatory { deadline } = kind {
532
10
                let now = T::TimeProvider::now(&stream.config.time_unit)
533
10
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
534

            
535
10
                ensure!(deadline >= now, Error::<T>::DeadlineCantBeInPast);
536
35
            }
537

            
538
            // If asset id and time unit are the same, we allow to make the change
539
            // immediatly if the origin is at a disadvantage.
540
            // We allow this even if there is already a pending request.
541
44
            if Self::maybe_immediate_change(
542
44
                stream_id,
543
44
                &mut stream,
544
44
                &new_config,
545
44
                deposit_change,
546
44
                requester,
547
44
            )? {
548
5
                return Ok(().into());
549
37
            }
550
37

            
551
37
            // If the source is requesting a change of asset, they must provide an absolute change.
552
37
            if requester == Party::Source
553
27
                && new_config.asset_id != stream.config.asset_id
554
3
                && !matches!(deposit_change, Some(DepositChange::Absolute(_)))
555
            {
556
3
                Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?;
557
34
            }
558

            
559
            // If there is already a mandatory change request, only the origin
560
            // of this request can change it.
561
            if let Some(ChangeRequest {
562
                kind: ChangeKind::Mandatory { .. },
563
3
                requester: pending_requester,
564
                ..
565
34
            }) = &stream.pending_request
566
            {
567
3
                ensure!(
568
3
                    &requester == pending_requester,
569
2
                    Error::<T>::CantOverrideMandatoryChange
570
                );
571
31
            }
572

            
573
32
            stream.request_nonce = stream.request_nonce.wrapping_add(1);
574
32
            stream.pending_request = Some(ChangeRequest {
575
32
                requester,
576
32
                kind,
577
32
                new_config: new_config.clone(),
578
32
                deposit_change,
579
32
            });
580
32

            
581
32
            // Emit event.
582
32
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChangeRequested {
583
32
                stream_id,
584
32
                request_nonce: stream.request_nonce,
585
32
                requester,
586
32
                old_config: stream.config.clone(),
587
32
                new_config,
588
32
            });
589
32

            
590
32
            // Update storage.
591
32
            Streams::<T>::insert(stream_id, stream);
592
32

            
593
32
            Ok(().into())
594
        }
595

            
596
        /// Accepts a change requested before by the other party. Takes a nonce to prevent
597
        /// frontrunning attacks. If the target made a request, the source is able to change their
598
        /// deposit.
599
        #[pallet::call_index(4)]
600
        #[pallet::weight(T::WeightInfo::accept_requested_change())]
601
        pub fn accept_requested_change(
602
            origin: OriginFor<T>,
603
            stream_id: T::StreamId,
604
            request_nonce: RequestNonce,
605
            deposit_change: Option<DepositChange<T::Balance>>,
606
18
        ) -> DispatchResultWithPostInfo {
607
18
            let origin = ensure_signed(origin)?;
608
18
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
609

            
610
17
            let accepter = stream
611
17
                .account_to_party(origin)
612
17
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
613

            
614
16
            let Some(request) = stream.pending_request.clone() else {
615
                return Err(Error::<T>::NoPendingRequest.into());
616
            };
617

            
618
16
            ensure!(
619
16
                request_nonce == stream.request_nonce,
620
1
                Error::<T>::WrongRequestNonce
621
            );
622
15
            ensure!(
623
15
                accepter != request.requester,
624
1
                Error::<T>::CantAcceptOwnRequest
625
            );
626

            
627
14
            ensure!(
628
14
                accepter == Party::Source || deposit_change.is_none(),
629
1
                Error::<T>::TargetCantChangeDeposit
630
            );
631

            
632
            // Perform pending payment before changing config.
633
13
            Self::perform_stream_payment(stream_id, &mut stream)?;
634

            
635
            // Apply change.
636
13
            let deposit_change = deposit_change.or(request.deposit_change);
637
13
            match (
638
13
                stream.config.asset_id == request.new_config.asset_id,
639
13
                deposit_change,
640
            ) {
641
                // Same asset and a change, we apply it like in `change_deposit` call.
642
5
                (true, Some(change)) => {
643
5
                    Self::apply_deposit_change(&mut stream, change)?;
644
                }
645
                // Same asset and no change, no problem.
646
4
                (true, None) => (),
647
                // Change in asset with absolute new amount
648
1
                (false, Some(DepositChange::Absolute(amount))) => {
649
1
                    // Release deposit in old asset.
650
1
                    T::Assets::decrease_deposit(
651
1
                        &stream.config.asset_id,
652
1
                        &stream.source,
653
1
                        stream.deposit,
654
1
                    )?;
655

            
656
                    // Make deposit in new asset.
657
1
                    T::Assets::increase_deposit(
658
1
                        &request.new_config.asset_id,
659
1
                        &stream.source,
660
1
                        amount,
661
1
                    )?;
662
1
                    stream.deposit = amount;
663
                }
664
                // It doesn't make sense to change asset while not providing an absolute new
665
                // amount.
666
3
                (false, _) => Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?,
667
            }
668

            
669
            // If time unit changes we need to update `last_time_updated` to be in the
670
            // new unit.
671
10
            if stream.config.time_unit != request.new_config.time_unit {
672
2
                stream.last_time_updated = T::TimeProvider::now(&request.new_config.time_unit)
673
2
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
674
8
            }
675

            
676
            // Event
677
10
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
678
10
                stream_id,
679
10
                old_config: stream.config,
680
10
                new_config: request.new_config.clone(),
681
10
                deposit_change,
682
10
            });
683
10

            
684
10
            // Update config in storage.
685
10
            stream.config = request.new_config;
686
10
            stream.pending_request = None;
687
10
            Streams::<T>::insert(stream_id, stream);
688
10

            
689
10
            Ok(().into())
690
        }
691

            
692
        #[pallet::call_index(5)]
693
        #[pallet::weight(T::WeightInfo::cancel_change_request())]
694
        pub fn cancel_change_request(
695
            origin: OriginFor<T>,
696
            stream_id: T::StreamId,
697
5
        ) -> DispatchResultWithPostInfo {
698
5
            let origin = ensure_signed(origin)?;
699
5
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
700

            
701
4
            let accepter = stream
702
4
                .account_to_party(origin)
703
4
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
704

            
705
3
            let Some(request) = stream.pending_request.take() else {
706
1
                return Err(Error::<T>::NoPendingRequest.into());
707
            };
708

            
709
2
            ensure!(
710
2
                accepter == request.requester,
711
2
                Error::<T>::CanOnlyCancelOwnRequest
712
            );
713

            
714
            // Update storage.
715
            // Pending request is removed by calling `.take()`.
716
            Streams::<T>::insert(stream_id, stream);
717

            
718
            Ok(().into())
719
        }
720

            
721
        /// Allows immediately changing the deposit for a stream, which is simpler than
722
        /// calling `request_change` with the proper parameters.
723
        /// The call takes an asset id to ensure it has not changed (by an accepted request) before
724
        /// the call is included in a block, in which case the unit is no longer the same and quantities
725
        /// will not have the same scale/value.
726
        #[pallet::call_index(6)]
727
        #[pallet::weight(T::WeightInfo::immediately_change_deposit())]
728
        pub fn immediately_change_deposit(
729
            origin: OriginFor<T>,
730
            stream_id: T::StreamId,
731
            asset_id: T::AssetId,
732
            change: DepositChange<T::Balance>,
733
7
        ) -> DispatchResultWithPostInfo {
734
7
            let origin = ensure_signed(origin)?;
735
7
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
736

            
737
6
            ensure!(stream.source == origin, Error::<T>::UnauthorizedOrigin);
738
4
            ensure!(
739
4
                stream.config.asset_id == asset_id,
740
                Error::<T>::ImmediateDepositChangeRequiresSameAssetId
741
            );
742

            
743
            // Perform pending payment before changing deposit.
744
4
            Self::perform_stream_payment(stream_id, &mut stream)?;
745

            
746
            // Apply change.
747
4
            Self::apply_deposit_change(&mut stream, change)?;
748

            
749
            // Event
750
1
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
751
1
                stream_id,
752
1
                old_config: stream.config.clone(),
753
1
                new_config: stream.config.clone(),
754
1
                deposit_change: Some(change),
755
1
            });
756
1

            
757
1
            // Update stream in storage.
758
1
            Streams::<T>::insert(stream_id, stream);
759
1

            
760
1
            Ok(().into())
761
        }
762
    }
763

            
764
    impl<T: Config> Pallet<T> {
765
        /// Try to open a stream and returns its id.
766
        /// Prefers calling this function from other pallets instead of `open_stream` as the
767
        /// latter can't return the id.
768
63
        pub fn open_stream_returns_id(
769
63
            origin: AccountIdOf<T>,
770
63
            target: AccountIdOf<T>,
771
63
            config: StreamConfigOf<T>,
772
63
            initial_deposit: T::Balance,
773
63
            opening_deposit: T::Balance,
774
63
        ) -> Result<T::StreamId, DispatchErrorWithPostInfo> {
775
63
            ensure!(origin != target, Error::<T>::CantBeBothSourceAndTarget);
776

            
777
            // Generate a new stream id.
778
62
            let stream_id = NextStreamId::<T>::get();
779
62
            let next_stream_id = stream_id
780
62
                .checked_add(&One::one())
781
62
                .ok_or(Error::<T>::StreamIdOverflow)?;
782
61
            NextStreamId::<T>::set(next_stream_id);
783
61

            
784
61
            // Hold opening deposit for the storage used by Stream
785
61
            if opening_deposit > 0u32.into() {
786
61
                T::Currency::hold(&HoldReason::StreamOpened.into(), &origin, opening_deposit)?;
787
            }
788

            
789
            // Freeze initial deposit.
790
61
            T::Assets::increase_deposit(&config.asset_id, &origin, initial_deposit)?;
791

            
792
            // Create stream data.
793
58
            let now =
794
59
                T::TimeProvider::now(&config.time_unit).ok_or(Error::<T>::CantFetchCurrentTime)?;
795
58
            let stream = Stream {
796
58
                source: origin.clone(),
797
58
                target: target.clone(),
798
58
                config,
799
58
                deposit: initial_deposit,
800
58
                last_time_updated: now,
801
58
                request_nonce: 0,
802
58
                pending_request: None,
803
58
                opening_deposit,
804
58
            };
805
58

            
806
58
            // Insert stream in storage.
807
58
            Streams::<T>::insert(stream_id, stream);
808
58
            LookupStreamsWithSource::<T>::insert(origin, stream_id, ());
809
58
            LookupStreamsWithTarget::<T>::insert(target, stream_id, ());
810
58

            
811
58
            // Emit event.
812
58
            Pallet::<T>::deposit_event(Event::<T>::StreamOpened { stream_id });
813
58

            
814
58
            Ok(stream_id)
815
63
        }
816

            
817
        /// Get the stream payment current status, telling how much payment is
818
        /// pending, how much deposit will be left and whenever the stream is stalled.
819
        /// The stream is considered stalled if no funds are left or if the provided
820
        /// time is past a mandatory request deadline. If the provided `now` is `None`
821
        /// then the current time will be fetched. Being able to provide a custom `now`
822
        /// allows to check the status in the future. It is invalid to provide a `now` that is
823
        /// before `last_time_updated`.
824
18
        pub fn stream_payment_status(
825
18
            stream_id: T::StreamId,
826
18
            now: Option<T::Balance>,
827
18
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
828
18
            let stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
829
14
            let now = match now {
830
                Some(v) => v,
831
14
                None => T::TimeProvider::now(&stream.config.time_unit)
832
14
                    .ok_or(Error::<T>::CantFetchCurrentTime)?,
833
            };
834

            
835
14
            let last_time_updated = stream.last_time_updated;
836
14

            
837
14
            ensure!(
838
14
                now >= last_time_updated,
839
                Error::<T>::CantFetchStatusBeforeLastTimeUpdated
840
            );
841

            
842
14
            Self::stream_payment_status_by_ref(&stream, last_time_updated, now)
843
18
        }
844

            
845
63
        fn stream_payment_status_by_ref(
846
63
            stream: &StreamOf<T>,
847
63
            last_time_updated: T::Balance,
848
63
            mut now: T::Balance,
849
63
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
850
63
            let mut stalled_by_deadline = false;
851

            
852
            // Take into account mandatory change request deadline. Note that
853
            // while it'll perform payment up to deadline,
854
            // `stream.last_time_updated` is still the "real now" to avoid
855
            // retroactive payment in case the deadline changes.
856
            if let Some(ChangeRequest {
857
14
                kind: ChangeKind::Mandatory { deadline },
858
                ..
859
63
            }) = &stream.pending_request
860
            {
861
14
                now = min(now, *deadline);
862
14

            
863
14
                if now == *deadline {
864
7
                    stalled_by_deadline = true;
865
9
                }
866
49
            }
867

            
868
            // If deposit is zero the stream is fully drained and there is nothing to transfer.
869
63
            if stream.deposit.is_zero() {
870
                return Ok(StreamPaymentStatus {
871
                    payment: 0u32.into(),
872
                    deposit_left: stream.deposit,
873
                    stalled: true,
874
                });
875
63
            }
876

            
877
            // Dont perform payment if now is before or equal to `last_time_updated`.
878
            // It can be before due to the deadline adjustment.
879
63
            let Some(delta) = now.checked_sub(&last_time_updated) else {
880
2
                return Ok(StreamPaymentStatus {
881
2
                    payment: 0u32.into(),
882
2
                    deposit_left: stream.deposit,
883
2
                    stalled: true,
884
2
                });
885
            };
886

            
887
            // We compute the amount due to the target according to the rate, which may be
888
            // lowered if the stream deposit is lower.
889
            // Saturating is fine as it'll be clamped to the source deposit. It is also safer as
890
            // considering it an error can make a stream un-updatable if too much time has passed
891
            // without updates.
892
61
            let mut payment = delta.saturating_mul(stream.config.rate);
893

            
894
            // We compute the new amount of locked funds. If it underflows it
895
            // means that there is more to pay that what is left, in which case
896
            // we pay all that is left.
897
61
            let (deposit_left, stalled) = match stream.deposit.checked_sub(&payment) {
898
59
                Some(v) if v.is_zero() => (v, true),
899
58
                Some(v) => (v, stalled_by_deadline),
900
                None => {
901
2
                    payment = stream.deposit;
902
2
                    (Zero::zero(), true)
903
                }
904
            };
905

            
906
61
            Ok(StreamPaymentStatus {
907
61
                payment,
908
61
                deposit_left,
909
61
                stalled,
910
61
            })
911
63
        }
912

            
913
        /// Behavior:
914
        /// A stream payment consist of a locked deposit, a rate per unit of time and the
915
        /// last time the stream was updated. When updating the stream, **at most**
916
        /// `elapsed_time * rate` is unlocked from the source account and transfered to the target
917
        /// account. If this amount is greater than the left deposit, the stream is considered
918
        /// drained **but not closed**. The source can come back later and refill the stream,
919
        /// however there will be no retroactive payment for the time spent as drained.
920
        /// If the stream payment is used to rent a service, the target should pause the service
921
        /// while the stream is drained, and resume it once it is refilled.
922
49
        fn perform_stream_payment(
923
49
            stream_id: T::StreamId,
924
49
            stream: &mut StreamOf<T>,
925
49
        ) -> Result<T::Balance, DispatchErrorWithPostInfo> {
926
49
            let now = T::TimeProvider::now(&stream.config.time_unit)
927
49
                .ok_or(Error::<T>::CantFetchCurrentTime)?;
928

            
929
            // We want to update `stream.last_time_updated` to `now` as soon
930
            // as possible to avoid forgetting to do it. We copy the old value
931
            // for payment computation.
932
49
            let last_time_updated = stream.last_time_updated;
933
49
            stream.last_time_updated = now;
934

            
935
            let StreamPaymentStatus {
936
49
                payment,
937
49
                deposit_left,
938
49
                stalled,
939
49
            } = Self::stream_payment_status_by_ref(stream, last_time_updated, now)?;
940

            
941
49
            if payment.is_zero() {
942
25
                return Ok(0u32.into());
943
24
            }
944
24

            
945
24
            // Transfer from the source to target.
946
24
            T::Assets::transfer_deposit(
947
24
                &stream.config.asset_id,
948
24
                &stream.source,
949
24
                &stream.target,
950
24
                payment,
951
24
            )?;
952

            
953
            // Update stream info.
954
24
            stream.deposit = deposit_left;
955
24

            
956
24
            // Emit event.
957
24
            Pallet::<T>::deposit_event(Event::<T>::StreamPayment {
958
24
                stream_id,
959
24
                source: stream.source.clone(),
960
24
                target: stream.target.clone(),
961
24
                amount: payment,
962
24
                stalled,
963
24
            });
964
24

            
965
24
            Ok(payment)
966
49
        }
967

            
968
14
        fn apply_deposit_change(
969
14
            stream: &mut StreamOf<T>,
970
14
            change: DepositChange<T::Balance>,
971
14
        ) -> DispatchResultWithPostInfo {
972
14
            match change {
973
3
                DepositChange::Absolute(amount) => {
974
3
                    if let Some(increase) = amount.checked_sub(&stream.deposit) {
975
1
                        T::Assets::increase_deposit(
976
1
                            &stream.config.asset_id,
977
1
                            &stream.source,
978
1
                            increase,
979
1
                        )?;
980
2
                    } else if let Some(decrease) = stream.deposit.checked_sub(&amount) {
981
2
                        T::Assets::decrease_deposit(
982
2
                            &stream.config.asset_id,
983
2
                            &stream.source,
984
2
                            decrease,
985
2
                        )?;
986
                    }
987
2
                    stream.deposit = amount;
988
                }
989
8
                DepositChange::Increase(increase) => {
990
8
                    stream.deposit = stream
991
8
                        .deposit
992
8
                        .checked_add(&increase)
993
8
                        .ok_or(ArithmeticError::Overflow)?;
994
6
                    T::Assets::increase_deposit(&stream.config.asset_id, &stream.source, increase)?;
995
                }
996
3
                DepositChange::Decrease(decrease) => {
997
3
                    stream.deposit = stream
998
3
                        .deposit
999
3
                        .checked_sub(&decrease)
3
                        .ok_or(ArithmeticError::Underflow)?;
1
                    T::Assets::decrease_deposit(&stream.config.asset_id, &stream.source, decrease)?;
                }
            }
9
            Ok(().into())
14
        }
        /// Tries to apply a possibly immediate change. Return if that change was immediate and
        /// applied or not.
        ///
        /// If asset id and time unit are the same, we allow to make the change
        /// immediatly if the origin is at a disadvantage.
        /// We allow this even if there is already a pending request.
44
        fn maybe_immediate_change(
44
            stream_id: T::StreamId,
44
            stream: &mut StreamOf<T>,
44
            new_config: &StreamConfigOf<T>,
44
            deposit_change: Option<DepositChange<T::Balance>>,
44
            requester: Party,
44
        ) -> Result<bool, DispatchErrorWithPostInfo> {
44
            if new_config.time_unit != stream.config.time_unit
21
                || new_config.asset_id != stream.config.asset_id
            {
28
                return Ok(false);
16
            }
16

            
16
            if requester == Party::Source && new_config.rate < stream.config.rate {
6
                return Ok(false);
10
            }
10

            
10
            if requester == Party::Target && new_config.rate > stream.config.rate {
3
                return Ok(false);
7
            }
7

            
7
            // Perform pending payment before changing config.
7
            Self::perform_stream_payment(stream_id, stream)?;
            // We apply the requested deposit change.
7
            if let Some(change) = deposit_change {
5
                Self::apply_deposit_change(stream, change)?;
2
            }
            // Emit event.
5
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
5
                stream_id,
5
                old_config: stream.config.clone(),
5
                new_config: new_config.clone(),
5
                deposit_change,
5
            });
5

            
5
            // Update storage.
5
            stream.config = new_config.clone();
5
            Streams::<T>::insert(stream_id, stream);
5

            
5
            Ok(true)
44
        }
    }
}