Skip to content

Commit 31a6d57

Browse files
author
Will Banfield
committed
minor: move change stream to own file
1 parent e78a135 commit 31a6d57

File tree

2 files changed

+63
-62
lines changed

2 files changed

+63
-62
lines changed

changestreams.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package mgo
2+
3+
type ChangeStream struct {
4+
iter *Iter
5+
options ChangeStreamOptions
6+
pipeline interface{}
7+
readPreference *ReadPreference
8+
}
9+
10+
type ChangeStreamOptions struct {
11+
12+
// FullDocument controls the amount of data that the server will return when
13+
// returning a changes document.
14+
FullDocument string
15+
16+
// ResumeAfter specifies the logical starting point for the new change stream.
17+
ResumeAfter *bson.Raw
18+
19+
// MaxAwaitTimeMS specifies the maximum amount of time for the server to wait
20+
// on new documents to satisfy a change stream query.
21+
MaxAwaitTimeMS int64
22+
23+
// BatchSize specifies the number of documents to return per batch.
24+
BatchSize int32
25+
26+
// Collation specifies the way the server should collate returned data.
27+
Collation *Collation
28+
}
29+
30+
func constructChangeStreamPipeline(pipeline interface{},
31+
options ChangeStreamOptions) interface{} {
32+
pipelinev := reflect.ValueOf(pipeline)
33+
34+
// ensure that the pipeline passed in is a slice.
35+
if pipelinev.Kind() != reflect.Slice {
36+
panic("pipeline argument must be a slice")
37+
}
38+
39+
// construct the options to be used by the change notification
40+
// pipeline stage.
41+
changeNotificationStageOptions := bson.M{}
42+
43+
if options.FullDocument != "" {
44+
changeNotificationStageOptions["fullDocument"] = options.FullDocument
45+
}
46+
if options.ResumeAfter != nil {
47+
changeNotificationStageOptions["resumeAfter"] = options.ResumeAfter
48+
}
49+
changeNotificationStage := bson.M{"$changeNotification": changeNotificationStageOptions}
50+
51+
pipeOfInterfaces := make([]interface{}, pipelinev.Len()+1)
52+
53+
// insert the change notification pipeline stage at the beginning of the
54+
// aggregation.
55+
pipeOfInterfaces[0] = changeNotificationStage
56+
57+
// convert the passed in slice to a slice of interfaces.
58+
for i := 0; i < pipelinev.Len(); i++ {
59+
pipeOfInterfaces[1+i] = pipelinev.Index(i).Addr().Interface()
60+
}
61+
var pipelineAsInterface interface{} = pipeOfInterfaces
62+
return pipelineAsInterface
63+
}

session.go

Lines changed: 0 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -2247,68 +2247,6 @@ func (c *Collection) FindId(id interface{}) *Query {
22472247
return c.Find(bson.D{{"_id", id}})
22482248
}
22492249

2250-
type ChangeStream struct {
2251-
iter *Iter
2252-
options ChangeStreamOptions
2253-
pipeline interface{}
2254-
readPreference *ReadPreference
2255-
}
2256-
2257-
type ChangeStreamOptions struct {
2258-
2259-
// FullDocument controls the amount of data that the server will return when
2260-
// returning a changes document.
2261-
FullDocument string
2262-
2263-
// ResumeAfter specifies the logical starting point for the new change stream.
2264-
ResumeAfter *bson.Raw
2265-
2266-
// MaxAwaitTimeMS specifies the maximum amount of time for the server to wait
2267-
// on new documents to satisfy a change stream query.
2268-
MaxAwaitTimeMS int64
2269-
2270-
// BatchSize specifies the number of documents to return per batch.
2271-
BatchSize int32
2272-
2273-
// Collation specifies the way the server should collate returned data.
2274-
Collation *Collation
2275-
}
2276-
2277-
func constructChangeStreamPipeline(pipeline interface{},
2278-
options ChangeStreamOptions) interface{} {
2279-
pipelinev := reflect.ValueOf(pipeline)
2280-
2281-
// ensure that the pipeline passed in is a slice.
2282-
if pipelinev.Kind() != reflect.Slice {
2283-
panic("pipeline argument must be a slice")
2284-
}
2285-
2286-
// construct the options to be used by the change notification
2287-
// pipeline stage.
2288-
changeNotificationStageOptions := bson.M{}
2289-
2290-
if options.FullDocument != "" {
2291-
changeNotificationStageOptions["fullDocument"] = options.FullDocument
2292-
}
2293-
if options.ResumeAfter != nil {
2294-
changeNotificationStageOptions["resumeAfter"] = options.ResumeAfter
2295-
}
2296-
changeNotificationStage := bson.M{"$changeNotification": changeNotificationStageOptions}
2297-
2298-
pipeOfInterfaces := make([]interface{}, pipelinev.Len()+1)
2299-
2300-
// insert the change notification pipeline stage at the beginning of the
2301-
// aggregation.
2302-
pipeOfInterfaces[0] = changeNotificationStage
2303-
2304-
// convert the passed in slice to a slice of interfaces.
2305-
for i := 0; i < pipelinev.Len(); i++ {
2306-
pipeOfInterfaces[1+i] = pipelinev.Index(i).Addr().Interface()
2307-
}
2308-
var pipelineAsInterface interface{} = pipeOfInterfaces
2309-
return pipelineAsInterface
2310-
}
2311-
23122250
type Pipe struct {
23132251
session *Session
23142252
collection *Collection

0 commit comments

Comments
 (0)