本教程基于rxjava1.x版本进行全面讲解,后续课程将陆续更新,敬请关注…
8.可连接的被观察者前几节提到的了响应用过程中的三个细节:被观察者 观察者 和订阅。 接下来这一节继续理解下订阅的其他知识点。
这里介绍一个叫做connectableobservable的可连接被观察者。一个可连接的observable与普通的observable差不多,除了这一点:可连接的observable在被订阅时并不开始发射数据,只有在它的connect()被调用时才开始。用这种方法,你可以等所有的潜在订阅者都订阅了这个observable之后才开始发射数据。
publish该操作符可以将普通的observable转化成可连接的observable。
可连接的observable (connectable observable)与普通的observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了connect操作符时才会开始。用这种方法,你可以在任何时候让一个observable开始发射数据。
rxjava中connect是connectableobservable接口的一个方法,使用publish操作符可以将一个普通的observable转换为一个connectableobservable。
//创建了一个普通的observable对象
observable<integer> observable =
observable.create(new observable.onsubscribe<integer>() {
@override
public void call(subscriber<? super integer> subscriber) {
subscriber.onnext(1);
subscriber.onnext(2);
subscriber.onnext(3);
}
});
//将一个被观察者转换成一个可连接的被观察者
connectableobservable<integer> connectableobservable =observable.publish();
//为可连接的被观察者订阅事件,但这里并不会马上发送事件
connectableobservable.subscribe(new action1<integer>() {
@override
public void call(integer integer) {
log.i(tag, "call: "+integer);
}
});
与普通的observable对象订阅不同,上面的代码中并没直接调用action1对象的call()方法。
connect可连接的observable (connectable observable)与普通的observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了connect操作符时才会开始。用这个方法,你可以等待所有的观察者都订阅了observable之后再开始发射数据。
//创建了一个普通的observable对象
observable<integer> observable =
observable.create(new observable.onsubscribe<integer>() {
@override
public void call(subscriber<? super integer> subscriber) {
subscriber.onnext(1);
subscriber.onnext(2);
subscriber.onnext(3);
}
});
//将一个被观察者转换成一个可连接的被观察者
connectableobservable<integer> connectableobservable =observable.publish();
//为可连接的被观察者订阅事件,但这里并不会马上发送事件
connectableobservable.subscribe(new action1<integer>() {
@override
public void call(integer integer) {
log.i(tag, "call: "+integer);
}
});
//当调用可连接的被观察者connect()方法后 开始发送所有的数据。
connectableobservable.connect(new action1<subscription>() {
@override
public void call(subscription subscription) {
log.i(tag, "call: "+subscription);
}
});
输出:
it520: call: operatorpublish$publishsubscriber@20dce78
it520: call: 1
it520: call: 2
it520: call: 3
refcountrefcount操作符可以让一个可连接的observable转换为普通的observable。
//创建了一个普通的observable对象
observable<integer> observable =
observable.create(new observable.onsubscribe<integer>() {
@override
public void call(subscriber<? super integer> subscriber) {
subscriber.onnext(1);
subscriber.onnext(2);
subscriber.onnext(3);
}
});
//将一个被观察者转换成一个可连接的被观察者
connectableobservable<integer> connectableobservable =observable.publish();
//将一个可链接的被观察者转换成一个普通观察者
observable<integer> integerobservable = connectableobservable.refcount();
//为可连接的被观察者订阅事件,一订阅就马上发送数据并打印出 1 2 3...
integerobservable.subscribe(new action1<integer>() {
@override
public void call(integer integer) {
log.i(tag, "call: "+integer);
}
});
replay保证所有的观察者收到相同的数据序列,即使它们在observable开始发射数据之后才订阅.
先来看一个例子:
observable<integer> observable =
observable.create(new observable.onsubscribe<integer>() {
@override
public void call(subscriber<? super integer> subscriber) {
subscriber.onnext(1);
subscriber.onnext(2);
subscriber.onnext(3);
}
});
connectableobservable<integer> connectableobservable =observable.publish();
connectableobservable.subscribe(new action1<integer>() {
@override
public void call(integer integer) {
log.i(tag, "call--1--: "+integer);
}
});
connectableobservable.connect();
connectableobservable.subscribe(new action1<integer>() {
@override
public void call(integer integer) {
log.i(tag, "call--2--: "+integer);
}
});
输出:
com.m520it.rxjava i/it520: call--1--: 1
com.m520it.rxjava i/it520: call--1--: 2
com.m520it.rxjava i/it520: call--1--: 3
首先我们通过publish()将一个普通的observable转换成connectableobservable。当调用connect()的时候,则connect()上面已经订阅的观察者会收到数据。而connect()后面订阅的观察者则无法接收到数据。 如果我们想让所有的观察者在调用connect()的时候同时接收到数据而跟订阅的顺序无关,则需要通过replay()。
observable<integer> observable =
observable.create(new observable.onsubscribe<integer>() {
@override
public void call(subscriber<? super integer> subscriber) {
subscriber.onnext(1);
subscriber.onnext(2);
subscriber.onnext(3);
}
});
//这里将publish()改为replay()
connectableobservable<integer> connectableobservable =observable.replay();
connectableobservable.subscribe(new action1<integer>() {
@override
public void call(integer integer) {
log.i(tag, "call--1--: "+integer);
}
});
connectableobservable.connect();
connectableobservable.subscribe(new action1<integer>() {
@override
public void call(integer integer) {
log.i(tag, "call--2--: "+integer);
}
});
输出:
com.m520it.rxjava i/it520: call--1--: 1
com.m520it.rxjava i/it520: call--1--: 2
com.m520it.rxjava i/it520: call--1--: 3
com.m520it.rxjava i/it520: call--2--: 1
com.m520it.rxjava i/it520: call--2--: 2
com.m520it.rxjava i/it520: call--2--: 3
9.“冷observable”&“热observable”前面我们提到,订阅的时候(如果观察者有发送数据的),观察者有直接接收数据的,有等过了一段时间才接收数据的。
我们将一订阅观察者就马上能接收数据的观察者称之为“热observable”。
如上面的connectableobservable就算被订阅后,也没能发送数据,只有调用connect()才能让观察者接收到数据。我们称该观察者为“冷observable”
10.错误处理很多操作符可用于对observable发射的onerror通知做出响应或者从错误中恢复
catch操作符拦截原始observable的onerror通知,将它替换为其它的数据项或数据序列,让产生的observable能够正常终止或者根本不终止。
rxjava将catch实现为三个不同的操作符:
onerrorreturn
onerrorreturn方法返回一个镜像原有observable行为的新observable,后者会忽略前者的onerror调用,不会将错误传递给观察者,作为替代,它会发发射一个特殊的项并调用观察者的oncompleted方法。
下面的代码发送1,2,3 并在发送的过程中模拟发送一个异常,只要有异常发送,onerrorreturn()就会被调用 并发送44.代码如下:
observable.create(new observable.onsubscribe<integer>() {
@override
public void call(subscriber<? super integer> subscriber) {
subscriber.onnext(1);
subscriber.onnext(2);
subscriber.onerror(new nullpointerexception("mock exception !"));
subscriber.onnext(3);
}
}).onerrorreturn(new func1<throwable, integer>() {
@override
public integer call(throwable throwable) {
return 44;
}
}).subscribe(new action1<integer>() {
@override
public void call(integer integer) {
log.i(tag, "call: "+integer);
}
});
输出:
com.m520it.rxjava i/it520: call: 1
com.m520it.rxjava i/it520: call: 2
com.m520it.rxjava i/it520: call: 44
onerrorresumenext
让observable在遇到错误时开始发射第二个observable的数据序列。
下面的代码在发送的时候,模拟发送一个异常。接着onerrorresumenext就会被调用 并开始发射新的observable对象。
observable.create(new observable.onsubscribe<integer>() {
@override
public void call(subscriber<? super integer> subscriber) {
subscriber.onnext(1);
subscriber.onnext(2);
subscriber.onerror(new nullpointerexception("mock exception !"));
subscriber.onnext(3);
}
}).onerrorresumenext(new func1<throwable, observable<? extends integer>>() {
@override
public observable<? extends integer> call(throwable throwable) {
observable<integer> innerobservable =
observable.create(new observable.onsubscribe<integer>() {
@override
public void call(subscriber<? super integer> subscriber) {
subscriber.onnext(4);
subscriber.onnext(5);
}
});
return innerobservable;
}
}).subscribe(new action1<integer>() {
@override
public void call(integer integer) {
log.i(tag, "call: "+integer);
}
});
输出:
com.m520it.rxjava i/it520: call: 1
com.m520it.rxjava i/it520: call: 2
com.m520it.rxjava i/it520: call: 3
com.m520it.rxjava i/it520: call: 4
com.m520it.rxjava i/it520: call: 5
onexceptionresumenext
让observable在遇到错误时继续发射后面的数据项。
//创建一个错误处理的observable对象
observable<integer> exceptionobserver = observable
.create(new observable.onsubscribe<integer>() {
@override
public void call(subscriber<? super integer> subscriber) {
subscriber.onnext(55);
subscriber.onnext(66);
}
});
observable
.create(new observable.onsubscribe<integer>() {
@override
public void call(subscriber<? super integer> subscriber) {
subscriber.onnext(1);
subscriber.onnext(2);
subscriber.onerror(new nullpointerexception("mock exception !"));
subscriber.onnext(3);
}
})
//上面的代码发送的过程中出现了异常,该方法就会被调用 并发射exceptionobserver
.onexceptionresumenext(exceptionobserver)
.subscribe(new action1<integer>() {
@override
public void call(integer integer) {
log.i(tag, "call: "+integer);
}
});
输出:
com.m520it.rxjava i/it520: call: 1
com.m520it.rxjava i/it520: call: 2
com.m520it.rxjava i/it520: call: 55
com.m520it.rxjava i/it520: call: 66
retry重试机制如果原始observable遇到错误,重新订阅它并期望它能正常终止。
rxjava中的实现为retry和retrywhen。
observable.create(new observable.onsubscribe<integer>() {
@override
public void call(subscriber<? super integer> subscriber) {
subscriber.onnext(1);
subscriber.onnext(2);
subscriber.onerror(new nullpointerexception("mock exception !"));
subscriber.onnext(3);
}
})
.retry(3)//重复3次订阅
.subscribe(new action1<integer>() {
@override
public void call(integer integer) {
log.i(tag, "call: "+integer);
}
});
类似的函数还有:
javadoc: retry()) 无论收到多少次onerror通知,都会继续订阅并发射原始observable。
javadoc: retry(long)) retry会最多重新订阅指定的次数,如果次数超了,不会尝试再次订阅
javadoc: retry(func2))
retrywhen
以上就是深入浅出rxjava_02[订阅深入和异常处理]的代码示例的内容。
