An attempt at optimizing YARN RM FairScheduler scheduling performance on Hadoop 2.8.4
For most small companies, YARN's scheduling throughput is probably sufficient, but on large clusters (1000 to 2000+ nodes) the scheduler's performance becomes a bottleneck.
I happened to come across a very good article on this topic: https://tech.meituan.com/2019/08/01/hadoop-yarn-scheduling-performance-optimization-practice.html
Following its pointer to YARN-5969, I found that Hadoop 2.9.0 had already fixed this issue; in my tests the fix measurably improved scheduling performance.
FairScheduler supports two scheduling modes:
Heartbeat scheduling: each NodeManager periodically reports its own state to the ResourceManager via heartbeat. Along with that RPC, the ResourceManager triggers its nodeUpdate() method, which performs one round of resource scheduling for that node.
Continuous scheduling: a dedicated daemon thread runs a scheduling pass at a short fixed interval, allocating resources in (near) real time, asynchronously and in parallel with the heartbeat-triggered scheduling.
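Which of the two modes runs is a FairScheduler configuration choice. A minimal sketch of reading the relevant yarn-site.xml keys (key names per the Hadoop 2.x FairScheduler documentation; the defaults shown here are my assumptions):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Sketch: how the two modes are selected. Key names follow the Hadoop 2.x
// FairScheduler docs; the default values below are assumptions.
public class SchedulingModeCheck {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // false (assumed default): scheduling happens only on NM heartbeats
    boolean continuous = conf.getBoolean(
        "yarn.scheduler.fair.continuous-scheduling-enabled", false);
    // interval between continuous-scheduling passes, in milliseconds
    long sleepMs = conf.getLong(
        "yarn.scheduler.fair.continuous-scheduling-sleep-ms", 5L);
    System.out.println("continuous=" + continuous + ", sleepMs=" + sleepMs);
  }
}
```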
Heartbeat scheduling is driven by the NodeManager heartbeat handler; every nodeUpdate() call goes through the same logic:
```java
// If the node is decommissioning, send an update to have the total
// resource equal to the used resource, so no available resource to
// schedule.
if (nm.getState() == NodeState.DECOMMISSIONING) {
  this.rmContext
      .getDispatcher()
      .getEventHandler()
      .handle(
          new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
              .newInstance(getSchedulerNode(nm.getNodeID())
                  .getUsedResource(), 0)));
}

if (continuousSchedulingEnabled) {
  if (!completedContainers.isEmpty()) {
    // Continuous scheduling is on: the heartbeat only schedules when
    // containers have just completed on this node.
    attemptScheduling(node);
  }
} else {
  // Continuous scheduling is off: every heartbeat triggers scheduling.
  attemptScheduling(node);
}

// Updating node resource utilization
node.setAggregatedContainersUtilization(
    nm.getAggregatedContainersUtilization());
node.setNodeUtilization(nm.getNodeUtilization());
```
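To make the decommissioning branch concrete, a worked example with hypothetical numbers: the event rewrites the node's total resource down to whatever is currently in use, so available capacity becomes zero and the node simply drains its running containers.

```java
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;

// Worked example (hypothetical numbers) of the decommissioning branch:
// total is rewritten to the used amount, so available = total - used = 0
// and the scheduler places nothing new on this node.
public class DecommissionExample {
  public static void main(String[] args) {
    Resource used = Resource.newInstance(40 * 1024, 10); // 40 GB, 10 vcores in use
    ResourceOption newTotal = ResourceOption.newInstance(used, 0); // overcommit timeout 0
    System.out.println("new total = " + newTotal.getResource());
  }
}
```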
Continuous scheduling is a dedicated thread that calls continuousSchedulingAttempt() in a loop:
```java
/**
 * Thread which attempts scheduling resources continuously,
 * asynchronous to the node heartbeats.
 */
private class ContinuousSchedulingThread extends Thread {

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        continuousSchedulingAttempt();
        Thread.sleep(getContinuousSchedulingSleepMs());
      } catch (InterruptedException e) {
        LOG.warn("Continuous scheduling thread interrupted. Exiting.", e);
        return;
      }
    }
  }
}
```
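The same loop, sleep, exit-on-interrupt pattern in a self-contained form (class and names here are mine, not Hadoop's). The demo thread is a daemon, so it never keeps the JVM alive on its own:

```java
// Self-contained sketch of the daemon-loop pattern used by
// ContinuousSchedulingThread (names are mine, not Hadoop's).
public class ContinuousLoopDemo {
  public static void main(String[] args) throws Exception {
    Thread scheduler = new Thread(() -> {
      while (!Thread.currentThread().isInterrupted()) {
        try {
          System.out.println("scheduling pass");
          Thread.sleep(5); // cf. yarn.scheduler.fair.continuous-scheduling-sleep-ms
        } catch (InterruptedException e) {
          return; // exit cleanly, mirroring the LOG.warn + return above
        }
      }
    });
    scheduler.setDaemon(true); // daemon: won't keep the JVM alive
    scheduler.start();
    Thread.sleep(20);
    scheduler.interrupt();
    scheduler.join();
  }
}
```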
continuousSchedulingAttempt() first sorts the nodes by how much free capacity each one has:
```java
void continuousSchedulingAttempt() throws InterruptedException {
  long start = getClock().getTime();
  List<NodeId> nodeIdList = new ArrayList<>(nodes.keySet());

  // Sort the nodes by space available on them, so that we offer
  // containers on emptier nodes first, facilitating an even spread. This
  // requires holding the scheduler lock, so that the space available on a
  // node doesn't change during the sort.
  synchronized (this) {
    Collections.sort(nodeIdList, nodeAvailableResourceComparator);
  }

  // iterate all nodes
  for (NodeId nodeId : nodeIdList) {
    FSSchedulerNode node = getFSSchedulerNode(nodeId);
    try {
      if (node != null && Resources.fitsIn(minimumAllocation,
          node.getAvailableResource())) {
        attemptScheduling(node);
      }
    } catch (Throwable ex) {
      LOG.error("Error while attempting scheduling for node " + node +
          ": " + ex.toString(), ex);
      if ((ex instanceof YarnRuntimeException) &&
          (ex.getCause() instanceof InterruptedException)) {
        // AsyncDispatcher translates InterruptedException to
        // YarnRuntimeException with cause InterruptedException.
        // Need to throw InterruptedException to stop schedulingThread.
        throw (InterruptedException) ex.getCause();
      }
    }
  }
}
```
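The comparator used in that synchronized sort puts the emptiest nodes first, so containers spread evenly across the cluster. A self-contained sketch of the same ordering (the Node type here is a stand-in, not Hadoop's NodeId/FSSchedulerNode):

```java
import java.util.*;

// Self-contained sketch of the "emptier nodes first" sort above
// (the Node type and its field are stand-ins, not Hadoop classes).
public class EmptierNodesFirst {
  static class Node {
    final String id;
    final long availableMb;
    Node(String id, long availableMb) { this.id = id; this.availableMb = availableMb; }
    @Override public String toString() { return id; }
  }

  public static void main(String[] args) {
    List<Node> nodes = new ArrayList<>(Arrays.asList(
        new Node("n1", 2048), new Node("n2", 8192), new Node("n3", 512)));
    // Descending by available capacity: the emptiest node is offered
    // containers first, which spreads load evenly.
    nodes.sort((a, b) -> Long.compare(b.availableMb, a.availableMb));
    System.out.println(nodes); // [n2, n1, n3]
  }
}
```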
It then iterates over the sorted nodes, attempting to assign containers on each one. For a given node, attemptScheduling() first honors any existing reservation, then repeatedly calls queueMgr.getRootQueue().assignContainer(node), which walks the queue tree from the root over the schedulable entities (queues and apps):
```java
boolean validReservation = false;
FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
if (reservedAppSchedulable != null) {
  validReservation = reservedAppSchedulable.assignReservedContainer(node);
}
if (!validReservation) {
  // No reservation, schedule at queue which is farthest below fair share
  int assignedContainers = 0;
  Resource assignedResource = Resources.clone(Resources.none());
  Resource maxResourcesToAssign =
      Resources.multiply(node.getAvailableResource(), 0.5f);
  while (node.getReservedContainer() == null) {
    boolean assignedContainer = false;
    Resource assignment = queueMgr.getRootQueue().assignContainer(node);
    if (!assignment.equals(Resources.none())) {
      // A container was assigned on this pass.
      assignedContainers++;
      assignedContainer = true;
      Resources.addTo(assignedResource, assignment);
    }
    if (!assignedContainer) { break; }
    if (!shouldContinueAssigning(assignedContainers,
        maxResourcesToAssign, assignedResource)) {
      break;
    }
  }
}
```
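The shouldContinueAssigning guard caps how much a single pass may hand out on one node. A plausible condensation of its logic (my sketch, not verbatim 2.8.4 source), where the three flag inputs correspond to the yarn.scheduler.fair.assignmultiple, yarn.scheduler.fair.dynamic.max.assign, and yarn.scheduler.fair.max.assign settings:

```java
// Sketch (not verbatim source) of the batching guard used above.
static boolean shouldContinueAssigning(boolean assignMultiple,
    boolean maxAssignDynamic, int maxAssign,
    int assignedContainers, long assignedMb, long maxMbToAssign) {
  if (!assignMultiple) {
    return false;                       // one container per pass
  }
  if (maxAssignDynamic) {
    // Dynamic cap: stop once we have assigned half of what the node had
    // free (maxResourcesToAssign in the snippet above).
    return assignedMb <= maxMbToAssign;
  }
  return maxAssign <= 0 || assignedContainers < maxAssign;
}
```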
Next, in FSParentQueue.assignContainer, the child queues are sorted with the comparator of the configured scheduling policy (here, the fair share policy):
```java
@Override
public Resource assignContainer(FSSchedulerNode node) {
  // For each node, do one recursive search down the queue tree.
  Resource assigned = Resources.none();

  // If this queue is over its limit, reject
  if (!assignContainerPreCheck(node)) {
    return assigned;
  }

  // Hold the write lock when sorting childQueues
  writeLock.lock();
  try {
    Collections.sort(childQueues, policy.getComparator());
  } finally {
    writeLock.unlock();
  }
```
It then iterates over the sorted children, offering the node to each child in turn:
```java
  /*
   * We are releasing the lock between the sort and iteration of the
   * "sorted" list. There could be changes to the list here:
   * 1. Add a child queue to the end of the list, this doesn't affect
   * container assignment.
   * 2. Remove a child queue, this is probably good to take care of so we
   * don't assign to a queue that is going to be removed shortly.
   */
  readLock.lock();
  try {
    for (FSQueue child : childQueues) {
      assigned = child.assignContainer(node);
      if (!Resources.equals(assigned, Resources.none())) {
        break;
      }
    }
  } finally {
    readLock.unlock();
  }
  return assigned;
}
```
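The comment above documents a deliberate trade-off: the write lock is held only for the sort, and iteration happens under the read lock, accepting that the list may change in between. Holding the write lock through the whole iteration would serialize scheduling against queue changes. The same pattern in a self-contained form (class and method names are mine, not Hadoop's):

```java
import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;

// Self-contained sketch of "sort under the write lock, iterate under the
// read lock" (names are mine, not Hadoop's).
public class SortThenIterate<T> {
  private final List<T> items = new ArrayList<>();
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  public void add(T t) {
    lock.writeLock().lock();
    try { items.add(t); } finally { lock.writeLock().unlock(); }
  }

  public void sortBy(Comparator<T> cmp) {
    lock.writeLock().lock();          // exclusive: nothing moves mid-sort
    try { items.sort(cmp); } finally { lock.writeLock().unlock(); }
  }

  public T firstMatch(Predicate<T> accept) {
    lock.readLock().lock();           // shared: readers don't block readers
    try {
      for (T t : items) {
        if (accept.test(t)) { return t; }
      }
      return null;
    } finally { lock.readLock().unlock(); }
  }
}
```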
assignContainer() may be handed either an app or a queue. For a queue it recurses: the root FSParentQueue calls assignContainer() on its children all the way down the tree until a leaf queue's assignContainer() is reached, and only there does the actual assignment to an app happen.
What we care about here is the sorting.
In Hadoop 2.8.4 the comparator in FairSharePolicy orders schedulables by min-share neediness, memory usage ratio, and usage-to-weight ratio. In doing so it calls getResourceUsage() repeatedly, three times per schedulable in every compare(), and each call recomputes usage dynamically, which is expensive:
```java
@Override
public int compare(Schedulable s1, Schedulable s2) {
  double minShareRatio1, minShareRatio2;
  double useToWeightRatio1, useToWeightRatio2;
  Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
      s1.getMinShare(), s1.getDemand());
  Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
      s2.getMinShare(), s2.getDemand());
  boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
      s1.getResourceUsage(), minShare1);
  boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
      s2.getResourceUsage(), minShare2);
  minShareRatio1 = (double) s1.getResourceUsage().getMemorySize()
      / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemorySize();
  minShareRatio2 = (double) s2.getResourceUsage().getMemorySize()
      / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemorySize();
  useToWeightRatio1 = s1.getResourceUsage().getMemorySize() /
      s1.getWeights().getWeight(ResourceType.MEMORY);
  useToWeightRatio2 = s2.getResourceUsage().getMemorySize() /
      s2.getWeights().getWeight(ResourceType.MEMORY);

  int res = 0;
  if (s1Needy && !s2Needy)
    res = -1;
  else if (s2Needy && !s1Needy)
    res = 1;
  else if (s1Needy && s2Needy)
    res = (int) Math.signum(minShareRatio1 - minShareRatio2);
  else
    // Neither schedulable is needy
    res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
  if (res == 0) {
    // Apps are tied in fairness ratio. Break the tie by submit time and job
    // name to get a deterministic ordering, which is useful for unit tests.
    res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
    if (res == 0)
      res = s1.getName().compareTo(s2.getName());
  }
  return res;
}
```
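The cost of those calls is easiest to see at a parent queue, where usage is re-aggregated from all children on every call. A condensed sketch (my approximation of FSParentQueue in 2.8.x, not verbatim source):

```java
// Approximate sketch of FSParentQueue.getResourceUsage(): every call
// walks all children and re-sums their usage. Inside a comparator that
// a sort invokes O(n log n) times, with three calls per schedulable per
// compare(), this aggregation dominates the sort time.
public Resource getResourceUsage() {
  Resource usage = Resources.createResource(0);
  readLock.lock();
  try {
    for (FSQueue child : childQueues) {
      Resources.addTo(usage, child.getResourceUsage()); // recursive
    }
  } finally {
    readLock.unlock();
  }
  return usage;
}
```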
The optimized comparator (YARN-5969, merged in Hadoop 2.9.0) looks like this:
```java
@Override
public int compare(Schedulable s1, Schedulable s2) {
  int res = compareDemand(s1, s2);

  // Pre-compute resource usages to avoid duplicate calculation
  Resource resourceUsage1 = s1.getResourceUsage();
  Resource resourceUsage2 = s2.getResourceUsage();

  if (res == 0) {
    res = compareMinShareUsage(s1, s2, resourceUsage1, resourceUsage2);
  }

  if (res == 0) {
    res = compareFairShareUsage(s1, s2, resourceUsage1, resourceUsage2);
  }

  // Break the tie by submit time
  if (res == 0) {
    res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
  }

  // Break the tie by job name
  if (res == 0) {
    res = s1.getName().compareTo(s2.getName());
  }

  return res;
}

private int compareDemand(Schedulable s1, Schedulable s2) {
  int res = 0;
  Resource demand1 = s1.getDemand();
  Resource demand2 = s2.getDemand();
  if (demand1.equals(Resources.none()) && Resources.greaterThan(
      RESOURCE_CALCULATOR, null, demand2, Resources.none())) {
    res = 1;
  } else if (demand2.equals(Resources.none()) && Resources.greaterThan(
      RESOURCE_CALCULATOR, null, demand1, Resources.none())) {
    res = -1;
  }
  return res;
}

private int compareMinShareUsage(Schedulable s1, Schedulable s2,
    Resource resourceUsage1, Resource resourceUsage2) {
  int res;
  Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
      s1.getMinShare(), s1.getDemand());
  Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
      s2.getMinShare(), s2.getDemand());
  boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
      resourceUsage1, minShare1);
  boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
      resourceUsage2, minShare2);

  if (s1Needy && !s2Needy) {
    res = -1;
  } else if (s2Needy && !s1Needy) {
    res = 1;
  } else if (s1Needy && s2Needy) {
    double minShareRatio1 = (double) resourceUsage1.getMemorySize() /
        Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE)
            .getMemorySize();
    double minShareRatio2 = (double) resourceUsage2.getMemorySize() /
        Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE)
            .getMemorySize();
    res = (int) Math.signum(minShareRatio1 - minShareRatio2);
  } else {
    res = 0;
  }
  return res;
}

/**
 * To simplify computation, use weights instead of fair shares to calculate
 * fair share usage.
 */
private int compareFairShareUsage(Schedulable s1, Schedulable s2,
    Resource resourceUsage1, Resource resourceUsage2) {
  double weight1 = s1.getWeights().getWeight(ResourceType.MEMORY);
  double weight2 = s2.getWeights().getWeight(ResourceType.MEMORY);
  double useToWeightRatio1;
  double useToWeightRatio2;
  if (weight1 > 0.0 && weight2 > 0.0) {
    useToWeightRatio1 = resourceUsage1.getMemorySize() / weight1;
    useToWeightRatio2 = resourceUsage2.getMemorySize() / weight2;
  } else { // Either weight1 or weight2 equals to 0
    if (weight1 == weight2) {
      // If they have same weight, just compare usage
      useToWeightRatio1 = resourceUsage1.getMemorySize();
      useToWeightRatio2 = resourceUsage2.getMemorySize();
    } else {
      // By setting useToWeightRatios to negative weights, we give the
      // zero-weight one less priority, so the non-zero weight one will
      // be given slots.
      useToWeightRatio1 = -weight1;
      useToWeightRatio2 = -weight2;
    }
  }
  return (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
}
```
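For anyone wanting to reproduce the measurement, a minimal timing harness along these lines is enough (a hypothetical sketch, not the author's benchmark): sort the same snapshot of schedulables under each comparator and compare wall time.

```java
import java.util.*;
import java.util.stream.Collectors;

// Hypothetical micro-benchmark sketch (not the author's code): time a
// sort of the same list under two comparators. Integers stand in for
// Schedulables; with the real classes you would pass the old and new
// FairShareComparator instances instead.
public class SortTimer {
  static <T> long sortNanos(List<T> snapshot, Comparator<T> cmp) {
    List<T> copy = new ArrayList<>(snapshot); // identical input per run
    long t0 = System.nanoTime();
    copy.sort(cmp);
    return System.nanoTime() - t0;
  }

  static long fakeUsage(int seed) {
    long u = seed;
    for (int i = 0; i < 50; i++) { u = u * 31 + i; } // simulated recomputation
    return u;
  }

  public static void main(String[] args) {
    List<Integer> demo = new Random(42).ints(100_000).boxed()
        .collect(Collectors.toList());
    // Cheap comparator: compares values directly.
    long cheap = sortNanos(demo, Comparator.naturalOrder());
    // Costly comparator: recomputes "usage" inside every compare(), as
    // the pre-YARN-5969 comparator effectively did via getResourceUsage().
    long costly = sortNanos(demo,
        (a, b) -> Long.compare(fakeUsage(a), fakeUsage(b)));
    System.out.println("cheap=" + cheap + "ns  costly=" + costly + "ns");
  }
}
```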
On a test cluster, I compared the time spent in queue sorting before and after the change. (The original post includes a screenshot at this point: the upper red box shows the new version, the lower one the old version.) This was not a formal stress test, but since both runs handled the same scheduling workload the comparison is meaningful: on a large cluster, where this comparator can be invoked tens of millions or even hundreds of millions of times per second, the optimization becomes clearly visible. Original post: https://www.cnblogs.com/songchaolin/p/11844217.html