Skip to content

Commit d9f70bd

Browse files
authored
Add MetaDataChangedEventBuilder (#34060)
* Add MetaDataChangedEventBuilder * Add MetaDataChangedEventBuilder
1 parent d527c00 commit d9f70bd

File tree

5 files changed

+278
-203
lines changed

5 files changed

+278
-203
lines changed
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shardingsphere.mode.event.builder;
19+
20+
import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
21+
import org.apache.shardingsphere.metadata.persist.node.metadata.DataSourceMetaDataNode;
22+
import org.apache.shardingsphere.metadata.persist.node.metadata.TableMetaDataNode;
23+
import org.apache.shardingsphere.metadata.persist.node.metadata.ViewMetaDataNode;
24+
import org.apache.shardingsphere.mode.event.DataChangedEvent;
25+
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
26+
import org.apache.shardingsphere.mode.event.dispatch.DispatchEvent;
27+
import org.apache.shardingsphere.mode.event.dispatch.datasource.node.StorageNodeAlteredEvent;
28+
import org.apache.shardingsphere.mode.event.dispatch.datasource.node.StorageNodeRegisteredEvent;
29+
import org.apache.shardingsphere.mode.event.dispatch.datasource.node.StorageNodeUnregisteredEvent;
30+
import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitAlteredEvent;
31+
import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitRegisteredEvent;
32+
import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitUnregisteredEvent;
33+
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaAddedEvent;
34+
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaDeletedEvent;
35+
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.TableCreatedOrAlteredEvent;
36+
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.TableDroppedEvent;
37+
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewCreatedOrAlteredEvent;
38+
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewDroppedEvent;
39+
40+
import java.util.Optional;
41+
42+
/**
43+
* Meta data changed event builder.
44+
*/
45+
public final class MetaDataChangedEventBuilder {
46+
47+
/**
48+
* Build meta data changed event.
49+
*
50+
* @param databaseName database name
51+
* @param event data changed event
52+
* @return built event
53+
*/
54+
public Optional<DispatchEvent> build(final String databaseName, final DataChangedEvent event) {
55+
String key = event.getKey();
56+
Optional<String> schemaName = DatabaseMetaDataNode.getSchemaName(key);
57+
if (schemaName.isPresent()) {
58+
return buildSchemaChangedEvent(databaseName, schemaName.get(), event);
59+
}
60+
schemaName = DatabaseMetaDataNode.getSchemaNameByTableNode(key);
61+
if (schemaName.isPresent() && isTableMetaDataChanged(event.getKey())) {
62+
return buildTableChangedEvent(databaseName, schemaName.get(), event);
63+
}
64+
if (schemaName.isPresent() && isViewMetaDataChanged(event.getKey())) {
65+
return buildViewChangedEvent(databaseName, schemaName.get(), event);
66+
}
67+
if (DataSourceMetaDataNode.isDataSourcesNode(key)) {
68+
return buildDataSourceChangedEvent(databaseName, event);
69+
}
70+
return Optional.empty();
71+
}
72+
73+
private Optional<DispatchEvent> buildSchemaChangedEvent(final String databaseName, final String schemaName, final DataChangedEvent event) {
74+
switch (event.getType()) {
75+
case ADDED:
76+
case UPDATED:
77+
return Optional.of(new SchemaAddedEvent(databaseName, schemaName));
78+
case DELETED:
79+
return Optional.of(new SchemaDeletedEvent(databaseName, schemaName));
80+
default:
81+
return Optional.empty();
82+
}
83+
}
84+
85+
private boolean isTableMetaDataChanged(final String key) {
86+
return TableMetaDataNode.isTableActiveVersionNode(key) || TableMetaDataNode.isTableNode(key);
87+
}
88+
89+
private Optional<DispatchEvent> buildTableChangedEvent(final String databaseName, final String schemaName, final DataChangedEvent event) {
90+
if ((Type.ADDED == event.getType() || Type.UPDATED == event.getType()) && TableMetaDataNode.isTableActiveVersionNode(event.getKey())) {
91+
String tableName = TableMetaDataNode.getTableNameByActiveVersionNode(event.getKey()).orElseThrow(() -> new IllegalStateException("Table name not found."));
92+
return Optional.of(new TableCreatedOrAlteredEvent(databaseName, schemaName, tableName, event.getKey(), event.getValue()));
93+
}
94+
if (Type.DELETED == event.getType() && TableMetaDataNode.isTableNode(event.getKey())) {
95+
String tableName = TableMetaDataNode.getTableName(event.getKey()).orElseThrow(() -> new IllegalStateException("Table name not found."));
96+
return Optional.of(new TableDroppedEvent(databaseName, schemaName, tableName));
97+
}
98+
return Optional.empty();
99+
}
100+
101+
private boolean isViewMetaDataChanged(final String key) {
102+
return ViewMetaDataNode.isViewActiveVersionNode(key) || ViewMetaDataNode.isViewNode(key);
103+
}
104+
105+
private Optional<DispatchEvent> buildViewChangedEvent(final String databaseName, final String schemaName, final DataChangedEvent event) {
106+
if ((Type.ADDED == event.getType() || Type.UPDATED == event.getType()) && ViewMetaDataNode.isViewActiveVersionNode(event.getKey())) {
107+
String viewName = ViewMetaDataNode.getViewNameByActiveVersionNode(event.getKey()).orElseThrow(() -> new IllegalStateException("View name not found."));
108+
return Optional.of(new ViewCreatedOrAlteredEvent(databaseName, schemaName, viewName, event.getKey(), event.getValue()));
109+
}
110+
if (Type.DELETED == event.getType() && ViewMetaDataNode.isViewNode(event.getKey())) {
111+
String viewName = ViewMetaDataNode.getViewName(event.getKey()).orElseThrow(() -> new IllegalStateException("View name not found."));
112+
return Optional.of(new ViewDroppedEvent(databaseName, schemaName, viewName, event.getKey(), event.getValue()));
113+
}
114+
return Optional.empty();
115+
}
116+
117+
private Optional<DispatchEvent> buildDataSourceChangedEvent(final String databaseName, final DataChangedEvent event) {
118+
if (DataSourceMetaDataNode.isDataSourceUnitActiveVersionNode(event.getKey()) || DataSourceMetaDataNode.isDataSourceUnitNode(event.getKey())) {
119+
return buildStorageUnitChangedEvent(databaseName, event);
120+
}
121+
if (DataSourceMetaDataNode.isDataSourceNodeActiveVersionNode(event.getKey()) || DataSourceMetaDataNode.isDataSourceNodeNode(event.getKey())) {
122+
return buildStorageNodeChangedEvent(databaseName, event);
123+
}
124+
return Optional.empty();
125+
}
126+
127+
private Optional<DispatchEvent> buildStorageUnitChangedEvent(final String databaseName, final DataChangedEvent event) {
128+
Optional<String> dataSourceUnitName = DataSourceMetaDataNode.getDataSourceNameByDataSourceUnitActiveVersionNode(event.getKey());
129+
if (dataSourceUnitName.isPresent()) {
130+
if (Type.ADDED == event.getType()) {
131+
return Optional.of(new StorageUnitRegisteredEvent(databaseName, dataSourceUnitName.get(), event.getKey(), event.getValue()));
132+
}
133+
if (Type.UPDATED == event.getType()) {
134+
return Optional.of(new StorageUnitAlteredEvent(databaseName, dataSourceUnitName.get(), event.getKey(), event.getValue()));
135+
}
136+
}
137+
dataSourceUnitName = DataSourceMetaDataNode.getDataSourceNameByDataSourceUnitNode(event.getKey());
138+
if (Type.DELETED == event.getType() && dataSourceUnitName.isPresent()) {
139+
return Optional.of(new StorageUnitUnregisteredEvent(databaseName, dataSourceUnitName.get()));
140+
}
141+
return Optional.empty();
142+
}
143+
144+
private Optional<DispatchEvent> buildStorageNodeChangedEvent(final String databaseName, final DataChangedEvent event) {
145+
Optional<String> dataSourceNodeName = DataSourceMetaDataNode.getDataSourceNameByDataSourceNodeActiveVersionNode(event.getKey());
146+
if (dataSourceNodeName.isPresent()) {
147+
if (Type.ADDED == event.getType()) {
148+
return Optional.of(new StorageNodeRegisteredEvent(databaseName, dataSourceNodeName.get(), event.getKey(), event.getValue()));
149+
}
150+
if (Type.UPDATED == event.getType()) {
151+
return Optional.of(new StorageNodeAlteredEvent(databaseName, dataSourceNodeName.get(), event.getKey(), event.getValue()));
152+
}
153+
}
154+
dataSourceNodeName = DataSourceMetaDataNode.getDataSourceNameByDataSourceNodeNode(event.getKey());
155+
if (Type.DELETED == event.getType() && dataSourceNodeName.isPresent()) {
156+
return Optional.of(new StorageNodeUnregisteredEvent(databaseName, dataSourceNodeName.get()));
157+
}
158+
return Optional.empty();
159+
}
160+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shardingsphere.mode.event.builder;
19+
20+
import org.apache.shardingsphere.mode.event.DataChangedEvent;
21+
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
22+
import org.apache.shardingsphere.mode.event.dispatch.DispatchEvent;
23+
import org.apache.shardingsphere.mode.event.dispatch.datasource.node.StorageNodeAlteredEvent;
24+
import org.apache.shardingsphere.mode.event.dispatch.datasource.node.StorageNodeRegisteredEvent;
25+
import org.apache.shardingsphere.mode.event.dispatch.datasource.node.StorageNodeUnregisteredEvent;
26+
import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitAlteredEvent;
27+
import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitRegisteredEvent;
28+
import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitUnregisteredEvent;
29+
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaAddedEvent;
30+
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaDeletedEvent;
31+
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.TableCreatedOrAlteredEvent;
32+
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.TableDroppedEvent;
33+
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewCreatedOrAlteredEvent;
34+
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewDroppedEvent;
35+
import org.junit.jupiter.api.extension.ExtensionContext;
36+
import org.junit.jupiter.params.ParameterizedTest;
37+
import org.junit.jupiter.params.provider.Arguments;
38+
import org.junit.jupiter.params.provider.ArgumentsProvider;
39+
import org.junit.jupiter.params.provider.ArgumentsSource;
40+
41+
import java.util.Optional;
42+
import java.util.stream.Stream;
43+
44+
import static org.hamcrest.CoreMatchers.instanceOf;
45+
import static org.hamcrest.MatcherAssert.assertThat;
46+
import static org.junit.jupiter.api.Assertions.assertFalse;
47+
import static org.junit.jupiter.api.Assertions.assertTrue;
48+
49+
class MetaDataChangedEventBuilderTest {
50+
51+
@ParameterizedTest(name = "{0}")
52+
@ArgumentsSource(TestCaseArgumentsProvider.class)
53+
void assertBuild(final String name, final String eventKey, final Type type, final Class<? extends DispatchEvent> toBePostedEventType) {
54+
Optional<DispatchEvent> actual = new MetaDataChangedEventBuilder().build("foo_db", new DataChangedEvent(eventKey, "value", type));
55+
if (null == toBePostedEventType) {
56+
assertFalse(actual.isPresent());
57+
} else {
58+
assertTrue(actual.isPresent());
59+
assertThat(actual.get(), instanceOf(toBePostedEventType));
60+
}
61+
}
62+
63+
private static class TestCaseArgumentsProvider implements ArgumentsProvider {
64+
65+
@Override
66+
public final Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) {
67+
return Stream.of(
68+
Arguments.of("changeWithoutDatabase", "/metadata", Type.IGNORED, null),
69+
Arguments.of("addSchema", "/metadata/foo_db/schemas/foo_schema", Type.ADDED, SchemaAddedEvent.class),
70+
Arguments.of("updateSchema", "/metadata/foo_db/schemas/foo_schema", Type.UPDATED, SchemaAddedEvent.class),
71+
Arguments.of("deleteSchema", "/metadata/foo_db/schemas/foo_schema", Type.DELETED, SchemaDeletedEvent.class),
72+
Arguments.of("ignoreChangeSchema", "/metadata/foo_db/schemas/foo_schema", Type.IGNORED, null),
73+
Arguments.of("addTable", "/metadata/foo_db/schemas/foo_schema/tables/foo_tbl/active_version/0", Type.ADDED, TableCreatedOrAlteredEvent.class),
74+
Arguments.of("updateTable", "/metadata/foo_db/schemas/foo_schema/tables/foo_tbl/active_version/0", Type.UPDATED, TableCreatedOrAlteredEvent.class),
75+
Arguments.of("deleteTable", "/metadata/foo_db/schemas/foo_schema/tables/foo_tbl", Type.DELETED, TableDroppedEvent.class),
76+
Arguments.of("invalidAddTable", "/metadata/foo_db/schemas/foo_schema/tables/foo_tbl", Type.ADDED, null),
77+
Arguments.of("invalidDeleteTable", "/metadata/foo_db/schemas/foo_schema/tables/foo_tbl/active_version/0", Type.DELETED, null),
78+
Arguments.of("addView", "/metadata/foo_db/schemas/foo_schema/views/foo_view/active_version/0", Type.ADDED, ViewCreatedOrAlteredEvent.class),
79+
Arguments.of("updateView", "/metadata/foo_db/schemas/foo_schema/views/foo_view/active_version/0", Type.UPDATED, ViewCreatedOrAlteredEvent.class),
80+
Arguments.of("deleteView", "/metadata/foo_db/schemas/foo_schema/views/foo_view", Type.DELETED, ViewDroppedEvent.class),
81+
Arguments.of("invalidAddView", "/metadata/foo_db/schemas/foo_schema/views/foo_view", Type.ADDED, null),
82+
Arguments.of("invalidDeleteView", "/metadata/foo_db/schemas/foo_schema/views/foo_view/active_version/0", Type.DELETED, null),
83+
Arguments.of("registerStorageUnit", "/metadata/foo_db/data_sources/units/foo_unit/active_version/0", Type.ADDED, StorageUnitRegisteredEvent.class),
84+
Arguments.of("alterStorageUnit", "/metadata/foo_db/data_sources/units/foo_unit/active_version/0", Type.UPDATED, StorageUnitAlteredEvent.class),
85+
Arguments.of("unregisterStorageUnit", "/metadata/foo_db/data_sources/units/foo_unit", Type.DELETED, StorageUnitUnregisteredEvent.class),
86+
Arguments.of("invalidRegisterStorageNode", "/metadata/foo_db/data_sources/units/foo_unit", Type.ADDED, null),
87+
Arguments.of("invalidUnregisterStorageNode", "/metadata/foo_db/data_sources/units/foo_unit/active_version/0", Type.DELETED, null),
88+
Arguments.of("ignoreChangeStorageUnit", "/metadata/foo_db/data_sources/units/foo_unit", Type.IGNORED, null),
89+
Arguments.of("registerStorageNode", "/metadata/foo_db/data_sources/nodes/foo_node/active_version/0", Type.ADDED, StorageNodeRegisteredEvent.class),
90+
Arguments.of("alterStorageNode", "/metadata/foo_db/data_sources/nodes/foo_node/active_version/0", Type.UPDATED, StorageNodeAlteredEvent.class),
91+
Arguments.of("unregisterStorageNode", "/metadata/foo_db/data_sources/nodes/foo_node", Type.DELETED, StorageNodeUnregisteredEvent.class),
92+
Arguments.of("invalidRegisterStorageNode", "/metadata/foo_db/data_sources/nodes/foo_node", Type.ADDED, null),
93+
Arguments.of("invalidUnregisterStorageNode", "/metadata/foo_db/data_sources/nodes/foo_node/active_version/0", Type.DELETED, null),
94+
Arguments.of("ignoreChangeStorageNode", "/metadata/foo_db/data_sources/nodes/foo_node", Type.IGNORED, null),
95+
Arguments.of("invalidChangeDataSource", "/metadata/foo_db/data_sources/other", Type.ADDED, null));
96+
}
97+
}
98+
}

mode/core/src/test/java/org/apache/shardingsphere/mode/event/builder/RuleConfigurationChangedEventBuilderTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ void assertBuildWithoutRuleNodePathProvider() {
6161

6262
@ParameterizedTest(name = "{0}")
6363
@ArgumentsSource(TestCaseArgumentsProvider.class)
64-
void assertBuild(final String name, final String eventKey, final String eventValue, final DataChangedEvent.Type type,
65-
final boolean isEventPresent, final Class<? extends DispatchEvent> dispatchEventClass) {
64+
void assertBuild(final String name, final String eventKey, final String eventValue, final Type type, final boolean isEventPresent, final Class<? extends DispatchEvent> dispatchEventClass) {
6665
RuleNodePathProvider ruleNodePathProvider = mock(RuleNodePathProvider.class, RETURNS_DEEP_STUBS);
6766
when(ruleNodePathProvider.getRuleNodePath()).thenReturn(new RuleNodePath("fixture", Collections.singleton("named"), Collections.singleton("unique")));
6867
when(ShardingSphereServiceLoader.getServiceInstances(RuleNodePathProvider.class)).thenReturn(Collections.singleton(ruleNodePathProvider));

0 commit comments

Comments
 (0)