 |
 |

01-19-2009, 02:07 PM
|
 |
Contributor
|
|
Join Date: Dec 2004
Posts: 418
|
|
Blocking a thread to wait for callback.
|
|
Hello, I have been reading the async design patterns on MSDN and the general idea makes sense to me but in practice it is proving to be more difficult. I am working on a project utilizing VMWARE VIX
HTML Code:
http://pubs.vmware.com/vix-api/ReferenceGuide/
. Each function such as connecting to a host server, retrieving properties, etc. can be executed in two different ways.
Connecting to a host using a blocking Wait:
Code:
Dim m_lib As New VixLib
Dim m_hostJob As VixCOM.IJob
m_hostJob = m_lib.Connect(VixCOM.Constants.VIX_API_VERSION, VixCOM.Constants.VIX_SERVICEPROVIDER_VMWARE_VI_SERVER, _
m_host, m_port, m_user, m_pass, 0, Nothing, Nothing)
m_vixError = m_hostJob.Wait(m_propertyIds, m_results)
or with a callback (ICallback):
Code:
Dim m_lib As New VixLib
Dim m_hostJob As VixCOM.IJob
dim m_callback as VixCOM.ICallback
m_hostJob = m_lib.Connect(VixCOM.Constants.VIX_API_VERSION, VixCOM.Constants.VIX_SERVICEPROVIDER_VMWARE_VI_SERVER, _
m_host, m_port, m_user, m_pass, 0, Nothing, m_callback)
The problem with the VIX SDK is that sometimes the functions never return, hanging the application. dblock
HTML Code:
http://code.dblock.org/ShowPost.aspx?id=29
came up with a library that specifies a timeout value. When the timeout runs out the method aborts and returns regardless whether it finished or not.
The callback method is an active wait where the code continues to execute. What I want to implement is a blocking wait with a timeout.
The ICallback interface has one sub OnVixEvent(IJob,Integer,IVixHandle). This method is fired at least twice, first when some progress is posted and second when it is complete. The first call to this method can be one second after registering the callback or 30 seconds, so I can not rely on this method to block the main thread.
What is confusing to me is how to create some kind of async result class that would block the main thread from executing until either a timer runs out or the OnVixEvent method returns an Operation Complete signal that also implements ICallback.
|
|

01-21-2009, 07:44 AM
|
 |
Contributor
|
|
Join Date: Dec 2004
Posts: 418
|
|
|

01-21-2009, 10:05 AM
|
 |
Ultimate Contributor
Preferred language: Forum Leader * Guru *
|
|
Join Date: Feb 2004
Location: Austin, TX
Posts: 6,695
|
|
|
|
Your implementation is close to what I'd suggest, but there's some minor quirks. You've got a WaitHandle; you don't need to add anything extra to implement a timeout because WaitOne() has an overload that takes a timeout value.
Any time you have a busy loop you should look into getting rid of it; these can consume more CPU resources than is reasonable. Your code might look like this (forgive syntax problems; I just reformatted and VS 2008 is not installed yet and I'm also trying to save space. The reformat's also why I hadn't answered yet  ):
Code:
Public Class DoesSomethingAsync
Private Shared ReadOnly LockObject As New Object()
Private _callback As Callback
Private _callbackState As Integer = 0
Private _flag As ManualResetEvent
Sub Main()
_flag = new ManualResetEvent(False)
_callback = New Callback(_flag)
m_lib.Connect(...)
_flag.Reset() ' Make sure flag is reset out of superstition
Dim completed as Boolean = _flag.WaitOne(TimeSpan.FromSeconds(30)) ' blocks here
' "Not completed" means "timed out" in this context; I really wish the return values from
' WaitOne() were inverted.
If Not completed Then
Console.WriteLine("Operation timed out.")
Else
Console.WriteLine("Final state is: {0}", _callback.CallbackState)
End If
End Sub
End Class
Public Class Callback
Implements VixCOM.ICallback
Private _callbackState As Integer
Private _flag As ManualResetEvent
Public ReadOnly Property CallbackState() As Integer
Public Sub New(ByVal flag As ManualResetEvent)
_callbackState = 0
_flag.Set()
End Sub
Public Sub OnVixEvent(...)
' same as before, but add at the end:
_
End Sub
End Class
This gives the Callback class a little more than 30 seconds to finish and set the flag. If it does, then WaitOne() returns True. If it doesn't, then the operation times out and WaitOne() returns False.
If I can risk sounding rude, why on Earth would you post it in a PDF? It's code. Text. To get it into a PDF you had to either copy the entire thing and paste it into something that can generate PDFs or use some kind of PDF printer. This, as opposed to just attaching a .vb or .txt file, which would have been half the size and more accessible for users. I hadn't installed Adobe Reader yet, so I actually had no way to look at it offhand.
|
|

01-21-2009, 12:27 PM
|
 |
Contributor
|
|
Join Date: Dec 2004
Posts: 418
|
|
Thanks for replying. Looks like much cleaner implementation...
Code:
Public Sub OnVixEvent(...)
' same as before, but add at the end:
_flag.Set ?
End Sub
Why would I not want to use TheadPool.QueueUserWorkItem?
Is a new thread spawned for the Callback object with ManualResetEvent.WaitOne? Could I use AutoResetEvent.WaitAny the same way? (I have more to read http://msdn.microsoft.com/en-us/library/zd6a283y.aspx)
Is the purpose of the ManualResetEvent and AutoResetEvent just to act a signal to the parent thread?
To answer your question, I first posted my PDF on vmware forums since their forum does not support [code] tags and I didn't know how to post it otherwise. Then just to update this forum I threw in the PDF... But you're right it is pretty annoying.
|
Last edited by fixitchris; 01-21-2009 at 12:35 PM.
|

01-21-2009, 01:25 PM
|
 |
Ultimate Contributor
Preferred language: Forum Leader * Guru *
|
|
Join Date: Feb 2004
Location: Austin, TX
Posts: 6,695
|
|
Quote:
|
Why would I not want to use TheadPool.QueueUserWorkItem?
|
If I recall correctly, your original code did the equivalent of this on a worker thread:
Code:
While SomeConditionIsFalse()
Thread.Sleep(500)
End While
_flag.Set()
This is useless. It's a worker thread that's checking a condition twice per second. In your callback code, you used a threaded timer to try and detect a timeout. If the operation completes before the timer callback is called, then hopefully your thread above notices and responds. If the timer callback is called before the operation completes, hopefully the status is changed and the above thread code picks it up.
So you've got 3 .NET threads and one thread managed by the vmWare API; we'll call them main, mainWorker, callbackTimer, and vmWareApi. main blocks on a call to WaitOne(). mainWorker is looping continuously until a certain condition is met, then it sets the flag. callbackTimer is looping continously until a certain condition is met, then it sets a condition that will cause mainWorker to set the flag. All of these interactions are prone to race conditions, since no thread synchronization structures are used. For example, the timeout timer starts when you instantiate m_Callback. What if it takes a few seconds to get mainWorker started and call Connect()? You might time out a little early. Worse, what if you time out and vmWare calls OnVixEvent at nearly the same time? You might end up setting one value in one method, then immediately changing it in another.
Compare this to my model: there's two threads: main and vmWareApi. When vmWareApi is finished, it uses the semaphore to tell main to quit blocking. If vmWareApi takes too long, main will quit waiting and indicate a timeout occurred. There's still a race condition: the vmWare call might complete shortly after the timeout. However, in my version when the race condition happens the value of CallbackState is more deterministic.
What I'm trying to say is your model requires communication between four threads through both a semaphore (the ManualResetEvent) and a variable that's not thread-safe. Your two cases are as follows: 1) One thread sets a value that causes another thread to set a value that causes the main thread to unblock. 2) One thread calls a method that sets a value that causes another thread to set a value that causes the main thread to unblock. My model has two threads that communicate via one semaphore; the two cases are as follows: 1) The worker thread times out and the main thread unblocks. 2) The worker thread completes and signals the main thread to continue. It's simpler to manage, and cleaner to write.
Quote:
|
Is a new thread spawned for the Callback object with ManualResetEvent.WaitOne? Could I use AutoResetEvent.WaitAny the same way?
|
I don't understand what you are asking. ManualResetEvent is not a thread, it's a semaphore used for communication between threads. The difference between manual and auto reset events is that an AutoResetEvent will automatically reset itself as soon as one thread unblocks via a wait call; ManualResetEvent does not change its state unless you change it manually.
Quote:
|
Is the purpose of the ManualResetEvent and AutoResetEvent just to act a signal to the parent thread?
|
Correct. These classes exist so that threads can communicate with each other. They provide facilities for a thread to suspend its operation until a stimulus (the flag change) restarts it. Think of it as like a light bulb between a factory and the people that load trucks. When the light's red, the factory is producing stock but doesn't have any for the loaders to load, so the loaders know they can play video games or have a party or something. When the factory's got something for the loaders to load, the light turns green and the workers get to work. The light's a semaphore here.
|
|

01-21-2009, 02:25 PM
|
 |
Contributor
|
|
Join Date: Dec 2004
Posts: 418
|
|
aha, so it's all about the semaphores. A new concept to me, thanks for clarifying. My confusion is if WaitOne blocks the running thread, how does the Async code execute without explicitly running a new thread or using the ThreadPool?
How would you recommend I work through a problem involving waiting on 2 or more signals?
Code:
Public Enum EnStatus
NotStarted = 0
Running = 1
Finished = 2
End Enum
Class Host
Public Property IPadd as string = "127.0.0.1"
Public Property Status as EnStatus = EnStatus.NotStarted
Public Sub DoWork(flag as AutoResetEvent)
Status = EnStatus.Running
'
'Do some work...
'
Status = EnStatus.Finished
flag.Set()
End Sub
End Class
Class HostController
Private _flag as ManualResetEvent
Private _autoEventsCol as Collection
Public Sub New()
Dim Infinity as integer = -1
Dim HostCol as Collection = <collection of HOST objects>
Dim MaxHostsAtOneTime as integer = 2
'HostCol.count = 10 for example
For i = 0 to HostCol.Count - 1
_autoEventsCol.Add(new AutoResetEvent(false))
Next i
For i = 1 to MaxHostsAtOneTime
For j = 0 to HostCol.Count - 1
If HostCol.Item(j).Status = EnStatus.NotStarted then
HostCol.Item(j).DoWork(_autoEventsCol.Item(j).WaitOne(Infinity)) ' <- I don't think this will work.
Exit For
End If
Next j
Next i
'Now I want to wait until the running HOST objects falls below MaxHostsAtOneTime and I want to
'run another object from the HostCol until all are finished. But only MaxHostsAtOneTime are allowed
'to run at the same time.
End Sub
End Class
|
|

01-21-2009, 03:15 PM
|
 |
Ultimate Contributor
Preferred language: Forum Leader * Guru *
|
|
Join Date: Feb 2004
Location: Austin, TX
Posts: 6,695
|
|
Quote:
|
aha, so it's all about the semaphores. A new concept to me, thanks for clarifying. My confusion is if WaitOne blocks the running thread, how does the Async code execute without explicitly running a new thread or using the ThreadPool?
|
In your code sample, you call Connect() and give it a callback that it will call when it is finished. This means that, as part of its logic, Connect() probably creates its own thread and does its work on that one. Think about it as if it worked this way (ignoring the fact that I think it was a function and not a sub):
Code:
Public Class SomeVmWareClass
Public Sub Connect(..., ByVal callback As ICallback)
_callback = callback
ThreadPool.QueueUserWorkItem(AddressOf DoWork, args)
End Sub
Private Sub DoWork(...)
' mysterious things
_callback.OnVixEvent(...)
End Sub
End Class
It manages its own thread that you can't touch, and calls a callback method (maybe from that thread) when it's finished. This is similar to how the BackgroundWorker component works: you call a single method to start the asynchronous work, and it tells you when it's done.
Quote:
|
How would you recommend I work through a problem involving waiting on 2 or more signals?
|
What you've got is a classic producer/consumer situation. My favorite analogy for this is a factory. Let's lay it out in list format so we have the requirements enumerated:
Suppose you have a factory that builds widgets, and a loading dock that puts widgets in boxes. Here's the requirements for the system: - If the loaders have nothing to do, they can sleep.
- If the conveyor belt is full (10 items), the factory should stop building until the loaders remove an item.
You can implement this with semaphores and it works OK; you basically need a "work available" semaphore and a "conveyor available" semaphore that both threads manage. However, it's usually more elegant to put a third entity into the mix: the conveyor belt. If you do this, then instead of loaders having to communicate with the factory and vice versa, both the loaders and the factory only care about communicating with the conveyor belt. Here's how making the conveyor belt smart changes the situation: - When the conveyor belt has less than 10 items, the factory can produce 1 item.
- When the factory puts an item on the conveyor belt, a buzzer should sound so the workers know to put down the beer and start working.
- When the workers remove an item from the conveyor belt, a buzzer should sound so the factory knows it can produce another item.
- When the conveyor belt is empty, the workers can have a party.
The conveyor belt can be implemented as a simple collection class:
Code:
Public Class ConveyorBelt
Private _items As New List(Of Item)()
Public AddItem(ByVal item As Item)
_items.Add(item)
RaiseEvent ItemsChanged(Me, EventArgs.Empty)
End Sub
Public Function RemoveItem() As Item
Dim item As Item = _items(0)
_items.RemoveAt(0)
RaiseEvent ItemsChanged(Me, EventArgs.Empty)
End Sub
Public ReadOnly Property Count As Integer
Get
Return _items.Count
End Sub
End Sub
Public Event ItemsChanged As EventHandler
End Class
Now, to stay within the rules, the factory needs produce items until the conveyor belt is full; when it's full, the factory needs to wait until the conveyor has room. To do this, it handles ItemsChanged:
Code:
Public Class Factory
Private WithEvents _conveyor As ConveyorBelt ' assume this is set in the constructor
Private _conveyorAvailable As New ManualResetEvent(False)
Public Sub Produce()
' assume this happens on a worker thread; for simplicity I omitted the code to do so
While FactoryIsOpen()
If _conveyor.Count < 10 Then
_conveyor.Add(New Item())
Else
_conveyorAvailable.Reset()
_conveyorAvailable.WaitOne()
End If
End While
End Sub
Private Sub HandleConveyorChanged(...) Handles _conveyor.ItemsChanged
' the conveyor items have changed, try to produce again
_conveyorAvailable.Set()
End Sub
End Class
Likewise, the workers need to remove items until the conveyor is empty, then wait until the conveyor belt tells them there are new items:
Code:
Public Class LoadingDock
Private WithEvents _conveyor As ConveyorBelt ' assume this is set in the constructor
Private _conveyorFull As New ManualResetEvent(False)
Public Sub Load()
' assume this happens on a worker thread; for simplicity I omitted the code to do so
While OnDuty()
If _conveyor.Count > 0 Then
Dim item As Item = _conveyor.RemoveItem()
LoadItem(item)
Else
_conveyorFull.Reset()
_conveyorFull.WaitOne()
End If
End While
End Sub
Private Sub HandleConveyorChanged(...) Handles _conveyor.ItemsChanged
' the conveyor items have changed, try to load again
_conveyorAvailable.Set()
End Sub
End Class
I really like this approach rather than directly using semaphores, because it's a lot easier to describe. The best way to manage resources across multiple threads is to have some central resource manager with which the threads communicate. It's a nightmare to debug when you have multiple threads that all communicate with each other, but when each thread only communicates with one thing it's much easier to see what's going on.
To make this relevant to your example, extend the analogy. You need a factory to create host items. The conveyor belt is the collection of hosts. The loaders aren't present in this scenario. You need to first create enough jobs to fill your host collection, then wait until one of the jobs signals that it's finished (via callback or timeout). When a job finishes, start another until there are no jobs left.
|
|

01-22-2009, 02:58 PM
|
 |
Contributor
|
|
Join Date: Dec 2004
Posts: 418
|
|
AW, thanks for the clear explanations...
I picked up VB Design Patterns from my library yesterday. Level Up for me.
|
|

01-26-2009, 10:42 AM
|
 |
Contributor
|
|
Join Date: Dec 2004
Posts: 418
|
|
AW, is the below code correct if I wanted to throttle the LoadItem sub in LoadingDock class?
Code:
Imports System.Threading
Public Class LoadingDock
Private WithEvents _conveyor As ConveyorBelt ' assume this is set in the constructor
Private _conveyorFull As New ManualResetEvent(False)
Private _availableWorkers As Integer = 2
Public Sub Load()
' assume this happens on a worker thread; for simplicity I omitted the code to do so
While OnDuty()
If _conveyor.Count > 0 And _availableWorkers > 0 Then
Dim item As hostItem = _conveyor.RemoveItem()
ThreadPool.QueueUserWorkItem(AddressOf LoadItem, item)
Else
_conveyorFull.Reset()
_conveyorFull.WaitOne()
End If
End While
End Sub
Private Sub HandleConveyorChanged(...) Handles _conveyor.ItemsChanged
' the conveyor items have changed, try to load again
_conveyorAvailable.Set()
End Sub
Private Sub LoadItem(ByVal item As hostItem)
_availableWorkers -= 1
' do a whole bunch of work...
_availableWorkers += 1
End Sub
End Class
Also, you were talking about eliminating loops for condition checking and you mention here to implement worker threads. How would a worker thread continuously check the conveyorBelt count without looping on a worker thread?
In the VB Design Patterns book they code the conveyorBelt RemoveItem by using
Code:
While myqueue.Count = 0
Monitor.Wait(myQueue)
End While
Return myQueue.RemoveItem
|
|

01-26-2009, 11:58 AM
|
 |
Contributor
|
|
Join Date: Dec 2004
Posts: 418
|
|
I was looking more at the ThreadPool and I noticed that to throttle the Threadpool queue execution, I can ThreadPool.SetMaxThreads = 2 and supposedly, the ThreadPool.QueueUserWorkItem will execute "when a thread pool thread becomes available." So I can queue up 100 different items but the thread pool will only be able to execute two at a time. Am I seeing this correctly.
|
|

01-26-2009, 02:44 PM
|
 |
Contributor
|
|
Join Date: Dec 2004
Posts: 418
|
|
The ThreadPool.setMaxThreads will not work because the setMaxThreads value can not be below System.Environment.ProcessorCount.
I found another way, however, again with Semaphores!
Code:
m_SemaphorePool = New Semaphore(maxThreads, maxThreads)
Do Until l_cHostNames.Count = 0
Dim t As New Thread(New ParameterizedThreadStart(AddressOf DoWork))
t.Start(New cHostObject(m_semaphorePool,...))
Loop
Private Sub DoWork(ByVal HostObject As cHostObject)
HostObject.Test()
End Sub
Public Class cHostObject
Public Sub Test()
Debug.WriteLine(m_host.IP & "--- new thread")
m_ParentSemaphorePool.WaitOne()
Debug.WriteLine(m_host.IP & "--- working...")
Thread.Sleep(20000)
Debug.WriteLine(m_host.IP & "--- semaphore release")
m_ParentSemaphorePool.Release()
End Sub
End Class
Output:
Code:
172.16.4.242--- new thread
172.16.4.242--- working...
127.0.0.2--- new thread
127.0.0.2--- working...
127.0.0.1--- new thread
172.16.4.242--- semaphore release
127.0.0.1--- working...
127.0.0.2--- semaphore release
127.0.0.1--- semaphore release
|
|
|
Currently Active Users Viewing This Thread: 1 (0 members and 1 guests)
|
|
|
| Thread Tools |
|
|
| Display Modes |
Linear Mode
|
Posting Rules
|
You may not post new threads
You may not post replies
You may not post attachments
You may not edit your posts
HTML code is Off
|
|
|
| |
|
|
|
 |
|