Skip to content

Commit e78a135

Browse files
author
Will Banfield
committed
MGO-139 add helper function to enable ChangeStream pipeline
1 parent 39b4000 commit e78a135

File tree

1 file changed

+62
-0
lines changed

1 file changed

+62
-0
lines changed

session.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2247,6 +2247,68 @@ func (c *Collection) FindId(id interface{}) *Query {
22472247
return c.Find(bson.D{{"_id", id}})
22482248
}
22492249

2250+
type ChangeStream struct {
2251+
iter *Iter
2252+
options ChangeStreamOptions
2253+
pipeline interface{}
2254+
readPreference *ReadPreference
2255+
}
2256+
2257+
type ChangeStreamOptions struct {
2258+
2259+
// FullDocument controls the amount of data that the server will return when
2260+
// returning a changes document.
2261+
FullDocument string
2262+
2263+
// ResumeAfter specifies the logical starting point for the new change stream.
2264+
ResumeAfter *bson.Raw
2265+
2266+
// MaxAwaitTimeMS specifies the maximum amount of time for the server to wait
2267+
// on new documents to satisfy a change stream query.
2268+
MaxAwaitTimeMS int64
2269+
2270+
// BatchSize specifies the number of documents to return per batch.
2271+
BatchSize int32
2272+
2273+
// Collation specifies the way the server should collate returned data.
2274+
Collation *Collation
2275+
}
2276+
2277+
func constructChangeStreamPipeline(pipeline interface{},
2278+
options ChangeStreamOptions) interface{} {
2279+
pipelinev := reflect.ValueOf(pipeline)
2280+
2281+
// ensure that the pipeline passed in is a slice.
2282+
if pipelinev.Kind() != reflect.Slice {
2283+
panic("pipeline argument must be a slice")
2284+
}
2285+
2286+
// construct the options to be used by the change notification
2287+
// pipeline stage.
2288+
changeNotificationStageOptions := bson.M{}
2289+
2290+
if options.FullDocument != "" {
2291+
changeNotificationStageOptions["fullDocument"] = options.FullDocument
2292+
}
2293+
if options.ResumeAfter != nil {
2294+
changeNotificationStageOptions["resumeAfter"] = options.ResumeAfter
2295+
}
2296+
changeNotificationStage := bson.M{"$changeNotification": changeNotificationStageOptions}
2297+
2298+
pipeOfInterfaces := make([]interface{}, pipelinev.Len()+1)
2299+
2300+
// insert the change notification pipeline stage at the beginning of the
2301+
// aggregation.
2302+
pipeOfInterfaces[0] = changeNotificationStage
2303+
2304+
// convert the passed in slice to a slice of interfaces.
2305+
for i := 0; i < pipelinev.Len(); i++ {
2306+
pipeOfInterfaces[1+i] = pipelinev.Index(i).Addr().Interface()
2307+
}
2308+
var pipelineAsInterface interface{} = pipeOfInterfaces
2309+
return pipelineAsInterface
2310+
}
2311+
22502312
type Pipe struct {
22512313
session *Session
22522314
collection *Collection

0 commit comments

Comments
 (0)