在当今IT行业最热门的流行语中,“大数据”名列前茅,但“大”在某种程度上是一个用词不当:大数据不仅仅是关于容量,还关于速度和多样性:4
• 数据的容量范围从存储在传统RDMS(关系数据库管理系统)的封闭世界中的少量项目,到分布在大型机器集群或整个万维网上的大量项目。
• 数据的速度范围从消费者同步地从源拉取数据(负速度)到源异步地以不同的速度将其数据推送到其客户端(正速度),速度范围从毫秒级延迟的基于推送的股票报价流到应用程序每月从中央存储库拉取的参考数据。
• 数据的多样性范围从具有外键/主键关系的SQL风格的关系元组到具有键值指针的coSQL风格的对象或图,甚至包括视频和音乐等二进制数据。
如果我们在容量、速度和多样性这三个维度上绘制大数据的设计空间图,那么我们就会得到如图1所示的大数据立方体。立方体的八个角中的每一个都对应于一种(众所周知的)数据库技术。例如,传统RDMS位于后上角,坐标为(小,拉取,外键/主键),这意味着数据集很小;它假设一个完全由数据库控制的封闭世界,客户端在发出查询后同步地从数据库中拉取行,并且数据模型基于Codd的关系模型。基于Hadoop的系统(如HBase)位于前左角,坐标为(大,拉取,外键/主键)。数据模型仍然是矩形的,包含行、列和主键,结果仍然由客户端从存储中拉取,但数据存储在使用某种分区方案的机器集群上。
当从顶面移动到底面时,数据模型从带有主键和外键的行变为对象和指针。在左下角,坐标为(小,拉取,键/值)的是传统的O/R(对象/关系)映射解决方案,如LINQ to SQL和Hibernate,它们在关系数据库之上放置了一个OO(面向对象)外观。在立方体的前面是LINQ to Objects,坐标为(大,拉取,键/值)。它使用以下接口虚拟化实际数据源:IEnumerable<T>接口,该接口允许在运行时生成无限的项目集合。在右侧,立方体从批处理变为流式或实时数据,其中数据源异步地将数据推送到其客户端。具有行和列数据模型的流式数据库系统,如Percolator、StreamBase和StreamInsight,占据了右上轴。
最后,在右下角,坐标为(大,推送,键/值)的是Rx(响应式扩展),或者有时被称为LINQ to Events,这是本文的主题。
Rx的目标是协调和编排基于事件和异步的计算,例如低延迟传感器流、Twitter和社交媒体状态更新、短信消息、GPS坐标、鼠标移动和其他UI事件、Web sockets以及对Web服务的高延迟调用,使用标准的面向对象编程语言,如Java、C#或Visual Basic。
有很多方法可以推导出Rx,其中一些方法涉及范畴论并诉诸数学对偶性,但本文展示了每个开发人员如何通过将标准JDK(Java开发工具包)Future<T>接口与GWT(Google Web 工具包)AsyncCallBack<T>接口交叉,来创建一对接口IObservable<T>和IObserver<T>,它们使用类型为T的值来建模异步数据流。这对应于众所周知的主题/观察者设计模式。然后,本文展示了如何通过将UI事件和Web服务公开为异步数据流,并使用流畅的API组合它们来编写一个简单的Ajax风格的应用程序。
GWT开发者文档包含一个稍微带有歉意的章节,名为“习惯异步调用”,3 其中解释说,虽然异步调用乍一看对开发者来说显得残酷和不自然,但它们是防止UI锁死并允许客户端拥有多个并发的未完成服务器请求的必要之恶。
在GWT中,同步方法的异步对应物,例如Person[] getPeople(...),它进行同步跨网络调用并阻塞,直到它返回一个Person数组,将返回void并接受一个额外的回调参数void getPeople(..., AsyncCallback<Person[]> callback)。回调接口有两个方法void onFailure(Throwable error),当异步调用抛出异常时调用;以及void onSuccess(T result),当异步调用成功返回结果值时调用。给定一个异步函数,例如getPeople,一个调用通常会传递一个匿名接口实现,该实现分别处理成功和失败回调,如下所示
service.getPeople(startRow, maxRows, new AsyncCallback<Person[]>() {
void onFailure(Throwable error) { ...处理失败的代码... }
void onSuccess(Person[] result) { ...处理成功的代码... }
});
虽然GWT对异步的承诺值得称赞,但GWT错过了一个巨大的机会,即没有进一步改进和统一整个框架的异步编程模型。例如,用于进行直接HTTP调用的RequestBuilder类使用RequestCallback接口,该接口有两个方法onError和onResponseReceived,它们与先前讨论的AsyncCallback接口的方法几乎是同构的。
这两个AsyncCallback和RequestCallback接口都假设异步调用一次性交付其结果。然而,在示例中,以流式方式增量地返回Person数组的元素是完全合理的,特别是当结果集很大甚至无限时。您可以通过允许onSuccess方法被多次调用(每次针对结果数组的额外块)以及添加一个方法void onCompleted()来异步流式传输回结果,该方法在所有块都已成功交付时调用。让我们将这个派生接口称为Observer<T>,以表明它可以观察多个T值,然后再完成,并反映标准的Java非泛型observer接口。
使用Observer<T>Observer<T>而不是AsyncCallback<T>之后,异步计算及其客户端之间可能的交互序列是:(a)在n ≥ 0个值后成功终止;(b)在i
个值后不成功终止;或(c)永不完成的无限值流,如图2所示。void将回调直接作为参数传递给异步方法的另一个缺点是,一旦调用发出,就很难撤销回调的调用,只留下一个getPeopleClosablevoid在你手中。例如,假设函数getPeoplegetPeople
每分钟流式传输回已注册营销推广的人员姓名,但您不希望接收超过前一千个姓名。如果您在发出调用并接收回voidClosable
时没有预料到这种模式,您如何稍后实现此目的?即使异步调用最多交付一个值,您也可以选择稍后忽略或取消调用,方法是在一定时间间隔内未收到结果后超时。同样,如果您在将回调传递到Future<T>getPeople时预料到了这一点,这是可能的,但您以后不能改变主意。这些小问题是异步计算和流未被视为可以从方法返回、存储在变量等中的一等公民值的症状。下一节将展示如何通过引入一个额外的容器接口来创建异步数据流,该接口表示异步计算本身,您可以在该接口上注册回调,以便在计算结果时收到通知。现在,异步方法可以返回一个值,该值表示待处理的异步计算或流,而不仅仅是
void
{
,您可以像对待任何其他常规值一样对待它。特别是,它允许您在发出调用后改变主意,并随意过滤、操作或转换计算。
Java SDK已经以
Future<T>
接口的形式提供了(单次)异步计算作为一等公民值,该接口的主要方法
T get()
}
检索计算结果,并在底层计算尚未终止时阻塞Future<T>interface Future<T>boolean cancel(boolean mayInterruptIfRunning); T get(); T get(long timeout, TimeUnit unit); boolean isCancelled(); boolean isDone();请注意,原则上,
Future<T>可以用于生成多个值。在这种情况下,每次调用get()Future<T>都会阻塞并在下一个值可用时返回该值,只要时预料到了这一点,这是可能的,但您以后不能改变主意。isDone()Observer<T>不为真。这类似于而不是iterable接口。在本文的其余部分,异步计算被假定为返回多个结果的流。和虽然future确实提供了异步计算的一等公民表示,但get可以用于生成多个值。在这种情况下,每次调用方法是阻塞的。幸运的是,您可以通过为Future<T>get
void
{
,您可以像对待任何其他常规值一样对待它。特别是,它允许您在发出调用后改变主意,并随意过滤、操作或转换计算。
方法提供类型为
}
Observer<T>的回调(引入的接口用于扩展GWT的AsyncCallback<T>可以用于生成多个值。在这种情况下,每次调用接口)来使JDK接口非阻塞。请注意,阻塞的可以用于生成多个值。在这种情况下,每次调用isCancelled可以用于生成多个值。在这种情况下,每次调用和isDone方法不再需要,因为该信息也通过回调传输。为简单起见,忽略的回调(引入的接口用于扩展GWT的get的回调(引入的接口用于扩展GWT的的第二个重载,因为您可以稍后轻松地重建它。应用这些更改后,voidObserver<T>Future<T>接口的非阻塞版本如下所示的回调(引入的接口用于扩展GWT的void get(Observer<T> callback);您尚未完成重构。与其通过cancel方法取消整个future,不如只取消每个观察者的特定未完成的getFuture<T>:
调用更有意义。这可以通过让
get返回一个表示可取消资源的接口来实现。此外,由于您已经调用了get,因此无需指定mayInterruptIfRunningIObservable<T>参数,因为计算已经在此时运行,您可以决定是否调用
close()Future<T>和来编码布尔值。最后,您可以通过返回ClosableIObservable<T>和IObserver<T>来使,因此无需指定cancel方法非阻塞。您可以尝试使cancelIObservable<T>返回一个Observer<T>Future<boolean>T.
,但那样您将陷入异步的无休止的递归兔子洞。事实证明,
{
java.io.Closable
}
接口完全符合要求,从而导致了
{
Future<T>
的以下变体
interface Future<T> { Closable get(Observer<T> callback); }
}
请注意,调用订阅返回的
{
Closable
}
接口的close()方法可能实际取消,也可能不实际取消底层计算,因为单个observable可能具有多个观察者(例如,处置对鼠标移动的订阅,这不应阻止您的鼠标工作)。但是,由于该特定观察者未收到任何进一步的值通知,因此从其角度来看,计算已终止。如果需要,实现IObservable<T>的类可以以其他方式取消计算。
在.NET中,而不是IObserver<T>和IObservable<T>Observer<T>,
,有标准的IObserver<T>IObservable<T>和IObserver<T>接口;而不是Closable,它具有IDisposable。类型为IObservable<T>
(或IObserver<T>Observable<T>
,取决于您首选的编程语言)的值表示异步数据流或事件流,其类型为。类型为IDisposable,它具有T的值interface IObservable<T>方法非阻塞。您可以尝试使 IDisposable Subscribe(IObserver<T> observer);的值interface IObserver<T>。类型为和,它具有void OnNext(T value);
void OnError(Exception error);IObservable<T>void OnCompleted();
interface IDisposable
仔细检查生成的接口三位一体,揭示了经典的主题/观察者接口2的泛型变体,用于发布/订阅模式,这是面向对象程序员数十年来用于处理基于事件的系统的工具箱中的必备工具。. JDK 1.0 已经通过(非泛型)Observable类和Observer接口支持此模式。在.NET中,Rx库支持该模式。Rx库对IObservable<T>
和
{
IObserver<T>
};
接口做出了一些额外的行为假设,这些假设未在其(语法)类型签名中表达:* 对IObserver<T>接口的实例的调用序列应遵循正则表达式OnNext(t)* (OnCompleted() | OnError(e))?的值。换句话说,在零个或多个
OnNext
调用之后,
);
OnCompleted或OnError
之一将被可选地调用。* 可以假定IObserver<T>
的实现是同步的;从概念上讲,它们在锁下运行,类似于常规的.NET事件处理程序或reactor模式。9
* 与观察者关联的所有资源都应在调用
Dispose()
时清理干净。特别是,观察者的
Subscribe
调用返回的订阅将由observable在流完成时立即处置。在实践中,这是通过在
OnCompleted
和
OnError
方法的实现中闭包Dispose()返回的IDisposable来实现的。
* 当订阅在外部被处置时,流应尽最大努力尝试停止该订阅的所有未完成工作。任何已经在进行中的工作可能仍然会完成,因为中止正在进行中的工作并非总是安全的,但不应向未订阅的观察者发出信号。此合同确保易于推理和证明运算符和用户代码的正确性。流畅地创建、组合和消费异步数据流要在Java中创建Observable<T>的实例,您将使用匿名内部类并定义一个抽象基类ObservableBase<T>,它负责强制执行Rx合同。通过提供subscribe
方法的实现来专门化它:
Observable<T> observable = new ObservableBase<T>()
Closable subscribe(Observer<T> observer) { ... }
由于.NET缺少匿名接口,因此它改为使用工厂方法
Observable.CreateIObservable<T>,该方法从ObservableFunc<IObservable<T>, IDisposable>T类型的匿名委托创建一个新的observable实例,该委托实现IEnumerable<T>subscribe函数:IObservable<T> observable = Observable.Create<T>(
IObserver<T> observer => { ... }与Java解决方案一样,Create方法返回的具体类型强制执行所需的Rx行为。一旦您拥有一个表示异步数据流的单一接口,您就可以公开现有的基于事件和回调的抽象(例如GUI控件)作为异步数据流的源。例如,您可以使用以下复杂的代码组合将Java中TextField控件的文本更改事件包装为异步数据流:IEnumerable<T>Observable<string> TextChanges(JTextField tf){
return new ObservableBase<string>(){ Closable subscribe(Observer<string> o){ DocumentListener l = new DocumentListener(){void changedUpdate(DocumentEvent e {.
o.OnNext(tf.getText());};->Observer<T>=> tf.addDocumentListener (l);Observerreturn new Closable() {
void close(){tf.removeDocumentListener(l);}}}}}
每次
changedUpdate
事件触发时,类型为
Observable<string>的相应异步数据流会向其订阅者推送一个新的字符串,表示文本字段的当前内容。同样,您可以通过将带有setter的对象包装为观察者来公开它们作为异步数据流的接收器。例如,通过在每次调用onNext时将listData属性设置为给定数组,将IObservable<T>javax.swing.JList<T>IEnumerable<T>列表公开到
Observable<string>Observer<T[]>中:Observer<T[]>Observer<T[]> ObserveChanges(javax.swing.JList<T> list){TextFieldreturn new ObserverBase<T[]>() {void onNext(T[] values){ list.setListData(values); }}}因此,您可以将UI控件、鼠标、文本字段或按钮视为流式数据库,每次底层控件触发事件时,它都会生成无限的值集合。反之亦然,具有可设置属性的对象(如列表和标签)可以用作此类异步数据流的观察者。由IObservable<T>
Observable<string>Observer<T[]>接口(或Java中的IObservable<T>Observable<T>IEnumerable<T>)表示的异步数据流的行为类似于常规的IEnumerable<T>T
类型的值集合,只是它们是基于推送或流式的,而不是通常的基于拉取的集合(如实现IEnumerable<T>接口的数组和列表,或Java中的iterable<T>)。这意味着您可以使用标准查询运算符的流畅API将异步数据流连接在一起,以高度可组合和声明式的方式创建复杂的事件处理系统。IObservable<T>例如,
WhereIEnumerable<T>运算符接受
Func<S,bool>
类型的谓词,并过滤掉谓词不成立的所有值,用于类型为
IObservable<S>的输入observable集合,这与在基于拉取的IEnumerable<T>集合上工作的表亲完全相同。图3说明了这一点。使用此运算符,您可以清理公开为IObservable<string>IEnumerable<T>流的文本字段输入,并使用以下查询表达式删除所有空字符串和null字符串:input.Where(s=>!string.IsNullOrEmpty(s))在具有lambda表达式和防御方法的Java 8中,代码看起来与此处显示的C#代码非常相似,只是lambda使用->
,并且变量名称的大小写不同。然而,即使没有这些即将推出的Java语言功能,您也可以在Java中近似一个流畅的接口(如FlumeJava1 或Reactive4Java8),用于使用标准查询运算符操作事件流。例如,通过将运算符作为
IObservable<T>
的方法,您可以将过滤器示例编写为:
input.Where<T>(new Func<string,T>{
Invoke(string s){IEnumerable<T> return !(s == null || s.length() == 0); }}在具有lambda表达式和防御方法的Java 8中,代码看起来与此处显示的C#代码非常相似,只是lambda使用为了节省大家过多的输入,接下来的几个示例仅以C#提供,即使没有任何内容是C#或.NET特定的。
SelectIEnumerable<T>运算符接受一个转换函数Func<S,T>,以转换类型为
IObservable<S>的输入数据流中的每个值。这将生成一个新的类型为IObservable<T>在具有lambda表达式和防御方法的Java 8中,代码看起来与此处显示的C#代码非常相似,只是lambda使用的异步结果流,再次与基于
IEnumerable<T>的版本完全相同,如图4所示。SelectMany运算符通常用于将两个数据流(基于拉取或基于推送)连接在一起。SelectMany
接受类型为
IObservable<S>的输入observable集合,这与在基于拉取的的源流和类型为
,并且变量名称的大小写不同。然而,即使没有这些即将推出的Java语言功能,您也可以在Java中近似一个流畅的接口(如FlumeJava1 或Reactive4Java8),用于使用标准查询运算符操作事件流。例如,通过将运算符作为
Func<S, IObservable<T>>
的膨胀函数,并从原始源流中的每个元素生成一个新的嵌套流,其中包含零个、一个或多个元素。然后,它将所有中间异步数据流合并为一个类型为
IObservable<T>
的方法,您可以将过滤器示例编写为:
input.Where<T>(new Func<string,T>{
IObservable<T>IObservable<T>的单一输出流,如图5所示。
SelectMany
运算符清楚地显示了
的异步性质和IObservable<T>和IObserver<T>IEnumerable<T>的同步性质之间的差异。如图5所示,源流上的值异步出现,即使您仍在从先前的膨胀函数生成值。在IEnumerable<T>
的情况下,下一个值仅在生成膨胀函数的所有值之后才从源流中拉取(即,输出流是所有后续膨胀函数生成的流的串联,而不是非确定性交错),如图6所示。IEnumerable<T>和有时,使用更顺序的模式生成异步流的输出流会很方便。如图7所示,SwitchIObservable<T>和IObserver<T>运算符接受嵌套的异步数据流
请注意,我们的异步数据流模型对时间没有任何特殊假设。 这使得该方法不同于函数式编程中典型的反应式编程方法,例如 Fran 或 FlapJax,它们强调(连续的)随时间变化的值(称为行为),以及基于 SQL 的复杂事件处理系统,例如 StreamBase 和 StreamInsight,它们也在其语义模型中强调时间。 相反,时钟和定时器被视为类型为IObservable<DateTimeOffset> 的常规异步数据流。我们通过另一个接口来参数化并发和逻辑时钟IScheduler(此处略有简化),它表示一个执行上下文,该上下文具有本地时间概念,可以在将来安排工作
interface IScheduler
{
DateTimeOffset Now { get; }
IDisposable Schedule(Action work, TimeSpan dueTime)
}
Java 程序员会立即看到与executor接口的对应关系,该接口在 Java SDK 中扮演着相同的作用,即抽象化并发的精确引入。
结论Web 和移动应用程序越来越多地由异步和实时流服务以及推送通知组成,推送通知是一种特殊形式的大数据,其中数据具有正速度。 本文展示了如何将异步数据流公开为类型为IObservable<T>(与类型为 pull-based 的集合形成对比IEnumerable<T>)的集合,以及如何使用 Rx 库提供的流畅 API 操作符来查询异步数据流。 这个流行的库可用于 .NET 和 JavaScript(包括用于流行的框架(如 JQuery 和 Node)的绑定),并且也随 Windows Phone 的 ROM 一起发布。 F# 的一流事件基于 Rx,社区还为其他语言(如 Dart7 或 Haskell6)创建了替代实现。
要了解有关 LINQ 的一般信息和 Rx 的特定信息,请阅读简短的教科书 <i>Programming Reactive Extensions and LINQ</i>5
1. Chambers, C., Raniwala, A., Perry, F., Adams, S., Henry, R. R., Bradshaw, R., Weizenbaum, N. 2010. FlumeJava:简单、高效、数据并行的管道。《 SIGPLAN 程序设计与实现会议论文集》; https://dl.acm.org/citation.cfm?id=1806638。
2. Eugster, P. Th., Felber, P. A., Guerraiou, R., Kermarrec, A-M. 2003. 发布/订阅的多种面孔。《 计算调查》35(2):114-131; https://dl.acm.org/citation.cfm?id=857076.857078。
3. Google Web Toolkit. 2007. 习惯异步调用; http://www.gwtapps.com/doc/html/com.google.gwt.doc.DeveloperGuide.RemoteProcedureCalls.GettingUsedToAsyncCalls.html。
4. Laney, D. 2001. 3D 数据管理:控制数据量、速度和多样性。应用程序交付策略; http://blogs.gartner.com/doug-laney/files/2012/01/ad949-3D-Data-Management-Controlling-Data-Volume-Velocity-and-Variety.pdf。
5. Liberty, J., Betts, P. 2011. <i>Programming Reactive Extensions and LINQ</i>。纽约:Apress; http://www.apress.com/9781430237471。
6. Reactive-bacon; https://github.com/raimohanska/reactive-bacon。
7. Reactive-Dart; https://github.com/prujohn/Reactive-Dart。
8. Reactive4java; https://code.google.com/p/reactive4java/。
9. Wikipedia. Reactor pattern; https://en.wikipedia.org/wiki/Reactor_pattern。
喜欢它,讨厌它? 让我们知道
<b>ERIK MEIJER</b> (<a href="/cdn-cgi/l/email-protection#5f3a323a36353a2d1f32363c2d302c30392b713c3032" target="_blank"><span class="__cf_email__" data-cfemail="[email protected]">[email protected]</span></a>) 在过去 15 年中一直致力于“云 democratizing”。 他可能因其在 Haskell 语言方面的工作以及对 LINQ 和 Rx 框架的贡献而闻名。
© 2012 1542-7730/12/0300 $10.00
<em>最初发表于 Queue vol. 10, no. 3</em>—
在<a href="http://portal.acm.org/citation.cfm?id=2169076"> 数字图书馆</a>中评论本文
<span>Shylaja Nukala, Vivek Rau</span> - <a href="detail.cfm?id=3283589"><b>为什么 SRE 文档很重要</b></a>
SRE(站点可靠性工程)是一种工作职能、一种思维模式以及一组工程方法,用于使 Web 产品和服务可靠地运行。 SRE 在软件开发和系统工程的交叉点运作,以解决运营问题并设计解决方案,从而可扩展、可靠且高效地设计、构建和运行大规模分布式系统。 成熟的 SRE 团队可能拥有与许多 SRE 职能相关的明确定义的文档体系。
<span>Taylor Savage</span> - <a href="detail.cfm?id=2844732"><b>Web 组件化</b></a>
在当今的软件工程中,没有哪项任务比 Web 开发更艰巨。 Web 应用程序的典型规范可能如下:该应用程序必须跨各种浏览器工作。 它必须以 60 fps 的速度运行动画。 它必须立即响应触摸。 它必须符合一组特定的设计原则和规范。 它必须在几乎所有可以想象的屏幕尺寸上工作,从电视和 30 英寸显示器到手机和手表表面。 它必须经过精心设计,并且在长期内可维护。
<span>Arie van Deursen</span> - <a href="detail.cfm?id=2793039"><b>超越页面对象:使用状态对象测试 Web 应用程序</b></a>
Web 应用程序的端到端测试通常涉及通过 Selenium WebDriver 等框架与 Web 页面进行棘手的交互。 隐藏此类 Web 页面复杂性的推荐方法是使用页面对象,但首先需要回答一些问题:在测试 Web 应用程序时,应该创建哪些页面对象? 应在页面对象中包含哪些操作? 给定页面对象,应指定哪些测试场景?
<span>Rich Harris</span> - <a href="detail.cfm?id=2790378"><b>消除准入壁垒</b></a>
一场战争正在 Web 开发领域展开。 一方是工具制造者和工具用户的先锋,他们以摧毁糟糕的旧观念(在这种环境中,“旧”意味着任何在一个多月前在 Hacker News 上首次亮相的东西)以及关于转译器等的激烈辩论为乐。