Skip to content

Commit

Permalink
Merge pull request #138 from tayloraswift/session-synchronization
Browse files Browse the repository at this point in the history
refine the Mongo.Session synchronization API, and enable synchronizing across concurrency domains
  • Loading branch information
tayloraswift committed Jun 28, 2024
2 parents 4be9c8c + f8fbf9f commit 3f46f17
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ struct CausalConsistency<Configuration>:MongoTestBattery
tests.expect(response ==? .init(inserted: 1))
}

let other:Mongo.Session = try await .init(from: pool, forking: session)
let other:Mongo.Session = try await session.fork()

// We should be able to observe a precondition time after performing the
// initialization.
Expand Down Expand Up @@ -267,7 +267,7 @@ struct CausalConsistency<Configuration>:MongoTestBattery
// current session, however.
await (tests ! "timeout-forked").do(catching: AnyTimeoutError.self)
{
let forked:Mongo.Session = try await .init(from: pool, forking: session)
let forked:Mongo.Session = try await session.fork()

// A forked session should initially share the same precondition
// time as the session it was forked from.
Expand Down
7 changes: 3 additions & 4 deletions Sources/MongoDBTests/Cursors/Cursors.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ struct Cursors<Configuration>:MongoTestBattery where Configuration:CursorTestCon
{
// We should be using a session that is causally-consistent with the
// insertion operation at the beginning of this test.
let session:Mongo.Session = try await .init(from: pool, forking: initializer)
let session:Mongo.Session = try await initializer.fork()
// We should be reusing session identifiers.
tests.expect(await pool.count ==? 2)
// We should be able to query the collection for results in batches of
Expand Down Expand Up @@ -161,8 +161,7 @@ struct Cursors<Configuration>:MongoTestBattery where Configuration:CursorTestCon
{
// We should be using a session that is causally-consistent with the
// insertion operation at the beginning of this test.
let session:Mongo.Session = try await .init(from: pool,
forking: initializer)
let session:Mongo.Session = try await initializer.fork()
let cursor:Mongo.CursorIdentifier? =
try await session.run(
command: Mongo.Find<Mongo.Cursor<Record<Int64>>>.init(collection,
Expand Down Expand Up @@ -224,7 +223,7 @@ struct Cursors<Configuration>:MongoTestBattery where Configuration:CursorTestCon

await tests.do
{
let session:Mongo.Session = try await .init(from: pool, forking: initializer)
let session:Mongo.Session = try await initializer.fork()
try await session.run(
command: Mongo.Find<Mongo.Cursor<Record<Int64>>>.init(collection,
stride: 10),
Expand Down
51 changes: 37 additions & 14 deletions Sources/MongoDriver/Sessions/Mongo.Session.swift
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ extension Mongo
private
init(allocation:SessionPool.Allocation,
pool:SessionPool,
fork:__shared Mongo.Session?)
preconditionTime:BSON.Timestamp? = nil,
notarizedTime:ClusterTime? = nil)
{
self.preconditionTime = fork?.preconditionTime
self.notarizedTime = fork?.notarizedTime
self.preconditionTime = preconditionTime
self.notarizedTime = notarizedTime

self.transaction = allocation.transaction
self.touched = allocation.touched
Expand All @@ -99,24 +100,37 @@ extension Mongo.Session
/// Creates a session from a session pool. Do not escape the session
/// from the scope that yielded the pool, because doing so will prevent
/// the pool from draining on scope exit.
///
/// If `original` is non-nil, operations on the newly-created session will
/// reflect writes performed using the original session at the time of
/// session creation, but the two sessions will be otherwise unrelated.
///
/// If `original` is non-nil, calling this initializer is roughly
/// equivalent to creating an unforked session and immediately calling
/// ``synchronize(with:)``.
public convenience
init(from pool:Mongo.SessionPool, forking original:__shared Mongo.Session? = nil,
init(from pool:Mongo.SessionPool,
by deadline:ContinuousClock.Instant? = nil) async throws
{
self.init(allocation: await pool.create(
capabilities: try await pool.deployment.capabilities(
by: deadline ?? pool.deployment.timeout.deadline())),
pool: pool,
fork: original)
pool: pool)
}

/// Forks this session, returning a new session. Operations on the newly-created session
/// will reflect writes performed using the original session at the time of session
/// creation, but the two sessions will be otherwise unrelated.
///
/// Calling this initializer is roughly equivalent to creating an unforked session and
/// immediately calling ``synchronize(with:)``, although the unforked session will not
/// gossip the cluster time from the original session.
///
/// Do not escape the session from the scope that yielded the pool the original session was
/// created in, because doing so will prevent the pool from draining on scope exit.
public
func fork(by deadline:ContinuousClock.Instant? = nil) async throws -> Self
{
.init(allocation: await self.pool.create(
capabilities: try await self.pool.deployment.capabilities(
by: deadline ?? self.pool.deployment.timeout.deadline())),
pool: self.pool,
preconditionTime: self.preconditionTime,
notarizedTime: self.notarizedTime)
}

/// Fast-forwards this session’s precondition time to the other session’s
/// precondition time, if it is non-nil and greater than this
/// session’s precondition time. The other session’s precondition time
Expand All @@ -126,6 +140,15 @@ extension Mongo.Session
{
other.preconditionTime?.combine(into: &self.preconditionTime)
}

/// Similar to ``synchronize(with:)``, but accepts a ``BSON.Timestamp`` instead of a full
/// session instance. This is useful for synchronizing sessions across concurrency domains,
/// as sessions themselves are non-``Sendable``.
public
func synchronize(to preconditionTime:BSON.Timestamp)
{
preconditionTime.combine(into: &self.preconditionTime)
}
}
@available(*, unavailable, message: "sessions have reference semantics")
extension Mongo.Session:Sendable
Expand Down
2 changes: 1 addition & 1 deletion Sources/MongoDriverTests/TestSessionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestSessionPool(_ tests:TestGroup, bootstrap:Mongo.DriverBootstrap,

try await a.refresh()

let b:Mongo.Session = try await .init(from: $0, forking: a)
let b:Mongo.Session = try await a.fork()

tests.expect(await $0.count ==? 2)

Expand Down

0 comments on commit 3f46f17

Please sign in to comment.