Flink – SlotSharingGroup
Published: 2019-06-14


 

SlotSharingGroup

A SlotSharingGroup expresses that tasks of different JobVertices may share a slot. This is a soft constraint: the tasks are allowed to share a slot, but are not required to end up in the same one.

By default, the whole StreamGraph uses a single SlotSharingGroup named "default", so the tasks of all JobVertices may share one slot.

/**
 * A slot sharing unit defines which different tasks (from different job vertices) can be
 * deployed together within a slot. This is a soft permission, in contrast to the hard constraint
 * defined by a co-location hint.
 */
public class SlotSharingGroup implements java.io.Serializable {

    private final Set<JobVertexID> ids = new TreeSet<JobVertexID>();

    /** Mapping of tasks to subslots. This field is only needed inside the JobManager, and is not RPCed. */
    private transient SlotSharingGroupAssignment taskAssignment;

The key members are:

1. ids, a TreeSet<JobVertexID> that stores the JobVertexIDs belonging to this group

2. taskAssignment, the SlotSharingGroupAssignment that maps the group's tasks to sub-slots (covered in detail below)
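
For context, this is the knob users actually turn. In the DataStream API an operator can be assigned to a named sharing group via slotSharingGroup(String), and downstream operators inherit the group of their input. A minimal, self-contained sketch (the pipeline itself is made up for illustration):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)                      // source stays in the "default" group
            .map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) {
                    return value * 2;
                }
            })
            .slotSharingGroup("heavy")                 // this map (and what follows) gets its own group
            .print();

        env.execute("slot sharing demo");
    }
}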

 

CoLocationGroup

CoLocationGroup, by contrast, is a hard constraint: for the JobVertices in the group, the subtasks with the same index must run in the same slot (and therefore on the same TaskManager).

/**
 * A Co-location group is a group of JobVertices, where the i-th subtask of one vertex
 * has to be executed on the same TaskManager as the i-th subtask of all
 * other JobVertices in the same group.
 *
 * <p>The co-location group is used for example to make sure that the i-th subtasks for iteration
 * head and iteration tail are scheduled to the same TaskManager.
 */
public class CoLocationGroup implements java.io.Serializable {

    /** The ID that describes the slot co-location-constraint as a group */
    private final AbstractID id = new AbstractID();

    /** The vertices participating in the co-location group */
    private final List<JobVertex> vertices = new ArrayList<JobVertex>();

    /** The constraints, which hold the shared slots for the co-located operators */
    private transient ArrayList<CoLocationConstraint> constraints;
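
Co-location groups are normally not constructed by hand; the JobGraph builder wires them up through JobVertex. A hedged sketch of that wiring (setStrictlyCoLocatedWith requires both vertices to already be in the same SlotSharingGroup; the two vertices here are hypothetical):

JobVertex head = new JobVertex("iteration head");
JobVertex tail = new JobVertex("iteration tail");

SlotSharingGroup group = new SlotSharingGroup();
head.setSlotSharingGroup(group);
tail.setSlotSharingGroup(group);

// Creates (or joins) a CoLocationGroup under the hood: the i-th subtask of
// tail will now be scheduled together with the i-th subtask of head.
tail.setStrictlyCoLocatedWith(head);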

 

CoLocationConstraint can be thought of as managing a special kind of SharedSlot:

/**
 * A CoLocationConstraint manages the location of a set of tasks
 * (Execution Vertices). In co-location groups, the different subtasks of
 * different JobVertices need to be executed on the same {@link Instance}.
 * This is realized by creating a special shared slot that holds these tasks.
 *
 * <p>This class tracks the location and the shared slot for this set of tasks.
 */
public class CoLocationConstraint {

    private final CoLocationGroup group;

    private volatile SharedSlot sharedSlot;

    private volatile ResourceID lockedLocation;
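
How the constraint is consulted during scheduling, as a hedged sketch: the exact accessor names (isAssignedAndAlive, getLocation, setSharedSlot, lockLocation) are quoted from memory of the Flink 1.x runtime and should be treated as assumptions, not a definitive API.

// Hedged sketch: scheduling against a CoLocationConstraint (simplified).
if (constraint.isAssignedAndAlive()) {
    // Some subtask with this index was already placed; the constraint is locked
    // to that TaskManager, and all co-located peers must follow it there.
    TaskManagerLocation fixedLocation = constraint.getLocation();
    // ... request a slot strictly on fixedLocation ...
}
else {
    // The first subtask to be scheduled picks a location freely, hands the
    // constraint its shared slot, and locks the location for the others.
    constraint.setSharedSlot(newlyAllocatedSharedSlot);  // newlyAllocatedSharedSlot: hypothetical
    constraint.lockLocation();
}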

 

There are several kinds of slots.

AllocatedSlot represents a slot allocated from a TaskManager:

/**
 * The {@code AllocatedSlot} represents a slot that the JobManager allocated from a TaskManager.
 * It represents a slice of allocated resources from the TaskManager.
 *
 * <p>To allocate an {@code AllocatedSlot}, the JobManager requests a slot from the ResourceManager. The
 * ResourceManager picks (or starts) a TaskManager that will then allocate the slot to the
 * JobManager and notify the JobManager.
 *
 * <p>Note: Prior to the resource management changes introduced in (Flink Improvement Proposal 6),
 * an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the
 * JobManager. All slots had a default unknown resource profile.
 */
public class AllocatedSlot {

    /** The ID under which the slot is allocated. Uniquely identifies the slot. */
    private final AllocationID slotAllocationId;

    /** The ID of the job this slot is allocated for */
    private final JobID jobID;

    /** The location information of the TaskManager to which this slot belongs */
    private final TaskManagerLocation taskManagerLocation;

    /** The resource profile that the slot provides */
    private final ResourceProfile resourceProfile;

    /** RPC gateway to call the TaskManager that holds this slot */
    private final TaskManagerGateway taskManagerGateway;

    /** The number of the slot on the TaskManager to which the slot belongs. Purely informational. */
    private final int slotNumber;

 

Slot can be seen as a wrapper around an AllocatedSlot:

/**
 * Base class for slots that the Scheduler / ExecutionGraph take from the SlotPool and use to place
 * tasks to execute into. A slot corresponds to an AllocatedSlot (a slice of a TaskManager's resources),
 * plus additional fields to track what is currently executed in that slot, or if the slot is still
 * used or disposed (ExecutionGraph gave it back to the pool).
 *
 * <p>In the simplest case, a slot holds a single task ({@link SimpleSlot}). In the more complex
 * case, a slot is shared ({@link SharedSlot}) and contains a set of tasks. Shared slots may contain
 * other shared slots which in turn can hold simple slots. That way, a shared slot may define a tree
 * of slots that belong to it.
 */
public abstract class Slot {

    /** The allocated slot that this slot represents. */
    private final AllocatedSlot allocatedSlot;

    /** The owner of this slot - the slot was taken from that owner and must be disposed to it */
    private final SlotOwner owner;

    /** The parent of this slot in the hierarchy, or null, if this is the parent */
    @Nullable
    private final SharedSlot parent;

    /** The id of the group that this slot is allocated to. May be null. */
    @Nullable
    private final AbstractID groupID;

    /** The number of the slot on which the task is deployed */
    private final int slotNumber;

 

SimpleSlot is a slot that holds a single task:

/**
 * A SimpleSlot represents a single slot on a TaskManager instance, or a slot within a shared slot.
 *
 * <p>If this slot is part of a {@link SharedSlot}, then the parent attribute will point to that shared slot.
 * If not, then the parent attribute is null.
 */
public class SimpleSlot extends Slot {

    /** Task being executed in the slot. Volatile to force a memory barrier and allow for correct double-checking */
    private volatile Execution executedTask;  // not shared: holds exactly one task

    /** The locality attached to the slot, defining whether the slot was allocated at the desired location. */
    private volatile Locality locality = Locality.UNCONSTRAINED;

 

SharedSlot:

/**
 * This class represents a shared slot. A shared slot can have multiple
 * {@link SimpleSlot} instances within itself. This allows to
 * schedule multiple tasks simultaneously to the same resource. Sharing a resource with multiple
 * tasks is crucial for simple pipelined / streamed execution, where both the sender and the receiver
 * are typically active at the same time.
 *
 * <p>IMPORTANT: This class contains no synchronization. Thus, the caller has to guarantee proper
 * synchronization. In the current implementation, all concurrently modifying operations are
 * passed through a {@link SlotSharingGroupAssignment} object which is responsible for
 * synchronization.
 */
public class SharedSlot extends Slot {

    /** The assignment group of shared slots that manages the availability and release of the slots */
    private final SlotSharingGroupAssignment assignmentGroup;

    /** The set of sub-slots allocated from this shared slot */
    private final Set<Slot> subSlots;

Note that SharedSlot extends Slot, and Slot holds exactly one AllocatedSlot (the allocatedSlot field). So however many sub-slots end up in subSlots, they all share that single allocatedSlot.

A SimpleSlot is allocated from its enclosing SharedSlot:

SimpleSlot allocateSubSlot(AbstractID groupId) {
    if (isAlive()) {
        SimpleSlot slot = new SimpleSlot(
                getJobID(), getOwner(), getTaskManagerLocation(), subSlots.size(),
                getTaskManagerGateway(), this, groupId);
        subSlots.add(slot);
        return slot;
    }
    else {
        return null;
    }
}

 

public SimpleSlot(
        JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
        TaskManagerGateway taskManagerGateway,
        @Nullable SharedSlot parent, @Nullable AbstractID groupID) {

    super(parent != null ?               // if there is a parent, this slot belongs to some SharedSlot
            parent.getAllocatedSlot() :  // so reuse the parent SharedSlot's AllocatedSlot
            new AllocatedSlot(NO_ALLOCATION_ID, jobID, location, slotNumber,  // otherwise create a new AllocatedSlot
                    ResourceProfile.UNKNOWN, taskManagerGateway),
            owner, slotNumber, parent, groupID);
}
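
The consequence, as a hedged illustration (allocateSubSlot is package-private in the runtime, so this is a conceptual sketch rather than user-facing code; sourceVertexId and mapVertexId are hypothetical):

// Two sub-slots for two different JobVertices, carved out of one SharedSlot:
SimpleSlot sourceSlot = sharedSlot.allocateSubSlot(sourceVertexId);
SimpleSlot mapSlot    = sharedSlot.allocateSubSlot(mapVertexId);

// Both wrap the very same AllocatedSlot, i.e. one physical TaskManager slot:
//   sourceSlot.getAllocatedSlot() == mapSlot.getAllocatedSlot()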

 

 

SlotSharingGroupAssignment manages a set of SharedSlots.

The diagram in the class Javadoc illustrates the structure nicely:

/**
 * The SlotSharingGroupAssignment manages a set of shared slots, which are shared between
 * tasks of a {@link org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup}.
 *
 * <p>The assignment shares tasks by allowing a shared slot to hold one vertex per
 * JobVertexID. For example, consider a program consisting of job vertices "source", "map",
 * "reduce", and "sink". If the slot sharing group spans all four job vertices, then
 * each shared slot can hold one parallel subtask of the source, the map, the reduce, and the
 * sink vertex. Each shared slot holds the actual subtasks in child slots, which are (at the leaf level),
 * the {@link SimpleSlot}s.
 *
 * <p>An exception are the co-location-constraints, that define that the i-th subtask of one
 * vertex needs to be scheduled strictly together with the i-th subtasks of the vertices
 * that share the co-location-constraint. To manage that, a co-location-constraint gets its
 * own shared slot inside the shared slots of a sharing group.
 *
 * <p>Consider a job set up like this:
 *
 * <pre>{@code
 * +-------------- Slot Sharing Group --------------+
 * |                                                |
 * |            +-- Co Location Group --+           |
 * |            |                       |           |
 * | (source) ---> (head) ---> (tail) ---> (sink)   |
 * |            |                       |           |
 * |            +-----------------------+           |
 * +------------------------------------------------+
 * }</pre>
 *
 * <p>The slot hierarchy in the slot sharing group will look like the following
 *
 * <pre>
 *     Shared(0)(root)
 *        |
 *        +-- Simple(2)(sink)
 *        |
 *        +-- Shared(1)(co-location-group)
 *        |      |
 *        |      +-- Simple(0)(tail)
 *        |      +-- Simple(1)(head)
 *        |
 *        +-- Simple(0)(source)
 * </pre>
 */
public class SlotSharingGroupAssignment {

    /** All slots currently allocated to this sharing group */
    private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();

    /** The slots available per vertex type (JobVertexId), keyed by TaskManager, to make them locatable */
    private final Map<AbstractID, Map<ResourceID, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<>();

The core data structures:

allSlots holds every SharedSlot of the group; all of them are shareable and can be handed out to tasks of the different JobVertices.

availableSlotsPerJid records which slots are still available per JobVertexID (the AbstractID key), indexed by TaskManager (the ResourceID key) so they can be located.
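
The code below references three private multimap helpers (putIntoMultiMap, removeFromMultiMap, pollFromMultiMap). Here is a minimal sketch of what they do, inferred from their call sites rather than copied from the Flink source:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hedged re-creations of the three private helpers of SlotSharingGroupAssignment.
final class MultiMapHelpers {

    // Add a slot under its TaskManager's ResourceID, creating the list on demand.
    static void putIntoMultiMap(Map<ResourceID, List<SharedSlot>> map,
                                ResourceID location, SharedSlot slot) {
        map.computeIfAbsent(location, k -> new ArrayList<>()).add(slot);
    }

    // Remove and return one slot registered under the given TaskManager, or null.
    static SharedSlot removeFromMultiMap(Map<ResourceID, List<SharedSlot>> map,
                                         ResourceID location) {
        List<SharedSlot> slots = map.get(location);
        if (slots == null || slots.isEmpty()) {
            return null;
        }
        SharedSlot slot = slots.remove(slots.size() - 1);
        if (slots.isEmpty()) {
            map.remove(location);
        }
        return slot;
    }

    // Remove and return any slot from the map, regardless of location, or null.
    static SharedSlot pollFromMultiMap(Map<ResourceID, List<SharedSlot>> map) {
        Iterator<List<SharedSlot>> it = map.values().iterator();
        while (it.hasNext()) {
            List<SharedSlot> slots = it.next();
            if (slots.isEmpty()) {
                it.remove();
                continue;
            }
            SharedSlot slot = slots.remove(slots.size() - 1);
            if (slots.isEmpty()) {
                it.remove();
            }
            return slot;
        }
        return null;
    }
}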

 

The most important function is getSlotForTask, which assigns a slot to a task:

/**
 * Gets a slot suitable for the given task vertex. This method will prefer slots that are local
 * (with respect to {@link ExecutionVertex#getPreferredLocationsBasedOnInputs()}), but will return
 * non local slots if no local slot is available. The method returns null, when this sharing group
 * has no slot available for the given JobVertexID.
 *
 * @param vertex The vertex to allocate a slot for.
 *
 * @return A slot to execute the given ExecutionVertex in, or null, if none is available.
 */
public SimpleSlot getSlotForTask(ExecutionVertex vertex) {
    // by default, the locations of the slots assigned to the inputs serve as the preferred locations
    return getSlotForTask(vertex.getJobvertexId(), vertex.getPreferredLocationsBasedOnInputs());
}

SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<TaskManagerLocation> locationPreferences) {
    synchronized (lock) {
        // fetch a SharedSlot; the third argument (localOnly = false) means a local
        // slot is preferred but not mandatory
        Tuple2<SharedSlot, Locality> p = getSlotForTaskInternal(vertexID, locationPreferences, false);

        if (p != null) {
            SharedSlot ss = p.f0;
            SimpleSlot slot = ss.allocateSubSlot(vertexID);  // allocate a SimpleSlot out of the SharedSlot
            slot.setLocality(p.f1);
            return slot;
        }
        else {
            return null;
        }
    }
}

 

getSlotForTaskInternal

private Tuple2<SharedSlot, Locality> getSlotForTaskInternal(
        AbstractID groupId, Iterable<TaskManagerLocation> preferredLocations, boolean localOnly) {

    // check if there is anything at all in this group assignment
    if (allSlots.isEmpty()) {  // no slots at all, bail out
        return null;
    }

    // get the available slots for the group
    Map<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);

    if (slotsForGroup == null) {
        // we have a new group, so all slots are available
        // (every SharedSlot in allSlots is shareable, so each one starts out available to this group)
        slotsForGroup = new LinkedHashMap<>();
        availableSlotsPerJid.put(groupId, slotsForGroup);

        for (SharedSlot availableSlot : allSlots) {
            // register the slot in the list belonging to its TaskManager location
            putIntoMultiMap(slotsForGroup, availableSlot.getTaskManagerID(), availableSlot);
        }
    }
    else if (slotsForGroup.isEmpty()) {
        // the group exists, but nothing is available for that group
        return null;
    }

    // check whether we can schedule the task to a preferred location
    boolean didNotGetPreferred = false;

    if (preferredLocations != null) {
        for (TaskManagerLocation location : preferredLocations) {
            // set the flag that we failed a preferred location. If one will be found,
            // we return early anyways and skip the flag evaluation.
            // (tricky: if the lookup below succeeds we return immediately and the flag is
            // never read; if it fails, no preferred slot was found, so true is correct)
            didNotGetPreferred = true;

            // remove the slot from the map: one JobVertex must never run two of its
            // parallel subtasks in the same shared slot
            SharedSlot slot = removeFromMultiMap(slotsForGroup, location.getResourceID());
            if (slot != null && slot.isAlive()) {
                // LOCAL: the slot lives on the same TaskManager as a preferred location
                return new Tuple2<>(slot, Locality.LOCAL);
            }
        }
    }

    // if we want only local assignments, exit now with a "not found" result
    if (didNotGetPreferred && localOnly) {
        return null;
    }

    // reaching here with didNotGetPreferred == false means preferredLocations was
    // null (or empty), i.e. the placement is UNCONSTRAINED
    Locality locality = didNotGetPreferred ? Locality.NON_LOCAL : Locality.UNCONSTRAINED;

    // schedule the task to any available location
    SharedSlot slot;
    while ((slot = pollFromMultiMap(slotsForGroup)) != null) {
        if (slot.isAlive()) {
            return new Tuple2<>(slot, locality);
        }
    }

    // nothing available after all, all slots were dead
    return null;
}
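
The three Locality outcomes are easy to lose in the control flow above, so here is a hedged distillation (decideLocality is a made-up helper name; Locality is the real Flink enum used in the code):

static Locality decideLocality(boolean hadPreferences, boolean foundPreferredSlot) {
    if (foundPreferredSlot) {
        return Locality.LOCAL;            // slot sits on one of the preferred TaskManagers
    }
    return hadPreferences
            ? Locality.NON_LOCAL          // preferences existed but none could be satisfied
            : Locality.UNCONSTRAINED;     // no preferences were given at all
}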

SharedSlot.allocateSubSlot was already shown above.

 

That raises an obvious question: where do the slots in allSlots come from?

 

addSharedSlotAndAllocateSubSlot

private SimpleSlot addSharedSlotAndAllocateSubSlot(
        SharedSlot sharedSlot, Locality locality, JobVertexID groupId, CoLocationConstraint constraint) {

    final ResourceID location = sharedSlot.getTaskManagerID();

    synchronized (lock) {

        SimpleSlot subSlot;
        AbstractID groupIdForMap;

        // add to the total bookkeeping
        if (!allSlots.add(sharedSlot)) {  // register the new SharedSlot in allSlots
            throw new IllegalArgumentException("Slot was already contained in the assignment group");
        }

        if (constraint == null) {
            // allocate us a sub slot to return
            subSlot = sharedSlot.allocateSubSlot(groupId);  // simply allocate a SimpleSlot
            groupIdForMap = groupId;
        }
        else {
            // with a CoLocationConstraint (this branch was omitted in the original excerpt)
        }

        if (subSlot != null) {
            // preserve the locality information
            subSlot.setLocality(locality);

            // let the other groups know that this slot exists and that they
            // can place a task into this slot.
            boolean entryForNewJidExists = false;

            for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
                // there is already an entry for this groupID
                if (entry.getKey().equals(groupIdForMap)) {
                    entryForNewJidExists = true;
                    continue;
                }

                // make the new shared slot available to every *other* JobVertex
                Map<ResourceID, List<SharedSlot>> available = entry.getValue();
                putIntoMultiMap(available, location, sharedSlot);
            }

            // make sure an empty entry exists for this group, if no other entry exists
            // (the new slot must not appear as available to its own group)
            if (!entryForNewJidExists) {
                availableSlotsPerJid.put(groupIdForMap, new LinkedHashMap<ResourceID, List<SharedSlot>>());
            }

            return subSlot;
        }
    }  // end synchronized (lock)
}

 

addSharedSlotAndAllocateSubSlot is in turn called from the Scheduler:

protected SimpleSlot getNewSlotForSharingGroup(ExecutionVertex vertex,
                                               Iterable<TaskManagerLocation> requestedLocations,
                                               SlotSharingGroupAssignment groupAssignment,
                                               CoLocationConstraint constraint,
                                               boolean localOnly) {
    // we need potentially to loop multiple times, because there may be false positives
    // in the set-with-available-instances
    while (true) {
        // find an instance (TaskManager) matching the requested locations
        Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly);

        if (instanceLocalityPair == null) {
            // no instance is available at all
            return null;
        }

        final Instance instanceToUse = instanceLocalityPair.getLeft();
        final Locality locality = instanceLocalityPair.getRight();

        try {
            JobVertexID groupID = vertex.getJobvertexId();

            // allocate a shared slot from the instance
            SharedSlot sharedSlot = instanceToUse.allocateSharedSlot(vertex.getJobId(), groupAssignment);

            // if the instance has further available slots, re-add it to the set of available
            // resources so it can be used for future allocations
            if (instanceToUse.hasResourcesAvailable()) {
                this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
            }

            if (sharedSlot != null) {
                // register the new SharedSlot with the group's SlotSharingGroupAssignment
                // and allocate a sub-slot from it
                SimpleSlot slot = constraint == null ?
                        groupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot, locality, groupID) :
                        groupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot, locality, constraint);

                if (slot != null) {
                    return slot;
                }
                else {
                    // could not add and allocate the sub-slot, so release shared slot
                    sharedSlot.releaseSlot();
                }
            }
        }
        catch (InstanceDiedException e) {
            // the instance died it has not yet been propagated to this scheduler
            // remove the instance from the set of available instances
            removeInstance(instanceToUse);
        }

        // if we failed to get a slot, fall through the loop
    }
}

 

getNewSlotForSharingGroup is invoked when the SlotSharingGroup has no slot available; it allocates a new SharedSlot from an Instance (a TaskManager) and registers it with the group.
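
Putting the two paths together, a hedged sketch of the overall flow (heavily simplified from the Scheduler; queuing, co-location handling, and error paths are omitted, and in the real code localOnly depends on whether a co-location constraint is already location-locked):

// 1) Try to reuse one of the group's existing SharedSlots.
SimpleSlot slot = assignment.getSlotForTask(vertex);

if (slot == null) {
    // 2) Nothing reusable: allocate a fresh SharedSlot from some Instance and
    //    register it with the assignment, getting back a sub-slot for the vertex.
    slot = getNewSlotForSharingGroup(
            vertex,
            vertex.getPreferredLocationsBasedOnInputs(),
            assignment,
            constraint,   // null when there is no co-location constraint
            false);       // localOnly
}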

 

Reference: originally posted at https://www.cnblogs.com/fxjwind/p/6703312.html
