Monstache
PPG007 ... 2023-7-24 About 3 min
# Monstache
monstache (opens new window) 是一个将数据从 MongoDB 同步到 Elasticsearch 的工具,并支持改动实时同步,但是这需要 MongoDB 开启 oplog,也就是需要开启复制集,对于一个单机 MongoDB 节点,参照此处 (opens new window)开启复制集。
根据文档下载可执行文件后,修改配置文件:
# MongoDB 连接配置
# [mongo]
# MongoDB 的连接字符串
mongo-url = "mongodb://localhost:27018"
# Elasticsearch 连接配置
# [elasticsearch]
# Elasticsearch 的连接地址
elasticsearch-urls = ["https://localhost:9200"]
elasticsearch-validate-pem-file = false
elasticsearch-user = "elastic"
elasticsearch-password = "*****************8"
direct-read-namespaces = ["es.member"]
# 配置自己的插件
mapper-plugin-path = "/home/user/workspace/monstache/myplugin.so"
# 监控的数据库集合配置
[[mapping]]
# MongoDB 中表的 namespace
namespace = "es.member"
# elasticsearch 中索引的名字
index = "member"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
其他配置项参照此处 (opens new window)。
如果提供的配置不能满足需求或者希望对数据同步有更详细的控制,那么就需要自行编写插件了。
首先拉取 monstache GitHub 仓库 (opens new window),然后切换到下载的预构建版本对应的 tag,创建一个 my_plugin.go 文件,插件必须遵守下面的规范:
// plugins must import this package
// import "github.com/rwynn/monstache/monstachemap"
// plugins must implement a function named "Map" with the following signature
// func Map(input *monstachemap.MapperPluginInput) (output *monstachemap.MapperPluginOutput, err error)
// plugins can be compiled using go build -buildmode=plugin -o myplugin.so myplugin.go
// to enable the plugin start with monstache -mapper-plugin-path /path/to/myplugin.so
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
我们首先编写一个程序来向 MongoDB 中插入 mock 数据:
type Member struct {
Id primitive.ObjectID `bson:"_id"`
Name string `bson:"name"`
Email string `bson:"email"`
Tags []string `bson:"tags"`
Phone string `bson:"phone"`
Birth int `bson:"birth"`
IsDeleted bool `bson:"isDeleted"`
IsActivated bool `bson:"isActivated"`
Properties []Property `bson:"properties"`
ActivatedAt time.Time `bson:"activatedAt,omitempty"`
}
type Property struct {
PropertyName string `bson:"propertyName"`
Value interface{} `bson:"value"`
}
var tags = []string{
"A",
"B",
"C",
"D",
}
func GetTags(count int) []string {
result := make([]string, 0, count)
for i := 0; i < count; i++ {
result = append(result, tags[randomdata.Number(len(tags))])
}
return StrArrayUnique(result)
}
func StrArrayUnique(arr []string) []string {
m := make(map[string]bool, len(arr))
for _, v := range arr {
m[v] = true
}
result := make([]string, 0, len(m))
for k, _ := range m {
result = append(result, k)
}
return result
}
func GenProperties(phone, email, name string) []Property {
properties := []Property{
{
PropertyName: "name",
Value: name,
},
{
PropertyName: "email",
Value: email,
},
{
PropertyName: "phone",
Value: phone,
},
{
PropertyName: "country",
Value: randomdata.Country(randomdata.ThreeCharCountry),
},
{
PropertyName: "totalBuys",
Value: randomdata.Number(0, 100),
},
{
PropertyName: "notDisturb",
Value: randomdata.Boolean(),
},
{
PropertyName: "addresses",
Value: []string{
strings.ReplaceAll(randomdata.Address(), "\n", ""),
randomdata.IpV4Address(),
randomdata.IpV6Address(),
randomdata.MacAddress(),
},
},
}
return properties
}
func NewRandomMember() Member {
isActivated := randomdata.Boolean()
phone := randomdata.PhoneNumber()
phone = strings.ReplaceAll(phone, " ", "")
phone = strings.TrimLeft(phone, "+")
name := randomdata.SillyName()
email := randomdata.Email()
birth, _ := time.Parse(randomdata.DateOutputLayout, randomdata.FullDate())
return Member{
Id: primitive.NewObjectID(),
Name: name,
Email: email,
Tags: GetTags(randomdata.Number(1, 4)),
Phone: phone,
IsDeleted: false,
IsActivated: isActivated,
Properties: GenProperties(phone, email, name),
ActivatedAt: func() time.Time {
if !isActivated {
return time.Time{}
}
date := randomdata.FullDate()
t, _ := time.Parse(randomdata.DateOutputLayout, date)
return t
}(),
Birth: cast.ToInt(strings.TrimLeft(birth.Format("0102"), "0")),
}
}
func genDefaultData(ctx context.Context) {
client := getMongoClient(ctx)
col := client.Database("es").Collection("member")
col.RemoveAll(ctx, primitive.M{})
for i := 0; i < 40000; i++ {
col.InsertOne(ctx, NewRandomMember())
}
}
func getMongoClient(ctx context.Context) *qmgo.Client {
client, err := qmgo.NewClient(ctx, &qmgo.Config{
Uri: "mongodb://127.0.0.1:27018",
})
if err != nil {
panic(err)
}
return client
}
func main() {
genDefaultData(context.Background())
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
上面的代码使用了 github.com/Pallinder/go-randomdata
库来 mock 了数据,执行上面的程序就会在 MongoDB 中插入数据了。
下面开始编写自定义插件:
package main
import (
"fmt"
"github.com/rwynn/monstache/v6/monstachemap"
)
func Map(input *monstachemap.MapperPluginInput) (output *monstachemap.MapperPluginOutput, err error) {
doc := input.Document
if properties, ok := doc["properties"].([]interface{}); ok {
doc["properties"] = MapMemberProperties(properties)
}
output = &monstachemap.MapperPluginOutput{}
output.Document = doc
return
}
func MapMemberProperties(properties []interface{}) map[string]interface{} {
propertiesMap := make(map[string]interface{}, len(properties))
for _, property := range properties {
if pm, ok := property.(map[string]interface{}); ok {
if pm["propertyName"] == "" {
continue
}
newPropertyMap := make(map[string]interface{})
switch pm["value"].(type) {
case int, int32, int64, float32, float64:
newPropertyMap["doubleValue"] = pm["value"]
newPropertyMap["type"] = "doubleValue"
case bool:
newPropertyMap["boolValue"] = pm["value"]
newPropertyMap["type"] = "boolValue"
case []interface{}:
newPropertyMap["stringArrayValue"] = pm["value"]
newPropertyMap["type"] = "stringArrayValue"
default:
newPropertyMap["stringValue"] = pm["value"]
newPropertyMap["type"] = "stringValue"
}
propertiesMap[fmt.Sprintf("%s", pm["propertyName"])] = newPropertyMap
}
}
return propertiesMap
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
在上面的代码中,对于 member 表的 properties 字段进行了重新处理。
然后执行下面的命令构建插件:
go build -buildmode=plugin -o myplugin.so myplugin.go
1
Note
自定义插件的 package 必须是 main。
构建完成后,修改 monstache 配置文件的 mapper-plugin-path
的值为构建出的 so 文件的路径,然后执行下面的命令即可开启同步:
./monstache -f config.toml
1