实例解析C++/CLI线程之线程状态持久性

来源:岁月联盟 编辑:zhu 时间:2006-09-23
  其他形式的同步

  我们可使用类Monitor与类Thread中的某些函数,直接控制线程的同步,请看例1。

  例1:

using namespace System;
using namespace System::Threading;

int main()
{
 /*1*/ MessageBuffer^ m = gcnew MessageBuffer;

 /*2a*/ ProcessMessages^ pm = gcnew ProcessMessages(m);
 /*2b*/ Thread^ pmt = gcnew Thread(gcnew ThreadStart(pm,&ProcessMessages::ProcessMessagesEntryPoint));
 /*2c*/ pmt->Start();

 /*3a*/ CreateMessages^ cm = gcnew CreateMessages(m);
 /*3b*/ Thread^ cmt = gcnew Thread(gcnew ThreadStart(cm, &CreateMessages::CreateMessagesEntryPoint));
 /*3c*/ cmt->Start();

 /*4*/ cmt->Join();
 /*5*/ pmt->Interrupt();
 /*6*/ pmt->Join();

 Console::WriteLine("Primary thread terminating");
}

public ref class MessageBuffer
{
 String^ messageText;
 public:
  void SetMessage(String^ s)
  {
   /*7*/ Monitor::Enter(this);
   messageText = s;
   /*8*/ Monitor::Pulse(this);
   Console::WriteLine("Set new message {0}", messageText);
   Monitor::Exit(this);
  }

  void ProcessMessages()
  {
   /*9*/ Monitor::Enter(this);
   while (true)
   {
    try
    {
     /*10*/ Monitor::Wait(this);
    }
    catch (ThreadInterruptedException^ e)
    {
     Console::WriteLine("ProcessMessage interrupted");
     return;
    }

    Console::WriteLine("Processed new message {0}", messageText);
   }
   Monitor::Exit(this);
  }
};

public ref class CreateMessages
{
 MessageBuffer^ msg;
 public:
  CreateMessages(MessageBuffer^ m)
  {
   msg = m;
  }

  void CreateMessagesEntryPoint()
  {
   for (int i = 1; i <= 5; ++i)
   {
    msg->SetMessage(String::Concat("M-", i.ToString()));
    Thread::Sleep(2000);
   }
   Console::WriteLine("CreateMessages thread terminating");
  }
};

public ref class ProcessMessages
{
 MessageBuffer^ msg;
 public:
  ProcessMessages(MessageBuffer^ m)
  {
   msg = m;
  }

  void ProcessMessagesEntryPoint()
  {
   msg->ProcessMessages();
   Console::WriteLine("ProcessMessages thread terminating");
  }
};
  在标记1中,创建一个MessageBuffer类型的共享缓冲区;接着在标记2a、2b、2c中,创建了一个线程用于处理放置于缓冲区中的每条信息;标记3a、3b和3c,也创建了一个线程,并在共享缓冲区中放置了连续的5条信息以便处理。这两个线程已被同步,因此处理者线程必须等到有"东西"放入到缓冲区中,才可以进行处理,且在前一条信息被处理完之前,不能放入第二条信息。在标记4中,将一直等待,直到创建者线程完成它的工作。

  当标记5执行时,处理者线程必须处理所有创建者线程放入的信息,因为使用了Thread::Interrupt让其停止工作,并继续等待标记6中调用的Thread::Join,这个函数允许调用线程阻塞它自己,直到其他线程结束。(一个线程可指定一个等待的最大时间,而不用无限等待下去。)

  线程CreateMessages非常清晰明了,它向共享缓冲区中写入了5条信息,并在每条信息之间等待2秒。为把一个线程挂起一个给定的时间(以毫秒计),我们调用了Thread::Sleep,在此,一个睡眠的线程可再继续执行,原因在于运行时环境,而不是另一个线程。

  线程ProcessMessages甚至更加简单,因为它利用了类MessageBuffer来做它的所有工作。类MessageBuffer中的函数是被同步的,因此在同一时间,只有一个函数能访问共享缓冲区。

  主程序首先启动处理者线程,这个线程会执行ProcessMessages,其将获得父对象的同步锁;然而,它立即调用了标记10中的Wait函数,这个函数将让它一直等待,直到再次被告之运行,期间,它也交出了同步锁,这样,允许创建者线程得到同步锁并执行SetMessage。一旦函数把新的信息放入到共享缓冲区中,就会调用标记8中的Pulse,其允许等待同步锁的任意线程被唤醒,并继续执行下去。但是,在SetMessage执行完成之前,这些都不可能发生,因为它在函数返回前都不可能交出同步锁。如果情况一旦发生,处理者线程将重新得到同步锁,并从标记10之后开始继续执行。此处要说明的是,一个线程即可无限等待,也可等到一个指定的时间到达。插1是程序的输出。

  插1:

Set new message M-1
Processed new message M-1
Set new message M-2
Processed new message M-2
Set new message M-3
Processed new message M-3
Set new message M-4
Processed new message M-4
Set new message M-5
Processed new message M-5
CreateMessages thread terminating
ProcessMessage interrupted
ProcessMessages thread terminating
Primary thread terminating
  请仔细留意,处理者线程启动于创建者线程之前。如果以相反的顺序启动,将会在没有处理者线程等待的情况下,添加第一条信息,此时,没有可供唤醒处理者线程,当处理者线程运行到它的第一个函数调用Wait时,将会错过第一条信息,且只会在第二条信息存储时被唤醒。


  管理线程

  默认情况下,如果一个线程是前台线程,它将会一直执行下去,直到进入点函数结束,而不管它父类的生命期是多久;而在另一方面,后台线程则会在父类线程结束时自动结束。可通过设置Thread的IsBackground属性,把一个线程配置为后台线程,用同样的方法,也可把一个后台线程配置为前台线程。

  一旦线程被启动,它即为活跃线程,可通过检查Thread的IsAlive属性来判断一个线程是否为活跃线程;通过调用Wait函数,并传递给它一个零毫秒,可使一个线程放弃剩余的CPU时间片;另外,线程还可通过CurrentThread::Thread::CurrentThread属性得到其自己的Thread对象。

  每个线程都有与之相关的优先级,运行时环境(即操作)通过它来调度线程的执行,可通过Thread::Priority属性来设置或检测线程的优先级,它的范围从ThreadPriority::Lowest 到ThreadPriority::Highest,默认情况下,线程的优先级为ThreadPriority::Normal。另外,因为实现环境的不同,线程调度会有所不同,所以在控制线程方面,不应该过分依赖线程的优先级。

  易变字段(域)

  volatile这个限定类型告诉编译器,可能会有多个线程控制或访问它所指定的对象,尤其是,一个或多个线程可能将异步读写此变量。基本上,这个限定词是强制编译器在进行优化时不要那么"激进"。

  请看例2中的代码段,在缺少volatile时,标记1中的代码完全可以忽略,因为在标记2中立即就改写了i的值;然而,指定了volatile后,编译器则必须执行这两行代码。

  例2:

volatile int i = 0;
/*1*/ i = 10;
/*2*/ i = 20;
/*3*/ if (i < 5 || i > 10) {
// ...
}

int copy = i;
/*4*/ if (copy < 5 || copy > 10) {
// ...
}
  在标记3中,编译器必须生成取回值i的代码两次,但是,在两次取值过程中,数值都有可能改变。为确保我们测试的是同一个值,在此不得不以类似标记4的代码来代替。通过把值i的一个快照存储在一个非易变的变量中,我们就可以安全地多次使用这个值了--因为它的值不可能在"后台"改变。在此,使用volatile,可避免对特定类型变量的显式异步访问。本篇文章发表于

  线程局部存储

  当编写多线程应用程序时,只在特定的线程中使用特定的变量,这是一个非常好的习惯,请看例3的程序:

  例3:

using namespace System;
using namespace System::Threading;

public ref class ThreadX
{
 /*1*/ int m1;
 /*2*/ static int m2 = 20;
 /*3*/ [ThreadStatic] static int m3 = 30;

 public:
  ThreadX()
  {
   m1 = 10;
  }
 
  void TMain()
  {
   String^ threadName = Thread::CurrentThread->Name;

   /*4*/ Monitor::Enter(ThreadX::typeid);
   for (int i = 1; i <= 5; ++i)
   {
    ++m1;
    ++m2;
    ++m3;
   }
   Console::WriteLine("Thread {0}: m1 = {1}, m2 = {2}, m3 = {3}",
threadName, m1, m2, m3);
   Monitor::Exit(ThreadX::typeid);
 }
};

int main()
{
 /*5*/ Thread::CurrentThread->Name = "t0";

 ThreadX^ o1 = gcnew ThreadX;
 Thread^ t1 = gcnew Thread(gcnew ThreadStart(o1, &ThreadX::TMain));
 t1->Name = "t1";

 ThreadX^ o2 = gcnew ThreadX;
 Thread^ t2 = gcnew Thread(gcnew ThreadStart(o2, &ThreadX::TMain));
 t2->Name = "t2";

 t1->Start();
 /*6*/ (gcnew ThreadX)->TMain();
 t2->Start();
 t1->Join();
 t2->Join();
}
  m1是一个实例字段,所以每个ThreadX的实例都有一份各自的拷贝,且在父类对象的生命期中都会存在;而另一方面,m2是一个类字段,所以对类来说,不管有几个类的实例,它只有单独的一个,从理论上来说,它将会一直存在,直到程序结束。但这两个字段都不是特定于某个线程的,如果以适当的构造,这两种类型的字段都能被多个线程访问。

  简单来说,线程局部存储就是特定线程拥有的某段内存,这段内存在新线程创建时被分配,而在线程结束时被释放,它结合了局部变量的私有性和静态变量的持久性。通过指定ThreadStatic属性,可把一个字段标记为线程局部类型,如例中的标记3所示,在成为静态字段之后,m3甚至还能有一个初始化函数。

  函数TMain为新线程的入口点,这个函数只是简单地递增这三个变量:m1、m2和m3,每回5次,并打印出它们当前的值。标记4中的同步锁保证了在这些字段递增或打印时,另一个线程不会同时访问它们。

  在标记5中,主线程把它的名字设置为t0,接着创建并启动了两个线程,另外,它也把TMain当作了一个普通函数直接调用,而不是作为创建的新线程的一部分来调用。程序的输出请见插2。

  插2:

Thread t0: m1 = 15, m2 = 25, m3 = 35
Thread t1: m1 = 15, m2 = 30, m3 = 5
Thread t2: m1 = 15, m2 = 35, m3 = 5
  每个线程都有其自己的m1实例,它被初始化为10,所以在递增5次之后,每个线程中的值都为15。而m2则有所不同,所有的三个线程都共享同一变量,所以这一变量被递增了15次。

  线程t1与t2在经过线程创建过程之后,每个都有其自己的m3,然而,这些线程局部变量会被赋予默认的零值,而不是在源代码中初始化的30,注意了,在经过5次递增之后,各个值均为5,而线程t0则有所不同,正如我们所看到的,这个线程不是由创建其他两个线程同样的机制创建的,所以,它的m3会接受显式初始化的值30。同时也请注意标记6,TMain作为一个普通函数被调用,而不是作为创建的新线程的一部分。


  原子性与互锁操作

  如果存在这样一种情况:一个应用程序有多个线程并行运行,每个线程对某些共享的整形变量,都有写操作--只是简单地使用++把变量递增1。这看起来似乎没什么问题,毕竟,还算像是一个原子性操作,但在多数中--至少从机器指令的角度来看,C++/CLI执行环境对所有整形类型,并不能普遍地保证无误。

  作为示例,例4中的程序有三个线程,每个线程都同时递增一个共享的64位整形变量一千万次,最后显示出这个变量的最终值,从理论上说,应该共递增了三千万次。这个程序目前可以两种方式运行:默认方式使用++操作符以非同步方式运行;而另一种方式,通过带有命令行参数Y或y,这回使用了一个同步的库递增函数。

  例4:

using namespace System;
using namespace System::Threading;

static bool interlocked = false;
const int maxCount = 10000000;
/*1*/ static long long value = 0;

void TMain()
{
 if (interlocked)
 {
  for (int i = 1; i <= maxCount; ++i)
  {
   /*2*/ Interlocked::Increment(value);
  }
 }
 else
 {
  for (int i = 1; i <= maxCount; ++i)
  {
   /*3*/ ++value;
  }
 }
}

int main(array<String^>^ argv)
{
 if (argv->Length == 1)
 {
  if (argv[0]->Equals("Y") || argv[0]->Equals("y"))
  {
   interlocked = true;
  }
 }

 /*4*/ Thread^ t1 = gcnew Thread(gcnew ThreadStart(&TMain));
 Thread^ t2 = gcnew Thread(gcnew ThreadStart(&TMain));
 Thread^ t3 = gcnew Thread(gcnew ThreadStart(&TMain));

 t1->Start();
 t2->Start();
 t3->Start();
 t1->Join();
 t2->Join();
 t3->Join();

 Console::WriteLine("After {0} operations, value = {1}", 3 * maxCount, value);
}
  当使用标准++操作符时,程序5次连续执行之后,输出如插3所示,可看出,结果与正确答案相距甚远,简单估算,大概有17%至50%的递增操作未正确完成;当程序运行于同步方式时--即使用Interlocked::Increment,所有的三千万次递增操作都正常完成,结果计算正确。本篇文章发表于

  插3:

  使用++操作符的输出

After 30000000 operations, value = 14323443
After 30000000 operations, value = 24521969
After 30000000 operations, value = 20000000
After 30000000 operations, value = 24245882
After 30000000 operations, value = 25404963
  使用Interlocked递增函数的输出

After 30000000 operations, value = 30000000
  另外,补充一点,Interlocked类还有另一个decrement函数。