Skip to content

[ISSUE #99] Add default rebalance strategy #98

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class RocketMQSource extends AbstractSupportShuffleSource {

protected Long pullIntervalMs;

protected String strategyName;
protected String strategyName = STRATEGY_AVERAGE;

protected transient DefaultMQPushConsumer consumer;
protected transient ConsumeFromWhere consumeFromWhere;//默认从哪里消费,不会被持久化。不设置默认从尾部消费
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.configurable.service;

import com.alibaba.fastjson.JSONObject;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -28,7 +27,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.component.AbstractComponent;
Expand Down Expand Up @@ -94,7 +92,7 @@ protected void updateConfiguresCache(List<IConfigurable> configureList) {

protected boolean equals(String key, List<?> newConfigureList) {
for (Object o : newConfigureList) {
IConfigurable configure = (IConfigurable)o;
IConfigurable configure = (IConfigurable) o;
String tempKey = getConfigureKey(configure.getNameSpace(), configure.getType(), configure.getConfigureName());
if (key.equals(tempKey)) {
IConfigurable oldConfigure = configurableMap.get(key);
Expand All @@ -115,31 +113,24 @@ public <T extends IConfigurable> List<T> queryConfigurableByType(String type) {
}
List<T> result = new ArrayList<T>();
for (IConfigurable configurable : list) {
result.add((T)configurable);
result.add((T) configurable);
}
return result;
}

@Override
public boolean refreshConfigurable(String namespace) {
//每次刷新,重新刷新配置文件
//if(ComponentCreator.propertiesPath!=null){
// ComponentCreator.setProperties(ComponentCreator.propertiesPath);
//}

this.namespace = namespace;
// Map<String, List<IConfigurable>> namespace2ConfigurableMap = new HashMap<>();
Map<String, List<IConfigurable>> tempType2ConfigurableMap = new HashMap<>();
Map<String, IConfigurable> tempName2ConfigurableMap = new HashMap<>();
GetConfigureResult configures = loadConfigurable(namespace);
// updateConfiguresCache(configures.getConfigure());
if (configures != null && configures.isQuerySuccess() && configures.getConfigurables() != null) {
// List<Configure> configureList = filterConfigure(configures.getConfigure());
List<IConfigurable> configurables = configures.getConfigurables();
List<IConfigurable> configurableList = checkAndUpdateConfigurables(configurables, tempType2ConfigurableMap, tempName2ConfigurableMap);
// this.namespace2ConfigurableMap = namespace2ConfigurableMap;
for (IConfigurable configurable : configurableList) {
if (configurable instanceof IAfterConfigurableRefreshListener) {
((IAfterConfigurableRefreshListener)configurable).doProcessAfterRefreshConfigurable(this);
((IAfterConfigurableRefreshListener) configurable).doProcessAfterRefreshConfigurable(this);
}
}
return true;
Expand All @@ -149,10 +140,12 @@ public boolean refreshConfigurable(String namespace) {

@Override
public <T> T queryConfigurable(String configurableType, String name) {
return (T)queryConfigurableByIdent(configurableType, name);
return (T) queryConfigurableByIdent(configurableType, name);
}

protected List<IConfigurable> checkAndUpdateConfigurables(List<IConfigurable> configurables, Map<String, List<IConfigurable>> tempType2ConfigurableMap, Map<String, IConfigurable> tempName2ConfigurableMap) {
protected List<IConfigurable> checkAndUpdateConfigurables(List<IConfigurable> configurables,
Map<String, List<IConfigurable>> tempType2ConfigurableMap,
Map<String, IConfigurable> tempName2ConfigurableMap) {
List<IConfigurable> configurableList = new ArrayList<>();
for (IConfigurable configurable : configurables) {
try {
Expand Down Expand Up @@ -185,7 +178,7 @@ private void destroyOldConfigurables(Map<String, IConfigurable> tempName2Configu

private void destroyOldConfigurable(IConfigurable oldConfigurable) {
if (AbstractConfigurable.class.isInstance(oldConfigurable)) {
((AbstractConfigurable)oldConfigurable).destroy();
((AbstractConfigurable) oldConfigurable).destroy();
}
String key = getConfigureKey(oldConfigurable.getNameSpace(), oldConfigurable.getType(),
oldConfigurable.getConfigureName());
Expand All @@ -194,17 +187,14 @@ private void destroyOldConfigurable(IConfigurable oldConfigurable) {

protected void initConfigurable(IConfigurable configurable) {
if (AbstractConfigurable.class.isInstance(configurable)) {
AbstractConfigurable abstractConfigurable = (AbstractConfigurable)configurable;
AbstractConfigurable abstractConfigurable = (AbstractConfigurable) configurable;
abstractConfigurable.setConfigurableService(this);
}

configurable.init();

}

/**
* 内部使用
*/
private ScheduledExecutorService scheduledExecutorService;

@Override
Expand Down Expand Up @@ -232,10 +222,6 @@ public void run() {
}, polingTime, polingTime, TimeUnit.SECONDS);
}
}
// @Override
// public List<IConfigurable> queryConfigurable(String nameSpace) {
// return namespace2ConfigurableMap.get(nameSpace);
// }

@Override
public List<IConfigurable> queryConfigurable(String type) {
Expand All @@ -260,7 +246,6 @@ public IConfigurable queryConfigurableByIdent(String type, String name) {

@Override
public void update(IConfigurable configurable) {
// update(configurable,name2ConfigurableMap,type2ConfigurableMap);
updateConfigurable(configurable);
}

Expand All @@ -284,7 +269,6 @@ protected boolean update(IConfigurable configurable, Map<String, IConfigurable>
IConfigurable oldConfigurable = this.name2ConfigurableMap.get(nameKey);
if (equals(configureKey, configurableList)) {
configurable = oldConfigurable;
// name2ConfigurableMap.put(nameKey, name2ConfigurableMap.get(nameKey));
} else {
destroyOldConfigurable(oldConfigurable);
initConfigurable(configurable);
Expand All @@ -297,14 +281,12 @@ protected boolean update(IConfigurable configurable, Map<String, IConfigurable>
updateConfiguresCache(configurable);
name2ConfigurableMap.put(nameKey, configurable);
String typeKey = MapKeyUtil.createKey(configurable.getType());
// put2Map(namespace2ConfigurableMap, namespace, configurable);
put2Map(type2ConfigurableMap, typeKey, configurable);
return isUpdate;
}

@Override
public void insert(IConfigurable configurable) {
// update(configurable,name2ConfigurableMap,type2ConfigurableMap);
insertConfigurable(configurable);
}

Expand Down Expand Up @@ -371,7 +353,6 @@ protected Configure createConfigure(IConfigurable configurable) {
jsonObject.put(CLASS_NAME, configurable.getClass().getName());
configure.setJsonValue(jsonObject.toJSONString());
}
// configure.createIdentification();
return configure;
}

Expand All @@ -383,7 +364,7 @@ public <T> Map<String, T> queryConfigurableMapByType(String type) {
}
Map<String, T> result = new HashMap<String, T>();
for (IConfigurable configurable : configurables) {
result.put(configurable.getConfigureName(), (T)configurable);
result.put(configurable.getConfigureName(), (T) configurable);
}
return result;
}
Expand Down Expand Up @@ -423,7 +404,7 @@ protected IConfigurable createConfigurableFromJson(String namespace, String type
configurable.setNameSpace(namespace);
configurable.setType(type);
if (AbstractConfigurable.class.isInstance(configurable)) {
AbstractConfigurable abstractConfigurable = (AbstractConfigurable)configurable;
AbstractConfigurable abstractConfigurable = (AbstractConfigurable) configurable;
abstractConfigurable.setConfigurableService(this);
}
configurable.toObject(jsonValue);
Expand All @@ -450,7 +431,7 @@ protected IConfigurable convertConfigurable(Configure configure) {
jsonString);
if (configurable instanceof Entity) {
// add by wangtl 20171110 Configurable接口第三方包也在用,故不能Configurable里加接口,只能加到抽象类里,这里强转下
Entity abs = (Entity)configurable;
Entity abs = (Entity) configurable;
abs.setId(configure.getId());
abs.setGmtCreate(configure.getGmtCreate());
abs.setGmtModified(configure.getGmtModified());
Expand Down