Go Book / 2 Go Advances / 03 Go 数据库 I O(三): Mongo D B 操作

03 Go 数据库 I O(三): Mongo D B 操作

一、Mongodb概述

我们都知道各种基于SQL的关系型数据库管理系统,如Mysql、Postgresql、sqlite。在现实世界中,结构化数据存储是网络和商务应用的主导技术,通过设计严格的关系模型构建业务系统是基本的设计方法。但是除了关系型数据库外,还有反其道而行的NoSQL(NoSQL = Not Only SQL ),意即"不仅仅是SQL"。

这类非结构化的数据库系统对超大规模数据的存储有着非常好的实践效果,其中Mongodb便是NoSql的热门应用,基于JavaScript引擎的bson结构和类似SQL的查询语法让其对大多数开发人员使用十分友好。Mongodb基于文档的结构能够灵活的存储非结构化的各种数据,如果你对json较熟悉,便会更容易入手,因为其文档内部的数据结构和json别无二致,操作mongodb文档和操作json一样简单。

这里提供中文的Mongodb使用手册,感兴趣可学习:https://www.mongodb.org.cn/tutorial/

二、Go 使用Mongodb

1.mgo

在Go中操作Mongodb需使用第三方库,之前github上比较热门的三方库为mgo,其作者已经暂停维护,但因其易用性使用的人还是相当多的,下面给出使用示例:

首先导入相关包:
import (
	"fmt"
	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

预先定义一个存储的文档结构:
//定义一个mongodb collection:student的文档结构
type MongoStudent struct {
	Id_   bson.ObjectId `bson:"_id"`
	Name  string        `bson:"name"`
	Phone string        `bson:"phone"`
	Email string        `bson:"email"`
	Sex   string        `bson:"sex"`
}

连接mongodb数据库并执行CRUD:要巧用 bson.M{} 这个查询构造struct

//拨号连接mongodb
session, err := mgo.Dial("localhost:27017/godemo")
ErrorHandler(err, "mgo.Dial()")
defer session.Close()
fmt.Println("连接mongodb成功...")

//取得一个collection操作句柄
stuCol := session.DB("godemo").C("student")

//查看student表的文档数
n, err := stuCol.Count()
ErrorHandler(err, "stuCol.Count()")
fmt.Println("Student文档数:", n)

增删改操作:

//1.1 新增文档 For Struct
stu1 := MongoStudent{bson.NewObjectId(), "Fun", "132xxxxxxxx", "924035827@qq.com", "Male"}
stu2 := MongoStudent{bson.NewObjectId(), "John", "186xxxxxxxx", "1012910232@qq.com", "Female"}
stu3 := MongoStudent{bson.NewObjectId(), "Jack", "150xxxxxxxx", "1012910232@qq.com", "Female"}
stu4 := MongoStudent{bson.NewObjectId(), "KKK", "133xxxxxxxx", "123456768@qq.com", "Male"}
err = stuCol.Insert(&stu1, &stu2, &stu3, &stu4)
ErrorHandler(err,"插入文档错误!!!")
fmt.Println("插入结构体数据成功!")

//1.2 新增文档 For map
stuMap := make(map[string]interface{},0)
stuMap["name"] = "fun map"
stuMap["age"] = 18
stuMap["phone"] = "156xxxxxxxx"
stuMap["email"] = "1234456324@qq.com"
stuMap["gender"] = "male"
err = stuCol.Insert(&stuMap)
ErrorHandler(err,"插入文档错误!!!")
fmt.Println("插入结构体数据成功!")

//2.1 修改文档
selector := bson.M{"_id": bson.ObjectIdHex("5c860a823ca58305282be41b")} //构造更新选择器
updator := bson.M{"$set": bson.M{"phone": "1111111111"}} 	构造更新数据集
err = stuCol.Update(selector, updator)
ErrorHandler(err,"stuCol.Update")
fmt.Println("stuCol.Update Success!")

err = stuCol.UpdateId(bson.ObjectIdHex("5c860a823ca58305282be41b"), bson.M{"$set": bson.M{"age": 30}})
ErrorHandler(err,"stuCol.Update")
fmt.Println("stuCol.UpdateId Success!")

changeInfo, err := stuCol.UpdateAll(bson.M{"sex": "Female"}, bson.M{"$set": bson.M{"sex": "Male"}})
ErrorHandler(err,"stuCol.UpdateAll")
fmt.Printf("更新情况:匹配%d,删除%d,更新%d,更新插入Id:%s",changeInfo.Matched,changeInfo.Removed,changeInfo.Updated,changeInfo.UpsertedId)

//2.2 更新没有则插入文档
changeInfo, err := stuCol.Upsert(bson.M{"sex": "Female"}, bson.M{"name": "Simee", "phone": "15014064202", "sex": "Female"})
ErrorHandler(err,"stuCol.Upsert")
fmt.Printf("更新情况:匹配%d,删除%d,更新%d,更新插入Id:%s",changeInfo.Matched,changeInfo.Removed,changeInfo.Updated,changeInfo.UpsertedId)

//3.删除文档
err = stuCol.RemoveId(bson.ObjectIdHex("5c8610693ca58305282be4a2"))
ErrorHandler(err,"stuCol.RemoveId")
fmt.Println("删除成功!")

stuCol.Remove()
stuCol.RemoveAll()

大容量操作Bulk(),适合大量增删改:

//获得bulk操作句柄,insert/update/remove等操作,然后run()执行
bulk := stuCol.Bulk()
bulk.Insert()
bulk.Update()
bulk.UpdateAll()
bulk.Remove()
bulk.RemoveAll()
bulk.Upsert()

bulk.Run()

//返回集合的所有索引的列表
indexes, err := stuCol.Indexes()
fmt.Println("索引列表:",indexes)

mgo中的查询操作,使用查询构造器,查询构造器方法:
  • func (q *Query) All(result interface{}) error 返回所有结果

  • func (q *Query) Apply(change Change, result interface{}) (info *ChangeInfo, err error) Apply 运行findAndModify MongoDB命令,该命令允许更新,插入或删除与查询匹配的文档,并原子地返回旧版本(默认)或新版本的文档(当ReturnNew为true时)。如果没有找到对象,则返回ErrNotFound。 Sort和Select查询方法会影响Apply的结果。如果多个文档与查询匹配,Sort可以通过首先对其进行排序来选择要处理的文档。选择仅允许检索新文档或旧文档的选定字段。

  • func (q *Query) Batch(n int) *Query 默认批处理大小由数据库本身定义。在撰写本文时,MongoDB将在第一批使用初始大小min(100 docs,4MB),在剩余的那些上使用4MB。

  • func (q *Query) Comment(comment string) *Query 注释 向查询添加注释以在数据库探查器输出中标识它。

  • func (q *Query) Count() (n int, err error) Count返回结果集中的文档总数

  • func (q *Query) Distinct(key string, result interface{}) error 去重的解组导致给定键的不同值列表。

  • func (q *Query) Explain(result interface{}) error Explain返回一些有关MongoDB服务器如何执行所请求查询的详细信息,例如检查的对象数,允许写入的读锁定的次数,等等。

  • func (q *Query) Hint(indexKey …string) *Query 提示 将在查询中包含一个显式“提示”,以强制服务器使用指定的索引,从而可能在某些情况下提高性能。提供的参数是组成要使用的索引的键的字段。有关如何构建indexKey的详细信息,请参阅EnsureIndex方法。

  • func (q *Query) Iter() *Iter Iter执行查询并返回一个能够遍历所有结果的迭代器。结果将以可配置大小的批量返回(请参阅批处理方法),并且在迭代可配置数量的文档时将请求更多文档(请参阅预取方法)。

  • func (q *Query) Limit(n int) *Query 限制将检索到的最大文档数限制为n,并将批量大小更改为相同的值。一旦Next返回了n个文档,以下调用将返回ErrNotFound。

  • func (q *Query) LogReplay() *Query LogReplay启用一个选项,该选项可优化通常在MongoDB oplog上进行的查询以进行重放。这是一个内部实现方面,很可能对其他用途无趣。但它至少看过一个用例,因此它通过API公开。

  • func (q *Query) MapReduce(job *MapReduce, result interface{}) (info *MapReduceInfo, err error) MapReduce为查询所涵盖的文档执行map / reduce作业。这种工作适用于通过Javascript函数在服务器端执行的非常灵活的数据批量聚合。 作业的结果可能是查询本身通过结果参数返回的,以防它们确实适合内存和单个文档。如果数据量可能过大,则必须通过设置提供的MapReduce作业的Out字段将结果存储回备用集合甚至单独的数据库中。在这种情况下,请提供nil作为结果参数。

  • func (q *Query) One(result interface{}) (err error) 返回一个结果集

  • func (q *Query) Prefetch(p float64) *Query 预取设置将请求下一批结果的点。当在Iter中缓存p * batch_size剩余文档时,将在后台请求下一批。例如,使用时: query.Batch(200).Prefetch(0.25) 并且只有50个文件缓存在待处理的Iter中,下一批200个将被请求。也可以使用Session的SetPrefetch方法在每个会话的基础上更改此设置。 默认预取值为0.25。

  • func (q *Query) Select(selector interface{}) *Query 选择允许选择应为找到的结果检索哪些字段。

  • func (q *Query) SetMaxScan(n int) *Query SetMaxScan限制查询在扫描指定数量的文档后停止。 此修饰符通常用于防止可能长时间运行的查询通过扫描过多数据来中断性能。

  • func (q *Query) SetMaxTime(d time.Duration) *Query SetMaxTime限制查询在运行指定时间后停止。 当达到时间限制时,MongoDB会自动取消查询。这可用于有效地防止和识别意外缓慢的查询。 关于执行此限制的机制的一些重要说明:

    • 请求可以阻止服务器上的锁定操作,并阻止 时间没有计算在内。换句话说,计时器仅在之后开始计时 查询在最初获取适当的锁时的实际开始;
    • 仅在可以进行操作的中断点中断操作 安全中止 - 总执行时间可能超过指定值;
    • 限制可以应用于CRUD操作和命令,但不是全部 命令是可中断的;
    • 在迭代结果时,计算后续批次包含在 总时间和迭代继续,直到分配的时间结束,但是 网络往返不考虑限制。
    • 此限制不会覆盖空闲游标的非活动游标超时 (默认为10分钟)。
  • func (q *Query) Skip(n int) *Query 跳过跳过查询结果中的n个初始文档。请注意,这仅适用于上限集合,其中文档按插入时间或排序结果自然排序。

  • func (q *Query) Snapshot() *Query 快照将强制执行的查询使用_id字段上的可用索引,以防止在单次迭代中多次返回同一文档。在文档大小发生变化且因此必须在迭代运行时移动时,如果没有此设置,则可能会发生这种情况。 由于快照模式遍历_id索引,因此它不能与排序或显式提示一起使用。它也不能使用任何其他索引进行查询。 即使使用快照模式,查询期间插入或删除的项目也可能会被退回,也可能不会被退回; 也就是说,此模式不是真正的时间点快照。 可以通过对将不被修改的字段使用任何唯一索引来获得相同的快照效果(最好也明确地使用提示)。通过在创建索引时将_id附加到索引,可以使非唯一索引(例如创建时间)唯一。

  • func (q *Query) Sort(fields …string) *Query 排序要求数据库根据提供的字段名称对返回的文档进行排序。字段名称可以加上 - (减号)作为前缀,以便按相反的顺序排序。

  • func (q *Query) Tail(timeout time.Duration) *Iter Tail返回一个tailable迭代器。与普通迭代器不同,一旦到达当前结果集的末尾,tailable迭代器可能会等待在集合中插入新值,而tailable迭代器只能与带帽集合一起使用。 timeout参数指示Next将在超时之前阻塞等待结果的时间。如果设置为-1,则Next不会超时,并且只要游标有效且会话未关闭,它将继续等待结果。如果设置为0,则下一次到达结果集的末尾时超时。否则,Next将等待至少给定的秒数,以便在超时之前使新文档可用。 在超时时,Next将解除阻塞并返回false,如果调用,Timeout方法将返回true。在这些情况下,仍然可以在同一个迭代器上再次调用Next来检查当前光标位置是否有新值,并且它将再次根据指定的timeoutSecs进行阻塞。但是,如果光标变为无效,则Next和Timeout都将返回false,并且必须重新启动查询。

查询用例:

//查询文档 通过collection.Find(),返回一个查询构造器,再用查询构造器取构建查询语句,使用One(&map)或All(&[]map)取接收查询结果
//6.1 根据Id查询
stuQuery := stuCol.FindId(bson.ObjectIdHex("5c8608a7baa5203fc198859f"))

//6.2 根据条件查询,连接One()函数查一个
oneData := make(map[string]interface{}) //定义一个接收器
err = stuCol.Find(bson.M{"_id":bson.ObjectIdHex("5c8608a7baa5203fc198859f")}).One(&oneData)
ErrorHandler(err,"Find().One()")
fmt.Println("查询一个结果:",oneData)

//6.3 根据条件查询,连接All()函数查多个
AllData := make([]map[string]interface{},0)
err = stuCol.Find(bson.M{"sex": "Male"}).All(&AllData)
ErrorHandler(err,"Find().All()")
fmt.Println("查询一个结果:",AllData)

//6.4 条件查询
QueryData := make([]map[string]interface{},0)
stuCol.Find(bson.M{}).Limit(2).All(&QueryData)
stuCol.Find(bson.M{}).Sort("name","-age").Skip(10).Limit(10)
Mongodb连接池

我们知道数据库IO是整个系统的瓶颈,一般我们会在数据层上构建缓存层以缓解数据库压力,除此之外,我们还需要主动限制数据库的连接数,以防访问冲破缓存限制抵达数据库过多的情况,所以生产环境构建连接池非常重要,以下为go编写一个连接池的用法:

mgo连接池是自带的,其每次获取一个连接,都是初始连接的一个Session拷贝,每次用完把拷贝的Session关闭即可。为防止编码过程中忘记关闭的情况,我们编写一个高阶函数M,每次使用mongo时调用M函数即可,其入参函数中编写mongo的操作,M函数自动管理连接的关闭。

//go mongodb 连接池
func BaseMongodbPool() {

	//定义一个数据接收器
	oneData := make(map[string]interface{}) //定义一个接收器

	//每执行一个M函数都从连接池拷贝一个连接,闭包内有集合collection的操作句柄实例,直接在闭包里mongo操作
	M("student", func(collection *mgo.Collection) {
		err := collection.FindId(bson.ObjectIdHex("5c8608a7baa5203fc198859f")).One(&oneData)
		ErrorHandler(err, "Find().One()")
		fmt.Println("查询一个结果:", oneData)
	})

}

/*
mongo连接池实现:
由于mgo连接池是自带的,你只需要使用session.Copy()拿到一个复制的session,用完之后session.Close()即可。

*/
const (
	USER string = "user"
	MSG  string = "msg"
)

var (
	session      *mgo.Session
	databaseName = "godemo"
)

func Session() *mgo.Session {
	if session == nil {
		var err error
		session, err = mgo.Dial("localhost")
		if err != nil {
			panic(err) // no, not really
		}
	}
	return session.Clone()
}

func M(collection string, f func(*mgo.Collection)) {
	//申请一个mongodb连接拷贝
	session := Session()
	//使用完即释放连接
	defer func() {
		session.Close()
		if err := recover(); err != nil {
			ErrorHandler(err.(error), "M")
		}
	}()

	//返回一个collection连接闭包
	c := session.DB(databaseName).C(collection)
	f(c)
}

2.mongo-go-driver

如果你需要一个长期维护的mongo驱动,目前mongo官方推出go驱动 https://github.com/mongodb/mongo-go-driver, 其已经有了用例说明。

至此Mongodb的操作专题就介绍到这,关于Mongo的实战我会在实战专题推出爬虫项目,敬请关注吧!