
实现后台长时间任务的监控
后台长时间任务的监控,例如,处理进度的监控,可以通过客户端轮询拉或者服务器推技术来实现。这里主要讨论服务器推技术的实现。
CometH2
基于HTTP 长连接、无须在浏览器端安装插件的“服务器推”技术为“Comet”。Comet主要有两种实现方式。
基于 AJAX 的长轮询(long-polling)方式H3

使用 AJAX 实现“服务器推”与传统的 AJAX 应用不同之处在于
- 服务器端会阻塞请求直到有数据传递或超时才返回。
- 客户端 JavaScript 响应处理函数会在处理完服务器返回的信息后,再次发出请求,重新建立连接。
- 当客户端处理接收的数据、重新建立连接时,服务器端可能有新的数据到达;这些信息会被服务器端保存直到客户端重新建立连接,客户端会一次把当前服务器端所有的信息取回。
在这种长轮询方式下,客户端是在 XMLHttpRequest 的 readystate 为 4(即数据传输结束)时调用回调函数,进行信息处理。当 readystate 为 4 时,数据传输结束,连接已经关闭。Mozilla Firefox 提供了对 Streaming AJAX 的支持, 即 readystate 为 3 时(数据仍在传输中),客户端可以读取数据,从而无须关闭连接,就能读取处理服务器端返回的信息。
bash
var xhr = new XMLHttpRequest();xhr.previous_text = '';//xhr.onload = function() { log_message("[XHR] Done. responseText: <i>" + xhr.responseText + "</i>"); };xhr.onerror = function() { log_message("[XHR] Fatal Error."); };xhr.onreadystatechange = function(){try{if (xhr.readyState > 2){var new_response = xhr.responseText.substring(xhr.previous_text.length);var result = JSON.parse( new_response );log_message(result.message);//update the progressbardocument.getElementById('progressor').style.width = result.progress + "%";xhr.previous_text = xhr.responseText;}}catch (e){//log_message("<b>[XHR] Exception: " + e + "</b>");}};xhr.open("GET", "ajax_stream.php", true);xhr.send("Making request...");
基于 Iframe 及 htmlfile 的流(streaming)方式H3

iframe 是很早就存在的一种HTML标记,通过在HTML页面里嵌入一个隐蔵帧,然后将这个隐蔵帧的SRC属性设为对一个长连接的请求,服务器端就能源源不断地往客户端输入数据。iframe 服务器端并不返回直接显示在页面的数据,而是返回对客户端 Javascript 函数的调用,如<script type="text/javascript">js_func(“data from server ”)</script>。服务器端将返回的数据作为客户端JavaScript函数的参数传递;客户端浏览器的Javascript引擎在收到服务器返回的JavaScript调用时就会去执行代码。
Server Send EventH2
HTML5提供了很多新的特性,其中包括的Server Send Event和Websockets能够更方便的实现服务器端推。Websockets的接口实现的是客户端和服务器端之间的双向通信,对应的开销也会更大。 对于我们这里的应用场景来说,只需要单向的服务器端退即可,所以使用SSE来实现更为合适。
客户端的实现H3
bash
if (!!window.EventSource){var source = new EventSource('task.php');source.addEventListener('message', function(e){console.log(e.data);//Do whatever with e.data}, false);}
客户端的实现很简单,只需要新建一个EventSource,参数为请求的地址,然后添加上listener即可。一旦添加完message listener之后,客户端就会发送请求,在这期间,如果服务器端长时间没有响应,那么,客户端会再次发送请求,直到调用source.close()方法。 所以客户端正常情况下,应该有个服务器端处理完,关闭source的逻辑。
服务器端H3
服务器端只需要在任务结束直接,不关闭response,然后然response中写符合特定格式的数据即可
bash
id:xxxdata:xxx
Spring有对SSE的封装,下面以Spring为例,完整地演示后台任务进度的反馈
下面是Controller的代码,对SSE的调用需要在异步任务中进行,但是SSE并不属于应用层的逻辑,所以通过回调的方式传递给应用层。任务完成后需要调用complete方法,对应的客户端对complete之前返回的消息做特殊的响应,即关闭EventSource
bash
SseEmitter sseEmitter = new SseEmitter();fooService.batch((String id, String result, String message, boolean exception) -> {String data = String.format("%s,%s,%s" , result, message, exception);SseEmitter.SseEventBuilder eventBuilder = SseEmitter.event().id(id).data(data);sseEmitter.send(eventBuilder);if (BATCH_COMPLETE.equals(result)){sseEmitter.complete();}}, error -> {try {sseEmitter.send(BATCH_COMPLETE);sseEmitter.complete();} catch (IOException e) {sseEmitter.completeWithError(e);}});return sseEmitter;
应用层方法中,应该对每条批处理数据进行异步。异步使用Spring的@Async注解即可,但是这里需要注意的是,如果通过this调用自己类方法AOP代理是会失效的,因此需要用另外一个Bean来封装异步方法
bash
public void batch(FooController.SSEBatchConsumer replyStatus, Consumer<String> replyError) {AtomicInteger count = new AtomicInteger(0);List<Foo> foos = this.fooRepository.findAll();if (foos == null || foos.size() == 0) {replyError.accept(null);return;}for (Foo foo : foos) {this.asyncService.doFoo(foo, foo.size(), count, replyStatus);}}public class AsyncService {@Asyncpublic void asyncService(Foo fpp, int total, AtomicInteger count, RecruitmentsIndexController.SSEBatchConsumer replyStatus)}
评论
新的评论
上一篇
Java脚本调用
JSR223 Java中调用其他脚本语言可以通过JSR223来实现。JSR223规范定义了脚本调用的抽象,只要拥有对应脚本的JSR223的实现,即可实现Java对对应脚本的调用。 例如,下面是对JS脚本的调用 Ruby的调用 JRuby 提供了Ruby脚本的的JSR223的实现…
下一篇
Nutch爬虫开发
Nutch爬虫开发 介绍 Nutch是一个用Java实现的搜索引擎,它包括全文搜索和网络爬虫。它支持针对不同的业务场景,使用本地运行模式或者基于Hadoop的分布式运行模式。 Nutch目前主要由1.x和2.x两个分支,主要的不同在于数据存储的实现。这里的存储指的是,Nutch…
