《TUXEDO 系统经典——经乾》 9.6 TUXEDO/Q实现可靠数据传输 源代码_MQ, Tuxedo及OLTP讨论区_Weblogic技术|Tuxedo技术|中间件技术|Oracle论坛|JAVA论坛|Linux/Unix技术|hadoop论坛_联动北方技术论坛  
网站首页 | 关于我们 | 服务中心 | 经验交流 | 公司荣誉 | 成功案例 | 合作伙伴 | 联系我们 |
联动北方-国内领先的云技术服务提供商
»  游客             当前位置:  论坛首页 »  自由讨论区 »  MQ, Tuxedo及OLTP讨论区 »
总帖数
1
每页帖数
101/1页1
返回列表
0
发起投票  发起投票 发新帖子
查看: 4013 | 回复: 0   主题: 《TUXEDO 系统经典——经乾》 9.6 TUXEDO/Q实现可靠数据传输 源代码        下一篇 
weiwei.fu
注册用户
等级:上尉
经验:661
发帖:47
精华:0
注册:2013-12-12
状态:离线
发送短消息给weiwei.fu 加好友    发送短消息给weiwei.fu 发消息
发表于: IP:您无权察看 2014-9-18 9:28:15 | [全部帖] [楼主帖] 楼主

《TUXEDO 系统经典——经乾》 9.6 TUXEDO/Q实现可靠数据传输 源代码。代码已在 Linux 环境下验证运行通过。由于国内涉及 Tuxedo 中间件的文章不多,本文仅供热爱 Tuxedo 和即将学习 Tuxedo 的同仁交流学习。

客户端程序

f_send.c
[cpp] view plaincopy
#include<stdlib.h>
#include<stdio.h>
#include<string.h>
#include<sys/stat.h>
#include<sys/types.h>
#include<fcntl.h>
#include<unistd.h>
#include"atmi.h"
#include"userlog.h"
#include"fml32.h"
#include"ftpfml.h"
/* Bytes reserved at the front of every queued block for the text header
 * ("<sitename> <block id>"); the file payload starts at this offset. */
#define HEADER_LEN 32
/* Maximum number of file bytes carried by one queued block. */
#define F_BLOCKSIZE 128
/* Source directory of the sample; not referenced by the code shown here. */
#define F_SOURCE_DIR "/ngbss/tuxapp/src/chapter09/"
/* CARRAY buffer allocated in main(); used for the COMMON_OP call and then
 * reused for every block message (header + payload). */
char *data_buf;
/* FML32 buffer allocated in main(); carries the notification fields
 * (FILENAME, SITENAME, BLOCKCNT, STARTCID) sent after the last block. */
FBFR32 *desc_buf;
/* Splits the named file into blocks and enqueues them; defined below. */
int send_file(char *f_name);
int main(int argc,char *argv[]){
      if(argc!=2){
            printf("Useage:commond filename\n");
            return -1;
      }
      if(tpinit((TPINIT*)NULL)==-1){
            fprintf(stderr,"tpinit failed!\n");
            return -1;
      }
      if((data_buf=tpalloc("CARRAY",NULL,F_BLOCKSIZE+HEADER_LEN))==NULL){
            fprintf(stderr,"Allocate CARRAY buffer failed!\n");
            tpterm();
            return -1;
      }
      if((desc_buf=(FBFR32*)tpalloc("FML32",NULL,1024))==NULL){
            fprintf(stderr,"Allocate FML32 buffer failed!\n");
            tpfree(data_buf);
            tpterm();
            return -1;
      }
      //发送文件  
      send_file(argv[1]);
      return 0;
}
int send_file(char *f_name){
      int fd,n_byte;
      char sitename[16],head_c[32]="\0";
      long startcid,head_l;
      long blockcnt,rcv_len;
      struct stat f_stat;
      char c_buf[F_BLOCKSIZE];
      TPQCTL qctl;
      qctl.flags=TPQDELIVERYQOS;
      qctl.delivery_qos=TPQQOSNONPERSISTENT;
      //qctl.flags = TPQREPLYQ;
      //(void) strcpy(qctl.replyqueue, "REPLYQ");
      if((fd=open(f_name,O_BINARY|O_RDONLY))<0){
            printf("Open file:%s error!\n",f_name);
            return -1;
      }
      //计算文件分片数  
      fstat(fd,&f_stat);
      blockcnt=f_stat.st_size/F_BLOCKSIZE;
      if(f_stat.st_size%F_BLOCKSIZE!=0) blockcnt++;
      sprintf(data_buf,"%d",blockcnt);
      if(tpcall("COMMON_OP",data_buf,strlen(data_buf)+1,&data_buf,&rcv_len,0)==-1){
            userlog("common_op failed,%s!\n",tpstrerror(tperrno));
            close(fd);
            return -1;
      }
      sscanf(data_buf,"%s %d",sitename,&startcid);
      Fchg32(desc_buf,FILENAME,0,f_name,0);
      Fchg32(desc_buf,SITENAME,0,sitename,0);
      Fchg32(desc_buf,BLOCKCNT,0,(char*)&blockcnt,0);
      Fchg32(desc_buf,STARTCID,0,(char*)&startcid,0);
      printf("INFO:\tfilename:%s\tblockcnt:%d\n",f_name,blockcnt);
      //memset(data_buf,' ',HEADER_LEN+F_BLOCKSIZE);
      while((n_byte=read(fd,c_buf,F_BLOCKSIZE))>0){
            printf("n_byte=%d\n",n_byte);
            //确定消息头  
            sprintf(head_c,"%s %d",sitename,startcid);
            head_l=strlen(head_c);
            memcpy(data_buf,head_c,head_l);
            //复制内容  
            memcpy(data_buf+HEADER_LEN,c_buf,n_byte);
            printf("%s",data_buf+HEADER_LEN);
            printf("HEADER_LEN+n_byte=%d\n",HEADER_LEN+n_byte);
            //放入发送队列  
            if(tpenqueue("QSPACE","SENDQ",&qctl,data_buf,HEADER_LEN+n_byte,0)==-1){
                  fprintf(stderr,"Enqueue SENDQ failed,%s\nDiagnostic:%d\n",tpstrerror(tperrno),qctl.diagnostic);
                  break;
            }
            startcid++;
      }
      close(fd);
      //发送通知消息  
      //printf("send end message\n");
      if(tpenqueue("QSPACE","SENDQ",&qctl,(char*)desc_buf,0,0)==-1){
            fprintf(stderr,"SNEDQ is full,%s\nDiagnostic:%d\n",tpstrerror(tperrno),qctl.diagnostic);
            tpfree(data_buf);
            tpfree((char*)desc_buf);
            tpterm();
            return -1;
      }
      return 0;
}


服务器端程序:

common_op.c
[cpp] view plaincopy
#include<Uunix.h>
#include<string.h>
#include"atmi.h"
#include"userlog.h"
/* Block id that COMMON_OP reports to clients as their starting id; starts at 0. */
long currid=0;
/* Site name placed in front of the id in the COMMON_OP reply; COMMON_OP
 * sets it to "site1" on every call. */
char sitename[16];
/*
 * COMMON_OP - hand out the site name and a starting block id.
 *
 * Request : decimal block count as a NUL-terminated string.
 * Reply   : "<sitename> <start id>" written back into the request buffer.
 *
 * The counter is advanced by the requested block count so that successive
 * transfers get disjoint id ranges; without this every transfer would
 * start at the same id and reuse the same correlation ids. A request that
 * does not parse as a positive count leaves the counter unchanged.
 *
 * NOTE(review): the reply is formatted into the request buffer, whose
 * capacity is not visible here - confirm it can hold the reply, or grow
 * it before writing.
 */
void COMMON_OP(TPSVCINFO *rqst){
      long first=currid;
      long wanted=0;

      /* Only parse the request when it really is a terminated string. */
      if(rqst->len>0&&rqst->data[rqst->len-1]=='\0'
         &&sscanf(rqst->data,"%ld",&wanted)==1&&wanted>0){
            currid+=wanted;
      }

      strcpy(sitename,"site1");
      sprintf(rqst->data,"%s %ld",sitename,first);
      userlog("common_op:%s\n",rqst->data);
      tpreturn(TPSUCCESS,0,rqst->data,strlen(rqst->data)+1,0);
}
b_recv.c
[cpp] view plaincopy
#include<string.h>
#include<stdlib.h>
#include"atmi.h"
#include"userlog.h"
/*
 * SENDQ - service that handles every message taken from the SENDQ queue.
 *
 * Two kinds of message arrive:
 *   - FML32 buffer: the "file complete" notification. It is posted as the
 *     "FileArrived" event.
 *   - anything else: a data block whose leading bytes hold a text header.
 *     The header becomes the correlation id and the whole message is
 *     re-enqueued on RECVQ.
 *
 * Returns TPSUCCESS when the message was handled, TPFAIL when the buffer
 * type cannot be determined, the header is empty, or tppost()/tpenqueue()
 * fails.
 */
void SENDQ(TPSVCINFO *rqst){
      TPQCTL qctl;
      /* Zero-filled so strcmp() below always sees a terminated string. */
      char type[20]={0},subtype[20]={0};
      long hdr_max,hdr_len;

      userlog("Enter SENDQ--\n");

      if(tptypes(rqst->data,type,subtype)==-1){
            userlog("tptypes failed,%s\n",tpstrerror(tperrno));
            tpreturn(TPFAIL,0,NULL,0,0);
            return;
      }

      if(strcmp(type,"FML32")==0){ /* notification message received */
            userlog("Received FileArrived--\n");
            if(tppost("FileArrived",rqst->data,0,TPSIGRSTRT)==-1){
                  userlog("tppost FileArrived failed,%s\n",tpstrerror(tperrno));
                  tpreturn(TPFAIL,0,NULL,0,0);
                  return;
            }
            tpreturn(TPSUCCESS,0,NULL,0,0);
            return;
      }

      userlog("Tpenqueuing -- \n");

      /* Measure the header without reading past the message or past what
       * fits in corrid (one byte is kept for the terminator). */
      hdr_max=(long)sizeof(qctl.corrid)-1;
      if(rqst->len<hdr_max) hdr_max=rqst->len;
      for(hdr_len=0;hdr_len<hdr_max&&rqst->data[hdr_len]!='\0';hdr_len++)
            ;
      if(hdr_len<=0){
            userlog("SENDQ: message without header\n");
            tpreturn(TPFAIL,0,NULL,0,0);
            return;
      }

      /* Correlation id = header text, zero-padded to the full field. */
      memset(qctl.corrid,0,sizeof(qctl.corrid));
      memcpy(qctl.corrid,rqst->data,(size_t)hdr_len);
      qctl.flags=TPQCORRID|TPQDELIVERYQOS;
      qctl.delivery_qos=TPQQOSNONPERSISTENT;
      userlog("qctl.corrid:%s\n",qctl.corrid);
      userlog("rqst->len=%ld\t strlen(rqst->data)=%ld\n",(long)rqst->len,hdr_len);

      if(tpenqueue("QSPACE","RECVQ",&qctl,rqst->data,rqst->len,0)==-1){
            userlog("tpenqueue RECVQ failed,%s\n",tpstrerror(tperrno));
            tpreturn(TPFAIL,0,NULL,0,0);
            return;
      }
      tpreturn(TPSUCCESS,0,NULL,0,0);
}
f_recv.c
[cpp] view plaincopy
#include<stdlib.h>
#include<string.h>
#include"atmi.h"
#include"userlog.h"
#include"ftpfml.h"
#include<fcntl.h>
/* Bytes at the front of every dequeued block that hold the header and are
 * skipped when writing the file; chosen to equal the size of corrid[32]. */
#define HEADER_LEN 32 //==tpqctl.corrid[32]
/* Payload bytes allowed for when sizing the dequeue buffer. */
#define F_STEP 128
/* Prefix put directly in front of the received file name; no path
 * separator is added between the two. */
#define F_RECV_PATH "/ngbss/tuxapp/src/chapter09/Recv"
/* Handle returned by tpsubscribe() in tpsvrinit(); used to unsubscribe. */
long sub_handle;
/* CARRAY buffer allocated in tpsvrinit(); receives each dequeued block. */
char *tux_buf;
int tpsvrinit(int argc,char *argv[]){
      if(tx_open()==-1){
            userlog("tpsvrinit: failed to open database");
            return -1;
      }
      TPEVCTL ectl;
      ectl.flags=TPEVSERVICE;
      strcpy(ectl.name1,"F_RECV");
      if((sub_handle=tpsubscribe("FileArrived",NULL,&ectl,TPSIGRSTRT))==-1){
            userlog("tpsubscribe() failed,%s\n",tpstrerror(tperrno));
            return -1;
      }
      if((tux_buf=tpalloc("CARRAY",NULL,HEADER_LEN+F_STEP))==NULL){
            userlog("Can`t allocate buffer CARRAY,%s\n",tpstrerror(tperrno));
            return -1;
      }
      return 0;
}
/*
 * F_RECV - rebuild a transferred file from the blocks stored in RECVQ.
 *
 * The request buffer is an FML32 message with the fields FILENAME,
 * SITENAME, BLOCKCNT and STARTCID. For each of the BLOCKCNT blocks the
 * service dequeues from RECVQ the message whose correlation id is
 * "<SITENAME> <id>" (ids counting up from STARTCID), skips the first
 * HEADER_LEN bytes and appends the rest to the output file
 * F_RECV_PATH + FILENAME.
 *
 * Returns TPSUCCESS only when every block was dequeued and written;
 * TPFAIL when a field is missing, the file cannot be opened, a dequeue
 * fails, a block is shorter than its header, or a write/close fails.
 * A partially written file is left in place on failure.
 *
 * NOTE(review): FILENAME is used in the output path exactly as received;
 * confirm that senders are trusted.
 */
void F_RECV(TPSVCINFO *rqst){
      int fd,n_byte,n_fmt,failed=0;
      long fn_len,sn_len,blockcnt=0,startcid=0,buflen=0,remain;
      char f_name[20],ff_name[80],s_name[20];
      char *out;
      TPQCTL qctl;
      FBFR32 *rbuf=(FBFR32*)rqst->data;

      fn_len=sizeof(f_name);
      sn_len=sizeof(s_name);
      if(Fget32(rbuf,FILENAME,0,f_name,&fn_len)==-1
         ||Fget32(rbuf,SITENAME,0,s_name,&sn_len)==-1
         ||Fget32(rbuf,BLOCKCNT,0,(char*)&blockcnt,0)==-1
         ||Fget32(rbuf,STARTCID,0,(char*)&startcid,0)==-1){
            userlog("F_RECV: cannot read notification fields\n");
            tpreturn(TPFAIL,0,NULL,0,0);
            return;
      }

      n_fmt=snprintf(ff_name,sizeof(ff_name),"%s%s",F_RECV_PATH,f_name);
      if(n_fmt<0||n_fmt>=(int)sizeof(ff_name)){
            userlog("F_RECV: file name too long\n");
            tpreturn(TPFAIL,0,NULL,0,0);
            return;
      }
      userlog("FileName:%s\n",ff_name);

      /* O_CREAT requires a mode argument; O_TRUNC drops any stale content
       * left by an earlier, longer file of the same name. */
      if((fd=open(ff_name,O_BINARY|O_RDWR|O_CREAT|O_TRUNC,0644))<0){
            userlog("Open file error,%s\n",ff_name);
            tpreturn(TPFAIL,0,NULL,0,0);
            return;
      }

      while(blockcnt-- > 0){
            /* Set the selection flags and the correlation id afresh for
             * every call: the same structure also carries tpdequeue()'s
             * output. */
            qctl.flags=TPQWAIT|TPQGETBYCORRID|TPQREPLYQOS;
            qctl.reply_qos=TPQQOSNONPERSISTENT;
            memset(qctl.corrid,0,sizeof(qctl.corrid));
            n_fmt=snprintf(qctl.corrid,sizeof(qctl.corrid),"%s %ld",s_name,startcid++);
            if(n_fmt<0||n_fmt>=(int)sizeof(qctl.corrid)){
                  userlog("F_RECV: correlation id too long\n");
                  failed=1;
                  break;
            }
            userlog("F_RECV:\t qctl.corrid:%s\n",qctl.corrid);
            userlog("Begining dequeue....\n");
            if(tpdequeue("QSPACE","RECVQ",&qctl,(char**)&tux_buf,&buflen,0)==-1){
                  userlog("tpdequeue failed,%s\n",tpstrerror(tperrno));
                  failed=1;
                  break;
            }
            userlog("buflen=%ld\t buflen-HEADER_LEN=%ld\n",buflen,buflen-HEADER_LEN);

            /* A block shorter than its header would give a negative size. */
            remain=buflen-HEADER_LEN;
            if(remain<0){
                  userlog("F_RECV: block shorter than header\n");
                  failed=1;
                  break;
            }

            /* write() may accept fewer bytes than asked for; keep going. */
            out=tux_buf+HEADER_LEN;
            while(remain>0){
                  n_byte=(int)write(fd,out,(size_t)remain);
                  if(n_byte<=0){
                        userlog("write to file :%s error!\n",ff_name);
                        failed=1;
                        break;
                  }
                  out+=n_byte;
                  remain-=n_byte;
            }
            if(failed) break;
      }

      if(close(fd)<0&&!failed){
            userlog("write to file :%s error!\n",ff_name);
            failed=1;
      }
      if(failed){
            tpreturn(TPFAIL,0,NULL,0,0);
            return;
      }
      tpreturn(TPSUCCESS,0,NULL,0,0);
}
/*
 * tpsvrdone - server shutdown hook.
 *
 * Cancels the subscription identified by sub_handle and frees the dequeue
 * buffer tux_buf. The result of tpunsubscribe() is deliberately not
 * examined.
 */
void tpsvrdone(){
      /* Drop the event subscription before releasing the buffer. */
      (void)tpunsubscribe(sub_handle,0);

      tpfree(tux_buf);
}


消息队列创建脚本:

[cpp] view plaincopy
crdl /ngbss/tuxapp/etc/QUE 0 3000
qspacecreate -n 1000 QSPACE 67775 2000 6 4 9 1000 errque y 16
qopen QSPACE
qcreate SENDQ fifo none 2 30 80% 0% echo "SENDQ is nearly full"
qcreate RECVQ fifo none 2 30 80% 0% echo "RECVQ is nearly full"
qcreate errque fifo none 2 30 80% 0% echo "errque is nearly full"
quit


ubb主要配置:

[cpp] view plaincopy
# Excerpt of the UBBCONFIG entries used by the chapter 9 sample.
*GROUPS
# Group of the TMQUEUE and TMQFORWARD servers below. OPENINFO names the
# device /ngbss/tuxapp/etc/QUE and the queue space QSPACE.
CHP9GRP     LMID=site1  GRPNO=309   TMSNAME=TMS_QM  TMSCOUNT=2
OPENINFO="TUXEDO/QM:/ngbss/tuxapp/etc/QUE:QSPACE"
# Group of the three application servers and of TMUSREVT.
CHP9GRP_1   LMID=site1  GRPNO=310   TMSNAME=TMS     TMSCOUNT=2
*SERVERS
#add for chapter09
# Application servers; presumably built from common_op.c, b_recv.c and
# f_recv.c respectively - confirm against the build commands.
commonopserver  SRVGRP=CHP9GRP_1    SRVID=4046
brecvserver     SRVGRP=CHP9GRP_1    SRVID=4048
frecvserver     SRVGRP=CHP9GRP_1    SRVID=4050
# System servers. TMQFORWARD is configured for queue SENDQ (-q SENDQ), and
# TMQUEUE is started with the service alias QSPACE (-s QSPACE:TMQUEUE).
TMUSREVT        SRVGRP=CHP9GRP_1    SRVID=4044  CLOPT="-A -- -p 10"
TMQUEUE         SRVGRP=CHP9GRP      SRVID=4040  CLOPT="-s QSPACE:TMQUEUE -- -t 60"
TMQFORWARD      SRVGRP=CHP9GRP      SRVID=4042  CLOPT="-- -q SENDQ"


运行结果:

./f_send file.data
[cpp] view plaincopy
[tuxapp@REDHAT5_FG chapter09]$ ./f_send file.data
INFO:   filename:file.data      blockcnt:4
n_byte=128
dddddddddddddddddddddddddddddddd
tttttttttttttttttttttttttttttttt
ffffffffffffffffffffffffffffffff
dddddddddddddddddddddddddddddHEADER_LEN+n_byte=160
n_byte=128
ddd
tttttttttttttttttttttttttttttttt
ffffffffffffffffffffffffffffffff
dddddddddddddddddddddddddddddddd
tttttttttttttttttttttttttHEADER_LEN+n_byte=160
n_byte=128
ttttttt
ffffffffffffffffffffffffffffffff
dddddddddddddddddddddddddddddddd
tttttttttttttttttttttttttttttttt
fffffffffffffffffffffHEADER_LEN+n_byte=160
n_byte=111
fffffffffff
dddddddddddddddddddddddddddddddd
tttttttttttttttttttttttttttttttt
ffffffffffffffffffffffffffffffff
fffffffffffffffffHEADER_LEN+n_byte=143


服务端ulog.xxx

[cpp] view plaincopy
185356.REDHAT5_FG!commonopserver.3785.3086702800.0: common_op:site1 0
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x85:        Enter SENDQ--
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x85:        Tpenqueuing --
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x85:        qctl.corrid:site1 0
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x85:        rqst->len=160    strlen(rqst->data)=7
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x86:        Enter SENDQ--
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x86:        Tpenqueuing --
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x86:        qctl.corrid:site1 1
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x86:        rqst->len=160    strlen(rqst->data)=7
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x87:        Enter SENDQ--
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x87:        Tpenqueuing --
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x87:        qctl.corrid:site1 2
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x87:        rqst->len=160    strlen(rqst->data)=7
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x88:        Enter SENDQ--
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x88:        Tpenqueuing --
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x88:        qctl.corrid:site1 3
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x88:        rqst->len=143    strlen(rqst->data)=7
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x89:        Enter SENDQ--
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x89:        Received FileArrived--
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: FileName:/ngbss/tuxapp/src/chapter09/Recvfile.data
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: F_RECV:         qctl.corrid:site1 0
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: Begining dequeue....
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: buflen=160      buflen-HEADER_LEN=128
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: F_RECV:         qctl.corrid:site1 1
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: Begining dequeue....
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: buflen=160      buflen-HEADER_LEN=128
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: F_RECV:         qctl.corrid:site1 2
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: Begining dequeue....
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: buflen=160      buflen-HEADER_LEN=128
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: F_RECV:         qctl.corrid:site1 3
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: Begining dequeue....
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: buflen=143      buflen-HEADER_LEN=111


客户端和服务器端文件内容完全一致




赞(0)    操作        顶端 
总帖数
1
每页帖数
101/1页1
返回列表
发新帖子
请输入验证码: 点击刷新验证码
您需要登录后才可以回帖 登录 | 注册
技术讨论