@@ -288,9 +288,7 @@ func TestWatchStreamCancelWatcherByID(t *testing.T) {
288288 }
289289}
290290
291- // TestWatcherRequestProgress ensures synced watcher can correctly
292- // report its correct progress.
293- func TestWatcherRequestProgress (t * testing.T ) {
291+ func TestWatcherRequestProgressBadId (t * testing.T ) {
294292 b , tmpPath := betesting .NewDefaultTmpBackend (t )
295293
296294 // manually create watchableStore instead of newWatchableStore
@@ -302,14 +300,12 @@ func TestWatcherRequestProgress(t *testing.T) {
302300 unsynced : newWatcherGroup (),
303301 synced : newWatcherGroup (),
304302 }
305-
306303 defer func () {
307304 s .store .Close ()
308305 os .Remove (tmpPath )
309306 }()
310307
311308 testKey := []byte ("foo" )
312- notTestKey := []byte ("bad" )
313309 testValue := []byte ("bar" )
314310 s .Put (testKey , testValue , lease .NoLease )
315311
@@ -322,26 +318,91 @@ func TestWatcherRequestProgress(t *testing.T) {
322318 t .Fatalf ("unexpected %+v" , resp )
323319 default :
324320 }
321+ }
325322
326- id , _ := w .Watch (0 , notTestKey , nil , 1 )
327- w .RequestProgress (id )
328- select {
329- case resp := <- w .Chan ():
330- t .Fatalf ("unexpected %+v" , resp )
331- default :
323+ func TestWatcherRequestProgress (t * testing.T ) {
324+ testKey := []byte ("foo" )
325+ notTestKey := []byte ("bad" )
326+ testValue := []byte ("bar" )
327+ tcs := []struct {
328+ name string
329+ startRev int64
330+ expectProgressBeforeSync bool
331+ expectProgressAfterSync bool
332+ }{
333+ {
334+ name : "Zero revision" ,
335+ startRev : 0 ,
336+ expectProgressBeforeSync : true ,
337+ expectProgressAfterSync : true ,
338+ },
339+ {
340+ name : "Old revision" ,
341+ startRev : 1 ,
342+ expectProgressAfterSync : true ,
343+ },
344+ {
345+ name : "Current revision" ,
346+ startRev : 2 ,
347+ expectProgressAfterSync : true ,
348+ },
349+ {
350+ name : "Current revision plus one" ,
351+ startRev : 3 ,
352+ },
353+ {
354+ name : "Current revision plus two" ,
355+ startRev : 4 ,
356+ },
332357 }
358+ for _ , tc := range tcs {
359+ t .Run (tc .name , func (t * testing.T ) {
360+ b , tmpPath := betesting .NewDefaultTmpBackend (t )
361+
362+ // manually create watchableStore instead of newWatchableStore
363+ // because newWatchableStore automatically calls syncWatchers
364+ // method to sync watchers in unsynced map. We want to keep watchers
365+ // in unsynced to test if syncWatchers works as expected.
366+ s := & watchableStore {
367+ store : NewStore (zap .NewExample (), b , & lease.FakeLessor {}, StoreConfig {}),
368+ unsynced : newWatcherGroup (),
369+ synced : newWatcherGroup (),
370+ }
333371
334- s .syncWatchers ()
372+ defer func () {
373+ s .store .Close ()
374+ os .Remove (tmpPath )
375+ }()
376+
377+ s .Put (testKey , testValue , lease .NoLease )
335378
336- w .RequestProgress (id )
337- wrs := WatchResponse {WatchID : id , Revision : 2 }
379+ w := s .NewWatchStream ()
380+
381+ id , _ := w .Watch (0 , notTestKey , nil , tc .startRev )
382+ w .RequestProgress (id )
383+ asssertProgressSent (t , w , id , tc .expectProgressBeforeSync )
384+ s .syncWatchers ()
385+ w .RequestProgress (id )
386+ asssertProgressSent (t , w , id , tc .expectProgressAfterSync )
387+ })
388+ }
389+ }
390+
391+ func asssertProgressSent (t * testing.T , stream WatchStream , id WatchID , expectProgress bool ) {
338392 select {
339- case resp := <- w .Chan ():
340- if ! reflect .DeepEqual (resp , wrs ) {
341- t .Fatalf ("got %+v, expect %+v" , resp , wrs )
393+ case resp := <- stream .Chan ():
394+ if expectProgress {
395+ wrs := WatchResponse {WatchID : id , Revision : 2 }
396+ if ! reflect .DeepEqual (resp , wrs ) {
397+ t .Fatalf ("got %+v, expect %+v" , resp , wrs )
398+ }
399+ } else {
400+ t .Fatalf ("unexpected response %+v" , resp )
401+ }
402+ default :
403+ if expectProgress {
404+ t .Fatalf ("failed to receive progress" )
342405 }
343- case <- time .After (time .Second ):
344- t .Fatal ("failed to receive progress" )
345406 }
346407}
347408
0 commit comments