《TUXEDO 系统经典——经乾》 9.6 TUXEDO/Q实现可靠数据传输 源代码,代码已在linux环境下验证运行通过,由于国内涉及到tuxedo中间件的文章不多,此文仅用于热爱tuxedo和即将要学习tuxedo的同仁交流学习。
客户端程序
f_send.c
[cpp] view plaincopy
#include<stdlib.h>
#include<stdio.h>
#include<string.h>
#include<sys/stat.h>
#include<sys/types.h>
#include<fcntl.h>
#include<unistd.h>
#include"atmi.h"
#include"userlog.h"
#include"fml32.h"
#include"ftpfml.h"
#define HEADER_LEN 32
#define F_BLOCKSIZE 128
#define F_SOURCE_DIR "/ngbss/tuxapp/src/chapter09/"
char *data_buf;
FBFR32 *desc_buf;
int send_file(char *f_name);
int main(int argc,char *argv[]){
if(argc!=2){
printf("Useage:commond filename\n");
return -1;
}
if(tpinit((TPINIT*)NULL)==-1){
fprintf(stderr,"tpinit failed!\n");
return -1;
}
if((data_buf=tpalloc("CARRAY",NULL,F_BLOCKSIZE+HEADER_LEN))==NULL){
fprintf(stderr,"Allocate CARRAY buffer failed!\n");
tpterm();
return -1;
}
if((desc_buf=(FBFR32*)tpalloc("FML32",NULL,1024))==NULL){
fprintf(stderr,"Allocate FML32 buffer failed!\n");
tpfree(data_buf);
tpterm();
return -1;
}
//发送文件
send_file(argv[1]);
return 0;
}
int send_file(char *f_name){
int fd,n_byte;
char sitename[16],head_c[32]="\0";
long startcid,head_l;
long blockcnt,rcv_len;
struct stat f_stat;
char c_buf[F_BLOCKSIZE];
TPQCTL qctl;
qctl.flags=TPQDELIVERYQOS;
qctl.delivery_qos=TPQQOSNONPERSISTENT;
//qctl.flags = TPQREPLYQ;
//(void) strcpy(qctl.replyqueue, "REPLYQ");
if((fd=open(f_name,O_BINARY|O_RDONLY))<0){
printf("Open file:%s error!\n",f_name);
return -1;
}
//计算文件分片数
fstat(fd,&f_stat);
blockcnt=f_stat.st_size/F_BLOCKSIZE;
if(f_stat.st_size%F_BLOCKSIZE!=0) blockcnt++;
sprintf(data_buf,"%d",blockcnt);
if(tpcall("COMMON_OP",data_buf,strlen(data_buf)+1,&data_buf,&rcv_len,0)==-1){
userlog("common_op failed,%s!\n",tpstrerror(tperrno));
close(fd);
return -1;
}
sscanf(data_buf,"%s %d",sitename,&startcid);
Fchg32(desc_buf,FILENAME,0,f_name,0);
Fchg32(desc_buf,SITENAME,0,sitename,0);
Fchg32(desc_buf,BLOCKCNT,0,(char*)&blockcnt,0);
Fchg32(desc_buf,STARTCID,0,(char*)&startcid,0);
printf("INFO:\tfilename:%s\tblockcnt:%d\n",f_name,blockcnt);
//memset(data_buf,' ',HEADER_LEN+F_BLOCKSIZE);
while((n_byte=read(fd,c_buf,F_BLOCKSIZE))>0){
printf("n_byte=%d\n",n_byte);
//确定消息头
sprintf(head_c,"%s %d",sitename,startcid);
head_l=strlen(head_c);
memcpy(data_buf,head_c,head_l);
//复制内容
memcpy(data_buf+HEADER_LEN,c_buf,n_byte);
printf("%s",data_buf+HEADER_LEN);
printf("HEADER_LEN+n_byte=%d\n",HEADER_LEN+n_byte);
//放入发送队列
if(tpenqueue("QSPACE","SENDQ",&qctl,data_buf,HEADER_LEN+n_byte,0)==-1){
fprintf(stderr,"Enqueue SENDQ failed,%s\nDiagnostic:%d\n",tpstrerror(tperrno),qctl.diagnostic);
break;
}
startcid++;
}
close(fd);
//发送通知消息
//printf("send end message\n");
if(tpenqueue("QSPACE","SENDQ",&qctl,(char*)desc_buf,0,0)==-1){
fprintf(stderr,"SNEDQ is full,%s\nDiagnostic:%d\n",tpstrerror(tperrno),qctl.diagnostic);
tpfree(data_buf);
tpfree((char*)desc_buf);
tpterm();
return -1;
}
return 0;
}
服务器端程序:
common_op.c
[cpp] view plaincopy
#include<Uunix.h>
#include<string.h>
#include"atmi.h"
#include"userlog.h"
long currid=0;
char sitename[16];
void COMMON_OP(TPSVCINFO *rqst){
strcpy(sitename,"site1");
sprintf(rqst->data,"%s %d",sitename,currid);
userlog("common_op:%s\n",rqst->data);
tpreturn(TPSUCCESS,0,rqst->data,strlen(rqst->data)+1,0);
}
b_recv.c
[cpp] view plaincopy
#include<string.h>
#include<stdlib.h>
#include"atmi.h"
#include"userlog.h"
void SENDQ(TPSVCINFO *rqst){
userlog("Enter SENDQ--\n");
TPQCTL qctl;
char type[20],subtype[20],f_name[20];
long head_l,blockcnt;
FBFR32 *rbuf=(FBFR32*)rqst->data;
tptypes(rqst->data,type,subtype);
if(strcmp(type,"FML32")==0){ //收到通知消息
userlog("Received FileArrived--\n");
tppost("FileArrived",rqst->data,0,TPSIGRSTRT);
tpreturn(TPSUCCESS,0,NULL,0,0);
}else{
userlog("Tpenqueuing -- \n");
qctl.flags=TPQCORRID;
memset(qctl.corrid,0,sizeof(qctl.corrid));
strcpy(qctl.corrid,rqst->data);//strncpy
userlog("qctl.corrid:%s\n",qctl.corrid);
qctl.flags|=TPQDELIVERYQOS;
qctl.delivery_qos=TPQQOSNONPERSISTENT;
userlog("rqst->len=%d\t strlen(rqst->data)=%d\n",rqst->len,strlen(rqst->data));
if(tpenqueue("QSPACE","RECVQ",&qctl,rqst->data,rqst->len,0)==-1){
userlog("tpenqueue RECVQ failed,%s\n",tpstrerror(tperrno));
tpreturn(TPFAIL,0,NULL,0,0);
}
}
tpreturn(TPSUCCESS,0,NULL,0,0);
}
f_recv.c
[cpp] view plaincopy
#include<stdlib.h>
#include<string.h>
#include"atmi.h"
#include"userlog.h"
#include"ftpfml.h"
#include<fcntl.h>
#define HEADER_LEN 32 //==tpqctl.corrid[32]
#define F_STEP 128
#define F_RECV_PATH "/ngbss/tuxapp/src/chapter09/Recv"
long sub_handle;
char *tux_buf;
int tpsvrinit(int argc,char *argv[]){
if(tx_open()==-1){
userlog("tpsvrinit: failed to open database");
return -1;
}
TPEVCTL ectl;
ectl.flags=TPEVSERVICE;
strcpy(ectl.name1,"F_RECV");
if((sub_handle=tpsubscribe("FileArrived",NULL,&ectl,TPSIGRSTRT))==-1){
userlog("tpsubscribe() failed,%s\n",tpstrerror(tperrno));
return -1;
}
if((tux_buf=tpalloc("CARRAY",NULL,HEADER_LEN+F_STEP))==NULL){
userlog("Can`t allocate buffer CARRAY,%s\n",tpstrerror(tperrno));
return -1;
}
return 0;
}
void F_RECV(TPSVCINFO *rqst){
int fd,n_byte;
long fn_len,sn_len,blockcnt,startcid,buflen;
char c_buf[F_STEP],f_name[20],ff_name[80],s_name[20];
TPQCTL qctl;
FBFR32 *rbuf=(FBFR32*)rqst->data;
qctl.flags=TPQWAIT|TPQGETBYCORRID;
qctl.flags|=TPQREPLYQOS;
qctl.reply_qos=TPQQOSNONPERSISTENT;
fn_len=sizeof(f_name);
sn_len=sizeof(s_name);
Fget32(rbuf,FILENAME,0,f_name,&fn_len);
Fget32(rbuf,SITENAME,0,s_name,&sn_len);
Fget32(rbuf,BLOCKCNT,0,(char*)&blockcnt,0);
Fget32(rbuf,STARTCID,0,(char*)&startcid,0);
sprintf(ff_name,"%s%s",F_RECV_PATH,f_name);
userlog("FileName:%s\n",ff_name);
if((fd=open(ff_name,O_BINARY|O_RDWR|O_CREAT))<0){
userlog("Open file error,%s\n",tpstrerror(tperrno));
tpreturn(TPFAIL,0,NULL,0,0);
}
memset(qctl.corrid,0,sizeof(qctl.corrid));
while(blockcnt-- > 0){
sprintf(qctl.corrid,"%s %d",s_name,startcid++);
userlog("F_RECV:\t qctl.corrid:%s\n",qctl.corrid);
userlog("Begining dequeue....\n");
if(tpdequeue("QSPACE","RECVQ",&qctl,(char**)&tux_buf,&buflen,0)==-1){
userlog("tpdequeue failed,%s\n",tpstrerror(tperrno));
break;
}
//userlog("tux_buf:%s",tux_buf+HEADER_LEN);
userlog("buflen=%d\t buflen-HEADER_LEN=%d\n",buflen,buflen-HEADER_LEN);
if((n_byte=write(fd,tux_buf+HEADER_LEN,buflen-HEADER_LEN))<0){
close(fd);
userlog("write to file :%s error!\n",ff_name);
tpreturn(TPFAIL,0,NULL,0,0);
}
}
close(fd);
tpreturn(TPSUCCESS,0,NULL,0,0);
}
void tpsvrdone(){
tpunsubscribe(sub_handle,0);
tpfree(tux_buf);
}
消息队列创建脚本:
[cpp] view plaincopy
crdl /ngbss/tuxapp/etc/QUE 0 3000
qspacecreate -n 1000 QSPACE 67775 2000 6 4 9 1000 errque y 16
qopen QSPACE
qcreate SENDQ fifo none 2 30 80% 0% echo "STRING is nearly full"
qcreate RECVQ fifo none 2 30 80% 0% echo "RPLYQ is nearly full"
qcreate errque fifo none 2 30 80% 0% echo "errque is nearly full"
quit
ubb主要配置:
[cpp] view plaincopy
*GROUPS
CHP9GRP LMID=site1 GRPNO=309 TMSNAME=TMS_QM TMSCOUNT=2
OPENINFO="TUXEDO/QM:/ngbss/tuxapp/etc/QUE:QSPACE"
CHP9GRP_1 LMID=site1 GRPNO=310 TMSNAME=TMS TMSCOUNT=2
*SERVERS
#add for chapter09
commonopserver SRVGRP=CHP9GRP_1 SRVID=4046
brecvserver SRVGRP=CHP9GRP_1 SRVID=4048
frecvserver SRVGRP=CHP9GRP_1 SRVID=4050
TMUSREVT SRVGRP=CHP9GRP_1 SRVID=4044 CLOPT="-A -- -p 10"
TMQUEUE SRVGRP=CHP9GRP SRVID=4040 CLOPT="-s QSPACE:TMQUEUE -- -t 60"
TMQFORWARD SRVGRP=CHP9GRP SRVID=4042 CLOPT="-- -q SENDQ"
运行结果:
./f_send file.data
[cpp] view plaincopy
[tuxapp@REDHAT5_FG chapter09]$ ./f_send file.data
INFO: filename:file.data blockcnt:4
n_byte=128
dddddddddddddddddddddddddddddddd
tttttttttttttttttttttttttttttttt
ffffffffffffffffffffffffffffffff
dddddddddddddddddddddddddddddHEADER_LEN+n_byte=160
n_byte=128
ddd
tttttttttttttttttttttttttttttttt
ffffffffffffffffffffffffffffffff
dddddddddddddddddddddddddddddddd
tttttttttttttttttttttttttHEADER_LEN+n_byte=160
n_byte=128
ttttttt
ffffffffffffffffffffffffffffffff
dddddddddddddddddddddddddddddddd
tttttttttttttttttttttttttttttttt
fffffffffffffffffffffHEADER_LEN+n_byte=160
n_byte=111
fffffffffff
dddddddddddddddddddddddddddddddd
tttttttttttttttttttttttttttttttt
ffffffffffffffffffffffffffffffff
fffffffffffffffffHEADER_LEN+n_byte=143
服务端ulog.xxx
[cpp] view plaincopy
185356.REDHAT5_FG!commonopserver.3785.3086702800.0: common_op:site1 0
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x85: Enter SENDQ--
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x85: Tpenqueuing --
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x85: qctl.corrid:site1 0
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x85: rqst->len=160 strlen(rqst->data)=7
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x86: Enter SENDQ--
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x86: Tpenqueuing --
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x86: qctl.corrid:site1 1
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x86: rqst->len=160 strlen(rqst->data)=7
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x87: Enter SENDQ--
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x87: Tpenqueuing --
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x87: qctl.corrid:site1 2
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x87: rqst->len=160 strlen(rqst->data)=7
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x88: Enter SENDQ--
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x88: Tpenqueuing --
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x88: qctl.corrid:site1 3
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x88: rqst->len=143 strlen(rqst->data)=7
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x89: Enter SENDQ--
185419.REDHAT5_FG!brecvserver.3786.3086985424.0: gtrid x0 x51b2801b x89: Received FileArrived--
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: FileName:/ngbss/tuxapp/src/chapter09/Recvfile.data
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: F_RECV: qctl.corrid:site1 0
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: Begining dequeue....
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: buflen=160 buflen-HEADER_LEN=128
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: F_RECV: qctl.corrid:site1 1
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: Begining dequeue....
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: buflen=160 buflen-HEADER_LEN=128
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: F_RECV: qctl.corrid:site1 2
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: Begining dequeue....
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: buflen=160 buflen-HEADER_LEN=128
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: F_RECV: qctl.corrid:site1 3
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: Begining dequeue....
185419.REDHAT5_FG!frecvserver.3799.3086547152.0: buflen=143 buflen-HEADER_LEN=111
客户端和服务器端文件内容完全一致