# 异步编程思想

本篇来介绍在Flutter&Dart中的异步编程:

  • Future类
  • async/await用法
  • Stream的概念

主要适合前端同学的入门内容,不适合已经有专业的dart知识的同学学习,请跳过学习其他的章节!

# Dart执行机制

Dart 代码运行在单个执行“线程”中,这个线程是独立的内存、独立的垃圾回收、事件循环机制。

同时,Dart设计了一个概念,叫isolate,是一种“多线程”的实现。链接:官方 (opens new window)

如果 Dart 代码在执行时阻塞,例如:处理一个需要长时间运行的计算操作或等待 I/O 完成,此时整个程序会被“冻结”。基于这一点需要实现单线程的异步方式,基于Dart内置的非阻塞API实现——Future。

  1. 为什么有了isolate还需要有异步的Future?

    Future相对于isolate来说更轻量级

  2. 实现异步Dart中的async和await也能实现异步,为什么还需要Future?

    Future相对于async, await的在于它提供了链式调用,链式调用优势在于可以明确代码执行前后依赖关系以及实现异常的捕获——这一点与js类似。

# 事件循环

ScreenFlow

Dart的EventLoop事件循环和Javascript很类似,同样循环中拥有两个FIFO队列: 「一个是事件队列(Event Queue),另一个就是微任务队列(MicroTask Queue)。」

  • **「Event Queue」**主要包含IO、手势、绘制、定时器(Timer)、Stream流以及本文所讲Future等
  • **「MicroTask Queue」**主要包含Dart内部的微任务(内部非常短暂操作),一般是通过 scheduleMicroTask 方法实现调度,它的优先级比Event Queue要高

eventLoop

特点:单线程、队列执行。

# Future类

# 执行机制

异步操作可以让你的程序在等待一个操作完成时继续处理其它的工作,这时,需要借助到Dart中的Future类来实现异步操作。

Dart 使用 Future 对象来表示异步操作的结果,Future类类似于前端中的Promise,其实就是dart告诉你这段代码返回的值是一个异步的。

如果异步操作不需要结果,则可为 Future<void>。当一个返回 future 对象的函数被调用时,会发生两件事:

  1. 将函数操作列入队列等待执行并返回一个未完成的 Future 对象。
  2. 不久后当函数操作执行完成,Future 对象变为完成并携带一个值或一个错误,通过.then方法来进行获取

image-20210925142311783

Future表示异步返回的结果,当执行一个异步延迟的计算时候,首先会返回一个Future结果,后续代码可以继续执行不会阻塞主isolate,当Future中的计算结果到达时,如果注册了 then 函数回调,对于返回成功的回调就会拿到最终计算的值,对于返回失败的回调就会拿到一个异常信息。

基础的案例:

void main() {
  print('main is executed start');
  var function = () {
    print('future is executed');
  };
  Future(function);
  print('main is executed end');
}

//或者直接传入一个匿名函数
void main() {
  print('main is executed start');
  var future = Future(() {
    print('future is executed');
  });
  print('main is executed end');
}

// 输出
// main is executed start
// main is executed end
// future is executed

# Future.then

Future使用 then 方法来注册Future回调,需要注意的Future返回的也是一个Future对象,所以可以使用链式调用使用Future。

这样就可以将前一个Future的输出结果作为后一个Future的输入,可以写成链式调用。

再来一个的案例:

ElevatedButton(
  onPressed: () {
    final myFuture = http.get('https://example.com'); //返回一个Future对象
    myFuture.then((response) {
      //注册then事件
      if(response.statusCode == 200) {
        print('Success')
      }
    });
  },
  child: Text('Click me!'),
);

上面的案例解析:

  • Flutter会渲染Button按钮,但是不会执行onPressed方法后面的代码,因为它在等待press事件;

  • press事件触发后,即会执行onPressed后续的代码;随即,执行到http.get方法后,发现其返回了一个Future,即会把对应的任务代码(.then之后的)放入EventLoop(Task队列)中存起来,等待网络请求完成后进行回调执行;

    EventLoop不会等待网络执行完成,而是会去处理其他的Event事件与逻辑;

  • 当网络回调成功进入队列,并轮到执行它的时候,开始执行.then之后的代码,判断逻辑与输出内容;

# Future.value

创建一个返回指定value值的Future对象, 注意在value内部实际上实现异步是通过 scheduleMicrotask

之前文章也说过,实现Future异步方式无非只有两种,一种是使用 Timer 另一种就是使用scheduleMicrotask

void main() {
  var commonFuture = Future((){
    print('future is executed');
  });
  var valueFuture = Future.value(100.0);
  valueFuture.then((value) => print(value));
  print(valueFuture is Future<double>);
}

// 输出
// true
// 100.0  -> 这里注意上面代码的顺序,
// future is executed

解析:

  • valueFuture is Future<double>返回true,先输出来是因为它是同步执行的,也说明最开始同步执行拿到了 valueFuture

  • Future.value 内部实际上是通过 scheduleMicrotask 实现异步的,需要等到main方法执行结束后,开始检查 MicroTask 队列中是否存在task;

正好此时的 valueFuture 就是那么就先执行它,直到检查到 MicroTask Queue 为空,再去检查 Event Queue

  • 由于Future的本质是在内部开启了一个 Timer 实现异步,最终这个异步事件是会放入到 Event Queue 中的,此时正好检查到当前 Future ,这时候的Event Loop就会去处理执行这个 Future

# Future.delayed

这个API非常好玩。

创建一个延迟执行的future,实际上内部就是通过创建一个延迟的 Timer 来实现延迟异步操作。

Future.delayed 主要传入两个参数一个是 Duration 延迟时长,另一个就是异步执行的Function。

void main() {
  var delayedFuture = Future.delayed(const Duration(seconds: 3), (){//延迟3s
    print('this is delayed future');
  });
  print('main is executed, waiting a delayed output....');
}

// 输出
// main is executed, waiting a delayed output....
// (3s后)
// this is delayed future

# 其他

  • Future的any方法:返回的是第一个执行完成的future的结果,同时返回Error;
  • Future的doWhile方法:重复的执行某一个操作,直至返回false,退出循环;true或者Future会继续执行循环。
  • Future的wait方法:等待多个future完成,并整合它们的结果:
    • 若所有future都有正常结果返回:则future的返回结果是所有指定future的结果的集合
    • 若其中一个future有error返回:则future的返回结果是第一个error的值
  • Future的microtask方法:创建一个在microtask队列运行的future,该task的执行优先级要大于其他future

# async/await

可以用 asyncawait 关键字或 Future 类的相关 API 来配合使用 future,async与await是一个语法糖,省去了.then,让写异步的功能如同“同步”代码一样的写。

void main() {
  print('t1:' + DateTime.now().toString());
  getData();
  print('t2:' + DateTime.now().toString());

}

getData() async {
    int result = await Future.delayed(const Duration(milliseconds: 2000), () {
      return Future.value(123);
    });
    print('t3:' + DateTime.now().toString());
    print(result);
}

// flutter: t1:2021-09-25 15:30:22.738729
// flutter: t2:2021-09-25 15:30:22.749342
// flutter: t3:2021-09-25 15:30:24.762490
// flutter: 123

# Stream

一句话来说明Stream:异步数据集,例如:读取文件,就是一系列的数据集。

对比:

分类 单数据类型 多数据类型
同步Sync int Iterator<int>
异步Async Future<int> Stream<int>

直接上案例:

// 如果不熟悉Streams,请试着想象一个有水流过的管道。管道是“流”(Stream),管道里的水是异步的数据.
Stream<int> countStream(int max) async* {
  for (int i = 0; i < max; i++) {
    yield i; // 这是不是前端里面的generator,非常像
  }
}


Future<int> sumStream(Stream<int> stream) async {
  int sum = 0;
  await for (int value in stream) {
    sum += value;
  }
  return sum;
}


void main() async {
  /// Initialize a stream of integers 0-9
  Stream<int> stream = countStream(10);
  /// Compute the sum of the stream of integers
  int sum = await sumStream(stream);
  /// Print the sum
  print(sum); // 45
}

学习Stream要学习:

  • 创建Stream;
  • 控制Stream;
  • 监听Stream;

# 创建Stream

class NumberCreator {
  // 通过StreamController来创建
  final _controller = StreamController<int>();

  // 私有变量
  var _count = 1;

  // 构造函数
  NumberCreator() {
    // 这里使用定时器来模拟stream流数据
    Timer.periodic(Duration(seconds: 1), (t) {
      // 不断往stream中添加数据
      _controller.sink.add(_count);
      _count++;
    });
  }

  // 添加一个公共方法,获取stream实例中的异步数据流
  Stream<int> get stream => _controller.stream;
}

# 监听Stream

如何使用呢?使用listen方法:

final myStream = NumberCreator().stream
  
final subscription = myStream.listen(
  (data) => print('Data: $data')
)
  
// flutter: Data: 1
// flutter: Data: 2
// flutter: Data: 3
// flutter: Data: 4

特别说明:

  • 默认情况下一个stream只能有一个listen;

  • 如果需要多个listen,可以把stream设置成broadcast模式

    final myStream = NumberCreator().stream.asBroadcastStream();
    
    final subscription = myStream.listen((data) => print('Data: $data'));
    
    final subscription1 = myStream.listen((data) => print('Data1: $data'));
    

还有一些非常有意思的属性:

class NumberCreator {
  final _controller = StreamController<int>();

  var _count = 1;

  NumberCreator() {
    Timer.periodic(Duration(seconds: 1), (t) {
      _controller.sink.add(_count);
      if (_count == 3) _controller.addError('Error');
      if (_count == 4) {
        t.cancel();
      }
      _count++;
    });
  }

  Stream<int> get stream => _controller.stream;
}


void main() {
  final myStream = NumberCreator().stream.asBroadcastStream();

  final subscription = myStream.listen(
    (data) => print('Data: $data'),
    onError: (err) {
      print('Error $err');
    },
    cancelOnError: false,  // 默认是True,设置成false后,即使stream出错了,也会继续执行
    onDone: () {
      print('Done');
    },
  );
}

// flutter: Data: 1
// flutter: Data: 2
// flutter: Data: 3
// flutter: Error Error
// flutter: Data: 4

# 控制Stream

final subscription = myStream.listen(
  (data) => print('Data: $data'),
  onError: (err) {
    print('Error $err');
  },
  cancelOnError: false, // 默认是True,设置成false后,即使stream出错了,也会继续执行
  onDone: () {
    print('Done');
  },
);

subscription上有很多非常有用的方法:

image-20210925171432959

例如:

  • subscription.pause(); // 暂停stream

  • subscription.resume(); // 恢复stream

  • subscription.cancel(); // 取消stream,后续没有stream的数据输出了

链式玩法:

final myStream = NumberCreator().stream.map((i) => 'String $i');

// 打印
final myStream = NumberCreator().stream.map((i) => 'String $i').listen(print);

// 打印奇数
final myStream = NumberCreator().stream
  .where((i) => i % 2 == 0)
  .map((i) => 'String $i').listen(print);

# Flutter中的玩法

使用StreamBuilder来调用stream:

StreamBuilder(
  builder: (context, snapshot) {
    if (snapshot.hasData) {
      return Text(
        'You have pushed the button this many times:${snapshot.data}',
      );
    }
    return const Text('');
  },
  stream: myStream,
),

进阶写法:

final myStream = NumberCreator().stream;

StreamBuilder(
  builder: (context, snapshot) {
    if (snapshot.connectionState == ConnectionState.waiting) {
      return const Text('no Data');
    } else if (snapshot.connectionState == ConnectionState.done) {
      return const Text('Done!');
    } else if (snapshot.hasError) {
      return Text(snapshot.error.toString());
    } else {
      return Text(
        'You have pushed the button this many times:${snapshot.data}',
      );
    }
  },
  stream: myStream,
),