-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stream Migration PoC #1413
Stream Migration PoC #1413
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool stuff! I've added a bunch of comments.
I think we have to be a bit more careful with how we negotiate the stream migration protocol in the host:
- We want to check if the peer supports the stream migration protocol (see https://pkg.go.dev/github.com/libp2p/go-libp2p-core/peerstore#ProtoBook). We learn this from the Identify protocol. This allows to not waste one roundtrip if the peer doesn't support this protocol.
- If the peer says that it supports the protocol, but multistream negotiation of
/libp2p/stream-migration
fails, we know that something is wrong. Maybe the peer removed support for the stream migration protocol without telling us. In that case, we should probably disable stream migration for this peer altogether. Note: this is very unlikely, we don't expect this to happen in 99.9%. You can focus on the other parts of this PR first, especially since detecting this condition requires us to fix race condition in NegotiateLazy multiformats/go-multistream#83 first.
} | ||
|
||
message MigrateMessage { | ||
required uint32 id=1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be a uint64? Most of our stream muxers allow O(uint64) streams.
// - handle stream resets during migration, before migration, after migration. | ||
// - mirror the original stream state on the new state (half-closed) | ||
|
||
var log = logging.Logger("p2p-holepunch") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var log = logging.Logger("p2p-holepunch") | |
var log = logging.Logger("stream-migration") |
|
||
const ID = "/libp2p/stream-migration/" | ||
|
||
const maxMsgSize = 4 * 2 // 2 32-bit ints |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to allow for some protobuf framing overhead. Really, we can just use 1 kB or so here, this is just a protection against absurdly large messages.
@@ -190,3 +190,53 @@ func ping(s network.Stream) (time.Duration, error) { | |||
|
|||
return time.Since(before), nil | |||
} | |||
|
|||
// PingWithStream is the same as above, but returns the stream | |||
func PingWithStream(ctx context.Context, h host.Host, p peer.ID) (network.Stream, <-chan Result) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only used to test the stream migration, right? Maybe we can implement a simple ping-pong protocol there, to avoid adding to our public API here?
defer sm.mu.Unlock() | ||
log.Debugf("Remote peer is %v", remotePeerID) | ||
log.Debugf("Looking for stream %d. %v. %v", from, len(sm.remoteinitiatedStreams[remotePeerID]), sm.remoteinitiatedStreams[remotePeerID]) | ||
if sm.remoteinitiatedStreams[remotePeerID] == nil || sm.remoteinitiatedStreams[remotePeerID][from] == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: better check if entry exists: _, ok := sm.remoteinitiatedStreams[remotePeerID]; !ok
|
||
var log = logging.Logger("p2p-holepunch") | ||
|
||
const ID = "/libp2p/stream-migration/" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const ID = "/libp2p/stream-migration/" | |
const ID = "/libp2p/stream-migration" |
type MigratableStream struct { | ||
network.Stream | ||
mu sync.Mutex | ||
newStream *network.Stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As network.Stream
is an interface, we probably don't need a pointer here, or do we?
network.Stream | ||
mu sync.Mutex | ||
newStream *network.Stream | ||
originalStreamId uint32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: introduce a type streamID uint32
(or 64, see above) here.
fallthrough | ||
case initiatorStartingMigration: | ||
return ms.Stream.Read(p) | ||
case afterAckForMigration: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: if ms.state == afterAckForMigration
would achieve the same as this switch.
|
||
// Migrate will start migrating the migratable stream s from the original | ||
// network stream to the new network stream. | ||
func (sm *StreamMigrator) Migrate(s *MigratableStream, new network.Stream) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you considered MigrateTo(s *MigratableStream, conn network.CapableConn)
?
Closing in favor of eventually supporting QUIC's native connection migration feature. |
[Not meant to be merged]
A PoC implementation of stream migration. The example test here runs ping 20 times and migrates the underlying stream 4 times. No changes to ping except getting a reference to the stream (so we know what stream to migrate from, only for this example).
This test migrates a stream to another stream on the same connection. This is just as an example since I couldn't figure out how to open a new connection. While we are actually concerned about migrating streams to new connections the code here is unaware of were the streams live so should work just as well with a stream on another connection.
This follows libp2p/specs#406.