|
1.3 注意事项三
如果一个写操作在PipedOutputStream上执行,同时最近从对应PipedInputStream读取的线程已经不再活动(通过 Thread.isAlive()检测),则写操作将抛出一个IOException异常。假定有两个线程w和r,w向 PipedOutputStream写入数据,而r则从对应的PipedInputStream读取。下面一系列的事件将导致w线程在试图写入 PipedOutputStream时遇到IOException异常:
1. 写操作线程w已经创建,但r线程还不存在。
2. w向PipedOutputStream写入数据。
3. 读线程r被创建,并从PipedInputStream读取数据。
4. r线程结束。
5. w企图向PipedOutputStream写入数据,发现r已经结束,抛出IOException异常。
实际上,这个问题不象第二个问题那样棘手。和多个读线程/单个写线程的情况相比,也许在应用中有一个读线程(作为响应请求的服务器)和多个写线程(发出请求)的情况更为常见。
1.4 解决问题
要防止管道流前两个局限所带来的问题,方法之一是用一个ByteArrayOutputStream作为代理或替代PipedOutputStream。 Listing 4显示了一个LoopedStreams类,它用一个ByteArrayOutputStream提供和Java管道流类似的功能,但不会出现死锁和 IOException异常。这个类的内部仍旧使用管道流,但隔离了本文介绍的前两个问题。我们先来看看这个类的公用方法(参见图3)。构造函数很简单,它连接管道流,然后调用startByteArrayReaderThread()方法(稍后再讨论该方法)。getOutputStream()方法返回一个OutputStream(具体地说,是一个ByteArrayOutputStream)用以替代PipedOutputStream。写入该 OutputStream的数据最终将在getInputStream()方法返回的流中作为输入出现。和使用PipedOutputStream的情形不同,向ByteArrayOutputStream写入数据的线程的激活、写数据、结束不会带来负面效果。
图三:ByteArrayOutputStream原理
【Listing 4:防止管道流应用中出现的常见问题】
import java.io.*;
public class LoopedStreams {
private PipedOutputStream pipedOS =
new PipedOutputStream();
private boolean keepRunning = true;
private ByteArrayOutputStream byteArrayOS =
new ByteArrayOutputStream() {
public void close() {
keepRunning = false;
try {
super.close();
pipedOS.close();
}
catch(IOException e) {
// 记录错误或其他处理
// 为简单计,此处我们直接结束
System.exit(1);
}
}
};
private PipedInputStream pipedIS = new PipedInputStream() {
public void close() {
keepRunning = false;
try {
super.close();
}
catch(IOException e) {
// 记录错误或其他处理
// 为简单计,此处我们直接结束
System.exit(1);
}
}
};
public LoopedStreams() throws IOException {
pipedOS.connect(pipedIS);
startByteArrayReaderThread();
} // LoopedStreams()
public InputStream getInputStream() {
return pipedIS;
} // getInputStream()
public OutputStream getOutputStream() {
return byteArrayOS;
} // getOutputStream()
private void startByteArrayReaderThread() {
new Thread(new Runnable() {
public void run() {
while(keepRunning) {
// 检查流里面的字节数
if(byteArrayOS.size() > 0) {
byte[] buffer = null;
synchronized(byteArrayOS) {
buffer = byteArrayOS.toByteArray();
byteArrayOS.reset(); // 清除缓冲区
}
try {
// 把提取到的数据发送给PipedOutputStream
pipedOS.write(buffer, 0, buffer.length);
}
catch(IOException e) {
// 记录错误或其他处理
// 为简单计,此处我们直接结束
System.exit(1);
}
}
else // 没有数据可用,线程进入睡眠状态
try {
// 每隔1秒查看ByteArrayOutputStream检查新数据
Thread.sleep(1000);
}
catch(InterruptedException e) {}
}
}
}).start();
} // startByteArrayReaderThread()
} // LoopedStreams
startByteArrayReaderThread()方法是整个类真正的关键所在。这个方法的目标很简单,就是创建一个定期地检查 ByteArrayOutputStream缓冲区的线程。缓冲区中找到的所有数据都被提取到一个byte数组,然后写入到 PipedOutputStream。由于PipedOutputStream对应的PipedInputStream由getInputStream ()返回,从该输入流读取数据的线程都将读取到原先发送给ByteArrayOutputStream的数据。前面提到,LoopedStreams类解决了管道流存在的前二个问题,我们来看看这是如何实现的。
ByteArrayOutputStream具有根据需要扩展其内部缓冲区的能力。由于存在“完全缓冲”,线程向getOutputStream()返回的流写入数据时不会被阻塞。因而,第一个问题不会再给我们带来麻烦。另外还要顺便说一句,ByteArrayOutputStream的缓冲区永远不会缩减。例如,假设在能够提取数据之前,有一块500 K的数据被写入到流,缓冲区将永远保持至少500 K的容量。如果这个类有一个方法能够在数据被提取之后修正缓冲区的大小,它就会更完善。
第二个问题得以解决的原因在于,实际上任何时候只有一个线程向PipedOutputStream写入数据,这个线程就是由 startByteArrayReaderThread()创建的线程。由于这个线程完全由LoopedStreams类控制,我们不必担心它会产生 IOException异常。
LoopedStreams类还有一些细节值得提及。首先,我们可以看到byteArrayOS和pipedIS实际上分别是 ByteArrayOutputStream和PipedInputStream的派生类的实例,也即在它们的close()方法中加入了特殊的行为。如果一个LoopedStreams对象的用户关闭了输入或输出流,在startByteArrayReaderThread()中创建的线程必须关闭。覆盖后的close()方法把keepRunning标记设置成false以关闭线程。另外,请注意startByteArrayReaderThread ()中的同步块。要确保在toByteArray()调用和reset()调用之间ByteArrayOutputStream缓冲区不被写入流的线程修改,这是必不可少的。由于ByteArrayOutputStream的write()方法的所有版本都在该流上同步,我们保证了 ByteArrayOutputStream的内部缓冲区不被意外地修改。
注意LoopedStreams类并不涉及管道流的第三个问题。该类的getInputStream()方法返回PipedInputStream。如果一个线程从该流读取,一段时间后终止,下次数据从ByteArrayOutputStream缓冲区传输到PipedOutputStream时就会出现 IOException异常。
|