[转帖]Scala Actor:多线程的基础学习_Android, Python及开发编程讨论区_Weblogic技术|Tuxedo技术|中间件技术|Oracle论坛|JAVA论坛|Linux/Unix技术|hadoop论坛_联动北方技术论坛  
网站首页 | 关于我们 | 服务中心 | 经验交流 | 公司荣誉 | 成功案例 | 合作伙伴 | 联系我们 |
联动北方-国内领先的云技术服务提供商
»  游客             当前位置:  论坛首页 »  自由讨论区 »  Android, Python及开发编程讨论区 »
总帖数
1
每页帖数
101/1页1
返回列表
0
发起投票  发起投票 发新帖子
查看: 2908 | 回复: 0   主题: [转帖]Scala Actor:多线程的基础学习        下一篇 
刘伟
注册用户
等级:少校
经验:938
发帖:82
精华:0
注册:2013-6-24
状态:离线
发送短消息息给刘伟 加好友    发送短消息息给刘伟 发消息
发表于: IP:您无权察看 2013-6-27 10:27:03 | [全部帖] [楼主帖] 楼主

Scala Actor是Scala里多线程的基础,核心思想是用消息传递来进行线程间的信息共享和同步。
   Scala Actor线程模型可以这样理解:所有Actor共享一个线程池,总的线程个数可以配置,也可以根据CPU个数决定;当一个Actor启动之后,Scala分配一个线程给它使用,如果使用receive模型,这个线程就一直为该Actor所有,如果使用react模型,Scala执行完react方法后抛出异常,则该线程就可以被其它Actor使用。

  1. def start(): Actor = synchronized { 
  2.        // Reset various flags. 
  3.        // 
  4.        // Note that we do *not* reset `trapExit`. The reason is that 
  5.        // users should be able to set the field in the constructor 
  6.        // and before `act` is called. 
  7.       
  8.        exitReason = 'normal 
  9.        exiting = false 
  10.        shouldExit = false 
  11.       
  12.        scheduler execute { 
  13.              ActorGC.newActor(Actor.this) 
  14.              (new Reaction(Actor.this)).run() 
  15.        } 
  16.       
  17.        this 


复制代码

其中Reaction实现Runnable接口,scheduler基本相当于是一个线程池,所以调用start方法之后会有一个线程来为该Actor服务。

使用receive模型。

  1. def receive[R](f: PartialFunction[Any, R]): R = { 
  2.       assert(Actor.self == this, "receive from channel belonging to other actor") 
  3.       this.synchronized { 
  4.              if (shouldExit) exit() // links 
  5.              val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) 
  6.              if (null eq qel) { 
  7.                    waitingFor = f.isDefinedAt 
  8.                    isSuspended = true 
  9.                    suspendActor() 
  10.              } else { 
  11.              received = Some(qel.msg) 
  12.              sessions = qel.session :: sessions 
  13.        } 
  14.        waitingFor = waitingForNone 
  15.        isSuspended = false 
  16. val result = f(received.get) 
  17. sessions = sessions.tail 
  18. result 


复制代码

如果当前mailbox里面没有可以处理的消息,调用suspendActor,该方法会调用wait;如果有消息,这调用PartialFunction进行处理。

使用react模型。

  1. def react(f: PartialFunction[Any, Unit]): Nothing = { 
  2.       assert(Actor.self == this, "react on channel belonging to other actor") 
  3.       this.synchronized { 
  4.              if (shouldExit) exit() // links 
  5.              val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) 
  6.              if (null eq qel) { 
  7.                    waitingFor = f.isDefinedAt 
  8.                    continuation = f 
  9.                    isDetached = true 
  10.              } else { 
  11.              sessions = List(qel.session) 
  12.              scheduleActor(f, qel.msg) 
  13.        } 
  14.        throw new SuspendActorException 


复制代码

如果当前mailbox没有可以处理的消息,设置waitingFor和continuation,这两个变量会在接收到消息的时候使用;如果有消息,则调用scheduleActor,该方法会在线程池里选择一个新的线程来处理,具体的处理方法也是由PartialFunction决定。不管是哪条路径,react都会立即返回,或者说是立即抛出异常,结束该线程的执行,这样该线程就可以被其它Actor使用。

再来看看接收消息的处理代码。

  1. def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized { 
  2.       if (waitingFor(msg)) { 
  3.              received = Some(msg) 
  4.             
  5.              if (isSuspended) 
  6.              sessions = replyTo :: sessions 
  7.              else 
  8.              sessions = List(replyTo) 
  9.             
  10.              waitingFor = waitingForNone 
  11.             
  12.              if (!onTimeout.isEmpty) { 
  13.                    onTimeout.get.cancel() 
  14.                    onTimeout = None 
  15.              } 
  16.             
  17.              if (isSuspended) 
  18.              resumeActor() 
  19.              else // assert continuation != null 
  20.              scheduler.execute(new Reaction(this, continuation, msg)) 
  21.       } else { 
  22.        mailbox.append(msg, replyTo) 


复制代码

如果当前没有在等待消息或者接收到的消息不能处理,就丢到mailbox里去;相反,则进行消息的处理。这里对于receive模型和react模型就有了分支:如果isSuspended为true,表示是receive模型,并且线程在wait,就调用resumeActor,该方法会调用notify;否则就是react模型,同样在线程池里选择一个线程进行处理。

这样,相信大家对Scala Actor就有了一个基本的认识。




赞(0)    操作        顶端 
总帖数
1
每页帖数
101/1页1
返回列表
发新帖子
请输入验证码: 点击刷新验证码
您需要登录后才可以回帖 登录 | 注册
技术讨论