[flutter] - RxDart : Extension Methods Part2
Extension method
Extension method는 stream에 추가 기능을 더해서 새로운 stream을 만들어주는 stream내부 함수이다.
Part1을 이어서 알아보자
- 순서
mapTo method
Stream<S> mapTo <S>(
S value
)
- Example :
Stream.fromIterable([1, 2, 3, 4]) .mapTo(true) .listen(print); // prints true, true, true, true
mapTo는 주어진 스트림 데이터를 지정된 상수로 출력한다.
max method
Future<T> max (
[Comparator<T> comparator]
)
- Example :
final max = await Stream.fromIterable([1, 2, 3]).max(); print(max); // prints 3 - Example With
Comparator:final stream = Stream.fromIterable(['short', 'looooooong']); final max = await stream.max((a, b) => a.length - b.length); print(max); // prints 'looooooong'
max는 list형의 데이터들에서 가장 큰 값을 찾는 함수이다. 나만의 Comparator를 지정해 사용할 수 있다.
min method
Future<T> min (
[Comparator<T> comparator]
)
- Example :
final min = await Stream.fromIterable([1, 2, 3]).min(); print(min); // prints 1 - Example With
Comparator:final stream = Stream.fromIterable(['short', 'looooooong']); final min = await stream.min((a, b) => a.length - b.length); print(min); // prints 'short'
min는 list형의 데이터들에서 가장 작은 값을 찾는 함수이다. 나만의 Comparator를 지정해 사용할 수 있다.
mergeWith method
Stream<T> mergeWith (
Iterable<Stream<T>> streams
)
- Example :
TimerStream(1, Duration(seconds: 10)) .mergeWith([Stream.fromIterable([2])]) .listen(print); // prints 2, 1
mergeWith는 다른 stream들과 조합하여 출력해준다. 출력 순서는 들어온 데이터 순서이다.
onErrorResume method
Stream<T> onErrorResume (
Stream<T> recoveryFn(
dynamic error
)
)
- Example :
ErrorStream(Exception()) .onErrorResume((dynamic e) => Stream.fromIterable([e is StateError ? 1 : 0]) .listen(print); // prints 0
onErrorResume는 error 이벤트를 차단하고 recoveryFn함수를 통해 새로운 stream을 반환해줍니다. onError가 발생하지 않습니다.
error에 따른 logic처리가 가능합니다.
onErrorResumeNext method
Stream<T> onErrorResumeNext (
Stream<T> recoveryStream
)
- Example :
ErrorStream(Exception()) .onErrorResumeNext(Stream.fromIterable([1, 2, 3])) .listen(print); // prints 1, 2, 3
onErrorResumeNext는 위에 onErrorResume과 같이 error를 차단하고 stream을 반환해주지만 logic처리는 필요 없을 때 사용합니다.
onErrorReturn method
Stream<T> onErrorReturn (
T returnValue
)
- Example :
ErrorStream(Exception()) .onErrorReturn(1) .listen(print); // prints 1
onErrorReturn은 error상황에서 onError이벤트를 막고 고정된 데이터를 출력시켜줍니다. 정상종료합니다.
onErrorReturnWith method
Stream<T> onErrorReturnWith (
T returnFn(
dynamic error
)
)
- Example :
ErrorStream(Exception()) .onErrorReturnWith((e) => e is Exception ? 1 : 0) .listen(print); // prints 1
onErrorReturnWith는 onErrorReturn과 같지만 error데이터를 받아서 logic처리를 해줄 수 있다.
pairwise method
Stream<Iterable<T>> pairwise ()
- Example :
RangeStream(1, 4) .pairwise() .listen(print); // prints [1, 2], [2, 3], [3, 4]
pairwise는 pair로 쌍으로 데이터를 보내준다.
특이점은 그냥 쌍으로 묶는게 아니라다음 데이터가중복되서 나온다.[1,2,3,4] => [1,2] , [2,3], [3,4]
sample method
Stream<T> sample (
Stream sampleStream
)
- Example :
Stream.fromIterable([1, 2, 3]) .sample(TimerStream(1, Duration(seconds: 1))) .listen(print); // prints 3
sample은 sample내부 sampleStream이 데이터를 출력하면 메인 stream에서 가장 최근에 출력된 데이터를 출력해준다.
sampleTime method
Stream<T> sampleTime (
Duration duration
)
- Example :
Stream.fromIterable([1, 2, 3]) .sampleTime(Duration(seconds: 1)) .listen(print); // prints 3
sampleTime은 sample과 거의 다 같지만 stream이 아니라 Duration으로 정의해준다.
scan method
Stream<S> scan <S>(
S accumulator(
S accumulated,
T value,
int index
),
[S seed]
)
- Example :
Stream.fromIterable([1, 2, 3]) .scan((acc, curr, i) => acc + curr, 0) .listen(print); // prints 1, 3, 6
scan은 데이터의 누적값을 출력해주는 함수이다. accumulator를 지정해 logic을 짤 수 있으며 seed값은 초기값이다.
skipUntil method
Stream<T> skipUntil <S>(
Stream<S> otherStream
)
- Example :
MergeStream([ Stream.fromIterable([1]), TimerStream(2, Duration(minutes: 2)) ]) .skipUntil(TimerStream(true, Duration(minutes: 1))) .listen(print); // prints 2;
skipUntil은 otherStream에서 데이터가 출력될 때 까지 메인 stream에서의 데이터는 무시한다.
startWith method
Stream<T> startWith (
T startValue
)
- Example :
Stream.fromIterable([2]).startWith(1).listen(print); // prints 1, 2
startWith는 startValue를 stream의 데이터 앞에 붙여준다.
startWithMany method
Stream<T> startWithMany (
List<T> startValues
)
- Example :
Stream.fromIterable([3]).startWithMany([1, 2]) .listen(print); // prints 1, 2, 3
startWithMany는 startValues를 stream의 데이터 앞에 붙여준다.
startWith랑 거의 같지만 단일이 아닌list형태의 데이터를 앞에 붙임.
switchIfEmpty method
Stream<T> switchIfEmpty (
Stream<T> fallbackStream
)
- Example :
// Let's pretend we have some Data sources that complete without // emitting any items if they don't contain the data we're looking for Stream<Data> memory; Stream<Data> disk; Stream<Data> network; // Start with memory, fallback to disk, then fallback to network. // Simple as that! Stream<Data> getThatData = memory.switchIfEmpty(disk).switchIfEmpty(network);
switchIfEmpty는 stream에 데이터가 없을 경우 fallbackStream으로 교체한다.
예시에서는
memory->disk->network로 데이터가 없을 경우이다.Repository Pattern에서 유용하다.
switchMap method
Stream<S> switchMap <S>(
Stream<S> mapper(
T value
)
)
- Example :
RangeStream(4, 1) .switchMap((i) => TimerStream(i, Duration(minutes: i)) .listen(print); // prints 1
switchMap은 stream으로 부터 들어온 데이터를 mapper를 활용해서 새로운 stream을 만든다. 특이점은 가장 최신 데이터만 받고 나머지 stream은 정지시킨다.
flatMap과 유사하지만 특이점은 최신 데이터만 받는다는점이다.비동기 API에서 가장 최신 데이터를 원할때 유용하다.
takeUntil method
Stream<T> takeUntil <S>(
Stream<S> otherStream
)
- Example :
MergeStream([ Stream.fromIterable([1]), TimerStream(2, Duration(minutes: 1)) ]) .takeUntil(TimerStream(3, Duration(seconds: 10))) .listen(print); // prints 1
takeUntil은 skipUntil과 반대로 otherStream이 데이터를 출력할 때까지만 데이터를 출력한다.
debounce method
Stream<T> debounce (
Stream window(
T event
)
)
- Example :
Stream.fromIterable([1, 2, 3, 4,5,6,7,8,9]).interval(Duration(milliseconds: 500)) .debounce((_) => TimerStream(true, Duration(seconds: 1))) .listen(print); // prints 4
debounce는 여러 데이터 요청이 겹쳐오는 경우 debounce의 값이 True이면서 stream에서 새로운 데이터가 들어오지 않을 때만 데이터를 출력한다.
예를 들어 클릭 이벤트의 경우 수많은 이벤트가 발생하지만 debounce에서 Duration을 5ms로 하면 첫 입력이 들어오고 5ms동안 새로운 입력이 없어야 이벤트를 발생시킨다.
debounce에서(x) => TimerStream을 사용 하면x에는 메인stream의 값들이 들어온다
throttle과 비교해서 공부하자
debounceTime method
Stream<T> debounceTime (
Duration duration
)
- Example :
Stream.fromIterable([1, 2, 3, 4]) .debounceTime(Duration(seconds: 1)) .listen(print); // prints 4
debounceTime은 debounce와 똑같이 여러 요청을 한번만 처리하게 해준다. 여기서는 Duration 시간만큼만 대기한다.
throttle method
Stream<T> throttle (
Stream window(
T event
),
{bool trailing: false}
)
- Example :
Stream.fromIterable([1, 2, 3]) .throttle((_) => TimerStream(true, Duration(seconds: 1)))
throttle은 window stream이 onDone일때 까지 데이터를 queue에 모아서 첫 데이터를 출력해준다. 즉 특정 시간동안 이벤트가 한번만 발생한다.
trailing을 true로 하면 queue에서 가장 마지막 데이터를 꺼내준다.
throttle은debounce와 비교해보면 좋다.debounce: 첫 입력이 들어오면 주어진 시간동안 새로운 입력이 들어오지 않아야 이벤트를 처리한다.throttle: 주어진 시간동안 입력을 한번만 처리한다.
Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
.interval(Duration(milliseconds: 500))
.throttle((data) {
print('throttle data =$data');
return TimerStream(true, Duration(seconds: 1));
}, trailing: false).listen(print);
내가 혼자 예습중 이상동작을 발견했다… 설명대로라면
1,3,5,7,9만 출력해야하지만 가장 마지막 데이터인10도 출력된다…backpressure.dart를 읽어봤더니window의onDone이 출력되어야queue가 정리가 되는데 마지막 데이터에서는main Stream의onDone이 출력되어서 문제가 생긴다. 현재github issue에 등록해놓았다.
throttleTime method
Stream<T> throttleTime (
Duration duration,
{bool trailing: false}
)
- Example :
Stream.fromIterable([1, 2, 3]) .throttleTime(Duration(seconds: 1))
throttleTime은 trottle에 window대신에 Duration으로 설정한다. 동작 내용은 똑같다.
timeInterval method
Stream<TimeInterval<T>> timeInterval ()
- Example :
Stream.fromIterable([1]) .interval(Duration(seconds: 1)) .timeInterval() .listen(print); // prints TimeInterval{interval: 0:00:01, value: 1}
timeInterval은 interval처럼 데이터가 시간을 두고 연속적으로 오는 경우 시간을 기록해준다. (일반 stream도 사용가능)
데이터의 형태가
TimerInterval로 묶인다.TimeInterval.interval:interval시간 정보TimerInterval.value: 데이터 정보
timestamp method
Stream<Timestamped<T>> timestamp ()
- Example :
Stream.fromIterable([1]) .timestamp() .listen((i) => print(i)); // prints 'TimeStamp{timestamp: XXX, value: 1}';
timestamp는 데이터 출력의 시간을 기록해준다.
TimeStamp{timestamp: 2020-03-20 14:16:10.224771, value: 1}와 같은 형태로 출력된다.
whereType method
Stream<S> whereType <S>()
- Example :
Stream.fromIterable([1, 'two', 3, 'four']) .whereType<int>() .listen(print); // prints 1, 3 //as opposed to: Stream.fromIterable([1, 'two', 3, 'four']) .where((event) => event is int) .cast<int>() .listen(print); // prints 1, 3
whereType은 주어진 <S> Type을 보고 맞지 않는 것은 버리고 출력해준다.
where과cast를 활용해 똑같이 작성할 수 있다.
window method
Stream<Stream<T>> window (
Stream window
)
- Example :
Stream.periodic(Duration(milliseconds: 100), (i) => i) .window(Stream.periodic(Duration(milliseconds: 160), (i) => i)) .asyncMap((stream) => stream.toList()) .listen(print); // prints [0, 1] [2, 3] [4, 5] ...
window는 buffer와 매우 유사하지만 buffer는 데이터를 모아서 출력해준다면 여기는 데이터를 모아서 stream으로 출력해준다.
windowCount method
Stream<Stream<T>> windowCount (
int count,
[int startBufferEvery = 0]
)
- Example :
RangeStream(1, 4) .windowCount(2) .asyncMap((stream) => stream.toList()) .listen(print); // prints [1, 2], [3, 4] done! RangeStream(1, 5) .bufferCount(3, 2) .listen(print); // prints [1, 2, 3], [3, 4, 5], [5] done!
windowCount는 bufferCount와 비슷하게 count를 지정해 데이터를 모아서 출력한다. 단 stream형태로 출력한다.
startBufferEvery는 다음 buffer의 시작점을 지정해준다. 기본값은 0이며 2로 지정할 경우 위의 예제처럼 동작한다. (buffer출력 후 메인 데이터의 index처리 후 다시 시작)
windowTest method
Stream<Stream<T>> windowTest (
bool onTestHandler(
T event
)
)
- Example :
RangeStream(1, 10) .windowTest((i) => i % 2 == 0) .asyncMap((stream) => stream.toList()) .listen(print); // prints [1, 2], [3, 4] [5, 6] ...
windowTest는 주어진 onTestHandler함수가 true를 반환할때 까지 값을 buffer에 모아서 stream형태로 반환해준다.
withLatestFrom method
Stream<R> withLatestFrom <S, R>(
Stream<S> latestFromStream,
R fn(
T t,
S s
)
)
- Example :
Stream.fromIterable([1, 2]).withLatestFrom( Stream.fromIterable([2, 3]), (a, b) => a + b) .listen(print); // prints 4 (due to the async nature of streams)
withLatestFrom은 여러개의 stream을 조합해서 출력한다. latestFromStream의 값이 출력되면 가장 마지막에 들어온 메인 stream의 값과 주어진 fn에 따라 조합되서 출력된다.
여기서는 메인
stream이 아니라latestFromStream이 메인이다.latestFromStream에서 데이터가 출력되지 않으면 아무것도 출력되지 않는다.

withLatestForm은 종류가 여러개이다.withLatestFrom,withLatestFrom2…withLatestFrom9까지 있으며 List형태를 받는withLatestFromList가 있다. 참고로withLatestFromList는fn을 지정할 수 없다.
zipWith method
Stream<R> zipWith <S, R>(
Stream<S> other,
R zipper(
T t,
S s
)
)
- Example :
Stream.fromIterable([1, 3, 4, 5, 6]) .zipWith(Stream.fromIterable([2, 10, 11, 12]), (one, two) => one + two) .listen(print); // prints 3 , 13, 15, 17
zipWith은 withLatestFrom과 비슷하게 데이터를 모아서 fn를 통해 출력한다. 다른점은 withLatestFrom은 메인 stream의 마지막 데이터와 조합해서 출력했다면 zipWith는 현재 데이터와 조합해서 출력한다.
드디어 extension method를 다 정리했다..
기나고 긴 여정이었다 ㅠㅠ 이제 정리된거를 공통 분모로 나눠보고 해야겠다….. (소셜 로그인도 해야하는데…)
