From 782e1ed50303425e208cfbce9be2c2de36a79e59 Mon Sep 17 00:00:00 2001 From: Anton Litvinov Date: Mon, 13 May 2024 02:24:46 +0400 Subject: [PATCH] Provider checker mode Signed-off-by: Anton Litvinov --- cmd/bootstrap.go | 1 + cmd/di.go | 15 +- config/flags_node.go | 10 + core/connection/interface.go | 5 + core/connection/manager.go | 10 + core/connection/manager_test.go | 4 + core/connection/pinger.go | 50 ++++ core/node/options.go | 6 +- core/quality/metrics.go | 9 + services/wireguard/connection/connection.go | 7 + .../wireguard/connection/connection_test.go | 3 + services/wireguard/endpoint.go | 1 + .../wireguard/endpoint/diagclient/client.go | 150 +++++++++++ services/wireguard/endpoint/endpoint.go | 10 + services/wireguard/endpoint/wg_client.go | 10 + services/wireguard/service/service_test.go | 3 + tequilapi/contract/connection.go | 8 + tequilapi/endpoints/connection-diag.go | 244 ++++++++++++++++++ 18 files changed, 541 insertions(+), 5 deletions(-) create mode 100644 core/connection/pinger.go create mode 100644 services/wireguard/endpoint/diagclient/client.go create mode 100644 tequilapi/endpoints/connection-diag.go diff --git a/cmd/bootstrap.go b/cmd/bootstrap.go index a45f464cee..e38b9cb968 100644 --- a/cmd/bootstrap.go +++ b/cmd/bootstrap.go @@ -70,6 +70,7 @@ func (di *Dependencies) bootstrapTequilapi(nodeOptions node.Options, listener ne tequilapi_endpoints.AddRoutesForAuthentication(di.Authenticator, di.JWTAuthenticator, di.SSOMystnodes), tequilapi_endpoints.AddRoutesForIdentities(di.IdentityManager, di.IdentitySelector, di.IdentityRegistry, di.ConsumerBalanceTracker, di.AddressProvider, di.HermesChannelRepository, di.BCHelper, di.Transactor, di.BeneficiaryProvider, di.IdentityMover, di.BeneficiaryAddressStorage, di.HermesMigrator), tequilapi_endpoints.AddRoutesForConnection(di.MultiConnectionManager, di.StateKeeper, di.ProposalRepository, di.IdentityRegistry, di.EventBus, di.AddressProvider), + tequilapi_endpoints.AddRoutesForConnectionDiag(di.MultiConnectionManager, di.StateKeeper, di.ProposalRepository, di.IdentityRegistry, di.EventBus, di.EventBus, di.AddressProvider, di.IdentitySelector, nodeOptions), tequilapi_endpoints.AddRoutesForSessions(di.SessionStorage), tequilapi_endpoints.AddRoutesForConnectionLocation(di.IPResolver, di.LocationResolver, di.LocationResolver), tequilapi_endpoints.AddRoutesForProposals(di.ProposalRepository, di.PricingHelper, di.LocationResolver, di.FilterPresetStorage, di.NATProber), diff --git a/cmd/di.go b/cmd/di.go index 1088d8ce3c..d1a161e728 100644 --- a/cmd/di.go +++ b/cmd/di.go @@ -210,6 +210,8 @@ type Dependencies struct { NodeStatusTracker *monitoring.StatusTracker NodeStatsTracker *node.StatsTracker uiVersionConfig versionmanager.NodeUIVersionConfig + + provPinger *connection.ProviderChecker } // Bootstrap initiates all container dependencies @@ -287,7 +289,7 @@ func (di *Dependencies) Bootstrap(nodeOptions node.Options) error { return err } - if err := di.bootstrapQualityComponents(nodeOptions.Quality); err != nil { + if err := di.bootstrapQualityComponents(nodeOptions.Quality, nodeOptions); err != nil { return err } @@ -299,6 +301,7 @@ func (di *Dependencies) Bootstrap(nodeOptions node.Options) error { if err = di.handleConnStateChange(); err != nil { return err } + if err := di.Node.Start(); err != nil { return err } @@ -581,6 +584,7 @@ func (di *Dependencies) bootstrapNodeComponents(nodeOptions node.Options, tequil di.bootstrapBeneficiarySaver(nodeOptions) di.ConnectionRegistry = connection.NewRegistry() + di.MultiConnectionManager = connection.NewMultiConnectionManager(func() connection.Manager { return connection.NewManager( pingpong.ExchangeFactoryFunc( @@ -604,6 +608,7 @@ func (di *Dependencies) bootstrapNodeComponents(nodeOptions node.Options, tequil di.P2PDialer, di.allowTrustedDomainBypassTunnel, di.disallowTrustedDomainBypassTunnel, + di.provPinger, ) }) @@ -883,7 +888,7 @@ func (di *Dependencies) bootstrapIdentityComponents(options node.Options) error return nil } -func (di *Dependencies) bootstrapQualityComponents(options node.OptionsQuality) (err error) { +func (di *Dependencies) bootstrapQualityComponents(options node.OptionsQuality, nodeOptions node.Options) (err error) { if err := di.AllowURLAccess(options.Address); err != nil { return err } @@ -924,6 +929,10 @@ func (di *Dependencies) bootstrapQualityComponents(options node.OptionsQuality) return err } + if nodeOptions.ProvChecker { + di.provPinger = connection.NewProviderChecker(di.EventBus) + } + return nil } @@ -1065,7 +1074,7 @@ func (di *Dependencies) handleConnStateChange() error { latestState := connectionstate.NotConnected return di.EventBus.SubscribeAsync(connectionstate.AppTopicConnectionState, func(e connectionstate.AppEventConnectionState) { - if config.GetBool(config.FlagProxyMode) || config.GetBool(config.FlagDVPNMode) { + if config.GetBool(config.FlagProxyMode) || config.GetBool(config.FlagDVPNMode) || config.GetBool(config.FlagProvCheckerMode) { return // Proxy mode doesn't establish system wide tunnels, no reconnect required. } diff --git a/config/flags_node.go b/config/flags_node.go index acaf62e56c..8c8fd3fada 100644 --- a/config/flags_node.go +++ b/config/flags_node.go @@ -229,6 +229,13 @@ var ( Value: false, } + // FlagProvCheckerMode allows running node under current user as a provider checker agent. + FlagProvCheckerMode = cli.BoolFlag{ + Name: "provchecker", + Usage: "", + Value: false, + } + // FlagUserspace allows running a node without privileged permissions. FlagUserspace = cli.BoolFlag{ Name: "userspace", @@ -349,6 +356,7 @@ func RegisterFlagsNode(flags *[]cli.Flag) error { &FlagUserMode, &FlagDVPNMode, &FlagProxyMode, + &FlagProvCheckerMode, &FlagUserspace, &FlagVendorID, &FlagLauncherVersion, @@ -411,6 +419,8 @@ func ParseFlagsNode(ctx *cli.Context) { Current.ParseBoolFlag(ctx, FlagUserMode) Current.ParseBoolFlag(ctx, FlagDVPNMode) Current.ParseBoolFlag(ctx, FlagProxyMode) + Current.ParseBoolFlag(ctx, FlagProvCheckerMode) + Current.ParseBoolFlag(ctx, FlagUserspace) Current.ParseStringFlag(ctx, FlagVendorID) Current.ParseStringFlag(ctx, FlagLauncherVersion) diff --git a/core/connection/interface.go b/core/connection/interface.go index cf7f678140..531cca8f0f 100644 --- a/core/connection/interface.go +++ b/core/connection/interface.go @@ -39,6 +39,11 @@ type Connection interface { Statistics() (connectionstate.Statistics, error) } +// ConnectionDiag is a specialised Connection interface for provider check +type ConnectionDiag interface { + Diag() bool +} + // StateChannel is the channel we receive state change events on type StateChannel chan connectionstate.State diff --git a/core/connection/manager.go b/core/connection/manager.go index e87d5a32ad..e1e53a14d9 100644 --- a/core/connection/manager.go +++ b/core/connection/manager.go @@ -170,6 +170,8 @@ type connectionManager struct { statsTracker statsTracker uuid string + + provChecker *ProviderChecker } // NewManager creates connection manager with given dependencies @@ -184,6 +186,7 @@ func NewManager( validator validator, p2pDialer p2p.Dialer, preReconnect, postReconnect func(), + provChecker *ProviderChecker, ) *connectionManager { uuid, err := uuid.NewV4() if err != nil { @@ -207,6 +210,7 @@ func NewManager( preReconnect: preReconnect, postReconnect: postReconnect, uuid: uuid.String(), + provChecker: provChecker, } m.eventBus.SubscribeAsync(connectionstate.AppTopicConnectionState, m.reconnectOnHold) @@ -301,6 +305,10 @@ func (m *connectionManager) Connect(consumerID identity.Identity, hermesID commo return nil }) + if m.provChecker != nil { + go m.provChecker.Diag(m, proposal.ProviderID) + } + go m.consumeConnectionStates(m.activeConnection.State()) go m.checkSessionIP(m.channel, m.connectOptions.ConsumerID, m.connectOptions.SessionID, originalPublicIP) @@ -801,6 +809,8 @@ func (m *connectionManager) Cancel() { } func (m *connectionManager) Disconnect() error { + log.Trace().Msg("connectionManager) Disconnect >>>>>>>>>>>>>>>>>") + if m.Status().State == connectionstate.NotConnected { return ErrNoConnection } diff --git a/core/connection/manager_test.go b/core/connection/manager_test.go index 4e7ec1da1d..92ba87f454 100644 --- a/core/connection/manager_test.go +++ b/core/connection/manager_test.go @@ -61,6 +61,8 @@ type testContext struct { statsReportInterval time.Duration mockP2P *mockP2PDialer mockTime time.Time + provChecker *ProviderChecker + sync.RWMutex } @@ -140,6 +142,7 @@ func (tc *testContext) SetupTest() { tc.mockP2P = &mockP2PDialer{&mockP2PChannel{}} tc.mockTime = time.Date(2000, time.January, 0, 10, 12, 3, 0, time.UTC) + tc.provChecker = NewProviderChecker(tc.stubPublisher) tc.connManager = NewManager( func(senderUUID string, channel p2p.Channel, @@ -159,6 +162,7 @@ func (tc *testContext) SetupTest() { &mockValidator{}, tc.mockP2P, func() {}, func() {}, + tc.provChecker, ) tc.connManager.timeGetter = func() time.Time { return tc.mockTime diff --git a/core/connection/pinger.go b/core/connection/pinger.go new file mode 100644 index 0000000000..9ed462507d --- /dev/null +++ b/core/connection/pinger.go @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2024 The "MysteriumNetwork/node" Authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package connection + +import ( + "github.com/mysteriumnetwork/node/core/quality" + "github.com/mysteriumnetwork/node/eventbus" + "github.com/rs/zerolog/log" +) + +// ProviderChecker is a service for provider testing +type ProviderChecker struct { + bus eventbus.Publisher +} + +// NewProviderChecker is a ProviderChecker constructor +func NewProviderChecker(bus eventbus.Publisher) *ProviderChecker { + return &ProviderChecker{ + bus: bus, + } +} + +// Diag is used to start provider check +func (p *ProviderChecker) Diag(cm *connectionManager, providerID string) { + c, ok := cm.activeConnection.(ConnectionDiag) + res := false + if ok { + log.Debug().Msgf("Check provider> %v", providerID) + + res = c.Diag() + cm.Disconnect() + } + ev := quality.DiagEvent{ProviderID: providerID, Result: res} + p.bus.Publish(quality.AppTopicConnectionDiagRes, ev) +} diff --git a/core/node/options.go b/core/node/options.go index 04085ff27e..7ecb9cd22a 100644 --- a/core/node/options.go +++ b/core/node/options.go @@ -81,8 +81,9 @@ type Options struct { Payments OptionsPayments - Consumer bool - Mobile bool + Consumer bool + Mobile bool + ProvChecker bool SwarmDialerDNSHeadstart time.Duration PilvytisAddress string @@ -205,6 +206,7 @@ func GetOptions() *Options { SSE: OptionsSSE{ Enabled: config.GetBool(config.FlagSSEEnable), }, + ProvChecker: config.GetBool(config.FlagProvCheckerMode), } } diff --git a/core/quality/metrics.go b/core/quality/metrics.go index 51d3489607..9beedfd6bb 100644 --- a/core/quality/metrics.go +++ b/core/quality/metrics.go @@ -102,6 +102,12 @@ type PingEvent struct { Duration time.Duration `json:"duration"` } +// DiagEvent represents provider check result event +type DiagEvent struct { + ProviderID string + Result bool +} + const ( // AppTopicConnectionEvents represents event bus topic for the connection events. AppTopicConnectionEvents = "connection_events" @@ -111,4 +117,7 @@ const ( // AppTopicProviderPingP2P represents event bus topic for provider p2p pings to consumer. AppTopicProviderPingP2P = "provider_ping_p2p" + + // AppTopicConnectionDiagRes represents event bus topic for provider check result. + AppTopicConnectionDiagRes = "connection_diag" ) diff --git a/services/wireguard/connection/connection.go b/services/wireguard/connection/connection.go index 3657b8b00c..7909dfd815 100644 --- a/services/wireguard/connection/connection.go +++ b/services/wireguard/connection/connection.go @@ -86,6 +86,11 @@ func (c *Connection) State() <-chan connectionstate.State { return c.stateCh } +// Diag is used to start provider check +func (c *Connection) Diag() bool { + return c.connectionEndpoint.Diag() +} + // Statistics returns connection statistics channel. func (c *Connection) Statistics() (connectionstate.Statistics, error) { stats, err := c.connectionEndpoint.PeerStats() @@ -110,6 +115,8 @@ func (c *Connection) Reconnect(ctx context.Context, options connection.ConnectOp } func (c *Connection) start(ctx context.Context, start startConn, options connection.ConnectOptions) (err error) { + log.Info().Msg("+++++++++++++++++++++++++++++++++++++++++++++++++++++ *Connection) start") + var config wg.ServiceConfig if err = json.Unmarshal(options.SessionConfig, &config); err != nil { return errors.Wrap(err, "failed to unmarshal connection config") diff --git a/services/wireguard/connection/connection_test.go b/services/wireguard/connection/connection_test.go index cbc2930b6a..ad82dbc3a8 100644 --- a/services/wireguard/connection/connection_test.go +++ b/services/wireguard/connection/connection_test.go @@ -158,6 +158,9 @@ func (mce *mockConnectionEndpoint) ConfigureRoutes(_ net.IP) error { retur func (mce *mockConnectionEndpoint) PeerStats() (wgcfg.Stats, error) { return wgcfg.Stats{LastHandshake: time.Now(), BytesSent: 10, BytesReceived: 11}, nil } +func (mce *mockConnectionEndpoint) Diag() bool { + return true +} type mockHandshakeWaiter struct { err error diff --git a/services/wireguard/endpoint.go b/services/wireguard/endpoint.go index 8d2201b795..e6df362a67 100644 --- a/services/wireguard/endpoint.go +++ b/services/wireguard/endpoint.go @@ -34,4 +34,5 @@ type ConnectionEndpoint interface { Config() (ServiceConfig, error) InterfaceName() string Stop() error + Diag() bool } diff --git a/services/wireguard/endpoint/diagclient/client.go b/services/wireguard/endpoint/diagclient/client.go new file mode 100644 index 0000000000..646ff7f14e --- /dev/null +++ b/services/wireguard/endpoint/diagclient/client.go @@ -0,0 +1,150 @@ +/* + * Copyright (C) 2024 The "MysteriumNetwork/node" Authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package diagclient + +import ( + "bufio" + "fmt" + "io" + "net/http" + "net/netip" + "strings" + "sync" + "time" + + "github.com/rs/zerolog/log" + "golang.zx2c4.com/wireguard/conn" + "golang.zx2c4.com/wireguard/device" + + "github.com/mysteriumnetwork/node/services/wireguard/endpoint/netstack" + "github.com/mysteriumnetwork/node/services/wireguard/endpoint/userspace" + "github.com/mysteriumnetwork/node/services/wireguard/wgcfg" +) + +type client struct { + mu sync.Mutex + Device *device.Device + tnet *netstack.Net +} + +// New create new WireGuard client testing the provider. +func New() (*client, error) { + log.Error().Msg("Creating pinger wg client") + return &client{}, nil +} + +func (c *client) ReConfigureDevice(config wgcfg.DeviceConfig) error { + return c.ConfigureDevice(config) +} + +func (c *client) ConfigureDevice(cfg wgcfg.DeviceConfig) error { + localAddr, err := netip.ParseAddr(cfg.Subnet.IP.String()) + if err != nil { + return fmt.Errorf("could not parse local addr: %w", err) + } + if len(cfg.DNS) == 0 { + return fmt.Errorf("DNS addr list is empty") + } + dnsAddr, err := netip.ParseAddr(cfg.DNS[0]) + if err != nil { + return fmt.Errorf("could not parse DNS addr: %w", err) + } + tunnel, tnet, err := netstack.CreateNetTUN([]netip.Addr{localAddr}, []netip.Addr{dnsAddr}, device.DefaultMTU) + if err != nil { + return fmt.Errorf("failed to create netstack device %s: %w", cfg.IfaceName, err) + } + + logger := device.NewLogger(device.LogLevelVerbose, fmt.Sprintf("(%s) ", cfg.IfaceName)) + wgDevice := device.NewDevice(tunnel, conn.NewDefaultBind(), logger) + + log.Info().Msg("Applying interface configuration") + if err := wgDevice.IpcSetOperation(bufio.NewReader(strings.NewReader(cfg.Encode()))); err != nil { + wgDevice.Close() + return fmt.Errorf("could not set device uapi config: %w", err) + } + + log.Info().Msg("Bringing device up") + + wgDevice.Up() + + c.mu.Lock() + c.Device = wgDevice + c.mu.Unlock() + c.tnet = tnet + + return nil +} + +func (c *client) DestroyDevice(iface string) error { + return c.Close() +} + +func (c *client) PeerStats(iface string) (wgcfg.Stats, error) { + deviceState, err := userspace.ParseUserspaceDevice(c.Device.IpcGetOperation) + if err != nil { + return wgcfg.Stats{}, fmt.Errorf("could not parse device state: %w", err) + } + + stats, statErr := userspace.ParseDevicePeerStats(deviceState) + if statErr != nil { + err = statErr + log.Warn().Err(err).Msg("Failed to parse device stats, will try again") + } else { + return stats, nil + } + + return wgcfg.Stats{}, fmt.Errorf("could not parse device state: %w", err) +} + +func (c *client) Close() (err error) { + c.mu.Lock() + defer c.mu.Unlock() + + log.Error().Err(err).Msg("Shutting down pinger ...") + + if c.Device != nil { + go func() { + time.Sleep(5 * time.Second) + c.Device.Close() + }() + } + return nil +} + +func (c *client) Diag() bool { + client := http.Client{ + Transport: &http.Transport{ + DialContext: c.tnet.DialContext, + }, + } + resp, err := client.Get("http://1.1.1.1/") + if err != nil { + log.Error().Err(err).Msg("Get failed") + return false + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Error().Err(err).Msg("Readall failed") + return false + } + _ = body + + return true +} diff --git a/services/wireguard/endpoint/endpoint.go b/services/wireguard/endpoint/endpoint.go index 7b77489300..c9b8c7bc73 100644 --- a/services/wireguard/endpoint/endpoint.go +++ b/services/wireguard/endpoint/endpoint.go @@ -52,6 +52,14 @@ type connectionEndpoint struct { wgClient WgClient } +func (ce *connectionEndpoint) Diag() bool { + c, ok := ce.wgClient.(WgClientDiag) + if ok { + return c.Diag() + } + return false +} + // StartConsumerMode starts and configure wireguard network interface running in consumer mode. func (ce *connectionEndpoint) StartConsumerMode(cfg wgcfg.DeviceConfig) error { if err := ce.cleanAbandonedInterfaces(); err != nil { @@ -80,6 +88,8 @@ func (ce *connectionEndpoint) StartConsumerMode(cfg wgcfg.DeviceConfig) error { } return errors.Wrap(err, "could not configure device") } + + // ce.wgClient.Diag() return nil } diff --git a/services/wireguard/endpoint/wg_client.go b/services/wireguard/endpoint/wg_client.go index b991bb2ab2..74cd0f6111 100644 --- a/services/wireguard/endpoint/wg_client.go +++ b/services/wireguard/endpoint/wg_client.go @@ -24,6 +24,7 @@ import ( "github.com/rs/zerolog/log" "github.com/mysteriumnetwork/node/config" + "github.com/mysteriumnetwork/node/services/wireguard/endpoint/diagclient" "github.com/mysteriumnetwork/node/services/wireguard/endpoint/dvpnclient" "github.com/mysteriumnetwork/node/services/wireguard/endpoint/kernelspace" netstack_provider "github.com/mysteriumnetwork/node/services/wireguard/endpoint/netstack-provider" @@ -43,6 +44,11 @@ type WgClient interface { Close() error } +// WgClientDiag is a specialised WgClient interface for provider check +type WgClientDiag interface { + Diag() bool +} + // WgClientFactory represents WireGuard client factory. type WgClientFactory struct { once sync.Once @@ -56,6 +62,10 @@ func NewWGClientFactory() *WgClientFactory { // NewWGClient returns a new wireguard client. func (wcf *WgClientFactory) NewWGClient() (WgClient, error) { + + if config.GetBool(config.FlagProvCheckerMode) { + return diagclient.New() + } if config.GetBool(config.FlagDVPNMode) { return dvpnclient.New() } diff --git a/services/wireguard/service/service_test.go b/services/wireguard/service/service_test.go index 96f210be70..aa4742a17e 100644 --- a/services/wireguard/service/service_test.go +++ b/services/wireguard/service/service_test.go @@ -153,6 +153,9 @@ func (mce *mockConnectionEndpoint) ConfigureRoutes(_ net.IP) error { retur func (mce *mockConnectionEndpoint) PeerStats() (wgcfg.Stats, error) { return wgcfg.Stats{LastHandshake: time.Now()}, nil } +func (mce *mockConnectionEndpoint) Diag() bool { + return true +} func newManagerStub(pub, out, country string) *Manager { dnsHandler, _ := dns.ResolveViaSystem() diff --git a/tequilapi/contract/connection.go b/tequilapi/contract/connection.go index c20432e63e..226e9fdb37 100644 --- a/tequilapi/contract/connection.go +++ b/tequilapi/contract/connection.go @@ -51,6 +51,14 @@ func NewConnectionInfoDTO(session connectionstate.Status) ConnectionInfoDTO { return response } +// ConnectionDiagInfoDTO holds provider check result +// swagger:model ConnectionDiagInfoDTO +type ConnectionDiagInfoDTO struct { + Status bool `json:"status"` + Error interface{} `json:"error"` + ProviderID string `json:"provider_id"` +} + // ConnectionInfoDTO holds partial consumer connection details. // swagger:model ConnectionInfoDTO type ConnectionInfoDTO struct { diff --git a/tequilapi/endpoints/connection-diag.go b/tequilapi/endpoints/connection-diag.go new file mode 100644 index 0000000000..6783232d12 --- /dev/null +++ b/tequilapi/endpoints/connection-diag.go @@ -0,0 +1,244 @@ +/* + * Copyright (C) 2024 The "MysteriumNetwork/node" Authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package endpoints + +import ( + "fmt" + "strconv" + + "github.com/ethereum/go-ethereum/common" + "github.com/gin-gonic/gin" + "github.com/gofrs/uuid" + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + + "github.com/mysteriumnetwork/go-rest/apierror" + "github.com/mysteriumnetwork/node/config" + "github.com/mysteriumnetwork/node/core/connection" + "github.com/mysteriumnetwork/node/core/discovery/proposal" + "github.com/mysteriumnetwork/node/core/node" + "github.com/mysteriumnetwork/node/core/quality" + "github.com/mysteriumnetwork/node/eventbus" + "github.com/mysteriumnetwork/node/identity" + "github.com/mysteriumnetwork/node/identity/registry" + "github.com/mysteriumnetwork/node/identity/selector" + "github.com/mysteriumnetwork/node/tequilapi/contract" + "github.com/mysteriumnetwork/node/tequilapi/utils" +) + +// ConnectionDiagEndpoint struct represents /connection resource and it's subresources +type ConnectionDiagEndpoint struct { + manager connection.MultiManager + publisher eventbus.Publisher + publisher2 eventbus.Subscriber + + stateProvider stateProvider + // TODO connection should use concrete proposal from connection params and avoid going to marketplace + proposalRepository proposalRepository + identityRegistry identityRegistry + addressProvider addressProvider + identitySelector selector.Handler +} + +// NewConnectionDiagEndpoint creates and returns connection endpoint +func NewConnectionDiagEndpoint(manager connection.MultiManager, stateProvider stateProvider, proposalRepository proposalRepository, identityRegistry identityRegistry, publisher eventbus.Publisher, publisher2 eventbus.Subscriber, addressProvider addressProvider, identitySelector selector.Handler) *ConnectionDiagEndpoint { + return &ConnectionDiagEndpoint{ + manager: manager, + publisher: publisher, + publisher2: publisher2, + stateProvider: stateProvider, + proposalRepository: proposalRepository, + identityRegistry: identityRegistry, + addressProvider: addressProvider, + identitySelector: identitySelector, + } +} + +// Status returns result of provider check +// swagger:operation GET /prov-checker ConnectionDiagInfoDTO +// +// --- +// summary: Returns connection status +// description: Returns status of current connection +// responses: +// 200: +// description: Status +// schema: +// "$ref": "#/definitions/ConnectionInfoDTO" +// 400: +// description: Failed to parse or request validation failed +// schema: +// "$ref": "#/definitions/APIError" +// 500: +// description: Internal server error +// schema: +// "$ref": "#/definitions/APIError" +func (ce *ConnectionDiagEndpoint) Status(c *gin.Context) { + n := 0 + id := c.Query("id") + if len(id) > 0 { + var err error + n, err = strconv.Atoi(id) + if err != nil { + c.Error(apierror.ParseFailed()) + return + } + } + status := ce.manager.Status(n) + statusResponse := contract.NewConnectionInfoDTO(status) + utils.WriteAsJSON(statusResponse, c.Writer) +} + +// Diag is used to start provider check +func (ce *ConnectionDiagEndpoint) Diag(c *gin.Context) { + log.Error().Msgf("Diag >>>") + + chainID := config.GetInt64(config.FlagChainID) + consumerID_, err := ce.identitySelector.UseOrCreate(config.FlagIdentity.Value, config.FlagIdentityPassphrase.Value, chainID) + if err != nil { + c.Error(apierror.Internal("Failed to unlock identity", err.Error())) + return + } + log.Error().Msgf("Unlocked identity: %v", consumerID_) + + hermes, err := ce.addressProvider.GetActiveHermes(config.GetInt64(config.FlagChainID)) + if err != nil { + c.Error(apierror.Internal("Failed to get active hermes", contract.ErrCodeActiveHermes)) + return + } + + prov := c.Query("id") + if len(prov) == 0 { + c.Error(errors.New("Empty prameter: prov")) + return + } + cr := &contract.ConnectionCreateRequest{ + ConsumerID: consumerID_.Address, + ProviderID: prov, + Filter: contract.ConnectionCreateFilter{IncludeMonitoringFailed: true}, + HermesID: hermes.Hex(), + ServiceType: "wireguard", + ConnectOptions: contract.ConnectOptions{}, + } + + if err := cr.Validate(); err != nil { + c.Error(err) + return + } + + consumerID := identity.FromAddress(cr.ConsumerID) + status, err := ce.identityRegistry.GetRegistrationStatus(config.GetInt64(config.FlagChainID), consumerID) + if err != nil { + log.Error().Err(err).Stack().Msg("Could not check registration status") + c.Error(apierror.Internal("Failed to check ID registration status: "+err.Error(), contract.ErrCodeIDRegistrationCheck)) + return + } + + switch status { + case registry.Unregistered, registry.RegistrationError, registry.Unknown: + log.Error().Msgf("Identity %q is not registered, aborting...", cr.ConsumerID) + c.Error(apierror.Unprocessable(fmt.Sprintf("Identity %q is not registered. Please register the identity first", cr.ConsumerID), contract.ErrCodeIDNotRegistered)) + return + case registry.InProgress: + log.Info().Msgf("identity %q registration is in progress, continuing...", cr.ConsumerID) + case registry.Registered: + log.Info().Msgf("identity %q is registered, continuing...", cr.ConsumerID) + default: + log.Error().Msgf("identity %q has unknown status, aborting...", cr.ConsumerID) + c.Error(apierror.Unprocessable(fmt.Sprintf("Identity %q has unknown status. Aborting", cr.ConsumerID), contract.ErrCodeIDStatusUnknown)) + return + } + + if len(cr.ProviderID) > 0 { + cr.Filter.Providers = append(cr.Filter.Providers, cr.ProviderID) + } + + f := &proposal.Filter{ + ServiceType: cr.ServiceType, + LocationCountry: cr.Filter.CountryCode, + ProviderIDs: cr.Filter.Providers, + IPType: cr.Filter.IPType, + IncludeMonitoringFailed: cr.Filter.IncludeMonitoringFailed, + AccessPolicy: "all", + } + proposalLookup := connection.FilteredProposals(f, cr.Filter.SortBy, ce.proposalRepository) + + res := make(chan bool) + cb := func(r quality.DiagEvent) { + if r.ProviderID == prov { + res <- r.Result + } + } + + uid, err := uuid.NewV4() + if err != nil { + log.Error().Msgf("Error > %v", err) + c.Error(err) + return + } + + ce.publisher2.SubscribeWithUID(quality.AppTopicConnectionDiagRes, uid.String(), cb) + defer ce.publisher2.UnsubscribeWithUID(quality.AppTopicConnectionDiagRes, uid.String(), cb) + + err = ce.manager.Connect(consumerID, common.HexToAddress(cr.HermesID), proposalLookup, getConnectOptions(cr)) + if err != nil { + switch err { + case connection.ErrAlreadyExists: + c.Error(apierror.Unprocessable("Connection already exists", contract.ErrCodeConnectionAlreadyExists)) + case connection.ErrConnectionCancelled: + c.Error(apierror.Unprocessable("Connection cancelled", contract.ErrCodeConnectionCancelled)) + default: + log.Error().Err(err).Msg("Failed to connect") + c.Error(apierror.Internal("Failed to connect: "+err.Error(), contract.ErrCodeConnect)) + } + + return + } + + r := <-res + log.Debug().Msgf("Result > %v", r) + resp := contract.ConnectionDiagInfoDTO{ + ProviderID: prov, + Status: r, + } + utils.WriteAsJSON(resp, c.Writer) +} + +// AddRoutesForConnectionDiag adds proder check route to given router +func AddRoutesForConnectionDiag( + manager connection.MultiManager, + stateProvider stateProvider, + proposalRepository proposalRepository, + identityRegistry identityRegistry, + publisher eventbus.Publisher, + publisher2 eventbus.Subscriber, + addressProvider addressProvider, + identitySelector selector.Handler, + options node.Options, +) func(*gin.Engine) error { + ConnectionDiagEndpoint := NewConnectionDiagEndpoint(manager, stateProvider, proposalRepository, identityRegistry, publisher, publisher2, addressProvider, identitySelector) + return func(e *gin.Engine) error { + connGroup := e.Group("") + { + if options.ProvChecker { + connGroup.GET("/prov-checker", ConnectionDiagEndpoint.Diag) + } + } + return nil + } +}