Skip to content

Commit 255136a

Browse files
peterdekadomodwyer
authored andcommitted
changeStream support (globalsign#97)
Add $changeStream support
1 parent 6d2b967 commit 255136a

File tree

4 files changed

+885
-11
lines changed

4 files changed

+885
-11
lines changed

changestreams.go

Lines changed: 357 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
package mgo
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"reflect"
7+
"sync"
8+
"time"
9+
10+
"github.com/globalsign/mgo/bson"
11+
)
12+
13+
// FullDocument is the string type of the ChangeStreamOptions.FullDocument
// option. When the option is non-empty its value is sent as the
// "fullDocument" field of the $changeStream pipeline stage.
type FullDocument string
14+
15+
// Values intended for ChangeStreamOptions.FullDocument. They are declared as
// untyped string constants, so they are assignable both to FullDocument and
// to plain strings.
const (
	// Default is the "default" fullDocument mode.
	Default = "default"

	// UpdateLookup is the "updateLookup" fullDocument mode.
	// NOTE(review): per the MongoDB documentation this asks the server to look
	// up the current version of the document for update events - confirm
	// against the server version in use.
	UpdateLookup = "updateLookup"
)
19+
20+
type ChangeStream struct {
21+
iter *Iter
22+
isClosed bool
23+
options ChangeStreamOptions
24+
pipeline interface{}
25+
resumeToken *bson.Raw
26+
collection *Collection
27+
readPreference *ReadPreference
28+
err error
29+
m sync.Mutex
30+
sessionCopied bool
31+
}
32+
33+
type ChangeStreamOptions struct {
34+
35+
// FullDocument controls the amount of data that the server will return when
36+
// returning a changes document.
37+
FullDocument FullDocument
38+
39+
// ResumeAfter specifies the logical starting point for the new change stream.
40+
ResumeAfter *bson.Raw
41+
42+
// MaxAwaitTimeMS specifies the maximum amount of time for the server to wait
43+
// on new documents to satisfy a change stream query.
44+
MaxAwaitTimeMS time.Duration
45+
46+
// BatchSize specifies the number of documents to return per batch.
47+
BatchSize int
48+
49+
// Collation specifies the way the server should collate returned data.
50+
//TODO Collation *Collation
51+
}
52+
53+
// errMissingResumeToken is returned by fetchResumeToken when a change document
// carries no _id field; isResumableError never treats it as resumable.
var errMissingResumeToken = errors.New("resume token missing from result")
54+
55+
// Watch constructs a new ChangeStream capable of receiving continuing data
56+
// from the database.
57+
func (coll *Collection) Watch(pipeline interface{},
58+
options ChangeStreamOptions) (*ChangeStream, error) {
59+
60+
if pipeline == nil {
61+
pipeline = []bson.M{}
62+
}
63+
64+
csPipe := constructChangeStreamPipeline(pipeline, options)
65+
pipe := coll.Pipe(&csPipe)
66+
if options.MaxAwaitTimeMS > 0 {
67+
pipe.SetMaxTime(options.MaxAwaitTimeMS)
68+
}
69+
if options.BatchSize > 0 {
70+
pipe.Batch(options.BatchSize)
71+
}
72+
pIter := pipe.Iter()
73+
74+
// check that there was no issue creating the iterator.
75+
// this will fail immediately with an error from the server if running against
76+
// a standalone.
77+
if err := pIter.Err(); err != nil {
78+
return nil, err
79+
}
80+
81+
pIter.isChangeStream = true
82+
return &ChangeStream{
83+
iter: pIter,
84+
collection: coll,
85+
resumeToken: nil,
86+
options: options,
87+
pipeline: pipeline,
88+
}, nil
89+
}
90+
91+
// Next retrieves the next document from the change stream, blocking if necessary.
92+
// Next returns true if a document was successfully unmarshalled into result,
93+
// and false if an error occured. When Next returns false, the Err method should
94+
// be called to check what error occurred during iteration. If there were no events
95+
// available (ErrNotFound), the Err method returns nil so the user can retry the invocaton.
96+
//
97+
// For example:
98+
//
99+
// pipeline := []bson.M{}
100+
//
101+
// changeStream := collection.Watch(pipeline, ChangeStreamOptions{})
102+
// for changeStream.Next(&changeDoc) {
103+
// fmt.Printf("Change: %v\n", changeDoc)
104+
// }
105+
//
106+
// if err := changeStream.Close(); err != nil {
107+
// return err
108+
// }
109+
//
110+
// If the pipeline used removes the _id field from the result, Next will error
111+
// because the _id field is needed to resume iteration when an error occurs.
112+
//
113+
func (changeStream *ChangeStream) Next(result interface{}) bool {
114+
// the err field is being constantly overwritten and we don't want the user to
115+
// attempt to read it at this point so we lock.
116+
changeStream.m.Lock()
117+
118+
defer changeStream.m.Unlock()
119+
120+
// if we are in a state of error, then don't continue.
121+
if changeStream.err != nil {
122+
return false
123+
}
124+
125+
if changeStream.isClosed {
126+
changeStream.err = fmt.Errorf("illegal use of a closed ChangeStream")
127+
return false
128+
}
129+
130+
var err error
131+
132+
// attempt to fetch the change stream result.
133+
err = changeStream.fetchResultSet(result)
134+
if err == nil {
135+
return true
136+
}
137+
138+
// if we get no results we return false with no errors so the user can call Next
139+
// again, resuming is not needed as the iterator is simply timed out as no events happened.
140+
// The user will call Timeout in order to understand if this was the case.
141+
if err == ErrNotFound {
142+
return false
143+
}
144+
145+
// check if the error is resumable
146+
if !isResumableError(err) {
147+
// error is not resumable, give up and return it to the user.
148+
changeStream.err = err
149+
return false
150+
}
151+
152+
// try to resume.
153+
err = changeStream.resume()
154+
if err != nil {
155+
// we've not been able to successfully resume and should only try once,
156+
// so we give up.
157+
changeStream.err = err
158+
return false
159+
}
160+
161+
// we've successfully resumed the changestream.
162+
// try to fetch the next result.
163+
err = changeStream.fetchResultSet(result)
164+
if err != nil {
165+
changeStream.err = err
166+
return false
167+
}
168+
169+
return true
170+
}
171+
172+
// Err returns nil if no errors happened during iteration, or the actual
173+
// error otherwise.
174+
func (changeStream *ChangeStream) Err() error {
175+
changeStream.m.Lock()
176+
defer changeStream.m.Unlock()
177+
return changeStream.err
178+
}
179+
180+
// Close kills the server cursor used by the iterator, if any, and returns
181+
// nil if no errors happened during iteration, or the actual error otherwise.
182+
func (changeStream *ChangeStream) Close() error {
183+
changeStream.m.Lock()
184+
defer changeStream.m.Unlock()
185+
changeStream.isClosed = true
186+
err := changeStream.iter.Close()
187+
if err != nil {
188+
changeStream.err = err
189+
}
190+
if changeStream.sessionCopied {
191+
changeStream.iter.session.Close()
192+
changeStream.sessionCopied = false
193+
}
194+
return err
195+
}
196+
197+
// ResumeToken returns a copy of the current resume token held by the change stream.
198+
// This token should be treated as an opaque token that can be provided to instantiate
199+
// a new change stream.
200+
func (changeStream *ChangeStream) ResumeToken() *bson.Raw {
201+
changeStream.m.Lock()
202+
defer changeStream.m.Unlock()
203+
if changeStream.resumeToken == nil {
204+
return nil
205+
}
206+
var tokenCopy = *changeStream.resumeToken
207+
return &tokenCopy
208+
}
209+
210+
// Timeout returns true if the last call of Next returned false because of an iterator timeout.
211+
func (changeStream *ChangeStream) Timeout() bool {
212+
return changeStream.iter.Timeout()
213+
}
214+
215+
func constructChangeStreamPipeline(pipeline interface{},
216+
options ChangeStreamOptions) interface{} {
217+
pipelinev := reflect.ValueOf(pipeline)
218+
219+
// ensure that the pipeline passed in is a slice.
220+
if pipelinev.Kind() != reflect.Slice {
221+
panic("pipeline argument must be a slice")
222+
}
223+
224+
// construct the options to be used by the change notification
225+
// pipeline stage.
226+
changeStreamStageOptions := bson.M{}
227+
228+
if options.FullDocument != "" {
229+
changeStreamStageOptions["fullDocument"] = options.FullDocument
230+
}
231+
if options.ResumeAfter != nil {
232+
changeStreamStageOptions["resumeAfter"] = options.ResumeAfter
233+
}
234+
235+
changeStreamStage := bson.M{"$changeStream": changeStreamStageOptions}
236+
237+
pipeOfInterfaces := make([]interface{}, pipelinev.Len()+1)
238+
239+
// insert the change notification pipeline stage at the beginning of the
240+
// aggregation.
241+
pipeOfInterfaces[0] = changeStreamStage
242+
243+
// convert the passed in slice to a slice of interfaces.
244+
for i := 0; i < pipelinev.Len(); i++ {
245+
pipeOfInterfaces[1+i] = pipelinev.Index(i).Addr().Interface()
246+
}
247+
var pipelineAsInterface interface{} = pipeOfInterfaces
248+
return pipelineAsInterface
249+
}
250+
251+
func (changeStream *ChangeStream) resume() error {
252+
// copy the information for the new socket.
253+
254+
// Thanks to Copy() future uses will acquire a new socket against the newly selected DB.
255+
newSession := changeStream.iter.session.Copy()
256+
257+
// fetch the cursor from the iterator and use it to run a killCursors
258+
// on the connection.
259+
cursorId := changeStream.iter.op.cursorId
260+
err := runKillCursorsOnSession(newSession, cursorId)
261+
if err != nil {
262+
return err
263+
}
264+
265+
// change out the old connection to the database with the new connection.
266+
if changeStream.sessionCopied {
267+
changeStream.collection.Database.Session.Close()
268+
}
269+
changeStream.collection.Database.Session = newSession
270+
changeStream.sessionCopied = true
271+
272+
opts := changeStream.options
273+
if changeStream.resumeToken != nil {
274+
opts.ResumeAfter = changeStream.resumeToken
275+
}
276+
// make a new pipeline containing the resume token.
277+
changeStreamPipeline := constructChangeStreamPipeline(changeStream.pipeline, opts)
278+
279+
// generate the new iterator with the new connection.
280+
newPipe := changeStream.collection.Pipe(changeStreamPipeline)
281+
changeStream.iter = newPipe.Iter()
282+
if err := changeStream.iter.Err(); err != nil {
283+
return err
284+
}
285+
changeStream.iter.isChangeStream = true
286+
return nil
287+
}
288+
289+
// fetchResumeToken unmarshals the _id field from the document, setting an error
290+
// on the changeStream if it is unable to.
291+
func (changeStream *ChangeStream) fetchResumeToken(rawResult *bson.Raw) error {
292+
changeStreamResult := struct {
293+
ResumeToken *bson.Raw `bson:"_id,omitempty"`
294+
}{}
295+
296+
err := rawResult.Unmarshal(&changeStreamResult)
297+
if err != nil {
298+
return err
299+
}
300+
301+
if changeStreamResult.ResumeToken == nil {
302+
return errMissingResumeToken
303+
}
304+
305+
changeStream.resumeToken = changeStreamResult.ResumeToken
306+
return nil
307+
}
308+
309+
func (changeStream *ChangeStream) fetchResultSet(result interface{}) error {
310+
rawResult := bson.Raw{}
311+
312+
// fetch the next set of documents from the cursor.
313+
gotNext := changeStream.iter.Next(&rawResult)
314+
err := changeStream.iter.Err()
315+
if err != nil {
316+
return err
317+
}
318+
319+
if !gotNext && err == nil {
320+
// If the iter.Err() method returns nil despite us not getting a next batch,
321+
// it is becuase iter.Err() silences this case.
322+
return ErrNotFound
323+
}
324+
325+
// grab the resumeToken from the results
326+
if err := changeStream.fetchResumeToken(&rawResult); err != nil {
327+
return err
328+
}
329+
330+
// put the raw results into the data structure the user provided.
331+
if err := rawResult.Unmarshal(result); err != nil {
332+
return err
333+
}
334+
return nil
335+
}
336+
337+
func isResumableError(err error) bool {
338+
_, isQueryError := err.(*QueryError)
339+
// if it is not a database error OR it is a database error,
340+
// but the error is a notMaster error
341+
//and is not a missingResumeToken error (caused by the user provided pipeline)
342+
return (!isQueryError || isNotMasterError(err)) && (err != errMissingResumeToken)
343+
}
344+
345+
func runKillCursorsOnSession(session *Session, cursorId int64) error {
346+
socket, err := session.acquireSocket(true)
347+
if err != nil {
348+
return err
349+
}
350+
err = socket.Query(&killCursorsOp{[]int64{cursorId}})
351+
if err != nil {
352+
return err
353+
}
354+
socket.Release()
355+
356+
return nil
357+
}

0 commit comments

Comments
 (0)