[转帖]TUXEDO通信模式以及示例_MQ, Tuxedo及OLTP讨论区_Weblogic技术|Tuxedo技术|中间件技术|Oracle论坛|JAVA论坛|Linux/Unix技术|hadoop论坛_联动北方技术论坛  
网站首页 | 关于我们 | 服务中心 | 经验交流 | 公司荣誉 | 成功案例 | 合作伙伴 | 联系我们 |
联动北方-国内领先的云技术服务提供商
»  游客             当前位置:  论坛首页 »  自由讨论区 »  MQ, Tuxedo及OLTP讨论区 »
总帖数
1
每页帖数
101/1页1
返回列表
0
发起投票  发起投票 发新帖子
查看: 3103 | 回复: 0   主题: [转帖]TUXEDO通信模式以及示例        下一篇 
xiaojiang
注册用户
等级:少尉
经验:345
发帖:77
精华:0
注册:2011-8-31
状态:离线
发送短消息息给xiaojiang 加好友    发送短消息息给xiaojiang 发消息
发表于: IP:您无权察看 2014-11-3 14:36:46 | [全部帖] [楼主帖] 楼主

TUXEDO通讯模式以及示例

TUXEDO的通讯方式

TUXEDO中的客户端与服务端之间可以采用的通讯方式有:

1. 同步调用方式

2. 异步调用方式

3. 管道方式

4. 会话方式

5. 消息方式

6. 事件发布订阅方式

7. /Q方式

注意:

1. 服务端的SERVICE之间,可以采用管道方式,客户端与服务端之间不能采用。

2. 客户端与服务端之间可以采用消息方式,服务端的SERVICE之间不能采用消息方式。

3. 其他通讯方式在服务端的SERVICE之间,及客户端与服务端之间都可以采用。

管道方式(tpforward())在服务端编程中有说明,/Q方式在第十章中在介绍,对这两种方式在本章种不做介绍.

     8.1同步调用方式

如下图所示:在同步请求/回答方式中,客户端使用tpcall()给本地或远程的服务器(由TUXEDO系统根据公告板信息确定)

发送服务请求,此时客户将传送请求服务的名字、用于请求服务的输入参数和输出参数,tpcall()发出后,客户的数据

被传送至服务器,得到相应的服务处理。在此方式下,服务器处理请求时,客户端将等待,不继续运行,直到服务器返

回相应结果。调用过程如图:

例子:客户端通过对一个文件分块,每调用一次TPCALL()发送一块数据,把一个文件从客户端传送到服务端。客户端

与服务端采用FML32缓冲区进行通行。在异步调用方式和会话方式中也用到该例子。可以做一个比较。在该例子中我们把

块的大小定义为

1024字节。采用FML32缓冲区。

FML32定义文件Myfml.h的内容:

*base 100
#name   number  type    flags  comments
FNAME 1 string - -
BNUM 3 long
BID 4 long  -       -
FDATA 5 carray -  -
BSIZE 6 long - -


服务端程序Call.c的内容:

#include <stdio.h>
#include <ctype.h>
#include <atmi.h>
#include <userlog.h>
#include "fml32.h"
#include "myfml.h"
CALL(TPSVCINFO *rqst)
{
      FILE *fp;
      long i=0;
      FBFR32 *rcvbuf;
      char fname[100]="";
      FLDLEN32 len=0;
      long bid=0;
      long bsize=1024;/*传送的块大小为1024字节*/
      char *fdata;
      rcvbuf=(FBFR32 *)rqst->data;
      len=sizeof(bid);
      if(Fget32(rcvbuf,BID,0,(char *)&bid,&len) == -1)
      {
            userlog("Fget32(BID) failure :%s",(char *)Fstrerror32(Ferror32));
            tpreturn(TPFAIL,0,0,0,0);
      }
      len=sizeof(fname);
      if(Fget32(rcvbuf,FNAME,0,fname,&len) == -1)
      {
            userlog("Fget32(FNAME) failure :%s",(char *)Fstrerror32(Ferror32));
            tpreturn(TPFAIL,0,0,0,0);
      }
      strcat(fname,".s");
      if ((fp=fopen(fname, "rb"))==NULL)
      {
            fp=fopen(fname, "wb");
      }
      else if(bid == 0 )
      {
            fclose(fp);
            fp=fopen(fname, "wb");
      }
      else
      {
            fclose(fp);
            fp = fopen(fname, "r+b");
      }
      if(fp == NULL)
      {
            userlog("fopen() %s failure\n",fname);
            tpreturn(TPFAIL,0,0,0,0);
      }
      fdata=(char *)malloc(bsize+1);
      if(fdata == NULL)
      {
            userlog("malloc(fdata) failure");
            tpreturn(TPFAIL,0,0,0,0);
      }
      len=bsize;
      if(Fget32(rcvbuf,FDATA,0,fdata,(FLDLEN32 *)&len) == -1)
      {
            userlog("Fget32(FDATA) failure :%s",(char *)Fstrerror32(Ferror32));
            tpreturn(TPFAIL,0,0,0,0);
      }
      i=bid * bsize;
      if(fseek(fp, i, 0)!=0)
      {
            userlog("fseek() failure\n");
            tpreturn(TPFAIL,0,0,0,0);
      }
      i = fwrite(fdata,1,len,fp);
      if(i != len)
      {
            userlog("fwrite()fail\n");
            tpreturn(TPFAIL,0,0,0,0);
      }
      fclose(fp);
      tpreturn(TPSUCCESS, 0, NULL, 0L, 0);
}


客户端程序callcli.c的内容:

#include <stdio.h>
#include <stdarg.h>
#include "atmi.h"
#include "fml32.h"
#include "myfml.h"
FBFR32 *sendbuf=NULL;
char *filebuf;
FILE *fp;
log(const char *fmt, ...)
{
      va_list ap;
      va_start(ap, fmt);
      vfprintf(stdout, fmt, ap);
      fflush(stdout);
      va_end(ap);
      fclose(fp);
      tpfree((char *)sendbuf);
      free(filebuf);
      tpterm();
      exit(1);
}
main(int argc, char *argv[])
{
      long rcvlen=0;
      long filelen=0;
      long i=0;
      int ret=0;
      long reallen=0;
      long bnum=0;
      long bsize=1024; /*传送的块大小为1024字节*/
      FLDLEN32 len=0;
      if(argc != 2)
      {
            (void) fprintf(stderr, "Usage: %s filename\n",argv[0]);
            exit(1);
      }
      fp = fopen( argv[1], "rb" );
      if(fp == NULL)
      {
            printf("open file:%s failure\n",argv[1]);
            exit(1);
      }
      if(fseek(fp, 0, SEEK_END)!=0)
      {
            perror("fseek() failure:");
            exit(1);
      }
      filelen=ftell(fp);
      if(filelen== -1)
      {
            perror("ftell() failure:");
            exit(1);
      }
      rewind(fp);
      if((sendbuf = (FBFR32 *)tpalloc("FML32", NULL, bsize+1024)) == (FBFR32 *)NULL)
      {
            printf("Error allocating send buffer\n");
            exit(1);
      }
      len = Fsizeof32(sendbuf);
      if(Finit32(sendbuf, (FLDLEN32)len)== -1)
      {
            printf("finit32() failure\n");
            exit(1);
      }
      if (tpinit(NULL) == -1)
      {
            printf("tpinit() failure");
            exit(1);
      }
      filebuf=(char *)malloc(bsize);
      if(filebuf == NULL)
      {
            printf("malloc(filebuf) failure");
            exit(1);
      }
      bnum=(filelen -1)/bsize + 1;
      if(Fchg32( sendbuf, FNAME, 0, argv[1], (FLDLEN32)len )<0)
      {
            log("Fchg32(FNAME) failure\n",Fstrerror32(Ferror32));
      }
      printf("filelen = %ld, block num = %ld\n",filelen,bnum);
      for(i=0;i<bnum;i++)
      {
            if(fseek(fp, i*bsize, 0)!=0)
            {
                  log("fseek failure\n");
            }
            reallen=fread(filebuf, 1, bsize, fp);
            if(reallen!=bsize && feof(fp)==0 )
            {
                  log("fread() failure\n");
            }
            if(Fchg32(sendbuf, BID, 0, (char *)&i, 0)<0)
            {
                  log("Fchg32(BID) failure:%s\n",Fstrerror32(Ferror32));
            }
            printf("bid=%ld\n",i);
            if(Fchg32( sendbuf, FDATA, 0, filebuf, (FLDLEN32)reallen)<0)
            {
                  log("Fchg32(FDATA) failure:%s\n",Fstrerror32(Ferror32));
            }
            ret = tpcall("CALL", (char *)sendbuf, 0, (char **)&sendbuf, &rcvlen, (long)0);
            if(ret == -1)
            {
                  log("tpcall() failure:tperrno=%ld,errstr=%s\n",tperrno,tpstrerror(tperrno));
            }
      }
      log("finished\n");
}


8.2 异步调用方式

如图所示:在异步请求/回答方式中,客户端使用tpacall()给本地或远程的服务器(由TUXEDO系统根据公告板信息确定)

发送服务请求,与同步方式不同的是:在此方式下,服务器处理请求时,客户端继续运行。当客户端想得到请求的处理

结果时,用tpgetrply()将结果取回。调用过程如图:

例子:该例子实现与同步调用方式一样的功能.但在该例子中采用异步通讯方式.

服务端的程序与同步调用方式中的一样,客户端的程序Acallcli.c的内容如下:

注意:每调用TPACALL()50次之后,就要调用tpgetrply()把服务端返回的结果取出,

否则,返回缓冲区会满,会出如下错误:

atpcall() failure:tperrno=5,errstr=TPELIMIT - a system limit has been reached


Acallcli.c的内容:

#include <stdio.h>
#include <stdarg.h>
#include "atmi.h"
#include "fml32.h"
#include "myfml.h"
FBFR32 *sendbuf=NULL;
char *filebuf;
FILE *fp;
log(const char *fmt, ...)
{
      va_list ap;
      va_start(ap, fmt);
      vfprintf(stdout, fmt, ap);
      fflush(stdout);
      va_end(ap);
      fclose(fp);
      tpfree((char *)sendbuf);
      free(filebuf);
      tpterm();
      exit(1);
}
main(int argc, char *argv[])
{
      long rcvlen=0;
      long filelen=0;
      long i=0;
      int ret=0;
      long reallen=0;
      long bnum=0;
      long bsize=1024;/*传送的块大小为1024字节*/
      FLDLEN32 len=0;
      long j=0;
      int cd[50];
      if(argc != 2)
      {
            (void) fprintf(stderr, "Usage: %s filename\n",argv[0]);
            exit(1);
      }
      fp = fopen( argv[1], "rb" );
      if(fp == NULL)
      {
            printf("open file:%s failure\n",argv[1]);
            exit(1);
      }
      if(fseek(fp, 0, SEEK_END)!=0)
      {
            perror("fseek() failure:");
            exit(1);
      }
      filelen=ftell(fp);
      rewind(fp);
      if((sendbuf = (FBFR32 *)tpalloc("FML32", NULL, bsize+1024)) == (FBFR32 *)NULL)
      {
            printf("Error allocating send buffer\n");
            exit(1);
      }
      len = Fsizeof32(sendbuf);
      if(Finit32(sendbuf, (FLDLEN32)len)== -1)
      {
            printf("finit32() failure\n");
            exit(1);
      }
      if (tpinit(NULL) == -1)
      {
            printf("tpinit() failure");
            exit(1);
      }
      filebuf=(char *)malloc(bsize);
      if(filebuf == NULL)
      {
            printf("malloc(filebuf) failure");
            exit(1);
      }
      bnum=(filelen -1)/bsize + 1;
      if(Fchg32( sendbuf, FNAME, 0, argv[1], (FLDLEN32)len )<0)
      {
            log("Fchg32(FNAME) failure\n",Fstrerror32(Ferror32));
      }
      printf("filelen = %ld, block num = %ld\n",filelen,bnum);
      j=0;
      for(i=0;i<bnum;i++)
      {
            if(fseek(fp, i*bsize, 0)!=0)
            {
                  log("fseek failure\n");
            }
            reallen=fread(filebuf, 1, bsize, fp);
            if(reallen!=bsize && feof(fp)==0 )
            {
                  log("fread() failure\n");
            }
            if(Fchg32(sendbuf, BID, 0, (char *)&i, 0)<0)
            {
                  log("Fchg32(BID) failure:%s\n",Fstrerror32(Ferror32));
            }
            printf("bid=%ld\n",i);
            if(Fchg32( sendbuf, FDATA, 0, filebuf, (FLDLEN32)reallen)<0)
            {
                  log("Fchg32(FDATA) failure:%s\n",Fstrerror32(Ferror32));
            }
            ret = tpacall("CALL", (char *)sendbuf, 0, 0);
            if(ret == -1)
            {
                  log("atpcall() failure:tperrno=%ld,errstr=%s\n",tperrno,tpstrerror(tperrno));
            }
            cd[j]=ret;
            j++;
            if(j%50==0)
            {
                  for(j=0;j<50;j++)
                  {
                        if(tpgetrply(&cd[j],(char **)&sendbuf, &rcvlen, (long)0)== -1)
                        {
                              log("tpgetrply() failure:cd=%ld,errstr=%s\n",cd,tpstrerror(tperrno));
                        }
                  }
                  j=0;
            }
      }
      log("finished\n");
}


7.3 会话方式

采用会话通讯方式,通讯双方在建立连接之后,可以多次发送或接收数据,TUXEDO中采用的是半双工的通讯方式,这种

方式特别适用于大批量的数据传输。

名称解释:

发起者(originator,initiator):

发起该会话的进程,它调用tpconnect()与服务端的一个SERVICE建立连接

从属者(subordinate):tpconnect()中指定的SERVICE

发送者(sender):当前拥有发送权的进程,它只能发送数据

接收者(receiver):当前拥有发送权的进程,它只能接收数据

函数说明:

int tpconnect (char* name, char *data, long length, long flags)


描述: 与名为name的SERVICE建立连接

参数:

name:SERVICE的名字

*data:要发送的数据

length:数据的长度

flags:可以为TPNOTRAN, TPNOTIME,TPNOBLOCK,TPSIGRSTRT

TPSENDONLY:发送者只能发送数据,被调用的SERVICE只能接收数据

TPRECVONLY:发送者只能接收数据,被调用的SERVICE只能发送数据

返回值: 成功返回一个标识该连接的标识符,失败为-1

int tpsend(int cd, char *data, long length, long flags,long *revent)


描述:用于发送数据

参数:

cd:    tpconnect()的返回值,用于标识该连接

data:  要发送的数据

length:要发送的数据的长度

flags:(TPNOBLOCK, TPNOTIME, TPSIGRSTRT

TPRECVONLY:把发送权交给接受者,在接受者那里会产生事件TPEV_SENDONLY。

revent:

当返回值为-1时,如果tperrno= TPEEVENT,那么表明有事件发生。可能的事件有:

TPEV_DISCONIMM:当会话的发起者调用tpdiscon(), tpreturn(), tpcommit()时,会话的

从属者会收到该事件。如果有网络故障等,那么会话的发起者也会收到该

事件。

TPEV_SVCFAIL:


会话的发起者会收到该事件,表明会话的从属者调用tpreturn(TPFAIL)

或tpreturn(TPEXIT), 并且该会话的从属者不在拥有该控制权

TPEV_SVCERR:


会话的发起者会收到该事件,表明会话的从属者调用

tpreturn(TPSUCCESS,..)返回,并且该会话的从属者不在拥有该控制权.

返回值:失败为-1,如果tperrno= TPEEVENT,那么导致该调用失败的事件保存在revent中

int tprecv(int cd, char **data, long *length, long flags, long *revent)


描述:用于接收数据

data: 接收的数据放到该缓冲区中

length:接收大的数据的长度

flags:(TPNOCHANGE, TPNOBLOCK, TPNOTIME, TPSIGRSTRT)


revent:当返回值为-1时,如果tperrno= TPEEVENT,那么表明有事件发生。可能的事件有:

TPEV_DISCONIMM:与tpsend()中的含义一样.

TPRECVONLY:该会话的发送者把发送权交给接受者,在接受者这里会产生事TPEV_SENDONLY。

TPEV_SVCFAIL: 与tpsend()中的含义一样.

TPEV_SVCERR:  与tpsend()中的含义一样.

TPEV_SVCSUCC: 该会话的从属者已成功完成并关闭该会话,那么会话的发起者会收到该事件.表明该会话已成功结束.

返回值:失败为-1,如果tperrno= TPEEVENT,那么导致该调用失败的事件保存在revent中

int tpdiscon(int cd)


描述:关闭标识符为cd的会话

参数: tpconnect()的返回值,用于标识该连接

返回值:失败为-1 

会话通讯方式的整个过程如图所示:

例子: 该例子实现与同步通讯的例子一样的功能,但采用的是会话通讯方式.

服务端程序的内容:

#include <stdio.h>
#include <atmi.h>
#include <userlog.h>
#include "fml32.h"
#include "myfml.h"
long bsize=1024;/*传送的块大小为1024字节*/
FBFR32 *rcvbuf;
char fname[256]="";
char *fdata;
tpsvrinit(int argc, char **argv)
{
      if ((rcvbuf=(FBFR32 *)tpalloc("FML32",NULL,(bsize+1024))) == NULL)
      {
            userlog("tpalloc failure\n");
            return(-1);
      }
      if ((fdata=(char *)malloc(bsize+1)) == NULL)
      {
            userlog("malloc failure\n");
            tpfree((char *)rcvbuf);
            return(-1);
      }
      return(0);
}
void tpsvrdone()
{
tpfree((char *)rcvbuf);
free((char *)fdata);
}
CONN(TPSVCINFO *rqst)
{
      FILE *fp;
      FLDLEN32 len=0;
      long bid=0;
      long len1=0;
      long revent=0;
      char tmpbuf1[256];
      while(1)
      {
            if(tprecv(rqst->cd,(char **)&rcvbuf,&len1,0,&revent)==-1)
            {
                  if((tperrno==TPEEVENT)&&(revent==TPEV_SENDONLY))
                  {
                        fclose(fp);
                        break;
                  }
                  else
                  {
                        userlog("tprecv failure:%s,event=%ld\n",tpstrerror(tperrno),revent);
                        tpreturn(TPFAIL,0,0,0L,0);
                  }
            }
            len=sizeof(bid);
            if(Fget32(rcvbuf,BID,0,(char *)&bid,&len) == -1)
            {
                  userlog("Fget(BID) failure :%s",(char *)Fstrerror32(Ferror32));
                  tpreturn(TPFAIL,0,0,0,0);
            }
            if (bid == 0)
            {
                  len=sizeof(fname);
                  if(Fget32(rcvbuf,FNAME,0,fname,&len) == -1)
                  {
                        userlog("Fget(FNAME) failure:%s",(char *)Fstrerror32(Ferror32));
                        tpreturn(TPFAIL,0,0,0,0);
                  }
                  strcat(fname,".s");
                  if((fp=fopen(fname, "wb"))== NULL)
                  {
                        userlog("fopen(%s)failure\n", fname);
                        tpreturn(TPFAIL,0,0,0,0);
                  }
            }
            len=bsize;
            if(Fget32(rcvbuf,FDATA,0,fdata,(FLDLEN32 *)&len) == -1)
            {
                  userlog("Fget(FDATA) failure %s",(char *)Fstrerror32(Ferror32));
                  tpreturn(TPFAIL,0,0,0,0);
            }
            if (fwrite(fdata,1,len,fp) != len)
            {
                  userlog("fwrite(%s) failure\n", fname);
                  tpreturn(TPFAIL,0,0,0,0);
            }
      }
      tpreturn(TPSUCCESS,0,0,0L,0);
}


客户端程序的内容:

#include <stdio.h>
#include <stdarg.h>
#include "atmi.h"
#include "fml32.h"
#include "myfml.h"
static FBFR32 *sendbuf=NULL;
static char *filebuf;
static FILE *fp;
log(const char *fmt, ...)
{
      va_list ap;
      va_start(ap, fmt);
      vfprintf(stdout, fmt, ap);
      fflush(stdout);
      va_end(ap);
      fclose(fp);
      tpfree((char *)sendbuf);
      free(filebuf);
      tpterm();
      exit(1);
}
int main(int argc, char **argv)
{
      long cd=0;
      long revent=0;
      long rcvlen=0;
      long filelen=0;
      long reallen=0;
      long bnum=0;
      FLDLEN32 len=0;
      int bsize = 1024; /*传送的块大小为1024字节*/
      long i = 0;
      if(argc != 2)
      {
            printf("Usage: %s filename\n",argv[0]);
            exit(1);
      }
      if ((fp = fopen(argv[1], "rb" ))== NULL)
      {
            printf("open file:%s failure\n",argv[1]);
            exit(1);
      }
      if (fseek(fp, 0, SEEK_END)!=0)
      {
            printf("fseek() failure:");
            exit(1);
      }
      filelen=ftell(fp);
      rewind(fp);
      if((sendbuf = (FBFR32 *)tpalloc("FML32", NULL, bsize+1024)) == (FBFR32 *)NULL)
      {
            printf("Error allocating send buffer\n");
            exit(1);
      }
      if (tpinit(NULL)== -1)
      {
            printf("tpinit() failure:%s\n",tpstrerror(tperrno));
            exit(1);
      }
      if (cd = tpconnect("CONN",NULL,0,TPSENDONLY)==-1)
      {
            printf("tpconnect: %s\n",tpstrerror(tperrno));
            exit(1);
      }
      if((filebuf=(char *)malloc(bsize+1))== NULL)
      {
            printf("malloc(filebuf) failure");
            exit(1);
      }
      bnum=(filelen -1)/bsize + 1;
      if(Fchg32( sendbuf, FNAME, 0, argv[1], (FLDLEN32)len )<0)
      {
            printf("Fchg32(FNAME) failure\n",Fstrerror32(Ferror32));
            exit(1);
      }
      for(i=0;i<bnum;i++)
      {
            reallen=fread(filebuf, 1, bsize, fp);
            if(reallen!=bsize && feof(fp)==0 )
            {
                  log("fread(%s) failure\n",argv[1]);
            }
            if(Fchg32(sendbuf, BID, 0, (char *)&i, 0)<0)
            {
                  log("Fchg32(FBID) failure:%s\n",Fstrerror32(Ferror32));
            }
            if(Fchg32( sendbuf, FDATA, 0, filebuf, (FLDLEN32)reallen)<0)
            {
                  log("Fchg32(FDATA) failure:%s\n",Fstrerror32(Ferror32));
            }
            revent = 0;
            if(tpsend(cd,(char *)sendbuf,0,TPNOTIME,&revent)==-1)
            {
                  if(tperrno==TPEEVENT)
                  {
                        printf("tpsend():failule,revent=%ld,strerr=%s\n", revent,tpstrerror(tperrno));
                  }
                  else
                  {
                        log("tpsend():failule,revent=%ld,strerr=%s\n", revent,tpstrerror(tperrno));
                  }
            }
      }
      /*发送最后一块时,把发送权交给服务端*/
      revent = 0;
      if(i == bnum)
      {
            if (tpsend(cd,NULL,0,TPRECVONLY,&revent)==-1)
            {
                  log("tpsend3():failule,revent=%ld,strerr=%s\n",revent,tpstrerror(tperrno));
            }
      }
      /*当SERVER中tpreturn(TPSUCCESS,...)时revent == TPEV_SVCSUCC,

     当SERVER中tpreturn(TPFAIL,...)时revent == TPEV_SVCFAIL*/
      if (tprecv(cd,(char **)&sendbuf,&rcvlen,0,&revent)==-1)
      {
            if(revent != TPEV_SVCSUCC)
            {
                  log("tprecv():failule,revent=%ld,strerr=%s\n",revent,tpstrerror(tperrno));
            }
      }
      log("finished");
}


7.4 广播方式

客户端与服务端之间可以采用广播方式(UNSOLICITED NOTIFICATION)进行通讯,一个服务端进程或客户端进程可以给一个

或多个客户端进程发送消息. 收到该消息的客户端进程可调用一个预先定义好的函数对该消息进行处理.整个过程很象

UNIX中的信号处理.

有两种使用方式:

一对一(点对点): 一个服务端进程或客户端进程给某个客户端进程发消息

一对多(广播): 一个服务端进程或客户端进程给符合某个条件的一组客户端发消息

注意:  消息的接受者只能是客户端,不能是服务端。

Tpnotify()


服务端进程或客户端进程                            客户端进程

     (1)点对点通讯方式

服务端进程或客户端进程 

     客户端进程

     (2)广播通讯方式

消息的通知方式有3种:

IGNORE: 该TUXEDO应用系统中的客户端不接收任何消息

SIGNAL: 用SIGUSR1,SIGUSR2信号通知客户端有消息到来,如果在非UNIX平台上设置采用

该方式,那么会被自动转化为DIPIN方式。

DIPIN:  当客户端调用ATMI函数时,顺便检查看是否有消息,如果有调用消息处理函数进行处理。

消息的通知方式在UBBCONFIG文件的RESOURCES中设置,默认为DIPIN,如:下面的设置采用SIGNAL通知方式

NOTIFY          SIGNAL


也可以在tpinit()函数的tpinfo结构体的flags字段中设置:

tpinfo->flags=TPU_DIP; /*采用DIPIN通知方式*/
tpinfo->flags=TPU_IGN;  /*不接收任何的消息*/
tpinfo->flags=TPU_SIG;  /*采用SIGNAL通知方式*/


注意:在tpinfo->flags的设置会覆盖在UBBCONFIG中的设置

与消息通讯方式有关的ATMI:

int tpnotify (CLIENTID *clientid, char *data, long len,long flags)


描述: 一个CLIENT或SERVER调用该函数给某一个CLIENT发消息

参数:clientid: TPSVCINFO结构体种中的clientid字段,可从SERVICE的TPSVCINFO结构体中得到,也可通过MIB调用得到.

data:该消息所包含得数据.要用TPALLOC()函数分配得缓冲区来接收.

len :该数据得长度,只有类型为CARRAY时,才要指定.

flags: 可设为TPNOBLOCK, TPNOTIME,TPSIGRSTRT, TPACK

     TPNOBLOCK, TPNOTIME,TPSIGRSTRT的含义与TPCALL()中的一样

     TPACK:如果设置了,会返回详细的出错消息.

返回值: 失败为-1

int tpbroadcast (char *lmid, char *usrname,char *cltname, char *data, long len, long flags)


描述:个CLIENT或SERVER调用该函数给某一个CLIENT或多个CLIENT发消息

参数:

lmid :给该台机器上的所有CLIENT发消息

usrname:给名为usrname的CLIENT发消息

cltname:给组名cltname的一组CLIENT发消息

data: 该消息所包含得数据.要用TPALLOC()函数分配得缓冲区来接收.

len:该数据得长度,只有类型为CARRAY时,才要指定.

flags: 可以设置为TPNOBLOCK, TPNOTIME, TPSIGRSTRT.

返回值: 失败为-1

说明: lmid,usrname,cltname用于确定要发送的范围,如果某个值为NULL,则表示给所有.

如:tpbroadcast(NULL, NULL,NULL,...)表示给所有的客户端发消息

tpbroadcast(SIMPLE, NULL,NULL,...):表示给LMID=SIMPLE的机器上的所有客户端发送消息.

int tpchkunsol()


描述: 如果消息的通知方式为DIP_IN,那么可用该函数检查消息,如果为IGNORE,SIGNAL,那么该函数将立即返回.如果有未

被处理的消息,该函数将调用消息处理函数进行处理,直到所有收到的消息都处理完,该函数才会返回,

参数:

返回值: 成功返回本次调用处理的消息个数,失败为-1

void (*tpsetunsol(void (_TMDLLENTRY *)(*disp)
(char *data, long len,long flags))) ()


描述:用于设置消息处理函数,调用消息处理函数disp对消息进行处理.

参数:

disp:消息处理函数的名称

data: 该消息所包含得数据.要用TPALLOC()函数分配得缓冲区来接收.

len: 该数据得长度,只有类型为CARRAY时,才要指定.

flags: 现在没用,设为0.

返回值: 成功为0,失败为-1

例子:

在该例子中,客户端程序Broadcast.c可以用于给所有的客户端或某些客户端发消息,

如当TUXEDO应用服务器要重启动时,可以用该程序给所有的CLIENT发一条消息: THE TUXEDO SERVER WILL SHUTDOWN IN 3

MINUTES,PLEASE EXIT. 那么所有没有屏蔽消息通讯方式客户端马上会收到该消息。没有屏蔽消息通讯方式使指

UBBCONFIG中NOTIFY 没有设置为IGNORE,

并且在调用tpinit()时没有设置tpinfo->flags=TPU_IGN。

客户端程序Broadcast.c的内容:

#include <stdio.h>
#include "atmi.h" /* TUXEDO Header File */
main(int argc, char *argv[])
{
      char *sendbuf;
      long sendlen;
      int i;
      if(argc != 2)
      {
            (void) fprintf(stderr, "Usage: %s string\n",argv[0]);
            exit(1);
      }
      sendlen = strlen(argv[1]);
      /* Allocate STRING buffers for the request and the reply */
      if((sendbuf = (char *) tpalloc("STRING", NULL, sendlen+1)) == NULL)
      {
            (void) fprintf(stderr,"Error allocating send buffer\n");
            tpterm();
            exit(1);
      }
      strcpy(sendbuf,argv[1]);
      i=0;
      for(;;)
      {
            sprintf(sendbuf,"this is message:%s,%ld\n",argv[1],i);
            if (tpbroadcast(NULL,NULL,NULL,sendbuf,0,TPSIGRSTRT)==-1)
            {
                  printf("tpbroadcast() failure:%s\n",tpstrerror(tperrno));
                  tpfree(sendbuf);
                  tpterm();
                  exit(1);
            }
            printf(this is message:%s,%ld\n",argv[1],i);
            sleep(1);
            i++;
      }
      tpfree(sendbuf);
      tpterm();
}


客户端程序chkunsol.c的内容:

#include <stdio.h>
#include "atmi.h" /* TUXEDO Header File */
/*myfunc must be defined as below,other define is error*/
void _TMDLLENTRY myfunc(char *buffer,long len,long flag)
{
      long ret;
      char buftype[20],subtype[20];
      ret=tptypes(buffer,buftype,subtype);
      if (ret<0)
      {
            printf("invalid buffer received: %s\n",tpstrerror(tperrno));
      }
      else if(!strcmp(buftype,"STRING"))
      {
            printf("unsolicited message from server:%s,\n",buffer);
      }
      else
      printf("cannot print out buffer:%s\n",tpstrerror(tperrno));
}
main(int argc, char **argv)
{
      char *sendbuf, *rcvbuf;
      long sendlen, rcvlen;
      char *sss;
      int ret;
      int i;
      char buftype[20],subtype[20];
      TPINIT *tpinfo=NULL;;
      if(argc != 2)
      {
            (void) fprintf(stderr, "Usage: simpcl string\n");
            exit(1);
      }
      tpinfo =(TPINIT*)tpalloc("TPINIT",NULL,TPINITNEED(0));
      if (tpinfo == (TPINIT*)NULL)
      {
            fprintf(stderr,"unable to allocate TPINIT buffer\n");
            exit(1);
      }
      strcpy(tpinfo->usrname,"teller001");
      /*strcpy(tpinfo->cltname,"client001");
      strcpy(tpinfo->grpname,"teller");*/
      tpinfo->flags=TPU_DIP;/*TPU_IGN,TPU_DIP,TPU_SIG(the default is TUP_DIP)*/
      if (tpinit(tpinfo) == -1)
      {
            (void) fprintf(stderr, "Tpinit() failed:%s\n",tpstrerror(tperrno));
            exit(1);
      }
      sendlen = strlen(argv[1]);
      /* Allocate STRING buffers for the request and the reply */
      if((sendbuf = (char *) tpalloc("STRING", NULL, sendlen+1)) == NULL)
      {
            (void) fprintf(stderr,"Error allocating send buffer\n");
            tpterm();
            exit(1);
      }
      if((rcvbuf = (char *) tpalloc("STRING", NULL, sendlen+1)) == NULL) {
            (void) fprintf(stderr,"Error allocating receive buffer\n");
            tpfree(sendbuf);
            tpterm();
            exit(1);
      }
      if(tpsetunsol(myfunc)== TPUNSOLERR)
      {
            printf("tpsetunsol() failure:%s\n",tpstrerror(tperrno));
            tpfree(sendbuf);
            tpfree(rcvbuf);
            tpterm();
            exit(1);
      }
      (void) strcpy(sendbuf, argv[1]);
      ret = tpcall("TOUPPER", (char *)sendbuf, 0, (char **)&rcvbuf, &rcvlen, (long)0);
      if(ret == -1)
      {
            fprintf(stderr, "Can't send request to service TOUPPER:%s\n",tpstrerror(tperrno));
            tpfree(sendbuf);
            tpfree(rcvbuf);
            tpterm();
            exit(1);
      }
      sleep(5);
      /*可以把下面的注释去掉,那么直接调用pchkunsol()来检查是否有消息到来并处理.否则要到调用tpterm()时,才会检查

是否有消息到来并处理*/
      /*
      if ((i=tpchkunsol())==-1)
      {
            printf("tpchkunsol() failed %d\n",tperrno);
      }
      else if(i==0)
      {
            printf("tpchkunsol() successful but num = %d\n",i);
      }
      else
      {
            printf("tpchkunsol() successful num = %d\n",i);
      }
      */
      printf("Returned string is: %s\n", rcvbuf);
      /* Free Buffers & Detach from System/T */
      tpfree(sendbuf);
      tpfree(rcvbuf);
      tpterm();
      return(0);
}


在上面的程序中,tpinfo->flags=TPU_DIP,既采用DIP_IN方式处理发送到该客户端的消息.

客户端在执行过程中(从开始到结束),在CLIENT调用ATMI函数(如:TPCALL,TPTERM等)时,

将自动检查看当是否有消息,如果在这过程中有消息到来,那么会自动调用消息处理函数myfunc()处理到来的消息.当

myfunc()执行完之后,程序继续往后走.在程序中也可以直接调用tpchkunsol()检查是否有消息到来.

注意:只有在该客户端执行过程中,发送到该客户端的消息才会被接收到.如在上面的程序中

broadcast.c中每隔一秒发送一条消息,chkunsol.c程序从开始执行到结束大约要5秒时间,所以在调用tpterm()时,该客户

端会收到5条消息.

7.5 发布/订阅方式

客户端与服务端之间还可以通过发布/订阅来进行通讯,对某一事件(或某一类事件)感兴趣的客户端或服务端可以订阅该

事件.当该事件发生时,已订阅了该事件的进程就可以收到该事件。在程序中可对这些事件进行处理。

TUXEDO的事件分为系统事件和用户自定义事件

系统事件:

是由TUXEDO系统内部定义的,如系统重起或SHUTDOWN等,都会产生一个系统事件

用户自定义事件:

由应用自己定义的事件.当某种条件满足时,调用TPPOST()产生该事件,如在银行系

统中,当发现某个帐户透支时,可产生一个事件.

TUXEDO使用以下5中通讯方式告诉订阅了该事件的进程,该事件发生了。

1. UNSOLICTED MESSAGE,当事件发生时,TUXEDO调用TPNOTIFY()给订阅了该事件的客户端进程发通知

2.当事件发生时,往一个QUEUE中发送一条消息,

3.通过TPACALL()调用一个SERVICE

4.执行一个系统命令(只能用于MIB中)

5.往ULOG中写一条日志(只能用于MIB中).

系统事件说明:

在TUXEDO联机文挡的reference下的File Formats and Data Descriptions Reference - Section 5中的EVENTS中可以查

到TUXEDO定义的系统事件. 系统事件也可以被订阅者订阅, TMSYSEVT时用于管理系统事件的SERVER,类似TMUSREVT

用户自定义事件说明

一个客户端或服务端程序通过调用tpsubscribe()订阅某个事件,tpsubscribe()

把有关的订阅信息发送个一个系统的SERVER:TMUSREVT. TMUSREVT把该订阅订阅信息保存到一个名为tmusrevt.dat的文

件中,该文件在$APPDIR目录下.

在此后的某个时刻,在某个客户端或服务端进程中,检测到产生该时间的条件满足,该进程调用tppost()发布该事件.

tppost()告诉TMUSREVT该事件产生了. TMUSREVT根据保存在tmusrevt.dat文件中的订阅信息通知订阅者. 订阅者收到该

通知并进行相应的处理.


函数说明:

long tpsubscribe(char *eventexpr,char * filter, TPEVCTL *ctl, long flags)


描述:订阅者用该函数订阅事件

参数:

char *eventexpr: 要订阅的事件的名称表达式,可采用通配符表示某一类的事件.

char * filter: 对订阅的事件进行过滤

TPEVCTL *ctl: 一个TPEVCTL结构体指针,定义该定义者将采用哪种事件通知方式..

如果为NULL,则采用消息(unsolicited message)通讯方式通知.所以如果

该订阅者如果时一个服务端程序,则不能为NULL,因为服务端不能采用消息(unsolicited message)通讯方式.

long flags:可以为TPNOBLOCK,TPNOTIME,TPSIGRSTRT

TPEVCTL的定义如下:

/* Subscription Control structure */
struct tpevctl_t {
      long flags;
      char name1[XATMI_SERVICE_NAME_LENGTH];
      char name2[XATMI_SERVICE_NAME_LENGTH];
      TPQCTL qctl;
};
typedef struct tpevctl_t TPEVCTL;


说明: flags的值可以为:

t TPEVSERVICE:采用TPACALL()调用名为name1的SERVICE

t TPEVQUEUE:给QUEUE SPACE名为name1中名为name2的QUEUE发送一条消息.

t TPEVTRAN:如果产生该事件的进程当前处于事务模式,那么事件的通知过程也包含在该事务中.

t TPEVPERSIST


在默认情况下:如果通知该订阅者失败,如指定的SERVICE或QUEUE不存在,那么TUXEDO将把该订阅者的订阅信息删除.这样

以后就不会给该订阅者发通知.如果设置了TPEVPERSIST,当通知该订阅者失败,如指定的SERVICE或QUEUE不存在,那么

TUXEDO不会把该订阅者的订阅信息删除.这样以后就还可以继续给该订阅者发通知.

例子:

当名字以BANK.开头的事件发生时,名为DOIT的SERVICE将被调用TPEVCTL cntl;

long handle;
cntl.flags = TPEVSERVICE;
strcpy (cntl.name1, “DOIT”);
handle = tpsubscribe (“BANK.*”, (char *) NULL,
&cntl, TPSIGRSTRT);
int tpunsubscribe(long subscription, long flags)


描述:取消在tpsubscribe()中订阅的事件

参数:

subscription: tpsubscribe()的返回值

flags:可以为

TPNOBLOCK
TPNOTIME
TPSIGRSTRT


返回值:失败为-1

int tppost(char * eventname, char * data, long len,
long flags)


描述:发布一个名为eventname的事件.

参数:

eventname:要发布的事件名

data:要发送个订阅者的数据

len: data的长度,只有该缓冲区为CARRAY才需要指定.

Flags: 可以为:TPNOTRAN

t TPNOREPLY
t TPNOBLOCK
t TPNOTIME
t TPSIGRSTRT


与事件通讯方式有关的配置:

在UBBCONFIG中要配置SERVER: TMSYSEVT, TMSYSEVT,如:

*SERVERS
TMSYSEVT SRVGRP=ADMIN1 SRVID=100
RESTART=Y MAXGEN=5
GRACE=900 CLOPT=”-A “
TMSYSEVT SRVGRP=ADMIN2 SRVID=200
RESTART=Y MAXGEN=5 GRACE=900
CLOPT=”-A -- -S -p 120”


例子:

在一个银行的应用系统中,当某一个存储卡帐号在很短的时间内重试了很多次密码,如在一天之内, 重试了100次密码,企

图从该帐号取钱,那么很可能是该存储卡的主人丢了该存储卡,被别人拣到并试图从该帐号取钱.此时应尽快与该用户联系

,.避免给该客户造成损失。通过下面的程序可以做到这一点。在该程序中可以设定当存储卡帐号在一天之内重试了100

次密码时,产生一个TRY_PASSWD_100事件.并把该帐号的客户信息保存在该事件的数据缓冲区中,客户端订阅该事件,当收

到该事件时,工作人员可根据该事件的提供的信息与该存储卡的主人进行联系.避免给该客户造成损失.提高该银行的服务

水平。

配置文件UBBCONFIG的内容:

*RESOURCES
IPCKEY 123456
DOMAINID simpapp
MASTER simple
MAXACCESSERS 100
MAXSERVERS 50
MAXSERVICES 100
MODEL SHM
*MACHINES
DEMO LMID=simple
APPDIR="d:\tuxdemo\event"
TUXCONFIG="d:\tuxdemo\event\tuxconfig"
TUXDIR="d:\tuxedo65"
MAXWSCLIENTS = 25
*GROUPS
GROUP1
LMID=simple GRPNO=1 OPENINFO=NONE
*SERVERS
DEFAULT:
CLOPT="-A"
TMSYSEVT SRVGRP=GROUP1 SRVID=20
CLOPT="-A -- -f tmsysevt.dat"
TMUSREVT SRVGRP=GROUP1 SRVID=21
CLOPT="-A -- -f tmusrevt.dat"
WSL SRVGRP=GROUP1 SRVID=100
CLOPT="-A -- -n //DEMO:8888 -m 5 -M 10 -x 8"
server SRVGRP=GROUP1 SRVID=150
*SERVICES


服务端程序server.cpp的内容:

#include <stdio.h>
#include <atmi.h>
#include <userlog.h>
EXEC SQL INCLUDE sqlca;
EXEC ORACLE OPTION (RELEASE_CURSOR = YES);
EXEC SQL BEGIN DECLARE SECTION;
char c_account[21]="";
char c_passwd[11]="";
char c_telephone[21]="";
long trytime=0;
/*重新定义STRING变量,使其自动加上'\0',如果是CHAR形的则不能重新定义成STRING变量*/
EXEC SQL VAR c_account IS STRING(21);
EXEC SQL VAR c_passwd IS STRING(11);
EXEC SQL VAR c_telephone IS STRING(21);
EXEC SQL END DECLARE SECTION;
void AUTHEN(TPSVCINFO *rqst)
{
      char *sendbuf;
      char *rcvbuf;
      int amt = 0;
      char ac_account[21]="";
      char ac_passwd[11]="";
      long len=0;
      sendbuf=(char*)tpalloc("STRING",NULL,1024);
      if(sendbuf == NULL)
      {
            userlog("tpalloc() failure:%s",tpstrerror(tperrno));
            tpreturn(TPFAIL,0,0,0L,0);
      }
      /*取客户端传送来的帐号及该帐号的密码*/
      rcvbuf=rqst->data;
      len = sizeof(ac_account);
      ac_account[0]='\0';
      if(Fget32(rcvbuf,ACCOUNT,0,ac_account,&len)== -1)
      {
            sprintf(sendbuf,"Fget32(ACCOUNT) failure: %s\n",Fstrerror32(Ferror32));
            tpreturn(TPFAIL,0,sendbuf,0L,0);
      }
      len = sizeof(ac_passwd);
      ac_passwd[0]='\0';
      if(Fget32(rcvbuf,PASSWD,0,ac_passwd,&len)== -1)
      {
            sprintf(sendbuf,"Fget32(PASSWD) failure: %s\n",Fstrerror32(Ferror32));
            tpreturn(TPFAIL,0,sendbuf,0L,0);
      }
      /*从表CUST_INFO中取该帐号的密码及联系电话*/
      EXEC SQL SELECT passwd, telephone into :c_passwd,:c_telephone from CUST_INFO where account = :ac_account;
      if(sqlca.sqlcode !=0)
      {
            sprintf(sendbuf,"select telephone from CUST_INFO failure: sqlcode=%d, sqlerr=%s\n",sqlca.sqlcode,(char
            *)sqlca.sqlerrm.sqlerrmc);
            tpreturn(TPFAIL,0,sendbuf,0L,0);
      }
      /*如果密码一样,那么认证成功*/
      if((strcmp(c_passwd,ac_passwd))==0)
      {
            strcpy(sendbuf,"authenticate pass");
            tpreturn(TPSUCCESS,0,sendbuf,0L,0);
      }
      /*如果密码不对*/
      else
      {
            /*从表CUST_LOG中取该帐号今天密码的重试次数*/
            EXEC SQL select try_count into :trytime from CUST_LOG where account = :ac_account and date = sysdate;
            if(sqlca.sqlcode !=0)
            {
                  /*如果今天是第一次重试该帐号的密码,表CUST_LOG中是没有记录的,插入一条记录*/
                  if(sqlca.sqlcode == 1403)
                  {
                        EXEC SQL insert into CUST_LOG(account,try_count,date) values(:ac_account,1,sysdate);
                        if(sqlca.sqlcode !=0)
                        {
                              sprintf(sendbuf,"select telephone from CUST_INFO failure: sqlcode=%d, sqlerr=%s\n",sqlca.sqlcode,(char
                              *)sqlca.sqlerrm.sqlerrmc);
                              tpreturn(TPFAIL,0,sendbuf,0L,0);
                        }
                  }
                  sprintf(sendbuf,"select trytime from CUST_LOG failure: sqlcode=%d, sqlerr=%s\n",sqlca.sqlcode,(char *)
                  sqlca.sqlerrm.sqlerrmc);
                  tpreturn(TPFAIL,0,sendbuf,0L,0);
            }
            /*如果今天重试该帐号的密码超过100次,那么产生事件TRY_PASSWD_100*/
            if (trytime>100)
            {
                  sprintf(sendbuf,"account=%s have try 100 times password today,please notice the owner, telephone:
                  %s",ac_account,ac_telephone);
                  if(tppost("TRY_PASSWD_100",(char *)sendbuf,0L,TPNOTRAN|TPSIGRSTRT)==-1)
                  {
                        sprintf(sendbuf,"tppost(TRY_PASSWD_100) failed:%s",tpstrerror(tperrno));
                        tpreturn(TPFAIL,0,sendbuf,0L,0);
                  }
            }
            else
            {
                  EXEC SQL update TRY_PASSWD set try_time = :trytime + 1 where account=:ac_account and date = sysdate;
                  if(sqlca.sqlcode !=0)
                  {
                        sprintf(sendbuf,"update CUST_LOG failure: sqlcode=%d, sqlerr=%s\n",sqlca.sqlcode,(char *)
                        sqlca.sqlerrm.sqlerrmc);
                        tpreturn(TPFAIL,0,sendbuf,0L,0);
                  }
            }
            EXEC SQL commit release;
            if(sqlca.sqlcode !=0)
            {
                  sprintf(sendbuf,"commit failure: sqlcode=%d, sqlerr=%s\n",sqlca.sqlcode,(char *)sqlca.sqlerrm.sqlerrmc);
                  tpreturn(TPFAIL,0,sendbuf,0L,0);
            }
            /*密码不对,认证失败*/
            strcpy(sendbuf,"password is incorrect");
            tpreturn(TPFAIL,0,sendbuf,0L,0);
      }
}


客户端程序client.c的内容:

事件的通知方式采用消息方式

#include <stdio.h>
#include "atmi.h"
/*myfunc must be defined as below,other define is error*/
void _TMDLLENTRY myfunc(char *buffer,long len,long flag)
{
      long ret;
      char buftype[20],subtype[20];
      ret=tptypes(buffer,buftype,subtype);
      if (ret<0)
      {
            printf("invalid buffer received: %s\n",tpstrerror(tperrno));
      }
      else if(!strcmp(buftype,"STRING"))
      {
            printf("unsolicited message from server:%s,\n",buffer);
      }
      else
      printf("cannot print out buffer:%s\n",tpstrerror(tperrno));
}
main(int argc, char *argv[])
{
      int ret;
      if (tpinit((TPINIT *) NULL) == -1)
      {
            (void) fprintf(stderr, "Tpinit failed\n");
            exit(1);
      }
      if(tpsetunsol(myfunc)== TPUNSOLERR)
      {
            printf("tpsetunsol() failure:%s\n",tpstrerror(tperrno));
            tpterm();
            exit(1);
      }
      ret = tpsubscribe("TRY_PASSWD_100",NULL,(TPEVCTL *)NULL,TPSIGRSTRT);
      if (ret == -1)
      {
            printf("tpsubscribe() failure:%s\n",tpstrerror(tperrno));
            tpterm () ;
            exit(1);
      }
      else
      {
            printf("tpsubscribe(TRY_PASSWD_100) success\n");
      }
      /*每隔5秒检查一次*/
      while(1)
      {
            if ((ret=tpchkunsol())==-1)
            {
                  printf("tpchkunsol() failed %s\n",tpstrerror(tperrno));
            }
            else if(ret==0)
            {
                  printf("tpchkunsol() successful but num = %d\n",ret);
            }
            else
            {
                  printf("tpchkunsol() successful num = %d\n",ret);
            }
            sleep(5);
      }
      tpterm();
      return(0);
}


--转自
北京联动北方科技有限公司




赞(0)    操作        顶端 
总帖数
1
每页帖数
101/1页1
返回列表
发新帖子
请输入验证码: 点击刷新验证码
您需要登录后才可以回帖 登录 | 注册
技术讨论