[WIP][ISSUE #5249]Refactor/unified runtime pipeline and protocol processors#5250
Open
qqeasonchen wants to merge 13 commits into
Open
[WIP][ISSUE #5249]Refactor/unified runtime pipeline and protocol processors#5250qqeasonchen wants to merge 13 commits into
qqeasonchen wants to merge 13 commits into
Conversation
…ugin configuration
… EgressProcessor - Introduced IngressProcessor (Filter -> Transformer -> Router) and EgressProcessor (Filter -> Transformer) to centralize pipeline logic. - Refactored ClientGroupWrapper (TCP SDK) to use these processors. - Refactored EventMeshConnectorBootstrap (Connectors) to use these processors. - Updated SinkWorker to support embedded mode for unified runtime execution. - Updated Unified Runtime Design documentation to reflect architectural clarity.
…ine architecture
This commit completes the migration of all HTTP and gRPC protocol processors to use
the unified IngressProcessor and EgressProcessor pipeline architecture, ensuring
consistent Filter-Transformer-Router processing across all protocols.
Changes:
- HTTP Processors (5): Migrated SendAsyncMessageProcessor, SendSyncMessageProcessor,
BatchSendMessageProcessor, BatchSendMessageV2Processor, and refactored
SendAsyncEventProcessor to use IngressProcessor
- gRPC Processors (3): Migrated PublishCloudEventsProcessor,
BatchPublishCloudEventProcessor, and RequestCloudEventProcessor with bidirectional
pipeline support (Ingress for requests, Egress for responses)
- Added BatchProcessResult utility class to track success/filtered/failed counts for
batch processing with detailed statistics
- Added comprehensive unit tests: IngressProcessorTest, EgressProcessorTest,
BatchProcessResultTest, and enhanced SendAsyncEventProcessorTest
- Added IngressProcessor and EgressProcessor getters to EventMeshServer and
EventMeshGrpcServer for cross-module access
- Updated design documentation (unified-runtime-design.md) to reflect the new
architecture and migration status
- Updated configuration guide (core-engines-configuration.md) with pipeline key format
Architecture improvements:
- Unified pipeline key format: {producerGroup}-{topic}
- Consistent filter behavior: filtered messages return SUCCESS status
(except request-reply returns error)
- Router support: topic changes tracked with finalTopic variable
- Batch statistics: detailed success/filtered/failed counts with message IDs
- Bidirectional processing: RequestCloudEventProcessor applies Ingress to requests
and Egress to responses
All tests pass with no regressions.
Comment on lines
+20
to
+35
| import static org.mockito.ArgumentMatchers.any; | ||
| import static org.mockito.ArgumentMatchers.anyString; | ||
| import static org.mockito.Mockito.doReturn; | ||
| import static org.mockito.Mockito.lenient; | ||
| import static org.mockito.Mockito.mock; | ||
| import static org.mockito.Mockito.spy; | ||
| import static org.mockito.Mockito.verify; | ||
| import static org.mockito.Mockito.when; | ||
|
|
||
| import org.apache.eventmesh.api.SendCallback; | ||
| import org.apache.eventmesh.common.protocol.tcp.Header; | ||
| import org.apache.eventmesh.common.protocol.tcp.UserAgent; | ||
| import org.apache.eventmesh.function.api.Router; | ||
| import org.apache.eventmesh.function.filter.pattern.Pattern; | ||
| import org.apache.eventmesh.function.transformer.Transformer; | ||
| import org.apache.eventmesh.runtime.boot.EventMeshServer; |
Comment on lines
+118
to
+125
| // Let's try to set the internal mqProducerWrapper field using reflection. | ||
| try { | ||
| java.lang.reflect.Field field = ClientGroupWrapper.class.getDeclaredField("mqProducerWrapper"); | ||
| field.setAccessible(true); | ||
| field.set(clientGroupWrapper, mqProducerWrapper); | ||
| } catch (Exception e) { | ||
| e.printStackTrace(); | ||
| } |
Comment on lines
26
to
30
| import org.apache.eventmesh.api.SendResult; | ||
| import org.apache.eventmesh.api.exception.OnExceptionContext; | ||
| import org.apache.eventmesh.api.exception.StorageRuntimeException; | ||
| import org.apache.eventmesh.common.exception.EventMeshException; | ||
| import org.apache.eventmesh.common.protocol.SubscriptionItem; |
Comment on lines
143
to
+155
| this.persistentMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshStoragePluginType()); | ||
| this.broadCastMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshStoragePluginType()); | ||
| this.mqProducerWrapper = new MQProducerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshStoragePluginType()); | ||
|
|
||
| this.ingressProcessor = new IngressProcessor( | ||
| eventMeshTCPServer.getEventMeshServer().getFilterEngine(), | ||
| eventMeshTCPServer.getEventMeshServer().getTransformerEngine(), | ||
| eventMeshTCPServer.getEventMeshServer().getRouterEngine() | ||
| ); | ||
| this.egressProcessor = new EgressProcessor( | ||
| eventMeshTCPServer.getEventMeshServer().getFilterEngine(), | ||
| eventMeshTCPServer.getEventMeshServer().getTransformerEngine() | ||
| ); |
Comment on lines
+46
to
+74
| // 1. Filter | ||
| org.apache.eventmesh.function.filter.pattern.Pattern filterPattern = filterEngine.getFilterPattern(pipelineKey); | ||
| if (filterPattern != null && event.getData() != null) { | ||
| String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8); | ||
| if (!filterPattern.filter(content)) { | ||
| // Filtered out | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| // 2. Transformer | ||
| org.apache.eventmesh.function.transformer.Transformer transformer = transformerEngine.getTransformer(pipelineKey); | ||
| if (transformer != null && event.getData() != null) { | ||
| String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8); | ||
| String transformedContent = transformer.transform(content); | ||
| event = CloudEventBuilder.from(event) | ||
| .withData(transformedContent.getBytes(StandardCharsets.UTF_8)) | ||
| .build(); | ||
| } | ||
|
|
||
| // 3. Router | ||
| org.apache.eventmesh.function.api.Router router = routerEngine.getRouter(pipelineKey); | ||
| if (router != null && event.getData() != null) { | ||
| String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8); | ||
| String newTopic = router.route(content); | ||
| event = CloudEventBuilder.from(event) | ||
| .withSubject(newTopic) | ||
| .build(); | ||
| } |
Comment on lines
+112
to
+117
| @Getter | ||
| private A2APublishSubscribeService a2aPublishSubscribeService; | ||
|
|
||
| public A2APublishSubscribeService getA2APublishSubscribeService() { | ||
| return a2aPublishSubscribeService; | ||
| } |
Comment on lines
+20
to
+28
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertNotNull; | ||
| import static org.junit.jupiter.api.Assertions.assertNull; | ||
| import static org.junit.jupiter.api.Assertions.assertThrows; | ||
| import static org.mockito.ArgumentMatchers.anyString; | ||
| import static org.mockito.Mockito.lenient; | ||
| import static org.mockito.Mockito.mock; | ||
| import static org.mockito.Mockito.verify; | ||
| import static org.mockito.Mockito.when; |
Comment on lines
+20
to
+28
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertNotNull; | ||
| import static org.junit.jupiter.api.Assertions.assertNull; | ||
| import static org.junit.jupiter.api.Assertions.assertThrows; | ||
| import static org.mockito.ArgumentMatchers.anyString; | ||
| import static org.mockito.Mockito.lenient; | ||
| import static org.mockito.Mockito.mock; | ||
| import static org.mockito.Mockito.verify; | ||
| import static org.mockito.Mockito.when; |
Comment on lines
+20
to
+25
| import static org.mockito.ArgumentMatchers.any; | ||
| import static org.mockito.ArgumentMatchers.anyBoolean; | ||
| import static org.mockito.ArgumentMatchers.eq; | ||
| import static org.mockito.Mockito.mock; | ||
| import static org.mockito.Mockito.when; | ||
|
|
Comment on lines
+48
to
+55
| import org.apache.eventmesh.runtime.metrics.http.EventMeshHttpMetricsManager; | ||
| import org.apache.eventmesh.runtime.metrics.http.HttpMetrics; | ||
| import org.apache.eventmesh.runtime.util.RemotingHelper; | ||
|
|
||
| import java.net.InetSocketAddress; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.HashMap; | ||
| import java.util.Map; |
…5246) * update architecture * update architecture image * Test: Add unit test for SendAsyncEventProcessor to verify V1 and V2 logic integration * Test & Docs: Add unit tests for core engines and documentation for plugin configuration * 1.12.0-prepare (apache#5222) * Update copyright year to 2025 * update release version * feat: add A2A Agent Card Registry based on EMQX reference implementation - Add Agent Card Java model classes matching A2A spec (AgentCard, AgentInterface, AgentProvider, AgentCapabilities, AgentSkill, SecurityScheme, etc.) - Add Agent Card JSON Schema from EMQX for validation - Add AgentCardValidator with JSON Schema validation support - Add AgentIdentity with hierarchical ID (org_id/unit_id/agent_id) and discovery topic construction/parsing - Update A2AProtocolConstants with Agent Card operations, status constants, CE extension keys, and ID validation pattern - Update EnhancedA2AProtocolAdaptor for Agent Card operation routing and discovery topic support - Implement A2APublishSubscribeService with full Card Registry (CRUD, status tracking, event metadata augmentation) - Add A2ACardHttpHandler REST API for card management - Add AgentCardDemo example - Add json-schema-validator and protocol-a2a dependencies * fix: remove test files that reference refactoring code moved to separate PR Remove RouterEngineTest.java and SendAsyncEventProcessorTest.java which test RouterEngine/IngressProcessor pipeline code that was moved to the refactor/unified-runtime-pipeline branch. These files were added as part of the refactoring commits that have been separated into their own PR. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix: resolve checkstyle violations blocking CI - Remove unused import java.nio.charset.StandardCharsets in A2APublishSubscribeService - Use try-with-resources for EventMeshTCPClient in example files to fix resource leak checkstyle warnings (AsyncPublish, SyncRequest, cloudevents AsyncPublish) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix: resolve checkstyle import ordering and PMD violations - Remove unused java.util.Collections import and reorder imports to match checkstyle groups (org.apache.eventmesh, java, io, com, lombok) in A2APublishSubscribeService - Reorder java.* imports before io.* in A2ACardHttpHandler to match checkstyle ImportOrder rule - Inline EventMeshTCPClientConfig.builder() into factory calls in example files to avoid PMD DU anomaly warnings from standalone config variables Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix: remove redundant and unused imports in AgentCardDemo - Remove redundant import for A2AAbstractDemo (same package) - Remove unused import for AgentIdentity Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix: reorder imports in AgentCardValidator to match checkstyle ImportOrder Move java.* imports before com.* and lombok.* to comply with the project's import group ordering (org.apache.eventmesh, java, com, lombok). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix: update FilterEngineTest and TransformerEngineTest for new constructors FilterEngine and TransformerEngine now require ProducerManager and ConsumerManager in addition to MetaStorage. Update test files to mock these additional dependencies. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> --------- Co-authored-by: mike_xwm <mike_xwm@126.com> Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
The eventmesh-runtime-v2 module was removed from settings.gradle but still referenced in build.gradle's includedProjects list, causing Gradle build failures.
Resolve conflicts: - docs/plugins/core-engines-configuration.md: Keep PR changes (Pipeline Key feature) - A2APublishSubscribeService.java: Keep upstream (Agent Card Registry) - FilterEngineTest.java: Keep upstream (ProducerManager/ConsumerManager) - TransformerEngineTest.java: Keep upstream (ProducerManager/ConsumerManager)
469ea79 to
e1e5692
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
#5249 This PR consolidates EventMesh's fragmented runtime architecture into a single unified runtime with standardized processing pipelines.
Commit 1: Unify Connector, Function, and Core Runtime into eventmesh-runtime
eventmesh-runtime-v2module entirely (33 files, ~1700 lines deleted), including its separate runtime lifecycle, connector/function managers, health/monitor/status services, meta storage, and configurationRouter,RouterBuilder,ConnectorEventPublisher,SourceWorker) intoeventmesh-runtimeandeventmesh-functionmodulesRouterEngine,FilterEngine,TransformerEngineas core processing engines ineventmesh-runtimeEventMeshConnectorBootstrapto manage connector lifecycle within the unified runtimeEventMeshServerto integrate engines and A2A service initializationClientGroupWrapper(TCP) to delegate to the new enginesCommit 2: Unified Ingress/Egress Pipelines
IngressProcessor— a centralized pipeline implementing Filter -> Transformer -> Router for incoming messagesEgressProcessor— a centralized pipeline implementing Filter -> Transformer for outgoing messagesClientGroupWrapper(TCP SDK) to useIngressProcessorandEgressProcessorEventMeshConnectorBootstrap(Connectors) to use the new processorsSinkWorkerto support embedded mode for unified runtime executionCommit 3: Complete HTTP and gRPC Processor Migration
SendAsyncMessageProcessor,SendSyncMessageProcessor,BatchSendMessageProcessor,BatchSendMessageV2Processor, andSendAsyncEventProcessorto useIngressProcessorPublishCloudEventsProcessor,BatchPublishCloudEventProcessor, andRequestCloudEventProcessorwith bidirectional pipeline supportBatchProcessResult— utility class for tracking success/filtered/failed countsIngressProcessorTest,EgressProcessorTest,BatchProcessResultTestKey Changes
eventmesh-runtime-v2/(entire module)eventmesh-runtime/.../core/protocol/IngressProcessor.javaeventmesh-runtime/.../core/protocol/EgressProcessor.javaeventmesh-runtime/.../core/protocol/BatchProcessResult.javaeventmesh-runtime/.../boot/RouterEngine.javaeventmesh-runtime/.../boot/EventMeshConnectorBootstrap.javaTest Plan
./gradlew :eventmesh-runtime:testpasses