Skip to content

Commit

Permalink
Provider checker mode
Browse files Browse the repository at this point in the history
Signed-off-by: Anton Litvinov <[email protected]>
  • Loading branch information
Zensey committed May 14, 2024
1 parent c97221a commit 782e1ed
Show file tree
Hide file tree
Showing 18 changed files with 541 additions and 5 deletions.
1 change: 1 addition & 0 deletions cmd/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (di *Dependencies) bootstrapTequilapi(nodeOptions node.Options, listener ne
tequilapi_endpoints.AddRoutesForAuthentication(di.Authenticator, di.JWTAuthenticator, di.SSOMystnodes),
tequilapi_endpoints.AddRoutesForIdentities(di.IdentityManager, di.IdentitySelector, di.IdentityRegistry, di.ConsumerBalanceTracker, di.AddressProvider, di.HermesChannelRepository, di.BCHelper, di.Transactor, di.BeneficiaryProvider, di.IdentityMover, di.BeneficiaryAddressStorage, di.HermesMigrator),
tequilapi_endpoints.AddRoutesForConnection(di.MultiConnectionManager, di.StateKeeper, di.ProposalRepository, di.IdentityRegistry, di.EventBus, di.AddressProvider),
tequilapi_endpoints.AddRoutesForConnectionDiag(di.MultiConnectionManager, di.StateKeeper, di.ProposalRepository, di.IdentityRegistry, di.EventBus, di.EventBus, di.AddressProvider, di.IdentitySelector, nodeOptions),
tequilapi_endpoints.AddRoutesForSessions(di.SessionStorage),
tequilapi_endpoints.AddRoutesForConnectionLocation(di.IPResolver, di.LocationResolver, di.LocationResolver),
tequilapi_endpoints.AddRoutesForProposals(di.ProposalRepository, di.PricingHelper, di.LocationResolver, di.FilterPresetStorage, di.NATProber),
Expand Down
15 changes: 12 additions & 3 deletions cmd/di.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ type Dependencies struct {
NodeStatusTracker *monitoring.StatusTracker
NodeStatsTracker *node.StatsTracker
uiVersionConfig versionmanager.NodeUIVersionConfig

provPinger *connection.ProviderChecker
}

// Bootstrap initiates all container dependencies
Expand Down Expand Up @@ -287,7 +289,7 @@ func (di *Dependencies) Bootstrap(nodeOptions node.Options) error {
return err
}

if err := di.bootstrapQualityComponents(nodeOptions.Quality); err != nil {
if err := di.bootstrapQualityComponents(nodeOptions.Quality, nodeOptions); err != nil {
return err
}

Expand All @@ -299,6 +301,7 @@ func (di *Dependencies) Bootstrap(nodeOptions node.Options) error {
if err = di.handleConnStateChange(); err != nil {
return err
}

if err := di.Node.Start(); err != nil {
return err
}
Expand Down Expand Up @@ -581,6 +584,7 @@ func (di *Dependencies) bootstrapNodeComponents(nodeOptions node.Options, tequil
di.bootstrapBeneficiarySaver(nodeOptions)

di.ConnectionRegistry = connection.NewRegistry()

di.MultiConnectionManager = connection.NewMultiConnectionManager(func() connection.Manager {
return connection.NewManager(
pingpong.ExchangeFactoryFunc(
Expand All @@ -604,6 +608,7 @@ func (di *Dependencies) bootstrapNodeComponents(nodeOptions node.Options, tequil
di.P2PDialer,
di.allowTrustedDomainBypassTunnel,
di.disallowTrustedDomainBypassTunnel,
di.provPinger,
)
})

Expand Down Expand Up @@ -883,7 +888,7 @@ func (di *Dependencies) bootstrapIdentityComponents(options node.Options) error
return nil
}

func (di *Dependencies) bootstrapQualityComponents(options node.OptionsQuality) (err error) {
func (di *Dependencies) bootstrapQualityComponents(options node.OptionsQuality, nodeOptions node.Options) (err error) {
if err := di.AllowURLAccess(options.Address); err != nil {
return err
}
Expand Down Expand Up @@ -924,6 +929,10 @@ func (di *Dependencies) bootstrapQualityComponents(options node.OptionsQuality)
return err
}

if nodeOptions.ProvChecker {
di.provPinger = connection.NewProviderChecker(di.EventBus)
}

return nil
}

Expand Down Expand Up @@ -1065,7 +1074,7 @@ func (di *Dependencies) handleConnStateChange() error {

latestState := connectionstate.NotConnected
return di.EventBus.SubscribeAsync(connectionstate.AppTopicConnectionState, func(e connectionstate.AppEventConnectionState) {
if config.GetBool(config.FlagProxyMode) || config.GetBool(config.FlagDVPNMode) {
if config.GetBool(config.FlagProxyMode) || config.GetBool(config.FlagDVPNMode) || config.GetBool(config.FlagProvCheckerMode) {
return // Proxy mode doesn't establish system wide tunnels, no reconnect required.
}

Expand Down
10 changes: 10 additions & 0 deletions config/flags_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@ var (
Value: false,
}

// FlagProvCheckerMode allows running node under current user as a provider checker agent.
FlagProvCheckerMode = cli.BoolFlag{
Name: "provchecker",
Usage: "",
Value: false,
}

// FlagUserspace allows running a node without privileged permissions.
FlagUserspace = cli.BoolFlag{
Name: "userspace",
Expand Down Expand Up @@ -349,6 +356,7 @@ func RegisterFlagsNode(flags *[]cli.Flag) error {
&FlagUserMode,
&FlagDVPNMode,
&FlagProxyMode,
&FlagProvCheckerMode,
&FlagUserspace,
&FlagVendorID,
&FlagLauncherVersion,
Expand Down Expand Up @@ -411,6 +419,8 @@ func ParseFlagsNode(ctx *cli.Context) {
Current.ParseBoolFlag(ctx, FlagUserMode)
Current.ParseBoolFlag(ctx, FlagDVPNMode)
Current.ParseBoolFlag(ctx, FlagProxyMode)
Current.ParseBoolFlag(ctx, FlagProvCheckerMode)

Current.ParseBoolFlag(ctx, FlagUserspace)
Current.ParseStringFlag(ctx, FlagVendorID)
Current.ParseStringFlag(ctx, FlagLauncherVersion)
Expand Down
5 changes: 5 additions & 0 deletions core/connection/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ type Connection interface {
Statistics() (connectionstate.Statistics, error)
}

// ConnectionDiag is a specialised Connection interface for provider check
type ConnectionDiag interface {
Diag() bool
}

// StateChannel is the channel we receive state change events on
type StateChannel chan connectionstate.State

Expand Down
10 changes: 10 additions & 0 deletions core/connection/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ type connectionManager struct {
statsTracker statsTracker

uuid string

provChecker *ProviderChecker
}

// NewManager creates connection manager with given dependencies
Expand All @@ -184,6 +186,7 @@ func NewManager(
validator validator,
p2pDialer p2p.Dialer,
preReconnect, postReconnect func(),
provChecker *ProviderChecker,
) *connectionManager {
uuid, err := uuid.NewV4()
if err != nil {
Expand All @@ -207,6 +210,7 @@ func NewManager(
preReconnect: preReconnect,
postReconnect: postReconnect,
uuid: uuid.String(),
provChecker: provChecker,
}

m.eventBus.SubscribeAsync(connectionstate.AppTopicConnectionState, m.reconnectOnHold)
Expand Down Expand Up @@ -301,6 +305,10 @@ func (m *connectionManager) Connect(consumerID identity.Identity, hermesID commo
return nil
})

if m.provChecker != nil {
go m.provChecker.Diag(m, proposal.ProviderID)
}

go m.consumeConnectionStates(m.activeConnection.State())
go m.checkSessionIP(m.channel, m.connectOptions.ConsumerID, m.connectOptions.SessionID, originalPublicIP)

Expand Down Expand Up @@ -801,6 +809,8 @@ func (m *connectionManager) Cancel() {
}

func (m *connectionManager) Disconnect() error {
log.Trace().Msg("connectionManager) Disconnect >>>>>>>>>>>>>>>>>")

if m.Status().State == connectionstate.NotConnected {
return ErrNoConnection
}
Expand Down
4 changes: 4 additions & 0 deletions core/connection/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type testContext struct {
statsReportInterval time.Duration
mockP2P *mockP2PDialer
mockTime time.Time
provChecker *ProviderChecker

sync.RWMutex
}

Expand Down Expand Up @@ -140,6 +142,7 @@ func (tc *testContext) SetupTest() {

tc.mockP2P = &mockP2PDialer{&mockP2PChannel{}}
tc.mockTime = time.Date(2000, time.January, 0, 10, 12, 3, 0, time.UTC)
tc.provChecker = NewProviderChecker(tc.stubPublisher)

tc.connManager = NewManager(
func(senderUUID string, channel p2p.Channel,
Expand All @@ -159,6 +162,7 @@ func (tc *testContext) SetupTest() {
&mockValidator{},
tc.mockP2P,
func() {}, func() {},
tc.provChecker,
)
tc.connManager.timeGetter = func() time.Time {
return tc.mockTime
Expand Down
50 changes: 50 additions & 0 deletions core/connection/pinger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (C) 2024 The "MysteriumNetwork/node" Authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package connection

import (
"github.com/mysteriumnetwork/node/core/quality"
"github.com/mysteriumnetwork/node/eventbus"
"github.com/rs/zerolog/log"
)

// ProviderChecker is a service for provider testing
type ProviderChecker struct {
bus eventbus.Publisher
}

// NewProviderChecker is a ProviderChecker constructor
func NewProviderChecker(bus eventbus.Publisher) *ProviderChecker {
return &ProviderChecker{
bus: bus,
}
}

// Diag is used to start provider check
func (p *ProviderChecker) Diag(cm *connectionManager, providerID string) {
c, ok := cm.activeConnection.(ConnectionDiag)
res := false
if ok {
log.Debug().Msgf("Check provider> %v", providerID)

res = c.Diag()
cm.Disconnect()
}
ev := quality.DiagEvent{ProviderID: providerID, Result: res}
p.bus.Publish(quality.AppTopicConnectionDiagRes, ev)
}
6 changes: 4 additions & 2 deletions core/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ type Options struct {

Payments OptionsPayments

Consumer bool
Mobile bool
Consumer bool
Mobile bool
ProvChecker bool

SwarmDialerDNSHeadstart time.Duration
PilvytisAddress string
Expand Down Expand Up @@ -205,6 +206,7 @@ func GetOptions() *Options {
SSE: OptionsSSE{
Enabled: config.GetBool(config.FlagSSEEnable),
},
ProvChecker: config.GetBool(config.FlagProvCheckerMode),
}
}

Expand Down
9 changes: 9 additions & 0 deletions core/quality/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ type PingEvent struct {
Duration time.Duration `json:"duration"`
}

// DiagEvent represents provider check result event
type DiagEvent struct {
ProviderID string
Result bool
}

const (
// AppTopicConnectionEvents represents event bus topic for the connection events.
AppTopicConnectionEvents = "connection_events"
Expand All @@ -111,4 +117,7 @@ const (

// AppTopicProviderPingP2P represents event bus topic for provider p2p pings to consumer.
AppTopicProviderPingP2P = "provider_ping_p2p"

// AppTopicConnectionDiagRes represents event bus topic for provider check result.
AppTopicConnectionDiagRes = "connection_diag"
)
7 changes: 7 additions & 0 deletions services/wireguard/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func (c *Connection) State() <-chan connectionstate.State {
return c.stateCh
}

// Diag is used to start provider check
func (c *Connection) Diag() bool {
return c.connectionEndpoint.Diag()
}

// Statistics returns connection statistics channel.
func (c *Connection) Statistics() (connectionstate.Statistics, error) {
stats, err := c.connectionEndpoint.PeerStats()
Expand All @@ -110,6 +115,8 @@ func (c *Connection) Reconnect(ctx context.Context, options connection.ConnectOp
}

func (c *Connection) start(ctx context.Context, start startConn, options connection.ConnectOptions) (err error) {
log.Info().Msg("+++++++++++++++++++++++++++++++++++++++++++++++++++++ *Connection) start")

var config wg.ServiceConfig
if err = json.Unmarshal(options.SessionConfig, &config); err != nil {
return errors.Wrap(err, "failed to unmarshal connection config")
Expand Down
3 changes: 3 additions & 0 deletions services/wireguard/connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ func (mce *mockConnectionEndpoint) ConfigureRoutes(_ net.IP) error { retur
func (mce *mockConnectionEndpoint) PeerStats() (wgcfg.Stats, error) {
return wgcfg.Stats{LastHandshake: time.Now(), BytesSent: 10, BytesReceived: 11}, nil
}
func (mce *mockConnectionEndpoint) Diag() bool {
return true
}

type mockHandshakeWaiter struct {
err error
Expand Down
1 change: 1 addition & 0 deletions services/wireguard/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ type ConnectionEndpoint interface {
Config() (ServiceConfig, error)
InterfaceName() string
Stop() error
Diag() bool
}
Loading

0 comments on commit 782e1ed

Please sign in to comment.