Speeding up FileIO - Double Buffered File Copy?

We are trying to speed up file copy from disk to tape, and I need a little more speed. I have tried playing with the size of the buffer, but that isn't changing much (making it slower, if anything).

I'm trying to make a double buffered file copy and I can't figure out how to do it. I figured this would be a good place to get speed. Right now, my write is very simple:

byte[] buffer = new byte[8 * 1024 * 1024];

FileInputStream in = new FileInputStream(srcFile);

while (true) {

int amountRead = in.read(buffer);

if (amountRead == -1) { break; }

write(buffer, 0, amountRead); // write only the bytes actually read

}

So what I need is to be able to read and write at the same time. I was thinking that I could either make the write method a separate thread, or somehow make threaded buffers, where one is read into while the other is being written. Has anyone tackled this problem before?

If this isn't the right way to speed up File IO, can you let me know other ideas? Thanks in advance!

Andrew

[1393 byte] By [serffa] at [2007-9-19]
# 1
1. Front your FileInputStream with a BufferedInputStream:

BufferedInputStream bis = new BufferedInputStream(in);

Do your input using bis. The results should be profoundly faster than those you have seen.

2. Do the same thing for your output stream.
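A minimal sketch of the wrapping suggested here, assuming plain java.io streams on both ends. The class name BufferedCopy and the byte-at-a-time loop are illustrative choices, not code from the thread.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical helper for illustration only.
final class BufferedCopy {
    /** Copies src to dst one byte at a time through buffered wrappers; returns the byte count. */
    static long copy(InputStream src, OutputStream dst) throws IOException {
        BufferedInputStream bis = new BufferedInputStream(src);
        BufferedOutputStream bos = new BufferedOutputStream(dst);
        long total = 0;
        int b;
        while ((b = bis.read()) != -1) { // served from the wrapper's internal buffer
            bos.write(b);                // collected in the wrapper's internal buffer
            total++;
        }
        bos.flush();                     // push out whatever is still buffered
        return total;
    }
}
```

Buffered wrappers mostly help when the caller issues many small reads or writes. When the caller already passes an array at least as large as the wrapper's internal buffer, BufferedInputStream and BufferedOutputStream hand the request straight to the underlying stream, so little change would be expected with an 8 MB array.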
bschauwea at 2007-7-8 > top of java,Core,Core APIs...
# 2

OK, I tried your suggestion and didn't see much change. With the BufferedInputStream I see no difference. With the BufferedOutputStream it is slower. With both, I see no difference. Maybe I am already at my max. I'm writing to a tape drive that "should" be getting 12 MB/s, but I'm getting about 11 MB/s. I have another tape drive that I haven't tested yet that will get 24 MB/s, so I guess I'll see if I can get close to that. I'm using code from the JTape project on SourceForge, if that helps at all. Basically this is the output stream I'm using:

class TapeOutputStream extends OutputStream {

private byte[] temp = new byte[1];

public void write(int b) throws IOException {

temp[0] = (byte) b;

write(temp, 0, 1);

}

public void write(byte[] b) throws IOException {

write(b, 0, b.length);

}

public void write(byte[] b, int off, int len) throws IOException {

if (b == null) {

throw new NullPointerException();

}

if (off < 0 || len < 0 || off+len > b.length) {

throw new IndexOutOfBoundsException();

}

if (eom && !ignoreEOM) {

throw new LogicalEOMException("logical end-of-media");

}

int n = tapeWrite(b, off, len);

}

public void close() throws IOException {

TapeDevice.this.close();

}

}

The tapeWrite method calls a native C method. And my inputStream is just a FileInputStream (I didn't make my own). Let me know what you think. Thanks!

Andrew

serffa at 2007-7-8
# 3

In reference to your first post: it seems to me that you are doing the right thing with your reading. That is the fastest way to do it. The only thing I would say you should do is make your buffer smaller. An 8 MB buffer is a waste of space and can take more time to allocate; a couple of K should do just fine.

Using more threads might make it faster, depending on the situation, but overall I think that it will only serve to slow down your transfer because of the constant monitor locking/unlocking. Also, generally fast operations should not be multithreaded. Only things that need a good response to latency should be multi-threaded.

If anything, I would check whether you are getting small reads from your file input stream. If you are not filling (or getting close to filling) the buffer every time, then you should buffer a larger amount before writing to the tape, assuming that the tape is much slower than the disk. Again, smaller buffers (smaller than 8 MB) will help too.

steveftotha at 2007-7-8
# 4

> Using more threads might make it faster, depending on

> the situation, but overall I think that it will only

> serve to slow down your transfer because of the

> constant monitor locking/unlocking.

I have to disagree with this. If you are block-reading then block-writing in a loop in a single thread, then you are artificially slowing down both the read operation by the time taken for the write, and the write operation by the time taken for the read.

That is, if your reading device takes 1s to read a block, and your writing device takes 1s to write the block, then to read and write 10 blocks with a single thread will take 20s. If your reading and writing are occurring in different threads, then the writing thread can write the first block while the reading thread can read the second block (Assuming the devices can operate in parallel)... the total time to read and write the same 10 blocks is reduced to 11s by incorporating threads.
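The arithmetic in this paragraph can be checked with a small sketch. It assumes the two devices overlap perfectly and ignores synchronization cost; the class and method names are made up for illustration.

```java
// Hypothetical helper that reproduces the 20 s versus 11 s comparison.
final class PipelineTiming {
    /** Single thread: each block is read, then written, so the two costs add up. */
    static long sequentialSeconds(int blocks, long readSec, long writeSec) {
        return blocks * (readSec + writeSec);
    }

    /**
     * Two threads with fully overlapping devices: the first read fills the pipeline,
     * the slower stage then sets the pace, and the last block still has to be written.
     */
    static long pipelinedSeconds(int blocks, long readSec, long writeSec) {
        if (blocks == 0) {
            return 0;
        }
        long slower = Math.max(readSec, writeSec);
        return readSec + (blocks - 1) * slower + writeSec;
    }
}
```

For 10 blocks at 1 s per read and 1 s per write, sequentialSeconds gives 10 * (1 + 1) = 20 and pipelinedSeconds gives 1 + 9 * 1 + 1 = 11, matching the figures above.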

The threading model used is exactly the producer/consumer thread model used in concurrent programming examples worldwide, though it is simplified to have only one producer and one consumer - which in fact means that monitor contention (unnecessary overhead) should be almost completely avoided... the only overhead will be necessary for correctness (ie- the writer waiting for the reader to provide a block to write, or the reader waiting for the writer to remove a block from the buffer - giving room for the next block to be read)

Note that in your circumstance, reading from a high-speed device (Hard drive) and writing to a low-speed device (Tape), the reading will have to be artificially slowed to the speed of the slower writing, to prevent the buffer from growing out-of-control... the simplest way to do this is to just place a size restriction on the buffer - if the reader gets too far in front, then it will have to wait for room to be freed in the buffer.
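One way to get the size restriction described here is a bounded queue from java.util.concurrent. The sketch below is an assumption about how that could look, not the poster's code: BoundedCopy, blockSize and maxBlocks are invented names, and the writer is any OutputStream.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: one reader thread, the calling thread acts as the writer.
final class BoundedCopy {
    private static final byte[] END = new byte[0]; // sentinel: the reader has finished

    static long copy(InputStream in, OutputStream out, int blockSize, int maxBlocks)
            throws IOException, InterruptedException {
        BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(maxBlocks);
        IOException[] readError = new IOException[1];

        Thread reader = new Thread(() -> {
            try {
                byte[] block = new byte[blockSize];
                int n;
                while ((n = in.read(block)) != -1) {
                    if (n > 0) {
                        queue.put(Arrays.copyOf(block, n)); // blocks while the queue is full
                    }
                }
            } catch (IOException e) {
                readError[0] = e;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                try {
                    queue.put(END);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        reader.start();

        long total = 0;
        try {
            // take() blocks while the queue is empty, so the writer waits for the reader.
            for (byte[] block = queue.take(); block != END; block = queue.take()) {
                out.write(block, 0, block.length);
                total += block.length;
            }
        } finally {
            reader.interrupt(); // unblock the reader if the write side failed
        }
        reader.join();
        if (readError[0] != null) {
            throw readError[0];
        }
        return total;
    }
}
```

With maxBlocks set to a small number, a fast disk reader can run at most that many blocks ahead of the tape writer before put() makes it wait.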

There is actually a real-life situation in which the above becomes irrelevant - if the writing device has a local write buffer, then you are unlikely to get much benefit from reading/writing in separate code threads... since the writing device is already providing its own (hardware) thread. Using your own threading will effectively "smooth" any latency fluctuations in the reading device, as seen by the writing device.

> Also, generally

> fast operations should not be multithreaded. Only

> things that need a good response to latency should be

> multi-threaded.

This is a very naive view of threading. Any operation that requires handling in parallel with any other operation requires distinct threads of execution. There is a special case when programming GUIs in which a thread can be used to delegate the work of handling an event off the event-handling thread, which reduces perceived latency by increasing the throughput of interface events.

This last is the most symptomatic use of threads I've seen. When I talk about multithreaded programming, I will almost always refer to the former, where various operations are being performed at the same time.

fiontana at 2007-7-8
# 5

> The threading model used is exactly the

> producer/consumer thread model used in concurrent

> programming examples worldwide, though it is

> simplified to have only one producer and one consumer

> - which in fact means that monitor contention

> (unnecessary overhead) should be almost completely

> avoided... the only overhead will be necessary for

> correctness (ie- the writer waiting for the reader to

> provide a block to write, or the reader waiting for

> the writer to remove a block from the buffer - giving

> room for the next block to be read)

So you recommend using just one buffer here? Reading into the back of the queue and writing out of the front? I was thinking of using 2 buffers. Reading into one while writing out of the other, then swapping them when needed. This may get rid of the read thread having to wait for the write thread while reading... but it would still have to wait to swap. Does this create too much overhead though (I guess it would only be initial overhead to create the buffers...)?
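The two-buffer swap described here could be sketched with java.util.concurrent.Exchanger, which makes both threads wait at the swap point. This is only an illustration under that assumption: DoubleBufferCopy and Buf are invented names, and bufferSize is assumed to be positive.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Exchanger;

// Hypothetical sketch of a double-buffered copy: exactly two buffers change hands.
final class DoubleBufferCopy {
    /** One of the two buffers, plus how many bytes the reader put in it (-1 = end of input). */
    static final class Buf {
        final byte[] data;
        int count;
        Buf(int size) { data = new byte[size]; }
    }

    static long copy(InputStream in, OutputStream out, int bufferSize)
            throws IOException, InterruptedException {
        Exchanger<Buf> swap = new Exchanger<>();
        IOException[] readError = new IOException[1];

        Thread reader = new Thread(() -> {
            Buf filling = new Buf(bufferSize);
            try {
                while (true) {
                    try {
                        filling.count = in.read(filling.data);
                    } catch (IOException e) {
                        readError[0] = e;
                        filling.count = -1; // treat a failed read as end of input
                    }
                    boolean last = filling.count == -1;
                    filling = swap.exchange(filling); // hand over the full buffer, get the drained one
                    if (last) {
                        return;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();

        long total = 0;
        Buf draining = new Buf(bufferSize);
        try {
            while (true) {
                draining = swap.exchange(draining); // wait for the next full buffer
                if (draining.count == -1) {
                    break;
                }
                out.write(draining.data, 0, draining.count);
                total += draining.count;
            }
        } finally {
            reader.interrupt(); // unblock the reader if the write side failed
        }
        reader.join();
        if (readError[0] != null) {
            throw readError[0];
        }
        return total;
    }
}
```

No buffers are allocated after startup, but each side can be at most one buffer ahead of the other, which is the waiting-to-swap cost mentioned above.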

I'm getting a new design patterns book today, so maybe it will have some examples of producer/consumer in there as well.

In response to Steve: I did try a 64K buffer and it took me down to about 8 MB/s from 11 MB/s... so I don't know if I agree with you there. I will play with it more, though. I am going to keep playing with the size until I get it as small as possible before losing performance.

Thanks for both your help!

Andrew

serffa at 2007-7-8
# 6

I believe that in this case threads can improve speed, but the code could be somewhat more complicated than your current version.

Here is my design:

A class acts as a queue that holds all the byte arrays waiting to be written to the destination file. Once a byte array has been written, it should become available for reading again, so that you can reuse those byte arrays instead of creating new ones. Once read(buf) has returned, push the buf into the queue. The writer thread should pop those buffers from the queue one by one. If you don't use readFully(buf), you might need to wrap the buf in a class with a 'count' attribute to indicate how many bytes are in the array. Of course, wait()/notify() should be used in this solution. In short, instead of reading lots of data in one go, chunk it into pieces, so that the reading and writing can be done at the same time.
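A rough sketch of the queue described in this design, with recycled byte arrays wrapped in a class that carries a count. The names ChunkQueue, Chunk, takeFree, putFilled, takeFilled and recycle are assumptions made for the example; it uses notifyAll() rather than notify() because readers and writers wait on the same monitor.

```java
import java.util.ArrayDeque;

// Hypothetical sketch: a fixed set of chunks circulates between reader and writer.
final class ChunkQueue {
    /** A reusable byte array plus the number of valid bytes in it (-1 marks end of input). */
    static final class Chunk {
        final byte[] data;
        int count;
        Chunk(int size) { data = new byte[size]; }
    }

    private final ArrayDeque<Chunk> free = new ArrayDeque<>();
    private final ArrayDeque<Chunk> filled = new ArrayDeque<>();

    ChunkQueue(int chunks, int chunkSize) {
        for (int i = 0; i < chunks; i++) {
            free.add(new Chunk(chunkSize));
        }
    }

    /** Reader side: wait for an empty chunk to read into. */
    synchronized Chunk takeFree() throws InterruptedException {
        while (free.isEmpty()) {
            wait();
        }
        return free.remove();
    }

    /** Reader side: queue a filled chunk for the writer. */
    synchronized void putFilled(Chunk chunk) {
        filled.add(chunk);
        notifyAll();
    }

    /** Writer side: wait for the next filled chunk, in the order it was queued. */
    synchronized Chunk takeFilled() throws InterruptedException {
        while (filled.isEmpty()) {
            wait();
        }
        return filled.remove();
    }

    /** Writer side: return a written chunk so that the reader can reuse it. */
    synchronized void recycle(Chunk chunk) {
        free.add(chunk);
        notifyAll();
    }
}
```

The reader thread would loop over takeFree(), read into chunk.data, set chunk.count and call putFilled(); the writer thread would loop over takeFilled(), write chunk.count bytes and call recycle(), stopping when it sees a count of -1.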

This design is based on an assumption:

read(buf) and write(buf) at the same time should be faster than read(buf) then write(buf).

I hope it works.

andy.ga at 2007-7-8
# 7

> So you recommend using just one buffer here? Reading

> into the back of the queue and writing out of the

> front? I was thinking of using 2 buffers.

Hence the title "Double Buffered File Copy"... I did think that was a little strange when I noticed it :-)

I was specifically recommending using a single buffer, although I hadn't thought about using two buffers... considerations and (my) conclusion follow.

> Reading into one while writing out of the other, then

> swapping them when needed. This may get rid of the

> read thread having to wait for the write thread while

> reading...but it would still have to wait to swap.

Reading would end up being more bursty - faster while reading, but waiting longer to be able to read again.

> Does this create too much overhead though (I guess it

> would only be initial overhead to create the

> buffers...)?

I don't believe overhead would be noticeably different however many buffers you chose to use.

I think I would have to recommend using just a single buffer, because using multiple buffers seems to be introducing an artificial layer which already exists in the single buffered approach. Consider when you read a block from a device - the device fills the block with bytes until it is full, then passes the block on. Adding a buffer essentially "averages out" the speed at which the blocks are provided. Once you add a second buffer, you are creating another "block" layer - fill the buffer with blocks until it is full, then pass it on. You then create a new buffer (or reuse an old one) to fill with the next blocks, and so on.

> I'm getting a new design patterns book today, so maybe

> it will have some examples of producer/consumer in

> there as well.

java.io.PipedInputStream/PipedOutputStream is a naive implementation - you could look at their source for one example of the producer/consumer implemented with correct synchronization.
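For readers who have not used these classes, a minimal usage sketch of the standard java.io piped streams follows, with one producer thread and the calling thread as consumer. PipedDemo and roundTrip are invented names, and the sketch says nothing about performance.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Hypothetical demo: bytes written on one thread are read back on another.
final class PipedDemo {
    static byte[] roundTrip(byte[] payload) throws IOException, InterruptedException {
        PipedOutputStream producerEnd = new PipedOutputStream();
        PipedInputStream consumerEnd = new PipedInputStream(producerEnd);
        IOException[] writeError = new IOException[1];

        Thread producer = new Thread(() -> {
            try {
                producerEnd.write(payload); // blocks whenever the pipe's buffer is full
            } catch (IOException e) {
                writeError[0] = e;
            } finally {
                try {
                    producerEnd.close();    // lets the consumer see end-of-stream
                } catch (IOException ignored) {
                    // nothing useful to do in this sketch
                }
            }
        });
        producer.start();

        ByteArrayOutputStream received = new ByteArrayOutputStream();
        byte[] block = new byte[512];
        int n;
        while ((n = consumerEnd.read(block)) != -1) {
            received.write(block, 0, n);
        }
        producer.join();
        consumerEnd.close();
        if (writeError[0] != null) {
            throw writeError[0];
        }
        return received.toByteArray();
    }
}
```

The two ends must be used from different threads, since a single thread that fills the pipe would block with nobody left to drain it.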

fiontana at 2007-7-8
# 8

> java.io.PipedInputStream/PipedOutputStream is a naive

> implementation - you could look at their source for

> one example of the producer/consumer implemented with

> correct synchronization.

Is using the Piped Streams a viable solution then? Or do I still need to implement my own solution? Thanks for all your help. I gave you some Duke Dollars...:)

Andrew

serffa at 2007-7-8
# 9

Watch out with PipedInputStream/PipedOutputStream. The implementation sucks - lousy performance, and deadly bugs if you should be so foolish as to use them in conjunction with RMI or anything else that does thread pooling.

I have an alternative implementation - posted a year or so ago by a saint. Ask, and I'll re-post them.

bschauwea at 2007-7-8
# 10

> Is using the Piped Streams a viable solution then? Or

> do I still need to implement my own solution? Thanks

> for all your help. I gave you some Duke Dollars...:)

Ta

java.io's piped streams aren't the most efficient implementation, since they don't handle block reading and writing gracefully...

You call the synchronized block write method

It loops through the bytes in the block, calling the synchronized byte write method for each

Although the extra synchronization calls are essentially free (Since the executing thread already holds the correct monitor), the call to write each byte will send a notification to the other end of the pipe - prematurely waking the reading thread before there is an entire block to read.

To actually answer the question, you will probably get better mileage from writing or finding a better implementation. A simple choice would be to use Doug Lea's BoundedBuffer and get and put byte arrays from the relevant sides. Failing that, the source will be an excellent guideline for a custom implementation.

http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html

To the previous poster, I'd appreciate if you could point out a reference to the dangers of using the java.io piped streams in conjunction with a thread pool. As to the other, trying to use any kind of pipe in conjunction with RMI is just an exercise in folly, really.

fiontana at 2007-7-8
# 11

> I have an alternative implementation - posted a year

> or so ago by a saint. Ask, and I'll re-post them.

That would be great if you can repost them. Or even just a link to them. The more sample code the better!! :)

Thanks again everyone!
serffa at 2007-7-8
# 12

1. Regarding the dangers of pools vs rmi: there is no reference; I found this problem through sorry experience.

The reasons behind the problem a colleague and I ferreted out and - to some extent - intuited. The alternative pipe implementation solved the problem.

Problem: A thread reads file data and feeds it into a pipe with a compression filter front end. A client makes RMI calls to the server, reading blocks of compressed data out of the other end of the pipe, returning byte arrays. When there are more than a few active clients, you get broken pipes and partner-exited errors.

Analysis:

o RMI seems to use a pool of threads. You can't really see the code - it's implemented inside the JVM.

o RMI seems to expand the pool if it finds that all of the threads it started with are in use.

o RMI seems to discard threads if, when it tries to return them to the pool, it finds that the pool is at its original level already.

The bad interaction is that the standard pipe implementation uses thread IDs to check whether a "partner" is alive and well, or dead. So if a server happens to open a pipe using a pooled thread, and later that thread dies because the pool is full, well very shortly - probably on the next attempt to write into the pipe, an exception will be thrown.

I will post the alternative pipe implementation in the next message. If you examine it, you will find that there is no use made of thread IDs.

bschauwea at 2007-7-8
# 13

Once again: I wish I could claim credit for these classes, but they were in fact posted a year or so ago by someone else. If I had the name I would give credit.

I've used these for two heavy-duty applications with never a problem.

<code>

package pipes;

import java.io.IOException;

import java.io.InputStream;

/**

* This class is equivalent to <code>java.io.PipedInputStream</code>. In the

* interface it only adds a constructor which allows for specifying the buffer

* size. Its implementation, however, is much simpler and a lot more efficient

* than its equivalent. It doesn't rely on polling. Instead it uses proper

* synchronization with its counterpart PipedOutputStream.

*

* Multiple readers can read from this stream concurrently. The block asked for

* by a reader is delivered completely, or until the end of the stream if less

* is available. Other readers can't come in between.

*/

public class PipedInputStream extends InputStream {

byte[] buffer;

boolean closed = false;

int readLaps = 0;

int readPosition = 0;

PipedOutputStream source;

int writeLaps = 0;

int writePosition = 0;

/**

* Creates an unconnected PipedInputStream with a default buffer size.

* @exception IOException

*/

public PipedInputStream() throws IOException {

this(null);

}

/**

* Creates a PipedInputStream with a default buffer size and connects it to

* source.

* @exception IOException It was already connected.

*/

public PipedInputStream(PipedOutputStream source) throws IOException {

this(source, 0x10000);

}

/**

* Creates a PipedInputStream with buffer size <code>bufferSize</code> and

* connects it to <code>source</code>.

* @exception IOException It was already connected.

*/

public PipedInputStream(PipedOutputStream source, int bufferSize) throws IOException {

if (source != null) {

connect(source);

}

buffer = new byte[bufferSize];

}

/**

* Return the number of bytes of data available from this stream without blocking.

*/

public int available() throws IOException {

// The circular buffer is inspected to see where the reader and the writer

// are located.

return writePosition > readPosition ?

// The writer is ahead in the same lap.

writePosition - readPosition :

(writePosition < readPosition ?

// The writer has wrapped into the next lap.

buffer.length - readPosition + writePosition :

// The writer is at the same position or a complete lap ahead.

(writeLaps > readLaps ? buffer.length : 0)

);

}

/**

* Closes the pipe.

* @exception IOException The pipe is not connected.

*/

public void close() throws IOException {

if (source == null) {

throw new IOException("Unconnected pipe");

}

synchronized (buffer) {

closed = true;

// Release any pending writers.

buffer.notifyAll();

}

}

/**

* Connects this input stream to an output stream.

* @exception IOException The pipe is already connected.

*/

public void connect(PipedOutputStream source) throws IOException {

if (this.source != null) {

throw new IOException("Pipe already connected");

}

this.source = source;

source.sink = this;

}

/**

* Closes the input stream if it is open.

*/

protected void finalize() throws Throwable {

close();

}

/**

* Unsupported - does nothing.

*/

public void mark(int readLimit) {

return;

}

/**

* returns whether or not mark is supported.

*/

public boolean markSupported() {

return false;

}

/**

* reads a byte of data from the input stream.

* @return the byte read, or -1 if end-of-stream was reached.

*/

public int read() throws IOException {

byte[] b = new byte[1];

int result = read(b);

// Mask to 0..255 so that a data byte of 0xFF is not mistaken for end-of-stream.

return result == -1 ? -1 : b[0] & 0xFF;

}

/**

* Reads data from the input stream into a buffer.

* @exception IOException

*/

public int read(byte[] b) throws IOException {

return read(b, 0, b.length);

}

/**

* Reads data from the input stream into a buffer, starting at the specified offset,

* and for the length requested.

* @exception IOException The pipe is not connected.

*/

public int read(byte[] b, int off, int len) throws IOException {

if (source == null) {

throw new IOException("Unconnected pipe");

}

synchronized (buffer) {

if (writePosition == readPosition && writeLaps == readLaps) {

if (closed) {

return -1;

}

// Wait for any writer to put something in the circular buffer.

try {

buffer.wait();

}

catch (InterruptedException e) {

throw new IOException(e.getMessage());

}

// Try again.

return read(b, off, len);

}

// Don't read more than the capacity indicated by len or what's available

// in the circular buffer.

int amount = Math.min(len,

(writePosition > readPosition ? writePosition : buffer.length) - readPosition);

System.arraycopy(buffer, readPosition, b, off, amount);

readPosition += amount;

if (readPosition == buffer.length) {

// A lap was completed, so go back.

readPosition = 0;

++readLaps;

}

// The buffer is only released when the complete desired block was

// obtained.

if (amount < len) {

int second = read(b, off + amount, len - amount);

return second == -1 ? amount : amount + second;

} else {

buffer.notifyAll();

}

return amount;

}

}

}

package pipes;

import java.io.IOException;

import java.io.OutputStream;

/**

* This class is equivalent to java.io.PipedOutputStream. In the

* interface it only adds a constructor which allows for specifying the buffer

* size. Its implementation, however, is much simpler and a lot more efficient

* than its equivalent. It doesn't rely on polling. Instead it uses proper

* synchronization with its counterpart PipedInputStream.

*

* Multiple writers can write in this stream concurrently. The block written

* by a writer is put in completely. Other writers can't come in between.

*/

public class PipedOutputStream extends OutputStream {

PipedInputStream sink;

/**

* Creates an unconnected PipedOutputStream.

* @exception IOException

*/

public PipedOutputStream() throws IOException {

this(null);

}

/**

* Creates a PipedOutputStream with a default buffer size and connects it to

* <code>sink</code>.

* @exception IOException It was already connected.

*/

public PipedOutputStream(PipedInputStream sink) throws IOException {

this(sink, 0x10000);

}

/**

* Creates a PipedOutputStream with buffer size <code>bufferSize</code> and

* connects it to <code>sink</code>.

* @exception IOException It was already connected.

*/

public PipedOutputStream(PipedInputStream sink, int bufferSize) throws IOException {

if (sink != null) {

connect(sink);

sink.buffer = new byte[bufferSize];

}

}

/**

* Closes the input stream.

* @exception IOException The pipe is not connected.

*/

public void close() throws IOException {

if (sink == null) {

throw new IOException("Unconnected pipe");

}

synchronized (sink.buffer) {

sink.closed = true;

flush();

}

}

/**

* Connects the output stream to an input stream.

* @exception IOException The pipe is already connected.

*/

public void connect(PipedInputStream sink) throws IOException {

if (this.sink != null) {

throw new IOException("Pipe already connected");

}

this.sink = sink;

sink.source = this;

}

/**

* Closes the output stream if it is open.

*/

protected void finalize() throws Throwable {

close();

}

/**

* forces any buffered data to be written.

* @exception IOException

*/

public void flush() throws IOException {

synchronized (sink.buffer) {

// Release all readers.

sink.buffer.notifyAll();

}

}

/**

* writes a byte of data to the output stream.

* @exception IOException

*/

public void write(int b) throws IOException {

write(new byte[] {(byte) b});

}

/**

* Writes a buffer of data to the output stream.

* @exception IOException

*/

public void write(byte[] b) throws IOException {

write(b, 0, b.length);

}

/**

* writes data to the output stream from a buffer, starting at the named offset,

* and for the named length.

* @exception IOException The pipe is not connected or a reader has closed

* it.

*/

public void write(byte[] b, int off, int len) throws IOException {

if (sink == null) {

throw new IOException("Unconnected pipe");

}

if (sink.closed) {

throw new IOException("Broken pipe");

}

synchronized (sink.buffer) {

if (sink.writePosition == sink.readPosition &&

sink.writeLaps > sink.readLaps) {

// The circular buffer is full, so wait for some reader to consume

// something.

try {

sink.buffer.wait();

}

catch (InterruptedException e) {

throw new IOException(e.getMessage());

}

// Try again.

write(b, off, len);

return;

}

// Don't write more than the capacity indicated by len or the space

// available in the circular buffer.

int amount = Math.min(len,

(sink.writePosition < sink.readPosition ?

sink.readPosition : sink.buffer.length)

- sink.writePosition);

System.arraycopy(b, off, sink.buffer, sink.writePosition, amount);

sink.writePosition += amount;

if (sink.writePosition == sink.buffer.length) {

sink.writePosition = 0;

++sink.writeLaps;

}

// The buffer is only released when the complete desired block was

// written.

if (amount < len) {

write(b, off + amount, len - amount);

} else {

sink.buffer.notifyAll();

}

}

}

}

</code>

bschauwea at 2007-7-8
# 14

Greetings to all! I'm sorry for coming late to this party :-)

This topic has recently popped up on my radar and I've put some thoughts down to code for experimentation. If anyone continues to have interest, I'm more than happy to share.

One thought came to mind: How much does a multi-threaded approach to improving I/O suffer if the target system is limited to 1 processor? Since I/O as a whole triggers context-switches, do we stand to lose much of the benefit of threads in this case?

-Edwin

edwinba at 2007-7-8
# 15

Thanks for posting that code sample. I just have one observation though. It appears that the code uses recursion to wait for something to appear within the buffer. This occurs in both of the following methods:

write(byte[] b, int off, int len)

read(byte[] b, int off, int len)

Wouldn't it be safer and faster to use iteration rather than consume memory on the stack, and then have the stack un-roll when the operation completes?
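To illustrate the iterative alternative raised here, a small circular buffer with while loops around wait() is sketched below. It is not a drop-in replacement for the posted classes: RingBuffer is an invented name, it tracks a byte count instead of lap counters, and read() may return fewer bytes than requested.

```java
// Hypothetical sketch: a bounded circular byte buffer that waits in loops, not by recursion.
final class RingBuffer {
    private final byte[] buffer;
    private int readPos;
    private int writePos;
    private int size;       // number of bytes currently stored
    private boolean closed;

    RingBuffer(int capacity) {
        buffer = new byte[capacity]; // capacity is assumed to be greater than zero
    }

    /** Writes all len bytes, waiting in a loop whenever the buffer is full. */
    synchronized void write(byte[] b, int off, int len) throws InterruptedException {
        while (len > 0) {
            while (size == buffer.length) {
                wait();
            }
            // Copy as much as fits contiguously: up to the array end or the free space.
            int n = Math.min(len, Math.min(buffer.length - size, buffer.length - writePos));
            System.arraycopy(b, off, buffer, writePos, n);
            writePos = (writePos + n) % buffer.length;
            size += n;
            off += n;
            len -= n;
            notifyAll(); // data is available for readers
        }
    }

    /** Reads up to len bytes; returns -1 once the buffer is closed and drained. */
    synchronized int read(byte[] b, int off, int len) throws InterruptedException {
        while (size == 0) {
            if (closed) {
                return -1;
            }
            wait();
        }
        int n = Math.min(len, Math.min(size, buffer.length - readPos));
        System.arraycopy(buffer, readPos, b, off, n);
        readPos = (readPos + n) % buffer.length;
        size -= n;
        notifyAll(); // space is available for writers
        return n;
    }

    /** Marks the end of the data and wakes any waiting reader. */
    synchronized void close() {
        closed = true;
        notifyAll();
    }
}
```

Because each wait() sits inside a loop that rechecks its condition, the stack depth stays constant no matter how often a thread is woken.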

Regards,

Craig.

craigm28a at 2007-7-18
# 16

Lot of nonsense, here.

If you want to read a file quickly, open it as a RandomAccessFile. Use the length() to create a byte array as long as the file, then use a single read() to fill that array.
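A sketch of this whole-file approach is shown below. One deliberate difference: a single read(byte[]) call is allowed to return fewer bytes than requested, so the sketch uses readFully() instead. WholeFileReader is an invented name, and the size check is an assumption about what a caller would want for files too large for one array.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

// Hypothetical helper: loads an entire file into one byte array.
final class WholeFileReader {
    static byte[] readAll(File file) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
            long length = raf.length();
            if (length > Integer.MAX_VALUE - 8) {
                throw new IOException("File too large for a single array: " + length);
            }
            byte[] data = new byte[(int) length];
            raf.readFully(data); // keeps reading until the array is full, or throws EOFException
            return data;
        }
    }
}
```

This only suits files that fit comfortably in the heap; for the disk-to-tape copy discussed in this thread, a block-wise loop is still needed.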

Use buffers IFF your file is huge (many megabytes). Use InputStreams for streams, which a disk file isn't. Use a separate Thread IFF your application requires that for some other reason.

MartinRineharta at 2007-7-18
# 17

Lots of nonsense about RMI too. Off topic really, but for the record:

> o RMI seems to use a pool of threads. You can't really

> see the code - it's implemented inside the JVM.

You can see the code, it's in the java.rmi and sun.rmi packages in the source code. None of RMI is implemented in JNI code or in the JVM.

> o RMI seems to expand the pool if it finds that all of

> the threads it started with are in use.

> o RMI seems to discard threads if, when it tries to

> return them to the pool, it finds that the pool is at

> its original level already.

RMI uses (i) a client-side pool of connections and (ii) a server-side model of a new thread per connection. If the client-side connection pool manages to reuse a connection, the server thread is also reused. There is no thread pool anywhere in RMI.

> The bad interaction is that the standard pipe

> implementation uses thread IDs to check whether a

> "partner" is alive and well, or dead. So if a server

> happens to open a pipe using a pooled thread, and

> later that thread dies because the pool is full, well

> very shortly - probably on the next attempt to write

> into the pipe, an exception will be thrown.

As there are no pooled threads and no such concept as 'pool is full' in RMI, this is all fantasy.

Use the source.

EJP

ejpa at 2007-7-18