[Reactive Extensions] Two ways to issue values at arbitrary time intervals

Reactive Extensions has an `Observable.Interval` factory method that issues values at regular intervals, but no factory method dedicated to issuing values at arbitrary intervals.
Strictly speaking there is one, but it is complicated to use, so I'll introduce it here.

Two ways to publish values at arbitrary time intervals

Method 1. Use the Observable.Generate factory method

The `Observable.Generate` factory method has a `timeSelector` argument of type `Func<TState, TimeSpan>` that defines the interval before each value, so you can use it to create an IObservable that issues values at arbitrary time intervals.

Definition and usage of the Observable.Generate method

First, I will explain the definition and basic usage of the Observable.Generate method.

Definition

The simplest definition is:

public static IObservable<TResult> Generate<TState, TResult>(
    TState initialState,                    // Initial value of the state
    Func<TState, bool> condition,           // Continuation condition
    Func<TState, TState> iterate,           // How the state changes at each step
    Func<TState, TResult> resultSelector)   // Value to issue for each state

Example of use

For example, to issue the values 0 through 4:

Observable.Generate(initialState: 0,            // Initial value: 0
                    condition: i => i < 5,      // Continue while the state is less than 5
                    iterate: i => ++i,          // Increment by 1 at each step
                    resultSelector: i => i)     // Issue i as it is
          .Subscribe(i => Console.WriteLine(i),
                     () => Console.WriteLine("OnCompleted"));

The arguments are specified just like the clauses of a for statement.

Execution result

0
1
2
3
4
OnCompleted
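To make the for-statement comparison concrete, here is a minimal sketch of the same sequence written as an ordinary loop. The class name ForLoopAnalogy is just something I made up for illustration; the comments map each clause to the corresponding Generate argument.

```csharp
using System;
using System.Collections.Generic;

static class ForLoopAnalogy
{
    // Collects the same values that the Observable.Generate example issues.
    public static List<int> Run()
    {
        var results = new List<int>();
        for (int i = 0;      // initialState: 0
             i < 5;          // condition: i => i < 5
             ++i)            // iterate: i => ++i
        {
            results.Add(i);  // resultSelector: i => i
        }
        return results;      // Leaving the loop corresponds to OnCompleted
    }
}
```

The difference is that the loop hands back all values at once, while Generate pushes them to the subscriber one by one.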

Specify the value issuance interval with the Observable.Generate method

Now for the main topic.
The `Observable.Generate` method has an overload that lets you specify the interval between values.

Overloaded definition that allows you to specify the value issuance interval

public static IObservable<TResult> Generate<TState, TResult>(
    TState initialState,                    // Initial value of the state
    Func<TState, bool> condition,           // Continuation condition
    Func<TState, TState> iterate,           // How the state changes at each step
    Func<TState, TResult> resultSelector,   // Value to issue for each state
    Func<TState, TimeSpan> timeSelector)    // Time to wait before issuing the value for each state

The timeSelector argument takes a function that returns the interval to wait before each value is issued.

How to use

For example, prepare a List of TimeSpan values in the order you want to wait, and have the timeSelector read them in order. The values are then issued at the intervals defined in the List.


using System;
using System.Collections.Generic;
using System.Reactive.Linq;

class MainClass
{
    public static void Main(string[] args)
    {
        // Define the issuance intervals
        List<TimeSpan> intervals = new List<TimeSpan>()
        {
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(2),
            TimeSpan.FromSeconds(3),
            TimeSpan.FromSeconds(4),
            TimeSpan.FromSeconds(5)
        };

        Console.WriteLine($"{DateTime.Now} Start issuing values");
        Observable.Generate(initialState: 0,
                            condition: n => n < intervals.Count,
                            iterate: n => ++n,
                            resultSelector: n => n,
                            timeSelector: n => intervals[n])   // Wait intervals[n] before issuing n
                  .Timestamp()
                  .Subscribe(val => Console.WriteLine($"{val.Timestamp.ToLocalTime().DateTime} Issued value: {val.Value}"),
                              () => Console.WriteLine("Value issuance completed"));

        // Keep the process alive until a key is pressed so the timers can fire
        Console.Read();
    }
}

Execution result

2020/04/26 23:51:09 Start issuing values
2020/04/26 23:51:10 Issued value: 0
2020/04/26 23:51:12 Issued value: 1
2020/04/26 23:51:15 Issued value: 2
2020/04/26 23:51:19 Issued value: 3
2020/04/26 23:51:24 Issued value: 4
Value issuance completed

The values are issued at intervals of 1, 2, 3, 4, and 5 seconds.
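Since each interval is measured from the previous value, the time from the start to each value is the running sum of the intervals. As a small sketch (the EmissionOffsets helper is hypothetical, not part of Rx), the expected offsets can be computed like this:

```csharp
using System;
using System.Collections.Generic;

static class EmissionOffsets
{
    // Returns the elapsed time from subscription to each value,
    // i.e. the running sum of the intervals.
    public static List<TimeSpan> From(IEnumerable<TimeSpan> intervals)
    {
        var offsets = new List<TimeSpan>();
        var elapsed = TimeSpan.Zero;
        foreach (var interval in intervals)
        {
            elapsed += interval;   // Each wait starts when the previous value was issued
            offsets.Add(elapsed);
        }
        return offsets;
    }
}
```

For the intervals 1, 2, 3, 4, and 5 seconds this gives 1, 3, 6, 10, and 15 seconds, which lines up with the timestamps 23:51:10, :12, :15, :19, and :24 after the 23:51:09 start.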

Create a factory method that easily issues values at arbitrary time intervals

The Observable.Generate factory method takes many arguments and is tedious to call, so let's create a factory method that makes it easy to issue values at arbitrary time intervals.

public static class ObservableEx
{
    // Issues 0, 1, 2, ... waiting intervals[n] before the n-th value
    public static IObservable<int> AnyInterval(IReadOnlyList<TimeSpan> intervals) =>
        Observable.Generate(initialState: 0,
                            condition: n => n < intervals.Count,
                            iterate: n => ++n,
                            resultSelector: n => n,
                            timeSelector: n => intervals[n]);
}

All you have to do is pass a list of TimeSpan values in the order you want to wait.
I really wanted the argument to be `IEnumerable<TimeSpan>`, but I gave up on that because the method uses Count and an indexer internally.

When I actually use it, it looks like this.

ObservableEx.AnyInterval(new List<TimeSpan>{TimeSpan.FromSeconds(1),
                                            TimeSpan.FromSeconds(2),
                                            TimeSpan.FromSeconds(3),
                                            TimeSpan.FromSeconds(4),
                                            TimeSpan.FromSeconds(5) })
            .Timestamp()
            .Subscribe(val => Console.WriteLine($"{val.Timestamp.ToLocalTime().DateTime} Issued value: {val.Value}"),
                        () => Console.WriteLine("Value issuance completed"));

This completes after issuing 5 values. If you want to keep issuing values indefinitely, append the Repeat() operator to loop the sequence.
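Coming back to the IEnumerable point above: one possible way to drop the Count and indexer dependency is to create a single-shot Observable.Timer per interval and join them with Concat. This is only a sketch under my own assumptions, not a replacement for the Generate version; the class name ObservableExSketch is made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class ObservableExSketch
{
    // Hypothetical variant of AnyInterval that only enumerates the sequence,
    // so it needs neither Count nor an indexer.
    public static IObservable<int> AnyInterval(IEnumerable<TimeSpan> intervals) =>
        intervals
            // One single-shot timer per interval; each issues its index when it fires.
            .Select((interval, index) => Observable.Timer(interval).Select(_ => index))
            // Concat subscribes to the next timer only after the previous one completes,
            // so the waits run back to back rather than in parallel.
            .Concat();
}
```

It is called the same way as before, for example ObservableExSketch.AnyInterval(intervals).Timestamp().Subscribe(...). Because the intervals are only enumerated, a lazily generated sequence should work here as well.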

Method 2. Use the Observable.FromAsync factory method

The `Observable.FromAsync` factory method takes a function that returns a Task and issues the Task's result when the Task completes. So if you adjust the timing with Task.Delay or similar, you can create an `IObservable` that issues values at arbitrary time intervals.

For example, it looks like the following.


Observable.FromAsync(() => Task.Run(async () =>
{
    await Task.Delay(TimeSpan.FromSeconds(1));
    return Unit.Default;
}));

This waits 1 second and then issues `Unit`. Because OnCompleted follows right after `Unit`, appending `Repeat` to resubscribe indefinitely gives you, for the time being, an `IObservable` that issues a value at 1-second intervals.


Observable.FromAsync(() => Task.Run(async () =>
{
    await Task.Delay(TimeSpan.FromSeconds(1));
    return Unit.Default;
}))
.Repeat();

What remains is how to vary the argument of Task.Delay. Is there no choice but to use an external variable?
Please let me know if there is a better way.

List<TimeSpan> intervals = new List<TimeSpan>()
{
    TimeSpan.FromSeconds(1),
    TimeSpan.FromSeconds(2),
    TimeSpan.FromSeconds(3),
    TimeSpan.FromSeconds(4),
    TimeSpan.FromSeconds(5)
};
int n = 0;

Observable.FromAsync(() => Task.Run(async () =>
{
    await Task.Delay(intervals[n]);
    return n++;
}))
.Repeat(intervals.Count).Timestamp()
.Subscribe(val => Console.WriteLine($"{val.Timestamp.ToLocalTime().DateTime} Issued value: {val.Value}"),
            () => Console.WriteLine("Value issuance completed"));

This approach isn't very clean, but it does let you change the interval under specific conditions.

List<TimeSpan> intervals = new List<TimeSpan>()
{
    TimeSpan.FromSeconds(1),
    TimeSpan.FromSeconds(2),
    TimeSpan.FromSeconds(3),
    TimeSpan.FromSeconds(4),
    TimeSpan.FromSeconds(5)
};
int n = 0;

Observable.FromAsync(() => Task.Run(async () =>
{
    await Task.Delay(intervals[n]);
    if(conditions) await Task.Delay(~); // Lengthen the interval under specific conditions ("conditions" and "~" are placeholders)
    return n++;
}))
.Repeat(intervals.Count).Timestamp()
.Subscribe(val => Console.WriteLine($"{val.Timestamp.ToLocalTime().DateTime} Issued value: {val.Value}"),
            () => Console.WriteLine("Value issuance completed"));

It's quite messy and relies on side effects, so I wouldn't recommend it, but it seems usable as a stopgap if you really need it.
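One side effect worth spelling out: because the counter n lives outside the observable, a second subscription would continue from wherever the first one left off. A possible way to contain that, sketched here under my own assumptions, is to wrap the whole thing in Observable.Defer so that each subscription gets its own counter. The class name FromAsyncSketch is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class FromAsyncSketch
{
    // Same FromAsync + Repeat idea as above, but the counter is created inside Defer,
    // so every subscription starts again from index 0.
    public static IObservable<int> AnyInterval(IReadOnlyList<TimeSpan> intervals) =>
        Observable.Defer(() =>
        {
            int n = 0; // Created per subscription, not shared between subscribers
            return Observable.FromAsync(async () =>
                {
                    await Task.Delay(intervals[n]); // Extra conditional delays could go here
                    return n++;
                })
                .Repeat(intervals.Count);
        });
}
```

The mutable counter is still there, so this does not make the approach pure; it only keeps the state from leaking between subscriptions.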

Summary

- Basically, `Observable.Generate` lets you issue values at arbitrary time intervals.
- If you want to change the interval flexibly, `Observable.FromAsync` gives you more freedom, but be careful because it relies on side effects.

That's about it.
The ending turned out a bit rough, but I'm still unsure about the best way to use FromAsync, so I'd appreciate any advice in the comments.