InfluxDB 模型和存储引擎入门

InfluxDB 分为这里所列的版本:https://www.influxdata.com/products/editions/

数据模型和查询

参考:https://docs.influxdata.com/influxdb/v2.1/reference/key-concepts/data-elements/https://docs.influxdata.com/enterprise_influxdb/v1.9/concepts/glossary/

DBBF4749-218C-493E-A56E-4A6060E21AC7

  • Database: 数据库
  • Timestamp: 时间戳,
  • Measurement: A measurement acts as a container for tags, fields, and timestamps.
  • Field: 分为 <Field Key, Field Value>,Field Key 是名称字符串,Field Value 是只支持基本类型的值。所有的 Field 组成了一个 Field SetField 上的值是不会建索引的
  • Tags 是提供 Point 的元数据,有点类似 index,tags 按照 string 的方式存储. 所有的 tags 组成了 tag set。单个 Tag 包含 Tag KeyTag Value. 对于每个 tag key, tag value 可以构成一个类似 Card 的概念
    • 可以理解成,tag 本身都是 string 类型
    • tag 可以构建索引

那么,如上所述,InfluxDB 的 Schema 如下表示:

31928802-5350-4AE7-844F-AF8F1049115A

理解了上面的东西之后,需要理解 SeriesSeries Key 这两个概念:

A06BB0EF-D68C-491A-85C1-574E0DFCAC6F

Series: 包含 timestamp

5F759A4F-F31C-4855-9FAF-4F1A691E36D8

看到没有,这里相当于统计信息给抽出来了,一个 Series Key 对应一个 Series,而这个 Series 有着多个 <Timestamp, Field Value> 组下面才定义到具体的信息:

Point: 对应的单个 Series Key + <Timestamp + Field Value>, 可以理解成数据库的「行」。

最后,measurement 类似 RDBMS 的 ,而这里给出一个 Bucket 的概念,可以设置多个 measurement,并且设置一些 expire time 之类的条件。

以上就是 InfluxDB 的基本概念,具体一些 Schema 定义可以看:https://docs.influxdata.com/influxdb/v2.1/reference/key-concepts/data-schema/

Line Protocol

InfluxDB 可以使用 Line Protocol: https://github.com/influxdata/influxdb/tree/master/tsdb#line-protocol

query & latency

Timescale 给出了一个 benchmark,我觉得这种 benchmark 肯定都是自己吊打别人,但是对衡量数量级来说帮助很大,如下图:

https://www.timescale.com/blog/content/images/2022/01/20200716_Timescale_Blog_InfluxBenchmarks.jpg

20200716_Timescale_Blog_InfluxBenchmarks

可以看到,对这个的查询包含一些近实时的查询,而数据可能能在 ~100ms 左右的时间完成一些简单的 GroupBy。

Shard

Sharding is the horizontal partitioning of data in a database. Each partition is called shard. InfluxDB stores data in shard groups, which are organized by retention policy and store data with timestamps that fall within a specific time interval.

因为时序数据库是一个偏分析的场景,所以按照时间分片是一个相对自然的策略。这个分片有一个 RP(Retention Policy),是说保留具体内容的时间。我们都知道,Bucket 本身有一个过期时间配置,Shard Group 会按照 Bucket 的时间再切细一点,做过期时间配置:

F0977952-F7E5-464A-9002-58D8C328A9CB

再开源 InfluxDB 上,一个 Shard Group 只有一个 Shard,感觉很无力:

A9DA7AAC-234A-4AEE-BA33-85EE761211A9

InfluxDB Enterprise 则不一样,它可以配置 Shard 的复制等功能:https://docs.influxdata.com/enterprise_influxdb/v1.9/features/clustering-features/#shard-movement

InfluxDB 的 Cluster 版本现在设计如下:https://docs.influxdata.com/enterprise_influxdb/v1.8/concepts/clustering/

replication factor 如果设置为 X,那么一个 N 个 Data Node 的集群会配置 floor(N/X)Shard,然后这里会保证每个几乎机器都会被复制这些东西:

For example we have a shard group for 2016-09-19 that has two shards 1 and 2. Shard 1 is replicated to servers A and B while shard 2 is copied to servers C and D.

(我也不知道为什么这么设计,有人可以告诉我吗?)

这里会根据 hash 挑选写入的 shard,来做热点写打散:

1
2
3
4
// key is measurement + tagset
// shardGroup is the group for the values based on timestamp
// hash with fnv and then bucket
shard := shardGroup.shards[fnv.New64a(key) % len(shardGroup.Shards)]

写入的时候,可以配置 write-consistency,不过我感觉基本都是 RWN 都不算的简单同步复制的变种:https://docs.influxdata.com/enterprise_influxdb/v1.8/concepts/clustering/#write-consistency

这里还有个 hinted handoff,和 Dynamo 那套一样,都是故障时写顺延。

Compaction & Deletion

因为热点是写优化的,所以某个 ShardGroup 再不会写之后,相对来说会变成冷的,可以切成一些读优化、省空间的结构。而 Shard Group 本身也会过期,所以是需要删除的。

一些 Schema 相关的讨论

https://docs.influxdata.com/enterprise_influxdb/v1.9/concepts/schema_and_data_layout/

基本上是在教你「怎么用我们的数据库」。

存储引擎: TSM(Time-Structured Merge Tree)

每一个 Shard 都包含着 WAL 和 TSM 文件,这类似 LSM 的 WSL 和 SSTable。TSM 存储引擎包含:

  1. 内存索引:跨 Shards 共享,提供对 measurementstagsseries 的快速访问。这个可能会持久化成下面的 TSI。
  2. WAL:Shard 的写入优化格式
  3. Cache:对 TSM 的 WAL 缓存,类似 LevelDB/RocksDB 的 MemTable(我一眼以为是 BlockCache)
  4. TSM Files: 类似 SSTable,但着重于列式、压缩的格式
  5. FileStore: TSM Files 的 Manager,控制文件层面的增删访
  6. Compactor/Compactor Planner: 压缩相关。这里是希望把数据转成更读友好的形式
  7. Compression: 因为是列式数据,所以需要特定的压缩。有的 Pattern 是固定的压缩方式,有的 Pattern 则是根据数据分布来压缩的。

需要注意的是,对引擎的写入是 Batch 写入的,可能用户的客户端/agent会 Batch 采样数据过来,然后往热的 Shard Group 写数据。

WAL

当写入请求过来的时候,会先往 WAL 里面写入。WAL 以 10MB 为一个 Segment,然后批量压缩写入,格式如下:https://github.com/influxdata/influxdb/blob/master/tsdb/engine/tsm1/wal.go#L705-L722

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// The entries values are encode as follows:
//
// For each key and slice of values, first a 1 byte type for the []Values
// slice is written. Following the type, the length and key bytes are written.
// Following the key, a 4 byte count followed by each value as a 8 byte time
// and N byte value. The value is dependent on the type being encoded. float64,
// int64, use 8 bytes, boolean uses 1 byte, and string is similar to the key encoding,
// except that string values have a 4-byte length, and keys only use 2 bytes.
//
// This structure is then repeated for each key an value slices.
//
// ┌────────────────────────────────────────────────────────────────────┐
// │ WriteWALEntry │
// ├──────┬─────────┬────────┬───────┬─────────┬─────────┬───┬──────┬───┤
// │ Type │ Key Len │ Key │ Count │ Time │ Value │...│ Type │...│
// │1 byte│ 2 bytes │ N bytes│4 bytes│ 8 bytes │ N bytes │ │1 byte│ │
// └──────┴─────────┴────────┴───────┴─────────┴─────────┴───┴──────┴───┘

整个 Batch 在组织成如上格式之后,在写入的时候还会被 snappy 压缩:

1
2
3
4
5
6
7
8
9
b, err := entry.Encode(bytes)
if err != nil {
bytesPool.Put(bytes)
return -1, err
}

encBuf := bytesPool.Get(snappy.MaxEncodedLen(len(b)))

compressed := snappy.Encode(encBuf, b)

文档上显示,单台机器上,每个 Shard 都会有一个 WAL 流,InfluxDB 致力于将其改成同一个 WAL 流。

Cache

The Cache is an in-memory copy of all data points current stored in the WAL. The points are organized by the key, which is the measurement, tag set, and unique field. Each field is kept as its own time-ordered range. The Cache data is not compressed while in memory.

这里提供同一种类型的快速访问. 看实际代码,Cache 会把实际存储 dispatch 到一个叫 ring 的结构体上。这是一个 Bucket + Concurrency Hash Map,逻辑大概如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// ring is a structure that maps series keys to entries.
//
// ring is implemented as a crude hash ring, in so much that you can have
// variable numbers of members in the ring, and the appropriate member for a
// given series key can always consistently be found. Unlike a true hash ring
// though, this ring is not resizeable—there must be at most 16 members in the
// ring, and the number of members must always be a power of 2.
//
// ring works as follows: Each member of the ring contains a single store, which
// contains a map of series keys to entries. A ring always has 16 partitions,
// and a member takes up one or more of these partitions (depending on how many
// members are specified to be in the ring)
//
// To determine the partition that a series key should be added to, the series
// key is hashed and the first 8 bits are used as an index to the ring.
//
type ring struct {
// The unique set of partitions in the ring.
// len(partitions) <= len(continuum)
partitions []*partition
}

func (r *ring) split(n int) []storer {
storers := make([]storer, n)
for i := 0; i < n; i++ {
storers[i], _ = newring(len(r.partitions))
}

for i, p := range r.partitions {
r := storers[i%n].(*ring)
r.partitions[i] = p
}
return storers
}

// partition provides safe access to a map of series keys to entries.
type partition struct {
mu sync.RWMutex
store map[string]*entry
}

注意 store 存储的 map[string]*entry,这里实际上是:

1
2
3
4
5
6
7
8
9
// entry is a set of values and some metadata.
type entry struct {
mu sync.RWMutex
values Values // All stored values.

// The type of values stored. Read only so doesn't need to be protected by
// mu.
vtype byte
}

这个 Values 就是前面 WAL 写的时候那个,是不是一切都连起来了!

TSM 文件

TSM 文件类似 LSM 的 SSTable,包含下面几个区域:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
+--------+------------------------------------+-------------+--------------+
| Header | Blocks | Index | Footer |
|5 bytes | N bytes | N bytes | 4 bytes |
+--------+------------------------------------+-------------+--------------+

+--------------------------------------------------------------------+
│ Blocks │
+---------------------+-----------------------+----------------------+
| Block 1 | Block 2 | Block N |
+---------------------+-----------------------+----------------------+
| CRC | Data | CRC | Data | CRC | Data |
| 4 bytes | N bytes | 4 bytes | N bytes | 4 bytes | N bytes |
+---------------------+-----------------------+----------------------+


+-----------------------------------------------------------------------------+
│ Index │
+-----------------------------------------------------------------------------+
│ Key Len │ Key │ Type │ Count │Min Time │Max Time │ Offset │ Size │...│
│ 2 bytes │ N bytes │1 byte│2 bytes│ 8 bytes │ 8 bytes │8 bytes │4 bytes │ │
+-----------------------------------------------------------------------------+

+--------------------------------------------------+
| Type | Len | Timestamps | Values |
|1 Byte | VByte | N Bytes | N Bytes │
+--------------------------------------------------+

详细见:https://docs.influxdata.com/influxdb/v1.8/concepts/storage_engine/

需要注意 TSM 和 SSTable 的区别:

  1. 单个 Block 里面只有同一个 Series Key,里面的类型是相同的,可以用 Column Compression 来处理。除此之外,它还有 Timestamps 区段。
  2. 不同的 Block 可能拥有相同的 Series Key
  3. Series Key 按顺序排放

Compaction

  1. 内存拿到 Snapshot,然后 Compact 到文件上
  2. Level Compaction: 类似 LevelDB
  3. Index Optimization: engine 层好像不做这个,文档上写的是 Level4 大量堆积之后,拆分到一个存储/读均优化的结构。
  4. Full Compaction

需要注意的是,这里很鸡贼,说是 Level Compaction,实际上是 Tiered Compaction. 热文件是多写少读的,可以理解,最后数据冷了应该可以 Full Compaction 或者 Index Optimization,做成读优化的结构。

Write / Update / Delete / Query

这里着重需要理解 Delete。Write 这边会先进 Cache,然后等待 WriteSnapshot 进行。Update 就直接更新,等 Compaction 操作,因为这边会先读上面再读下面。Delete 的流程很诡异,这里会给内存删除,然后给每个有这个 key 的文件写一个 Tombstone 文件。读的时候做 Merge,感觉他这个对更新和删除就不是很友好。

查询这边其实就类似 LevelDB 查询了。不过我好像没太看到一些算子下推的操作,感觉对 AP Workload 支持一般般?

关键路径代码

写入入口, 会把请求分发到 CacheWAL

1
2
3
// WritePoints writes metadata and point data into the engine.
// It returns an error if new points are added to an existing key.
func (e *Engine) WritePoints(ctx context.Context, points []models.Point) error

CompactCache 会完成 WAL -> TSM, 这里调用 WriteSnapshot 做 类似 Major Compaction 的操作:

1
2
3
4
5
// compactCache continually checks if the WAL cache should be written to disk.
func (e *Engine) compactCache()

// WriteSnapshot writes a Cache snapshot to one or more new TSM files.
func (c *Compactor) WriteSnapshot(cache *Cache, logger *zap.Logger) ([]string, error)

而下列会构建 TSM 具体的 Blocks, 也包含了相同类型列的 Compression。

1
func (c *cacheKeyIterator) encode()

Compact 关注:

最上层触发在:

1
func (e *Engine) compact(wg *sync.WaitGroup)

下面逻辑:

1
2
3
4
5
6
7
8
9
// CompactFull writes multiple smaller TSM files into 1 or more larger files.
func (c *Compactor) CompactFull(tsmFiles []string, logger *zap.Logger) ([]string, error)

// CompactFast writes multiple smaller TSM files into 1 or more larger files.
func (c *Compactor) CompactFast(tsmFiles []string, logger *zap.Logger) ([]string, error)

// writeNewFiles writes from the iterator into new TSM files, rotating
// to a new file once it has reached the max TSM file size.
func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool, logger *zap.Logger) ([]string, error)

索引: TSI (Time Series Index)

InfluxDB 有一个 Series Cardinality 的概念,其实很好理解。回顾一下,Series Key 组成是:

A5A862C7-9674-4364-82D2-E122BF5F644A

这里我们会发现,对于 emailstatus 这两个 tag,这个 tag 我们虽然支持一堆 string 形式的 key,但是它的选择空间可能不是很大,这里提供了一定的优化空间。同时,如果 Series Key 数量很多,那这个可能会生成过多需要查询的 Index,这给系统带来了很大开销。

还有一种场景,如上,指定 status = start,不指定 email,也总得 AP 吧,这里处理就得扫一堆 TSM 的 Index 了。

这个地方,这里提供了反向索引,它对 <measurement, tag set>measurement 和 Tag 的任意一项到 Series Key 提供了映射。它也是一个有序结构。它的目的如下:

The goal is that the number of series should be unbounded by the amount of memory on the server hardware. Importantly, the number of series that exist in the database will have a negligible impact on database startup time.

可以看到,TSI 大概有这几种类型:

  • Index: TSI instance,单个 Shard 会有一个 Index
  • Partition: Index 内部会分成多个 Partition,来做 IO 和并发。每个 Partition 会对应不同的存储文件。
  • LogFile: TSI 的日志文件
  • IndexFile: TSI 最重要的部分之一,表示对应的实际索引

Partition 这个概念很让人困惑。实际上这是个写入优化的措施。写入的时候,利用 Partition 来做并行化,读取的时候,需要从每个 Partition 来 PointGet,或者 Merge Partition 的结果。我个人感觉这东西稍微有点过度设计了…

可以看到,它对外提供了下列接口(实际上是 tsdb/index.goIndex 这个 interface,TSM 内存那节有个类似的):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ForEachMeasurementName iterates over all measurement names in the index,
// applying fn. It returns the first error encountered, if any.
//
// ForEachMeasurementName does not call fn on each partition concurrently so the
// call may provide a non-goroutine safe fn.
func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error

// MeasurementExists returns true if a measurement exists.
func (i *Index) MeasurementExists(name []byte) (bool, error)

// MeasurementHasSeries returns true if a measurement has non-tombstoned series.
func (i *Index) MeasurementHasSeries(name []byte) (bool, error)

...

和写入:

1
2
3
4
5
// CreateSeriesIfNotExists creates a series if it doesn't exist or is deleted.
func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error

// CreateSeriesListIfNotExists creates a list of series if they doesn't exist in bulk.
func (i *Index) CreateSeriesListIfNotExists(keys [][]byte, names [][]byte, tagsSlice []models.Tags) error

对于任何写入,这边会内存有各种 mapping,写 WAL。最后构建成 TSI File,格式参照:https://github.com/influxdata/influxdb/blob/master/tsdb/index/tsi1/doc.go#L65 ,分为:

  • Series Block: 存放了所有 Series Key,用 Hash Index 来加速
  • Tag Block: 存放了 Tag Key, Tag Value 到 Series Key 的映射,可能有多个 Block
  • Measurement Block: 存放 Measurement 的映射

注意,这中间还有 key 的 HashMap,来加速查找。同时,这里有合并多个 TSI 的需求,这里会写入 HLL++,并记录 key 和删除的 key,当合并的时候,可以根据 HLL 计算出大概的结果和空间放大,来调度 Compaction。

参考

https://github.com/influxdata/influxdb/tree/master/tsdb

博客: