This repository has been archived by the owner on Dec 18, 2019. It is now read-only.
forked from elastic/logstash-forwarder
-
Notifications
You must be signed in to change notification settings - Fork 8
/
lumberjack.go
229 lines (199 loc) · 4.99 KB
/
lumberjack.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
// The basic model of execution:
// - prospector: finds files in paths/globs to harvest, starts harvesters
// - harvester: reads a file, sends events to the spooler
// - spooler: buffers events until ready to flush to the publisher
// - publisher: writes to the network, notifies registrar
// - registrar: records positions of files read
// Finally, prospector uses the registrar information, on restart, to
// determine where in each file to resume a harvester.
package main
import (
_ "expvar"
"flag"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"runtime"
"runtime/pprof"
"syscall"
"time"
)
// Process-wide state shared by the functions in this file.
var (
// registry is assigned in main from newRegistry(config).
registry *hregistry
// shutdownHandlers holds the callbacks registered through onShutdown;
// shutdown invokes them in registration order.
shutdownHandlers []func()
// log_file_handle is the log file most recently opened by
// refreshLogfileHandle, or nil if none has been opened.
log_file_handle *os.File
)
// writePid creates (or truncates) the file named by options.PidFile and
// writes the current process's pid into it, followed by a newline. The file
// name is specified on the command line. It does nothing when no pidfile is
// configured.
//
// Failures are logged rather than treated as fatal. Once the file has been
// created, rmPidfile is registered as a shutdown handler so the pidfile is
// removed when the process exits through shutdown.
func writePid() {
	if options.PidFile == "" {
		return
	}
	f, err := os.OpenFile(options.PidFile, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
	if err != nil {
		log.Printf("ERROR unable to open pidfile: %v", err)
		return
	}
	// The file exists on disk from this point on, so schedule its removal
	// even if writing the pid into it fails below.
	onShutdown(rmPidfile)

	_, writeErr := fmt.Fprintln(f, os.Getpid())
	// Always close the handle; it is not needed once the pid is written.
	closeErr := f.Close()
	if writeErr == nil {
		writeErr = closeErr
	}
	if writeErr != nil {
		log.Printf("ERROR unable to write pidfile: %v", writeErr)
	}
}
// rmPidfile deletes the file named by options.PidFile, if one is configured.
// Removal is best-effort: the result of os.Remove is not inspected.
func rmPidfile() {
	if path := options.PidFile; path != "" {
		os.Remove(path)
	}
}
// awaitSignals blocks forever, reacting to process signals:
//
//   - interrupt (SIGINT) or SIGTERM: log a message and exit via shutdown, so
//     the registered shutdown handlers run.
//   - SIGHUP: reopen the log file via refreshLogfileHandle (useful after the
//     file has been rotated).
//
// It never returns on its own; the process ends when shutdown exits.
func awaitSignals() {
	die := make(chan os.Signal, 1)
	hup := make(chan os.Signal, 1)

	// SIGKILL (os.Kill) can never be delivered to a handler, so subscribing
	// to it has no effect. SIGTERM is the catchable termination request, and
	// handling it lets a plain `kill` run the shutdown handlers too.
	signal.Notify(die, os.Interrupt, syscall.SIGTERM)
	signal.Notify(hup, syscall.SIGHUP)

	for {
		select {
		case <-die:
			log.Println("lumberjack shutting down")
			shutdown(nil)
		case <-hup:
			refreshLogfileHandle()
		}
	}
}
// refreshLogfileHandle (re)opens the file named by options.LogFile in append
// mode, creating it if necessary, and points the standard logger at it. The
// previously opened log file, if any, is closed afterwards. It does nothing
// when no log file is configured.
//
// If the file cannot be opened, the error is logged and the existing log
// destination is left untouched, so logging keeps working on the old handle
// instead of being redirected to a closed file.
func refreshLogfileHandle() {
	if options.LogFile == "" {
		return
	}
	f, err := os.OpenFile(options.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		log.Printf("unable to open logfile destination: %v", err)
		return
	}
	// Switch the logger first so no message is written to a closed file.
	log.SetOutput(f)
	if log_file_handle != nil {
		// Best-effort close of the old handle; nothing useful can be done
		// if it fails.
		log_file_handle.Close()
	}
	log_file_handle = f
}
// setupLogging configures the standard logger: timestamps carry date, time
// and microseconds, and output goes to syslog when options.UseSyslog is set,
// otherwise to options.LogFile when one is given. With neither option the
// logger's destination is left unchanged.
func setupLogging() {
	log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)

	switch {
	case options.UseSyslog:
		configureSyslog()
	case options.LogFile != "":
		refreshLogfileHandle()
	}
}
// onShutdown registers fn to be run when the process exits through shutdown.
// Handlers are NOT run when the process exits via log.Fatal directly, so
// prefer shutdown over log.Fatal elsewhere in the program.
func onShutdown(fn func()) {
	handlers := shutdownHandlers
	if handlers == nil {
		// First registration: start with room for a few handlers.
		handlers = make([]func(), 0, 8)
	}
	shutdownHandlers = append(handlers, fn)
}
// shutdown runs every handler registered with onShutdown, in registration
// order, and then terminates the process by passing v to log.Fatal (which
// logs v and exits with a non-zero status). It does not return.
func shutdown(v interface{}) {
	handlers := shutdownHandlers
	for i := 0; i < len(handlers); i++ {
		handlers[i]()
	}
	log.Fatal(v)
}
// publisherId is the id given to the next Publisher created by
// startPublishers; it is incremented after each publisher is started.
var publisherId = 0

// startPublishers starts one publisher goroutine for every server of every
// group in conf. Each publisher is given the group's TLS configuration and
// timeout, and is run with the group's c_pages_unsent channel and out as the
// arguments to publish.
//
// It returns an error as soon as a group's TLS configuration cannot be
// built; publishers already started for earlier groups keep running.
func startPublishers(conf NetworkConfig, out chan eventPage) error {
	for _, grp := range conf {
		tlsConfig, err := grp.TLS()
		if err != nil {
			return fmt.Errorf("unable to start publishers: %v", err)
		}

		for _, addr := range grp.Servers {
			pub := new(Publisher)
			pub.id = publisherId
			pub.sequence = 1
			pub.addr = addr
			pub.tlsConfig = *tlsConfig
			pub.timeout = grp.timeout

			log.Printf("TLS config: %v\n", tlsConfig)
			go pub.publish(grp.c_pages_unsent, out)
			publisherId++
		}
	}
	return nil
}
// startHttp serves HTTP on options.HttpPort using the default handler (nil is
// passed to http.ListenAndServe). It blocks while the server is running and
// logs the error when the server stops or cannot start. When no port is
// configured it only logs that fact and returns.
func startHttp() {
	addr := options.HttpPort
	if addr == "" {
		log.Println("no http port specified")
		return
	}

	log.Printf("starting http debug port on %s", addr)
	err := http.ListenAndServe(addr, nil)
	if err != nil {
		log.Printf("unable to open http port: %v", err)
	}
}
// handleArgs processes positional (non-flag) command line arguments, which
// act as subcommands. The only subcommand at present is "test-config", which
// takes the path of a configuration file and hands it to testConfig. Any
// other subcommand, or a missing path, terminates the program via shutdown.
// With no positional arguments it does nothing.
func handleArgs() {
	args := flag.Args()
	if len(args) == 0 {
		return
	}

	subcommand := args[0]
	if subcommand != "test-config" {
		shutdown(fmt.Sprintf("unrecognized positional arg: %v", subcommand))
		return
	}

	if len(args) < 2 {
		shutdown("not enough arguments specified for test-config")
	}
	testConfig(flag.Arg(1))
}
// main parses the command line, sets up logging and the pidfile, loads the
// configuration and then starts the pipeline described at the top of this
// file: prospectors, spoolers, publishers and the registrar. It finally
// blocks in awaitSignals until the process is told to exit.
func main() {
flag.Parse()
// Positional arguments (subcommands) are handled before any other setup.
handleArgs()
runtime.GOMAXPROCS(options.NumThreads)
setupLogging()
writePid()
log.Println("lumberjack starting")
startCPUProfile()
config, err := LoadConfig(options.ConfigFile)
if err != nil {
// NOTE(review): the error is printed to stdout here and then logged again
// by shutdown (via log.Fatal).
fmt.Println(err)
shutdown(err.Error())
}
go cmdListener()
registry = newRegistry(config)
// Channel on which publishers hand pages to the registrar (it is passed to
// both startPublishers and Registrar below).
registrar_chan := make(chan eventPage, 1)
if len(config.Files) == 0 {
shutdown("No paths given. What files do you want me to watch?\n")
}
go reportFSEvents()
// Start one prospector goroutine per entry in config.Files; per the model at
// the top of this file, prospectors find files and launch harvesters.
for _, fileconfig := range config.Files {
go Prospect(fileconfig, config.Network)
}
// Harvesters dump events into the spooler.
for _, group := range config.Network {
group.Spool()
}
if err := startPublishers(config.Network, registrar_chan); err != nil {
shutdown(err)
}
// registrar records last acknowledged positions in all files.
go Registrar(registrar_chan)
// The HTTP listener blocks, so it gets its own goroutine.
go startHttp()
// Blocks for the remaining lifetime of the process.
awaitSignals()
}
// startCPUProfile begins CPU profiling into the file named by
// options.CPUProfile, if one is configured. A background goroutine stops the
// profile after 60 seconds and then deliberately ends the process with
// panic("done"); a profiling run is therefore limited to one minute.
//
// If the profile file cannot be created, or profiling cannot be started, the
// program exits through shutdown instead of running for a minute and then
// dying with nothing recorded.
func startCPUProfile() {
	if options.CPUProfile == "" {
		return
	}
	f, err := os.Create(options.CPUProfile)
	if err != nil {
		shutdown(err)
	}
	if err := pprof.StartCPUProfile(f); err != nil {
		f.Close()
		shutdown(err)
	}
	go func() {
		time.Sleep(60 * time.Second)
		// StopCPUProfile returns only after all profile data has been
		// written, so the file can be closed right after it.
		pprof.StopCPUProfile()
		f.Close()
		panic("done")
	}()
}