Skip to content

Commit

Permalink
Merge pull request #503 from TarsCloud/lbbniu/node_name
Browse files Browse the repository at this point in the history
feat: tars basic service call context adds node_name
  • Loading branch information
lbbniu committed Jan 11, 2024
2 parents f5ccfb8 + a10e7ba commit f4adeb5
Show file tree
Hide file tree
Showing 10 changed files with 32 additions and 16 deletions.
4 changes: 3 additions & 1 deletion tars/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ func (a *application) initConfig() {
a.svrCfg.Node = sMap["node"]
a.svrCfg.App = sMap["app"]
a.svrCfg.Server = sMap["server"]
a.svrCfg.LocalIP = sMap["localip"]
a.svrCfg.LocalIP = c.GetStringWithDef("/tars/application/server<localip>", a.svrCfg.LocalIP)
a.svrCfg.NodeName = c.GetStringWithDef("/tars/application/server<node_name>", a.svrCfg.LocalIP)
a.svrCfg.Local = c.GetString("/tars/application/server<local>")
// svrCfg.Container = c.GetString("/tars/application<container>")

Expand Down Expand Up @@ -252,6 +253,7 @@ func (a *application) initConfig() {
a.cltCfg.ClientDialTimeout = tools.ParseTimeOut(c.GetIntWithDef("/tars/application/client<clientdialtimeout>", ClientDialTimeout))
a.cltCfg.ReqDefaultTimeout = c.GetInt32WithDef("/tars/application/client<reqdefaulttimeout>", ReqDefaultTimeout)
a.cltCfg.ObjQueueMax = c.GetInt32WithDef("/tars/application/client<objqueuemax>", ObjQueueMax)
a.cltCfg.context["node_name"] = a.svrCfg.NodeName
ca := c.GetString("/tars/application/client<ca>")
if ca != "" {
cert := c.GetString("/tars/application/client<cert>")
Expand Down
7 changes: 7 additions & 0 deletions tars/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type serverConfig struct {
LogNum uint64
LogLevel string
Version string
NodeName string
LocalIP string
Local string
BasePath string
Expand Down Expand Up @@ -91,6 +92,7 @@ type clientConfig struct {
ClientDialTimeout time.Duration
ReqDefaultTimeout int32
ObjQueueMax int32
context map[string]string
}

// GetServerConfig Get server config
Expand Down Expand Up @@ -180,6 +182,11 @@ func newClientConfig() *clientConfig {
ClientDialTimeout: tools.ParseTimeOut(ClientDialTimeout),
ReqDefaultTimeout: ReqDefaultTimeout,
ObjQueueMax: ObjQueueMax,
context: make(map[string]string),
}
return conf
}

func (c *clientConfig) Context() map[string]string {
return c.context
}
2 changes: 1 addition & 1 deletion tars/endpointmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func newEndpointManager(objName string, comm *Communicator, opts ...EndpointMana
query := new(queryf.QueryF)
TLOG.Debug("string to proxy locator ", obj)
e.comm.StringToProxy(obj, query)
e.registrar = tarsregistry.New(query)
e.registrar = tarsregistry.New(query, e.comm.Client)
} else {
e.registrar = e.comm.opt.registrar
}
Expand Down
4 changes: 2 additions & 2 deletions tars/nodef.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (n *NodeFHelper) KeepAlive(adapter string) {
}
n.si.Pid = int32(os.Getpid())
n.si.Adapter = adapter
_, err := n.sf.KeepAlive(&n.si)
_, err := n.sf.KeepAlive(&n.si, n.comm.Client.Context())
if err != nil {
TLOG.Error("keepalive fail:", adapter)
}
Expand All @@ -47,7 +47,7 @@ func (n *NodeFHelper) ReportVersion(version string) {
if n.sf == nil {
return
}
_, err := n.sf.ReportVersion(n.si.Application, n.si.ServerName, version)
_, err := n.sf.ReportVersion(n.si.Application, n.si.ServerName, version, n.comm.Client.Context())
if err != nil {
TLOG.Error("report Version fail:")
}
Expand Down
2 changes: 1 addition & 1 deletion tars/notifyf.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (n *NotifyHelper) ReportNotifyInfo(level int32, info string) {
n.tm.ELevel = notifyf.NOTIFYLEVEL(level)
n.tm.SMessage = info
TLOG.Debug(n.tm)
if err := n.tn.ReportNotifyInfo(&n.tm); err != nil {
if err := n.tn.ReportNotifyInfo(&n.tm, n.comm.Client.Context()); err != nil {
TLOG.Errorf("ReportNotifyInfo err: %v", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions tars/propertyf.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (p *PropertyReportHelper) ReportToServer() {
for k, v := range statMsg {
cnt++
if cnt >= 20 {
_, err := p.pf.ReportPropMsg(tmpStatMsg)
_, err := p.pf.ReportPropMsg(tmpStatMsg, p.comm.Client.Context())
if err != nil {
TLOG.Error("Send to property server Error", reflect.TypeOf(err), err)
}
Expand All @@ -434,7 +434,7 @@ func (p *PropertyReportHelper) ReportToServer() {
tmpStatMsg[k] = v
}
if len(tmpStatMsg) > 0 {
_, err := p.pf.ReportPropMsg(tmpStatMsg)
_, err := p.pf.ReportPropMsg(tmpStatMsg, p.comm.Client.Context())
if err != nil {
TLOG.Error("Send to property server Error", reflect.TypeOf(err), err)
}
Expand Down
4 changes: 2 additions & 2 deletions tars/rconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (c *RConf) GetConfigList() (fList []string, err error) {
Containername:string
*/
}
ret, err := c.tc.ListAllConfigByInfo(&info, &fList)
ret, err := c.tc.ListAllConfigByInfo(&info, &fList, c.comm.Client.Context())
if err != nil {
return fList, err
}
Expand Down Expand Up @@ -117,7 +117,7 @@ func (c *RConf) getConfig(info configf.ConfigInfo) (config string, err error) {
set = v
}
info.Setdivision = set
ret, err := c.tc.LoadConfigByInfo(&info, &config)
ret, err := c.tc.LoadConfigByInfo(&info, &config, c.comm.Client.Context())
if err != nil {
return config, err
}
Expand Down
13 changes: 9 additions & 4 deletions tars/registry/tars/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ import (
"github.com/TarsCloud/TarsGo/tars/registry"
)

type Context interface {
Context() map[string]string
}

type tarsRegistry struct {
query *queryf.QueryF
ctx Context
}

func New(query *queryf.QueryF) registry.Registrar {
return &tarsRegistry{query: query}
func New(query *queryf.QueryF, ctx Context) registry.Registrar {
return &tarsRegistry{query: query, ctx: ctx}
}

func (t *tarsRegistry) Registry(_ context.Context, _ *registry.ServantInstance) error {
Expand All @@ -25,7 +30,7 @@ func (t *tarsRegistry) Deregister(_ context.Context, _ *registry.ServantInstance
}

func (t *tarsRegistry) QueryServant(ctx context.Context, id string) (activeEp []registry.Endpoint, inactiveEp []registry.Endpoint, err error) {
ret, err := t.query.FindObjectByIdInSameGroupWithContext(ctx, id, &activeEp, &inactiveEp)
ret, err := t.query.FindObjectByIdInSameGroupWithContext(ctx, id, &activeEp, &inactiveEp, t.ctx.Context())
if err != nil {
return nil, nil, err
}
Expand All @@ -36,7 +41,7 @@ func (t *tarsRegistry) QueryServant(ctx context.Context, id string) (activeEp []
}

func (t *tarsRegistry) QueryServantBySet(ctx context.Context, id, set string) (activeEp []registry.Endpoint, inactiveEp []registry.Endpoint, err error) {
ret, err := t.query.FindObjectByIdInSameSetWithContext(ctx, id, set, &activeEp, &inactiveEp)
ret, err := t.query.FindObjectByIdInSameSetWithContext(ctx, id, set, &activeEp, &inactiveEp, t.ctx.Context())
if err != nil {
return nil, nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion tars/remotelogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type RemoteTimeWriter struct {
reportSuccessPtr *PropertyReport
reportFailPtr *PropertyReport
hasPrefix bool
comm *Communicator
}

// NewRemoteTimeWriter new and init RemoteTimeWriter
Expand All @@ -34,6 +35,7 @@ func NewRemoteTimeWriter() *RemoteTimeWriter {
log := GetServerConfig().Log
comm := GetCommunicator()
comm.StringToProxy(log, rw.logPtr)
rw.comm = comm
go rw.Sync2remote()
return rw
}
Expand Down Expand Up @@ -71,7 +73,7 @@ func (rw *RemoteTimeWriter) Sync2remote() {
}

func (rw *RemoteTimeWriter) sync2remote(s []string) error {
err := rw.logPtr.LoggerbyInfo(rw.logInfo, s)
err := rw.logPtr.LoggerbyInfo(rw.logInfo, s, rw.comm.Client.Context())
return err
}

Expand Down
4 changes: 2 additions & 2 deletions tars/statf.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (s *StatFHelper) getIntervCount(totalRspTime int32, intervalCount map[int32
func (s *StatFHelper) reportAndClear(mStat string, bFromClient bool) {
// report mStatInfo
if mStat == "mStatInfo" {
_, err := s.sf.ReportMicMsg(s.mStatInfo, bFromClient)
_, err := s.sf.ReportMicMsg(s.mStatInfo, bFromClient, s.comm.Client.Context())
if err != nil {
TLOG.Debug("mStatInfo report err:", err.Error())
}
Expand All @@ -124,7 +124,7 @@ func (s *StatFHelper) reportAndClear(mStat string, bFromClient bool) {
}
// report mStatInfoFromServer
if mStat == "mStatInfoFromServer" {
_, err := s.sf.ReportMicMsg(s.mStatInfoFromServer, bFromClient)
_, err := s.sf.ReportMicMsg(s.mStatInfoFromServer, bFromClient, s.comm.Client.Context())
if err != nil {
TLOG.Debug("mStatInfoFromServer report err:", err.Error())
}
Expand Down

0 comments on commit f4adeb5

Please sign in to comment.