Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions packages/cubejs-api-gateway/src/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,8 @@ class ApiGateway {
joins: transformJoins(cubeDefinitions[cube.name]?.joins),
preAggregations: transformPreAggregations(cubeDefinitions[cube.name]?.preAggregations),
}));
res({ cubes });

await res({ cubes });
} catch (e: any) {
this.handleError({
e,
Expand All @@ -687,6 +688,7 @@ class ApiGateway {

public async getPreAggregations({ cacheOnly, metaOnly, context, res }: { cacheOnly?: boolean, metaOnly?: boolean, context: RequestContext, res: ResponseResultFn }) {
const requestStarted = new Date();

try {
const compilerApi = await this.getCompilerApi(context);
const preAggregations = await compilerApi.preAggregations();
Expand Down Expand Up @@ -768,7 +770,7 @@ class ApiGateway {
})),
});

res({
await res({
preAggregationPartitions: preAggregationPartitions.map(mergePartitionsAndVersionEntries())
});
} catch (e: any) {
Expand Down Expand Up @@ -799,7 +801,7 @@ class ApiGateway {
const { partitions } = (preAggregationPartitions?.[0] || {});
const preAggregationPartition = partitions?.find(p => p?.tableName === versionEntry.table_name);

res({
await res({
preview: preAggregationPartition && await orchestratorApi.getPreAggregationPreview(
context,
preAggregationPartition
Expand All @@ -824,7 +826,7 @@ class ApiGateway {
query
);

res({ result });
await res({ result });
} catch (e: any) {
this.handleError({
e, context, res, requestStarted
Expand Down Expand Up @@ -1177,7 +1179,7 @@ class ApiGateway {
const requestStarted = new Date();
try {
const orchestratorApi = await this.getAdapterApi(context);
res({
await res({
result: await orchestratorApi.getPreAggregationQueueStates()
});
} catch (e: any) {
Expand All @@ -1194,7 +1196,7 @@ class ApiGateway {
try {
const { queryKeys, dataSource } = normalizeQueryCancelPreAggregations(this.parseQueryParam(query));
const orchestratorApi = await this.getAdapterApi(context);
res({
await res({
result: await orchestratorApi.cancelPreAggregationQueriesFromQueue(queryKeys, dataSource)
});
} catch (e: any) {
Expand Down Expand Up @@ -1333,7 +1335,8 @@ class ApiGateway {
await this.assertApiScope('sql', context.securityContext);

const result = await this.sqlServer.sql4sql(query, disablePostProcessing, context.securityContext);
res({ sql: result });

await res({ sql: result });
} catch (e: any) {
this.handleError({
e,
Expand Down Expand Up @@ -1378,7 +1381,7 @@ class ApiGateway {
order: R.fromPairs(sqlQuery.order.map(({ id: key, desc }) => [key, desc ? 'desc' : 'asc']))
});

res(queryType === QueryTypeEnum.REGULAR_QUERY ?
await res(queryType === QueryTypeEnum.REGULAR_QUERY ?
{ sql: toQuery(sqlQueries[0]) } :
sqlQueries.map((sqlQuery) => ({ sql: toQuery(sqlQuery) })));
} catch (e: any) {
Expand Down Expand Up @@ -1512,7 +1515,7 @@ class ApiGateway {
[...dataSources].map(async dataSource => ({ [dataSource]: (await compilerApi.getSqlGenerator(query, dataSource)).sqlGenerator }))
)).reduce((a, b) => ({ ...a, ...b }), {});

res({ memberToDataSource, dataSourceToSqlGenerator });
await res({ memberToDataSource, dataSourceToSqlGenerator });
} catch (e: any) {
this.handleError({
e, context, res, requestStarted
Expand Down Expand Up @@ -1594,7 +1597,7 @@ class ApiGateway {
))
);

res({
await res({
queryType,
normalizedQueries,
queryOrder: sqlQueries.map((sqlQuery) => R.fromPairs(
Expand Down Expand Up @@ -1961,11 +1964,11 @@ class ApiGateway {
);

if (props.queryType === 'multi') {
// We prepare the final json result on native side
// We prepare the final JSON result on the native side
const resultMulti = new ResultMultiWrapper(results, { queryType, slowQuery });
await res(resultMulti);
} else {
// We prepare the full final json result on native side
// We prepare the full final JSON result on the native side
await res(results[0]);
}
} catch (e: any) {
Expand Down Expand Up @@ -2142,12 +2145,16 @@ class ApiGateway {
queryType,
apiType,
});

const state = await subscriptionState();
if (result && (!state || JSON.stringify(state.result) !== JSON.stringify(result))) {
res(result.message, result.opts);
// We prepare the full final JSON result on the native side
await res(result.message, result.opts);
} else if (error) {
res(error.message, error.opts);
// We prepare the full final JSON result on the native side
await res(error.message, error.opts);
}

await subscribe({ error, result });
} catch (e: any) {
this.handleError({
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-api-gateway/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ export * from './user-error';
export { getRequestIdFromRequest } from './request-parser';
export { TransformDataRequest } from './types/responses';

export type { SubscriptionServer } from './ws';
export type { SubscriptionServer, WebSocketSendMessageFn } from './ws';
4 changes: 2 additions & 2 deletions packages/cubejs-server-core/src/core/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
import type { Application as ExpressApplication } from 'express';

import { BaseDriver, DriverFactoryByDataSource } from '@cubejs-backend/query-orchestrator';
import type { SubscriptionServer } from '@cubejs-backend/api-gateway';
import type { SubscriptionServer, WebSocketSendMessageFn } from '@cubejs-backend/api-gateway';

import { RefreshScheduler, ScheduledRefreshOptions } from './RefreshScheduler';
import { OrchestratorApi, OrchestratorApiOptions } from './OrchestratorApi';
Expand Down Expand Up @@ -451,7 +451,7 @@ export class CubejsServerCore {
}
}

public initSubscriptionServer(sendMessage): SubscriptionServer {
public initSubscriptionServer(sendMessage: WebSocketSendMessageFn): SubscriptionServer {
const apiGateway = this.apiGateway();
return apiGateway.initSubscriptionServer(sendMessage);
}
Expand Down
Loading