reworks InMemoryResumableFramesStore and improves its tests coverage#1014
Merged
OlegDokuka merged 3 commits intomasterfrom Jun 4, 2021
Merged
reworks InMemoryResumableFramesStore and improves its tests coverage#1014OlegDokuka merged 3 commits intomasterfrom
OlegDokuka merged 3 commits intomasterfrom
Conversation
ef1572b to
64c050f
Compare
rstoyanchev
approved these changes
Jun 1, 2021
rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java
Show resolved
Hide resolved
rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java
Show resolved
Hide resolved
rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java
Outdated
Show resolved
Hide resolved
9ea7cd3 to
81db43f
Compare
rstoyanchev
approved these changes
Jun 2, 2021
f14efe3 to
8895956
Compare
this includes: * rework of InMemoryResumableFramesStore and improvement in its tests coverage * improvements in Client/Server resume Session and ensuring that if connection is rejected for any reasons - it is fully closed on both outbound and inbound ends (This fix is needed for LocalDuplexConnection scenario which may be in unterminated state if it will not be subscribed on the inbound) * enabling resumability tests for LocalTransport * improvements in logging * general cleanups and polishing Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
…ases) At the moment, the onClose hook has no "wait until cleaned" logic, which leads to unpredicted behaviors when used with resumability or others scenarios where we need to wait until all the queues are cleaned and there are no other resources in use (e.g. ByteBufs). For that porpuse, this commit adds onFinalizeHook to the UnboundedProcessor so we can now listen when the UnboundedProcessor is finalized and only after that send the onClose signal Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
05051c5 to
f70cd35
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Main Changes Description
At the moment Resumability implementation is unstable due to its non-well-covered reworked impl.
As it was uncovered, the main root-cause of the problem is the incorrect state synchronization in InMemoryResumableFrameStore when it comes to the connection reestablishment (e.g. the previous connection was lost so we need to go through the resume handshake phase).
As it was observed, the local producers may produce more elements while the new subscriber iterating over the cached value may miss some updates.
To resolve the mentioned problem, InmemoryResumableFrameStore is reworked again with the thought of backpressure (via ASYNC fusion) and fully sequentially signals processing (using WorkInProgress pattern).
The first improvements allow draining elements from the upfront
Publisher(at the moment UnboundedProcessor which does not support backpressure but we should have a proper once #752 is implemented) only when there is an active connection. In general, relying onqueue.pollonly (instead of handling data fromonNext) decreases the amount of potentially concurrent signals we have to deal with and ensures that the new connection does not have to mess with duplicates-checking logic.The second improvement eliminates the need for
synchronizedkeyword and replaces it withvolatile long statemachine over which we can expose various changes without introducing an expensive MpScQueueSide Changes Description
There are some LocalDuplexConnection and UnboundedProcessor modifications as a part of this PR. These modifications are mainly to ensure that e2e resumability tests pass with LocalTransport.
LocalDuplexConnection(used in LocalClient/ServerTransport) embraces UnboundedProcessor to exchange frames between peers. Before these changes, it was not possible to track when all the frames are delivered / discarded. Thus, it was not possible to notify about the real termination of the DuplexConnection. The modifications to UnboundedProcessor provide aondisposehookhandle which allows tracking when the queue is cleaned and closed - hence notify the duplex connection that it can be terminated as well.