@@ -154,13 +154,20 @@ fn process_attribute_types(ourlog: &mut OurLog) {
154
154
( Annotated ( Some ( Double ) , _) , Annotated ( Some ( Value :: U64 ( _) ) , _) ) => ( ) ,
155
155
( Annotated ( Some ( Double ) , _) , Annotated ( Some ( Value :: F64 ( _) ) , _) ) => ( ) ,
156
156
( Annotated ( Some ( String ) , _) , Annotated ( Some ( Value :: String ( _) ) , _) ) => ( ) ,
157
- ( Annotated ( ty_opt @ Some ( Unknown ( _) ) , ty_meta) , _) => {
158
- ty_meta. add_error ( ErrorKind :: InvalidData ) ;
159
- ty_meta. set_original_value ( ty_opt. take ( ) ) ;
157
+ // Note: currently the mapping to Kafka requires that invalid or unknown combinations
158
+ // of types and values are removed from the mapping.
159
+ //
160
+ // Usually Relay would only modify the offending values, but for now, until there
161
+ // is better support in the pipeline here, we need to remove the entire attribute.
162
+ ( Annotated ( Some ( Unknown ( _) ) , _) , _) => {
163
+ let original = attribute. value_mut ( ) . take ( ) ;
164
+ attribute. meta_mut ( ) . add_error ( ErrorKind :: InvalidData ) ;
165
+ attribute. meta_mut ( ) . set_original_value ( original) ;
160
166
}
161
- ( Annotated ( Some ( _) , _) , Annotated ( value @ Some ( _) , value_meta) ) => {
162
- value_meta. add_error ( ErrorKind :: InvalidData ) ;
163
- value_meta. set_original_value ( value. take ( ) ) ;
167
+ ( Annotated ( Some ( _) , _) , Annotated ( Some ( _) , _) ) => {
168
+ let original = attribute. value_mut ( ) . take ( ) ;
169
+ attribute. meta_mut ( ) . add_error ( ErrorKind :: InvalidData ) ;
170
+ attribute. meta_mut ( ) . set_original_value ( original) ;
164
171
}
165
172
( Annotated ( None , _) , _) | ( _, Annotated ( None , _) ) => {
166
173
let original = attribute. value_mut ( ) . take ( ) ;
0 commit comments