本篇同步发文於个人Blog: [读书笔记] Threading in C# - PART 2: BASIC SYNCHRONIZATION
Simple blocking methods: 像是Sleep, Join, Task.Wait等
Locking constructs: 限制数个Thread做後续操作, Excluseive locking只有一个thread, 比如lock(Monitor.Enter, Monitor.Exit), Mutex 和 SpinLock. 而Nonexclusive locking像是Semaphore, SemaphoreSlim 和 reader/writer locks
Signaling constructs: Thread可以暂停, 直到收到通知才恢复, 这可避免没效率的轮询. 比如用 event wait handler, Monitor的Wait/Pulse, CountdownEvent和Barrier
Nonblocking synchronization constructs: Thread.MemoryBarrier, Thread.VolatileRead, Thread.VolatileWrite, volatile关键字 和 Interlocked类别
等待、Sleep等会让thread暂停, 把time slice还给CPU.
可用ThreadState检查是否Blocked
bool blocked = (someThread.ThreadState & ThreadState.WaitSleepJoin) != 0
当Thread发生block 或 unblock, 都会造成 Context Switch
unblock的触发条件:
blocking条件已满足
operation timing out(有指定timeout的时候)
被interrupt
被aborted
用loop一直查询某条件, 造成CPU消耗很大的运算资源
一种比较好一点的写法是, 在loop内加个Thread.Sleep
如果要用Spinning的写法, 一种是保证该条件很快就满足的运算, 另外一种是用SpinLock/SpinWait
区分 thread-safe和thread-unsafe的程序码, 通常是有static变数, 如果有多个thread存取时会不会错误
最简单的方式是用lock关键字做同步化, 绑住某个同步化物件, 只允许一个thread操作, 其他的thread变成blocked状态, 且依照queue的顺序来排队
Construct | Purpose | Cross-Process ? | Overhead |
---|---|---|---|
lock (Monitor.Enter / Monitor.Exit) | 确保只有一个Thread能存取资源或一段code | 20 ns | |
Mutext | 同lock | Yes | 1000 ns |
SemaphoreSlim | 确保指定数量的thread能存取资源或一段code | 200 ns | |
Semaphore | 同Semaphore | Yes | 1000 ns |
ReaderWriterLockSlim | 允许多个reader能与一个writer共存 | 40 ns | |
ReaderWriterLock | 同ReaderWriterLockSlim | 100 ns |
Monitor.Enter(_locker);
try
{
DoSomething();
}
finally
{
Monitor.Exit(_locker);
}
上面写法会有bug, 如果在Enter和try之间发生exception (比如thread被Abort或记忆体溢出), 则永远不释放该locker
更好的写法是在try内用Enter, 且代入bool的变数, 用来判断是否lock成功, 成功的话可以呼叫Exit
bool lockTaken = false;
try
{
Monitor.Enter(_locker, ref lockTaken)
}
finally
{
if(lockTaken)
{
Monitor.Exit(_locker);
}
}
必须是reference type的物件
一般是private的物件, 做逻辑封装
精准的lock会用专门的locker物件
lock(this)或lock(typeof(SomeClass)), 很难预防死结和过多的blocking
基本的规则是, lock在存取可写的共享物件
Thread safe与unsafe的写法
class ThreadUnsafe
{
static int _x;
static void Increment() { ++_x; }
static void Assign() { _x = 123; }
}
class ThreadSafe
{
static readonly object _locker = new object();
static int _x;
static void Increment() { lock(_locker) { ++_x; }}
static void Assign() { lock(_locker) { _x = 123; }}
}
有一组变数, 写跟读总是在同一个lock, 则称它们是Atomic
比如下面x与y的除法范例
lock (_locker)
{
if(x!=0)
y /= x;
}
有时会有破坏atomicity的bug, 比如有呼叫其他函式造成exception, 使某些变数没完整计算完
建议其他函式先运算完再把它的值带入到lock, 或者try的catch/finally做rollback
lock可以巢状包装
适用於lock的内容, 有call其他的函式, 这些函式实作再加上lock
当两个thread都掌握对方的资源且等待对方释放, 没任何进展就是死结
基本的死结案例:
using System;
using System.Threading;
class TestDeadlocks
{
static void Main(string[] args){
object locker1 = new object();
object locker2 = new object();
new Thread(() => {
lock(locker1){
Thread.Sleep(1000);
Console.WriteLine("Ready to lock 2");
lock(locker2);
}
}).Start();
lock(locker2){
Thread.Sleep(1000);
Console.WriteLine("Ready to lock 1");
lock(locker1);
}
Console.WriteLine("Hi");
Console.Read();
}
}
更复杂的死结是,Thread 1 lock 而呼叫A class的X方法, X方法呼叫B class的Y 方法, 另外Thread 2 lock而呼叫B class的Y方法, Y方法呼叫A class的X方法.
考虑lock是否要用在别的class的函式
之後的declarative, data parallelism, immutable types 和 nonblock synchronization能减少lock的需求
另一些常见的死结发生在WPF的Dispatcher.Invoke或Winform的Control.Invoke, 解法是用BeginInvoke
基本上lock的速度很快
如果有很短暂的lock, 可以改用SpinLock, 减少Context Switch
Lock得太久, 会减少共时性的效能; Lock也是造成Deadlock的风险
跨Process的lock, 大约比lock慢50倍
使用WaitOne做lock, ReleaseMutex unlock, 而用close或dispose也是release
Mutex认出同样的lock是用Name
如果是执行在Terminal Services环境, 一般的Mutex无法跨terminal server session, 要在Name加上Global\ 前缀字
using System;
using System.Threading;
class OneAtATimePlease
{
static void Main(string[] args){
using(var mutex = new Mutex(false, "test oreilly"))
{
if(!mutex.WaitOne(TimeSpan.FromSeconds(3), false)){
Console.WriteLine(" another is running");
Console.Read();
return;
}
RunProgram();
}
}
static void RunProgram()
{
Console.WriteLine("To exit");
Console.Read();
}
}
Semaphore允许多个Thread在同一区段执行, 超过此容量的thread会block等待
把Semaphore的容量设为1, 就和lock与mutex一样, 但Semaphore的Release是任何thread都能呼叫
SemaphoreSlim有更低延迟, 且能带cancellation token, 用在parallel programming
如果Semaphore有给名字, 也是能跨Process
下面范例是最多3个Thread进入
using System;
using System.Threading;
class SemaphoreClub
{
static SemaphoreSlim _sem = new SemaphoreSlim(3);
static void Main(string[] args){
for(int i = 0 ; i < 5; ++i){
new Thread(Enter).Start(i);
}
Console.Read();
}
static void Enter(object id)
{
Console.WriteLine(id + " wants to enter");
_sem.Wait();
Console.WriteLine(id + " is in!");
Thread.Sleep(500 * (int) id);
Console.WriteLine(id + " is leaving");
_sem.Release();
}
}
开发时要维护该type所有Thread-safe栏位
Thread-safety有效能上的花费, 即使没有多执行绪也必要花费
使用Thread-safe的type不一定能让执行程序thread-safe
基本的用法是用exclusive lock去锁定特定的程序码而达到thread-safe
另外是减少共享资料, 达到无状态的功能, 比如ASP.NET Web的Request, 大都是独自处理
最後是用automatic locking regime的方式, 对class或property加上ContextBoundObject和Synchronization属性, 就能自动有锁的功能. 但缺点是会产生另一种方式的死结、并发性差、意外重入等问题. 尽量用exlusive lock.
Enumeration是thread-unsafe的行为, 所以共同资料要enumeration时, 先宣告一个local变数, 再用lock的方式copy (ToList, ToArray等)到local变数
Enumeration的另一种解法是reader/writer lock
class ThreadSafe
{
static List<string> _list = new List<string>();
static void Main()
{
new Thread(AddItem).Start();
new Thread(AddItem).Start();
}
static void AddItem()
{
lock (_list) _list.Add("Item " + _list.Count());
string[] items;
lock (_list) items = _list.ToArray();
foreach(string s in items) Console.WriteLine(s);
}
}
Locking around thread-safe objects
Static members
.net framework设计static member是thread-safe, 而实例的member不是. 比如取DateTime.Now, 就不需要去用lock来取
static function不是thread-safe, 要确认功能对资料的共享性
Read-only thread safety
能在文件注明该collection是只读访问的thread-safe, 并要求使用者在只读的方法做写入
实作ToArray等, 本身会有thread-unsafe的issue
如果文件缺少说明, 要注意是否某些方法是read-only. 比如Random.Next(), 内部实作有更新private seed, 因此要用lock取值或者分开的Random物件
在WPF或Winform, UI的元件有Affinity特性, 代表哪个thread建立元件, 那元件只能被那thread存取.
所以别的thread需要marshal原本thread来控制元件, 比如Winform的Invoke或BeginInvoke, WPF的Invoke或BeginInvoke
Invoke是同步方法, 会block目前thread; BeginInvoke是非同步方法, 立即回传caller而marshal的request会进到queue(和keyboard, mouse的事件使用同样的message queue)
Worker threads versus UI threads
Rich client有两大thread: UI Thread和Worker Thread
UI Thread专门建立UI元件, Worker thread一般用来执行long-running job
Rich client都会有一个UI Thread且是Main thread, 再由它生成work thread, 可直接生成或者用BackgroundWorker
Single Document Interface (SDI), 像是Word, 会有多个UI Thread
物件能封装成里面的状态不能被内部与外部改变, 称为immutable object. 决定它内部值是在Constructor且值是Read-only. 可减少lock的执行时间.
下面范例是建立一个immutable object, 只有assign新物件才会需要lock, 取值不需要
class ProgressStatus
{
public readonly int PercentComplete;
public readonly string StatusMessage;
public ProgressStatus(int percentComplete, string statusMessage)
{
PercentComplete = percentComplete;
StatusMessage = statusMessage;
}
}
class Program
{
readonly object _statusLocker = new object();
ProgressStatus _status;
void SomeFunction()
{
_status = new ProgressStatus(50, "Working on it");
ProgressStatus statusCopy;
lock(_statusLocker) statusCopy = _status;
int pc = statusCopy.PercentComplete;
string msg = statusCopy.StatusMessage;
}
}
在int pc = ... 的最後2行, 有隐含用Memory barrier包装
後续不使用lock, 还会有显示Memory barrier, Interlocked.CompareExchange, spin-waits等功能可用
Signaling是指thread会一直等待, 直到收到从别的Thread发的通知
和一般C#的event不相关
3种类型: AutoResetEvent, ManualResetEvent, CountdownEvent
Construct | Purpose | Cross-Process ? | Overhead |
---|---|---|---|
AutoResetEvent | 允许一个thread当收到singal时,执行一次unblock | Yes | 1000 ns |
ManualResetEvent | 允许一个thread当收到singal时,执行无限期的unblock (直到它重置) | Yes | 1000 ns |
ManualResetEventSlim (Net Framework 4) | 同ManualResetEvent | 40 ns | |
CountdownEvent (Net Framework 4) | 允许一个thread当收到预定数量的singal时,执行unblock | 40 ns | |
Barrier (Net Framework 4) | 实作Thread执行屏障 | 80 ns | |
Wait and Pulse | 允许一个thread block直到某条件达成 | 120 ns for a Pulse |
它像是一个票闸, 插入一张票只让一个人过
Thread 在门闸时呼叫WaitOne来wait/block, 而呼叫Set插入票
如果有多个thread在门闸呼叫WaitOne, 变成queue排队
Ticket可以来自任何thread, 代表任何unblock的thread可存取该AutoResetEvent物件并呼叫Set
在constructor代入true的话, 代表直接呼叫Set
用EventWaitHandle可达到相同的功能 (EventWaitHandle是AutoResetEvent的父类别)
var auto = new AutoResetEvent (false);
// 等同写法
var auto2 = new EventWaitHandle(false, EventResetMode.AutoReset);
using System;
using System.Threading;
class TestAutoResetEvent
{
static EventWaitHandle _waitHandle = new AutoResetEvent(false);
static void Main(string[] args){
new Thread(() => {
Console.WriteLine("Wait...");
_waitHandle.WaitOne();
Console.WriteLine("awake");
}).Start();
Thread.Sleep(1000);
_waitHandle.Set();
Console.Read();
}
}
Producer/consumer queue
一个queue用来放进要执行的任务, 而其他thread在背景从这queue挑任务来做
用这种queue能有效管理要执行的thread数量, 比如IO密集型任务可只安排一个thread, 其他需要10个
CLR的Thread pool也是一种Producer/consumer queue
queue插入的资料会有对应的任务, 比如填入档案名称, 而对应的任务是加密该档案
以下用AutoResetEvent实作范例
using System;
using System.Collections.Generic;
using System.Threading;
namespace ProducerConsumerTest
{
class Program
{
static void Main(string[] args)
{
using (ProducerConsumerQueue q = new ProducerConsumerQueue())
{
q.EnqueueTask("Hello");
for(int i = 0; i < 20; ++i)
{
q.EnqueueTask("Say " + i);
}
q.EnqueueTask("Good bye");
}
}
}
public class ProducerConsumerQueue : IDisposable
{
EventWaitHandle _wh = new AutoResetEvent(false);
Thread _worker;
readonly object _locker = new object();
Queue<string> _tasks = new Queue<string>();
public ProducerConsumerQueue()
{
_worker = new Thread(Work);
_worker.Start();
}
public void EnqueueTask(string task)
{
lock (_locker)
{
_tasks.Enqueue(task);
}
_wh.Set();
}
public void Dispose()
{
EnqueueTask(null); // signal the consumer to exit
_worker.Join(); // wait for the consumer's thread to finish
_wh.Close(); // release any OS Resources
}
private void Work()
{
while (true)
{
string task = null;
lock(_locker)
{
if(_tasks.Count > 0)
{
task = _tasks.Dequeue();
if(task == null)
{
return;
}
}
}
if(task != null)
{
Console.WriteLine("Performing task : " + task);
Thread.Sleep(1000); // simulate work...
}
else
{
_wh.WaitOne(); // no more tasks , wait for a signal
}
}
}
}
}
用lock去锁定queue, 达到thread-safe
在enqueue之後, 呼叫Set, 通知在while(true)有wait的thread可以往下做
如果caller插入null的资料, 直接结束
queue如果是空的, 会呼叫WaitOne等待signal
在Dispose的实作, 呼叫Enqueue(null), 让Work方法读到null而return结束, 否则Thread的Join永远不结束; 对EventWaitHandle呼叫Close, 可以释放内部有用到的资源
.Net Framework 4 有BlockingCollection, 实作Producer/Consumer queue
上述用AutoResetEvent的Producer/Consumer queue是个好的范例, 未来加上cancellation或bounded queue, 都可以此为起点
和AutoResetEvent相比, ManualResetEvent是一般的闸门, 呼叫Set时, 让所有等待(有呼叫过WaitOne)的Thread全都能进入
呼叫Reset能把闸门关上
呼叫WaitOne就会Block
等同的写法
var manual1 = new ManualResetEvent(false);
var manual2 = new EventWaitHandle(false, EventResetModel.ManualReset);
另一个是ManualResetEventSlim能执行更快且支援CancellationToken, 但不能跨Process
ManualResetEvent是让一个Thread允许多个Thread unblock, CountdownEvent则相反
用CountdownEvent可以等多个Thread执行後再往後执行
在.NET Framework 4之前, 可以用Wait and Pulse来实作CountdownEvent
建构CountdownEvent指定要的数量, 呼叫Wait则block该thread, 而呼叫Signal会降低count, 直到count为0, 该thread将unblock
以下范例是等待3个Thread执行後, 才继续执行
using System;
using System.Threading;
public class Program
{
static CountdownEvent _countDown = new CountdownEvent(3);
public static void Main()
{
new Thread(SaySomething).Start("Thread 1");
new Thread(SaySomething).Start("Thread 2");
new Thread(SaySomething).Start("Thread 3");
_countDown.Wait();
Console.WriteLine("All threads have finished");
}
static void SaySomething(object msg)
{
Thread.Sleep(3000);
Console.WriteLine(msg);
_countDown.Signal();
}
}
Count可以用AddCount来加更多需等待的数量, 但如果已经达到count = 0而又呼叫AddCount, 将抛出exception
建议可用TryAddCount, 回传false代表count已经是0
呼叫Reset将Count回到初始值
EventWaitHandle可以指定名字, 让多个Process根据同一个名字而共同参考
基本用法:
EventWaitHandle wh = new EventWaitHandle(false, EventResetMode.AutoReset, "MyCompany.MyApp.Name");
using System;
using System.Threading;
namespace TestWaitHandleThreadPool
{
class Program
{
static ManualResetEvent _starter = new ManualResetEvent(false);
static void Main(string[] args)
{
RegisteredWaitHandle reg = ThreadPool.RegisterWaitForSingleObject(_starter, Go, "Some Data", -1, true);
Thread.Sleep(5000);
_starter.Set();
Console.ReadLine();
reg.Unregister(_starter);
}
static void Go(object data, bool timeOut)
{
Console.WriteLine("Start work : " + data);
}
}
}
参数-1代表不用timeout, 如果有timeout的话, 会检测传送的物件(范例是Some Data字串)的状态; 参数true代表该Thread pool收到signal後, 不再重设要Wait.
假如原本用WaiOne的方式处理, Server收到100个任务就得new 100个Thread, 变成绑定太多且大量Block. 改写的方法如下, 让後续的委托工作都给‘hread Pool处理
void AppServerMethod()
{
_wh.WaitOne();
// ... continue execution
}
// 变成
void AppServerMethod()
{
RegisteredWaitHandle reg = ThreadPool.RegisterWaitForSingleObject(_starter, Resume, null, -1, true);
// ...
}
static void Resume(object data, bool timeOut)
{
// ... continue execution
}
WaitHandle提供static method, 包含WaitNay, WaitAll, SignalAndWait, 可以对有继承WaitHandle的物件使用较复杂的Signal/Wait的功能
WaitAny: 等待任一个Thread收到Signal
WaitAll: 等待所有Thread都收到Signal
SignalAndWait: 对第一个参数的thread发出signal, 对第二个参数的thread做等待
Alternatives to WaitAll and SignalAndWait
WaitAll和SignalAndWait不能在单一执行绪的环境执行.
SignalAndWait的替代方案是Barrier类别, 而WaitAll的替代方案是Parallel class的Invoke方法
继承ContextBoundObject且加上Synchronization属性, CLR在这物件会自动使用lock
如下范例, 每个Demo函式会排队执行
using System;
using System.Runtime.Remoting.Contexts;
using System.Threading;
namespace TestAutoLock
{
class Program
{
static void Main(string[] args)
{
AutoLock safeInstance = new AutoLock();
new Thread(safeInstance.Demo).Start();
new Thread(safeInstance.Demo).Start();
safeInstance.Demo();
}
}
[Synchronization]
public class AutoLock : ContextBoundObject
{
public void Demo()
{
Console.Write("Thread id : " + Thread.CurrentThread.ManagedThreadId);
Console.Write(" Start.....");
Thread.Sleep(1000);
Console.WriteLine("End");
}
}
}
自动lock不包含static的成员和没有继承ContextBoundObject的物件(比如Form)
想像是CLR将原始Class套上一层ContextBoundObject Proxy, 能呼叫原始Class的成员, 再为它的方法都加上同步化的功能
如果前面的AutoLock是个Collection, 则使用它的物件也必须是ContextBoundObject, 否则存取它的item需要手动加上lock
Synchronization Context预设会延伸从同一层Scope的Context, 也就是lock包含的深度一直向下
在Synchronization的attribute可以改变预设的行为, 有这些选项:
NOT_SUPPORTED: 就跟没加上Synchronization的属性一样
SUPPORTED: 如果来自别的synchronized 物件做初始化, 则延伸它的context, 否则保持unsynchronized
REQUIRED (预设): 如果来自别的synchronized 物件做初始化, 则延伸它的context, 否则建立新的Context
REQUIRES_NEW: 总是建立新增Synchronization context
using System;
using System.Runtime.Remoting.Contexts;
using System.Threading;
namespace TestAutoLockDeadlock
{
[Synchronization]
public class Deadlock : ContextBoundObject
{
public Deadlock Other;
public void Demo()
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(1000);
Console.WriteLine("Call other");
Other.Hello();
}
void Hello()
{
Console.WriteLine("hello");
}
}
class Program
{
private static void Main(string[] args)
{
Deadlock dead1 = new Deadlock();
Deadlock dead2 = new Deadlock();
dead1.Other = dead2;
dead2.Other = dead1;
new Thread(dead1.Demo).Start();
dead2.Demo();
Console.Read();
}
}
}
两个Deadlock物件都是在Program建立, Program本身是unsynchronized, 所以Deadlock物件建立各自的Synchronization Context, 也有各自的lock
呼叫对方的Hello方法後, 即发生Deadlock
Reentrant的定义是, 如果有段程序码被中断, 执行绪去执行别的程序, 之後再回来执行这段程序而没造成影响
通常Thread-safe和reentrant视为同等
如果[Synchronization(true)]这样使用, 代表需要reentry, 当执行离开此程序码时, 会把lock释放, 可避免deadlock. 副作用是在这释放期间, 任何thread可以进入该物件的context(比如呼叫它的方法)
[Synchronization(true)]是类别层级, 所以在非该context的呼叫都会当class层面的木马(?)
如果没有reentrancy, 则在一些场合比较难工作, 比如在一个synchronized class实作多执行绪, 将逻辑委托给其他worker thread, 则worker thread彼此间要沟通没有reentrancy的话, 将会受阻.
同步自动锁造成deadlock, reentrancy, 删除并发等问题, 在一些应用场合没有手动lock来的好用
>>: [读书笔记] Threading in C# - PART 3: USING THREADS
茕茕白兔,东走西顾。衣不如新,人不如故。心理学研究#怀旧,会让人变得积极乐观,本来想读VB重温16岁...
v-if 条件渲染 Vue 之中还有一个相当实用的功能就是条件渲染了,条件渲染类似於使用 if el...
前言 今天比较赶 我先贴code有空在补充搂 目标 练习blade layout 界街前後端 tra...
Python的资料储存容器, 可以分为tuple、串列(list)、字典(dict)与集合(set)...
上一篇我们修改了资料库 并且成功地把BeautifulSoup的资料送到Database内了 今天我...