Apache Kafka源码分析 – Log Management
本文共 23582 字,大约阅读时间需要 78 分钟。




首先loadLogs会加载每个partition所对应的log对象, 然后提供createLog,getLog,deleteLog之类的管理接口

/**
 * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
 * All read and write operations are delegated to the individual log instances.
 *
 * The log manager maintains logs in one or more directories. New logs are created in the data directory
 * with the fewest logs. No attempt is made to move partitions after the fact or balance based on
 * size or I/O rate.
 *
 * A background thread handles log retention by periodically truncating excess log segments.
 */
@threadsafe
class LogManager(val logDirs: Array[File],
                 val topicConfigs: Map[String, LogConfig],
                 val defaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 val flushCheckMs: Long,
                 val flushCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 scheduler: Scheduler,
                 private val time: Time) extends Logging {

  // kafka.utils.Pool is a wrapper around ConcurrentHashMap; each partition of a topic maps to one Log
  private val logs = new Pool[TopicAndPartition, Log]()

  /**
   * Recover and load all logs in the given data directories
   *
   * @param dirs The data directories to scan; every sub-directory found is loaded as one log
   * @throws IllegalArgumentException if two directories resolve to the same topic partition
   */
  private def loadLogs(dirs: Seq[File]) {
    for(dir <- dirs) {
      // recovery points previously checkpointed for this data directory
      val recoveryPoints = this.recoveryPointCheckpoints(dir).read
      /* load the logs */
      val subDirs = dir.listFiles()
      if(subDirs != null) {
        // each sub-directory becomes one log; the files inside it are the segment files
        for(logDir <- subDirs if logDir.isDirectory) {
          // the directory name encodes the topic and the partition
          val topicPartition = Log.parseTopicPartitionName(logDir.getName)
          val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
          val log = new Log(logDir,
                            config,
                            recoveryPoints.getOrElse(topicPartition, 0L),
                            scheduler,
                            time)
          // assumes Pool.put mirrors ConcurrentHashMap.put and hands back the value previously stored
          // under this key; a non-null result means another directory already claimed this partition
          val previous = this.logs.put(topicPartition, log)
          if(previous != null)
            throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
        }
        // NOTE(review): cleanShutDownFile is not defined anywhere in this excerpt
        cleanShutDownFile.delete()
      }
    }
  }

  /**
   * Create a log for the given topic and the given partition
   * If the log already exists, just return the existing log
   *
   * @param topicAndPartition The topic partition the log belongs to
   * @param config The configuration used when a new log has to be created
   * @return The already registered log, or the newly created one
   */
  def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = {
    logCreationOrDeletionLock synchronized {
      val existing = logs.get(topicAndPartition)
      // check if the log has already been created in another thread
      if(existing != null) {
        existing
      } else {
        // if not, create it
        val dataDir = nextLogDir()
        // the log directory is named <topic>-<partition>
        val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
        dir.mkdirs()
        val created = new Log(dir,
                              config,
                              recoveryPoint = 0L,
                              scheduler,
                              time)
        logs.put(topicAndPartition, created)
        created
      }
    }
  }

  // one OffsetCheckpoint per data directory, from which each log's recovery point is loaded
  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap

  /**
   * Write out the current recovery point for all logs to a text file in the log directory
   * to avoid recovering the whole log on startup.
   */
  def checkpointRecoveryPointOffsets() {
    // group the logs by the data directory (the parent of each log's own directory) that holds them
    val recoveryPointsByDir = this.logsByTopicPartition.groupBy(_._2.dir.getParent.toString)
    for(dir <- logDirs) {
      // directories that currently hold no log are left untouched
      recoveryPointsByDir.get(dir.toString).foreach { logsInDir =>
        this.recoveryPointCheckpoints(dir).write(logsInDir.mapValues(_.recoveryPoint))
      }
    }
  }
}


Log只是对于LogSegments的封装,包含loadSegments,append(到active segment),read(需要定位到相应的segment)

/**
 * An append-only log for storing messages.
 *
 * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
 *
 * New log segments are created according to a configurable policy that controls the size in bytes or time interval
 * for a given segment.
 *
 * @param dir The directory in which log segments are created.
 * @param config The log configuration settings
 * @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
 * @param scheduler The thread pool scheduler used for background actions
 * @param time The time instance used for checking the clock
 */
@threadsafe
class Log(val dir: File,
          @volatile var config: LogConfig,
          @volatile var recoveryPoint: Long = 0L,
          val scheduler: Scheduler,
          time: Time = SystemTime) extends Logging with KafkaMetricsGroup {

  /* the actual segments of the log, held in a ConcurrentSkipListMap keyed (and therefore sorted) by start offset */
  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
  loadSegments()

  /* Calculate the offset of the next message */
  private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset())

  /**
   * The active segment that is currently taking appends, i.e. the last (highest start offset) segment
   */
  def activeSegment = segments.lastEntry.getValue

  /* Load the log segments from the log files on disk */
  private def loadSegments() {
    // create the log directory if it doesn't exist
    dir.mkdirs()

    // first do a pass through the files in the log directory and remove any temporary files
    // and complete any interrupted swap operations
    // ...... (this preparation pass is elided in the excerpt)

    // now do a second pass and load all the .log and .index files
    for(file <- dir.listFiles if file.isFile) {
      val filename = file.getName
      if(filename.endsWith(IndexFileSuffix)) {
        // if it is an index file, make sure it has a corresponding .log file; an index without one is removed.
        // Only the trailing suffix is swapped: String.replace would also rewrite any earlier occurrence of the
        // suffix text in the absolute path (e.g. inside a directory name), making a valid index look orphaned.
        val logFile = new File(file.getAbsolutePath.stripSuffix(IndexFileSuffix) + LogFileSuffix)
        if(!logFile.exists) {
          warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
          file.delete()
        }
      } else if(filename.endsWith(LogFileSuffix)) {
        // if its a log file, load the corresponding log segment; the file name minus the suffix is the start offset
        val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
        val hasIndex = Log.indexFilename(dir, start).exists
        val segment = new LogSegment(dir = dir,
                                     startOffset = start,
                                     indexIntervalBytes = config.indexInterval,
                                     maxIndexSize = config.maxIndexSize,
                                     time = time)
        if(!hasIndex) {
          // a log file without an index file gets its index rebuilt
          error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
          segment.recover(config.maxMessageSize)
        }
        segments.put(start, segment)
      }
    }
  }

  /** Struct to hold various quantities we compute about each message set before appending to the log
   * @param firstOffset The first offset in the message set
   * @param lastOffset The last offset in the message set
   * @param codec The codec used in the message set
   * @param shallowCount presumably the number of top-level (shallow) messages in the set -- TODO confirm
   * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
   */
  case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, offsetsMonotonic: Boolean)

  /**
   * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
   *
   * This method will generally be responsible for assigning offsets to the messages,
   * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
   *
   * NOTE(review): in this excerpt assignOffsets is never read and validMessages is never defined; the
   * statements that used them appear to have been cut from the listing.
   *
   * @param messages The message set to append
   * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
   *
   * @throws KafkaStorageException If the append fails due to an I/O error.
   *
   * @return Information about the appended messages including the first and last offset.
   */
  def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
    // inspect the incoming message set and summarise it as a LogAppendInfo
    val appendInfo = analyzeAndValidateMessageSet(messages)

    try {
      // they are valid, insert them in the log
      lock synchronized {
        // the current next offset is where this append starts
        appendInfo.firstOffset = nextOffset.get

        // maybe roll the log if this segment is full
        val segment = maybeRoll()

        // now append to the log
        segment.append(appendInfo.firstOffset, validMessages)

        // increment the log end offset
        nextOffset.set(appendInfo.lastOffset + 1)

        // flush once the number of unflushed messages reaches the configured interval
        if(unflushedMessages >= config.flushInterval)
          flush()

        appendInfo
      }
    } catch {
      case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
    }
  }

  /**
   * Read messages from the log
   * @param startOffset The offset to begin reading at
   * @param maxLength The maximum number of bytes to read
   * @param maxOffset -The offset to read up to, exclusive. (i.e. the first offset NOT included in the resulting message set).
   *
   * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
   *         NOTE(review): the range check that raises this exception is not present in this excerpt.
   * @return The messages read
   */
  def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): MessageSet = {
    // check if the offset is valid and in range
    val next = nextOffset.get
    if(startOffset == next)
      return MessageSet.Empty

    // floorEntry yields the segment with the greatest start offset less than or equal to startOffset
    var entry = segments.floorEntry(startOffset)

    // do the read on the segment with a base offset less than the target offset
    // but if that segment doesn't contain any messages with an offset greater than that
    // continue to read from successive segments until we get some messages or we reach the end of the log
    while(entry != null) {
      val messages = entry.getValue.read(startOffset, maxOffset, maxLength)
      if(messages == null)
        entry = segments.higherEntry(entry.getKey) // nothing in this segment, move on to the next one
      else
        return messages
    }

    // okay we are beyond the end of the last segment but less than the log end offset
    MessageSet.Empty
  }
}


Segment是个逻辑概念,为了防止log文件过大, 将log分成许多的LogSegments





/**
 * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
 * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each
 * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
 * any previous segment.
 *
 * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file.
 *
 * @param log The message set containing log entries
 * @param index The offset index
 * @param baseOffset A lower bound on the offsets in this segment
 * @param indexIntervalBytes The approximate number of bytes between entries in the index
 * @param time The time instance
 */
@nonthreadsafe
class LogSegment(val log: FileMessageSet,
                 val index: OffsetIndex,
                 val baseOffset: Long,
                 val indexIntervalBytes: Int,
                 time: Time) extends Logging {

  /**
   * Append the given messages starting with the given offset. Add
   * an entry to the index if needed.
   *
   * It is assumed this method is being called from within a lock.
   *
   * @param offset The first offset in the message set.
   * @param messages The messages to append.
   */
  @nonthreadsafe
  def append(offset: Long, messages: ByteBufferMessageSet) {
    if (messages.sizeInBytes > 0) {
      // append an entry to the index (if needed): only some messages are indexed, namely once more than
      // indexIntervalBytes have been written since the previous index entry
      if(bytesSinceLastIndexEntry > indexIntervalBytes) {
        // the entry maps this offset to the current end of the log file
        index.append(offset, log.sizeInBytes())
        this.bytesSinceLastIndexEntry = 0
      }
      // append the messages to the message set file
      log.append(messages)
      this.bytesSinceLastIndexEntry += messages.sizeInBytes
    }
  }

  /**
   * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
   * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
   *
   * @param startOffset A lower bound on the first offset to include in the message set we read
   * @param maxSize The maximum number of bytes to include in the message set we read
   * @param maxOffset An optional maximum offset for the message set we read
   *
   * @throws IllegalArgumentException If maxOffset is given and is less than startOffset
   *
   * @return The message set read or null if the startOffset is larger than the largest offset in this log.
   */
  @threadsafe
  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): MessageSet = {
    val logSize = log.sizeInBytes // this may change, need to save a consistent copy
    val startPosition = translateOffset(startOffset)

    if(startPosition == null) {
      // translateOffset found no message with an offset >= startOffset in this segment; report that
      // to the caller as null (the documented contract) instead of dereferencing the missing position
      null
    } else {
      // calculate the length of the message set to read based on whether or not they gave us a maxOffset
      val length =
        maxOffset match {
          case None =>
            // no max offset, just use the max size they gave unmolested
            maxSize
          case Some(offset) => {
            // there is a max offset, translate it to a file position and use that to calculate the max read size
            if(offset < startOffset)
              throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
            val mapping = translateOffset(offset, startPosition.position)
            val endPosition =
              if(mapping == null)
                logSize // the max offset is off the end of the log, use the end of the file
              else
                mapping.position
            min(endPosition - startPosition.position, maxSize)
          }
        }
      // read the message set out of the log file
      log.read(startPosition.position, length)
    }
  }

  /**
   * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log and index.
   *
   * @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this
   * is corrupt.
   *
   * @return The number of bytes truncated from the log
   */
  @nonthreadsafe
  def recover(maxMessageSize: Int): Int = {...} // body elided in this excerpt
}


FileMessageSet是Segment中实际存放log message的文件, 通过FileChannel可以读写该文件

1: /**
2:  * An on-disk message set. An optional start and end position can be applied to the message set
3:  * which will allow slicing a subset of the file.
4:  * @param file The file holding the underlying log data
5:  * @param channel The underlying file channel used
6:  * @param start A lower bound on the absolute position in the file from which the message set begins
7:  * @param end The upper bound on the absolute position in the file at which the message set ends
8:  * @param isSlice Should the start and end parameters be used for slicing?
9:  */
10: @nonthreadsafe
11: class FileMessageSet private[kafka](@volatile var file: File,
12:                                     private[log] val channel: FileChannel,
13:                                     private[log] val start: Int,
14:                                     private[log] val end: Int,
15:                                     isSlice: Boolean) extends MessageSet with Logging {...}


OffsetIndex是Segment的index文件, 这是0.8后加上的, 之前message直接使用物理offset标识

新版本中还是改成了使用逻辑offset,让物理地址对用户透明, 这样就需要一个index来匹配逻辑offset和物理地址
index考虑到效率,最好放在内存中,但是考虑到size问题, 所以使用MappedByteBuffer(内存映射文件)
并且逻辑offset是基于base offset的相对offset,否则无法保证只使用4-byte

/**
 * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
 * that is it may not hold an entry for all messages in the log.
 *
 * The index is stored in a file that is pre-allocated to hold a fixed maximum number of 8-byte entries.
 *
 * The index supports lookups against a memory-map of this file. These lookups are done using a simple binary search variant
 * to locate the offset/location pair for the greatest offset less than or equal to the target offset.
 *
 * Index files can be opened in two ways: either as an empty, mutable index that allows appends or
 * an immutable read-only index file that has previously been populated. The makeReadOnly method will turn a mutable file into an
 * immutable one and truncate off any extra bytes. This is done when the index file is rolled over.
 *
 * No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt.
 *
 * The file format is a series of entries. The physical format is a 4 byte "relative" offset and a 4 byte file location for the
 * message with that offset. The offset stored is relative to the base offset of the index file. So, for example,
 * if the base offset was 50, then the offset 55 would be stored as 5. Using relative offsets in this way lets us use
 * only 4 bytes for the offset.
 *
 * The frequency of entries is up to the user of this class.
 *
 * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal
 * storage format.
 */
class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
  // operations on the index file are guarded by this lock
  private val lock = new ReentrantLock

  /* initialize the memory mapping for this index; a MappedByteBuffer is used so that large index files can be handled */
  private var mmap: MappedByteBuffer =
    {
      // make sure the file exists before it is opened and mapped.
      // NOTE(review): this excerpt shows no pre-allocation to maxIndexSize and no positioning of the buffer
      // for the next append; only the mapping itself is visible here.
      file.createNewFile()
      val raf = new RandomAccessFile(file, "rw")
      try {
        val len = raf.length()
        // the mapping is the value of this block; per the FileChannel.map contract it stays valid
        // after the channel that created it has been closed
        raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
      } finally {
        raf.close()
      }
    }

  // the two helpers below read one field of the nth entry by byte offset: each entry is 8 bytes,
  // the relative offset in the first 4 and the physical position in the last 4

  /* return the nth offset relative to the base offset */
  private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)

  /* return the nth physical position */
  private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)

  /**
   * Find the largest offset less than or equal to the given targetOffset
   * and return a pair holding this offset and its corresponding physical file position.
   * The search is a binary search over the index entries.
   *
   * @param targetOffset The offset to look up.
   *
   * @return The offset found and the corresponding file position for this offset.
   * If the target offset is smaller than the least entry in the index (or the index is empty),
   * the pair (baseOffset, 0) is returned.
   */
  def lookup(targetOffset: Long): OffsetPosition = {...} // body elided in this excerpt

  /**
   * Get the nth offset mapping from the index
   *
   * NOTE(review): the offset in the returned pair is the value exactly as stored, i.e. relative to
   * baseOffset; baseOffset is not added back here.
   *
   * @param n The entry number in the index
   * @throws IllegalArgumentException if n is not smaller than the number of entries
   * @return The offset/position pair at that entry
   */
  def entry(n: Int): OffsetPosition = {
    maybeLock(lock) {
      if(n >= entries)
        throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
      // read through a duplicate so the shared buffer's own position is left alone
      val idx = mmap.duplicate
      OffsetPosition(relativeOffset(idx, n), physical(idx, n))
    }
  }

  /**
   * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all
   * previously appended entries.
   *
   * @param offset The absolute offset to index; it is stored relative to baseOffset
   * @param position The physical file position that belongs to the offset
   * @throws InvalidOffsetException if the index is non-empty and offset is not larger than the last appended offset
   */
  def append(offset: Long, position: Int) {
    inLock(lock) {
      require(!isFull, "Attempt to append to a full index (size = " + size + ").")
      if (size.get == 0 || offset > lastOffset) {
        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
        // entry layout: 4-byte relative offset followed by 4-byte file position
        this.mmap.putInt((offset - baseOffset).toInt)
        this.mmap.putInt(position)
        this.size.incrementAndGet()
        this.lastOffset = offset
        // the buffer position must agree with the entry count, 8 bytes per entry
        require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
      } else {
        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
          .format(offset, entries, lastOffset, file.getAbsolutePath))
      }
    }
  }

  // ... the remaining members used above (entries, size, lastOffset, isFull, maybeLock) are not shown in this excerpt
}



所以kafka的方式,是分段索引,用offset通过二分查找在index中找出段的起始地址,然后再去file里面遍历找出精确的地址, 是时间换空间的设计

1. LogSegment.translateOffset

前面说了,index中从效率考虑并不会为每个offset建立索引entry,只会分段建立offset索引, 所以从index中直接可以找到精确物理地址的概率不大,但是可以找到最接近的那个物理地址

/**
 * Locate the physical file position of the first message whose offset is >= the requested offset.
 *
 * The index is consulted first for a nearby position, then the log file is scanned forward from there.
 * startingFilePosition is an optimization for callers that already know a valid file position higher
 * than the greatest lower bound the index would give; the scan begins at whichever of the two is larger.
 *
 * @param offset The offset we want to translate
 * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an
 * optimization and when omitted, the search will begin at the position in the offset index.
 *
 * @return The position in the log storing the message with the least offset >= the requested offset or null if no
 * message meets this criteria.
 */
@threadsafe
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
  // approximate physical position taken from the index
  val indexedPosition = index.lookup(offset).position
  val scanFrom = max(indexedPosition, startingFilePosition)
  log.searchFor(offset, scanFrom)
}

2. FileMessageSet.searchFor

而searchFor的逻辑是从startingPosition开始, 逐条遍历各个message,并从overhead中取出offset进行比较,直到找到第一个offset大于等于target offset的message为止(找不到则返回null)

/**
 * Search forward for the file position of the first message whose offset is greater than or equal to the
 * target offset and return that offset together with its physical position. If no such message is found, return null.
 *
 * @param targetOffset The offset to search for.
 * @param startingPosition The starting position in the file to begin searching from.
 * @throws IllegalStateException If a message header cannot be read completely or carries a negative size
 * @return The offset/position pair of the first matching message, or null
 */
def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
  var position = startingPosition
  // LogOverhead = MessageSizeLength + OffsetLength
  val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
  val size = sizeInBytes()
  // walk the messages one by one, starting at position
  while(position + MessageSet.LogOverhead < size) {
    buffer.rewind()
    channel.read(buffer, position)
    // a short read would leave bytes of the previous header in the buffer and be parsed as if it were this one
    if(buffer.hasRemaining)
      throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
        .format(targetOffset, startingPosition, file.getAbsolutePath))
    buffer.rewind()
    val offset = buffer.getLong()
    if(offset >= targetOffset)
      return OffsetPosition(offset, position)
    val messageSize = buffer.getInt()
    // a negative size would move position backwards (or not at all) and the scan might never terminate
    if(messageSize < 0)
      throw new IllegalStateException("Invalid message size: " + messageSize)
    // step over this message to the header of the next one
    position += MessageSet.LogOverhead + messageSize
  }
  null
}


