子进程:
由一个进程(父进程)创建的进程,集成父进程大部分属性,同时可以被父进程守护和管理。
(2)你需要知道关于进程产生日志的形式:
进程产生日志有两类输出方式,一类是写入到文件中。另一类是直接写到stdout或者stderr,例如php的echo python的print golang的fmt.Println("")等等。
(3)是否知道docker-daemon与运行中container的关系?
一个container就是一个特殊的进程,它是由docker daemon创建并启动,因此container是docker daemon的子进程。由docker daemon守护和管理。因此container的stdout能够被docker daemon获取到。基于此理论,我们来分析docker daemon相关代码。docker-daemon关于日志源码分析container实例源码# /container/container.go:62 type CommonContainer struct{ StreamConfig *stream.Config ... } # /container/stream/streams.go:26 type Config struct { sync.WaitGroup stdout *broadcaster.Unbuffered stderr *broadcaster.Unbuffered stdin io.ReadCloser stdinPipe io.WriteCloser }找到如上所示对应的代码,显示了每一个container实例都有几个属性stdout,stderr,stdin,以及管道stdinPipe。这里说下stdinPipe,当容器使用-i参数启动时标准输入将被运行,daemon将能够使用此管道向容器内写入标准输入。
我们试想以上图例,如果是你,你怎么实现日志收集转发?# /container/container.go:312func(container *container) startlogger(cfg containertypes.logconfig) (logger.logger, error) { c, err := logger.getlogdriver(cfg.type)iferr != nil{returnnil, fmt.errorf("failed to get logging factory: %v", err) } ctx := logger.context{ config: cfg.config, containerid: container.id, containername: container.name, containerentrypoint: container.path, containerargs: container.args, containerimageid: container.imageid.string(), containerimagename: container.config.image, containercreated: container.created, containerenv: container.config.env, containerlabels: container.config.labels, daemonname: "docker", }// set logging file for "json-logger"ifcfg.type == jsonfilelog.name { ctx.logpath, err = container.getrootresourcepath(fmt.sprintf("%s-json.log", container.id))iferr != nil{returnnil, err } }returnc(ctx) } #/container/container.go:978func(container *container) startlogging() error {ifcontainer.hostconfig.logconfig.type == "none"{returnnil// do not start logging routines} l, err := container.startlogger(container.hostconfig.logconfig)iferr != nil{returnfmt.errorf("failed to initialize logging driver: %v", err) } copier := logger.newcopier(map[string]io.reader{"stdout": container.stdoutpipe(), "stderr": container.stderrpipe()}, l) container.logcopier = copier copier.run() container.logdriver = l// set logpath field only for json-file logdriverifjl, ok := l.(*jsonfilelog.jsonfilelogger); ok { container.logpath = jl.logpath() }returnnil}第一个方法是为container查找log-driver。首先根据容器配置的log-driver类别调用:logger.getlogdriver(cfg.type)返回一个方法类型:/daemon/logger/factory.go:9 type creator func(context) (logger, error)实质就是从工厂类注册的logdriver插件去查找,具体源码下文分析。获取到c方法后构建调用参数具体就是容器的一些信息。然后使用调用c方法返回driver。driver是个接口类型,我们看看有哪些方法:# /daemon/logger/logger.go:61typelogger interface{ log(*message) error name() stringclose() error }很简单的三个方法,也很容易理解,log()发送日志消息到driver,close()进行关闭操作(根据不同实现)。
也就是说我们自己实现一个logdriver,只需要实现如上三个方法,然后注册到logger工厂类中即可。下面我们来看/daemon/logger/factory.go
第二个方法就是处理日志了,获取到日志driver,在创建一个Copier,顾名思义就是复制日志,分别从stdout 和stderr复制到logger driver。下面看看具体关键实现:#/daemon/logger/copir.go:41func(c *Copier) copySrc(name string, src io.Reader) {deferc.copyJobs.Done() reader := bufio.NewReader(src)for{select{case<-c.closed:returndefault: line, err := reader.ReadBytes('
') line = bytes.TrimSuffix(line, []byte{'
'})// ReadBytes can return full or partial output even when it failed.// e.g. it can return a full entry and EOF.iferr == nil|| len(line) >0{iflogErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil{ logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr) } }iferr != nil{iferr != io.EOF { logrus.Errorf("Error scanning log stream: %s", err) }return} } } }每读取一行数据,构建一个消息,调用logdriver的log方法发送到driver处理。日志driver注册器位于/daemon/logger/factory.go的源码实现即时日志driver的注册器,其中几个重要的方法(上文已经提到一个):# /daemon/logger/factory.go:21func(lf *logdriverFactory) register(name string, c Creator) error {iflf.driverRegistered(name) {returnfmt.Errorf("logger: log driver named '%s' is already registered", name) } lf.m.Lock() lf.registry[name] = c lf.m.Unlock()returnnil} # /daemon/logger/factory.go:39func(lf *logdriverFactory) registerLogOptValidator(name string, l LogOptValidator) error { lf.m.Lock()deferlf.m.Unlock()if_, ok := lf.optValidator[name]; ok {returnfmt.Errorf("logger: log validator named '%s' is already registered", name) } lf.optValidator[name] = lreturnnil}看起来很简单,就是将一个Creator方法类型添加到一个map结构中,将LogOptValidator添加到另一个map这里注意加锁的操作。#/daemon/logger/factory.go:13 type LogOptValidator func(cfg map[string]string) error这个主要是验证driver的参数 ,dockerd和docker启动参数中有:--log-opt好雨云帮自己实现一个基于zmq的log-driver上文已经完整分析了docker daemon管理logdriver和处理日志的整个流程。相信你已经比较明白了。下面我们以zmq-driver为例讲讲我们怎么实现自己的driver。直接接收容器的日志。
上文我们已经谈了一个log-driver需要实现的几个方法。
我们可以看看位于/daemon/logger目录下的已有的driver的实现,例如fluentd,awslogs等。
下面我们来分析zmq-driver具体的代码://定义一个struct,这里包含一个zmq套接字typezmqlogger struct{ writer *zmq.socket containerid stringtenantid stringserviceid stringfelock sync.mutex }//定义init方法调用logger注册器的方法注册当前driver//和参数验证方法。funcinit() {iferr := logger.registerlogdriver(name, new); err != nil{ logrus.fatal(err) }iferr := logger.registerlogoptvalidator(name, validatelogopt); err != nil{ logrus.fatal(err) } }//实现一个上文提到的creator方法注册logdriver.//这里新建一个zmq套接字构建一个实例funcnew(ctx logger.context) (logger.logger, error) { zmqaddress := ctx.config[zmqaddress] puber, err := zmq.newsocket(zmq.pub)iferr != nil{returnnil, err }var( env = make(map[string]string) tenantid stringserviceid string)for_, pair := rangectx.containerenv { p := strings.splitn(pair, "=",2)//logrus.errorf("containerenv pair: %s", pair)iflen(p) ==2{ key := p[0] value := p[1] env[key] = value } } tenantid = env["tenant_id"] serviceid = env["service_id"]iftenantid == ""{ tenantid = "default"}ifserviceid == ""{ serviceid = "default"} puber.connect(zmqaddress)return&zmqlogger{ writer: puber, containerid: ctx.id(), tenantid: tenantid, serviceid: serviceid, felock: sync.mutex{}, }, nil}//实现log方法,这里使用zmq socket发送日志消息//这里必须注意,zmq socket是线程不安全的,我们知道//本方法可能被两个线程(复制stdout和肤质stderr)调用//必须使用锁保证线程安全。否则会发生错误。func(s *zmqlogger) log(msg *logger.message) error { s.felock.lock()defers.felock.unlock() s.writer.send(s.tenantid, zmq.sndmore) s.writer.send(s.serviceid, zmq.sndmore)ifmsg.source == "stderr"{ s.writer.send(s.containerid+": "+string(msg.line), zmq.dontwait) } else{ s.writer.send(s.containerid+": "+string(msg.line), zmq.dontwait) }returnnil}//实现close方法,这里用来关闭zmq socket。//同样注意线程安全,调用此方法的是容器关闭协程。func(s *zmqlogger) close() error { s.felock.lock()defers.felock.unlock()ifs.writer != nil{returns.writer.close() }returnnil}func(s *zmqlogger) name() string{returnname }//验证参数的方法,我们使用参数传入zmq pub的地址。funcvalidatelogopt(cfg map[string]string) error {forkey := rangecfg {switchkey {casezmqaddress:default:returnfmt.errorf("unknown log opt '%s' for %s log driver", key, name) } }ifcfg[zmqaddress] == ""{returnfmt.errorf("must specify a value for log opt '%s'", zmqaddress) }returnnil}总结多研究源码可以方便我们理解docker的工作原理。今天我们分析了日志部分。希望读者对这部分功能能够理解得更清晰。