QueueInputStream reads all but the first byte without waiting. #748
maxxedev commented on May 17, 2025
Fix so that bulk reads avoid getting stuck if a timeout is set and at least one byte is available.
Thanks for the pull request — it adds valuable functionality and includes all the essential elements.
A couple of suggestions and minor nitpicks below.
Note: Have you considered using `BlockingQueue.drainTo` instead of repeated `poll()` calls? It locks the queue only once, which could improve performance. The tradeoff would be representing a `byte[]` slice as a `List<Integer>`.
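A minimal sketch of the `drainTo` idea, assuming the stream is backed by a `BlockingQueue<Integer>` that holds one byte value per element. `DrainToBulkRead` and `readAvailable` are hypothetical names, not the Commons IO API. The sketch never blocks: it returns 0 when the queue is empty, so waiting for the first byte is left to the caller. For the JDK's `LinkedBlockingQueue`, `drainTo(Collection, int)` acquires the take lock once per call rather than once per element.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper, not part of Commons IO: a non-blocking bulk read that
// removes up to len queued bytes with one drainTo() call instead of len poll() calls.
final class DrainToBulkRead {

    /** Copies up to len queued values into b[off, off + len) and returns how many were copied (0 if none). */
    static int readAvailable(BlockingQueue<Integer> queue, byte[] b, int off, int len) {
        // The tradeoff: the slice is staged as a List<Integer> before it reaches the byte[].
        final List<Integer> drained = new ArrayList<>();
        // drainTo(Collection, int) removes at most len elements in a single call.
        final int n = queue.drainTo(drained, len);
        for (int i = 0; i < n; i++) {
            b[off + i] = drained.get(i).byteValue(); // narrow to the low 8 bits
        }
        return n;
    }

    public static void main(String[] args) {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (final byte value : "hello".getBytes(StandardCharsets.US_ASCII)) {
            queue.add(0xFF & value);
        }
        final byte[] buf = new byte[3];
        System.out.println(readAvailable(queue, buf, 0, buf.length)); // prints 3
        System.out.println(queue.size()); // prints 2
    }
}
```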
src/main/java/org/apache/commons/io/input/QueueInputStream.java
src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
src/main/java/org/apache/commons/io/input/QueueInputStream.java
Adds a small benchmark to measure how long it takes to transfer 1 MiB from a `QueueOutputStream` to a `QueueInputStream`.
I ran a small benchmark (also included in 35d1ce4) that measures the time it takes to read 1 MiB from the queue while another thread is writing to it. The results look promising.
Before / After: [benchmark result tables not preserved]
Note: The high values in some benchmark runs are due to a synchronization problem between the two threads, which led the …
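The benchmark itself lives in 35d1ce4 and its numbers are not reproduced here. As a rough, hypothetical stand-in (plain JDK timing, no JMH, no warm-up, so the printed times are only indicative), the sketch below times a 1 MiB transfer from a writer thread through a `LinkedBlockingQueue<Integer>`. It runs twice: once with one timed `poll()` per byte, and once with a `poll()` for the first byte followed by `drainTo()`. It drives the queue directly instead of going through `QueueOutputStream`/`QueueInputStream`; `QueueTransferTiming` and the 8192-element chunk size are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the 1 MiB transfer benchmark; plain JDK timing, not JMH.
final class QueueTransferTiming {

    static final int ONE_MIB = 1 << 20;
    static final int CHUNK = 8192; // assumed chunk size for the drainTo variant

    /** Reads until ONE_MIB values have arrived (or the writer stalls for 1 s); returns the count. */
    static long readAll(BlockingQueue<Integer> queue, boolean useDrainTo) throws InterruptedException {
        final List<Integer> chunk = new ArrayList<>(CHUNK);
        long received = 0;
        while (received < ONE_MIB) {
            // Both variants block for the first value, so the reader never busy-spins.
            if (queue.poll(1, TimeUnit.SECONDS) == null) {
                break; // writer stalled: stop instead of hanging
            }
            received++;
            if (useDrainTo) {
                chunk.clear();
                // One drainTo() call for the rest of the chunk instead of one poll() per value.
                received += queue.drainTo(chunk, CHUNK - 1);
            }
        }
        return received;
    }

    /** Times one 1 MiB transfer from a writer thread to the calling thread; returns bytes received. */
    static long timeTransfer(boolean useDrainTo) {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        final Thread writer = new Thread(() -> {
            for (int i = 0; i < ONE_MIB; i++) {
                queue.add(i & 0xFF); // unbounded queue, so add() never blocks
            }
        });
        final long start = System.nanoTime();
        writer.start();
        try {
            final long received = readAll(queue, useDrainTo);
            writer.join();
            System.out.printf("%s: %d bytes in %.1f ms%n", useDrainTo ? "poll + drainTo" : "poll per byte",
                    received, (System.nanoTime() - start) / 1e6);
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        timeTransfer(false);
        timeTransfer(true);
    }
}
```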
Thanks for the changes: using `BlockingQueue.drainTo` significantly reduces lock contention and improves performance roughly 4x.
We might optimize further by reordering operations. For `length > 1`, we could:
- Try `drainTo()` first; if it returns elements, skip the next step.
- Otherwise, call `read()` for one byte, then `drainTo()` for the rest.
- Copy the data from the `List<Integer>` to the byte array.
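One way the reordered steps might look, as a sketch under the same assumption (a `BlockingQueue<Integer>` of byte values) with the timeout expressed in nanoseconds. `ReorderedBulkRead` is a hypothetical name, and this is not the code that was merged. Mirroring the single-byte `read()`, it assumes that a timeout with no data is reported as end of stream (`-1`). The timed wait happens at most once per call, and only when nothing is queued.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the reordered bulk read; not the merged Commons IO code.
final class ReorderedBulkRead {

    static final int EOF = -1;

    static int read(BlockingQueue<Integer> queue, long timeoutNanos, byte[] b, int off, int len) {
        Objects.checkFromIndexSize(off, len, b.length); // Java 9+; rejects bad off/len
        if (len == 0) {
            return 0;
        }
        final List<Integer> drained = new ArrayList<>();
        // 1. Try drainTo() first: if anything is already queued, return it without waiting.
        int n = queue.drainTo(drained, len);
        if (n == 0) {
            // 2. Nothing queued: wait up to the timeout for one byte...
            final Integer first;
            try {
                first = queue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            if (first == null) {
                return EOF; // timed out with no data
            }
            drained.add(first);
            // ...then take whatever else is queued by now, without a second wait.
            n = 1 + queue.drainTo(drained, len - 1);
        }
        // 3. Copy the List<Integer> slice into the caller's byte array.
        for (int i = 0; i < n; i++) {
            b[off + i] = drained.get(i).byteValue();
        }
        return n;
    }
}
```

Checking `drainTo()` first means a read that finds data already queued never enters the timed-wait path at all.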
src/main/java/org/apache/commons/io/input/QueueInputStream.java
Add missing Javadoc `@param` tag
Hi All,
I've made a few minor changes; see the commit comments.
The remaining major issue is this: why does the new `read(...)` method ignore the configured timeout some of the time but not at other times?
From a user's POV, this behavior is confusing and looks random: I construct an instance with a timeout, pass it on to other APIs, and sometimes the timeout applies and sometimes it doesn't.
Isn't this bound to be a source of bug reports and confusion?
src/main/java/org/apache/commons/io/input/QueueInputStream.java
From my perspective, the intended contract of the …
In contrast, the previous implementation, inherited from the superclass, effectively applied the timeout per byte requested. If, for example, the caller attempted to read into a buffer of size 1000, the method could block for up to 1000 times the configured timeout. This behavior could easily lead to unexpectedly long delays and violated the "at most the configured timeout" expectation.
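To make the difference concrete, here is a sketch that uses hypothetical names, the same `BlockingQueue<Integer>` assumption, and a timeout in milliseconds, with three bytes queued and a 10-byte buffer. The per-byte variant is modeled on the loop that `InputStream.read(byte[], int, int)` runs by default, where each position is one timed single-byte read. It returns the 3 bytes only after a 4th poll has waited out the full timeout. The per-call variant waits only when the queue is empty, so it returns the 3 bytes immediately. If bytes trickled in just under the timeout apart, the per-byte loop's total wait would approach `len` times the timeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo contrasting a per-byte timeout with a per-call timeout.
final class TimeoutContractDemo {

    private static Integer poll(BlockingQueue<Integer> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Per byte: every position gets its own timed wait, so the total wait can reach len * timeout. */
    static int readPerByte(BlockingQueue<Integer> queue, long timeoutMillis, byte[] b, int off, int len) {
        if (len == 0) {
            return 0;
        }
        int count = 0;
        while (count < len) {
            final Integer value = poll(queue, timeoutMillis);
            if (value == null) {
                break; // this poll waited out the full timeout
            }
            b[off + count++] = value.byteValue();
        }
        return count == 0 ? -1 : count;
    }

    /** Per call: at most one timed wait (for the first byte), then only what is already queued. */
    static int readPerCall(BlockingQueue<Integer> queue, long timeoutMillis, byte[] b, int off, int len) {
        if (len == 0) {
            return 0;
        }
        final Integer first = poll(queue, timeoutMillis);
        if (first == null) {
            return -1;
        }
        final List<Integer> rest = new ArrayList<>();
        final int n = 1 + queue.drainTo(rest, len - 1);
        b[off] = first.byteValue();
        for (int i = 1; i < n; i++) {
            b[off + i] = rest.get(i - 1).byteValue();
        }
        return n;
    }

    /** Queues 3 bytes, reads into a 10-byte buffer, and returns {bytesRead, elapsedMillis}. */
    static long[] readThreeQueuedBytes(boolean perCall, long timeoutMillis) {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(List.of(1, 2, 3));
        final byte[] buf = new byte[10];
        final long start = System.nanoTime();
        final int n = perCall ? readPerCall(queue, timeoutMillis, buf, 0, buf.length)
                : readPerByte(queue, timeoutMillis, buf, 0, buf.length);
        return new long[] {n, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)};
    }

    public static void main(String[] args) {
        final long[] perByte = readThreeQueuedBytes(false, 300);
        final long[] perCall = readThreeQueuedBytes(true, 300);
        System.out.println("per byte: " + perByte[0] + " bytes after " + perByte[1] + " ms");
        System.out.println("per call: " + perCall[0] + " bytes after " + perCall[1] + " ms");
    }
}
```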
In addition to what …
Co-authored-by: Piotr P. Karwasz <[email protected]>
src/main/java/org/apache/commons/io/input/QueueInputStream.java
        queueOutputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
        afterWriteLatch.countDown();
    } catch (final Exception e) {
        throw new RuntimeException(e);
Hello @maxxedev
Unless you expect this exception for the test to pass (if so, please add a `//` comment explaining why), the test should be clearer and call JUnit's `fail(...)` method.
This code is executed asynchronously, so any exception it throws will not be captured by the test.
Since this code must have run by the end of the test case, it would be useful to:
- Assign the return value of `CompletableFuture.runAsync()` to a variable (e.g. `future`).
- Call `assertDoesNotThrow(future::get)` at the very end of the test case.
Note: Since other parts of the test rely on the `afterWriteLatch.countDown()` call, the test case will simply hang if something goes wrong here. I think we should add reasonable timeouts to the `CountDownLatch.await()` calls.
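A JDK-only sketch of the suggested test structure, using hypothetical names. In the real test, the `Runnable` would be the block that writes to `queueOutputStream`; here it must not throw checked exceptions. The sketch keeps the `CompletableFuture` returned by `runAsync()`, bounds the latch wait with `CountDownLatch.await(long, TimeUnit)` (which returns `false` on timeout instead of hanging), and finishes with a timed `future.get(...)`. That last call rethrows a failure from the async block as an `ExecutionException`, so the failure is not silently lost. In JUnit 5, the final step would be wrapped in `assertDoesNotThrow(...)` as suggested above. Using the timed `get` keeps that step from hanging as well.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only version of the suggested test structure.
final class AsyncWriterTestPattern {

    /**
     * Runs writerBody asynchronously and returns whether afterWriteLatch opened in time.
     * Any exception thrown by writerBody is rethrown at the end, wrapped in an ExecutionException.
     */
    static boolean runWithBoundedWaits(Runnable writerBody, long timeoutMillis) throws Exception {
        final CountDownLatch afterWriteLatch = new CountDownLatch(1);
        // Keep the future instead of discarding it, so async failures stay observable.
        final CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            writerBody.run();
            afterWriteLatch.countDown(); // skipped if writerBody throws
        });
        // Bounded wait: returns false after the timeout instead of hanging the test forever.
        final boolean opened = afterWriteLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        // Last step of the test: surface any exception from the async block.
        // JUnit 5: assertDoesNotThrow(() -> future.get(timeoutMillis, TimeUnit.MILLISECONDS))
        future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        return opened;
    }
}
```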
src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
@garydgregory what needs to be done to get the PR merged?
I'll take a look again tonight or tomorrow morning.