Skip to content

Commit

Permalink
Handle properly shadow context in worker executor implementation.
Browse files Browse the repository at this point in the history
Motivation:

The implementation of worker executor assumes that a context is always a ContextImpl. Since we added support for shadow contexts, the implementation should handle this case as well.

Changes:

Update the worker executor implementation to handle the case of a shadow context.
  • Loading branch information
vietj committed Jan 8, 2025
1 parent 8e05f2b commit 7fbc36f
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 5 deletions.
10 changes: 7 additions & 3 deletions vertx-core/src/main/java/io/vertx/core/impl/ShadowContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,25 @@
* middleware running on the actual context interacts with this context resources, middleware running on the shadow context
* interacts with the shadow context resources.</p>
*/
final class ShadowContext extends ContextBase {
public final class ShadowContext extends ContextBase {

final VertxInternal owner;
final ContextBase delegate;
private final EventLoopExecutor eventLoop;
private final TaskQueue orderedTasks;
final TaskQueue orderedTasks;

public ShadowContext(VertxInternal owner, EventLoopExecutor eventLoop, ContextInternal delegate) {
ShadowContext(VertxInternal owner, EventLoopExecutor eventLoop, ContextInternal delegate) {
super(((ContextBase)delegate).locals);
this.owner = owner;
this.eventLoop = eventLoop;
this.delegate = (ContextBase) delegate;
this.orderedTasks = new TaskQueue();
}

public ContextInternal delegate() {
return delegate;
}

@Override
public EventExecutor eventLoop() {
return eventLoop;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,18 @@ public WorkerPool getPool() {
@Override
public <T> Future<@Nullable T> executeBlocking(Callable<T> blockingCodeHandler, boolean ordered) {
ContextInternal context = vertx.getOrCreateContext();
ContextImpl impl = context instanceof DuplicatedContext ? ((DuplicatedContext)context).delegate : (ContextImpl) context;
return pool.executeBlocking(context, blockingCodeHandler, ordered ? impl.executeBlockingTasks : null);
TaskQueue orderedTasks;
if (ordered) {
if (context instanceof ShadowContext) {
orderedTasks = ((ShadowContext)context).orderedTasks;
} else {
ContextImpl impl = context instanceof DuplicatedContext ? ((DuplicatedContext)context).delegate : (ContextImpl) context;
orderedTasks = impl.executeBlockingTasks;
}
} else {
orderedTasks = null;
}
return pool.executeBlocking(context, blockingCodeHandler, orderedTasks);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package io.vertx.tests.context;

import io.netty.channel.EventLoop;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.http.*;
import io.vertx.core.impl.LocalSeq;
import io.vertx.core.impl.ShadowContext;
import io.vertx.core.impl.VertxThread;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.net.NetClient;
Expand Down Expand Up @@ -40,6 +43,7 @@ protected void setUp() throws Exception {
contextLocal = ContextLocal.registerLocal(Object.class);
shadowVertx = (VertxInternal) Vertx.vertx();
actualVertx = (VertxInternal) Vertx.vertx();
disableThreadChecks();
}

@Override
Expand Down Expand Up @@ -504,4 +508,29 @@ private void testGetOrCreateContextFromUnassociatedThread(Executor executor) {
});
await();
}

@Test
public void testWorkerExecutorExecuteBlocking() {
WorkerExecutor exec = shadowVertx.createSharedWorkerExecutor("abc");
ContextInternal actualCtx = actualVertx.getOrCreateContext();
actualCtx.runOnContext(v1 -> {
Thread expected = Thread.currentThread();
ContextInternal shadowCtx = shadowVertx.getOrCreateContext();
Future<Context> fut = exec.executeBlocking(() -> {
ShadowContext ctx = (ShadowContext) Vertx.currentContext();
assertNotSame(shadowCtx, ctx);
assertSame(actualCtx, ctx.delegate());
assertSame(shadowCtx.owner(), shadowVertx);
return ctx;
});
fut.onComplete(onSuccess(res -> {
ShadowContext ctx = (ShadowContext) Vertx.currentContext();
assertSame(res, ctx);
assertSame(shadowCtx.owner(), shadowVertx);
assertSame(expected, Thread.currentThread());
testComplete();
}));
});
await();
}
}

0 comments on commit 7fbc36f

Please sign in to comment.