下载本文的PDF版本 PDF

大规模函数式编程

将函数式编程原则应用于分布式计算项目


Marius Eriksen

现代服务器软件在开发和运维方面都要求很高:它必须随时随地可用;必须在毫秒内回复用户请求;必须快速响应容量需求;必须处理大量数据和更多流量;必须快速适应不断变化的产品需求;并且在许多情况下,它必须容纳一个庞大的工程组织,其众多工程师就像一个大型、混乱厨房里众多的厨师。

更重要的是,这些应用程序的最佳计算机——无论你称之为云、数据中心还是仓库计算机——实际上都很糟糕。它们复杂且不可靠,并且容易出现局部故障。它们具有异步互连和深层内存层次结构,并为操作员错误留下了很大的空间。1

因此,云计算迫使我们正视分布式计算的全部复杂性,在分布式计算中,看似简单的问题需要复杂的解决方案。虽然这种复杂性很大程度上是固有的——问题的本质决定了不可能有更简单的解决方案——但其中很大一部分也是偶然的,仅仅是使用了不适合该用途的工具的简单后果。

在 Twitter,我们使用来自函数式编程的思想来解决现代服务器软件的许多复杂性,主要是通过使用高阶函数和效应。高阶函数,或那些返回其他函数的函数,使我们能够组合更简单的函数来定义更复杂的函数,以零散的方式构建应用程序功能。效应是表示某些副作用操作的值;它们与高阶函数结合使用,以从更简单的效应构建复杂的效应。7

高阶函数和效应通过两种方式帮助构建可扩展的软件:首先,从简单的部分构建复杂的软件使其易于理解、测试、重用和替换单个组件;其次,效应使副作用操作易于处理,从而促进了模块化和良好的关注点分离。

本文探讨了三种遵循这种函数式编程风格的具体抽象:Future 是表示异步操作的效应;服务是表示服务边界的函数;过滤器是封装独立于应用程序的行为的函数。反过来,高阶函数提供了用于将这些组合起来以从简单部分创建复杂系统的粘合剂。

因此,Future、服务和过滤器结合在一起,以零散的方式构建服务器软件。它们使程序员能够构建复杂的软件,同时保留他们推理其组成部分正确性的能力。

通过始终如一地应用这些原则,程序员可以构建出更简单、更灵活且性能更高的系统。

使用 Future 进行并发编程

“并发程序等待得更快。” ——Tony Hoare

并发是服务器软件开发中的核心主题。12 在这类软件中,并发主要来自两个方面。首先,规模意味着并发。例如,搜索引擎可能会将其索引拆分为许多小块(分片),以便整个语料库可以放入主内存中。为了高效地满足查询,必须并发查询所有分片。其次,服务器之间的通信是异步的,为了效率和安全,必须并发处理。

传统的并发编程方法是使用线程和锁。3 线程为程序员提供了并发的执行线程,而锁则协调跨多个线程的(可变)数据的共享。

在实践中,线程和锁出了名的难以正确使用。9 它们难以推理,并且是导致恶性错误的顽固原因。更重要的是,它们难以组合:你无法安全且任意地组合一组线程和锁来构建新功能。它们的计算语义被包裹在管理并发的机制中。

在 Twitter,我们转而围绕 Future 构建并发程序。Future 是一种效应,表示异步操作的结果。它是一种引用单元类型,可以处于三种状态之一:未完成,当 Future 尚未取值时;已完成并赋值,当 Future 包含成功操作的结果时;以及已完成并失败,当操作失败时。Future 最多可以经历一次状态转换:从未完成状态转换为成功或失败状态

在以下使用 Scala 的示例中,Future count 表示整数值操作的结果。我们直接响应 Future 的完成:respond 之后的代码块是一个回调,当 Future 完成时会调用该回调。(正如你很快就会看到的,我们很少以这种方式直接响应 Future 完成。)

val count: Future[Int] = getCount()
count.respond {
  case Return(value) =>
     println(s"The count was $value")
  case Throw(exc) =>
     println(s"getCount failed with $exc")
}

Future 表示 Twitter 系统中的几乎每个异步操作:RPC(远程过程调用)、超时、从磁盘读取文件、从事件流接收下一个事件。

借助一组高阶函数(称为组合器),Future 可以自由组合以表达更复杂的操作。这些组合通常分为顺序或并发两种组合类别之一。

顺序组合允许将一个 Future 定义为另一个 Future 的函数,从而使两者按顺序执行。当两个操作之间存在数据依赖性时,这非常有用:需要第一个 Future 的结果来计算第二个 Future。例如,当用户发送推文时,我们首先需要查看该用户是否在每小时速率限制之内,然后再将推文写入数据库。在以下示例中,Future done 表示此复合操作。(由于历史原因,顺序组合器称为 flatMap。)

def isRateLimited(user: Long): Future[Boolean] = ...
def writeTweet(user: Long, tweet: String): Future[Unit] = ...

val user = 12L
val tweet: String = "just setting up my twitter"

val done: Future[Unit] =
  isRateLimited(user).flatMap {
     case true => Future.exception(new RateLimitingError)
     case false => writeTweet(user, tweet)
  }

这也展示了如何在 Future 中表达失败:Future.exception 返回一个已在失败状态下完成的 Future。在速率限制的情况下,done 变成一个失败的 Future(带有异常 RateLimitingError)。失败会短路任何进一步的组合:如果在前面的示例中,isRateLimited(user) 返回的 Future 失败,则 done 立即失败;传递给 flatMap 的闭包不会运行。

另一组组合器定义了并发组合,允许在多个 Future 之间不存在数据依赖性时将它们组合在一起。并发组合器将 Future 列表转换为值列表的 Future。例如,你可能有一个 Future 列表,表示对搜索索引的所有分片的 RPC。并发组合器 collect 将此 Future 列表转换为结果列表的 Future。

val results: List[Future[String]] = ...
val all: Future[List[String]] =
  Future.collect(results)

默认情况下,独立的 Future 并发执行;仅在存在数据依赖性的地方才按顺序执行。

Future 组合器永远不会修改底层的 Future;相反,它们返回一个新的 Future,表示复合操作。这是推理的重要工具:复合操作不会改变其组成部分的行为。实际上,单个 Future 可以在许多不同的组合中使用和重用。

让我们修改之前的 getCount 示例,看看组合如何允许零散地构建复杂行为。在分布式系统中,通常最好是优雅降级(例如,在可能存在默认值或猜测的情况下),而不是使整个操作失败。8 这可以通过为 getCount 操作使用超时来实现,并且在失败时返回默认值 0。此行为可以表示为不同 Future 之间的复合操作。具体来说,你想要表示超时-默认和 getCount 操作的胜者的 Future。

val count: Future[Int] = getCount()
// Future.sleep(x) 返回一个 Future,它在
// 给定的时间量后完成。
val timeout: Future[Unit] = Future.sleep(5.seconds)

// Future 上的 map 方法构造一个新的 Future
// 它在成功完成之后,将给定的
// 函数应用于结果。它返回一个新的 Future
// 表示此复合操作。在这种情况下,
// 我们只需在超时后返回默认值 0。
val default: Future[Int] = timeout.map(unit => 0)

// Select 组合两个 Future,返回一个新的
// Future,它表示第一个完成的 Future。
val finalCount: Future[Int] = Future.select(count, default)

Future finalCount 现在表示如上所述的复合操作。此示例具有许多值得注意的特征。首先,我们使用简单的底层部分——Future 和函数创建了一个复合操作。其次,组成 Future 在组合下保留了它们的语义——它们的行为不会改变,并且它们可以用于多个不同的组合中。第三,没有说明执行的机制;相反,复合计算表示为多个底层部分的组合。没有显式创建线程,也没有它们之间的任何显式通信——这一切都由数据依赖性暗示。

这简洁地说明了 Future 如何将应用程序的语义(计算什么)从其机制(如何计算)中解放出来。程序员组合并发操作,但无需指定如何调度它们或如何传递值。这是一个良好的关注点分离:应用程序逻辑不会与运行时问题的细节纠缠在一起。

Future 有时可以使程序员无需使用锁。在数据依赖性通过 Future 的组合来证明的情况下,实现负责并发控制。换句话说,Future 可以组合成一个依赖关系图,该图以数据流编程的方式执行。11 虽然我们有时需要求助于显式并发控制,但大量常见用例都直接通过使用 Future 来处理。

在 Twitter,我们实现了使 Future 并发执行工作所需的机制,这些机制位于我们的开源 Finagle4,5 和 Util6 库中。这些库通过可插拔的调度器机制负责将执行映射到操作系统线程。Twitter 的一些团队已经使用此能力来构建更符合其问题领域及其随之而来的权衡的调度策略。我们还使用此功能向我们的系统添加了诸如维护运行时统计信息和 Dapper 风格的 RPC 跟踪等功能,而无需更改任何现有 API 或修改现有用户代码。

使用服务和过滤器进行编程

“我们应该有一些像花园软管一样耦合程序的方法——当需要以另一种方式处理数据时,拧入另一段。这也是 I/O 的方式。” ——Doug McIlroy

现代服务器软件充斥着本质复杂性——即问题固有的且无法避免的复杂性——以及更多自找的复杂性(我们真的需要那个产品功能吗?)。无论如何,既然我们似乎无法保持应用程序的简单性,我们就必须应对它们的复杂性。

因此,现代软件工程实践的中心是如何包含和管理复杂性——以允许我们推理应用程序行为的方式将其打包起来。在这项工作中,目标是在简单性和清晰性与可重用性和模块化之间取得平衡。

更重要的是,服务器软件必须考虑到分布式系统的现实。例如,搜索引擎可能只是省略来自失败索引分片的结果,以便它可以返回部分结果,而不是完全失败查询(如 getCount 示例中所示)。在这种情况下,用户通常无法分辨出差异——即使没有完整的结果集,搜索引擎仍然很有用。以这种方式应用应用程序级知识对于创建真正有弹性的应用程序通常至关重要。

因此,分布式应用程序围绕服务过滤器组织。服务表示 RPC 服务端点。它们是 A => Future[R] 类型的函数(即,一个接受 A 类型的单个参数并返回 R 类型 Future 的函数)。这些函数是异步的:它们立即返回;延迟的操作由返回的 Future 捕获。服务是对称的:RPC 客户端调用服务以分发 RPC 调用;服务器实现服务来处理它们。

以下示例定义了一个简单的字典服务,该服务在内存中存储的映射中查找键。该服务接受一个 String 参数并返回一个 String 值,如果未找到键,则返回 null。

// 这是字典数据,编码在
// 字符串到字符串的映射中。
val map: Map[String, String] = Map(...)

val dictionary: Service[String, String] =
  // 从一个新服务构造
  // 匿名函数,调用
  // 用于发出的每个请求。
  Service(key: String => {
     // 返回查找结果。(Map.get
     // 返回一个可选字符串。) Future.value
     // 创建一个已
     // 用给定值满足的 Future。
     if (map.contains(key))
        Future.value(map(key))
     else
        Future.value(null)
  })

请注意,服务 dictionary 再次像任何其他值一样是一个值。一旦定义,就可以通过首选的 RPC 协议在网络上进行分发。在服务器上,服务导出为

Rpc.serve(dictionary, ":8080")

而客户端可以绑定并调用服务的远程实例

val service: Service[String, String] =
  Rpc.bind("server:8080")

val result: Future[String] =
  service("key")

观察客户端和服务器之间的对称性:两者都在服务方面进行对话,服务是位置透明的;一个是本地实现的,而另一个是远程绑定的,这是一个实现细节。

服务可以使用普通的函数式组合轻松组合。例如,以下服务跨多个字典服务执行分散-收集查找。因此,字典可以分布在多个分片上。

def multipleLookup(services: Seq[Service[String, String]])
     : Service[String, String] =
  Service(key => {
     val results: Seq[Future[String]] =
        services.map(_.apply(key))
     val all: Future[Seq[String]] =
        Future.collect(results)
     all.map { results: Seq[String] =>
        results.indexWhere(_ != null) match {
           case -1 => null
           case i => results(i)
        }
     }
  })

这个例子有很多内容。首先,使用所需的键调用每个底层服务,获得一系列结果——所有 Future。Future.collect 并发地组合 Future,将 Future 序列转换为包含组成 Future 的(成功)完成值的序列的单个 Future。然后,代码查找第一个非空结果并返回它。

过滤器也是异步函数,它们与服务组合以修改其行为。它们的类型是 (A, Service[A, R]) => Future[R] (即,具有两个参数的函数:类型为 A 的值和服务 Service[A, R]);然后,过滤器返回该服务的返回类型 R 的 Future。过滤器的类型表明它负责满足请求,给定一个服务。因此,它可以修改请求如何分派到服务(例如,它可以修改它)以及如何返回请求(例如,向返回的 Future 添加超时)。

过滤器用于实现诸如超时和重试、故障处理策略和身份验证等功能。过滤器可以组合以形成复合过滤器——例如,实现重试逻辑的过滤器可以与实现超时的过滤器组合。最后,过滤器与服务组合以创建具有修改行为的新服务。

在前一个示例的基础上,以下是一个过滤器,它将查找服务中的故障降级为 null。这种行为在构建弹性服务时通常很有用——有时返回部分结果比完全失败更好。

val downgradeToNull: Filter[String, String] =
  Filter((key, service) => {
     service.apply(key).transform {
        case Return(value) => value
        case Throw(exception) => null
     }
  })

另一个过滤器短路了对无用键的请求(这些键在搜索引擎中被称为停用词),这样它们就不会给底层服务带来不必要的负载

val stopwords: Set[String] ...
val stopwordFilter: Filter[String, String] =
  Filter((key, service) => {
     if (stopwords.contains(key))
        Future.value(null)
     else
        service.apply(key)
  })

最后,将两个过滤器组合在一起,然后应用于所有服务。

val resiliencyFilter: Filter[String, String] =
  stopwordFilter.andThen(downgradeToNull)

def resilientMultipleLookup(services: Service[String, String]) = {
  val resilientServices =
     services.map { service => resiliencyFilter.andThen(service) }
  multipleLookup(resilientServices)
}

val resilientService: Service[String, String] =
  resilientMultipleLookup(
     Rpc.bind("server1:8080"),
     Rpc.bind("server2:8080"),
     ...
  )

字典查找服务 resilientService 以弹性方式实现了分散-收集查找。该功能由单独的、定义明确的组件构建而成,这些组件组合在一起以创建一个以期望方式运行的服务。

与 Future 一样,组合过滤器和服务不会更改底层的组成组件。它创建了一个具有新行为的新服务,但不会更改底层过滤器或服务的含义。这再次增强了推理能力,因为组成组件可以独立存在;我们无需推理它们组合后的交互。

Future、服务和过滤器构成了 Twitter 构建服务器软件的基础。它们共同支持模块化和重用:我们独立定义应用程序和行为,并根据需要将它们组合在一起。虽然它们的应用很普遍,但有两个例子很好地说明了它们的力量。首先,我们以一组过滤器的形式实现了类似于 Dapper10 的 RPC 跟踪系统,而无需更改应用程序代码。其次,我们将备份请求2 实现为一个小的、独立的过滤器。

结论

“不要束缚实现者的手脚。” ——Martin Rinard

函数式编程提倡思考如何从简单的部分构建复杂的行为,使用高阶函数和效应将它们粘合在一起。在 Twitter,我们将这种思路应用于分布式计算,围绕一组核心抽象构建系统,这些核心抽象通过效应表达异步性并且是可组合的。这允许从具有简单语义的组件构建复杂系统,这些语义在组合下得以保留,从而更容易推理整个系统。

这种方法带来了更简单、更模块化的系统。这些系统促进了良好的关注点分离并增强了灵活性,同时允许高效的实现。

因此,函数式编程为管理现代软件固有的复杂性提供了必要的工具——解放了实现者的手脚。

致谢

Jake Donham、Erik Meijer、Mark Compton、Terry Coatta、Steve Jenson、Kevin Oliver、Ruben Oanta 和 Oliver Gould 对本文的早期草稿提供了出色的反馈和指导。此处讨论的抽象的早期版本是与 Nick Kallen 共同设计的;此后,Twitter 和开源社区的许多人致力于改进、扩展和巩固它们。

参考文献

1. Barroso, L. A., Clidaras, J., Hölzle, U. 2013. 数据中心即计算机:仓库规模机器设计导论。计算机体系结构综合讲义 8(3): 1-154.

2. Dean, J., Barroso, L. A. 2013. 大规模的尾部。 通讯 56(2): 74-80.

3. Dijkstra, E. W. 1965. 并发程序控制问题的解决方案。 通讯 8(9): 569.

4. Eriksen, M. 2013. 你的服务器作为一个函数。在第七届编程语言和操作系统研讨会论文集,(11 月):5.

5. Eriksen, M., Kallen, N. 2010. Finagle; http://twitter.github.com/finagle.

6. Eriksen, M., Kallen, N. 2010. Util; http://twitter.github.com/util/.

7. Hughes, J. 1989. 为什么函数式编程很重要。计算机杂志 32(2): 98-107.

8. Netflix. Hystrix; https://github.com/Netflix/Hystrix.

9. Ousterhout, J. 1996. 为什么线程是个坏主意(对于大多数目的而言)。在美国用户组年会技术会议演讲,第 5 卷。

10. Sigelman, B.H., Barroso, L.A., Burrows, M., Stephenson, P., Plakal, M., Beaver, D., Jaspan, S., Shanbhag, C., 2010. Dapper,大规模分布式系统跟踪基础设施。技术报告,谷歌:36.

11. Smolka, G. 1995. Oz 编程模型。在今日计算机科学,编辑 Jan van Leeuwen,计算机科学讲义,第 1000 卷:324-343。柏林:施普林格出版社。

12. Sutter, H. 2005. 免费午餐结束了:软件中转向并发的根本转变。Dr. Dobb's 杂志 30(3): 202-210.

相关文章

可扩展的 SQL
- Michael Rys
大型站点和应用程序如何保持基于 SQL?
https://queue.org.cn/detail.cfm?id=1971597

演进与实践:金融领域的低延迟分布式应用
- Andrew Brook
金融行业对低延迟分布式系统有独特的需求。
https://queue.org.cn/detail.cfm?id=2770868

分布式开发经验教训
- Michael Turnlund
如果可以避免,为什么要重蹈覆辙?
https://queue.org.cn/detail.cfm?id=966801

Marius Eriksen 是 Twitter 系统基础设施组的首席工程师。他致力于分布式系统和服务器软件的各个方面,目前正在从事数据管理和集成系统方面的工作;他还担任 Twitter 架构组的主席。你可以通过 [email protected] 或 Twitter 上的 @marius 联系他。

版权所有 © 2016 归所有者/作者所有。出版权已授权给 。

acmqueue

最初发表于 Queue 第 14 卷,第 4 期——
数字图书馆 中评论本文





更多相关文章

Martin Kleppmann, Alastair R. Beresford, Boerge Svingen - 在线事件处理
对跨异构存储技术的分布式事务的支持要么不存在,要么性能和操作特性很差。相比之下,OLEP 越来越多地用于在这些环境中提供良好的性能和强大的 consistency 保证。在数据系统中,日志通常用作内部实现细节。OLEP 方法有所不同:它使用事件日志而不是事务作为数据管理的主要应用程序编程模型。传统数据库仍然使用,但它们的写入来自日志,而不是直接来自应用程序。OLEP 的使用不仅仅是开发人员的实用主义,而且它还提供了许多优势。


Andrew Leung, Andrew Spyker, Tim Bozarth - Titus:将容器引入 Netflix 云
我们相信我们的方法使 Netflix 能够快速采用容器并从中受益。尽管细节可能特定于 Netflix,但通过与现有基础设施集成并与合适的早期采用者合作来提供低摩擦容器采用的方法,对于任何希望采用容器的组织来说,都可能是一种成功的策略。


Caitie McCaffrey - 分布式系统的验证
Leslie Lamport,以其在分布式系统中的开创性工作而闻名,他曾说过一句名言:“分布式系统是指即使你不知道存在的计算机发生故障也会导致你自己的计算机无法使用的系统。” 鉴于这种黯淡的前景和大量可能的故障,你甚至如何开始验证和确认你构建的分布式系统正在做正确的事情?


Philip Maddox - 测试分布式系统
由于多种原因,分布式系统可能尤其难以编程。它们可能难以设计、难以管理,最重要的是,难以测试。即使在最佳情况下,测试普通系统也可能很困难,无论测试人员多么勤奋,错误仍然可能渗透。现在,将所有标准问题乘以在可能都在不同操作系统上的多个框上以多种语言编写的多个进程,就可能发生真正的灾难。





© 保留所有权利。

© . All rights reserved.