Clean up tests a little #59
Problem: the `NetworkError` which is returned by the streams and sinks passed to `Repo.new_remote_repo` is uninformative. All the user knows when such an error appears in the logs is that there was some kind of error on the sink/stream. Solution: Add a `String` field to the `NetworkError` so that it can be printed in logs.
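A minimal sketch of the change this describes, using only the standard library. The field layout and the `Display` text are illustrative, not the crate's actual definitions; the point is just that the error now carries a message that shows up in logs.

```rust
use std::fmt;

// Illustrative sketch: `NetworkError` gains a `String` describing the
// underlying cause, so a log line says more than "network error".
#[derive(Debug)]
pub struct NetworkError(pub String);

impl fmt::Display for NetworkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "network error: {}", self.0)
    }
}

impl std::error::Error for NetworkError {}

fn main() {
    let err = NetworkError("remote closed the connection".to_string());
    // The log line now carries the underlying cause.
    println!("{}", err);
}
```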
If you're looking to review the test changes specifically I would recommend reading 22acec2. I did the rename in a separate commit so this commit actually has a useful diff, unlike the diff view of the whole PR, which presents the test files as just deleted and reinserted.
Thanks! Will review today.
Thanks for the changes!
This part of the review covers changes to `repo.rs` and `network_connect` only. Will review the changes to the tests separately...
src/repo.rs
Outdated
@@ -1117,14 +1154,19 @@ impl Repo {
                continue;
            };
            if should_be_removed {
-               self.remote_repos.remove(&repo_id);
+               if let Some(RemoteRepo { sink, .. }) = self.remote_repos.remove(&repo_id) {
Does it make sense to poll close the sinks that have already errored? From the docs I'm not sure. Most methods state that:

> In most cases, if the sink encounters an error, the sink will permanently be unable to receive items

But the `poll_close` one states:

> If this function encounters an error, the sink should be considered to have failed permanently, and no more Sink methods should be called.

So I guess if any other method returns an error, one can still try to close the sink, which means this change is right.
Yeah that was my reasoning
                // Reset the sync state.
                self.remove_unused_sync_states();
                let remote = RemoteRepo {
                    stream: Box::new(existing_stream.chain(stream)),
I think the idea here was that by chaining we would not lose any messages pending on the existing stream. Not a good idea?
I think we should explicitly not chain streams, for the following reason. The contract of the sync protocol is that the user must provide a reliable in-order stream and reset state on reconnect. This last part is very important because without it you can end up looping or stalling.
The situation this code is handling, where there are several streams connected to the same peer, is incompatible with the reliable in-order stream guarantee. That is why we drop the old stream and use the new one, and it's also why I think we should not pull any more messages out of the existing stream or send any more messages to the existing sink. We need the reset of the sync state and the switch to the new stream/sink to all happen at once to maintain the reliable in-order guarantee.
I actually had to do this because some of the tests were failing after switching to the tincans: now that the sink and stream were separate things (with separate lifetimes), the repo was pulling old messages out of the streams even after the sink was closed, confusing the sync protocol.
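The reconnect invariant argued for above can be sketched in miniature (all names and types here are stand-ins, not the crate's real definitions): the sync-state reset and the stream/sink swap happen in one step, and the old connection is dropped rather than chained.

```rust
use std::collections::HashMap;

// Stand-in for the boxed stream + sink held per remote in the real code.
struct RemoteRepo {
    name: &'static str,
}

struct Repo {
    remotes: HashMap<String, RemoteRepo>,
    sync_states: HashMap<String, u64>, // stand-in for real per-peer sync state
}

impl Repo {
    // On reconnect: reset sync state and *replace* the old connection in
    // one step. The old stream is dropped, never chained, so no stale
    // messages can be pulled out of it after the reset.
    fn on_reconnect(&mut self, peer: &str, fresh: RemoteRepo) {
        self.sync_states.remove(peer);
        self.remotes.insert(peer.to_string(), fresh);
    }
}

fn main() {
    let mut repo = Repo { remotes: HashMap::new(), sync_states: HashMap::new() };
    repo.remotes.insert("peer-1".into(), RemoteRepo { name: "old" });
    repo.sync_states.insert("peer-1".into(), 42);
    repo.on_reconnect("peer-1", RemoteRepo { name: "new" });
    assert_eq!(repo.remotes["peer-1"].name, "new");
    assert!(repo.sync_states.is_empty());
}
```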
Ok, thanks for the explanation.
src/repo.rs
Outdated
            let pending_sinks = self.pending_close_sinks.entry(repo_id.clone()).or_default();
            pending_sinks.push(remote_repo.sink);
        }
        let sinks_to_close = self.pending_close_sinks.keys().cloned().collect::<Vec<_>>();
I'm not sure if this is a good idea, because we don't know if all sinks pending close will be ready to be polled.
In the current code, only the currently "open" sinks are marked as pending close, and then `poll_close` is called on them. The ones already inside `pending_close_sinks` are only polled again in response to `WakeSignal::PendingCloseSink(sink_id)` in the loop below.
Good spot. I actually had this in here because it made the tests pass, and in the process of trying to explain why it made the tests pass I realised I had some logic elsewhere slightly wrong. The reason I had to add this was that the tests sometimes hung waiting for sinks to close. What was happening was this scenario:

- The repo has a connection to a remote
- The test calls `new_remote_repo` with the same `RepoId` as the existing remote
- The test then calls `RepoHandle::shutdown`
- This results in the events `ConnectRemoteRepo` and `Stop` being received consecutively
- The repo handles `ConnectRemoteRepo`, notices that it is a connect event for an existing stream, and adds the `Sink` corresponding to the existing connection to `pending_close_sinks`, but fails to call `Self::poll_close_sinks`
- Then the `Stop` event is received and we transition into the shutdown routine
- The shutdown routine adds the new sink to `pending_close_sinks`, then calls `poll_close_sinks` on the new sink
- Now we wait for all the sinks to close, _but nothing ever called `poll_close` on the old sink we replaced when handling `ConnectRemoteRepo`_

I've updated the code to close sinks via a helper method which always calls `Self::poll_close_sinks`.
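The shape of that fix can be sketched as follows (types and method names here are simplified stand-ins for the real ones): every enqueue goes through one helper, so the initial poll can no longer be forgotten on any code path.

```rust
use std::collections::HashMap;

type RepoId = String;
struct BoxedSink; // stand-in for the boxed `Sink` in the real code

struct Repo {
    pending_close_sinks: HashMap<RepoId, Vec<BoxedSink>>,
    polled: Vec<RepoId>, // records which queues received their initial poll
}

impl Repo {
    // The bug: some code paths pushed a sink into `pending_close_sinks`
    // without ever issuing the first `poll_close`, so no waker was
    // registered and the sink never closed. Routing every enqueue
    // through one helper makes the initial poll impossible to skip.
    fn enqueue_sink_close(&mut self, repo_id: RepoId, sink: BoxedSink) {
        self.pending_close_sinks.entry(repo_id.clone()).or_default().push(sink);
        self.poll_close_sinks(&repo_id);
    }

    fn poll_close_sinks(&mut self, repo_id: &RepoId) {
        // The real method polls each pending sink and re-polls on
        // `WakeSignal::PendingCloseSink`; here we just record the poll.
        self.polled.push(repo_id.clone());
    }
}

fn main() {
    let mut repo = Repo { pending_close_sinks: HashMap::new(), polled: Vec::new() };
    repo.enqueue_sink_close("peer-1".to_string(), BoxedSink);
    assert_eq!(repo.polled, vec!["peer-1".to_string()]);
}
```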
(In case it wasn't clear, I've reverted the logic here to the original "only poll the sinks which are ready" logic because I fixed the bug described above)
Ok I see, thanks for the fix.
Clean-up looks great! A few points, and a couple of nits while we're at it
        assert!(old_left_closed.load(std::sync::atomic::Ordering::Acquire));
        assert!(old_right_closed.load(std::sync::atomic::Ordering::Acquire));
        assert!(new_left_closed.load(std::sync::atomic::Ordering::Acquire));
        assert!(new_right_closed.load(std::sync::atomic::Ordering::Acquire));
Move these assertions to the drop impl? We should always stop repos at the end of tests.
I agree that would be much nicer. Unfortunately it produces horrible error messages which halt the entire test suite because the panic in the drop is not unwindable. If you know a way around that then I think we should do the drop thing, but otherwise this seems like the least bad thing.
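An illustrative sketch of the trade-off being described (the struct and field are invented): an `assert!` in `Drop` can fire while a failing test is already unwinding, and a panic during unwind aborts the whole process instead of failing one test. Guarding on `thread::panicking()` avoids the abort, but then the check is skipped exactly when the test already failed, so neither option is clearly better than asserting in the test body.

```rust
struct Repo {
    stopped: bool,
}

impl Drop for Repo {
    fn drop(&mut self) {
        // Without this guard, a failed assert here during an existing
        // unwind is a double panic and aborts the entire test binary.
        if !std::thread::panicking() {
            assert!(self.stopped, "repo dropped without being stopped");
        }
    }
}

fn main() {
    let mut repo = Repo { stopped: false };
    repo.stopped = true; // tests must stop the repo before dropping it
}
```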
Ok, agreed to keep it like this.
Problem: often the `Stream` and `Sink` you want to use for communication are separate objects (for example, in the case of using `tokio::sync::mpsc::Channel` pairs). Forcing the user to combine these objects into one is awkward, requiring lots of boilerplate. Solution: accept the `Stream` and `Sink` as separate arguments. This has a much smaller cost than forcing users to combine the stream and sink because most real-life combined streams can be split using `StreamExt::split`.
Problem: each test in the tests/ subdirectory is compiled as a separate crate, increasing compile times and making it harder to share common code between tests. Solution: move the tests into a single module in `tests/network` where they are all compiled together.
Force-pushed c6b1839 to 0f48c2d
Problem: the integration tests contained a lot of repeated logic for forwarding messages from one peer to another and for managing tokio runtimes. This made the tests a little hard to read and a little fragile. Solution: replace the repeated routing logic with a simple wrapper around a pair of `tokio::sync::mpsc::Channel`s and use `tokio::test` so that we don't need to manage runtimes or synchronise with channels in the tests. In the process of implementing this solution I also did a few other things: * Fixed bugs in several places where we were failing to `poll_close` sinks which we are no longer using * Used `test_log::test` to run all the tests so we get nice logs from `tracing` * Propagated a tracing span into the thread which repo spawns so that spans from within the repo run loop are annotated with the repo ID of the running repo.
Thanks for the detailed review. I think I've addressed or replied to everything.
I'm working on updating this codebase to be interoperable with the 1.0 of the JS implementation. As I do this I need to write a bunch more tests. I felt that the existing tests were a little verbose (as is often the case with the first version of a test suite where you're focusing on getting the tests written at all) and so before I wrote more tests I thought I would take the time to refactor things a little bit.
There are a few main changes:

- Changed `new_remote_repo` to accept the `Sink` and `Stream` as separate arguments. This makes calling this function easier for cases where the sink and stream are not the same object (such as in channels), which leads us into the next point ...
- Replaced the `Network` network adapter with a simpler thing which is really just a wrapper around a `tokio::sync::mpsc::channel(1)`. You can find this in `tests::network::tincans`
- Moved the tests into a `tests/network` module so that they all compile in one go and can reuse the utilities in `tincans`
- Made each test an `async fn` using `tokio::test`. This means less incidental complexity juggling runtimes and channels to communicate the results of spawned tasks back to the main test
- Added a `String` to `NetworkError` so it's easier to debug when it occurs, and also added a bunch more `tracing`
- Fixed several places where we were failing to `poll_close` various sinks