// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // SchedulerProxy.h // // RM proxy for a scheduler instance // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- #pragma once namespace Concurrency { namespace details { #pragma warning(push) #pragma warning(disable: 4265) // non-virtual destructor in base class class SchedulerProxy : public ::Concurrency::ISchedulerProxy { public: /// /// Constructs a scheduler proxy. /// SchedulerProxy(IScheduler * pScheduler, ResourceManager * pResourceManager, const SchedulerPolicy &policy); /// /// Called in order to notify the resource manager that the given scheduler is shutting down. This /// will cause the resource manager to immediately reclaim all resources granted to the scheduler. /// virtual void Shutdown(); /// /// Called by a scheduler in order make an initial request for an allocation of virtual processors. The request /// is driven by policies within the scheduler queried via the IScheduler::GetPolicy method. If the request /// can be satisfied via the rules of allocation, it is communicated to the scheduler as a call to /// IScheduler::AddVirtualProcessors. /// /// /// Whether to subscribe the current thread and account for it during resource allocation. /// /// /// The IExecutionResource instance representing current thread if doSubscribeCurrentThread was true; NULL otherwise. /// virtual IExecutionResource * RequestInitialVirtualProcessors(bool doSubscribeCurrentThread); /// /// Ensures that a context is bound to a thread proxy. This API should *NOT* be called in the vast majority of circumstances. /// The IThreadProxy::SwitchTo will perform late binding to thread proxies as necessary. There are, however, circumstances /// where it is necessary to pre-bind a context to ensure that the SwitchTo operation switches to an already bound context. This /// is the case on a UMS scheduling context as it cannot call allocation APIs. /// /// /// The context to bind. /// virtual void BindContext(IExecutionContext * pContext); /// /// Returns an **unstarted** thread proxy attached to pContext, to the thread proxy factory. /// Such a thread proxy **must** be unstarted. /// This API should *NOT* be called in the vast majority of circumstances. /// /// /// The context to unbind. /// virtual void UnbindContext(IExecutionContext * pContext); /// /// This API registers the current thread with the resource manager associating it with this scheduler, /// and returns an instance of IExecutionResource back to the scheduler, for bookkeeping and maintenance. /// /// /// The IExecutionResource instance representing current thread in the runtime. /// virtual IExecutionResource * SubscribeCurrentThread(); /// /// The unique identifier of the scheduler this proxy represents. /// unsigned int GetId() const { return m_id; } /// /// Causes the resource manager to create a new virtual processor root running atop the same hardware thread as this /// execution resource. Typically, this is used when a scheduler wishes to oversubscribe a particular hardware thread /// for a limited amount of time. /// /// /// The execution resource abstraction on which to oversubscribe. /// /// /// A new virtual processor root running atop the same hardware thread as this execution resource. /// virtual IVirtualProcessorRoot * CreateOversubscriber(IExecutionResource * pExecutionResource); /// /// Getters for the various policy elements. /// unsigned int MaxConcurrency() const { return m_maxConcurrency; } unsigned int MinConcurrency() const { return m_minConcurrency; } unsigned int TargetOversubscriptionFactor() const { return m_targetOversubscriptionFactor; } int ContextStackSize () const { return m_contextStackSize; } int ContextPriority () const { return m_contextPriority; } /// /// Returns the minimum number of cores that must contain vprocs for this scheduler. These cores /// may contain a subscribed thread in addition to virtual processors. /// unsigned int MinVprocHWThreads() const { // Compute number of cores used for virtual processors that are fixed ASSERT(m_numFixedCores >= m_numExternalThreadCores); unsigned int fixedVprocCores = m_numFixedCores - m_numExternalThreadCores; // Compute maximum(t1, minimum set by policy) which is minimum of virtual processor cores return max(fixedVprocCores, m_minimumHardwareThreads); } unsigned int MinHWThreads() const { // The minimum needed number of hardware threads (cores) is equal to: // - minimum needed vproc cores + minimum needed external thread cores unsigned int minimumCores = MinVprocHWThreads() + m_numExternalThreadCores; ASSERT(minimumCores <= m_coreCount); return minimumCores; } unsigned int DesiredHWThreads() const { unsigned int desiredCores = min(m_coreCount, m_desiredHardwareThreads + m_numExternalThreadCores); ASSERT(m_numExternalThreads != 0 || desiredCores == m_desiredHardwareThreads); return desiredCores; } unsigned int ComputeMinHWThreadsWithExternalThread() const { unsigned int newMin = min(m_coreCount, MinHWThreads() + 1); return newMin; } unsigned int ComputeDesiredHWThreadsWithExternalThread() const { unsigned int newDesired = min(m_coreCount, DesiredHWThreads() + 1); return newDesired; } /// /// Returns the number of external thread subscriptions /// unsigned int GetNumNestedThreadSubscriptions() { return m_threadSubscriptions.Count(); } /// /// Called to adjust the suggested allocation such that we do not exceed maxConcurrency. /// This routine takes into account vprocs that are marked for removal but haven't yet been /// retired by the scheduler. The suggested allocation would be decreased to account for such /// vprocs. /// unsigned int AdjustAllocationIncrease(unsigned int suggestedAllocation) const; /// /// Returns the number of cores allocated to the proxy at any time. /// unsigned int GetNumAllocatedCores() const { return m_numAllocatedCores; } /// /// Returns the number of borrowed cores. These are cores that were oversubscribed and temporarily /// assigned to this scheduler during dynamic core migration as they were found to be unused /// by the other scheduler(s) they were assigned to. The reason these cores were oversubscribed /// instead of migrated was that they contributed to the minimum number of cores on the other /// scheduler(s) and hence couldn't be taken away. /// unsigned int GetNumBorrowedCores() const { return m_numBorrowedCores; } /// /// Returns the number of owned cores. This is the total allocated cores minus the borrowed cores. /// unsigned int GetNumOwnedCores() const { return m_numAllocatedCores - m_numBorrowedCores; } /// /// Returns the number of fixed cores - cores that have a subscribed thread on them. These cores may /// also have vprocs belonging to this scheduler. /// unsigned int GetNumFixedCores() const { return m_numFixedCores; } /// /// Toggles the state on a core from borrowed to owned (and vice versa), and updates necessary counts. /// void ToggleBorrowedState(SchedulerNode * pNode, unsigned int coreIndex); /// /// Creates a new execution resource for the external thread and registers it with the scheduler proxy. /// ExecutionResource * CreateExternalThreadResource(SchedulerNode * pNode, unsigned int coreIndex); /// /// Called by the RM when it is done reserving cores for the scheduler proxy. The scheduler proxy /// allocates virtual processors or standalone execution resources based on the cores that were allocated /// to it. /// ExecutionResource * GrantAllocation(unsigned int numberReserved, bool fInitialAllocation, bool fSubscribeCurrentThread); /// /// Finds the core allocated by the RM on which a single subscribed external thread should run. /// ExecutionResource * GrantExternalThreadAllocation(bool doOversubscribeCore); /// /// Returns a pointer to the copy of allocated nodes that were assigned to the proxy at /// creation time. /// SchedulerNode * GetAllocatedNodes() const { return m_pAllocatedNodes; } /// /// Sets the allocated nodes for the scheduler proxy to the nodes provided. /// void SetAllocatedNodes(SchedulerNode * pNodes) { ASSERT(m_pAllocatedNodes == NULL && pNodes != NULL); m_pAllocatedNodes = pNodes; } /// /// Returns a pointer to the array that holds the sorted order for nodes. This is used by the /// RM to sort nodes by whatever criteria it chooses. /// unsigned int * GetSortedNodeOrder() const { return m_pSortedNodeOrder; } /// /// Returns a pointer to the scheduler associated with the scheduler proxy. /// IScheduler * Scheduler() const { return m_pScheduler; } /// /// Returns a pointer to the resource manager associated with the scheduler proxy. /// ResourceManager * GetResourceManager() const { return m_pResourceManager; } /// /// Returns a pointer to a data buffer that is used to store static allocation data. The data /// is populated and manipulated by the RM, but stored in the scheduler proxy for convenience. /// StaticAllocationData * GetStaticAllocationData() { return &m_staticData; } /// /// Returns a pointer to a data buffer that is used to store dynamic allocation data. The data /// is populated and manipulated by the RM, but stored in the scheduler proxy for convenience. /// DynamicAllocationData * GetDynamicAllocationData() { return &m_dynamicData; } /// /// Creates a virtual processor root and adds it to the scheduler proxys list of roots. /// virtual VirtualProcessorRoot * CreateVirtualProcessorRoot(SchedulerNode * pNode, unsigned int coreIndex); /// /// Notifies the scheduler associated with this proxy to add the virtual processor roots provided. /// Called by the RM during initial allocation and dynamic core migration. /// void AddVirtualProcessorRoots(IVirtualProcessorRoot ** vprocRoots, unsigned int count); /// /// Adds an appropriate number of virtual processor roots to the scheduler associated with this proxy. /// Called by the RM during core migration when the RM decides to give this scheduler an additional /// core. /// void AddCore(SchedulerNode * pNode, unsigned int coreIndex, bool fBorrowed); /// /// Notifies the scheduler associated with this proxy to remove the virtual processor roots associated /// with the core provided. Called by the RM during core migration. /// void RemoveCore(SchedulerNode * pNode, unsigned int coreIndex); /// /// Called by the RM to instruct this scheduler proxy to notify its scheduler that this core is now /// externally busy or externally idle. /// void SendCoreNotification(SchedulerCore * pCore, bool isBusyNotification); /// /// Removes a root from the scheduler proxy and destroys it. This API is called in response to a scheduler /// informing the RM that it is done with a virtual processor root. /// void DestroyVirtualProcessorRoot(VirtualProcessorRoot * pRoot); /// /// Removes an execution resource from the scheduler proxy and destroys it. This API is called in response to a scheduler /// informing the RM that it is done with an execution resource. /// void DestroyExecutionResource(ExecutionResource * pExecutionResource); /// /// Returns a hardware affinity for the given node. Note that a scheduler proxy may only be assigned a subset /// of cores within a node -> the mask in the affinity reflects this subset. /// /// /// An abstraction of the hardware affinity which can be applied to Win32 objects. /// HardwareAffinity GetNodeAffinity(unsigned int nodeId) { ASSERT(nodeId < m_nodeCount); ASSERT(m_pAllocatedNodes[nodeId].m_id == nodeId); return HardwareAffinity(static_cast(m_pAllocatedNodes[nodeId].m_processorGroup), m_pAllocatedNodes[nodeId].m_nodeAffinity); } /// /// Adds an execution resource to the list of resources that run on a particular core. /// void AddExecutionResource(ExecutionResource * pExecutionResource); /// /// Adds the execution resource to the list of subscribed threads /// void AddThreadSubscription(ExecutionResource * pExecutionResource); /// /// Removes the execution resource from the list of subscribed threads /// void RemoveThreadSubscription(ExecutionResource * pExecutionResource); /// /// Creates or reuses an execution resource for the thread subscription /// ExecutionResource * GetResourceForNewSubscription(ExecutionResource * pParentExecutionResource); /// /// This function retrieves the execution resource associated with this thread, if one exists, /// and updates the reference count on it for better bookkeeping. /// /// /// The ExecutionResource instance representing current thread in the runtime. /// ExecutionResource * ReferenceCurrentThreadExecutionResource(); /// /// This function retrieves the execution resource associated with this thread, if one exists. /// /// /// The ExecutionResource instance representing current thread in the runtime. /// ExecutionResource * GetCurrentThreadExecutionResource(); /// /// Registers that a call to SubscribeCurrentThread has occurred for this core, making this core immovable. /// void IncrementFixedCoreCount(unsigned int nodeId, unsigned int coreIndex, bool isExternalThread); /// /// Registers that a call to IExecutionResource::Release has occurred, potentially freeing this core. /// void DecrementFixedCoreCount(unsigned int nodeId, unsigned int coreIndex, bool isExternalThread); /// /// Returns the number of external threads on this scheduler proxy. /// unsigned int GetNumExternalThreads() { return m_numExternalThreads; } /// /// Decides whether this scheduler proxy should receive notifications when other /// schedulers borrow its cores or return them back. /// bool ShouldReceiveNotifications() { return (m_minimumHardwareThreads == m_desiredHardwareThreads); } /// /// A function that passes statistical information to the hill climbing instance. Based on these /// statistics, hill climbing will make a recommendation on the number of resources the scheduler /// should be allocated. /// /// /// The number of resources used in this period of time. /// /// /// The number of completed units or work in that period of time. /// /// /// The number of incoming units or work in that period of time. /// /// /// The total length of the work queue. /// /// /// The recommended allocation for the scheduler. /// unsigned int DoHillClimbing(unsigned int currentCoreCount, unsigned int completionRate, unsigned int arrivalRate, unsigned int queueLength) { return m_pHillClimbing->Update(currentCoreCount, completionRate, arrivalRate, queueLength); } /// /// This function returns whether the scheduler has opted in to statistical rebalancing. /// /// /// Whether hill climbing is enabled. /// bool IsHillClimbingEnabled() { return m_fDoHillClimbing; } /// /// Gets the current length of the scheduler queue. /// /// /// The queue length. /// unsigned int GetQueueLength() { return m_queueLength; } /// /// Sets the current length of the scheduler queue. /// /// /// The length to be set. /// void SetQueueLength(unsigned int queueLength) { m_queueLength = queueLength; } /// /// Gets a new thread proxy from the factory. /// virtual IThreadProxy * GetNewThreadProxy(IExecutionContext * pContext); /// /// Called to shutdown a scheduler proxy. Derived classes can override shutdown behavior based on this. /// virtual void FinalShutdown(); /// /// Called to assist dynamic resource management in determining whether cores assigned to schedulers /// are idle. An idle core is one whose subscription level is 0. /// void IncrementCoreSubscription(ExecutionResource * pExecutionResource); /// /// Called to assist dynamic resource management in determining whether cores assigned to schedulers /// are idle. An idle core is one whose subscription level is 0. /// void DecrementCoreSubscription(ExecutionResource * pExecutionResource); #if defined(CONCRT_TRACING) /// /// Captures the initial state of the scheduler map at the beginning of core migration, each cycle. /// void TraceInitialDRMState(); #endif protected: /// /// Deletes the scheduler proxy. /// virtual void DeleteThis() { delete this; } /// /// Cleans up resources associated with the scheduler. /// void Cleanup(); /// /// Destructor. /// ~SchedulerProxy(); // A cached pointer to a thread proxy factory of the appropriate type for this scheduler proxy. IThreadProxyFactory * m_pThreadProxyFactory; private: template friend class List; #if defined(CONCRT_TRACING) struct SchedulerCoreData { unsigned char m_nodeIndex; unsigned char m_coreIndex; bool m_fAllocated : 1; bool m_fFixed : 1; bool m_fBorrowed : 1; bool m_fIdle : 1; }; // Captures the initial global allocation during the DRM phase. SchedulerCoreData * m_drmInitialState; unsigned int m_numTotalCores; #endif IScheduler * m_pScheduler; // Pointer to the resource manager instance. ResourceManager * m_pResourceManager; // Local copy of allocation map for this scheduler proxy. SchedulerNode * m_pAllocatedNodes; // Helper array used to sort nodes, used by the RM during core migration. unsigned int * m_pSortedNodeOrder; // Links for a list. SchedulerProxy * m_pNext{}, * m_pPrev{}; // A lock that protects resource allocation and deallocation of roots within this proxy. _ReentrantBlockingLock m_lock; // Hill climbing instance. HillClimbing * m_pHillClimbing; // Static and dynamic allocation data is populated and manipulated by the RM, but // stored in the scheduler proxy for convenience. union { // Data used during static allocation. StaticAllocationData m_staticData; // Data used during dynamic allocation. DynamicAllocationData m_dynamicData; }; // Scheduler queue length. unsigned int m_queueLength; // Unique identifier. unsigned int m_id; // Variables that store policy elements. unsigned int m_desiredHardwareThreads; unsigned int m_minimumHardwareThreads; unsigned int m_minConcurrency; unsigned int m_maxConcurrency; unsigned int m_targetOversubscriptionFactor; int m_contextStackSize; int m_contextPriority; // Current concurrency level (number of vproc roots). This includes vproc roots // that are marked for removal but has not yet been destroyed by the scheduler. // Protected by the scheduler proxy lock unsigned int m_currentConcurrency; // The number of cores allocated to this scheduler proxy. unsigned int m_numAllocatedCores; // At any time this has the number of additional cores that can be allocated with m_tof threads. // When this falls to 0, all remaining allocated cores will get m_tof - 1 threads, to ensure that // we don't go over max concurrency threads. unsigned int m_numFullySubscribedCores; // The number of allocated cores that are borrowed. An borrowed core is a core that is assigned to // one or more different schedulers, but was found to be idle. The RM temporarily assigns idle resources to // schedulers that need them. unsigned int m_numBorrowedCores; // The number of cores that have a subscribed thread on them. These cores are 'fixed' in that they cannot // be removed by static/dynamic allocations, as long as the subscribed thread is present on them. unsigned int m_numFixedCores; // The number of virtual processors (threads) that were added to the related scheduler via initial // allocation or core migration. Does not include oversubscribed virtual processors. unsigned int m_numAssignedThreads; // The number of external threads that were added to the related scheduler via external subscription calls. unsigned int m_numExternalThreads; // The number of cores that external threads occupy exclusively. unsigned int m_numExternalThreadCores; // The number of hardware threads available on this machine. unsigned int m_coreCount; // Number of nodes in the allocated nodes array. unsigned int m_nodeCount; // List of execution resources representing subscribed threads List m_threadSubscriptions; // Used to determine whether statistical rebalancing is used for this scheduler proxy. bool m_fDoHillClimbing; }; #pragma warning(pop) } // namespace details } // namespace Concurrency