1
1
use std:: sync:: { Arc , RwLock } ;
2
- use std:: sync:: mpsc:: { Receiver , Sender , channel} ;
3
- use event:: { Event , EventArg } ;
2
+ use std:: sync:: mpsc:: channel;
3
+ use event:: { Event , EventSender , EventReceiver } ;
4
4
use item:: { Item , ItemGroup , MatchedItem , MatchedItemGroup , MatchedRange } ;
5
5
use std:: thread;
6
6
@@ -33,12 +33,12 @@ enum MatcherMode {
33
33
}
34
34
35
35
pub struct Matcher {
36
- tx_result : Sender < ( Event , EventArg ) > ,
36
+ tx_result : EventSender ,
37
37
mode : MatcherMode ,
38
38
}
39
39
40
40
impl Matcher {
41
- pub fn new ( tx_result : Sender < ( Event , EventArg ) > ) -> Self {
41
+ pub fn new ( tx_result : EventSender ) -> Self {
42
42
Matcher {
43
43
tx_result : tx_result,
44
44
mode : MatcherMode :: Fuzzy ,
@@ -78,18 +78,21 @@ impl Matcher {
78
78
79
79
}
80
80
81
- pub fn run ( & self , rx_item : Receiver < ( Event , EventArg ) > ) {
82
- let ( tx_matcher, rx_matcher) = channel ( ) ;
81
+ pub fn run ( & self , rx_item : EventReceiver ) {
82
+ let ( tx_matcher, rx_matcher) : ( EventSender , EventReceiver ) = channel ( ) ;
83
83
let matcher_restart = Arc :: new ( AtomicBool :: new ( false ) ) ;
84
84
// start a new thread listening for EvMatcherRestart, that means the query had been
85
85
// changed, so that matcher shoudl discard all previous events.
86
86
{
87
87
let matcher_restart = Arc :: clone ( & matcher_restart) ;
88
88
thread:: spawn ( move || {
89
89
while let Ok ( ( ev, arg) ) = rx_item. recv ( ) {
90
+ debug ! ( "matcher: rx_item: {:?}" , ev) ;
90
91
match ev {
91
92
Event :: EvMatcherRestart => {
92
93
matcher_restart. store ( true , Ordering :: Relaxed ) ;
94
+
95
+ let _ = tx_matcher. send ( ( ev, Box :: new ( true ) ) ) ;
93
96
while matcher_restart. load ( Ordering :: Relaxed ) {
94
97
thread:: sleep ( Duration :: from_millis ( 10 ) ) ;
95
98
}
@@ -108,77 +111,76 @@ impl Matcher {
108
111
let mut matcher_engine: Option < Box < MatchEngine > > = None ;
109
112
let mut num_processed: usize = 0 ;
110
113
let mut matcher_mode = self . mode ;
111
- loop {
112
114
115
+ while let Ok ( ( ev, arg) ) = rx_matcher. recv ( ) {
116
+ debug ! ( "matcher: rx_matcher: {:?}" , ev) ;
113
117
if matcher_restart. load ( Ordering :: Relaxed ) {
114
118
while let Ok ( _) = rx_matcher. try_recv ( ) { }
115
119
matcher_restart. store ( false , Ordering :: Relaxed ) ;
120
+ continue ;
116
121
}
117
122
118
- if let Ok ( ( ev, arg) ) = rx_matcher. recv_timeout ( Duration :: from_millis ( 10 ) ) {
119
- match ev {
120
- Event :: EvMatcherNewItem => {
121
- let items: ItemGroup = * arg. downcast ( ) . unwrap ( ) ;
122
- num_processed += items. len ( ) ;
123
-
124
- matcher_engine. as_ref ( ) . map ( |mat| {
125
- let matched_items: MatchedItemGroup = items. into_iter ( )
126
- . filter_map ( |item| mat. match_item ( item) )
127
- . collect ( ) ;
128
- let _ = self . tx_result . send ( ( Event :: EvModelNewItem , Box :: new ( matched_items) ) ) ;
129
- } ) ;
130
-
131
- // report the number of processed items
132
- let _ = self . tx_result . send ( ( Event :: EvModelNotifyProcessed , Box :: new ( num_processed) ) ) ;
133
- }
123
+ match ev {
124
+ Event :: EvMatcherRestart => {
125
+ num_processed = 0 ;
126
+ let query = arg. downcast :: < String > ( ) . unwrap ( ) ;
134
127
135
- Event :: EvReaderStopped | Event :: EvReaderStarted => {
136
- let _ = self . tx_result . send ( ( ev, arg) ) ;
137
- }
138
- Event :: EvSenderStopped => {
139
- // Since matcher is single threaded, sender stopped means all items are
140
- // processed.
141
- let _ = self . tx_result . send ( ( Event :: EvModelNotifyProcessed , Box :: new ( num_processed) ) ) ;
142
- let _ = self . tx_result . send ( ( Event :: EvMatcherStopped , arg) ) ;
143
- }
128
+ // notifiy the model that the query had been changed
129
+ let _ = self . tx_result . send ( ( Event :: EvModelRestart , Box :: new ( true ) ) ) ;
144
130
131
+ let mode_string = match matcher_mode {
132
+ MatcherMode :: Regex => "RE" . to_string ( ) ,
133
+ MatcherMode :: Exact => "EX" . to_string ( ) ,
134
+ _ => "" . to_string ( ) ,
135
+ } ;
136
+ let _ = self . tx_result . send ( ( Event :: EvModelNotifyMatcherMode , Box :: new ( mode_string) ) ) ;
145
137
146
- Event :: EvMatcherRestart => {
147
- num_processed = 0 ;
148
- let query = arg. downcast :: < String > ( ) . unwrap ( ) ;
138
+ matcher_engine = Some ( EngineFactory :: build ( & query, matcher_mode) ) ;
139
+ }
149
140
150
- // notifiy the model that the query had been changed
151
- let _ = self . tx_result . send ( ( Event :: EvModelRestart , Box :: new ( true ) ) ) ;
141
+ Event :: EvMatcherNewItem => {
142
+ let items: ItemGroup = * arg. downcast ( ) . unwrap ( ) ;
143
+ num_processed += items. len ( ) ;
152
144
153
- let mode_string = match matcher_mode {
154
- MatcherMode :: Regex => "RE" . to_string ( ) ,
155
- MatcherMode :: Exact => "EX" . to_string ( ) ,
156
- _ => "" . to_string ( ) ,
157
- } ;
158
- let _ = self . tx_result . send ( ( Event :: EvModelNotifyMatcherMode , Box :: new ( mode_string ) ) ) ;
145
+ matcher_engine . as_ref ( ) . map ( |mat| {
146
+ let matched_items : MatchedItemGroup = items . into_iter ( )
147
+ . filter_map ( |item| mat . match_item ( item ) )
148
+ . collect ( ) ;
149
+ let _ = self . tx_result . send ( ( Event :: EvModelNewItem , Box :: new ( matched_items ) ) ) ;
150
+ } ) ;
159
151
160
- matcher_engine = Some ( EngineFactory :: build ( & query, matcher_mode) ) ;
161
- }
152
+ // report the number of processed items
153
+ let _ = self . tx_result . send ( ( Event :: EvModelNotifyProcessed , Box :: new ( num_processed) ) ) ;
154
+ }
162
155
163
- Event :: EvActRotateMode => {
164
- if self . mode == MatcherMode :: Regex {
165
- // sk started with regex mode.
166
- matcher_mode = if matcher_mode == self . mode {
167
- MatcherMode :: Fuzzy
168
- } else {
169
- MatcherMode :: Regex
170
- } ;
156
+ Event :: EvReaderStopped | Event :: EvReaderStarted => {
157
+ let _ = self . tx_result . send ( ( ev, arg) ) ;
158
+ }
159
+ Event :: EvSenderStopped => {
160
+ // Since matcher is single threaded, sender stopped means all items are
161
+ // processed.
162
+ let _ = self . tx_result . send ( ( Event :: EvModelNotifyProcessed , Box :: new ( num_processed) ) ) ;
163
+ let _ = self . tx_result . send ( ( Event :: EvMatcherStopped , arg) ) ;
164
+ }
165
+
166
+ Event :: EvActRotateMode => {
167
+ if self . mode == MatcherMode :: Regex {
168
+ // sk started with regex mode.
169
+ matcher_mode = if matcher_mode == self . mode {
170
+ MatcherMode :: Fuzzy
171
171
} else {
172
- matcher_mode = if matcher_mode == self . mode {
173
- MatcherMode :: Regex
174
- } else {
175
- self . mode
176
- }
172
+ MatcherMode :: Regex
173
+ } ;
174
+ } else {
175
+ matcher_mode = if matcher_mode == self . mode {
176
+ MatcherMode :: Regex
177
+ } else {
178
+ self . mode
177
179
}
178
180
}
179
-
180
- _ => { }
181
181
}
182
+
183
+ _ => { }
182
184
}
183
185
}
184
186
}
0 commit comments