kafka 笔记

2016/12/25 posted in  Kafka

有时候,说程序员是复制粘贴代码.我觉得你说得对.但是,得看你复制几手代码了.读源码,复制源码,学习源码.学习优秀程序员写代码的风格

我有时候看源码,喜欢找找工具类,看看能不能复用.
比如kafka的源码

看看源码工具类

序列化字符串

import kafka.utils.VerifiableProperties

/**
 * A decoder is a method of turning byte arrays into objects.
 * An implementation is required to provide a constructor that
 * takes a VerifiableProperties instance.
 */
trait Decoder[T] {
  def fromBytes(bytes: Array[Byte]): T
}

/**
 * The default implementation does nothing, just returns the same byte array it takes in.
 */
class DefaultDecoder(props: VerifiableProperties = null) extends Decoder[Array[Byte]] {
  def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes
}

/**
 * The string encoder translates strings into bytes. It uses UTF8 by default but takes
 * an optional property serializer.encoding to control this.
 */
class StringDecoder(props: VerifiableProperties = null) extends Decoder[String] {
  val encoding = 
    if(props == null)
      "UTF8" 
    else
      props.getString("serializer.encoding", "UTF8")
      
  def fromBytes(bytes: Array[Byte]): String = {
    new String(bytes, encoding)
  }
}

反序列化

import kafka.utils.VerifiableProperties

/**
 * An encoder is a method of turning objects into byte arrays.
 * An implementation is required to provide a constructor that
 * takes a VerifiableProperties instance.
 */
trait Encoder[T] {
  def toBytes(t: T): Array[Byte]
}

/**
 * The default implementation is a no-op, it just returns the same array it takes in
 */
class DefaultEncoder(props: VerifiableProperties = null) extends Encoder[Array[Byte]] {
  override def toBytes(value: Array[Byte]): Array[Byte] = value
}

class NullEncoder[T](props: VerifiableProperties = null) extends Encoder[T] {
  override def toBytes(value: T): Array[Byte] = null
}

/**
 * The string encoder takes an optional parameter serializer.encoding which controls
 * the character set used in encoding the string into bytes.
 */
class StringEncoder(props: VerifiableProperties = null) extends Encoder[String] {
  val encoding = 
    if(props == null) 
      "UTF8" 
    else 
      props.getString("serializer.encoding", "UTF8")
  
  override def toBytes(s: String): Array[Byte] = 
    if(s == null)
      null
    else
      s.getBytes(encoding)
}

时间

/**
 * Some common constants
 */
object Time {
  val NsPerUs = 1000
  val UsPerMs = 1000
  val MsPerSec = 1000
  val NsPerMs = NsPerUs * UsPerMs
  val NsPerSec = NsPerMs * MsPerSec
  val UsPerSec = UsPerMs * MsPerSec
  val SecsPerMin = 60
  val MinsPerHour = 60
  val HoursPerDay = 24
  val SecsPerHour = SecsPerMin * MinsPerHour
  val SecsPerDay = SecsPerHour * HoursPerDay
  val MinsPerDay = MinsPerHour * HoursPerDay
}

/**
 * A mockable interface for time functions
 */
trait Time {
  
  def milliseconds: Long

  def nanoseconds: Long

  def sleep(ms: Long)
}

/**
 * The normal system implementation of time functions
 */
object SystemTime extends Time {
  
  def milliseconds: Long = System.currentTimeMillis
  
  def nanoseconds: Long = System.nanoTime
  
  def sleep(ms: Long): Unit = Thread.sleep(ms)
  
}

加载配置文件

  /**
   * Read a properties file from the given path
   * @param filename The path of the file to read
   */
   def loadProps(filename: String): Properties = {
     val props = new Properties()
     var propStream: InputStream = null
     try {
       propStream = new FileInputStream(filename)
       props.load(propStream)
     } finally {
       if(propStream != null)
         propStream.close
     }
     props
   }

等等

  • 比如还有一些 zkUtil,offerset 之类的.
  • 所以,当我们开发的时候,需要某个框架的某个功能的时候,我们不如简单看看源码,往往源码的测试类里都有我们需要的功能.
  • 这样,比找博客是不是更直接一点呢.