// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// SchedulerProxy.cpp
//
// RM proxy for a scheduler instance
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
#include "concrtinternal.h"
namespace Concurrency
{
namespace details
{
#pragma warning (push)
#pragma warning (disable : 4702)
///
/// Constructs a scheduler proxy.
///
SchedulerProxy::SchedulerProxy(IScheduler * pScheduler, ResourceManager * pResourceManager, const SchedulerPolicy &policy)
: m_pThreadProxyFactory(nullptr)
, m_pResourceManager(pResourceManager)
, m_pHillClimbing(nullptr)
, m_staticData{}
, m_queueLength(0)
, m_currentConcurrency(0)
, m_numAllocatedCores(0)
, m_numBorrowedCores(0)
, m_numFixedCores(0)
, m_numAssignedThreads(0)
, m_numExternalThreads(0)
, m_numExternalThreadCores(0)
{
ASSERT(pScheduler != NULL);
m_pScheduler = pScheduler;
// Snapshot the policy values that drive all subsequent allocation decisions for this proxy.
m_maxConcurrency = policy.GetPolicyValue(::Concurrency::MaxConcurrency);
m_minConcurrency = policy.GetPolicyValue(::Concurrency::MinConcurrency);
m_targetOversubscriptionFactor = policy.GetPolicyValue(::Concurrency::TargetOversubscriptionFactor);
m_contextStackSize = policy.GetPolicyValue(::Concurrency::ContextStackSize);
m_contextPriority = policy.GetPolicyValue(::Concurrency::ContextPriority);
m_fDoHillClimbing = policy.GetPolicyValue(::Concurrency::DynamicProgressFeedback) == ::Concurrency::ProgressFeedbackEnabled;
// INHERIT_THREAD_PRIORITY resolves to the priority of the thread creating the scheduler.
if (m_contextPriority == INHERIT_THREAD_PRIORITY)
{
m_contextPriority = (char) platform::__GetThreadPriority(GetCurrentThread());
}
m_id = m_pScheduler->GetId();
ASSERT(m_id != -1);
unsigned int coreCount = m_pResourceManager->GetCoreCount();
m_coreCount = coreCount;
ASSERT(coreCount > 0 && coreCount <= INT_MAX);
ASSERT(m_maxConcurrency > 0 && m_maxConcurrency >= m_minConcurrency);
unsigned int originalTof = m_targetOversubscriptionFactor;
// Find the minimum target oversubscription factor required to satisfy MaxConcurrency with the cores available.
unsigned int minTof = (m_maxConcurrency + coreCount - 1)/coreCount;
if (originalTof < minTof)
{
// Adjust target oversubscription factor to ensure that we can satisfy MaxConcurrency with the cores on the system.
m_targetOversubscriptionFactor = minTof;
// The scheduler needs all the cores on the machine to satisfy max threads. Moreover we will need to oversubscribe
// more than the user indicated.
m_desiredHardwareThreads = coreCount;
}
else
{
// ceil(MaxConcurrency / tof): the fewest cores that can host MaxConcurrency vprocs at the requested tof.
m_desiredHardwareThreads = (m_maxConcurrency + originalTof - 1)/originalTof;
}
// Now adjust target oversubscription factor to ensure that MaxConcurrency virtual processors are evenly distributed
// over the desired number of hardware threads (i.e each core gets either m_tof vprocs or m_tof - 1 vprocs). Also
// calculate how many of the assigned cores will get m_tof vprocs.
if ((m_maxConcurrency % m_desiredHardwareThreads) == 0)
{
// This is the common case. We have a simple distribution and every allocated core will get tof vprocs.
m_targetOversubscriptionFactor = m_maxConcurrency/m_desiredHardwareThreads;
m_numFullySubscribedCores = m_desiredHardwareThreads;
// ceil(MinConcurrency / tof) cores suffice to keep the scheduler at or above its minimum.
m_minimumHardwareThreads = (m_minConcurrency + m_targetOversubscriptionFactor - 1)/m_targetOversubscriptionFactor;
}
else
{
// We have an uneven distribution; some cores will get tof vprocs and some will get tof - 1.
ASSERT(m_targetOversubscriptionFactor > 1);
m_targetOversubscriptionFactor = (m_maxConcurrency + m_desiredHardwareThreads - 1)/m_desiredHardwareThreads;
// Cores receiving the full tof vprocs; the remainder get tof - 1.
m_numFullySubscribedCores = m_desiredHardwareThreads - ((m_desiredHardwareThreads * m_targetOversubscriptionFactor) - m_maxConcurrency);
// Calculate min hardware threads. We need to make sure that given the way vprocs are distributed to cores
// (where some cores could get tof vprocs and some could get tof - 1 vprocs), the scheduler proxy will never go below
// min concurrency if it is left with just the minimum number of cores (and all of those cores happen to have tof -1
// vprocs assigned to them).
if (((m_desiredHardwareThreads - m_numFullySubscribedCores) * (m_targetOversubscriptionFactor - 1)) >= m_minConcurrency)
{
// The tof-1 cores alone can satisfy MinConcurrency: assume the worst case where only such cores remain.
m_minimumHardwareThreads = (m_minConcurrency + m_targetOversubscriptionFactor - 2)/(m_targetOversubscriptionFactor - 1);
}
else
{
// All tof-1 cores plus however many full-tof cores are needed to cover the remaining vprocs.
m_minimumHardwareThreads = (m_desiredHardwareThreads - m_numFullySubscribedCores);
unsigned int remainingThreads = (m_minConcurrency - (m_minimumHardwareThreads * (m_targetOversubscriptionFactor - 1)));
ASSERT(remainingThreads < m_minConcurrency);
m_minimumHardwareThreads += (remainingThreads + m_targetOversubscriptionFactor - 1)/m_targetOversubscriptionFactor;
}
}
// Sanity-check the derived invariants before the proxy becomes visible to the RM.
ASSERT(m_maxConcurrency <= m_targetOversubscriptionFactor * m_desiredHardwareThreads);
ASSERT(m_numFullySubscribedCores <= m_desiredHardwareThreads);
ASSERT(m_targetOversubscriptionFactor > 1 || m_numFullySubscribedCores == m_desiredHardwareThreads);
ASSERT(m_targetOversubscriptionFactor > 0 && m_targetOversubscriptionFactor <= INT_MAX);
ASSERT(m_desiredHardwareThreads > 0 && m_desiredHardwareThreads <= coreCount);
ASSERT(m_desiredHardwareThreads > 0 && m_minimumHardwareThreads <= m_desiredHardwareThreads);
// Hold a reference to the resource manager.
int ref = m_pResourceManager->Reference();
(ref); // silence unused-variable warnings in builds where CONCRT_COREASSERT compiles away
CONCRT_COREASSERT(ref > 1);
if (m_fDoHillClimbing)
{
m_pHillClimbing = _concrt_new HillClimbing(m_id, coreCount, this);
}
m_nodeCount = GetProcessorNodeCount();
// The allocated nodes structure is created when the first allocation is made for this scheduler proxy. We need to read global core
// state during this allocation, and therefore we need to perform it while holding the RM lock.
m_pAllocatedNodes = NULL;
// Initially the node preference order is simply the identity; DRM re-sorts it later.
m_pSortedNodeOrder = _concrt_new unsigned int[m_nodeCount];
for (unsigned int i = 0; i < m_nodeCount; ++i)
{
m_pSortedNodeOrder[i] = i;
}
#if defined(CONCRT_TRACING)
m_drmInitialState = NULL;
#endif
}
#pragma warning (pop)
///
/// Called by a scheduler in order make an initial request for an allocation of virtual processors. The request
/// is driven by policies within the scheduler queried via the IScheduler::GetPolicy method. If the request
/// can be satisfied via the rules of allocation, it is communicated to the scheduler as a call to
/// IScheduler::AddVirtualProcessors.
///
///
/// <param name="doSubscribeCurrentThread">
/// Whether to subscribe the current thread and account for it during resource allocation.
/// </param>
/// <returns>
/// The IExecutionResource instance representing current thread if doSubscribeCurrentThread was true; NULL otherwise.
/// </returns>
IExecutionResource * SchedulerProxy::RequestInitialVirtualProcessors(bool doSubscribeCurrentThread)
{
    // Delegate the initial allocation request to the resource manager; if it can be
    // satisfied, the RM communicates the grant back via IScheduler::AddVirtualProcessors.
    ResourceManager * pRM = m_pResourceManager;
    return pRM->RequestInitialVirtualProcessors(this, doSubscribeCurrentThread);
}
///
/// Called in order to notify the resource manager that the given scheduler is shutting down. This
/// will cause the resource manager to immediately reclaim all resources granted to the scheduler.
///
void SchedulerProxy::Shutdown()
{
    // Tell the RM this scheduler is going away so it can immediately reclaim
    // every resource granted to it.
    m_pResourceManager->Shutdown(this);
}
///
/// Gets a new thread proxy from the factory.
///
/// <summary>
/// Gets a new thread proxy from the factory, associating it with the given execution context.
/// </summary>
IThreadProxy * SchedulerProxy::GetNewThreadProxy(IExecutionContext * pContext)
{
    if (m_pThreadProxyFactory == NULL)
    {
        // Lazily populate the cached factory pointer from the one in the RM.
        m_pThreadProxyFactory = GetResourceManager()->GetThreadProxyFactoryManager()->GetFreeThreadProxyFactory();
    }
    // BUGFIX: the cast was written as "static_cast(...)" with no target type, which does not
    // compile. The free factory hands out FreeThreadProxy instances, so downcast to that.
    FreeThreadProxy * pProxy = static_cast<FreeThreadProxy *>(m_pThreadProxyFactory->RequestProxy(ContextStackSize(), ContextPriority()));
    pProxy->AssociateExecutionContext(pContext);
    return pProxy;
}
///
/// Ensures that a context is bound to a thread proxy. This API should *NOT* be called in the vast majority of circumstances.
/// The IThreadProxy::SwitchTo will perform late binding to thread proxies as necessary. There are, however, circumstances
/// where it is necessary to pre-bind a context to ensure that the SwitchTo operation switches to an already bound context. This
/// is the case on a UMS scheduling context as it cannot call allocation APIs.
///
///
/// The context to bind.
///
void SchedulerProxy::BindContext(IExecutionContext * pContext)
{
    if (pContext == NULL)
    {
        throw std::invalid_argument("pContext");
    }
    // Nothing to do when the context is already backed by a thread proxy.
    if (pContext->GetProxy() != NULL)
    {
        return;
    }
    // Otherwise pull a proxy matching our stack size and priority from the factory pool.
    GetNewThreadProxy(pContext);
}
///
/// Returns an **unstarted** thread proxy attached to pContext, to the thread proxy factory.
/// Such a thread proxy **must** be unstarted.
/// This API should *NOT* be called in the vast majority of circumstances.
///
///
/// The context to unbind.
///
/// <summary>
/// Returns an **unstarted** thread proxy attached to pContext back to the thread proxy factory.
/// Throws std::invalid_argument if pContext is NULL.
/// </summary>
void SchedulerProxy::UnbindContext(IExecutionContext * pContext)
{
    if (pContext == NULL)
    {
        throw std::invalid_argument("pContext");
    }
    // BUGFIX: the cast was written as "static_cast (...)" with no target type, which does not
    // compile. Contexts bound through GetNewThreadProxy carry FreeThreadProxy instances.
    FreeThreadProxy * pProxy = static_cast<FreeThreadProxy *>(pContext->GetProxy());
    ASSERT(pProxy != NULL);
    // Return the (unstarted) proxy to the idle pool.
    pProxy->ReturnIdleProxy();
}
///
/// This function retrieves the execution resource associated with this thread, if one exists
///
///
/// The ExecutionResource instance representing current thread in the runtime.
///
ExecutionResource * SchedulerProxy::GetCurrentThreadExecutionResource()
{
    // The TLS slot stores a tagged pointer: its low bits say what kind of object
    // the thread is associated with. Only values tagged TlsResourceInResource
    // are ExecutionResource pointers.
    DWORD tlsSlot = GetResourceManager()->GetExecutionResourceTls();
    size_t tlsValue = (size_t) platform::__TlsGetValue(tlsSlot);
    if (tlsValue != 0 && (tlsValue & TlsResourceBitMask) == TlsResourceInResource)
    {
        return (ExecutionResource *) tlsValue;
    }
    return NULL;
}
///
/// This function retrieves the execution resource associated with this thread, if one exists,
/// and updates the reference count on it for better bookkeeping.
///
///
/// The ExecutionResource instance representing current thread in the runtime.
///
ExecutionResource * SchedulerProxy::ReferenceCurrentThreadExecutionResource()
{
ExecutionResource * pExecutionResource = NULL;
// The TLS slot stores a tagged pointer whose low bits describe what the current
// thread is known to the RM as (subscribed resource vs. thread proxy).
DWORD tlsSlot = GetResourceManager()->GetExecutionResourceTls();
void * tlsPointer = platform::__TlsGetValue(tlsSlot);
if (tlsPointer != NULL)
{
size_t tlsValue = (size_t) tlsPointer;
if ((tlsValue & TlsResourceBitMask) == TlsResourceInResource)
{
// The current thread was previously subscribed with the RM.
pExecutionResource = (ExecutionResource *) tlsValue;
VirtualProcessorRoot * pVPRoot = pExecutionResource->GetVirtualProcessorRoot();
// If this is a nested subscribe call then if there was a virtual processor root,
// it could not have been removed, because it would have been marked as "fixed".
ASSERT(pVPRoot == NULL || !pVPRoot->IsRootRemoved());
pExecutionResource->IncrementUseCounts();
}
else if ((tlsValue & TlsResourceBitMask) == TlsResourceInProxy)
{
// The current thread is a thread proxy. Strip the tag bits to recover the pointer.
FreeThreadProxy * pThreadProxy = (FreeThreadProxy *) (((size_t) tlsValue) & ~TlsResourceInProxy);
pExecutionResource = pThreadProxy->GetVirtualProcessorRoot()->GetExecutionResource();
VirtualProcessorRoot * pVPRoot = pExecutionResource->GetVirtualProcessorRoot();
if (pVPRoot != NULL && pVPRoot->IsRootRemoved())
{
// The virtual processor root that this thread is running on has been removed. We have to
// create a new execution resource abstraction for the current thread and perform an external
// thread allocation for this scheduler proxy.
pExecutionResource = NULL;
}
else
{
pExecutionResource->IncrementUseCounts();
}
}
}
// Wrap (or reuse) the resource for this proxy; a NULL result means the caller must
// perform a fresh external-thread allocation.
if (pExecutionResource != NULL)
{
return GetResourceForNewSubscription(pExecutionResource);
}
return pExecutionResource;
}
///
/// Creates or reuses an execution resource for the thread subscription
///
ExecutionResource * SchedulerProxy::GetResourceForNewSubscription(ExecutionResource * pParentExecutionResource)
{
    // If the parent resource already belongs to this scheduler proxy, reuse it directly.
    if (pParentExecutionResource->GetSchedulerProxy() == this)
    {
        return pParentExecutionResource;
    }
    // Otherwise the thread is nested across schedulers: wrap the parent in a new
    // resource owned by this proxy and take an initial use count on it.
    ExecutionResource * pNestedResource = _concrt_new ExecutionResource(this, pParentExecutionResource);
    pNestedResource->IncrementUseCounts();
    return pNestedResource;
}
///
/// Registers that a call to SubscribeCurrentThread has occurred for this core, making this core immovable.
///
void SchedulerProxy::IncrementFixedCoreCount(unsigned int nodeId, unsigned int coreIndex, bool isExternalThread)
{
    SchedulerNode * pNode = &m_pAllocatedNodes[nodeId];
    SchedulerCore * pCore = &pNode->m_pCores[coreIndex];
    // The first fixed thread on a core makes the whole core immovable.
    if (pCore->m_numFixedThreads++ == 0)
    {
        ++pNode->m_numFixedCores;
        ++m_numFixedCores;
        if (pCore->IsBorrowed())
        {
            // When a core becomes fixed, we temporarily remove the borrowed flag on it, and
            // restore it when it becomes movable again.
            pCore->m_fPreviouslyBorrowed = true;
            ToggleBorrowedState(pNode, coreIndex);
        }
        // If this core has no virtual processors on it, count it as a core exclusively
        // dedicated to external threads.
        if (isExternalThread && pCore->m_numAssignedThreads == 0)
        {
            ++m_numExternalThreadCores;
        }
    }
    // Track external threads per core, which helps account for all the resources running on it.
    if (isExternalThread)
    {
        ++m_numExternalThreads;
        ++pCore->m_numExternalThreads;
    }
}
///
/// Registers that a call to IExecutionResource::Release has occurred, potentially freeing this core.
///
void SchedulerProxy::DecrementFixedCoreCount(unsigned int nodeId, unsigned int coreIndex, bool isExternalThread)
{
SchedulerCore * pCore = &m_pAllocatedNodes[nodeId].m_pCores[coreIndex];
// Decrement external thread count on the core which helps account for all the resources running on that core.
if (isExternalThread)
{
ASSERT(pCore->m_numExternalThreads > 0);
pCore->m_numExternalThreads--;
m_numExternalThreads--;
}
ASSERT(pCore->m_numFixedThreads > 0);
// When the last fixed thread leaves, the core becomes movable again.
if (--pCore->m_numFixedThreads == 0)
{
SchedulerNode * pNode = &m_pAllocatedNodes[nodeId];
ASSERT(pCore->m_numExternalThreads == 0);
m_numFixedCores--;
pNode->m_numFixedCores--;
if (pCore->m_fPreviouslyBorrowed)
{
// If this was a borrowed core converted to fixed due to a subscription request, we restore the state
// back to borrowed, here.
ASSERT(!pCore->IsBorrowed());
ToggleBorrowedState(pNode, coreIndex);
pCore->m_fPreviouslyBorrowed = false;
}
// If this core was owned only due to an external thread being on it, then there is
// no more reason for it to be marked as such.
if (isExternalThread && m_pAllocatedNodes[nodeId].m_pCores[coreIndex].m_numAssignedThreads == 0)
{
m_numExternalThreadCores--;
}
}
}
///
/// This API registers the current thread with the resource manager associating it with this scheduler proxy,
/// and returns an instance of IExecutionResource back to the scheduler for bookkeeping and maintenance.
///
///
/// The IExecutionResource instance representing current thread in the runtime.
///
IExecutionResource * SchedulerProxy::SubscribeCurrentThread()
{
    // The RM performs the actual subscription and associates the current thread
    // with this scheduler proxy.
    ResourceManager * pRM = m_pResourceManager;
    return pRM->SubscribeCurrentThread(this);
}
///
/// Creates a new execution resource for the external thread and registers it with the scheduler proxy.
///
ExecutionResource * SchedulerProxy::CreateExternalThreadResource(SchedulerNode * pNode, unsigned int coreIndex)
{
    // Build the abstraction for the external thread on the given node/core and
    // take an initial use count on it before handing it back.
    ExecutionResource * pResource = _concrt_new ExecutionResource(this, pNode, coreIndex);
    pResource->IncrementUseCounts();
    return pResource;
}
///
/// Adds the execution resource to the list of subscribed threads
///
void SchedulerProxy::AddThreadSubscription(ExecutionResource * pExecutionResource)
{
    // Append the resource to the proxy's list of subscribed threads.
    m_threadSubscriptions.AddTail(pExecutionResource);
}
///
/// Removes the execution resource from the list of subscribed threads
///
void SchedulerProxy::RemoveThreadSubscription(ExecutionResource * pExecutionResource)
{
    // Unlink the resource from the subscribed-thread list; the proxy owns it,
    // so it is destroyed here as well.
    m_threadSubscriptions.Remove(pExecutionResource);
    delete pExecutionResource;
}
///
/// Called by the RM when it is done reserving cores for the scheduler proxy. The scheduler proxy allocates virtual processors
/// or standalone execution resources based on the cores that were reserved for it.
///
ExecutionResource * SchedulerProxy::GrantAllocation(unsigned int numberReserved, bool fInitialAllocation, bool fSubscribeCurrentThread)
{
ASSERT(m_numAllocatedCores == 0 || !fInitialAllocation);
ASSERT(m_numExternalThreads == 0 || !fInitialAllocation);
// The scheduler proxy's allocated node map contains 'numberReserved' cores that the RM has reserved in order to
// satisfy the proxy's request based on its request and the availability of resources. The cores are marked with
// ProcessorCore::Reserved, and will be converted to ProcessorCore::Allocated here.
// Note that 'numberReserved' could have the value 0, if this is an allocation for an external thread. In this case, depending
// on whether the scheduler has more than its minimum, we will either oversubscribe a core, or remove virtual processors
// assigned to a core in order to accommodate the external thread.
unsigned int reservationsAllocated = 0;
ExecutionResource * pExecutionResource = NULL;
ASSERT(!fInitialAllocation || m_minimumHardwareThreads == MinHWThreads());
// Calculate the number of virtual processors we will give this scheduler based on the core allocation
// we received. Each core will be allocated either m_tof vprocs or m_tof - 1 vprocs, based on the
// desired hardware threads and the value for max concurrency.
// The current thread subscription we are about to make does not contribute to MinHWThreads() at present. The external thread
// gets an exclusive core, if the remaining cores, allocated and reserved, can satisfy at least 1 more then the current minimum.
// Note that 'externalThreadCore' can be 1 even if no cores were reserved -> in this case we will have to remove vprocs from an allocated
// core and use it exclusively for the external thread.
unsigned int externalThreadCores = fSubscribeCurrentThread ? (m_numAllocatedCores + numberReserved > MinHWThreads() ? 1 : 0) : 0;
unsigned int vprocCores = (numberReserved > externalThreadCores) ? (numberReserved - externalThreadCores) : 0;
bool fRemoveVProcs = (externalThreadCores > 0 && numberReserved == 0);
bool fShareExternalThreadCore = (fSubscribeCurrentThread && externalThreadCores == 0);
// These variables are used if a thread subscription is part of this allocation. For a thread subscription assignment to a core,
// there are 3 possibilities:
// 1. A Reserved core exists in the current allocation map exclusive for the thread subscription.
// 2. The thread subscription will share a core with virtual processors.
// 3. An existing allocated core will be assigned to the external thread after removing all vprocs that are currently allocated to it.
unsigned int externalThreadUseCount = (unsigned int) -1;
unsigned int externalThreadCoreIndex = (unsigned int) -1;
SchedulerNode * pExternalThreadNode = NULL;
// Used to prefer placing the external thread on the node it is already running on.
unsigned int currentNodeIndex = fSubscribeCurrentThread ? m_pResourceManager->GetCurrentNodeAndCore(NULL) : (unsigned int) -1;
ASSERT(!fRemoveVProcs || (m_numAllocatedCores > MinHWThreads()));
unsigned int vprocCount = 0;
if (vprocCores > 0)
{
ASSERT(m_numFullySubscribedCores > 0 && m_numFullySubscribedCores <= m_desiredHardwareThreads);
// The first m_numFullySubscribedCores cores get tof vprocs each; the rest get tof - 1.
if (vprocCores <= m_numFullySubscribedCores)
{
vprocCount = vprocCores * m_targetOversubscriptionFactor;
}
else
{
vprocCount = (m_numFullySubscribedCores * m_targetOversubscriptionFactor) +
((vprocCores - m_numFullySubscribedCores) * (m_targetOversubscriptionFactor - 1));
}
}
ASSERT(!fInitialAllocation || (vprocCount >= m_minConcurrency && vprocCount <= m_maxConcurrency));
IVirtualProcessorRoot** vprocArray = (vprocCount > 0) ? _concrt_new IVirtualProcessorRoot *[vprocCount] : NULL;
unsigned int vprocIndex = 0;
bool externalThreadCoreFound= !fSubscribeCurrentThread;
// We may not have a core reserved for the external thread, so we should loop until the external thread is assigned to an
// existing core, if thread subscription is requested.
for (unsigned int nodeIndex = 0; (reservationsAllocated < numberReserved || !externalThreadCoreFound) &&
nodeIndex < m_nodeCount; ++nodeIndex)
{
// If the core is marked Reserved, we will either assign to it virtual processors, the external thread or both. Whether or not
// the external thread shares the core with virtual processors depends on the value of fShareExternalThreadCore.
// If we find any cores marked Allocated, it implies that this is not the initial allocation, and all we're looking to do here
// is assign a core to the external thread. The external thread could either share a core with vprocs or displace vprocs, depending
// on the value of fRemoveVProcs.
SchedulerNode * pNode = &m_pAllocatedNodes[nodeIndex];
if (pNode->m_reservedCores > 0 || pNode->m_allocatedCores > 0)
{
for(unsigned int coreIndex = 0; (reservationsAllocated < numberReserved || !externalThreadCoreFound) &&
coreIndex < pNode->m_coreCount; ++coreIndex)
{
SchedulerCore * pCore = &pNode->m_pCores[coreIndex];
if (pCore->m_coreState == ProcessorCore::Reserved)
{
// Assign the external thread to this reserved core either when it is the last one left,
// or when it lives on the node the thread is currently running on.
bool assignExternalThread = (!externalThreadCoreFound && (reservationsAllocated == numberReserved - 1 || currentNodeIndex == nodeIndex));
bool assignVProcs = (!assignExternalThread || externalThreadCores == 0);
ASSERT(pCore->m_numAssignedThreads == 0 && pCore->m_numFixedThreads == 0);
pCore->m_coreState = ProcessorCore::Allocated;
ASSERT(pNode->m_allocatedCores < pNode->m_coreCount);
++pNode->m_allocatedCores;
++m_numAllocatedCores;
// If the external thread also needs a core, first try to put it in a node whose affinity is a superset of the hardware thread
// it is currently running on. If not, reaffinitize it.
if (assignExternalThread)
{
// The execution resource is created right before returning from the function.
pExternalThreadNode = pNode;
externalThreadCoreIndex = coreIndex;
externalThreadCoreFound = true;
}
if (assignVProcs)
{
ASSERT(!assignExternalThread || fShareExternalThreadCore);
// Create virtual processor roots in the scheduler proxy, corresponding to the node and core we're currently looking at.
unsigned int numVprocs = 0;
if (m_numFullySubscribedCores > 0)
{
numVprocs = m_targetOversubscriptionFactor;
// As we assign m_tof threads to a core, we decrement this value. This value is also updated in
// AddCore and RemoveCore. After the scheduler proxy has been given its initial allocation
// or resources, this variable keeps track of how many out of the remaining quota of cores the
// scheduler proxy could acquire (desired - allocated) would get tof threads per core if they
// were added to the scheduler during dynamic core migration.
--m_numFullySubscribedCores;
}
else
{
numVprocs = m_targetOversubscriptionFactor - 1;
}
pCore->m_numAssignedThreads += numVprocs;
m_numAssignedThreads += numVprocs;
while (numVprocs-- > 0)
{
_Analysis_assume_(vprocIndex < vprocCount);
vprocArray[vprocIndex++] = CreateVirtualProcessorRoot(pNode, coreIndex);
}
ASSERT(vprocIndex <= vprocCount);
}
++reservationsAllocated;
}
else if (pCore->m_coreState == ProcessorCore::Allocated)
{
// If we encounter allocated cores, this is a subsequent allocation for an external core. Determine if the external
// thread should share a core with existing vprocs and external threads, or displace some vprocs to get a core to itself.
// Walk through all the allocated cores to find the right one to either oversubscribe or displace. For over
// subscription find the core with the least number of vprocs + external thread assigned to it (favouring the node
// where the current thread is running if there is more than one such core).
// For displacement, we need to find an unfixed core, favouring the node where the current thread is running.
if (fShareExternalThreadCore)
{
ASSERT(!fRemoveVProcs && externalThreadCores == 0);
unsigned int useCount = pCore->m_numAssignedThreads + pCore->m_numExternalThreads;
if (useCount < externalThreadUseCount || (useCount == externalThreadUseCount && nodeIndex == currentNodeIndex))
{
externalThreadUseCount = useCount;
pExternalThreadNode = pNode;
externalThreadCoreIndex = coreIndex;
// We don't set externalThreadCoreFound here, since we want to examine all allocated cores.
}
}
else if (fRemoveVProcs)
{
ASSERT(externalThreadCores == 1);
if (!pCore->IsFixed() && (pExternalThreadNode == NULL || nodeIndex == currentNodeIndex))
{
pExternalThreadNode = pNode;
externalThreadCoreIndex = coreIndex;
if (nodeIndex == currentNodeIndex)
{
// Stop looking if we find an unfixed core on the current node.
externalThreadCoreFound = true;
}
}
}
}
else
{
ASSERT(pCore->m_coreState == ProcessorCore::Unassigned);
}
}
// All reservations on this node have been converted to allocations.
pNode->m_reservedCores = 0;
}
}
ASSERT(vprocIndex == vprocCount);
if (vprocCount > 0)
{
// Hand the new roots to the scheduler; the temporary array is no longer needed.
AddVirtualProcessorRoots(vprocArray, vprocCount);
delete [] vprocArray;
}
if (fSubscribeCurrentThread)
{
ASSERT(pExternalThreadNode != NULL && externalThreadCoreIndex != (unsigned int) -1);
if (fShareExternalThreadCore)
{
ASSERT(externalThreadCores == 0);
}
else if (fRemoveVProcs)
{
ASSERT(externalThreadCores == 1);
ASSERT(m_numAllocatedCores > MinHWThreads());
// Remove the core and replace with an external thread subscription. Note that the use count for this core
// stays the same, as we simply replace virtual processors with a thread subscription.
RemoveCore(pExternalThreadNode, externalThreadCoreIndex);
pExternalThreadNode->m_pCores[externalThreadCoreIndex].m_coreState = ProcessorCore::Allocated;
ASSERT(pExternalThreadNode->m_allocatedCores < pExternalThreadNode->m_coreCount);
++pExternalThreadNode->m_allocatedCores;
++m_numAllocatedCores;
}
else
{
ASSERT(externalThreadCores == 1);
}
// Create the execution resource for the subscribed thread on the chosen node/core.
pExecutionResource = CreateExternalThreadResource(pExternalThreadNode, externalThreadCoreIndex);
}
#if defined(CONCRT_TRACING)
m_numTotalCores = m_nodeCount * m_pAllocatedNodes[0].m_coreCount;
m_drmInitialState = _concrt_new SchedulerCoreData[m_numTotalCores];
memset(m_drmInitialState, 0, sizeof(SchedulerCoreData) * m_numTotalCores);
#endif
ASSERT(m_numAllocatedCores >= MinHWThreads() && m_numAllocatedCores <= DesiredHWThreads());
return pExecutionResource;
}
///
/// Causes the resource manager to create a new virtual processor root running atop the same hardware thread as this
/// execution resource. Typically, this is used when a scheduler wishes to oversubscribe a particular hardware thread
/// for a limited amount of time.
///
/// <param name="pExecutionResource">
/// The execution resource abstraction on which to oversubscribe.
/// </param>
/// <returns>
/// A new virtual processor root running atop the same hardware thread as this execution resource.
/// </returns>
IVirtualProcessorRoot * SchedulerProxy::CreateOversubscriber(IExecutionResource * pExecutionResource)
{
    // The scheduler proxy on the virtual processor root has to match 'this'.
    VirtualProcessorRoot * pOversubscribedRoot = NULL;
    // BUGFIX: both casts below had lost their target types ("dynamic_cast(...)" /
    // "static_cast(...)"), which does not compile. The argument is either an
    // ExecutionResource (thread subscription) or a VirtualProcessorRoot.
    ExecutionResource * pResource = dynamic_cast<ExecutionResource *>(pExecutionResource);
    bool isVprocRoot = false;
    // If the dynamic cast failed then we must have a virtual processor root.
    if (pResource == NULL)
    {
        pResource = static_cast<VirtualProcessorRoot *>(pExecutionResource)->GetExecutionResource();
        isVprocRoot = true;
    }
    // Cannot verify the scheduler proxy for external threads because they can "live" on
    // multiple schedulers at the same time (nested).
    if (isVprocRoot && pResource->GetSchedulerProxy() != this)
    {
        throw std::invalid_argument("pExecutionResource");
    }
    // Synchronize with other concurrent calls that are adding/removing virtual processor roots.
    {
        _ReentrantBlockingLock::_Scoped_lock lock(m_lock);
        // Use the scheduler proxy to clone this virtual processor root.
        SchedulerNode * pNode = &m_pAllocatedNodes[pResource->GetNodeId()];
        unsigned int coreIndex = pResource->GetCoreIndex();
        pOversubscribedRoot = CreateVirtualProcessorRoot(pNode, coreIndex);
        // We mark these vproc roots as oversubscribed to indicate that they do not contribute
        // towards concurrency levels bounded by the policy.
        pOversubscribedRoot->MarkAsOversubscribed();
        pNode->m_pCores[coreIndex].m_resources.AddTail(pOversubscribedRoot->GetExecutionResource());
    }
    return pOversubscribedRoot;
}
///
/// Creates a virtual processor root and adds it to the scheduler proxys list of roots.
///
VirtualProcessorRoot * SchedulerProxy::CreateVirtualProcessorRoot(SchedulerNode * pNode, unsigned int coreIndex)
{
    // This proxy hands out free (thread-based) virtual processor roots.
    FreeVirtualProcessorRoot * pRoot = _concrt_new FreeVirtualProcessorRoot(this, pNode, coreIndex);
    return pRoot;
}
///
/// Notifies the scheduler associated with this proxy to adds the virtual processor roots provided.
/// Called by the RM during initial allocation and dynamic core migration.
///
/// <summary>
/// Notifies the scheduler associated with this proxy to add the virtual processor roots provided.
/// Called by the RM during initial allocation and dynamic core migration.
/// Note, that we are holding the global RM allocation lock when this API is called.
/// </summary>
void SchedulerProxy::AddVirtualProcessorRoots(IVirtualProcessorRoot ** vprocRoots, unsigned int count)
{
    // Synchronize with concurrent root creation/destruction on this proxy.
    _ReentrantBlockingLock::_Scoped_lock lock(m_lock);
    for (unsigned int i = 0; i < count; ++i)
    {
        // BUGFIX: the cast was written as "static_cast(...)" with no target type, which does
        // not compile. The roots handed in are VirtualProcessorRoot instances.
        VirtualProcessorRoot * pRoot = static_cast<VirtualProcessorRoot *>(vprocRoots[i]);
        // Add the resources associated with the roots to the corresponding lists in the scheduler proxy.
        unsigned int nodeId = pRoot->GetNodeId();
        unsigned int coreIndex = pRoot->GetCoreIndex();
        m_pAllocatedNodes[nodeId].m_pCores[coreIndex].m_resources.AddTail(pRoot->GetExecutionResource());
    }
    m_pScheduler->AddVirtualProcessors((IVirtualProcessorRoot **) vprocRoots, count);
    m_currentConcurrency += count;
}
///
/// Adds an execution resource to the list of resources that run on a particular core.
///
void SchedulerProxy::AddExecutionResource(ExecutionResource * pExecutionResource)
{
    unsigned int nodeId = pExecutionResource->GetNodeId();
    unsigned int coreIndex = pExecutionResource->GetCoreIndex();
    // Serialize against concurrent add/remove of roots and resources, then append
    // the resource to the list for its core.
    _ReentrantBlockingLock::_Scoped_lock lock(m_lock);
    m_pAllocatedNodes[nodeId].m_pCores[coreIndex].m_resources.AddTail(pExecutionResource);
}
///
/// Toggles the state on a core from borrowed to owned (and vice versa), and updates necessary counts.
///
void SchedulerProxy::ToggleBorrowedState(SchedulerNode * pNode, unsigned int coreIndex)
{
    SchedulerCore * pCore = &pNode->m_pCores[coreIndex];
    // Flip the flag and keep the per-node and per-proxy borrowed-core tallies in sync.
    const bool wasBorrowed = pCore->m_fBorrowed;
    pCore->m_fBorrowed = !wasBorrowed;
    if (wasBorrowed)
    {
        --m_numBorrowedCores;
        --pNode->m_numBorrowedCores;
    }
    else
    {
        ++m_numBorrowedCores;
        ++pNode->m_numBorrowedCores;
    }
}
///
/// Adds an appropriate number of virtual processor roots to the scheduler associated with this proxy.
/// Called by the RM during core migration when the RM decides to give this scheduler an additional
/// core.
///
void SchedulerProxy::AddCore(SchedulerNode * pNode, unsigned int coreIndex, bool fBorrowed)
{
// Note, that we are holding the global RM allocation lock when this API is called.
// Decide how many virtual processors to give the scheduler on this core. Note that this value is required
// to be either m_tof or m_tof - 1.
unsigned int numThreads = 0;
if (m_numFullySubscribedCores > 0)
{
numThreads = m_targetOversubscriptionFactor;
--m_numFullySubscribedCores;
}
else
{
numThreads = m_targetOversubscriptionFactor - 1;
}
ASSERT(numThreads > 0 && numThreads <= INT_MAX);
ASSERT(pNode->m_allocatedCores < pNode->m_coreCount);
++pNode->m_allocatedCores;
ASSERT(m_numAllocatedCores < DesiredHWThreads());
++m_numAllocatedCores;
SchedulerCore * pCore = &pNode->m_pCores[coreIndex];
ASSERT(pCore->m_coreState == ProcessorCore::Unassigned);
pCore->m_coreState = ProcessorCore::Allocated;
ASSERT(pCore->m_numAssignedThreads == 0);
pCore->m_numAssignedThreads = numThreads;
m_numAssignedThreads += pCore->m_numAssignedThreads;
ASSERT(m_numAssignedThreads <= m_maxConcurrency);
if (fBorrowed)
{
// Newly added borrowed cores are flagged so they can be returned first during migration.
ASSERT(!pCore->IsBorrowed());
ToggleBorrowedState(pNode, coreIndex);
}
// Special case for when there is 1 vproc per core - this is likely to be the common case.
// A single stack slot avoids a heap allocation for the root array.
IVirtualProcessorRoot * pRoot;
IVirtualProcessorRoot ** pRootArray = (numThreads == 1) ? &pRoot : _concrt_new IVirtualProcessorRoot *[numThreads];
for (unsigned int i = 0; i < numThreads; ++i)
{
pRootArray[i] = CreateVirtualProcessorRoot(pNode, coreIndex);
}
AddVirtualProcessorRoots(pRootArray, numThreads);
// Only free the array when it was actually heap-allocated above.
if (pRootArray != &pRoot)
{
delete [] pRootArray;
}
}
///
/// Notifies the scheduler associated with this proxy to remove the virtual processor roots associated
/// with the core provided. Called by the RM during core migration.
///
void SchedulerProxy::RemoveCore(SchedulerNode * pNode, unsigned int coreIndex)
{
    // Note, that we are holding the global RM allocation lock when this API is called.
    // Return the core at both node and scheduler granularity.
    ASSERT(pNode->m_allocatedCores > 0 && pNode->m_allocatedCores <= pNode->m_coreCount);
    --pNode->m_allocatedCores;
    ASSERT(m_numAllocatedCores > MinVprocHWThreads());
    --m_numAllocatedCores;
    SchedulerCore * pCore = &pNode->m_pCores[coreIndex];
    ASSERT(pCore->m_coreState == ProcessorCore::Allocated || pCore->m_coreState == ProcessorCore::Stolen);
    pCore->m_coreState = ProcessorCore::Unassigned;
    // AddCore assigns either m_targetOversubscriptionFactor or m_targetOversubscriptionFactor - 1
    // vprocs to a core. If this core carried the full factor, a fully subscribed slot becomes
    // available again for a future AddCore.
    ASSERT(pCore->m_numAssignedThreads == m_targetOversubscriptionFactor ||
    pCore->m_numAssignedThreads == m_targetOversubscriptionFactor - 1);
    if (pCore->m_numAssignedThreads == m_targetOversubscriptionFactor)
    {
        ++m_numFullySubscribedCores;
    }
    m_numAssignedThreads -= pCore->m_numAssignedThreads;
    ASSERT(m_numAssignedThreads >= m_minConcurrency && m_numAssignedThreads < m_maxConcurrency);
    pCore->m_numAssignedThreads = 0;
    if (pCore->m_fBorrowed)
    {
        // The core is no longer ours; clear the borrowed marker so the bookkeeping stays balanced.
        ToggleBorrowedState(pNode, coreIndex);
    }
    pCore->m_fIdleDuringDRM = false;
    ASSERT(GetNumOwnedCores() >= MinHWThreads());
    // A lock is required around the iteration of nodes and the call to AddVirtualProcessors to synchronize with concurrent
    // calls to DestroyVirtualProcessorRoot, which removes roots from the array and deletes them.
    { // begin locked region
        _ReentrantBlockingLock::_Scoped_lock lock(m_lock);
        ExecutionResource * pExecutionResource = pCore->m_resources.First();
        while (pExecutionResource != NULL)
        {
            // Remember the next root before hand, since a IVirtualProcessorRoot::Remove call could happen inline
            // for the root we're removing, and by the time we get back, that root could be deleted.
            ExecutionResource * pNextExecutionResource = pCore->m_resources.Next(pExecutionResource);
            VirtualProcessorRoot * pVPRoot = pExecutionResource->GetVirtualProcessorRoot();
            if (pVPRoot != NULL && !pVPRoot->IsRootRemoved())
            {
                // Mark the root so it is not handed to the scheduler a second time by a
                // concurrent pass, then tell the scheduler to retire it.
                pVPRoot->MarkRootRemoved();
                IVirtualProcessorRoot * pIRoot = pVPRoot;
                m_pScheduler->RemoveVirtualProcessors(&pIRoot, 1);
            }
            pExecutionResource = pNextExecutionResource;
        }
    } // end locked region
}
///
/// Called by the RM to instruct this scheduler proxy to notify its scheduler that this core is now
/// externally busy or externally idle.
///
void SchedulerProxy::SendCoreNotification(SchedulerCore * pCore, bool isBusyNotification)
{
    // Stack storage for up to 8 roots - the expected common case - so that we do not have
    // to allocate from the heap while holding two locks.
    IVirtualProcessorRoot * stackRoots[8];
    IVirtualProcessorRoot ** ppRoots = NULL;
#pragma warning(push)
#pragma warning(disable: 6385 6386) // TRANSITION, VSO-1807030
    // Note, that we are holding the global RM allocation lock when this API is called.
    { // begin locked region
        _ReentrantBlockingLock::_Scoped_lock lock(m_lock);
        ppRoots = (pCore->m_resources.Count() > 8)
            ? _concrt_new IVirtualProcessorRoot * [pCore->m_resources.Count()]
            : stackRoots;

        // Gather every root on this core that has not already been retired.
        unsigned int rootCount = 0;
        for (ExecutionResource * pResource = pCore->m_resources.First(); pResource != NULL; )
        {
            ExecutionResource * pNext = pCore->m_resources.Next(pResource);
            VirtualProcessorRoot * pVPRoot = pResource->GetVirtualProcessorRoot();
            if (pVPRoot != NULL && !pVPRoot->IsRootRemoved())
            {
                ppRoots[rootCount++] = pVPRoot;
            }
            pResource = pNext;
        }
        ASSERT(rootCount <= (unsigned int) pCore->m_resources.Count());

        // The array is populated - deliver the appropriate notification for this core.
        if (isBusyNotification)
        {
            m_pScheduler->NotifyResourcesExternallyBusy(ppRoots, rootCount);
        }
        else
        {
            m_pScheduler->NotifyResourcesExternallyIdle(ppRoots, rootCount);
        }
    } // end locked region
#pragma warning(pop)
    if (ppRoots != stackRoots)
    {
        delete [] ppRoots;
    }
}
///
/// Removes a root from the scheduler proxy and destroys it. This API is called in response to a scheduler
/// informing the RM that it is done with a virtual processor root.
///
void SchedulerProxy::DestroyVirtualProcessorRoot(VirtualProcessorRoot * pRoot)
{
    { // begin locked region
        // Synchronize with other concurrent calls that add or remove virtual processor roots.
        _ReentrantBlockingLock::_Scoped_lock lock(m_lock);
        SchedulerNode * pOwningNode = &m_pAllocatedNodes[pRoot->GetNodeId()];
        ASSERT(pOwningNode->m_id == pRoot->GetNodeId());
        // NOTE: This API is called in response to a scheduler being done with a virtual processor root.
        // The scheduler is expected not to invoke ISchedulerProxy::Shutdown, which destroys
        // all remaining roots in the proxy, until all individual calls for removing virtual processor
        // roots have completed.
        pOwningNode->m_pCores[pRoot->GetCoreIndex()].m_resources.Remove(pRoot->GetExecutionResource());
        if (!pRoot->IsOversubscribed())
        {
            // Only non-oversubscribed vprocs contribute to the concurrency level.
            ASSERT(m_currentConcurrency > 0);
            --m_currentConcurrency;
        }
    } // end locked region
    pRoot->DeleteThis();
}
///
/// Removes an execution resource from the scheduler proxy, and destroys it. This API is called in response to a scheduler
/// informing the RM that it is done with an execution resource.
///
void SchedulerProxy::DestroyExecutionResource(ExecutionResource * pExecutionResource)
{
    // NOTE: This function should be called with the RM lock held.
    SchedulerNode * pOwningNode = &m_pAllocatedNodes[pExecutionResource->GetNodeId()];
    SchedulerCore * pOwningCore = &pOwningNode->m_pCores[pExecutionResource->GetCoreIndex()];
    ASSERT(pOwningNode->m_id == pExecutionResource->GetNodeId());

    // If neither vprocs nor external threads remain on this core, it is no longer in use by
    // this scheduler - hand it back so others can claim it.
    if (pOwningCore->m_numAssignedThreads + pOwningCore->m_numExternalThreads == 0)
    {
        // A core with no vprocs and no external threads cannot be fixed.
        ASSERT(!pOwningCore->IsFixed());
        ASSERT(pOwningNode->m_allocatedCores > 0 && pOwningNode->m_allocatedCores <= pOwningNode->m_coreCount);
        pOwningNode->m_allocatedCores--;
        ASSERT(m_numAllocatedCores > MinHWThreads());
        pOwningCore->m_coreState = ProcessorCore::Unassigned;
        m_numAllocatedCores--;
        ASSERT(m_numAllocatedCores <= DesiredHWThreads());
        m_pResourceManager->DecrementCoreUseCount(pExecutionResource->GetNodeId(), pExecutionResource->GetCoreIndex());
    }

    { // begin locked region
        // Synchronize with other concurrent calls that add or remove execution resources.
        _ReentrantBlockingLock::_Scoped_lock lock(m_lock);
        pOwningCore->m_resources.Remove(pExecutionResource);
    } // end locked region
    delete pExecutionResource;
}
///
/// Called to assist dynamic resource management in determining whether cores assigned to schedulers
/// are idle. An idle core is one whose subscription level is 0.
///
void SchedulerProxy::IncrementCoreSubscription(ExecutionResource * pExecutionResource)
{
    SchedulerCore * pCore =
        &m_pAllocatedNodes[pExecutionResource->GetNodeId()].m_pCores[pExecutionResource->GetCoreIndex()];
    if (InterlockedIncrement(&pCore->m_subscriptionLevel) == 1)
    {
        // The local subscription just went 0 -> 1, which may warrant idle/busy notifications
        // if some *other* scheduler has asked for them.
        if (m_pResourceManager->GetNumSchedulersForNotifications() > (ShouldReceiveNotifications() ? 1UL : 0UL))
        {
            // Note that the number of schedulers needing notifications may change right after we read it, but any
            // missed notifications will be sent at the next Dynamic RM Poll.
            // We simply set the dynamic RM event here. Note -> there may not yet be a dynamic RM thread at this point.
            // We clearly have 2 schedulers, but it could be that the second one is just being created. In that case,
            // notifications will be sent when the dynamic RM starts up (right after the second scheduler has finished
            // receiving all its resources). We may even race with shutdown for the penultimate scheduler. If the DRM
            // thread wakes up and there is only one scheduler left, it will go back to waiting.
            m_pResourceManager->WakeupDynamicRMWorker();
        }
    }
}
///
/// Called to assist dynamic resource management in determining whether cores assigned to schedulers
/// are idle. An idle core is one whose subscription level is 0.
///
void SchedulerProxy::DecrementCoreSubscription(ExecutionResource * pExecutionResource)
{
    SchedulerCore * pCore =
        &m_pAllocatedNodes[pExecutionResource->GetNodeId()].m_pCores[pExecutionResource->GetCoreIndex()];
    if (InterlockedDecrement(&pCore->m_subscriptionLevel) == 0)
    {
        // The local subscription just went 1 -> 0, which may warrant idle/busy notifications
        // if some *other* scheduler has asked for them.
        if (m_pResourceManager->GetNumSchedulersForNotifications() > (ShouldReceiveNotifications() ? 1UL : 0UL))
        {
            // Note that the number of schedulers needing notifications may change right after we read it, but any
            // missed notifications will be sent at the next Dynamic RM Poll.
            // We simply set the dynamic RM event here. Note -> there may not yet be a dynamic RM thread at this point.
            // We clearly have 2 schedulers, but it could be that the second one is just being created. In that case,
            // notifications will be sent when the dynamic RM starts up (right after the second scheduler has finished
            // receiving all its resources). We may even race with shutdown for the penultimate scheduler. If the DRM
            // thread wakes up and there is only one scheduler left, it will go back to waiting.
            m_pResourceManager->WakeupDynamicRMWorker();
        }
    }
}
///
/// Called to adjust the suggested allocation such that we do not exceed maxConcurrency.
/// This routine takes into account vprocs that are marked for removal but haven't yet been
/// retired by the scheduler. The suggested allocation would be decreased to account for such
/// vprocs.
///
unsigned int SchedulerProxy::AdjustAllocationIncrease(unsigned int suggestedAllocation) const
{
    ASSERT(suggestedAllocation >= GetNumAllocatedCores());
    ASSERT(suggestedAllocation <= DesiredHWThreads());

    // Determine the maximum number of additional cores we can accept. Since the scheduler
    // proxy lock may not be held, m_currentConcurrency can be changing under us; that is
    // acceptable because a later DRM sweep will migrate the appropriate number of cores.
    unsigned int additionalCores = 0;
    if (m_currentConcurrency < m_maxConcurrency)
    {
        unsigned int headroom = m_maxConcurrency - m_currentConcurrency;
        // Convert the remaining concurrency into whole cores. Fully subscribed cores absorb
        // m_targetOversubscriptionFactor vprocs each; the rest absorb one fewer.
        unsigned int fullySubscribedHeadroom = m_numFullySubscribedCores * m_targetOversubscriptionFactor;
        if (headroom <= fullySubscribedHeadroom)
        {
            additionalCores = headroom / m_targetOversubscriptionFactor;
        }
        else
        {
            ASSERT(m_targetOversubscriptionFactor > 1);
            additionalCores = m_numFullySubscribedCores +
                ((headroom - fullySubscribedHeadroom) / (m_targetOversubscriptionFactor - 1));
        }
    }

    unsigned int cappedAllocation = GetNumAllocatedCores() + additionalCores;
    // Cores used exclusively by external threads are included in numAllocatedCores, so the
    // computed cap could exceed the desired hardware thread count - clamp it back down.
    cappedAllocation = min(cappedAllocation, DesiredHWThreads());
#if defined(CONCRT_TRACING)
    if (cappedAllocation < suggestedAllocation)
    {
        TRACE(CONCRT_TRACE_DYNAMIC_RM, L"Scheduler %d: Allocated: %d, Suggested: %d, Adjusted Suggested: %d",
              GetId(), GetNumAllocatedCores(), suggestedAllocation, cappedAllocation);
    }
#endif
    return min(cappedAllocation, suggestedAllocation);
}
SchedulerProxy::~SchedulerProxy()
{
    // Tear down everything that could still be referenced during an asynchronous delete:
    // the per-node allocation data and the sorted node ordering.
    m_pResourceManager->DestroyAllocatedNodeData(m_pAllocatedNodes);
    delete [] m_pSortedNodeOrder;
#if defined(CONCRT_TRACING)
    delete [] m_drmInitialState;
#endif
    // Drop the reference held on the resource manager.
    m_pResourceManager->Release();
}
///
/// Called to shutdown a scheduler proxy. Derived classes can override shutdown behavior based on this.
///
void SchedulerProxy::FinalShutdown()
{
    // Release all scheduler-associated resources first, then dispose of the proxy itself.
    // DeleteThis lets derived proxy types customize (e.g. defer) the actual deletion.
    Cleanup();
    DeleteThis();
}
///
/// Cleans up resources associated with the scheduler.
///
void SchedulerProxy::Cleanup()
{
//
// Delete vproc roots that exist in the allocated nodes at this time. The deletion here is a notification. It may happen asynchronously
// depending on the type of scheduler proxy. The data structures maintained for the scheduler proxy cannot go away until the deferred
// deletion happens.
//
for (unsigned int i = 0; i < m_nodeCount; ++i)
{
SchedulerNode * pNode = &m_pAllocatedNodes[i];
for (unsigned int j = 0; j < pNode->m_coreCount; ++j)
{
ExecutionResource * pExecutionResource = pNode->m_pCores[j].m_resources.First();
while (pExecutionResource != NULL)
{
ExecutionResource * pExecutionResourceToDelete = pExecutionResource;
pExecutionResource = pNode->m_pCores[j].m_resources.Next(pExecutionResource);
VirtualProcessorRoot * pVPRoot = pExecutionResourceToDelete->GetVirtualProcessorRoot();
ASSERT(pVPRoot != NULL);
// Since the root is going away, check if it contributes to the subscription count on the core, and
// fix up the count, if so.
pVPRoot->ResetSubscriptionLevel();
pVPRoot->DeleteThis();
}
}
}
delete m_pHillClimbing;
}
#if defined(CONCRT_TRACING)
///
/// Captures a per-core snapshot (node/core index plus allocated, fixed, borrowed and idle
/// state) of this scheduler proxy's allocation at the start of a dynamic RM pass, for tracing.
///
void SchedulerProxy::TraceInitialDRMState()
{
int traceCoreIndex = 0;
for (unsigned int nodeIndex = 0; nodeIndex < m_nodeCount; ++nodeIndex)
{
SchedulerNode * pAllocatedNode = &m_pAllocatedNodes[nodeIndex];
for (unsigned int coreIndex = 0; coreIndex < pAllocatedNode->m_coreCount; ++coreIndex)
{
SchedulerCore * pAllocatedCore = &pAllocatedNode->m_pCores[coreIndex];
SchedulerCoreData * pCoreData = &m_drmInitialState[traceCoreIndex++];
pCoreData->m_nodeIndex = (unsigned char)nodeIndex;
pCoreData->m_coreIndex = (unsigned char)coreIndex;
pCoreData->m_fAllocated = pAllocatedCore->m_coreState == ProcessorCore::Allocated;
pCoreData->m_fFixed = pAllocatedCore->IsFixed();
pCoreData->m_fBorrowed = pAllocatedCore->IsBorrowed();
pCoreData->m_fIdle = pAllocatedCore->IsIdle();
}
}
}
#endif
} // namespace details
} // namespace Concurrency