
QueueInputStream reads all but the first byte without waiting. #748


Merged
merged 13 commits into apache:master from maxxedev:queueinputstream-bulk-read
May 22, 2025

Conversation

maxxedev
Contributor

QueueInputStream reads all but the first byte without waiting.

Fix so that bulk reads avoid getting stuck if a timeout is set and at least one byte is available.

Contributor

@ppkarwasz ppkarwasz left a comment


@maxxedev,

Thanks for the pull request — it adds valuable functionality and includes all the essential elements.

A couple of suggestions and minor nitpicks below.

Note: Have you considered using BlockingQueue.drainTo instead of repeated poll() calls? It locks the queue only once, which could improve performance.
The tradeoff would be representing a byte[] slice as a List<Integer>.
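For illustration, a minimal sketch of that idea (the helper name `drainBytes` and the `List<Integer>`-to-`byte[]` copy are mine, not the actual patch):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DrainToSketch {

    // Hypothetical helper: drain up to 'length' queued byte values with a
    // single lock acquisition instead of polling them one at a time.
    static int drainBytes(final BlockingQueue<Integer> queue, final byte[] b,
            final int off, final int length) {
        final List<Integer> drained = new ArrayList<>(length);
        // drainTo locks the queue once and transfers up to 'length' elements.
        final int count = queue.drainTo(drained, length);
        // The tradeoff: a byte[] slice has to pass through a List<Integer>.
        for (int i = 0; i < count; i++) {
            b[off + i] = (byte) (drained.get(i) & 0xFF);
        }
        return count;
    }

    public static void main(final String[] args) {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (final byte value : "hello".getBytes()) {
            queue.add(value & 0xFF);
        }
        final byte[] buffer = new byte[8];
        final int n = drainBytes(queue, buffer, 0, buffer.length);
        System.out.println(n + ":" + new String(buffer, 0, n)); // prints "5:hello"
    }
}
```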

maxxedev and others added 3 commits May 17, 2025 01:48
Adds a small benchmark to measure how much time it takes to transfer
1 MiB from a `QueueOutputStream` to a `QueueInputStream`.
@ppkarwasz
Contributor

I ran a small benchmark (also included in 35d1ce4) that measures the time it takes to read 1 MiB from the queue, while another thread is writing to it. The results look promising.

Before

Benchmark Mode Cnt Score Error Units
QueueStreamBenchmark.streams:input sample 639 78.261 ± 1.892 ms/op
QueueStreamBenchmark.streams:input:p0.00 sample 60.293 ms/op
QueueStreamBenchmark.streams:input:p0.50 sample 69.861 ms/op
QueueStreamBenchmark.streams:input:p0.90 sample 96.600 ms/op
QueueStreamBenchmark.streams:input:p0.95 sample 104.858 ms/op
QueueStreamBenchmark.streams:input:p0.99 sample 123.627 ms/op
QueueStreamBenchmark.streams:input:p0.999 sample 156.500 ms/op
QueueStreamBenchmark.streams:input:p0.9999 sample 156.500 ms/op
QueueStreamBenchmark.streams:input:p1.00 sample 156.500 ms/op

After

Note: The high values in some benchmark runs are due to a synchronization problem between the two threads, which led the QueueInputStream thread to time out.

Benchmark Mode Cnt Score Error Units
QueueStreamBenchmark.streams:input sample 9577 21.248 ± 5.848 ms/op
QueueStreamBenchmark.streams:input:p0.00 sample 3.650 ms/op
QueueStreamBenchmark.streams:input:p0.50 sample 12.222 ms/op
QueueStreamBenchmark.streams:input:p0.90 sample 12.386 ms/op
QueueStreamBenchmark.streams:input:p0.95 sample 12.468 ms/op
QueueStreamBenchmark.streams:input:p0.99 sample 33.787 ms/op
QueueStreamBenchmark.streams:input:p0.999 sample 2292.976 ms/op
QueueStreamBenchmark.streams:input:p0.9999 sample 8128.561 ms/op
QueueStreamBenchmark.streams:input:p1.00 sample 8128.561 ms/op

Contributor

@ppkarwasz ppkarwasz left a comment


Thanks for the changes — using BlockingQueue.drainTo significantly reduces lock contention and improves performance ~4x.

We might optimize further by reordering operations. For length > 1, we could:

  1. Try drainTo() first; if it returns elements, skip step 2.
  2. Otherwise, call read() for one byte, then drainTo() for the rest.
  3. Copy data from List<Integer> to the byte array.
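Roughly, in code (a sketch that assumes a `BlockingQueue<Integer>` backing field and a nanosecond timeout parameter; this is not the actual QueueInputStream implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ReorderedReadSketch {

    // Sketch of the suggested ordering; names are hypothetical.
    static int read(final BlockingQueue<Integer> queue, final long timeoutNanos,
            final byte[] b, final int off, final int len) throws InterruptedException {
        final List<Integer> drained = new ArrayList<>(len);
        // 1. Try drainTo() first; if data is already queued, no waiting is needed.
        if (queue.drainTo(drained, len) == 0) {
            // 2. Otherwise wait up to the timeout for a single byte...
            final Integer first = queue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
            if (first == null) {
                return -1; // timed out with no data
            }
            drained.add(first);
            // ...then drain whatever else arrived meanwhile, without waiting.
            queue.drainTo(drained, len - 1);
        }
        // 3. Copy the List<Integer> into the byte array.
        for (int i = 0; i < drained.size(); i++) {
            b[off + i] = (byte) (drained.get(i) & 0xFF);
        }
        return drained.size();
    }

    public static void main(final String[] args) throws InterruptedException {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        queue.add((int) 'h');
        queue.add((int) 'i');
        final byte[] buf = new byte[8];
        final int n = read(queue, TimeUnit.SECONDS.toNanos(1), buf, 0, buf.length);
        System.out.println(n + ":" + new String(buf, 0, n)); // prints "2:hi"
    }
}
```

This way the common case (data already queued) never touches the timed `poll` at all.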

Member

@garydgregory garydgregory left a comment


Hi All,

I've made a few minor changes; see the commit comments.

The remaining major issue is why the new read(...) method sometimes honors the configured timeout and sometimes ignores it.

From a user's POV this is quite confusing and random behavior: I construct an instance with a timeout, I pass it on to other APIs, and sometimes the timeout applies and sometimes it doesn't.

Isn't this bound to be a source of bug reports and confusion?

@ppkarwasz
Contributor

The remaining major issue is why the new read(...) method sometimes honors the configured timeout and sometimes ignores it.

From my perspective, the intended contract of the read(...) method is to attempt to read at least one byte from the queue, waiting up to the configured timeout for data to become available. If no data arrives within that period, it should return -1.
As far as I can tell, the current implementation adheres to this contract. It waits for the timeout at most once, not per byte, which ensures consistent and predictable behavior.

In contrast, the previous implementation, inherited from the superclass, effectively applied the timeout per byte requested: if, for example, the caller attempted to read into a buffer of size 1000, the method could block for up to 1000 times the configured timeout. This behavior could easily lead to unexpectedly long delays and violated the "at most the configured timeout" expectation.
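To make the multiplication concrete, here is a toy model (not Commons IO code) of the inherited java.io.InputStream bulk-read loop, which calls the single-byte read() once per byte. If every byte arrives just before the per-call timeout would expire, a read of len bytes can wait up to len times the timeout:

```java
import java.io.IOException;
import java.io.InputStream;

// Toy model of the old behavior: the inherited InputStream.read(byte[], int, int)
// loops over the single-byte read(), so each byte can pay the full timeout.
public class PerByteTimeoutDemo extends InputStream {

    long simulatedWaitMillis;       // total time a queue-backed read would block
    private final long timeoutMillis;
    private int remaining;          // bytes that trickle in, one per read() call

    PerByteTimeoutDemo(final long timeoutMillis, final int remaining) {
        this.timeoutMillis = timeoutMillis;
        this.remaining = remaining;
    }

    @Override
    public int read() throws IOException {
        // Worst case: each byte becomes available just before the timeout expires,
        // so every single-byte read() waits (nearly) the full timeout.
        simulatedWaitMillis += timeoutMillis;
        if (remaining > 0) {
            remaining--;
            return 0;
        }
        return -1;
    }

    public static void main(final String[] args) throws IOException {
        final PerByteTimeoutDemo demo = new PerByteTimeoutDemo(5, 1000);
        // InputStream.read(byte[], int, int) loops over read() 1000 times here.
        final int n = demo.read(new byte[1000], 0, 1000);
        System.out.println(n + " bytes, waited " + demo.simulatedWaitMillis + " ms");
        // prints "1000 bytes, waited 5000 ms": len * timeout, not timeout once
    }
}
```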

@maxxedev
Contributor Author

The remaining major issue is why the new read(...) method sometimes honors the configured timeout and sometimes ignores it.

In addition to what @ppkarwasz said above, see these tests, which compare the behavior of QueueInputStream against FileInputStream as a reference. The previous implementation could block for long durations even though some data was always available. The new implementation and the reference implementation do not block when data is available.

try {
    queueOutputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
    afterWriteLatch.countDown();
} catch (final Exception e) {
    throw new RuntimeException(e);
}
Member

@garydgregory garydgregory May 18, 2025


Hello @maxxedev
Unless this exception is expected for the test to pass (if so, please add a // comment saying so), the test would be clearer calling JUnit's fail(...) method.

Contributor


This code is executed asynchronously, so any exception it throws will not be captured by the test.

Since, by the end of the test case, this code must execute, it would be useful to:

  1. Assign the return value of CompletableFuture.runAsync() to a variable (e.g. future).
  2. Call assertDoesNotThrow(future::get) at the very end of the test case.

Note: Since other parts of the test depend on the afterWriteLatch.countDown() call, if something goes wrong here, the test case will simply hang. I think we should add reasonable timeouts to the CountDownLatch.await() calls.
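Something like the following shape (plain java.util.concurrent here so the sketch stands alone; in the actual test the last step would be JUnit's assertDoesNotThrow(future::get)):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch of the suggested test shape: keep the future, bound the latch wait,
// and surface any async exception at the end of the test.
public class AsyncWriteTestSketch {

    static String runScenario() throws Exception {
        final CountDownLatch afterWriteLatch = new CountDownLatch(1);
        // 1. Assign the result of runAsync() to a variable instead of discarding it.
        final CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                // ... write to the queue here ...
                afterWriteLatch.countDown();
            } catch (final Exception e) {
                throw new RuntimeException(e);
            }
        });
        // 2. Bound the latch wait so a failed writer cannot hang the test forever.
        if (!afterWriteLatch.await(5, TimeUnit.SECONDS)) {
            throw new AssertionError("writer did not finish in time");
        }
        // 3. get() rethrows anything the async task threw; with JUnit this is
        //    assertDoesNotThrow(future::get).
        future.get(5, TimeUnit.SECONDS);
        return "ok";
    }

    public static void main(final String[] args) throws Exception {
        System.out.println(runScenario()); // prints "ok"
    }
}
```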

@maxxedev
Contributor Author

@garydgregory what needs to be done to get the PR merged?

@garydgregory
Member

I'll take a look again tonight or tomorrow morning.

@garydgregory garydgregory merged commit fd10fed into apache:master May 22, 2025
19 of 21 checks passed
@maxxedev maxxedev deleted the queueinputstream-bulk-read branch July 13, 2025 23:02