Skip to content

Commit faa6461

Browse files
authored
Merge pull request globalsign#1 from globalsign/development
Development
2 parents 95e2bfa + fdec4b9 commit faa6461

File tree

4 files changed

+614
-267
lines changed

4 files changed

+614
-267
lines changed

changestreams.go

Lines changed: 103 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@ const (
1717
UpdateLookup = "updateLookup"
1818
)
1919

20+
type changeDomainType int
21+
22+
const (
23+
changeDomainCollection changeDomainType = iota
24+
changeDomainDatabase
25+
changeDomainCluster
26+
)
27+
2028
type ChangeStream struct {
2129
iter *Iter
2230
isClosed bool
@@ -28,6 +36,9 @@ type ChangeStream struct {
2836
err error
2937
m sync.Mutex
3038
sessionCopied bool
39+
domainType changeDomainType
40+
session *Session
41+
database *Database
3142
}
3243

3344
type ChangeStreamOptions struct {
@@ -53,15 +64,15 @@ type ChangeStreamOptions struct {
5364
var errMissingResumeToken = errors.New("resume token missing from result")
5465

5566
// Watch constructs a new ChangeStream capable of receiving continuing data
56-
// from the database.
67+
// from the database, it works at collection level.
5768
func (c *Collection) Watch(pipeline interface{},
5869
options ChangeStreamOptions) (*ChangeStream, error) {
5970

6071
if pipeline == nil {
6172
pipeline = []bson.M{}
6273
}
6374

64-
csPipe := constructChangeStreamPipeline(pipeline, options)
75+
csPipe := constructChangeStreamPipeline(pipeline, options, changeDomainCollection)
6576
pipe := c.Pipe(&csPipe)
6677
if options.MaxAwaitTimeMS > 0 {
6778
pipe.SetMaxTime(options.MaxAwaitTimeMS)
@@ -85,6 +96,82 @@ func (c *Collection) Watch(pipeline interface{},
8596
resumeToken: nil,
8697
options: options,
8798
pipeline: pipeline,
99+
domainType: changeDomainCollection,
100+
}, nil
101+
}
102+
103+
// Watch constructs a new ChangeStream capable of receiving continuing data
104+
// from the database, it works at cluster level (change events from all collections across al DBs).
105+
func (sess *Session) Watch(pipeline interface{},
106+
options ChangeStreamOptions) (*ChangeStream, error) {
107+
108+
if pipeline == nil {
109+
pipeline = []bson.M{}
110+
}
111+
112+
csPipe := constructChangeStreamPipeline(pipeline, options, changeDomainCluster)
113+
pipe := sess.pipe(&csPipe)
114+
if options.MaxAwaitTimeMS > 0 {
115+
pipe.SetMaxTime(options.MaxAwaitTimeMS)
116+
}
117+
if options.BatchSize > 0 {
118+
pipe.Batch(options.BatchSize)
119+
}
120+
pIter := pipe.Iter()
121+
122+
// check that there was no issue creating the iterator.
123+
// this will fail immediately with an error from the server if running against
124+
// a standalone.
125+
if err := pIter.Err(); err != nil {
126+
return nil, err
127+
}
128+
129+
pIter.isChangeStream = true
130+
return &ChangeStream{
131+
iter: pIter,
132+
resumeToken: nil,
133+
options: options,
134+
pipeline: pipeline,
135+
domainType: changeDomainCluster,
136+
session: sess,
137+
}, nil
138+
}
139+
140+
// Watch constructs a new ChangeStream capable of receiving continuing data
141+
// from the database, it works at DB level (change events from all collections in this DB).
142+
func (db *Database) Watch(pipeline interface{},
143+
options ChangeStreamOptions) (*ChangeStream, error) {
144+
145+
if pipeline == nil {
146+
pipeline = []bson.M{}
147+
}
148+
149+
csPipe := constructChangeStreamPipeline(pipeline, options, changeDomainDatabase)
150+
pipe := db.pipe(&csPipe)
151+
if options.MaxAwaitTimeMS > 0 {
152+
pipe.SetMaxTime(options.MaxAwaitTimeMS)
153+
}
154+
if options.BatchSize > 0 {
155+
pipe.Batch(options.BatchSize)
156+
}
157+
pIter := pipe.Iter()
158+
159+
// check that there was no issue creating the iterator.
160+
// this will fail immediately with an error from the server if running against
161+
// a standalone.
162+
if err := pIter.Err(); err != nil {
163+
return nil, err
164+
}
165+
166+
pIter.isChangeStream = true
167+
return &ChangeStream{
168+
iter: pIter,
169+
resumeToken: nil,
170+
options: options,
171+
pipeline: pipeline,
172+
domainType: changeDomainDatabase,
173+
session: db.Session,
174+
database: db,
88175
}, nil
89176
}
90177

@@ -213,7 +300,7 @@ func (changeStream *ChangeStream) Timeout() bool {
213300
}
214301

215302
func constructChangeStreamPipeline(pipeline interface{},
216-
options ChangeStreamOptions) interface{} {
303+
options ChangeStreamOptions, domain changeDomainType) interface{} {
217304
pipelinev := reflect.ValueOf(pipeline)
218305

219306
// ensure that the pipeline passed in is a slice.
@@ -231,6 +318,9 @@ func constructChangeStreamPipeline(pipeline interface{},
231318
if options.ResumeAfter != nil {
232319
changeStreamStageOptions["resumeAfter"] = options.ResumeAfter
233320
}
321+
if domain == changeDomainCluster {
322+
changeStreamStageOptions["allChangesForCluster"] = true
323+
}
234324

235325
changeStreamStage := bson.M{"$changeStream": changeStreamStageOptions}
236326

@@ -274,10 +364,18 @@ func (changeStream *ChangeStream) resume() error {
274364
opts.ResumeAfter = changeStream.resumeToken
275365
}
276366
// make a new pipeline containing the resume token.
277-
changeStreamPipeline := constructChangeStreamPipeline(changeStream.pipeline, opts)
367+
changeStreamPipeline := constructChangeStreamPipeline(changeStream.pipeline, opts, changeStream.domainType)
278368

279369
// generate the new iterator with the new connection.
280-
newPipe := changeStream.collection.Pipe(changeStreamPipeline)
370+
var newPipe *Pipe
371+
if changeStream.domainType == changeDomainCollection {
372+
newPipe = changeStream.collection.Pipe(changeStreamPipeline)
373+
} else if changeStream.domainType == changeDomainCluster {
374+
newPipe = changeStream.session.pipe(changeStreamPipeline)
375+
} else if changeStream.domainType == changeDomainDatabase {
376+
newPipe = changeStream.database.pipe(changeStreamPipeline)
377+
}
378+
281379
changeStream.iter = newPipe.Iter()
282380
if err := changeStream.iter.Err(); err != nil {
283381
return err

0 commit comments

Comments
 (0)