Skip to content

chore: close window when client sends close operation #186

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 112 additions & 22 deletions src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import com.google.common.base.Preconditions;
import com.google.protobuf.Timestamp;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -18,6 +19,9 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.HashSet;
import java.util.Collections;

/**
* ReduceSupervisorActor actor distributes the messages to actors and handles failure.
Expand All @@ -28,7 +32,18 @@ class ReduceSupervisorActor extends AbstractActor {
private final Metadata md;
private final ActorRef shutdownActor;
private final StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver;
private final Map<String, ActorRef> actorsMap = new HashMap<>();
private final Map<String, ActorInfo> windowMap = new HashMap<>();

// Inner class to hold actor information
private static class ActorInfo {
final Map<String, ActorRef> actorsMap;
final ReduceOuterClass.Window window;

ActorInfo(ReduceOuterClass.Window window) {
this.actorsMap = new HashMap<>();
this.window = window;
}
}

public ReduceSupervisorActor(
ReducerFactory<? extends Reducer> reducerFactory,
Expand Down Expand Up @@ -90,38 +105,100 @@ public Receive createReceive() {
track all the child actors using actors map
*/
private void invokeActors(ActorRequest actorRequest) {
String[] keys = actorRequest.getKeySet();
String uniqueId = actorRequest.getUniqueIdentifier();
if (!actorsMap.containsKey(uniqueId)) {
Reducer reduceHandler = reducerFactory.createReducer();
ActorRef actorRef = getContext()
.actorOf(ReduceActor.props(keys, md, reduceHandler));
actorsMap.put(uniqueId, actorRef);
}
ReduceOuterClass.ReduceRequest request = actorRequest.getRequest();
ReduceOuterClass.ReduceRequest.WindowOperation operation = request.getOperation();
ReduceOuterClass.ReduceRequest.WindowOperation.Event event = operation.getEvent();

switch (event) {
case OPEN:
// For OPEN, create a new actor for the window and keys
String[] keys = actorRequest.getKeySet();
String uniqueId = actorRequest.getUniqueIdentifier();

// Create a new reducer actor
Reducer reduceHandler = reducerFactory.createReducer();
ActorRef actorRef = getContext()
.actorOf(ReduceActor.props(keys, md, reduceHandler));

// Track this actor by its window
String windowId = getWindowId(operation.getWindows(0));
windowMap.computeIfAbsent(windowId, k -> new ActorInfo(operation.getWindows(0)))
.actorsMap.put(uniqueId, actorRef);

HandlerDatum handlerDatum = constructHandlerDatum(actorRequest.getRequest().getPayload());
actorsMap.get(uniqueId).tell(handlerDatum, getSelf());
// Process the payload if present
if (request.hasPayload()) {
HandlerDatum handlerDatum = constructHandlerDatum(request.getPayload());
actorRef.tell(handlerDatum, getSelf());
}
break;

case APPEND:
// For APPEND, use existing actor
String appendUniqueId = actorRequest.getUniqueIdentifier();
String appendWindowId = getWindowId(operation.getWindows(0));

ActorInfo actorInfo = windowMap.get(appendWindowId);
if (actorInfo == null || !actorInfo.actorsMap.containsKey(appendUniqueId)) {
log.warn("Received APPEND for non-existent actor: {}", appendUniqueId);
break;
}

// Process the payload
if (request.hasPayload()) {
HandlerDatum appendHandlerDatum = constructHandlerDatum(request.getPayload());
actorInfo.actorsMap.get(appendUniqueId).tell(appendHandlerDatum, getSelf());
}
break;

case CLOSE:
// For CLOSE, we need to find all actors with matching window
String closeWindowId = getWindowId(operation.getWindows(0));
ActorInfo closeActorInfo = windowMap.get(closeWindowId);

if (closeActorInfo != null) {
// Send EOF to all actors for this window
for (Map.Entry<String, ActorRef> entry : closeActorInfo.actorsMap.entrySet()) {
entry.getValue().tell(Constants.EOF, getSelf());
}
}
break;

default:
log.warn("Unsupported operation: {}", event);
}
}

private void sendEOF(String EOF) {
for (Map.Entry<String, ActorRef> entry : actorsMap.entrySet()) {
entry.getValue().tell(EOF, getSelf());
for (ActorInfo actorInfo : windowMap.values()) {
for (ActorRef actor : actorInfo.actorsMap.values()) {
actor.tell(EOF, getSelf());
}
}
}

// listen to child actors for the result.
private void responseListener(ActorResponse actorResponse) {
/*
send the result back to the client
remove the child entry from the map after getting result.
if there are no entries in the map, that means processing is
done we can close the stream.
*/
ReduceOuterClass.Window window = actorResponse.getResponse().getWindow();
String windowId = getWindowId(window);
String actorId = actorResponse.getUniqueIdentifier();

ActorInfo actorInfo = windowMap.get(windowId);
if (actorInfo == null) {
log.warn("Received response for unknown window: {}", windowId);
return;
}

if (actorResponse.getResponse().getEOF()) {
actorsMap.remove(actorResponse.getUniqueIdentifier());
if (actorsMap.isEmpty()) {
// only send the last EOF to the response gRPC output stream.
actorInfo.actorsMap.remove(actorId);

// If this window has no more actors, remove it
if (actorInfo.actorsMap.isEmpty()) {
responseObserver.onNext(actorResponse.getResponse());
windowMap.remove(windowId);
}

// If all windows are processed, send EOF and complete
if (windowMap.isEmpty()) {
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
}
Expand Down Expand Up @@ -183,4 +260,17 @@ public void processFailure(
shutdownActor.tell(cause, context.parent());
}
}

// Helper method to get a unique ID for a window
private String getWindowId(ReduceOuterClass.Window window) {
long startMillis = convertToEpochMilli(window.getStart());
long endMillis = convertToEpochMilli(window.getEnd());
return String.format(
"%d:%d",
startMillis, endMillis);
}

private long convertToEpochMilli(Timestamp timestamp) {
return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli();
}
}
48 changes: 0 additions & 48 deletions src/main/java/io/numaproj/numaflow/reducestreamer/OutputActor.java

This file was deleted.

Loading
Loading