[Reactive Extensions] Two ways to issue values at arbitrary time intervals
ReactiveExtensions has a ʻObservable.Interval` factory method that issues values at regular intervals, but no factory method that issues values at arbitrary intervals.
No, it’s strictly, but it’s complicated to use, so I’ll introduce it here.
Two ways to publish values at arbitrary time intervals
Method 1. Use the Observable.Generate factory method
ʻObservable.Generate Factory method has a
Func <TSource, TimeSpan> type
timeSelector` argument that can define the value issuance interval, so you can create an IObservable that issues a value at any time interval. I can do it.
Definition and usage of the Observable.Generate method
First, I will explain the definition and basic usage of the Observable.Generate method.
Definition
The simplest definition is:
public static IObservable<TResult> Generate<TState, TResult> (
TState initialState, //Specify the initial value of TState
Func<TState, bool> condition, //Specify continuation condition
Func<TState, TState> iterate, //Specify the amount of change in TState
Func<TState, TResult> resultSelector) //Specify the value to emit light
Example of use
For example, to issue a value between 0 and 4:
Observable.Generate(initialState: 0, //initial value:0
condition: i => i < 5, //Continuation condition:Issue value is less than 5
iterate: i => ++i, //Amount of change:Increment by 1
resultSelector: i => i) //Issue i as it is
.Subscribe(i => Console.WriteLine(i),
() => Console.WriteLine("OnCompleted"));
It’s a method of specifying arguments like a for statement.
Execution result
0
1
2
3
4
OnCompleted
Specify the value issuance interval with the Observable.Generate method
The main subject is from here.
The ʻObservable.Generate` method has an overload that allows you to specify the interval between issuing values.
Overloaded definition that allows you to specify the value issuance interval
public static IObservable<TResult> Generate<TState, TResult> (
TState initialState, //Specify the initial value of TState
Func<TState, bool> condition, //Specify continuation condition
Func<TState, TState> iterate, //Specify the amount of change in TState
Func<TState, TResult> resultSelector, //Specify the value to emit light
Func<TState, TimeSpan> timeSelector) //Specify time interval;
You can specify a method that defines the issue interval in this timeSelector
.
How to use
As for how to use it, for example, by preparing a List
of TimeSpan
that defines the order of issuance and specifying that the values are read in order with the timeSelector, the values are set at the intervals defined in the List. Will be issued.
class MainClass
{
public static void Main(string[] args)
{
//Define issuance interval
List<TimeSpan> intervals = new List<TimeSpan>()
{
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(2),
TimeSpan.FromSeconds(3),
TimeSpan.FromSeconds(4),
TimeSpan.FromSeconds(5)
};
Console.WriteLine($"{DateTime.Now}Start issuing values");
Observable.Generate(initialState: 0,
condition: n => n < intervals.Count,
iterate: n => ++n,
resultSelector: n => n,
timeSelector: n => intervals[n])
.Timestamp()
.Subscribe(val => Console.WriteLine($"{val.Timestamp.ToLocalTime().DateTime}Issued value:{val.Value}"),
() => Console.WriteLine("Value issuance completed"));
Console.Read();
}
}
Execution result
2020/04/26 23:51:09 Start issuing values
2020/04/26 23:51:10 Issued value: 1
2020/04/26 23:51:12 Issued value: 2
2020/04/26 23:51:15 Issued value: 3
2020/04/26 23:51:19 Issued value: 4
2020/04/26 23:51:24 Issued value: 5
Value issuance completed
Values are published at 1,2,3,4,5 second intervals.
Create a factory method that easily issues values at arbitrary time intervals
The Observable.Generate factory method has many arguments and is troublesome, so create a factory method that can easily issue values at arbitrary time intervals.
public static IObservable<int> AnyInterval(IReadOnlyList<TimeSpan> intervals) =>
Observable.Generate(initialState: 0,
condition: n => n < intervals.Count,
iterate: n => ++n,
resultSelector: n => n,
timeSelector: n => intervals[n]);
All you have to do is pass a TimeSpan
type list that defines the order of the times you want to wait.
I really wanted the argument to be ʻIEnumerable
When I actually use it, it looks like this.
ObservableEx.AnyInterval(new List<TimeSpan>{TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(2),
TimeSpan.FromSeconds(3),
TimeSpan.FromSeconds(4),
TimeSpan.FromSeconds(5) })
.Timestamp()
.Subscribe(val => Console.WriteLine($"{val.Timestamp.ToLocalTime().DateTime}Issued value:{val.Value}"),
() => Console.WriteLine("Value issuance completed"));
This is the end of issuing 5 values, but if you want to issue infinitely, you can loop by inserting the Repeat ()
operator.
Method 2. Use the Observable.FromAsync factory method
ʻObservable.FromAsync The factory method takes
Task as an argument and sends the return value when
Task ends.
Therefore, if you adjust the time with
Task.Delay etc., you can create ʻIObservable
that issues values at arbitrary time intervals.
For example, it looks like the following.
Observable.FromAsync(() => Task.Run(async () =>
{
await Task.Delay(TimeSpan.FromSeconds(1));
return Unit.Default;
}));
If this is the case, wait 1 second and issue ʻUnit.
Since ʻUnit
is issued and OnCompleted is issued, if you issueRepeat
infinitely, you can create ʻIObservable` that issues a value at 1-second intervals for the time being.
Observable.FromAsync(() => Task.Run(async () =>
{
await Task.Delay(TimeSpan.FromSeconds(1));
return Unit.Default;
}))
.Repeat();
The rest is how to change the argument of Task.Delay
, but is there no choice but to use external variables?
Please let me know if there is any other good way.
List<TimeSpan> intervals = new List<TimeSpan>()
{
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(2),
TimeSpan.FromSeconds(3),
TimeSpan.FromSeconds(4),
TimeSpan.FromSeconds(5)
};
int n = 0;
Observable.FromAsync(() => Task.Run(async () =>
{
await Task.Delay(intervals[n]);
return n++;
}))
.Repeat(intervals.Count).Timestamp()
.Subscribe(val => Console.WriteLine($"{val.Timestamp.ToLocalTime().DateTime}Issued value:{val.Value}"),
() => Console.WriteLine("Value issuance completed"));
This method isn’t very clean, but it’s possible to change the publishing interval under certain conditions.
List<TimeSpan> intervals = new List<TimeSpan>()
{
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(2),
TimeSpan.FromSeconds(3),
TimeSpan.FromSeconds(4),
TimeSpan.FromSeconds(5)
};
int n = 0;
Observable.FromAsync(() => Task.Run(async () =>
{
await Task.Delay(intervals[n]);
if(conditions) await Task.Delay(~); //特定のconditions下で発行間隔を増やす
return n++;
}))
.Repeat(intervals.Count).Timestamp()
.Subscribe(val => Console.WriteLine($"{val.Timestamp.ToLocalTime().DateTime}Issued value:{val.Value}"),
() => Console.WriteLine("Value issuance completed"));
It’s very dirty and has many side effects, so you shouldn’t use it positively, but if you really want to, it seems to be usable temporarily.
Summary
–Basically, if you use ʻObservable.Generate, you can issue values at arbitrary time intervals.
--If you want to flexibly change the issue interval, you can use ʻObservable.FromAsync
to have a high degree of freedom, but be careful because there are many side effects.
Is it like this …
I ended up with a rough feeling at the end, but I’m wondering how to use From Async
, so if you have any advice, I’d appreciate it if you could comment.