[flutter] - RxDart : Extension Methods Part1


Extension method

Extension methodstream에 추가 기능을 더해서 새로운 stream을 만들어주는 stream내부 함수이다.

각 내용들을 하나씩 알아보자


buffer method

Stream<List<T>> buffer (
    Stream window
)
  • Example :
      Stream.periodic(Duration(milliseconds: 100), (i) => i)
          .buffer(Stream.periodic(Duration(milliseconds: 160), (i) => i))
          .listen(print); // prints [0, 1] [2, 3] [4, 5] ...
    
  • Example :
      Stream.periodic(Duration(milliseconds: 100), (i) => i)
          .buffer(Stream.periodic(Duration(milliseconds: 400), (i) => 'test'))
          .listen(print); // prints [0, 1, 2, 3, 4] [5, 6, 7, 8] [9, 10, 11, 12] ...
    

buffer는 주어진 stream의 데이터를 list에 모아서 출력한다. buffer안에 있는 내부 stream (window)에서 데이터가 출력될 때 모아 놓은 list를 출력한다.


bufferCount method

Stream<List<T>> bufferCount (
    int count,
    [int startBufferEvery = 0]
)
  • Example :
      RangeStream(1, 4)
          .bufferCount(2)
          .listen(print); // prints [1, 2], [3, 4] done!
    
  • Example :
      RangeStream(1, 5)
          .bufferCount(3, 2)
          .listen(print); // prints [1, 2, 3], [3, 4, 5], [5] done!
    

    bufferCount는 들어온 데이터를 count만큼 모아서 list로 출력해준다. 매 출력마다 startBufferEvery값을 index로 다시 시작하고 앞의 값은 버린다.

Range1~5 데이터가 들어오는데 bufferCount(3,2)를 통해 3개씩 list로 출력하며 매 출력 마다 2번째 index에서 실행한다.


bufferTest method

Stream<List<T>> bufferTest (
    bool onTestHandler(
        T event
    )
)
  • Example :
      Stream.periodic(Duration(milliseconds: 100), (int i) => i)
          .bufferTest((i) => i % 2 == 0)
          .listen(print); // prints [0], [1, 2] [3, 4] [5, 6] ...
    

bufferTestbufferTestfunction의 값이 True가 될 때 까지의 데이터를 list로 모아서 출력한다.


bufferTimer method

Stream<List<T>> bufferTime (
    Duration duration
)
  • Example :
      Stream.periodic(Duration(milliseconds: 100), (int i) => i)
          .bufferTime(Duration(milliseconds: 200))
          .listen(print); // prints [0, 1, 2] [3, 4] [5, 6] ...
    

bufferTimerDuration의 시간만큼 데이터를 모아서 list로 출력한다.


concatWith method

Stream<T> concatWith (
    Iterable<Stream<T>> other
)
  • Example :
      TimerStream(1, Duration(seconds: 10))
          .concatWith([Stream.fromIterable([2])])
          .listen(print); // prints 1, 2
    

concatWith는 메인 stream이 모든 출력이 끝나고 종료되면 이어서 concatWithStream들을 순서대로 출력한다.


defaultEmpty method

Stream<T> defaultIfEmpty (
    T defaultValue
)
  • Example :
      Stream.empty().defaultIfEmpty(10).listen(print); // prints 10
    

defaultEmptystream값이 empty일때 기본값을 설정해준다.


delay method

Stream<T> delay (
    Duration duration
)
  • Example :
      Stream.fromIterable([1, 2, 3, 4])
          .delay(Duration(seconds: 1))
          .listen(print); // [after one second delay] prints 1, 2, 3, 4 immediately
    

delay는 말 그대로 stream 출력을 지연해준다.


materialize method

Stream<Notification<T>> materialize ()
  • Example :
      Stream<int>.error(Exception())
          .materialize()
          .listen((i) => print(i)); // Prints onError Notification
    

dematerialize method

Stream<T> dematerialize ()
  • Example :
      Stream<Notification<int>>
          .fromIterable([Notification.onData(1), Notification.onDone()])
          .dematerialize()
          .listen((i) => print(i)); // Prints 1
    

dematerializematerialize는 한쌍이다. (서로 반대) 이것을 이해하느라 RxJava를 참고해봤다.

이미지

이미지

캡슐화에 대한 내용들이 나온다. 단순히 Stream에서 데이터를 출력하는게 아니라 noticiation으로 묶어서 보내거나 아니면 묶여있는 경우 데이터만 출력을 원할 경우 사용한다.

사용 예를 찾아봐야겠지만 여러 stream이 겹쳐있고 event들이 겹쳐있을때 사용하는 것 같다.


distinctUnique method

Stream<T> distinctUnique (
    {bool equals(
        T e1,
        T e2
    ),
    int hashCode(
        T e
    )}
)
  • Example :
      Stream<Notification<int>>
          .fromIterable([Notification.onData(1), Notification.onDone()])
          .dematerialize()
          .listen((i) => print(i)); // Prints 1
    

distinctUnique는 주어진 데이터들의 중복을 제거하고 보내준다. 주의점은 streambroadcastStream인 경우 listen할 때 마다 데이터의 equal을 확인한다.


doOnCancel method

Stream<T> doOnCancel (
    void onCancel()
)
  • Example :
      final subscription = TimerStream(1, Duration(minutes: 1))
          .doOnCancel(() => print('hi'));
          .listen(null);
    
      subscription.cancel(); // prints 'hi'
    

doOnCancelsubscriptioncancle할 때 호출된다.


doOnData method

Stream<T> doOnData (
    void onData(
        T event
    )
)
  • Example :
      Stream.fromIterable([1, 2, 3])
          .doOnData(print)
          .listen(null); // prints 1, 2, 3
    

doOnDatastream에서 데이터가 넘어오며 넘오는 타이밍에 실행하는 함수이다.


doOnDone method

Stream<T> doOnDone (
    void onDone()
)
  • Example :
      Stream.fromIterable([1, 2, 3])
          .doOnDone(() => print('all set'))
          .listen(null); // prints 'all set'
    

doOnDoneonDone이 호출 되는 시점(정상 종료)에 호출되는 함수이다.


doOnEach method

Stream<T> doOnEach (
    void onEach(
        Notification<T> notification
    )
)
  • Example :
      Stream.fromIterable([1])
          .doOnEach(print)
          .listen(null); // prints Notification{kind: OnData, value: 1, errorAndStackTrace: null}, Notification{kind: OnDone, value: null, errorAndStackTrace: null}
    

doOnEachonData, onDone, onError 각 함수 전에 실행되는 함수이다 (error 없을 시 onError 동작 안함)

데이터는 noticiation 형태로 캡슐화 되어있음.


doOnError method

Stream<T> doOnError (
    Function onError
)
  • Example :
      Stream.error(Exception())
          .doOnError((error, stacktrace) => print('oh no'))
          .listen(null); // prints 'Oh no'
    

doOnErrorerror가 발생했을때 실행하는 함수이다.


doOnListen method

Stream<T> doOnListen (
    void onListen()
)
  • Example :
      Stream.fromIterable([1])
          .doOnListen(() => print('Is someone there?'))
          .listen(null); // prints 'Is someone there?'
    

doOnListenstream에서 데이터를 처음 listen했을 때 실행하는 함수이다.


doOnPause method

Stream<T> doOnPause (
    void onPause(
        Future resumeSignal
    )
)
  • Example :
      final subscription = Stream.fromIterable([1])
          .doOnPause(() => print('Gimme a minute please'))
          .listen(null);
    
      subscription.pause(); // prints 'Gimme a minute please'
    

doOnPausesubscriptionpause될 때 실행되는 함수이다.


doOnResume method

Stream<T> doOnResume (
    void onResume()
)
  • Example :
      final subscription = Stream.fromIterable([1])
          .doOnResume(() => print('Lets do this!'))
          .listen(null);
    
      subscription.pause();
      subscription.resume(); //'Let's do this!'
    

doOnResumesubscriptionpauseresume할 때 실행되는 함수이다.


exhaustMap method

Stream<S> exhaustMap <S>(
    Stream<S> mapper(
        T value
    )
)
  • Example :
      RangeStream(0, 2).interval(Duration(milliseconds: 50))
          .exhaustMap((i) =>
              TimerStream(i, Duration(milliseconds: 75)))
          .listen(print); // prints 0, 2
    

exhaustMapmapper를 활용하여 새로운 stream을 만든다. mapperstream이 성공할때 까지 source stream의 모든 데이터는 무시된다.

stream nosiy가 심하고 이전 데이터와 async를 맞추고 싶을 때 유용하다.

예시에서는 시간차를 이용해 source stream에서 데이터중 1을 무시하고 출력한다.


flatMap method

Stream<S> flatMap <S>(
    Stream<S> mapper(
        T value
    )
)
  • Example :
      RangeStream(4, 1)
          .flatMap((i) => TimerStream(i, Duration(minutes: i)))
          .listen(print); // prints 1, 2, 3, 4
    

flatMapmap처럼 데이터를 가공하는 함수이지만 data를 출력하는게 아니라 stream을 출력한다. 그래서 예제 처럼 TimerStream을 활용해 sequence를 뒤집는 것도 가능하다.


flatMapIterable method

Stream<S> flatMapIterable <S>(
    Stream<Iterable<S>> mapper(
        T value
    )
)
  • Example :
      RangeStream(1, 4)
          .flatMapIterable((i) => Stream.fromIterable([[i]]))
          .listen(print); // prints 1, 2, 3, 4
    

flatMapIterable는 위에 flatMap과 똑같이 stream을 보내지만 Iterable형태로 보낸다.

API를 사용할 때 데이터가 list형태로 오는 경우 유용하게 사용 가능하다.

  Stream.fromIterable([
    [1, 2, 3],
    [4, 5, 6],
    [7, 8, 9]
  ]).flatMapIterable((i) {
    print("before : $i");
    return Stream.fromIterable([i]);
  }).listen(print);

// before : [1, 2, 3]
// 1
// 2
// 3
// before : [4, 5, 6]
// 4
// 5
// 6
// before : [7, 8, 9]
// 7
// 8
// 9

groupBy method

Stream<GroupByStream<T, S>> groupBy <S>(
    S grouper(
        T value
    )
)
  • Example :
      Stream.fromIterable([
          {'name': 'hyojun1', 'age': 10},
          {'name': 'hyojun2', 'age': 10},
          {'name': 'hyojun3', 'age': 13},
          {'name': 'hyojun4', 'age': 13},
          {'name': 'hyojun5', 'age': 13},
          {'name': 'hyojun3', 'age': 15},
      ]).groupBy((data) => data['age']).listen((data){
          print("Group Key : ${data.key}");
          data.listen(print);
      });
    
      //Group Key : 10
      //{name: hyojun1, age: 10}
      //{name: hyojun2, age: 10}
      //Group Key : 13
      //{name: hyojun3, age: 13}
      //{name: hyojun4, age: 13}
      //{name: hyojun5, age: 13}
      //Group Key : 15
      //{name: hyojun3, age: 15}
    

groupBy는 데이터를 Key를 기준으로 GroupByStream으로 묶어준다. 그래서 들어온 GroupByStream을 다시 listen하여 데이터를 얻는다.


interval method

Stream<T> interval (
    Duration duration
)
  • Example :
      Stream.fromIterable([1, 2, 3])
          .interval(Duration(seconds: 1))
          .listen((i) => print('$i sec'); // prints 1 sec, 2 sec, 3 sec
    

interval은 주어진 Duration의 간격마다 데이터 출력한다.


나머지는 다음장에서…