// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// event.cpp
//
// This file includes two parts: event and _Condition_variable.
// The core implementations of events and _Condition_variable which understand the cooperative nature of
// the scheduler and are designed to be scalable.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
#include "concrtinternal.h"
#pragma warning (disable : 4702)
//
// NOTE: The design of the wait-for-multiple semantic tries to keep the following goals:
//
// - Single event waits (create/wait/set) are very efficient requiring few memory barriers and
// absolutely no shared locking. This is necessary to support utilizing the event for stolen chore
// signaling. Currently, there are N*2+1 memory barriers (where N is the number to grab/release
// a spin-lock).
//
// - Multiple event wait can be supported on the same Event type. There is (as yet) no bifurcation between
// an event that can be used in a WaitForMultiple and one that can't.
//
// - There are (few) shared locks between multiple events.
//
// This leads to a few unfortunate side effects in the implementation described below:
//
// - Each event has a spinlock which now guards its wait chains. No longer do we have a simple CAS loop
// for light-weight events. This will add extra memory barriers in the fast path (5 or 3 depending
// on the lock versus 2 in the prototype implementation).
//
// - Each wait-for-multiple requires a single heap allocation. With normal sized wait lists, this should
// come from the concurrent suballocator.
//
// - Wait-for-multiple on N events requires N spinlock acquisitions although the code is left open to the
// possibility of lock pooling on a granularity to be decided by the scheduler.
namespace Concurrency
{
namespace details
{
//**************************************************************************
// Shared Functionality:
//
// This functionality is shared between agents infrastructure in msvcp for timeouts
// there as well as in the eventing infrastructure here in msvcr for timeouts as well.
// This is an msvcr export to msvcp to keep a shared timer queue throughout ConcRT.
//
//**************************************************************************
///
/// Returns the demand initialized single timer queue used for event timeouts, timer agents, etc...
///
///
/// Returns the demand initialized single timer queue used for event timeouts, timer agents, etc...
/// A shared timer queue was only ever required for Windows XP support; on supported
/// platforms the threadpool timer APIs are used instead, so no queue exists.
///
HANDLE GetSharedTimerQueue()
{
    // No XP support: there is no shared timer queue to hand out.
    return NULL;
}
//**************************************************************************
// Internal Prototypes and Definitions:
//
// These are purely internal to the event implementation are placed here in lieu
// of generally visible headers.
//**************************************************************************
class EventWaitNode;
///
/// Represents a wait block. It is indirectly chained to an event via a Wait*Node.
///
class WaitBlock
{
public:
// State machine values for m_smSkip_BlockUnblock:
//   UNDECIDED - neither the waiter nor the signaler has committed yet.
//   SKIP      - the signaler won the race; the waiter must skip Context::Block().
//   DONT_SKIP - the waiter committed to blocking; the signaler must unblock it.
enum STATE {UNDECIDED, SKIP, DONT_SKIP};
///
/// Wait block constructor
///
// Captures the current context so that a later signal knows which context to unblock.
WaitBlock() : m_pContext(NULL), m_smSkip_BlockUnblock(UNDECIDED)
{
m_pContext = Context::CurrentContext();
}
///
/// Called when the wait is satisfied (the event is signaled). Note that the derived class may or may
/// not unblock depending on the exact wait semantics.
///
///
/// An indication of whether the event needs to track this node after a signal due to the potential
/// for a reset to impact the overall wait.
///
// If pContextOut is non-NULL, the context to unblock (or NULL if none) is returned
// through it instead of being unblocked directly by the callee.
virtual bool Satisfy(Context **pContextOut, EventWaitNode *pNode) = 0;
///
/// Called when the event is reset. A wait-all may need to adjust counters to prevent the wait from being
/// satisfied.
///
///
/// An indication of whether the wait node is still valid
///
virtual bool Reset() = 0;
///
/// Called when the underlying event is being destroyed / rundown. Allows cleaning up of wait blocks.
///
virtual void Destroy() = 0;
///
/// Called in order to check whether a node is still alive or dead during a sweep of the wait chain.
///
virtual bool SweepWaitNode() = 0;
///
/// Called in order to check whether a node is still alive or dead during a sweep of the reset chain.
///
virtual bool SweepResetNode() = 0;
// The context which this wait must block/unblock.
Context *m_pContext;
// Flag to decide on skipping a pair of block/unblock to avoid unblocking of a context blocked due
// to scoped lock and unblocking it via event's set operation, which is the wrong/mismatched reason for unblocking.
// Further comments in MultiWaitBlock::SingleSatisfy() method.
volatile long m_smSkip_BlockUnblock;
};
///
/// Represents a wait on a single object (with or without a timer).
///
// Wait block for a wait on a single event with no timer. Instances live on the
// waiter's stack (see event::wait), so Satisfy never requires tracking (returns false)
// and Reset/Destroy are not expected to be invoked on this type.
class SingleWaitBlock : public WaitBlock
{
public:
virtual bool Satisfy(Context **pContextOut, EventWaitNode *pNode);
virtual bool Reset();
virtual void Destroy();
virtual bool SweepWaitNode();
virtual bool SweepResetNode();
};
///
/// Wait block shared by all wait nodes of a single wait-for-multiple operation
/// (specialized by WaitAllBlock / WaitAnyBlock), optionally backed by a timer.
///
class MultiWaitBlock : public WaitBlock
{
public:
    // An indication of which object caused the wait to be satisfied.
    EventWaitNode *m_pSatisfiedBy;
    // Timer queue timer.
    HANDLE m_hTimer;
    // The final trigger count.
    volatile long m_finalTrigger;
    // The number of things pointing at the wait block (wait nodes or timers).
    size_t m_waiters;
    // When the count reaches the trigger limit, the wait block is satisfied.
    volatile size_t m_triggerLimit{};
    // The number of signaled objects
    volatile size_t m_count;
    // The number of completed waiters (main counter of when the block can be freed)
    volatile size_t m_completions;
    // An indication of whether this wait has a timeout or not. Timeouts are handled by a two stage
    // wait (m_count -> m_finalTrigger).
    bool m_fHasTimeout;
    // A variable that is set if there was a timeout associated with the wait block and the wait timed out.
    // It is set either by the timer thread, or in the wait_for_* operation itself if the timeout was 0 and the wait was not satisfied
    // when the wait_for_* method was invoked.
    volatile bool m_fWaitTimedOut;
    ///
    /// MultiWaitBlock constructor.
    ///
    MultiWaitBlock(size_t waitObjects, bool timeout, bool timer)
        : m_pSatisfiedBy(NULL)
        , m_hTimer(NULL)
        , m_finalTrigger(0)
        // Fix: the cast target type was missing ("static_cast(timer)"); the timer,
        // if present, counts as one additional waiter reference on the block.
        , m_waiters(waitObjects + static_cast<size_t>(timer))
        , m_count(0)
        , m_completions(0)
        , m_fHasTimeout(timeout)
        , m_fWaitTimedOut(false)
    {
    }
    ///
    /// Called when a node (or something masquerading as such) is done with its reference on the block.
    ///
    void NotifyCompletedNode();
    ///
    /// Called when a timer on the wait block fires.
    ///
    static void CALLBACK DispatchEventTimer(PTP_CALLBACK_INSTANCE instance, void * pContext, PTP_TIMER timer);
    ///
    /// Same as DispatchEventTimer, only used on WinXP platform.
    ///
    static void CALLBACK DispatchEventTimerXP(PVOID pContext, BOOLEAN timerOrWaitFired);
protected:
    virtual void SingleSatisfy(Context **pContextOut, EventWaitNode *pNode);
};
///
/// RAII holder for the single heap allocation that backs a wait-for-multiple:
/// a MultiWaitBlock followed by one EventWaitNode per waited-upon event.
///
class MultiWaitBlockHolder
{
public:
    MultiWaitBlockHolder(bool fWaitAll, size_t count, bool timeout, bool timer);
    ~MultiWaitBlockHolder();
    // Records that one reference on the block has been handed off (to an event chain
    // or a timer) and will be released by that party, not by the destructor.
    void Release()
    {
        m_count++;
    }
    MultiWaitBlock *GetWaitBlock() const
    {
        return m_pWaitBlock;
    }
    // Returns the i-th wait node, carved out of the allocation right after the block.
    // Fix: the cast target type was missing ("reinterpret_cast (...)").
    EventWaitNode *GetWaitNode(size_t i) const
    {
        return reinterpret_cast<EventWaitNode *>(m_pMemBlock + m_blockSize + m_nodeSize * i);
    }
    // Inverse of GetWaitNode: maps a node address back to its index in the wait array.
    // Fix: the cast target type was missing ("reinterpret_cast (pNode)").
    size_t GetIndexOfNode(EventWaitNode *pNode) const
    {
        return (size_t) (reinterpret_cast<BYTE *>(pNode) - (m_pMemBlock + m_blockSize)) / m_nodeSize;
    }
private:
    size_t m_blockSize;
    size_t m_nodeSize;
    size_t m_totalBlockSize;
    BYTE *m_pMemBlock;
    MultiWaitBlock *m_pWaitBlock;
    size_t m_count;
    size_t m_refs;
};
// Wait block for the wait-all semantic: the wait is satisfied only when every
// one of the waitObjects events has signaled (trigger limit == object count).
class WaitAllBlock : public MultiWaitBlock
{
public:
WaitAllBlock(size_t waitObjects, bool timeout, bool timer) : MultiWaitBlock(waitObjects, timeout, timer)
{
m_triggerLimit = waitObjects;
}
virtual bool Satisfy(Context **pContextOut, EventWaitNode *pNode);
virtual bool Reset();
virtual void Destroy();
virtual bool SweepWaitNode();
virtual bool SweepResetNode();
};
// Wait block for the wait-any semantic: the first signaled event satisfies the
// wait (trigger limit == 1), regardless of how many objects are being waited on.
class WaitAnyBlock : public MultiWaitBlock
{
public:
WaitAnyBlock(size_t waitObjects, bool timeout, bool timer) : MultiWaitBlock(waitObjects, timeout, timer)
{
m_triggerLimit = 1;
}
virtual bool Satisfy(Context **pContextOut, EventWaitNode *pNode);
virtual bool Reset();
virtual void Destroy();
virtual bool SweepWaitNode();
virtual bool SweepResetNode();
};
///
/// An event wait node represents an abstract wait block which is chained to each event such that when the
/// event is signaled, the wait block is notified and performs the appropriate unblocking (or additional
/// waiting) required.
///
class EventWaitNode
{
public:
// Next node in the event's (singly linked) wait or reset chain.
EventWaitNode* m_pNext{};
// The wait block this node forwards to; shared by all nodes of a multi-wait.
WaitBlock *m_pWaitBlock;
EventWaitNode(WaitBlock *pWaitBlock) noexcept : m_pWaitBlock(pWaitBlock)
{
}
// All of the following simply forward to the associated wait block, passing
// this node where the block needs to know which event triggered the call.
bool Satisfy(Context **pContextOut)
{
return m_pWaitBlock->Satisfy(pContextOut, this);
}
bool Reset()
{
return m_pWaitBlock->Reset();
}
void Destroy()
{
m_pWaitBlock->Destroy();
}
bool SweepWaitNode()
{
return m_pWaitBlock->SweepWaitNode();
}
bool SweepResetNode()
{
return m_pWaitBlock->SweepResetNode();
}
};
// Walks a wait chain (fWaitChain == true) or reset chain, pruning dead/timed-out
// nodes, and returns the new head of the chain. Defined later in this file.
EventWaitNode * Sweep(EventWaitNode *pNode, bool fWaitChain);
} // namespace details
// Details for _Condition_variable
namespace details
{
///
/// This is the wait-block design for handling two signal sources:
/// 1. signals fired by user
/// 2. signals fired by timer
/// If there is no timer in this block, the block will be allocated on
/// the stack, which will be finally destroyed when the context get released;
/// however, when it comes with the timer, the block will be allocated on
/// the heap, and 2 phases de-reference checks are required to *delete* this object.
///
class TimedSingleWaitBlock : public SingleWaitBlock
{
// The event node nested in wait-block,
// which shares the lifetime with wait-block
EventWaitNode m_eventNode;
HANDLE m_hTimer{};
const bool m_hasTimer;
// The de-reference times
volatile long m_deRef;
// The number of signaled objects
// It is used to determine who fires the signal first.
volatile long m_signalCounter;
public:
TimedSingleWaitBlock & operator =(const TimedSingleWaitBlock &) = delete;
// An indication of whether this wait has a timeout or not.
volatile bool m_fWaitTimedOut;
virtual bool SweepWaitNode();
virtual void Destroy();
///
/// Called when a timer on the wait-block fires.
///
static void CALLBACK DispatchEventTimer(PTP_CALLBACK_INSTANCE instance, void * pContext, PTP_TIMER timer);
///
/// Same as DispatchEventTimer, only used on WinXP platform.
///
static void CALLBACK DispatchEventTimerXP(PVOID pContext, BOOLEAN timerOrWaitFired);
virtual bool Satisfy(Context **pContextOut, EventWaitNode *pNode);
///
/// TimedSingleWaitBlock constructor.
///
TimedSingleWaitBlock(bool hasTimer)
: m_eventNode(nullptr)
, m_hasTimer(hasTimer)
, m_deRef(0)
, m_signalCounter(0)
, m_fWaitTimedOut(false)
{
m_eventNode.m_pWaitBlock = this;
}
///
/// Get the event node nested in the wait-block
///
EventWaitNode *getEventNode()
{
return &m_eventNode;
}
bool createTimer(unsigned int timerout)
{
if (m_hasTimer)
{
return (m_hTimer = RegisterAsyncTimerAndLoadLibrary(timerout, TimedSingleWaitBlock::DispatchEventTimer, this)) != nullptr;
}
return false;
}
void destroyTimer(bool waitForOutstandingCallback)
{
if (m_hasTimer)
{
if (waitForOutstandingCallback && m_hTimer)
DeleteAsyncTimerAndUnloadLibrary(static_cast(m_hTimer));
// If it's an async deletion (happens inside the callback) we don't do anything here, new callback handler will handle it.
}
}
};
///
/// It will be called by two event sources :
/// 1. the timer, with argument *pNode* nullptr
/// 2. the user trigger, with *pNode* the address of the event node.
/// This function will take actions ONLY when FIRST time being called,
/// any further calls will be ignored with return value False.
/// If the first caller sets pContextOut nullptr, it will Unblock the
/// context immediately, otherwise, it will pass out the blocked context
/// by pContextOut.
///
bool TimedSingleWaitBlock::Satisfy(Context **pContextOut, EventWaitNode *pNode)
{
// Only the first signaler (user notify or timer) wins the interlocked race;
// every later call falls through and returns false.
if (InterlockedIncrement(&m_signalCounter) == 1)
{
// Timer will be destroyed when as soon as
// it is satisfied
// (pNode != nullptr means a user signal, so it is safe to wait for the
// outstanding timer callback; the timer path must not wait on itself.)
destroyTimer(pNode != nullptr);
// check who initiated this *Satisfy*
m_fWaitTimedOut = pNode == nullptr;
if (pContextOut)
*pContextOut = m_pContext;
else
m_pContext->Unblock();
return true;
}
return false;
}
///
/// Delete the wait-block after 2 times de-reference.
/// If the block is allocated on the stack, it should only
/// be de-referenced once -- when he is removed from the
/// event list.
///
///
/// Releases one reference on the wait-block; the heap-allocated (timed) variant
/// is deleted on the second release. A stack-allocated block is only ever
/// de-referenced once (when unlinked from the event list), so it never reaches
/// the deletion count and is destroyed by normal stack unwind instead.
///
void TimedSingleWaitBlock::Destroy()
{
    const long references = InterlockedIncrement(&m_deRef);
    if (references == 2)
        delete this;
}
///
/// Clean the timed-out node.
///
///
/// Sweep callback for the wait chain: a block that has not timed out stays on the
/// chain; a timed-out block drops its chain reference and is unlinked.
///
bool TimedSingleWaitBlock::SweepWaitNode()
{
    if (!m_fWaitTimedOut)
        return true;
    // Timed out: release the chain's reference and tell the sweeper to unlink us.
    Destroy();
    return false;
}
///
/// Called when a timer on an condition variable is signaled.
///
///
/// Called when a timer on an condition variable is signaled. If the timer wins
/// the race against a user notify, the waiter is unblocked with the timed-out
/// result and the timer releases itself at the end of the callback.
///
void TimedSingleWaitBlock::DispatchEventTimer(PTP_CALLBACK_INSTANCE instance, void * pContext, PTP_TIMER timer)
{
    // Fix: the cast target type was missing ("reinterpret_cast (pContext)").
    TimedSingleWaitBlock *pWaitBlock = reinterpret_cast<TimedSingleWaitBlock *>(pContext);
    // Satisfy with pNode == nullptr marks the wait as timed out and unblocks the waiter.
    if (pWaitBlock->Satisfy(nullptr, nullptr))
    {
        // We need to release the timer and dereference the module at the very end of the callback.
        UnRegisterAsyncTimerAndUnloadLibrary(instance, timer);
    }
}
///
/// XP-era timer-queue variant of DispatchEventTimer.
///
void TimedSingleWaitBlock::DispatchEventTimerXP(LPVOID pContext, BOOLEAN)
{
    // Fix: the cast target type was missing ("reinterpret_cast (pContext)").
    TimedSingleWaitBlock *pWaitBlock = reinterpret_cast<TimedSingleWaitBlock *>(pContext);
    pWaitBlock->Satisfy(nullptr, nullptr);
    // It does not need to de-reference anything.
}
// Constructs a condition variable with an empty wait chain.
_Condition_variable::_Condition_variable() :
_M_pWaitChain(nullptr), _M_lock()
{
}
_Condition_variable::~_Condition_variable()
{
//
// It's entirely possible that some other thread is currently executing inside ::set, and is currently holding the lock.
// Since the waiter that was woken up could destroy the event, either by deleting a heap allocated, or unwinding the
// stack, we need to let that other thread (that invoked ::set) get out of the lock before we proceed.
//
_M_lock._Flush_current_owner();
// release all contexts
// (any waiter still chained at destruction time is woken rather than leaked)
notify_all();
}
///
/// Fast method for waiting on _Condition_variable without timeout
///
///
/// Fast method for waiting on _Condition_variable without timeout.
/// The wait block lives on this stack frame; see wait_for for the timed variant.
///
void _Condition_variable::wait(Concurrency::critical_section& _Lck)
{
    // Please refers to the wait_for function
    TimedSingleWaitBlock block(false);
    EventWaitNode *pEventNode = block.getEventNode();
    {
        critical_section::scoped_lock lock(_M_lock);
        // Fix: the cast target type was missing ("reinterpret_cast (_M_pWaitChain)").
        // Prune timed-out nodes under the lock, then push ourselves on the chain.
        pEventNode->m_pNext = Sweep(reinterpret_cast<EventWaitNode *>(_M_pWaitChain), true);
        _M_pWaitChain = pEventNode;
        // The user lock is released only after chaining, so a notify issued between
        // unlock and Block() cannot be lost.
        _Lck.unlock();
    }
    Context::Block();
    _Lck.lock();
}
///
/// Timed wait method
///
///
/// Timed wait method. Returns true if the condition variable was notified,
/// false if the wait timed out.
///
bool _Condition_variable::wait_for(Concurrency::critical_section& _Lck, unsigned int _Timeout)
{
    // If check special case for _Timeout for efficiency reason
    if (_Timeout == 0)
        return false;
    else if (_Timeout == COOPERATIVE_TIMEOUT_INFINITE)
    {
        wait(_Lck);
        return true;
    }
    // Create timed node on heap, which will be destroyed when
    // the de-reference counts 2.
    TimedSingleWaitBlock *pBlock = _concrt_new TimedSingleWaitBlock(true);
    EventWaitNode *pNode = pBlock->getEventNode();
    // Commit to wait:
    // Step 1: Chain itself on the event list, set the timer,
    // and release the mutex.
    // The _Lck must be unlocked after chaining, to protect the atomic.
    {
        critical_section::scoped_lock lock(_M_lock);
        // Fix: the cast target type was missing ("reinterpret_cast (_M_pWaitChain)").
        pNode->m_pNext = Sweep(reinterpret_cast<EventWaitNode *>(_M_pWaitChain), true);
        _M_pWaitChain = pNode;
        if (!pBlock->createTimer(_Timeout))
            throw std::bad_alloc();
        _Lck.unlock();
    }
    // Step 2: blocks itself
    Context::Block();
    // After being waken up, collect the result and de-reference itself.
    bool res = !pBlock->m_fWaitTimedOut;
    pBlock->Destroy();
    // Re-acquire the mutex
    _Lck.lock();
    return res;
}
///
/// It only wake up one context if there are
/// some contexts waiting, otherwise, it does nothing.
///
void _Condition_variable::notify_one()
{
// optimization for reducing unnecessary lock
if (!_M_pWaitChain)
return;
critical_section::scoped_lock lock(_M_lock);
EventWaitNode *p = reinterpret_cast (_M_pWaitChain);
EventWaitNode *np;
Context *cp = nullptr;
// It iterates over the chain, and finds first
// wait-block not timed out yet.
while (p && !p->Satisfy(&cp))
{
np = p->m_pNext;
p->Destroy();
p = np;
}
if (p)
{
// If it finds one available wait-block
_M_pWaitChain = p->m_pNext;
p->Destroy();
// Unblock() must be called after Destroy()
// since releasing context may invalidate the wait-block.
cp->Unblock();
}
else
_M_pWaitChain = NULL;
}
///
/// It wakes up all contexts if there are
/// some contexts waiting, otherwise, it does nothing.
///
void _Condition_variable::notify_all()
{
// optimization for reducing unnecessary lock
if (!_M_pWaitChain)
return;
EventWaitNode *p, *np;
{
critical_section::scoped_lock lock(_M_lock);
p = reinterpret_cast (_M_pWaitChain);
_M_pWaitChain = nullptr;
}
// It is safe to go over the list without lock
// since the chain has been removed from the head.
while (p)
{
Context *cp = nullptr;
p->Satisfy(&cp);
np = p->m_pNext;
p->Destroy();
// Unblock() must be called after Destroy()
// since releasing context may invalidate the wait-block.
if (cp)
cp->Unblock();
p = np;
}
}
}
///
/// Constructs an event.
///
// Constructs an event in the unsignaled state with empty wait and reset chains.
event::event() :
_M_pWaitChain(EVENT_UNSIGNALED),
_M_pResetChain(NULL)
{
}
///
/// Destroys an event.
///
///
/// Destroys an event, satisfying/running down any wait blocks still chained to it.
///
event::~event()
{
    //
    // It's entirely possible that some other thread is currently executing inside ::set, and is currently holding the lock.
    // Since the waiter that was woken up could destroy the event, either by deleting a heap allocated, or unwinding the
    // stack, we need to let that other thread (that invoked ::set) get out of the lock before we proceed.
    //
    _M_lock._Flush_current_owner();
    //
    // Go through and make sure any event blocks are satisfied. One would expect items only on the reset list,
    // but we'll handle both cases -- the runtime should not be leaking regardless.
    //
    EventWaitNode *pNext;
    // Fix: the cast target types were missing ("reinterpret_cast (_M_pWaitChain)" etc.).
    EventWaitNode *pNode = reinterpret_cast<EventWaitNode *>(_M_pWaitChain);
    if (pNode > EVENT_SIGNALED)
    {
        for (; pNode != NULL; pNode = pNext)
        {
            pNext = pNode->m_pNext;
            if (pNode->Satisfy(NULL))
            {
                pNode->Destroy();
            }
        }
    }
    for (pNode = reinterpret_cast<EventWaitNode *>(_M_pResetChain); pNode != NULL; pNode = pNext)
    {
        pNext = pNode->m_pNext;
        pNode->Destroy();
    }
}
///
/// Waits on the specified event.
///
///
/// Waits on the specified event. Returns 0 when the wait is satisfied or
/// COOPERATIVE_WAIT_TIMEOUT when the timeout elapses first.
///
size_t event::wait(unsigned int timeout)
{
    const EventWaitNode *pOldChain;
    //
    // Waits with timeout fall back on the heavy weight "wait for multiple" mechanism. The only place
    // we use a light-weight spin/stack semantic is with a single *WAIT*.
    //
    // We can specially handle a 0 timeout "check" here though.
    //
    if (timeout != COOPERATIVE_TIMEOUT_INFINITE)
    {
        if (timeout == 0)
        {
            // Fix: the cast target type was missing ("reinterpret_cast (_M_pWaitChain)").
            if (reinterpret_cast<EventWaitNode *>(_M_pWaitChain) == EVENT_SIGNALED)
                return 0;
            else
                return COOPERATIVE_WAIT_TIMEOUT;
        }
        event *pThis = this;
        return event::wait_for_multiple(&pThis, 1, true, timeout);
    }
    // Spin wait (no yielding) for the event to be set.
    _SpinWaitNoYield spinWait;
    do
    {
        pOldChain = reinterpret_cast<EventWaitNode *>(_M_pWaitChain);
        if (pOldChain == EVENT_SIGNALED)
        {
            return 0;
        }
    } while (spinWait._SpinOnce());
    //
    // Give up and block, first putting our context on a stack-based
    // list of waiting contexts for this event.
    //
    SingleWaitBlock block;
    EventWaitNode node(&block);
    bool fSatisfied = false;
    {
        critical_section::scoped_lock lockGuard(_M_lock);
        if (_M_pWaitChain == EVENT_SIGNALED)
            fSatisfied = true;
        else
        {
            node.m_pNext = Sweep(reinterpret_cast<EventWaitNode *>(_M_pWaitChain), true);
            _M_pWaitChain = &node;
        }
    }
    if (!fSatisfied)
    {
        // For explanation of skipping Block/Unblock please see the comments in
        // MultiWaitBlock::SingleSatisfy().
        bool bSkip = block.m_smSkip_BlockUnblock == WaitBlock::SKIP // Avoid unnecessary InterlockedCompareExchange for optimizing.
            || InterlockedCompareExchange(&block.m_smSkip_BlockUnblock, WaitBlock::DONT_SKIP, WaitBlock::UNDECIDED) == WaitBlock::SKIP;
        if (!bSkip)
            Context::Block();
    }
    return 0;
}
///
/// Resets the specified event.
///
void event::reset()
{
critical_section::scoped_lock lockGuard(_M_lock);
if (_M_pWaitChain == EVENT_SIGNALED)
{
EventWaitNode *pRoot = NULL;
EventWaitNode *pNext = NULL;
EventWaitNode *pNode = reinterpret_cast (_M_pResetChain);
_M_pResetChain = NULL;
for (; pNode != NULL; pNode = pNext)
{
pNext = pNode->m_pNext;
if (pNode->Reset())
{
//
// We need to shift this back to the wait list. The wait hasn't been satisfied and
// this reset impacts the block.
//
pNode->m_pNext = pRoot;
pRoot = pNode;
}
}
_M_pWaitChain = pRoot;
}
}
///
/// Sets the specified event.
///
void event::set()
{
Context **pContexts = NULL;
ULONG nodeCount = 0;
_MallocaArrayHolder mholder;
{
critical_section::scoped_lock lockGuard(_M_lock);
//
// Although it's not technically necessary to interlock this, it allows an optimization for light-weight events
// in that they are able to spin for a period before blocking. Without the fence here, they would not.
//
EventWaitNode *pOldChain;
pOldChain = reinterpret_cast (
InterlockedExchangePointer (reinterpret_cast (&_M_pWaitChain), EVENT_SIGNALED)
);
if (pOldChain > EVENT_SIGNALED)
{
ASSERT(_M_pResetChain == NULL);
EventWaitNode *pNext;
//
// Note that the lock grabbed above is within the event, so it's entirely possible that the moment we unblock
// the context, the lock is gone. We also don't want to diddle in the scheduler lists while under a hot
// lock, so build the list of contexts to unblock, release the lock, and then diddle in the scheduler.
//
nodeCount = 0;
for (EventWaitNode *pNode = pOldChain; pNode != NULL; pNode = pNode->m_pNext)
nodeCount++;
pContexts = mholder._InitOnRawMalloca(_malloca(sizeof (Context *) * nodeCount));
nodeCount = 0;
for (EventWaitNode *pNode = pOldChain; pNode != NULL; pNode = pNext)
{
//
// Need to cache the next pointer, since as soon as we unblock,
// the stack-based EventWaitNode may be deallocated.
//
pNext = pNode->m_pNext;
Context *pContext;
if (pNode->Satisfy(&pContext))
{
//
// If Satisfy returned true, we need to track the node as it's part of
// a wait-for-all and a reset on this event could impact it.
//
pNode->m_pNext = reinterpret_cast (_M_pResetChain);
//
// Guarded via the spinlock.
//
_M_pResetChain = pNode;
}
if (pContext != NULL)
pContexts[nodeCount++] = pContext;
}
}
}
//
// Unblock contexts outside the given dispatch lock.
//
while(nodeCount-- > 0)
{
pContexts[nodeCount]->Unblock();
}
}
#pragma warning(push)
#pragma warning(disable:26010)
///
/// Waits for multiple events to become signaled.
///
///
/// An array of events to wait upon
///
///
/// A count of events within the array
///
///
/// An indication of whether to wait for all events or just a single one
///
_Use_decl_annotations_
size_t event::wait_for_multiple(event** pEvents, size_t count, bool fWaitAll, unsigned int timeout)
{
    //
    // Handle some trivial cases up front
    //
    if (pEvents == NULL)
    {
        throw std::invalid_argument("pEvents");
    }
    //
    // Nothing to wait on.
    //
    if (count == 0)
        return 0;
    //
    // Optimize for any caller which decides to call this to wait on a single event. All waits with timeouts
    // flow through here as we need the heavier weight mechanism.
    //
    if (count == 1 && (timeout == 0 || timeout == COOPERATIVE_TIMEOUT_INFINITE))
    {
        if (pEvents[0] == NULL)
        {
            throw std::invalid_argument("pEvents");
        }
        return pEvents[0]->wait(timeout);
    }
    for (size_t i = 0; i < count; i++)
    {
        if (pEvents[i] == NULL)
        {
            throw std::invalid_argument("pEvents");
        }
    }
    MultiWaitBlockHolder waitBlock(fWaitAll, count, timeout != COOPERATIVE_TIMEOUT_INFINITE, (timeout != 0 && timeout != COOPERATIVE_TIMEOUT_INFINITE));
    MultiWaitBlock *pWaitBlock = waitBlock.GetWaitBlock();
    //
    // Chain to each event, carefully checking signal state for each as we go. Note that a wait
    // any can be satisfied immediately if any fail due to an event already being signaled. In
    // that case, we must carefully dispose the rest of the nodes and make sure the counters are
    // appropriate for wait block disposal as the chained ones get dechained later on other event
    // set/reset/destruction.
    //
    bool fSatisfied = false;
    for (size_t i = 0; i < count; i++)
    {
        event *pEvent = pEvents[i];
        // Initialized defensively; Satisfy writes it on the paths where it is read.
        Context *pSatisfiedContext = NULL;
        critical_section::scoped_lock lockGuard(pEvent->_M_lock);
        EventWaitNode *pWaitNode = waitBlock.GetWaitNode(i);
        waitBlock.Release();
        // Fix: the cast target type was missing ("reinterpret_cast (pEvent->_M_pWaitChain)").
        EventWaitNode *pOldChain = reinterpret_cast<EventWaitNode *>(pEvent->_M_pWaitChain);
        if (pOldChain == EVENT_SIGNALED)
        {
            //
            // Event was signaled before we could add ourself to the wait list... We must be
            // very careful here. For a "wait any", we are satisfied but need to take care
            // to ensure that the heap blocks get appropriately freed and dechained. For a wait
            // all, we need to chain to the reset list as it's possible that the event is reset
            // before some other event that would satisfy the wait is signaled.
            //
            if (fWaitAll)
            {
                if (pWaitNode->Satisfy(&pSatisfiedContext))
                {
                    // Fix: the cast target type was missing ("reinterpret_cast (pEvent->_M_pResetChain)").
                    pWaitNode->m_pNext = Sweep(reinterpret_cast<EventWaitNode *>(pEvent->_M_pResetChain), false);
                    pEvent->_M_pResetChain = pWaitNode;
                }
                if (pSatisfiedContext != NULL)
                {
                    ASSERT(i == count - 1);
                    fSatisfied = true;
                }
            }
            else
            {
                //
                // The wait is satisfied.
                //
                pWaitNode->Satisfy(&pSatisfiedContext);
                if (pSatisfiedContext != NULL)
                    fSatisfied = true;
                for (size_t j = i + 1; j < count; j++)
                {
                    pWaitNode = waitBlock.GetWaitNode(j);
                    waitBlock.Release();
                    pWaitNode->Satisfy(&pSatisfiedContext);
                    ASSERT(pSatisfiedContext == NULL);
                }
                break;
            }
        }
        else
        {
            pWaitNode->m_pNext = Sweep(pOldChain, true);
            pEvent->_M_pWaitChain = pWaitNode;
        }
    }
    if (!fSatisfied)
    {
        //
        // For explanation of skipping Block/Unblock please see the comments in MultiWaitBlock::SingleSatisfy() method.
        //
        bool bSkip = pWaitBlock->m_smSkip_BlockUnblock == WaitBlock::SKIP // Avoid unnecessary InterlockedCompareExchange for optimizing.
            || InterlockedCompareExchange(&pWaitBlock->m_smSkip_BlockUnblock, WaitBlock::DONT_SKIP, WaitBlock::UNDECIDED) == WaitBlock::SKIP;
        if (!bSkip)
        {
            //
            // Handle timeouts of zero specially. We don't want to block the thread.
            //
            if (timeout == 0)
            {
                if (InterlockedIncrement(&pWaitBlock->m_finalTrigger) == 1)
                {
                    pWaitBlock->m_pSatisfiedBy = NULL;
                    fSatisfied = true;
                    pWaitBlock->m_fWaitTimedOut = true;
                }
                else
                {
                    Context::Block();
                }
            }
            else
            {
                if (timeout != COOPERATIVE_TIMEOUT_INFINITE)
                {
                    if (pWaitBlock->m_finalTrigger == 0)
                    {
                        if ((pWaitBlock->m_hTimer = RegisterAsyncTimerAndLoadLibrary(timeout, MultiWaitBlock::DispatchEventTimer, pWaitBlock)) == nullptr)
                        {
                            //
                            // Note that the thread is left in a state unexplicable by the scheduler here. It's quite possible someone ::Unblocks this context in
                            // the future. With this error, we make no attempt to unwind that.
                            //
                            throw std::bad_alloc();
                        }
                        waitBlock.Release();
                    }
                }
                Context::Block();
            }
        }
    }
    return (pWaitBlock->m_pSatisfiedBy == NULL) ? COOPERATIVE_WAIT_TIMEOUT : waitBlock.GetIndexOfNode(pWaitBlock->m_pSatisfiedBy);
}
#pragma warning(pop)
namespace details
{
///
/// Constructs a holder for a single allocation wait block which gets split into a wait block and a series of wait nodes,
/// one per wait object.
///
///
/// Constructs a holder for a single allocation wait block which gets split into a wait block and a series of wait nodes,
/// one per wait object.
///
MultiWaitBlockHolder::MultiWaitBlockHolder(bool fWaitAll, size_t count, bool timeout, bool timer) : m_count(0)
{
    //
    // Allocate a single block comprised of all the wait nodes / block that we need to satisfy
    // the wait for multiple.
    //
    m_blockSize = ALIGNED_SIZE(fWaitAll ? sizeof(WaitAllBlock) : sizeof(WaitAnyBlock), P2_ALIGN);
    m_nodeSize = ALIGNED_SIZE(sizeof(EventWaitNode), P2_ALIGN);
    m_totalBlockSize = m_blockSize + m_nodeSize * count;
    m_pMemBlock = _concrt_new BYTE[m_totalBlockSize];
    // Fix: the cast target type was missing ("reinterpret_cast (m_pMemBlock)");
    // the block lives at the head of the allocation.
    m_pWaitBlock = reinterpret_cast<MultiWaitBlock *>(m_pMemBlock);
    if (fWaitAll)
    {
        _Analysis_assume_(m_totalBlockSize >= sizeof(WaitAllBlock));
        new(m_pMemBlock) WaitAllBlock(count, timeout, timer);
    }
    else
    {
        _Analysis_assume_(m_totalBlockSize >= sizeof(WaitAnyBlock));
        new(m_pMemBlock) WaitAnyBlock(count, timeout, timer);
    }
    // Placement-construct the per-event wait nodes right after the block.
    BYTE *pWaitNodeAddr = m_pMemBlock + m_blockSize;
    for (size_t i = 0; i < count; i++)
    {
        new(pWaitNodeAddr) EventWaitNode(m_pWaitBlock);
        pWaitNodeAddr += m_nodeSize;
    }
    //
    // The number of references on the block is the number of wait objects plus the timer plus one for
    // the stack frame of the WaitForMultiple which initialized us. The block gets freed when NotifyCompletedNode
    // is called m_refs number of times. This object is responsible, in normal cases, for releasing the single
    // stack frame reference. It's also responsible for cleaning up and releasing any references that won't come
    // from wait objects / timers due to exceptions thrown in the midst of setting up the wait.
    //
    m_refs = count + (timer ? 2 : 1);
}
///
/// Destructor for the wait block holder. Releases any references on the block which will not come as the result
/// of a release.
///
///
/// Destructor for the wait block holder. Releases every reference that was never
/// handed off to an event chain or timer (including the stack-frame reference);
/// the block frees itself once the final NotifyCompletedNode arrives.
///
MultiWaitBlockHolder::~MultiWaitBlockHolder()
{
    for (; m_count < m_refs; ++m_count)
        m_pWaitBlock->NotifyCompletedNode();
}
///
/// Called in order to satisfy the wait. This handles a single wait/timer combination. Any multi-wait semantic
/// must override this and call the base class in order to present a single-wait semantic.
///
///
/// Called in order to satisfy the wait. This handles a single wait/timer combination. Any multi-wait semantic
/// must override this and call the base class in order to present a single-wait semantic.
///
void MultiWaitBlock::SingleSatisfy(Context **pContextOut, EventWaitNode *pNode)
{
    //
    // If there is a timeout, the timer may already have unblocked the context.
    //
    Context *pContext = m_pContext;
    bool fSatisfied = true;
    if (m_fHasTimeout)
    {
        // Race with the timer: only the party that moves m_finalTrigger to 1 wins.
        if (InterlockedIncrement(&m_finalTrigger) != 1)
            fSatisfied = false;
    }
    if (fSatisfied)
    {
        // SingleSatisfy can be called with pNode set to NULL, but only when the wait has timed out.
        ASSERT(pNode != NULL);
        m_pSatisfiedBy = pNode;
        if (m_hTimer)
        {
            // Fix: the cast target type was missing ("static_cast(m_hTimer)").
            DeleteAsyncTimerAndUnloadLibrary(static_cast<PTP_TIMER>(m_hTimer));
            //
            // Now, we need to answer the question of whether the timer fired and incremented the
            // trigger or not. That will answer the question of when we delete the wait block.
            //
            if (m_finalTrigger == 1)
                NotifyCompletedNode();
        }
        //
        // The wait_for_multiple() or wait() may be in the process of chaining the context to wait
        // chain of the event. Before chaining it has taken a lock on the event. It is possible
        // that the current context being unblocked (in this SingleSatisfy()) could be the one
        // blocked because of the lock taken above. In this case m_fOkToUnblock flag was set to FALSE in
        // wait_for_multiple() or wait() and so here(in set()) we should not Unblock() the context and also set
        // a flag to not Block() the context in the wait_for_multiple(), for which m_fDoNotBlock flag
        // is set here. This cancels out the Block() Unblock().
        // If we do not take this measure and Unblock if above situation occurs, then Context blocked on
        // above lock will run, thus the critical region will be executed concurrently, which is
        // disastrous. Also this (Unblocking here) could result in Unblock/Unblock sequence on a
        // context which is illegal.
        //
        bool bSkip = !(pNode->m_pWaitBlock->m_smSkip_BlockUnblock == WaitBlock::DONT_SKIP // Avoid unnecessary InterlockedCompareExchange for optimizing.
            || InterlockedCompareExchange(&pNode->m_pWaitBlock->m_smSkip_BlockUnblock, WaitBlock::SKIP, WaitBlock::UNDECIDED) == WaitBlock::DONT_SKIP);
        //
        // It is *NOT* safe to touch the this pointer if bSkip is true, or right after the context is unblocked (if bSkip is false) since the context associated
        // with this wait block could return from the wait, and destroy the wait block in the process.
        //
        if (bSkip)
        {
            if (pContextOut != NULL)
                *pContextOut = NULL; // No context in list, hence no Unblocking in set()
        }
        else if (pContextOut != NULL)
            *pContextOut = pContext;
        else
            pContext->Unblock();
    }
}
///
/// Called when a timer on an event is signaled. If the timer wins the race against
/// a signal, the waiting context is unblocked with the timed-out result.
///
void MultiWaitBlock::DispatchEventTimer(PTP_CALLBACK_INSTANCE instance, void * pContext, PTP_TIMER timer)
{
    // Fix: the cast target type was missing ("reinterpret_cast (pContext)").
    MultiWaitBlock *pWaitBlock = reinterpret_cast<MultiWaitBlock *>(pContext);
    Context *pUnblockContext = NULL;
    bool deleteTimer = false;
    if (InterlockedIncrement(&pWaitBlock->m_finalTrigger) == 1)
    {
        pUnblockContext = pWaitBlock->m_pContext;
        // Defer the timer deletion until the very end of this callback.
        deleteTimer = true;
        //
        // Note that after this point, m_hTimer is invalid. Only the entity that transitions m_finalTrigger
        // to 1 is allowed to play with deleting the timer.
        //
        // Mark the block as timed out. This will allow us to cleanup the wait nodes associated with this wait block
        // the next time the event's wait and reset chains are swept.
        pWaitBlock->m_fWaitTimedOut = true;
    }
    if (pUnblockContext != NULL)
    {
        pWaitBlock->m_pSatisfiedBy = NULL;
        pUnblockContext->Unblock();
    }
    pWaitBlock->NotifyCompletedNode();
    if (deleteTimer)
    {
        UnRegisterAsyncTimerAndUnloadLibrary(instance, timer);
    }
}
///
/// Called when a timer on an event is signaled.
///
void MultiWaitBlock::DispatchEventTimerXP(LPVOID pContext, BOOLEAN)
{
    // Restore the template argument stripped from the original text; the target type
    // is established by the declaration below.
    MultiWaitBlock *pWaitBlock = reinterpret_cast<MultiWaitBlock *>(pContext);
    Context *pUnblockContext = NULL;

    //
    // Only the entity that transitions m_finalTrigger from 0 to 1 owns waking the waiter
    // and deleting the timer; a racing satisfier that got there first already did both.
    //
    if (InterlockedIncrement(&pWaitBlock->m_finalTrigger) == 1)
    {
        pUnblockContext = pWaitBlock->m_pContext;

        // Down-level (XP) timer-queue path: delete the timer directly.
        platform::__DeleteTimerQueueTimer(GetSharedTimerQueue(), pWaitBlock->m_hTimer, NULL);

        //
        // Note that after this point, m_hTimer is invalid. Only the entity that transitions m_finalTrigger
        // to 1 is allowed to play with deleting the timer.
        //

        // Mark the block as timed out. This will allow us to cleanup the wait nodes associated with this wait block
        // the next time the event's wait and reset chains are swept.
        pWaitBlock->m_fWaitTimedOut = true;
    }

    if (pUnblockContext != NULL)
    {
        // Timed out, so nothing satisfied the wait: clear the satisfier before waking.
        pWaitBlock->m_pSatisfiedBy = NULL;
        pUnblockContext->Unblock();
    }

    // Release this callback's reference on the shared block; do not touch pWaitBlock after this.
    pWaitBlock->NotifyCompletedNode();
}
///
/// Called to indicate that the event wait has been satisfied.
///
bool SingleWaitBlock::Satisfy(Context **pContextOut, EventWaitNode *pNode)
{
    //
    // For explanation of skipping Block/Unblock please see the comments in MultiWaitBlock::SingleSatisfy() method.
    //
    // The plain read short-circuits the interlocked operation when the waiter already
    // committed to blocking (DONT_SKIP). Otherwise the CAS tries to move the state from
    // UNDECIDED to SKIP so the waiter never blocks; bSkip is true only when no path
    // observed DONT_SKIP.
    bool bSkip = !(pNode->m_pWaitBlock->m_smSkip_BlockUnblock == WaitBlock::DONT_SKIP // Avoid unnecessary InterlockedCompareExchange for optimizing.
    || InterlockedCompareExchange(&pNode->m_pWaitBlock->m_smSkip_BlockUnblock, WaitBlock::SKIP, WaitBlock::UNDECIDED) == WaitBlock::DONT_SKIP );
    if( bSkip )
    {
    if(pContextOut)
    *pContextOut = NULL; // No context in list, hence no Unblocking in set()
    }
    else if (pContextOut != NULL)
    *pContextOut = m_pContext; // Hand the blocked context back; the caller performs the Unblock.
    else
    m_pContext->Unblock(); // Wake the single waiter directly.
    // The node never survives a satisfy for a single wait (false == do not keep node).
    return false;
}
#pragma warning(push)
#pragma warning(disable: 4702)
///
/// Called to indicate that the event for a single wait has been reset.
///
bool SingleWaitBlock::Reset()
{
    // A single wait never receives a reset notification -- presumably because it only
    // registers on the wait chain, never the reset chain (TODO confirm against the
    // registration path). Unreachable by design; see the 4702 suppression around this region.
    ASSERT(false);
    return false;
}
///
/// Called to indicate that the event node was on the rundown list at event destruction.
///
void SingleWaitBlock::Destroy()
{
    // A single wait's node is not expected to be found on an event's rundown list at
    // destruction time; reaching here indicates a bug.
    ASSERT(false);
}
///
/// Called in order to check whether a node is still alive or dead during a sweep of the wait chain.
///
bool SingleWaitBlock::SweepWaitNode()
{
    // A single wait's node is never reclaimed by a wait-chain sweep: always report it alive.
    return true;
}
///
/// Called in order to check whether a node is still alive or dead during a sweep of the reset chain.
///
bool SingleWaitBlock::SweepResetNode()
{
    // A reset-chain sweep should never encounter a single wait's node; reaching here
    // indicates a bug. (Unreachable; see the 4702 suppression around this region.)
    ASSERT(false);
    return false;
}
#pragma warning(pop)
///
/// Releases one reference on the shared multi-wait block; the final release frees it.
///
void MultiWaitBlock::NotifyCompletedNode()
{
    // Cache the waiter count before publishing our completion: once the final completion
    // lands, another thread may free this block.
    size_t waiters = m_waiters;

    //
    // Once satisfied, we are responsible for incrementing the completion counter. When it hits
    // the number of waiters, we can destroy the shared wait block.
    //
    // (The threshold is waiters + 1 -- presumably one completion per wait node plus one
    // for the waiting context itself; TODO confirm against the allocation site.)
    //
    if (InterlockedIncrementSizeT(&m_completions) == waiters + 1)
    {
        // The delete[] form implies the block was allocated as a raw char array (the
        // template argument of the cast was stripped from the original text; char *
        // restores the intended free of the underlying buffer).
        delete[] (reinterpret_cast<char *>(this));
    }
}
///
/// Called to indicate that an event for the wait-any has triggered and we should satisfy this
/// wait block.
///
bool WaitAnyBlock::Satisfy(Context **pContextOut, EventWaitNode *pNode)
{
    // Default: no context for the caller to unblock.
    if (pContextOut != NULL)
    *pContextOut = NULL;
    //
    // NOTE: m_pWaitBlock is unsafe as soon as we increment the counter if we are not the entity
    // to increment the counter to the wait limit. Cache everything up front!
    //
    // A wait-any fires on the first trigger, hence a limit of exactly one.
    ASSERT(m_triggerLimit == 1);
    size_t triggerCount = InterlockedIncrementSizeT(&m_count);
    // Only the incrementer that reaches the limit wakes (or reports) the waiter.
    if (triggerCount == m_triggerLimit)
    SingleSatisfy(pContextOut, pNode);
    // Release this node's share of the block; the last completion frees the block.
    NotifyCompletedNode();
    //
    // On a wait-any, we no longer need the wait node. The single wait block containing the node is
    // freed by the last satisfied waiter.
    //
    return false;
}
///
/// Called to indicate that an event in the wait-any has reset. This is irrelevant to us.
///
bool WaitAnyBlock::Reset()
{
    // A wait-any never receives reset notifications; reaching here indicates a bug.
    ASSERT(false);
    return false;
}
///
/// Called to indicate that an event with the node present on the rundown list is being
/// destroyed. This should never be called for a wait any.
///
void WaitAnyBlock::Destroy()
{
    // Intentionally empty. NOTE(review): sibling "never called" paths ASSERT(false);
    // confirm whether the missing assert here is deliberate or an oversight.
}
///
/// Called in order to check whether a node is still alive or dead during a sweep of the wait chain.
///
bool WaitAnyBlock::SweepWaitNode()
{
    // The node is dead if the wait-any already fired (count reached the trigger limit)
    // or the overall wait timed out.
    if (m_count >= m_triggerLimit || m_fWaitTimedOut)
    {
    Context *pContext;
    // If the wait has timed out, go ahead and satisfy the block. Since the timer has already fired and woken up the context,
    // we are not in danger of doing so.
    Satisfy(&pContext, NULL);
    // Passing a non-NULL pContextOut makes Satisfy report rather than unblock; here it
    // must come back NULL (no context left to wake).
    ASSERT(pContext == NULL);
    // Tell the sweep to unlink this node.
    return false;
    }
    // Still waiting -- keep the node on the chain.
    return true;
}
///
/// Called in order to check whether a node is still alive or dead during a sweep of the reset chain.
///
bool WaitAnyBlock::SweepResetNode()
{
    // A wait-any node never appears on a reset chain; reaching here indicates a bug.
    ASSERT(false);
    return false;
}
///
/// Called to indicate that an event for the wait-all has triggered and we should satisfy this
/// wait node. Note that this does *NOT* indicate that the wait should be satisfied yet.
///
bool WaitAllBlock::Satisfy(Context **pContextOut, EventWaitNode *pNode)
{
    // Default: no context for the caller to unblock.
    if (pContextOut != NULL)
    *pContextOut = NULL;
    ASSERT(m_triggerLimit >= 1);
    size_t triggerCount = InterlockedIncrementSizeT(&m_count);
    // The wait completes only when the last outstanding event signals.
    if (triggerCount == m_triggerLimit)
    {
    SingleSatisfy(pContextOut, pNode);
    // Release this node's reference on the shared block.
    NotifyCompletedNode();
    // Wait satisfied: the node is no longer needed.
    return false;
    }
    // Not all events have fired yet; keep the node alive.
    return true;
}
///
/// Called to indicate that an event which was previously signaled and counting towards a satisfied
/// wait all block has reset.
///
bool WaitAllBlock::Reset()
{
    size_t triggerLimit = m_triggerLimit;
    //
    // Ensure that we never decrement once the wait is satisfied. We need to make sure that a reset subsequent
    // just gets rid of the wait block.
    //
    // CAS loop: decrement m_count by one, but bail out without decrementing the moment
    // the count is observed at the trigger limit (i.e., the wait already completed).
    size_t previousTriggerCount = m_count;
    for(;;)
    {
    if (previousTriggerCount == triggerLimit)
    break;
    size_t xchgCount = InterlockedCompareExchangeSizeT(&m_count, previousTriggerCount - 1, previousTriggerCount);
    if (xchgCount == previousTriggerCount)
    break; // our decrement landed
    previousTriggerCount = xchgCount; // raced with a concurrent update; retry with the fresh value
    }
    if (previousTriggerCount == triggerLimit)
    {
    // The wait was already satisfied: this reset only releases our reference on the
    // shared block, and the node should be unlinked.
    NotifyCompletedNode();
    return false;
    }
    // Decrement succeeded; keep the node.
    return true;
}
///
/// Called in order to check whether a node is still alive or dead during a sweep of the wait chain.
///
bool WaitAllBlock::SweepWaitNode()
{
    // A node still on the wait chain implies the wait-all has not yet completed.
    ASSERT(m_count < m_triggerLimit);
    if (m_fWaitTimedOut)
    {
    Context * pContext;
    // Satisfy the node so its trigger is accounted; if the block was not fully
    // satisfied by this call (Satisfy returned true), release the node's shared
    // reference explicitly via Destroy().
    if (Satisfy(&pContext, NULL))
    Destroy();
    ASSERT(pContext == NULL);
    // Timed out -- unlink the node.
    return false;
    }
    return true;
}
///
/// Called in order to check whether a node is still alive or dead during a sweep of the reset chain.
///
bool WaitAllBlock::SweepResetNode()
{
    ASSERT(m_count <= m_triggerLimit);
    if (m_count >= m_triggerLimit)
    {
    //
    // The reset will clear us out.
    //
    // The wait already completed; Reset() observes the satisfied count and releases
    // this node's reference on the shared block.
    Reset();
    return false;
    }
    else if (m_fWaitTimedOut)
    {
    // Timed out before completion: just drop the node's shared reference.
    Destroy();
    return false;
    }
    // Wait still pending -- keep the node on the reset chain.
    return true;
}
///
/// Called when an event with an all-node is destroyed with the event present on a rundown list, this
/// destroys the wait node and releases its shared reference on the wait block.
///
void WaitAllBlock::Destroy()
{
    // Dropping the node just releases its shared reference on the wait block;
    // the final release deletes the block (see NotifyCompletedNode).
    NotifyCompletedNode();
}
///
/// Called in order to sweep out unused entries from a given node list. This clears dead wait-for-all nodes
/// on a reset-list or dead wait-for-any nodes on the wait-list, as well as nodes associated with timed out wait blocks on both lists.
///
/// <param name="fWaitChain">
/// true if the wait chain of an event is being swept and false if the reset chain is being swept
/// </param>
///
EventWaitNode * Sweep(EventWaitNode *pNode, bool fWaitChain)
{
    EventWaitNode *pKeptList = NULL;

    while (pNode != NULL)
    {
        //
        // Grab the successor up front: a sweep callback that reports the node as dead
        // may free it before we advance.
        //
        EventWaitNode *pSuccessor = pNode->m_pNext;

        bool fAlive = fWaitChain ? pNode->SweepWaitNode() : pNode->SweepResetNode();
        if (fAlive)
        {
            // Push the surviving node onto the head of the result list. (Note that this
            // reverses the order of surviving nodes relative to the input chain.)
            pNode->m_pNext = pKeptList;
            pKeptList = pNode;
        }

        pNode = pSuccessor;
    }

    return pKeptList;
}
//
// A StructuredEvent is simply a pointer with a few distinguished values. A newly
// initialized StructuredEvent will be set to 0. A StructuredEvent that has one or more waiters
// on it, that is, contexts which called StructuredEvent::Wait before StructuredEvent::Set has
// signaled the StructuredEvent, will simply point to a linked list of those waiters,
// via stack-blocks so no heap allocation is required. A StructuredEvent that is
// signaled is set to 1. Once an event is signaled, it can be safely
// deallocated, even if StructuredEvent::Set is still running.
//
//
// StructuredEvent - Synchronization object mediating access to the low-level context
// Block and Unblock APIs.
//
// A stack-allocated node in the singly linked list of contexts waiting on a
// StructuredEvent (see StructuredEvent::Wait, which places one of these on its stack).
struct StructuredEventWaitNode
{
    StructuredEventWaitNode *m_next;      // next waiter in the list (NULL terminates)
    ::Concurrency::Context *m_context;    // the context to Unblock() when the event is set
};
//
// Wait until the event is signaled (via some other context calling Set())
//
void StructuredEvent::Wait()
{
    //
    // Spin a short time waiting to be signaled before we block
    //
    void *oldPtr = m_ptr;
    if (oldPtr == EVENT_SIGNALED)
    return;
    _SpinWaitBackoffNone spinWait;
    for (;;)
    {
    oldPtr = m_ptr;
    if (oldPtr == EVENT_SIGNALED)
    return;
    if ( !spinWait._SpinOnce())
    break; // spin budget exhausted -- fall through to the blocking path
    }
    //
    // Give up and block, first putting our context on a stack-based
    // list of waiting contexts for this event
    //
    ::Concurrency::Context *context = SchedulerBase::FastCurrentContext();
    StructuredEventWaitNode node;
    node.m_context = context;
    // CAS loop: push our stack node onto the head of the waiter list. m_ptr holds
    // either the current list head or EVENT_SIGNALED.
    for (;;)
    {
    node.m_next = (StructuredEventWaitNode*)oldPtr;
    void *xchgPtr = InterlockedCompareExchangePointer(&m_ptr, &node, oldPtr);
    if (xchgPtr == oldPtr)
    break; // successfully enqueued; safe to block below
    oldPtr = xchgPtr; // lost the race -- retry against the value we actually saw
    if (oldPtr == EVENT_SIGNALED)
    {
    //
    // Event was signaled before we could add ourself to the wait
    // list, so no need to block any longer
    //
    return;
    }
    }
    // Set() will Unblock() us via the node we just published.
    context->Block();
}
//
// Set the event as signaled, and unblock any other contexts waiting
// on the event.
//
void StructuredEvent::Set()
{
    void *oldPtr = m_ptr;
    //
    // Mark the event signaled, and get the waiters list, if any
    //
    // CAS loop: swap EVENT_SIGNALED into m_ptr, capturing whatever was there
    // (NULL, EVENT_SIGNALED, or the head of the waiter list) into oldPtr.
    for (;;)
    {
    void *xchgPtr = InterlockedCompareExchangePointer(&m_ptr, EVENT_SIGNALED, oldPtr);
    if (xchgPtr == oldPtr)
    break;
    oldPtr = xchgPtr; // raced with a waiter enqueueing; retry with the observed value
    }
    //
    // If the event had any waiters, then unblock them
    //
    // (Values above EVENT_SIGNALED are real list-head pointers; see the distinguished
    // pointer-value scheme described above StructuredEvent.)
    if (oldPtr > EVENT_SIGNALED)
    {
    for (StructuredEventWaitNode *node = (StructuredEventWaitNode *)oldPtr, *next; node != NULL; node = next)
    {
    //
    // Need to cache the next pointer, since as soon as we unblock,
    // the stack-based StructuredEventWaitNode may be deallocated.
    //
    // Technically, there should be a memory fence after retrieving
    // the next pointer, but practically it's unnecessary, as long
    // as there is a locked operation inside the call to Unblock
    // before the blocked context starts running. I don't think
    // it's possible to write a scheduler unblock operation without
    // needing a locked op, so I'm avoiding the extra cost per
    // waiter here.
    //
    next = node->m_next;
    node->m_context->Unblock();
    }
    }
}
} // namespace details
} // namespace Concurrency