1414package com .facebook .presto .flightshim ;
1515
1616import com .facebook .airlift .json .JsonCodec ;
17+ import com .facebook .airlift .json .JsonCodecFactory ;
18+ import com .facebook .airlift .json .JsonObjectMapperProvider ;
1719import com .facebook .airlift .log .Logger ;
1820import com .facebook .plugin .arrow .ArrowBatchSource ;
1921import com .facebook .presto .Session ;
22+ import com .facebook .presto .block .BlockJsonSerde ;
2023import com .facebook .presto .common .RuntimeStats ;
24+ import com .facebook .presto .common .block .Block ;
25+ import com .facebook .presto .common .block .BlockEncodingManager ;
26+ import com .facebook .presto .common .type .RowType ;
27+ import com .facebook .presto .common .type .Type ;
2128import com .facebook .presto .execution .QueryIdGenerator ;
2229import com .facebook .presto .metadata .Split ;
2330import com .facebook .presto .spi .ColumnHandle ;
2431import com .facebook .presto .spi .ColumnMetadata ;
2532import com .facebook .presto .spi .ConnectorId ;
2633import com .facebook .presto .spi .ConnectorPageSource ;
27- import com .facebook .presto .spi .ConnectorSession ;
2834import com .facebook .presto .spi .ConnectorSplit ;
2935import com .facebook .presto .spi .ConnectorTableHandle ;
3036import com .facebook .presto .spi .ConnectorTableLayoutHandle ;
31- import com .facebook .presto .spi .SplitContext ;
3237import com .facebook .presto .spi .TableHandle ;
33- import com .facebook .presto .spi .connector .Connector ;
34- import com .facebook .presto .spi .connector .ConnectorPageSourceProvider ;
35- import com .facebook .presto .spi .connector .ConnectorRecordSetProvider ;
3638import com .facebook .presto .spi .connector .ConnectorTransactionHandle ;
3739import com .facebook .presto .spi .security .Identity ;
38- import com .facebook .presto .split .RecordPageSourceProvider ;
40+ import com .facebook .presto .split .PageSourceManager ;
41+ import com .facebook .presto .type .TypeDeserializer ;
42+ import com .google .common .collect .ImmutableMap ;
3943import org .apache .arrow .flight .BackpressureStrategy ;
4044import org .apache .arrow .flight .CallStatus ;
4145import org .apache .arrow .flight .NoOpFlightProducer ;
4953import java .util .Optional ;
5054import java .util .concurrent .ExecutorService ;
5155
52- import static com .facebook .airlift .json .JsonCodec .jsonCodec ;
5356import static com .facebook .presto .metadata .SessionPropertyManager .createTestingSessionPropertyManager ;
5457import static com .facebook .presto .testing .TestingSession .DEFAULT_TIME_ZONE_KEY ;
55- import static com .google .common .base .Preconditions .checkState ;
5658import static com .google .common .collect .ImmutableList .toImmutableList ;
5759import static java .lang .String .format ;
58- import static java .util .Collections .unmodifiableList ;
5960import static java .util .Locale .ENGLISH ;
6061import static java .util .Objects .requireNonNull ;
6162
@@ -64,20 +65,39 @@ public class FlightShimProducer
6465 implements Closeable
6566{
6667 private static final Logger log = Logger .get (FlightShimProducer .class );
67- private static final JsonCodec <FlightShimRequest > REQUEST_JSON_CODEC = jsonCodec (FlightShimRequest .class );
6868 private static final int CLIENT_POLL_TIME = 5000 ; // Backpressure poll time ms
6969 private final BufferAllocator allocator ;
7070 private final FlightShimPluginManager pluginManager ;
71+ private final PageSourceManager pageSourceManager ;
7172 private final FlightShimConfig config ;
7273 private final ExecutorService shimExecutor ;
74+ private final JsonCodec <FlightShimRequest > requestCodec ;
7375
7476 @ Inject
75- public FlightShimProducer (BufferAllocator allocator , FlightShimPluginManager pluginManager , FlightShimConfig config , @ ForFlightShimServer ExecutorService shimExecutor )
77+ public FlightShimProducer (
78+ BufferAllocator allocator ,
79+ FlightShimPluginManager pluginManager ,
80+ FlightShimConfig config ,
81+ @ ForFlightShimServer ExecutorService shimExecutor ,
82+ TypeDeserializer typeDeserializer ,
83+ BlockEncodingManager blockEncodingManager ,
84+ PageSourceManager pageSourceManager )
7685 {
7786 this .allocator = allocator .newChildAllocator ("flight-shim" , 0 , Long .MAX_VALUE );
7887 this .pluginManager = requireNonNull (pluginManager , "pluginManager is null" );
7988 this .config = requireNonNull (config , "config is null" );
8089 this .shimExecutor = requireNonNull (shimExecutor , "shimExecutor is null" );
90+ this .pageSourceManager = requireNonNull (pageSourceManager , "pageSourceManager is null" );
91+
92+ JsonObjectMapperProvider provider = new JsonObjectMapperProvider ();
93+ BlockJsonSerde .Deserializer blockDeserializer = new BlockJsonSerde .Deserializer (blockEncodingManager );
94+ provider .setJsonDeserializers (ImmutableMap .of (
95+ RowType .class , typeDeserializer ,
96+ Type .class , typeDeserializer ,
97+ Block .class , blockDeserializer ));
98+ JsonCodecFactory jsonCodecFactory = new JsonCodecFactory (provider );
99+
100+ this .requestCodec = jsonCodecFactory .jsonCodec (FlightShimRequest .class );
81101 }
82102
83103 @ Override
@@ -97,26 +117,18 @@ private void runGetStreamAsync(CallContext context, Ticket ticket, ServerStreamL
97117 final BackpressureStrategy backpressureStrategy = new BackpressureStrategy .CallbackBackpressureStrategy ();
98118 backpressureStrategy .register (listener );
99119
100- FlightShimRequest request = REQUEST_JSON_CODEC .fromJson (ticket .getBytes ());
120+ FlightShimRequest request = requestCodec . fromJson ( ticket . getBytes ()); // REQUEST_JSON_CODEC.fromJson(ticket.getBytes());
101121 log .debug ("Request for connector: %s" , request .getConnectorId ());
102122
103- FlightShimPluginManager .ConnectorHolder connectorHolder = pluginManager .getConnector (request .getConnectorId ());
104- requireNonNull (connectorHolder , format ("Requested connector not loaded: %s" , request .getConnectorId ()));
105-
106- //Connector connector = connectorHolder.getConnector();
107- ConnectorSplit connectorSplit = connectorHolder .getCodecSplit ().fromJson (request .getSplitBytes ());
108-
109- List <? extends ColumnHandle > columnHandles = request .getColumnHandlesBytes ().stream ().map (
110- columnHandleBytes -> connectorHolder .getCodecColumnHandle ().fromJson (columnHandleBytes )
111- ).collect (toImmutableList ());
123+ FlightShimPluginManager .ConnectorCodecs connectorCodecs = pluginManager .getConnectorCodecs (request .getConnectorId ());
124+ requireNonNull (connectorCodecs , format ("Requested connector not loaded: %s" , request .getConnectorId ()));
112125
113- List <ColumnMetadata > columnsMetadata = columnHandles .stream ()
114- .map (connectorHolder ::getColumnMetadata ).collect (toImmutableList ());
126+ ConnectorSplit connectorSplit = connectorCodecs .getCodecSplit ().fromJson (request .getSplitBytes ());
115127
116- ConnectorTableHandle connectorTableHandle = connectorHolder .getCodecTableHandle ().fromJson (request .getTableHandleBytes ());
117- ConnectorTransactionHandle transactionHandle = connectorHolder .getCodecTransactionHandle ().fromJson (request .getTransactionHandleBytes ());
128+ ConnectorTableHandle connectorTableHandle = connectorCodecs .getCodecTableHandle ().fromJson (request .getTableHandleBytes ());
129+ ConnectorTransactionHandle transactionHandle = connectorCodecs .getCodecTransactionHandle ().fromJson (request .getTransactionHandleBytes ());
118130 Optional <ConnectorTableLayoutHandle > connectorTableLayoutHandle =
119- request .getTableLayoutHandleBytes ().map (tableLayoutHandleBytes -> connectorHolder .getCodecTableLayoutHandle ().fromJson (tableLayoutHandleBytes ));
131+ request .getTableLayoutHandleBytes ().map (tableLayoutHandleBytes -> connectorCodecs .getCodecTableLayoutHandle ().fromJson (tableLayoutHandleBytes ));
120132 TableHandle tableHandle = new TableHandle (new ConnectorId (request .getConnectorId ()), connectorTableHandle , transactionHandle , connectorTableLayoutHandle );
121133
122134 // Create a dummy session to load the connector
@@ -129,23 +141,15 @@ private void runGetStreamAsync(CallContext context, Ticket ticket, ServerStreamL
129141 ConnectorId connectorId = new ConnectorId (request .getConnectorId ());
130142 Split split = new Split (connectorId , transactionHandle , connectorSplit );
131143
132- List <ColumnHandle > columnHandles2 = request .getColumnHandlesBytes ().stream ().map (
133- columnHandleBytes -> connectorHolder .getCodecColumnHandle ().fromJson (columnHandleBytes )
144+ List <ColumnHandle > columnHandles = request .getColumnHandlesBytes ().stream ().map (
145+ columnHandleBytes -> connectorCodecs .getCodecColumnHandle ().fromJson (columnHandleBytes )
134146 ).collect (toImmutableList ());
135147
136- ConnectorPageSource connectorPageSource = pluginManager . createPageSource ( session , split , tableHandle , columnHandles2 , new RuntimeStats ());
137-
138- ConnectorSession connectorSession = session . toConnectorSession ( connectorId );
148+ List < ColumnMetadata > columnsMetadata = request . getOutputType (). getFields (). stream ()
149+ . map ( field -> ColumnMetadata . builder (). setName ( field . getName (). orElse ( "$data$" )). setType ( field . getType ()). build ())
150+ . collect ( toImmutableList () );
139151
140- /*ConnectorPageSourceProvider connectorPageSourceProvider = getConnectorPageSourceProvider(connector, connectorId);
141- ConnectorPageSource connectorPageSource = connectorPageSourceProvider.createPageSource(
142- transactionHandle,
143- connectorSession,
144- split,
145- null,
146- unmodifiableList(columnHandles),
147- new SplitContext(false),
148- new RuntimeStats());*/
152+ ConnectorPageSource connectorPageSource = pageSourceManager .createPageSource (session , split , tableHandle , columnHandles , new RuntimeStats ());
149153
150154 try (ArrowBatchSource batchSource = new ArrowBatchSource (allocator , columnsMetadata , connectorPageSource , config .getMaxRowsPerBatch ())) {
151155 listener .setUseZeroCopy (true );
@@ -177,30 +181,6 @@ private void runGetStreamAsync(CallContext context, Ticket ticket, ServerStreamL
177181 }
178182 }
179183
180- private ConnectorPageSourceProvider getConnectorPageSourceProvider (Connector connector , ConnectorId connectorId )
181- {
182- ConnectorPageSourceProvider connectorPageSourceProvider = null ;
183- try {
184- connectorPageSourceProvider = connector .getPageSourceProvider ();
185- requireNonNull (connectorPageSourceProvider , format ("Connector %s returned a null page source provider" , connectorId ));
186- }
187- catch (UnsupportedOperationException ignored ) {
188- }
189-
190- if (connectorPageSourceProvider == null ) {
191- ConnectorRecordSetProvider connectorRecordSetProvider = null ;
192- try {
193- connectorRecordSetProvider = connector .getRecordSetProvider ();
194- requireNonNull (connectorRecordSetProvider , format ("Connector %s returned a null record set provider" , connectorId ));
195- }
196- catch (UnsupportedOperationException ignored ) {
197- }
198- checkState (connectorRecordSetProvider != null , "Connector %s has neither a PageSource or RecordSet provider" , connectorId );
199- connectorPageSourceProvider = new RecordPageSourceProvider (connectorRecordSetProvider );
200- }
201- return connectorPageSourceProvider ;
202- }
203-
204184 @ Override
205185 public void close ()
206186 {
0 commit comments