kafka 笔记

有时候,说程序员是复制粘贴代码.我觉得你说得对.但是,得看你复制几手代码了.读源码,复制源码,学习源码.学习优秀程序员写代码的风格.

我有时候看源码,喜欢找找工具类,看看能不能复用.
比如kafka的源码

看看源码工具类

序列化字符串

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import kafka.utils.VerifiableProperties
/**
* A decoder is a method of turning byte arrays into objects.
* An implementation is required to provide a constructor that
* takes a VerifiableProperties instance.
*/
trait Decoder[T] {
def fromBytes(bytes: Array[Byte]): T
}
/**
* The default implementation does nothing, just returns the same byte array it takes in.
*/
class DefaultDecoder(props: VerifiableProperties = null) extends Decoder[Array[Byte]] {
def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes
}
/**
* The string encoder translates strings into bytes. It uses UTF8 by default but takes
* an optional property serializer.encoding to control this.
*/
class StringDecoder(props: VerifiableProperties = null) extends Decoder[String] {
val encoding =
if(props == null)
"UTF8"
else
props.getString("serializer.encoding", "UTF8")
def fromBytes(bytes: Array[Byte]): String = {
new String(bytes, encoding)
}
}

反序列化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import kafka.utils.VerifiableProperties
/**
* An encoder is a method of turning objects into byte arrays.
* An implementation is required to provide a constructor that
* takes a VerifiableProperties instance.
*/
trait Encoder[T] {
def toBytes(t: T): Array[Byte]
}
/**
* The default implementation is a no-op, it just returns the same array it takes in
*/
class DefaultEncoder(props: VerifiableProperties = null) extends Encoder[Array[Byte]] {
override def toBytes(value: Array[Byte]): Array[Byte] = value
}
class NullEncoder[T](props: VerifiableProperties = null) extends Encoder[T] {
override def toBytes(value: T): Array[Byte] = null
}
/**
* The string encoder takes an optional parameter serializer.encoding which controls
* the character set used in encoding the string into bytes.
*/
class StringEncoder(props: VerifiableProperties = null) extends Encoder[String] {
val encoding =
if(props == null)
"UTF8"
else
props.getString("serializer.encoding", "UTF8")
override def toBytes(s: String): Array[Byte] =
if(s == null)
null
else
s.getBytes(encoding)
}

时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* Some common constants
*/
object Time {
val NsPerUs = 1000
val UsPerMs = 1000
val MsPerSec = 1000
val NsPerMs = NsPerUs * UsPerMs
val NsPerSec = NsPerMs * MsPerSec
val UsPerSec = UsPerMs * MsPerSec
val SecsPerMin = 60
val MinsPerHour = 60
val HoursPerDay = 24
val SecsPerHour = SecsPerMin * MinsPerHour
val SecsPerDay = SecsPerHour * HoursPerDay
val MinsPerDay = MinsPerHour * HoursPerDay
}
/**
* A mockable interface for time functions
*/
trait Time {
def milliseconds: Long
def nanoseconds: Long
def sleep(ms: Long)
}
/**
* The normal system implementation of time functions
*/
object SystemTime extends Time {
def milliseconds: Long = System.currentTimeMillis
def nanoseconds: Long = System.nanoTime
def sleep(ms: Long): Unit = Thread.sleep(ms)
}

加载配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Read a properties file from the given path
* @param filename The path of the file to read
*/
def loadProps(filename: String): Properties = {
val props = new Properties()
var propStream: InputStream = null
try {
propStream = new FileInputStream(filename)
props.load(propStream)
} finally {
if(propStream != null)
propStream.close
}
props
}

等等

  • 比如还有一些 zkUtil,offerset 之类的.
  • 所以,当我们开发的时候,需要某个框架的某个功能的时候,我们不如简单看看源码,往往源码的测试类里都有我们需要的功能.
  • 这样,比找博客是不是更直接一点呢.
Donate comment here