[flutter] - RxDart : Extension Methods Part1
Extension method
Extension method는 stream에 추가 기능을 더해서 새로운 stream을 만들어주는 stream내부 함수이다.
각 내용들을 하나씩 알아보자
- 순서
buffer method
Stream<List<T>> buffer (
Stream window
)
- Example :
Stream.periodic(Duration(milliseconds: 100), (i) => i) .buffer(Stream.periodic(Duration(milliseconds: 160), (i) => i)) .listen(print); // prints [0, 1] [2, 3] [4, 5] ... - Example :
Stream.periodic(Duration(milliseconds: 100), (i) => i) .buffer(Stream.periodic(Duration(milliseconds: 400), (i) => 'test')) .listen(print); // prints [0, 1, 2, 3, 4] [5, 6, 7, 8] [9, 10, 11, 12] ...
buffer는 주어진 stream의 데이터를 list에 모아서 출력한다. buffer안에 있는 내부 stream (window)에서 데이터가 출력될 때 모아 놓은 list를 출력한다.
bufferCount method
Stream<List<T>> bufferCount (
int count,
[int startBufferEvery = 0]
)
- Example :
RangeStream(1, 4) .bufferCount(2) .listen(print); // prints [1, 2], [3, 4] done! - Example :
RangeStream(1, 5) .bufferCount(3, 2) .listen(print); // prints [1, 2, 3], [3, 4, 5], [5] done!bufferCount는 들어온 데이터를count만큼 모아서list로 출력해준다. 매 출력마다startBufferEvery값을index로 다시 시작하고 앞의 값은 버린다.
Range로1~5데이터가 들어오는데bufferCount(3,2)를 통해3개씩list로 출력하며 매 출력 마다2번째index에서 실행한다.
bufferTest method
Stream<List<T>> bufferTest (
bool onTestHandler(
T event
)
)
- Example :
Stream.periodic(Duration(milliseconds: 100), (int i) => i) .bufferTest((i) => i % 2 == 0) .listen(print); // prints [0], [1, 2] [3, 4] [5, 6] ...
bufferTest는 bufferTest의 function의 값이 True가 될 때 까지의 데이터를 list로 모아서 출력한다.
bufferTimer method
Stream<List<T>> bufferTime (
Duration duration
)
- Example :
Stream.periodic(Duration(milliseconds: 100), (int i) => i) .bufferTime(Duration(milliseconds: 200)) .listen(print); // prints [0, 1, 2] [3, 4] [5, 6] ...
bufferTimer는 Duration의 시간만큼 데이터를 모아서 list로 출력한다.
concatWith method
Stream<T> concatWith (
Iterable<Stream<T>> other
)
- Example :
TimerStream(1, Duration(seconds: 10)) .concatWith([Stream.fromIterable([2])]) .listen(print); // prints 1, 2
concatWith는 메인 stream이 모든 출력이 끝나고 종료되면 이어서 concatWith의 Stream들을 순서대로 출력한다.
defaultEmpty method
Stream<T> defaultIfEmpty (
T defaultValue
)
- Example :
Stream.empty().defaultIfEmpty(10).listen(print); // prints 10
defaultEmpty는 stream값이 empty일때 기본값을 설정해준다.
delay method
Stream<T> delay (
Duration duration
)
- Example :
Stream.fromIterable([1, 2, 3, 4]) .delay(Duration(seconds: 1)) .listen(print); // [after one second delay] prints 1, 2, 3, 4 immediately
delay는 말 그대로 stream 출력을 지연해준다.
materialize method
Stream<Notification<T>> materialize ()
- Example :
Stream<int>.error(Exception()) .materialize() .listen((i) => print(i)); // Prints onError Notification
dematerialize method
Stream<T> dematerialize ()
- Example :
Stream<Notification<int>> .fromIterable([Notification.onData(1), Notification.onDone()]) .dematerialize() .listen((i) => print(i)); // Prints 1
dematerialize와 materialize는 한쌍이다. (서로 반대) 이것을 이해하느라 RxJava를 참고해봤다.


캡슐화에 대한 내용들이 나온다. 단순히 Stream에서 데이터를 출력하는게 아니라 noticiation으로 묶어서 보내거나 아니면 묶여있는 경우 데이터만 출력을 원할 경우 사용한다.
사용 예를 찾아봐야겠지만 여러
stream이 겹쳐있고event들이 겹쳐있을때 사용하는 것 같다.
distinctUnique method
Stream<T> distinctUnique (
{bool equals(
T e1,
T e2
),
int hashCode(
T e
)}
)
- Example :
Stream<Notification<int>> .fromIterable([Notification.onData(1), Notification.onDone()]) .dematerialize() .listen((i) => print(i)); // Prints 1
distinctUnique는 주어진 데이터들의 중복을 제거하고 보내준다. 주의점은 stream이 broadcastStream인 경우 listen할 때 마다 데이터의 equal을 확인한다.
doOnCancel method
Stream<T> doOnCancel (
void onCancel()
)
- Example :
final subscription = TimerStream(1, Duration(minutes: 1)) .doOnCancel(() => print('hi')); .listen(null); subscription.cancel(); // prints 'hi'
doOnCancel은 subscription을 cancle할 때 호출된다.
doOnData method
Stream<T> doOnData (
void onData(
T event
)
)
- Example :
Stream.fromIterable([1, 2, 3]) .doOnData(print) .listen(null); // prints 1, 2, 3
doOnData는 stream에서 데이터가 넘어오며 넘오는 타이밍에 실행하는 함수이다.
doOnDone method
Stream<T> doOnDone (
void onDone()
)
- Example :
Stream.fromIterable([1, 2, 3]) .doOnDone(() => print('all set')) .listen(null); // prints 'all set'
doOnDone는 onDone이 호출 되는 시점(정상 종료)에 호출되는 함수이다.
doOnEach method
Stream<T> doOnEach (
void onEach(
Notification<T> notification
)
)
- Example :
Stream.fromIterable([1]) .doOnEach(print) .listen(null); // prints Notification{kind: OnData, value: 1, errorAndStackTrace: null}, Notification{kind: OnDone, value: null, errorAndStackTrace: null}
doOnEach는 onData, onDone, onError 각 함수 전에 실행되는 함수이다 (error 없을 시 onError 동작 안함)
데이터는
noticiation형태로 캡슐화 되어있음.
doOnError method
Stream<T> doOnError (
Function onError
)
- Example :
Stream.error(Exception()) .doOnError((error, stacktrace) => print('oh no')) .listen(null); // prints 'Oh no'
doOnError는 error가 발생했을때 실행하는 함수이다.
doOnListen method
Stream<T> doOnListen (
void onListen()
)
- Example :
Stream.fromIterable([1]) .doOnListen(() => print('Is someone there?')) .listen(null); // prints 'Is someone there?'
doOnListen는 stream에서 데이터를 처음 listen했을 때 실행하는 함수이다.
doOnPause method
Stream<T> doOnPause (
void onPause(
Future resumeSignal
)
)
- Example :
final subscription = Stream.fromIterable([1]) .doOnPause(() => print('Gimme a minute please')) .listen(null); subscription.pause(); // prints 'Gimme a minute please'
doOnPause는 subscription이 pause될 때 실행되는 함수이다.
doOnResume method
Stream<T> doOnResume (
void onResume()
)
- Example :
final subscription = Stream.fromIterable([1]) .doOnResume(() => print('Lets do this!')) .listen(null); subscription.pause(); subscription.resume(); //'Let's do this!'
doOnResume은 subscription이 pause후 resume할 때 실행되는 함수이다.
exhaustMap method
Stream<S> exhaustMap <S>(
Stream<S> mapper(
T value
)
)
- Example :
RangeStream(0, 2).interval(Duration(milliseconds: 50)) .exhaustMap((i) => TimerStream(i, Duration(milliseconds: 75))) .listen(print); // prints 0, 2
exhaustMap은 mapper를 활용하여 새로운 stream을 만든다. mapper의 stream이 성공할때 까지 source stream의 모든 데이터는 무시된다.
stream nosiy가 심하고 이전 데이터와 async를 맞추고 싶을 때 유용하다.
예시에서는 시간차를 이용해
source stream에서 데이터중 1을 무시하고 출력한다.
flatMap method
Stream<S> flatMap <S>(
Stream<S> mapper(
T value
)
)
- Example :
RangeStream(4, 1) .flatMap((i) => TimerStream(i, Duration(minutes: i))) .listen(print); // prints 1, 2, 3, 4
flatMap은 map처럼 데이터를 가공하는 함수이지만 data를 출력하는게 아니라 stream을 출력한다. 그래서 예제 처럼 TimerStream을 활용해 sequence를 뒤집는 것도 가능하다.
flatMapIterable method
Stream<S> flatMapIterable <S>(
Stream<Iterable<S>> mapper(
T value
)
)
- Example :
RangeStream(1, 4) .flatMapIterable((i) => Stream.fromIterable([[i]])) .listen(print); // prints 1, 2, 3, 4
flatMapIterable는 위에 flatMap과 똑같이 stream을 보내지만 Iterable형태로 보낸다.
API를 사용할 때 데이터가list형태로 오는 경우 유용하게 사용 가능하다.
Stream.fromIterable([
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
]).flatMapIterable((i) {
print("before : $i");
return Stream.fromIterable([i]);
}).listen(print);
// before : [1, 2, 3]
// 1
// 2
// 3
// before : [4, 5, 6]
// 4
// 5
// 6
// before : [7, 8, 9]
// 7
// 8
// 9
groupBy method
Stream<GroupByStream<T, S>> groupBy <S>(
S grouper(
T value
)
)
- Example :
Stream.fromIterable([ {'name': 'hyojun1', 'age': 10}, {'name': 'hyojun2', 'age': 10}, {'name': 'hyojun3', 'age': 13}, {'name': 'hyojun4', 'age': 13}, {'name': 'hyojun5', 'age': 13}, {'name': 'hyojun3', 'age': 15}, ]).groupBy((data) => data['age']).listen((data){ print("Group Key : ${data.key}"); data.listen(print); }); //Group Key : 10 //{name: hyojun1, age: 10} //{name: hyojun2, age: 10} //Group Key : 13 //{name: hyojun3, age: 13} //{name: hyojun4, age: 13} //{name: hyojun5, age: 13} //Group Key : 15 //{name: hyojun3, age: 15}
groupBy는 데이터를 Key를 기준으로 GroupByStream으로 묶어준다. 그래서 들어온 GroupByStream을 다시 listen하여 데이터를 얻는다.
interval method
Stream<T> interval (
Duration duration
)
- Example :
Stream.fromIterable([1, 2, 3]) .interval(Duration(seconds: 1)) .listen((i) => print('$i sec'); // prints 1 sec, 2 sec, 3 sec
interval은 주어진 Duration의 간격마다 데이터 출력한다.
