Skip to content

[WIP][ISSUE #5249]Refactor/unified runtime pipeline and protocol processors#5250

Open
qqeasonchen wants to merge 13 commits into
apache:masterfrom
qqeasonchen:refactor/unified-runtime-pipeline
Open

[WIP][ISSUE #5249]Refactor/unified runtime pipeline and protocol processors#5250
qqeasonchen wants to merge 13 commits into
apache:masterfrom
qqeasonchen:refactor/unified-runtime-pipeline

Conversation

@qqeasonchen
Copy link
Copy Markdown
Contributor

Description

#5249 This PR consolidates EventMesh's fragmented runtime architecture into a single unified runtime with standardized processing pipelines.

Commit 1: Unify Connector, Function, and Core Runtime into eventmesh-runtime

  • Removed the standalone eventmesh-runtime-v2 module entirely (33 files, ~1700 lines deleted), including its separate runtime lifecycle, connector/function managers, health/monitor/status services, meta storage, and configuration
  • Moved core abstractions (Router, RouterBuilder, ConnectorEventPublisher, SourceWorker) into eventmesh-runtime and eventmesh-function modules
  • Added RouterEngine, FilterEngine, TransformerEngine as core processing engines in eventmesh-runtime
  • Added EventMeshConnectorBootstrap to manage connector lifecycle within the unified runtime
  • Refactored EventMeshServer to integrate engines and A2A service initialization
  • Refactored ClientGroupWrapper (TCP) to delegate to the new engines

Commit 2: Unified Ingress/Egress Pipelines

  • Introduced IngressProcessor — a centralized pipeline implementing Filter -> Transformer -> Router for incoming messages
  • Introduced EgressProcessor — a centralized pipeline implementing Filter -> Transformer for outgoing messages
  • Refactored ClientGroupWrapper (TCP SDK) to use IngressProcessor and EgressProcessor
  • Refactored EventMeshConnectorBootstrap (Connectors) to use the new processors
  • Updated SinkWorker to support embedded mode for unified runtime execution

Commit 3: Complete HTTP and gRPC Processor Migration

  • HTTP Processors (5): Migrated SendAsyncMessageProcessor, SendSyncMessageProcessor, BatchSendMessageProcessor, BatchSendMessageV2Processor, and SendAsyncEventProcessor to use IngressProcessor
  • gRPC Processors (3): Migrated PublishCloudEventsProcessor, BatchPublishCloudEventProcessor, and RequestCloudEventProcessor with bidirectional pipeline support
  • Added BatchProcessResult — utility class for tracking success/filtered/failed counts
  • Added comprehensive tests: IngressProcessorTest, EgressProcessorTest, BatchProcessResultTest

Key Changes

File Action
eventmesh-runtime-v2/ (entire module) Deleted
eventmesh-runtime/.../core/protocol/IngressProcessor.java Added
eventmesh-runtime/.../core/protocol/EgressProcessor.java Added
eventmesh-runtime/.../core/protocol/BatchProcessResult.java Added
eventmesh-runtime/.../boot/RouterEngine.java Added
eventmesh-runtime/.../boot/EventMeshConnectorBootstrap.java Added
All HTTP/gRPC processors Modified

Test Plan

  • ./gradlew :eventmesh-runtime:test passes
  • All existing unit tests pass with no regressions

qqeasonchen and others added 8 commits December 24, 2025 11:08
… EgressProcessor

- Introduced IngressProcessor (Filter -> Transformer -> Router) and EgressProcessor (Filter -> Transformer) to centralize pipeline logic.
- Refactored ClientGroupWrapper (TCP SDK) to use these processors.
- Refactored EventMeshConnectorBootstrap (Connectors) to use these processors.
- Updated SinkWorker to support embedded mode for unified runtime execution.
- Updated Unified Runtime Design documentation to reflect architectural clarity.
…ine architecture

This commit completes the migration of all HTTP and gRPC protocol processors to use
the unified IngressProcessor and EgressProcessor pipeline architecture, ensuring
consistent Filter-Transformer-Router processing across all protocols.

Changes:
- HTTP Processors (5): Migrated SendAsyncMessageProcessor, SendSyncMessageProcessor,
  BatchSendMessageProcessor, BatchSendMessageV2Processor, and refactored
  SendAsyncEventProcessor to use IngressProcessor
- gRPC Processors (3): Migrated PublishCloudEventsProcessor,
  BatchPublishCloudEventProcessor, and RequestCloudEventProcessor with bidirectional
  pipeline support (Ingress for requests, Egress for responses)
- Added BatchProcessResult utility class to track success/filtered/failed counts for
  batch processing with detailed statistics
- Added comprehensive unit tests: IngressProcessorTest, EgressProcessorTest,
  BatchProcessResultTest, and enhanced SendAsyncEventProcessorTest
- Added IngressProcessor and EgressProcessor getters to EventMeshServer and
  EventMeshGrpcServer for cross-module access
- Updated design documentation (unified-runtime-design.md) to reflect the new
  architecture and migration status
- Updated configuration guide (core-engines-configuration.md) with pipeline key format

Architecture improvements:
- Unified pipeline key format: {producerGroup}-{topic}
- Consistent filter behavior: filtered messages return SUCCESS status
  (except request-reply returns error)
- Router support: topic changes tracked with finalTopic variable
- Batch statistics: detailed success/filtered/failed counts with message IDs
- Bidirectional processing: RequestCloudEventProcessor applies Ingress to requests
  and Egress to responses

All tests pass with no regressions.
@qqeasonchen qqeasonchen requested a review from Copilot May 6, 2026 13:56
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 66 out of 68 changed files in this pull request and generated 23 comments.

Comment on lines +20 to +35
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.function.api.Router;
import org.apache.eventmesh.function.filter.pattern.Pattern;
import org.apache.eventmesh.function.transformer.Transformer;
import org.apache.eventmesh.runtime.boot.EventMeshServer;
Comment on lines +118 to +125
// Let's try to set the internal mqProducerWrapper field using reflection.
try {
java.lang.reflect.Field field = ClientGroupWrapper.class.getDeclaredField("mqProducerWrapper");
field.setAccessible(true);
field.set(clientGroupWrapper, mqProducerWrapper);
} catch (Exception e) {
e.printStackTrace();
}
Comment on lines 26 to 30
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.api.exception.StorageRuntimeException;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
Comment on lines 143 to +155
this.persistentMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshStoragePluginType());
this.broadCastMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshStoragePluginType());
this.mqProducerWrapper = new MQProducerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshStoragePluginType());

this.ingressProcessor = new IngressProcessor(
eventMeshTCPServer.getEventMeshServer().getFilterEngine(),
eventMeshTCPServer.getEventMeshServer().getTransformerEngine(),
eventMeshTCPServer.getEventMeshServer().getRouterEngine()
);
this.egressProcessor = new EgressProcessor(
eventMeshTCPServer.getEventMeshServer().getFilterEngine(),
eventMeshTCPServer.getEventMeshServer().getTransformerEngine()
);
Comment on lines +46 to +74
// 1. Filter
org.apache.eventmesh.function.filter.pattern.Pattern filterPattern = filterEngine.getFilterPattern(pipelineKey);
if (filterPattern != null && event.getData() != null) {
String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
if (!filterPattern.filter(content)) {
// Filtered out
return null;
}
}

// 2. Transformer
org.apache.eventmesh.function.transformer.Transformer transformer = transformerEngine.getTransformer(pipelineKey);
if (transformer != null && event.getData() != null) {
String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
String transformedContent = transformer.transform(content);
event = CloudEventBuilder.from(event)
.withData(transformedContent.getBytes(StandardCharsets.UTF_8))
.build();
}

// 3. Router
org.apache.eventmesh.function.api.Router router = routerEngine.getRouter(pipelineKey);
if (router != null && event.getData() != null) {
String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
String newTopic = router.route(content);
event = CloudEventBuilder.from(event)
.withSubject(newTopic)
.build();
}
Comment on lines +112 to +117
@Getter
private A2APublishSubscribeService a2aPublishSubscribeService;

public A2APublishSubscribeService getA2APublishSubscribeService() {
return a2aPublishSubscribeService;
}
Comment on lines +20 to +28
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Comment on lines +20 to +28
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Comment on lines +20 to +25
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Comment on lines +48 to +55
import org.apache.eventmesh.runtime.metrics.http.EventMeshHttpMetricsManager;
import org.apache.eventmesh.runtime.metrics.http.HttpMetrics;
import org.apache.eventmesh.runtime.util.RemotingHelper;

import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
qqeasonchen and others added 4 commits May 9, 2026 11:06
…5246)

* update architecture

* update architecture image

* Test: Add unit test for SendAsyncEventProcessor to verify V1 and V2 logic integration

* Test & Docs: Add unit tests for core engines and documentation for plugin configuration

* 1.12.0-prepare (apache#5222)

* Update copyright year to 2025

* update release version

* feat: add A2A Agent Card Registry based on EMQX reference implementation

- Add Agent Card Java model classes matching A2A spec (AgentCard, AgentInterface,
  AgentProvider, AgentCapabilities, AgentSkill, SecurityScheme, etc.)
- Add Agent Card JSON Schema from EMQX for validation
- Add AgentCardValidator with JSON Schema validation support
- Add AgentIdentity with hierarchical ID (org_id/unit_id/agent_id) and
  discovery topic construction/parsing
- Update A2AProtocolConstants with Agent Card operations, status constants,
  CE extension keys, and ID validation pattern
- Update EnhancedA2AProtocolAdaptor for Agent Card operation routing and
  discovery topic support
- Implement A2APublishSubscribeService with full Card Registry (CRUD, status
  tracking, event metadata augmentation)
- Add A2ACardHttpHandler REST API for card management
- Add AgentCardDemo example
- Add json-schema-validator and protocol-a2a dependencies

* fix: remove test files that reference refactoring code moved to separate PR

Remove RouterEngineTest.java and SendAsyncEventProcessorTest.java which
test RouterEngine/IngressProcessor pipeline code that was moved to the
refactor/unified-runtime-pipeline branch. These files were added as part
of the refactoring commits that have been separated into their own PR.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix: resolve checkstyle violations blocking CI

- Remove unused import java.nio.charset.StandardCharsets in A2APublishSubscribeService
- Use try-with-resources for EventMeshTCPClient in example files to fix
  resource leak checkstyle warnings (AsyncPublish, SyncRequest,
  cloudevents AsyncPublish)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix: resolve checkstyle import ordering and PMD violations

- Remove unused java.util.Collections import and reorder imports to match
  checkstyle groups (org.apache.eventmesh, java, io, com, lombok) in
  A2APublishSubscribeService
- Reorder java.* imports before io.* in A2ACardHttpHandler to match
  checkstyle ImportOrder rule
- Inline EventMeshTCPClientConfig.builder() into factory calls in example
  files to avoid PMD DU anomaly warnings from standalone config variables

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix: remove redundant and unused imports in AgentCardDemo

- Remove redundant import for A2AAbstractDemo (same package)
- Remove unused import for AgentIdentity

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix: reorder imports in AgentCardValidator to match checkstyle ImportOrder

Move java.* imports before com.* and lombok.* to comply with the project's
import group ordering (org.apache.eventmesh, java, com, lombok).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix: update FilterEngineTest and TransformerEngineTest for new constructors

FilterEngine and TransformerEngine now require ProducerManager and
ConsumerManager in addition to MetaStorage. Update test files to mock
these additional dependencies.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

---------

Co-authored-by: mike_xwm <mike_xwm@126.com>
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
The eventmesh-runtime-v2 module was removed from settings.gradle but
still referenced in build.gradle's includedProjects list, causing
Gradle build failures.
Resolve conflicts:
- docs/plugins/core-engines-configuration.md: Keep PR changes (Pipeline Key feature)
- A2APublishSubscribeService.java: Keep upstream (Agent Card Registry)
- FilterEngineTest.java: Keep upstream (ProducerManager/ConsumerManager)
- TransformerEngineTest.java: Keep upstream (ProducerManager/ConsumerManager)
@qqeasonchen qqeasonchen force-pushed the refactor/unified-runtime-pipeline branch from 469ea79 to e1e5692 Compare May 12, 2026 06:22
@qqeasonchen qqeasonchen changed the title [ISSUE #5249]Refactor/unified runtime pipeline and protocol processors [WIP][ISSUE #5249]Refactor/unified runtime pipeline and protocol processors May 12, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants