下载本文的 PDF 版本 PDF

使用副作用增强 Dart

一组 Dart 编程语言的扩展,旨在支持异步和生成器函数


Erik Meijer,Applied Duality;Kevin Millikin,Google;Gilad Bracha,Google

Dart 编程语言最近合并了一组扩展,旨在支持异步和生成器函数。由于 Dart 是一种用于 Web 编程的语言,延迟是一个重要的考虑因素。为了避免阻塞,当计算结果需要较长时间时,开发人员必须将方法设为异步。生成器函数简化了计算可迭代序列的任务。

本文描述的扩展提供了两种类型的同步函数:生成单个结果的普通函数;以及生成一系列结果的生成器函数。因此,出于对称性的考虑,异步函数也应该有两种类型:异步生成单个结果的函数;以及异步生成一系列结果的生成器。在此处描述的设计中,普通函数/生成器和异步/同步是正交维度。表 1 说明了这些维度,并显示了涉及的类型。

 

一个

多个

同步

T

Iterable<T>

异步

Future<T>

Stream<T>

表 1 生成器和同步性:正交维度

不幸的是,当代语言中的控制结构针对简单的同步调用进行了优化,一旦需要组合异步方法,开发人员就只能靠自己,被迫手动编写显式的 CPS(延续传递风格)代码,就像他们是人类编译器一样。对于操作异步流的代码来说,情况甚至更糟,但即使创建同步迭代器也涉及到乏味的手动构建 CPS 状态机。

幸运的是,一些现代语言实现已经使用延续进行编译,这为自动代表程序员执行必要的 CPS 转换提供了机会。通过简单地将方法体标记为 async, async*,sync*,开发人员可以使用普通的命令式控制结构,例如循环、条件语句、try/catch/finallybreak/continue 语句来摆脱“回调地狱”,因为编译器会处理所有繁重的工作。

 

使用 Future 的异步编程

在 Dart 和许多其他语言中,异步返回单个结果的方法通过立即返回 Future<T> 类型的值来实现。 Future 上的 .then(f, onError: g) 方法注册延续回调,用于在 future 成功完成并获得 T 类型的 f 值或分别获得异常 g 时执行。通过使用 future 来承认异步性的回报是“回调地狱”。开发人员无法再使用常规的控制流结构,而是需要手动将代码转换为 CPS。

想象一下,如果 Dart 有一个用于发出 HTTP 请求的同步 API。那么我们可以编写以下简单的代码来创建一个新客户端,联系 example.com 执行搜索,打印结果并关闭连接

 

getPage(t) {

  var c = new http.Client();

  try {

     var r = c.get('http://example.com/search?q=$t');

     print(r);

  } finally {

     return c.close();

  }

}

 

当然,执行阻塞的同步 HTTP 请求不是一个好主意,这就是为什么在 Dart 中所有网络 API 都是异步的并返回 future。但是,我们必须使用 .then.whenComplete 而不是使用 ;try-finally,结果导致代码的高级结构被破坏

 

Future getPage(t) {

  var c = new http.Client();

  return c.get('http://example.com/search?q=$t')

     .then((r) {

        print(r);

     }).whenComplete(() { return c.close(); });

}

 

这就是 async 函数的用武之地。这些函数在被调用时会立即且自动地返回一个 future。 future 稍后会以函数体确定的方式完成。如果函数成功返回,则 future 将以函数体计算的值完成。如果函数抛出异常,则 future 将以抛出的对象完成。

使用 async 函数,我们可以编写完全异步的代码,使用常规的控制结构,非常像我们最初设想的阻塞同步代码

 

Future getPage(t) async {

  var c = new http.Client();

  try {

     var r = await c.get('http://example.com/search?q=$t');

     print(r);

  } finally {

     await c.close();

  }

}

 

此处显示的 async 函数体使用 await 表达式来暂停函数的执行,直到正在等待的 future 完成。然后,如果 future 以值 r 完成,则 await 的求值结果为 r。如果被等待的表达式求值为非 future 值或抛出异常,则结果将被包装在一个 future 中,然后再由 await 进一步处理。

 

Future 表示最终将以值或异常完成的计算。在 Dart 中,我们可以使用 Future<T> 上的 .then 方法为这两个事件注册回调 onValue(T value)onError(Exception e)。要从头开始创建 future,Dart 提供了工厂类型 Completer<T>(在 Scala 等其他语言中,completer 通常称为promise)。给定一个 completer,我们可以通过 .future 属性提取一个 future,并且可以使用 .complete(T value).completeError(Object e) 方法(最多一次)以值或错误完成该 future。

 

请注意,await e 只是 async 函数内部的普通表达式,因此可以出现在函数体中可以出现表达式的任何位置,包括 catchfinally 代码块内。 async 修饰符也可以应用于闭包和 getter 的函数体,但不能应用于构造函数或 setter。因此,async 函数的编写方式可以非常接近同步函数的编写方式,并具有熟悉的顺序控制结构:循环、条件语句和 try-catch-finally

 

虽然 async 函数可能让我们想起 C# 中的 async/await 功能,但请注意,它们之间存在差异。在 C# 中,async 函数从调用时到遇到第一个 await 表达式之间是同步执行的。仅当值尚未同步完成或已抛出异常时,才会等待计算。否则,执行将继续使用已完成 future 的值。因此,即使标记为 async,异步函数也可能完全同步执行。与 C# 的另一个区别是,在 Dart 中,标记 async 的是函数。这强调了异步性是函数实现的一个属性,并且函数的签名不受影响。此外,在 Dart 中,await 始终会暂停执行,即使被等待的表达式的结果不是 future 也是如此。最后,在 C# 中,控制流无法逃脱 finally 代码块,而在 Dart 中,与 Java 中一样,finally 代码块可以将控制权转移回我们想要的任何位置。这使得在 Dart 中无法实现保留原始程序结构的 C# async/await 转换,这也触发了基于延续的实现。

Facebook 新的 Hack 语言中的 async 函数类似于 C# 的 async 方法,并急切地执行 async 函数的函数体,直到第一个 await;已经完成的 future 不会被等待。

 

从概念上讲,await 表达式 x = await e; rest[x] 的语义是 e.then((r){ x = r; rest[x] });当然,微妙之处之一在于“计算的其余部分”究竟是什么意思,尤其是当 await 表达式出现在 try-catch-finally 语句或循环中时,如下面的(人为的)示例所示

 

foo() async {

  var x;

  while(true) {

     try { x = await foo(); return x+1; } finally { continue; }

  }

}

 

为了展示 async 函数的含义,我们为 Dart 方法的“轻量级”子集定义了 Scott-Strachey 风格5 的延续语义,该子集由以下语法定义

 

expression ::= identifier = expression

           | expression.identifier(expression)

           | await expression

 

statement ::= { ... statement ... }

          | expression;

          |  return expression;

          |  while (expression) statement

          |  try statement finally statement

 

method ::= identifier(identifier) async {
             var identifier; ... statement ...
          }

 

不失一般性,我们假设表达式和语句将始终成功终止(即,不会抛出异常)。此外,为了避免混乱,我们假设方法只有一个参数,方法体只有一个局部变量,并且所有调用都接受一个接收者和一个参数。

不要被希腊符号和高阶函数吓倒。语义规则基于 Dart async 方法如何执行的操作直觉,并与官方 Dart 语言规范的声明性描述相匹配。2

语句 𝒞〚s〛(ρ,σ) 的语义接受两个回调函数(或延续),分别命名为 ρσ。返回延续 ρ 表示如果语句 s 执行 return 时应该发生什么;成功延续 σ 表示当 s 的执行在没有遇到 return 的情况下结束时应该发生什么。因此,两个延续都具有相同的类型,即,它们接受任何值并返回 void

表达式 〚e〛(σ) 的语义只需要一个成功延续 σ,因为 return 是一个语句,因此无法从表达式内部的方法 return

方法声明 〚d〛 的语义通过分配一个新的 Completer 来连接所有内容,该 completer 由方法体的初始返回和成功延续完成,并且其 future 由生成的包装器函数立即返回。方法体本身在新 future 中运行。

return 语句的语义开始,规则 𝒞〚return e;〛(ρ,σ) 指定首先对表达式 e 进行求值以获得值 r,如调用  〚e〛(λr→...) 所示,然后将表达式 r 的值传递给 return 语句的返回延续,如 ρ(r) 所示。完整的规则如下

 

𝒞〚return e;〛(ρ,σ) = 〚e〛(λr→ρ(r))

 

当然,对于 async 方法,最有趣的情况是 await 表达式 〚await e〛(σ) 的语义。该语义形式化了首先对表达式 〚e〛(λr→...) 进行求值以获得 future r,然后,当 r 完成时,r.then(σ) 将继续评估程序的其余部分 σ

 

〚await e〛(σ) = 〚e〛(λr→r.then(σ))

 

这正是“等待” future 时所期望的。请记住,为了简单起见,我们不处理失败;如果我们这样做,语义将简单地采用额外的异常延续 ε,它将作为 future 的错误延续传递。 async 方法的完整语义如下

 

〚x=e〛(σ) = 〚e〛(λr→{ x=r; σ(r); })

〚a.f(b)〛(σ) = 〚a〛(λa→〚b〛(λb→σ(a.f(b)))

〚await e〛(σ) = 〚e〛(λr→r.then(σ))

 

𝒞〚{ s1 s2 }〛(ρ,σ) = 𝒞〚s1〛(ρ, λr→𝒞〚s2〛(ρ,σ))

𝒞〚e;〛(ρ,σ) = 〚e〛(σ)

𝒞〚return e;〛(ρ,σ) = 〚e〛(λr→ρ(r))

𝒞〚while(e) s〛(ρ,σ) = loop()

  where loop() = 〚e〛(λr→{

           if(r){ 𝒞〚s〛(ρ, λr→loop()); } else { σ(null); }              
        })

𝒞〚try s1 finally s2〛(ρ,σ) =
  𝒞〚s1〛(λr→𝒞〚s2〛(ρ,λs→ρ(r)), λr→𝒞〚s2〛(ρ,λs→σ(null)))

 

〚f(a) async { var x; s }〛= f(a) {

  var result = new Completer();

  new Future(λ()→{

     var x;

     𝒞〚s〛(λr→result.complete(r), λr→result.complete(null))

  });

  return result.future

}

 

在以下示例中,将语义应用于左侧的 async 方法将导致右侧的常规方法,该方法使用 .then 和递归来实现原始 async 方法的排序和循环

f() async {

 

 

  var x;
 

  while(await g()) {

   

    x = await h();

 }

 return x;

}

 

f() {

 var result = new Completer();

 new Future(λ()→{

    var x;

    loop() {

g().then(λs→
       if(s) {
          h().then(λs→{ x = s; loop(); });

       } else {
          result.complete(x);

       });

    }

    loop();

 });

 return result.future;

}

 

抽象延续语义的一个令人欣慰的特性是,生成的代码与开发人员手动编写以实现相同效果的代码非常接近。

 

如前所述,此处描述的语义必然描绘了一个简化的图景。为了处理完整的 Dart 语言,而不仅仅是轻量级子集,Dart2Dart 编译器携带了几个额外的延续来处理 breakcontinue 语句,以及 switchcatch 代码块。编译器还尽可能地将延续反功能化为直接风格(即,不包含 await 表达式的语句被编译回自身),并删除不必要的延续“管理”应用程序。然而,从根本上说,编译器的工作方式与语义相同,只是它添加了超越轻量级语言类别所需的所有可怕细节。要了解有关使用延续进行编译的更多信息,请参阅 Matthew Might 的博客文章 (http://matt.might.net/articles/cps-conversion/) 以及其中的参考文献。

Dart VM(虚拟机)可以直接访问运行时的堆栈、返回地址和异常表,因此能够以更低的抽象级别实现此处描述的语言增强功能。这些运行时结构是表示指称语义操作的各种延续的具体表示。要了解有关在堆栈和返回地址级别描述 asyncsync*async* 的语义的更多信息,请参阅以下论文:http://dl.acm.org/citation.cfm?id=2367181http://dl.acm.org/citation.cfm?id=1297063

 

迭代器和可迭代对象

现在我们已经解释了 async 方法如何简化返回单个值的异步代码,让我们将重点转移到集合的批量处理。 IterableIterator 接口(或其细微变体)几乎是每种现代面向对象语言中集合库的主力。由于 foreach 循环,使用可迭代对象非常容易

 

Iterable xs = [1, 2, 3, 4, 5];

for(x in xs) { print(x); }

 

真正发生的是 for(x in xs) 隐藏了从可迭代对象 xs 获取新迭代器并遍历它的样板代码。换句话说,Dart 中的 for 循环是以下 while 循环的语法糖

 

Iterable xs = [1, 2, 3, 4, 5];

var _xs = xs.iterator;

while(_xs.moveNext()){ var x = _xs.current; print(x); }

 

然而,Dart 没有用于生成可迭代对象的语法支持。这要求开发人员执行与他们在没有 async 函数的帮助下编写异步方法时必须执行的相同残酷和不自然的动作。例如,让我们尝试从头开始编写标准库函数 filter。给定一个谓词,filter 必须返回一个新的可迭代对象,其中删除了谓词为 false 的所有值

 

Iterable filter(Iterable src, predicate) {

  return new FilterIterable(src, predicate);

}

 

类型 FilterIterable 构建于 IterableBase 之上,后者实现了 Iterable 的所有方法,除了 iterator.iterator 方法返回 FilterIterator 的实例,该实例将从迭代器中过滤掉值。

 

class FilterIterable extends IterableBase {

  var src, predicate;

FilterIterable(this.src, this.predicate);

  FilterIterator get iterator {
     return new FilterIterator(src.iterator, predicate);
  }

}

 

现在我们可以实际实现删除值的逻辑

 

class FilterIterator extends Iterator {

  var src, predicate;

FilterIterator(this.src, this.predicate);

  bool moveNext() {

     while (src.moveNext()) {

       if (predicate(src.current)) { return true; }

     }

     return false;
  }

  get current { return src.current; }

}

 

当唯一有趣的行是 if(predicate(src.current)){ ... produce the next value ... } 时,这是很多无聊的仪式。目前还不太容易看出这段代码实际上也是手动 CPS 转换,但这将在稍后定义同步生成器的延续语义时变得清晰。

同步生成器函数是用于定义迭代器的语法糖,通过使用 sync* 修饰符标记其函数体来定义。生成器在被调用时会立即返回一个 Iterable。当我们随后从可迭代对象中获取迭代器并调用其 moveNext() 方法时,函数体将被执行,直到内部命中 yieldreturn。如果它到达 return 语句(再次强调,前提是它没有被 finally 子句劫持),则迭代器完成,并且进一步调用 moveNext() 将返回 false。否则,后续调用 moveNext() 将从函数上次停止的地方恢复执行,直到下一个 yieldreturn。使用生成器,filter 的代码可以简化为一行。

 

Iterable filter(Iterable src, predicate) sync* {

  for(var s in src){ if(predicate(s)) { yield s; }}

}

 

仔细阅读 Future 的 API 文档会发现,嵌套的 Future 会被自动扁平化。对于生成器,我们通常希望维护嵌套的可迭代对象,但在某些情况下,我们希望将嵌套的可迭代对象拼接到其父可迭代对象中。以下面的示例为例,其中序列 range(s,n) = s, s+1, ..., s+n-1 是递归生成的。

 

Iterable range(s, n) sync* {

  if(n>0){ yield s; for(var i in range(s+1, n-1)) yield i; }

}

 

这个 range 实现的问题在于,值 s+iyieldi 次,因此 range 的运行时复杂度是二次方的。4 这不是一个人为的问题;如果我们通过复制来连接两个可迭代对象:

 

Iterable append(Iterable left, Iterable right) sync* {

  for(var l in left){ yield l; } for(var r in right){ yield r; }

}

 

那么当我们以左结合的方式连接列表时,例如 append(append(xs,xs),xs),我们也会遇到同样的二次效应。实现高效的列表连接并非易事,3 并且“二次连接”问题也出现在许多其他上下文中。

Dart 的 sync* 方法允许我们使用 yield* 语句将嵌套的可迭代对象拼接进结果可迭代对象,而不是复制嵌套的可迭代对象,这样可以实现如下的 range

 

range(s, n) sync* {

  if(n>0){ yield s; yield* range(s+1, n-1); }

}

 

为了能够高效地实现嵌套可迭代对象的拼接,sync* 函数的结果必须是一个玫瑰树,当我们迭代它时,它是以深度优先顺序(使用堆栈)遍历的。形式上,我们使用迭代器与恢复点的递归类型同构的事实:

 

Iterator<T> μ Resumption.
            ()→(()(T,Resumption)(()→Resumption,Resumption))

 

Resumption 是一个递归函数,它通过立即终止、返回一个包含单个值和一个将产生更多值的后续恢复点的对,或者返回一个包含嵌套的可迭代对象和一个后续恢复点的对来展开迭代器。假设存在一个函数 β (及其逆函数 β-1),它将 Iterable<T> (即迭代器工厂)嵌入到类型为 ()→Resumption 的恢复点工厂中。它确保正常的可迭代对象和对 sync* 方法的递归调用都被统一地作为恢复点处理。

用于 sync* 方法的 Dart 轻量级子集添加了 yieldyield* 语句,并将 sync* 方法内部的 return 语句限制为不能带有结果表达式。同样,为了不失一般性,我们忽略异常。

 

expression ::= identifier = expression

           | expression.identifier(expression)

 

statement ::= { ... statement ... }

          | expression;

          |  return;

          |  yield expression; | yield* expression;

          |  while(expression) statement

          |  try{ statement } finally statement

         

method ::= identifier(identifier) sync*{
var identifier; ... statement ...
  }

 

语句 𝒞〚s〛(ρ,σ) 的语义使用两个延续 ρσ,因为在执行 sync* 函数体时,我们需要在完成底层可迭代对象之前运行 finally 代码块。表达式 〚e㛬(σ) 的语义与 async 方法基本相同,除了在 sync* 方法内部我们不能使用 await (为此我们需要 async* 方法,正如本文后面解释的那样)。方法声明 〚d㛬 的语义将所有内容连接起来。

sync* 方法内部的 Return 语句没有结果表达式,但必须像普通方法中的 return 语句一样,经过 finally 代码块;因此,return 语句 𝒞〚return;㛬(ρ,σ) 的语义使用“恢复点结束”值调用返回延续。

 

𝒞〚return;㛬(ρ,σ) = ρ()

 

对于 sync* 方法,最有趣的例子是 yield 单个值 𝒞〚yield e;㛬 或嵌套的可迭代对象 𝒞〚yield* e;㛬。该语义形式化了首先对表达式 〚e㛬(λr→...) 进行求值以获得结果 r,然后 sync* 方法的执行返回一个包含值 r 和剩余计算 σ 的恢复点对。

 

𝒞〚yield e;㛬(ρ,σ) = 〚e㛬(λr→(r,σ)))

 

当我们 yield* 一个嵌套的可迭代对象时,r 会使用 β 强制转换为恢复点,因此我们不必担心 sync* 方法的递归调用或返回可迭代对象的常规方法之间的区别。

 

𝒞〚yield* e;㛬(ρ,σ) = 〚e㛬(λr→(β(r),σ))

 

然后,sync* 方法的完整延续语义如下所示:

 

〚x=e〛(σ) = 〚e〛(λr→{ x=r; σ(r); })

〚a.f(b)〛(σ) = 〚a〛(λa→〚b〛(λb→σ(a.f(b)))

 

𝒞〚s1 s2㛬(ρ,σ) = 𝒞〚s1㛬(ρ, λr→𝒞〚s2㛬(ρ,σ))

𝒞〚e;㛬(ρ,σ) = 〚e㛬(λr→ρ(r))

𝒞〚return;㛬(ρ,σ) = ρ()

𝒞〚yield e;㛬(ρ,σ) = 〚e㛬(λr→(r,σ))

𝒞〚yield* e;㛬(ρ,σ) = 〚e㛬(λr→(β(r),σ))

𝒞〚while(e) s〛(ρ,σ) = loop()

  where loop() = 〚e㛬(λr→{

           if(r){ 𝒞〚s〛(ρ, λr→loop()); } else { σ(null); }              
        })

𝒞〚try s1 finally s2〛(ρ,σ) =
  𝒞〚s1㛬(λ()→𝒞〚s2㛬(ρ,λs→ρ()), λr→𝒞〚s2㛬(ρ,λs→σ(r)))

 

〚f(a) sync* { var x; s }㛬= f(_a){

  return new _IterableBase(λ()→{

     var x; var a = _a; return 𝒞〚s㛬(λ()→(), λr→())

  });

}
where

  class _IterableBase extends IterableBase {

     var _resumption;

     _IterableBase(this._resumption);

     Iterator get iterator { return β-1(_resumption()); }

  }

 

将语义应用于左侧的程序会得到:

 

range(s,n) sync* {

  if(n>0) {

     yield s;

     yield* range(s+1,n-1);
  }

}

range(s,n)→new _IterableBase(λ()→{

  if(n>0) { return

     (s,

      λ()→(β(range(s+1, n-1)),λ()→())

     );  

  }

  return ();
});

 

虽然 sync* 函数可能会让我们想起 C# 中的迭代器,但我们必须再次强调,两者之间存在差异。在 C# 中,根据包含 yield returnyield break 语句的迭代器方法指定的返回类型,迭代器可以返回可枚举对象(可迭代对象)或枚举器(迭代器)。由于 Dart 中的类型是可选的,因此 sync* 函数始终返回可迭代对象。在 Dart 中,yieldyield*return 语句可以出现在常规语句可以出现的任何地方,包括在 finally 代码块中。与 Dart 的另一个不同之处在于,在 C# 中,lambda 表达式不能是迭代器(因为我们无法指定它们的返回类型),而在 Dart 中,方法、函数和 getter 可以标记为 sync*。Dart sync* 方法相对于 C# 迭代器最重要的改进或许是包含了 yield* 用于拼接嵌套的可迭代对象。

 

是否取消:这是一个问题

 

一个开放的设计决策是 Dart 迭代器是否应该支持取消或释放方法。对于简单的迭代器来说,这不是问题,但是一旦引入 sync* 方法的强大功能,就很容易使方法在飞行中挂起,占用昂贵的资源。例如,在以下 sync* 方法上仅调用一次 moveNext() 将返回文件的第一行,但永远不会关闭文件:

 

Iterable readLines(name) sync* {

  var file = new File(name);
  try {
     yield file.readNext();

  } finally {

     file.close();

  }

}

 

在 C# 中,当我们 dispose(取消)一个可迭代对象时,所有 finally 代码块都会运行,并且 foreach 循环被 try/finally 代码块包围,以确保可枚举对象在使用后始终被释放。通过维护一个“finally 延续”,可以很容易地将此功能添加到 sync* 方法中,该延续在被调用时将运行所有 finally 代码块,并忽略在清理期间遇到的任何进一步的 yieldyield* 语句。

 

async* = async+sync*

到目前为止,我们已经了解了 asyncsync* 方法如何简化分别异步返回单个值和同步返回多个值的方法的编码。将这两个维度结合起来得到 async* 函数,这些函数简化了异步生成值流的方法的编码。正如 async 函数立即返回 future 一样,async* 函数立即返回一个 Stream。当(且仅当)我们监听 stream 时,该函数才开始执行,就像在 sync* 方法返回的 Iterable 上调用 moveNext 时,sync* 方法才开始执行一样。

假设我们要扫描一个 stream,并增量地发出将异步函数应用于每个传入项的中间结果。使用 async* 方法,这变成只是遍历 stream、等待下一个状态的计算结果,然后 yield 当前状态的问题:

 

Stream scan(Stream src, state, acc) async* {

  yield state;
  await for(var next in src) {

    try {

       yield (await accumulateAsync(state, next));
    } catch(e) {

       return; // swallow exception, don't do this at home!

    }

  }

}

 

正如您可以想象的那样,使用 StreamController 手动实现此行为非常繁琐。我们需要在等待 accumulateAsync 的结果时暂停源 stream,并在 future 完成时恢复它,并且我们需要处理 await 周围的 try-catch 代码块、结果 stream 的取消等等。感谢 async* 方法,我们只需编写常规的控制流,Dart 编译器将负责处理所有困难的 stream 管理问题。

 

Dart stream 是可迭代对象的异步对应物。给定一个类型为 Stream<T> 的 stream ts,我们可以通过传递回调函数 void onData(T event),以及可选的 void onError(Exception e)void onDone() 来监听 stream 以获取新值的通知。listen 方法然后返回一个 StreamSubscription,它具有一个方法 cancel 以取消订阅 stream,以及 pauseresume 以控制生产者的速度。Stream 可以通过转换现有 stream(使用诸如 mapreduce 之类的运算符)或使用 StreamController 来创建,我们可以使用 addaddError 方法向其推送值。

 

定义 async* 方法语义的 Dart 轻量级子集定义如下:

 

expression ::= identifier = expression

           | expression.identifier(expression)

           |  await expression

 

statement ::= statement statement

          | expression;

          | return;

                  | yield expression; | yield* expression;

          |  while(expression) statement

          |  try statement finally statement

          |  await for(var identifier in expression) statement

method ::= identifier(identifier) async* {
             var identifier; ... statement ...
          }

 

为了保持简单,我们将再次忽略失败并且不实现取消。由于 await for 循环可以嵌套,因此轻量级 async* 函数体中语句的语义 𝒞〚s㛬(ξ,ρ,σ) 不仅需要两个延续 ρσ,还需要一个订阅 ξ 到封闭方法或 await for 循环的 stream(为了方便起见,我们通常将 ξ 简单地称为“立即封闭的 stream”)。需要立即封闭的 stream,以便在 async* 方法的结果 stream 被暂停时,我们可以在 yieldyield* 语句处或在使用 await for 遍历嵌套的 stream 时 pause/resume 函数体。封闭的 stream 初始化为 never,它永远不会发出值,因此 never.pause()never.resume() 是空操作,可以被丢弃。

表达式的语义 〚e㛬(ξ,σ) 也获取了封闭 stream 的订阅,以便在执行 await 表达式时可以 pause/resume 它。方法声明 〚d㛬 的语义将所有内容连接起来,通过创建 stream 控制器的子类,其 .stream 立即返回,其 .sinkasync* 方法的函数体用于 yield 值。

对于表达式 〚await e㛬(ξ,σ),首先使用封闭的 stream ξ 求值表达式 〚e㛬(ξ,λr→{...}} 以 yield 值 r。在等待 r 之前,暂停 ξ,以便在等待 future 完成期间不会推送值。在使用 future 的结果 v 调用成功延续之前,恢复封闭的 stream ξ

 

〚await e㛬(ξ,σ) = 〚e㛬(ξ,λr→{
  ξ.pause(); r.then(λv→{ ξ.resume(); σ(v); });
})

 

async* 方法内部的 Return 语句与 sync* 的 return 语句类似,它们都调用返回延续以运行所有未完成的 finally 代码块,然后关闭结果 stream。在 yield 值之前,async* 方法检查其结果 stream(在全局变量 result 中捕获)是否具有待处理的暂停请求。如果是,则立即封闭的 stream ξ 被暂停,然后通过设置 onResume 回调来暂停结果 stream 本身,以便在消费者恢复它时继续执行。

 

𝒞〚yield e;㛬(ξ,ρ,σ) = 〚e㛬(ξ,λr→{

  if(result.isPaused){

     ξ.pause(); result._onResume = λ()→{
        ξ.resume(); result.add(r); σ(null);
     };

  } else {

    result.add(r); σ(null);

  }

})

 

〚㛬async* 函数体上被调用时,在初始返回和成功延续中,也会执行类似的暂停检查以关闭结果 stream。async* 方法最有趣的例子是遍历 stream 𝒞〚await for(var x in e){s}㛬(ξ,ρ,σ)。首先求值循环表达式 〚e㛬(ξ,λr→{...}) 以获取 stream r。然后暂停立即封闭的 stream ξ,并改为监听 r 以获取订阅 onData 回调在 的上下文中执行循环体,onDone 回调在调用成功延续之前恢复 ξ

 

𝒞〚await for(var x in e) s㛬(ξ,ρ,σ) = 〚e㛬(ξ,λr→{

  ξ.pause();

  var _ξ; _ξ = r.listen(λv→{

     var x = v;

     𝒞〚s㛬(_ξ, λ()→{ _ξ.close(); ξ.resume(); ρ(); }, λ()→{})

  }, onDone: λ()→{ _ξ.close(); ξ.resume(); σ(null); });

})

 

与普通 for 循环遍历可迭代对象一样,每次迭代都会分配一个新的循环变量。

以下是 async* 方法的完整延续语义:

 

 

〚x=e㛬(ξ,σ) = 〚e㛬(ξ,λr→{ x=r; σ(r); })

〚a.f(b)㛬(ξ,σ) = 〚a㛬(ξ,λa→〚b㛬(ξ,λb→σ(a.f(b)))

〚await e㛬(ξ,σ) = 〚e㛬(ξ,λr→{
  ξ.pause(); r.then(λv→{ ξ.resume(); σ(v); });
})

 

𝒞〚{ s1 s2 }㛬(ξ,ρ,σ) = 𝒞〚s1㛬(ξ,ρ,λr→𝒞〚s2㛬(ξ,ρ,σ))

𝒞〚e;㛬(ξ,ρ,σ) = 〚e㛬(ξ,λr→ρ(r))

𝒞〚return;㛬(ξ,ρ,σ) = ρ()

𝒞〚yield e;㛬(ξ,ρ,σ) = 〚e㛬(ξ,λr→{

  if(result.isPaused){

     ξ.pause(); result._onResume = λ()→{
        ξ.resume(); result.add(r); σ(null);
     };

  } else {

    result.add(r); σ(null);

  }

})

𝒞〚yield* e;㛬(ξ,ρ,σ) = 〚e㛬(ξ,λr→{

  if(result.isPaused){

     ξ.pause();

     result._onResume = λ()→{

ξ.resume(); result.addStream(r); σ(null); };
  } else {

result.addStream(r); σ(null);

  }
})

𝒞〚while(e) s㛬(ξ,ρ,σ) = loop()

  where loop() = 〚e㛬(ξ,λr→ {

           if(r){ 𝒞〚s〛(ξ,ρ,λr→loop()); } else { σ(null); }              
        })

𝒞〚try s1 finally s2〛(ξ,ρ,σ) =
  𝒞〚s1〛(ξ,λ()→𝒞〚s2㛘(ξ,ρ,λs→ρ()), λr→𝒞〚s2㛘(ξ,ρ,λs→σ(r)))

𝒞〚await for(var x in e) s㛬(ξ,ρ,σ) = 〚e㛬(ξ,λr→{

  ξ.pause();

  var _ξ; _ξ = r.listen(λv→{
     var x = v;
     𝒞〚s㛘(_ξ,λ()→{ _ξ.close(); ξ.resume(); ρ(); }, λ()→{})
  }, onDone: λ()→{ _ξ.close(); ξ.resume(); σ(null); });
})

 

〚f(a) async* { var x; s }㛘= f(_a){

  var result = new _StreamController(
  onListen: λ()→{

     var x; var a = _a;

     𝒞〚s㛘(never,

      λ()→{
         if(result.isPaused) {

result._onResume = λ()→result.close();

         } else {

result.close();

         }

      }, λr→ {
         if(result.isPaused) {

result._onResume = λ()→result.close();

         } else {

result.close();

         }      

      })

  },

  onResume: λ()→{ result._onResume(); }
  );

  return result.stream;

}

 

下面,将语义应用于左侧的程序会产生右侧的代码

 

Stream repeat(s) async* {

  while(true) {
     yield* s();
  }

}

Stream repeat(_s) {

  var result = new _StreamController(

  onListen: λ()→{

     var s = _s;

     loop() = {

        if(result.isPaused){

result._onResume = λ()→{

result.addStream(s());

loop();

           };

        } else {

result.addStream(s());

           loop();

        }

     }

     loop();

  },
  onResume: λ()→{
     result._onResume();
  });

  return result.stream;

}

 

async* 方法是一项新颖的特性,据我们所知,没有其他语言支持它们,尽管有人建议将其作为扩展(参见 http://hendryluk.wordpress.com/2012/02/28/reactive-extensions-and-asyncawait/)用于支持 C# 中可观察流的命令式创建。来自 Netflix 的 Jafar Hussain 为 ECMAScript 提出了类似的功能,但到目前为止,ECMAScript 仍然缺乏异步流的标准类型。

 

结论

大多数主流(命令式)语言都针对表达同步计算进行了优化,其中调用者会阻塞,直到被调用者返回一个值。 随着调用者和被调用者必须交换数据的距离增加(从缓存、磁盘和网络),调用的延迟会增加多个数量级,1 因此编程语言直接支持异步调用变得至关重要。在异步调用中,调用者不会阻塞等待被调用者返回,而是传递一个延续,被调用者会在最终生成值时调用该延续。

通过复兴被遗忘的指称语义艺术,我们展示了如何采用同步控制流结构(例如循环、条件语句和 try-catch-finally 块),并在异步计算的上下文中以自然的方式重新解释它们,以及用于生成同步和异步值流。 在不久的将来,每种现代编程语言都可能以某种方式支持异步方法,并且本文中使用的语义可能有助于其他语言设计者采用该方法,以适应其特定语言的特性。

 

参考文献

1. Bonér, J. 2012. 每个程序员都应该知道的延迟数字; https://gist.github.com/jboner/2841832

2. Dart 语言规范; https://www.dartlang.org/docs/spec/

3. Hughes, R. J. M. 1986. 列表的新颖表示及其在函数“reverse”中的应用;信息处理快报 22 (3): 141-144; http://www.cs.tufts.edu/~nr/cs257/archive/john-hughes/lists.pdf

4. Jacobs, B., Meijer, E., Piessens, F., Schulte, W. 重新审视迭代器:证明规则和实现; http://research.microsoft.com/en-us/projects/specsharp/iterators.pdf

5. Stoy, J. E. 1977. 指称语义学:编程语言理论的 Scott-Strachey 方法。马萨诸塞州剑桥市:麻省理工学院出版社; http://dl.acm.org/citation.cfm?id=540155

 

喜欢或讨厌?请告诉我们

[email protected]

 

Erik Meijer ([email protected]) 是 Applied Duality 的创始人,也是 TUDelft 的大数据工程教授。 他最出名的是他对编程语言(如 Haskell、C#、Visual Basic、Hack 和 Dart)以及大数据技术(如 LINQ 和 Rx Framework)的贡献。

Kevin Millikin ([email protected]) 是 Google 的一名软件工程师,曾从事 V8 JavaScript 引擎和 Dart 虚拟机的工作。 他是 V8 Crankshaft 编译器的最初开发者之一,Crankshaft 编译器是第一个具有自适应优化的 JavaScript 编译器。 他目前正在从事 dart2js 的工作。

Gilad Bracha ([email protected]) 是 Newspeak (http://bracha.org/Site/Newspeak.html) 编程语言的创建者,也是 Java 语言规范 (http://docs.oracle.com/javase/specs/) 的合著者。 他目前是 Google 的一名软件工程师,在 Google 从事 Dart (https://www.dartlang.org/) 的工作。

© 2014 1542-7730/14/1200 $10.00

acmqueue

最初发表于 Queue vol. 13, no. 3
数字图书馆 中评论这篇文章





更多相关文章

Matt Godbolt - C++ 编译器中的优化
在向编译器提供更多信息方面需要权衡:这会使编译速度变慢。 链接时优化等技术可以为您提供两全其美的优势。 编译器中的优化仍在不断改进,间接调用和虚函数分派方面的即将到来的改进可能很快就会带来更快的多态性。


Ulan Degenbaev, Michael Lippautz, Hannes Payer - 作为合资企业的垃圾回收
跨组件追踪是解决跨组件边界的引用循环问题的一种方法。 只要组件可以形成具有跨 API 边界的非平凡所有权的任意对象图,就会出现此问题。 CCT 的增量版本已在 V8 和 Blink 中实现,从而能够以安全的方式有效且高效地回收内存。


David Chisnall - C 并非低级语言
在最近的 Meltdown 和 Spectre 漏洞之后,值得花一些时间来研究根本原因。 这两个漏洞都涉及处理器推测性地执行超出某种访问检查的指令,并允许攻击者通过侧信道观察结果。 导致这些漏洞以及其他几个漏洞的功能被添加进来,是为了让 C 程序员继续相信他们正在用低级语言编程,而这种情况已经几十年没有发生过了。


Tobias Lauinger, Abdelberi Chaabane, Christo Wilson - 你不应该依赖我
大多数网站都使用 JavaScript 库,其中许多库已知存在漏洞。 了解问题的范围以及包含库的许多意外方式,只是改进情况的第一步。 此处的目的是,本文中包含的信息将有助于为社区提供更好的工具、开发实践和教育工作。





© 保留所有权利。

© . All rights reserved.