#include "StdAfx.h"
#include "DataTransfer.h"
#include "Csock_cl.h"

// Implementation of DataTransfer and its two worker objects.
//
// A transfer moves a byte stream between an IdataTransfer object and a
// Csock_cl socket through a ring of `n_buf` slots of `sizeBuf` bytes each.
// `nb_buf[i]` holds the number of valid bytes in slot i; a value > 0 marks the
// slot as full, a value <= 0 marks it as free.
//   - MODE_SEND:    ThBufDataTransfer::runFill  fills slots from the data object,
//                   ThSockDataTransfer::runSend drains them into the socket.
//   - otherwise:    ThSockDataTransfer::runReceive fills slots from the socket,
//                   ThBufDataTransfer::runClear drains them into the data object.
//
// NOTE(review): the two workers communicate through plain members of
// DataTransfer (cancela, nb_buf, dt_p, sc_p, total_p, ...) with no locking
// visible in this file. Whether those members are declared volatile/atomic
// depends on DataTransfer.h, which is not visible here -- confirm.

//**********************************************************************************
// Builds an idle transfer object: no socket, no data object, no buffers,
// receive mode. Both worker objects get a back pointer to this instance.
DataTransfer::DataTransfer(void)
{
    nb_ini   = 0;
    n_buf    = 0;
    sizeBuf  = 0;
    nb_total = 0;
    sc       = NULL;
    dt       = NULL;
    total_p  = 0;
    sc_p     = 0;
    dt_p     = 0;
    // These four are read by getStatus() and by the workers; give them a
    // defined value so they are never read uninitialized before start().
    porcen   = 0;
    errorSoc = 0;
    errorDat = 0;
    mal      = false;
    cancela  = false;
    mode     = MODE_RECEIVE;
    thbuf.data = this;
    thsoc.data = this;
}

//**********************************************************************************
// Requests cancellation and waits for both workers to finish.
DataTransfer::~DataTransfer(void)
{
    cancela = true;
    thbuf.join();
    thsoc.join();
}

//**********************************************************************************
// Starts a transfer in the mode selected with setMode().
//
// Returns false when a transfer is already running, when setData()/setMode()
// have not been called successfully, when (send mode) there is nothing to send
// from position nb_ini, or when a worker cannot be launched.
bool DataTransfer::start()
{
    if(currando() || !sc || !dt || nb_buf.n <= 0 || buf.n <= 0 || sizeBuf <= 0)
        return false;

    errorSoc = 0;
    errorDat = 0;
    nb_total = 0;
    total_p  = 0;
    sc_p     = 0;
    dt_p     = 0;
    porcen   = 0;
    mal      = false;
    cancela  = false;

    // Mark every slot as free: a previous transfer that was cancelled half way
    // may have left slots flagged as full.
    memset(nb_buf.ptr, 0, sizeof(int)*nb_buf.n);

    if(mode == MODE_SEND)
    {
        nb_total = dt->getNB_it();
        if(!dt->setPosition_it(nb_ini) || nb_total <= 0)
            return false;

        nb_total = nb_total - nb_ini;
        // Nothing left after the start offset: the receiving side rejects a
        // header whose nb_total is <= 0, so fail here instead.
        if(nb_total <= 0)
            return false;

        // Number of slots needed, rounded up.
        total_p = static_cast<int>(nb_total*1./sizeBuf);
        // Widen before multiplying so the product cannot overflow int.
        if(nb_total > (static_cast<__int64>(total_p)*sizeBuf))
            total_p++;
    }

    if(!thbuf.lanza("DataTransferBuf"))
        return false;

    if(!thsoc.lanza("DataTransferSoc"))
    {
        // The buffer worker is already running: stop it before reporting failure.
        cancela = true;
        thbuf.join();
        cancela = false;
        return false;
    }
    return true;
}

//**********************************************************************************
// Returns true while either worker is still running.
bool DataTransfer::currando()
{
    return thsoc.isRunning() || thbuf.isRunning();
}

//**********************************************************************************
// Returns true when cancellation was requested, either through cancel() or by
// a worker that hit an error.
bool DataTransfer::isCanceled()
{
    return cancela;
}

//**********************************************************************************
// Selects the transfer mode and (re)allocates the slot ring.
//   mod   - transfer mode (MODE_SEND or MODE_RECEIVE).
//   nbbuf - size of one slot in bytes.
//   nbuf  - number of slots.
// Returns false when a size is not positive or the buffers cannot be grown.
bool DataTransfer::setMode( int mod, int nbbuf /*= (1024*5)*/, int nbuf /*=2*/ )
{
    mode     = mod;
    nb_buf.n = 0;
    buf.n    = 0;

    // A non-positive count would reach memset() below as a huge size_t.
    if(nbbuf <= 0 || nbuf <= 0)
        return false;

    if(!(nb_buf += nbuf) || !(buf += (nbbuf*nbuf)))
        return false;

    nb_buf.n = nbuf;
    sizeBuf  = nbbuf;
    buf.n    = nbbuf*nbuf;
    n_buf    = nbuf;
    memset(nb_buf.ptr, 0, sizeof(int)*nbuf);   // every slot starts free
    return true;
}

//**********************************************************************************
// Sets the socket and the data object used by the transfer. Both pointers are
// stored as given; returns false (and stores nothing) if either is null.
bool DataTransfer::setData( Csock_cl* soc, IdataTransfer* datat )
{
    if(!soc || !datat)
        return false;
    sc = soc;
    dt = datat;
    return true;
}

// Returns the progress value last written by the buffer worker (0..100).
double DataTransfer::getStatus()
{
    return porcen;
}

// Requests cancellation; the workers check the flag on each loop iteration.
void DataTransfer::cancel()
{
    cancela = true;
}

//**********************************************************************************
// Buffer worker entry point: drains slots when receiving, fills them otherwise.
void ThBufDataTransfer::run()
{
    if(data->mode == DataTransfer::MODE_RECEIVE)
        runClear();
    else
        runFill();
}

//**********************************************************************************
// Send mode: reads the data object slot by slot into the ring until total_p
// slots have been produced or cancellation is requested.
// On a short read sets errorDat = 1 and cancels the transfer.
void ThBufDataTransfer::runFill()
{
    char *pbuf;
    data->porcen = 0;
    ibuf = 0;

    while(!data->cancela && data->dt_p < data->total_p)
    {
        data->porcen = data->dt_p*100./data->total_p;

        if(data->nb_buf[ibuf] <= 0)
        {
            // Number of bytes to load: what is left, capped to one slot.
            // Widen before multiplying so the product cannot overflow int.
            __int64 nbp = data->nb_total
                        - static_cast<__int64>(data->sizeBuf)*data->dt_p;
            if(nbp > data->sizeBuf)
                nbp = data->sizeBuf;

            // Fill the slot.
            pbuf = &data->buf[ibuf*data->sizeBuf];
            __int64 nb = data->dt->getNext_it(pbuf, nbp);
            if(nb != nbp)
            {
                data->errorDat = 1;
                data->mal      = true;
                data->cancela  = true;
                continue;
            }

            // Update indices; writing the length marks the slot as full.
            data->dt_p++;
            data->nb_buf[ibuf] = static_cast<int>(nb);
            ibuf = (ibuf+1)%data->n_buf;
            continue;
        }
        Sleep(0);   // slot still full: yield until the socket worker drains it
    }
    data->porcen = 100;
}

//**********************************************************************************
// Receive mode: waits until the socket worker has published total_p, positions
// the data object at nb_ini and then writes every full slot into it.
// Sets errorDat = -1 if positioning fails, -2 if a write fails.
void ThBufDataTransfer::runClear()
{
    char *pbuf;
    ibuf = 0;
    data->porcen = 0;

    // Wait until the header has been received (total_p becomes > 0).
    while(!data->cancela && data->total_p <= 0)
    {
        Sleep(1);
    }

    // NOTE(review): this also runs when the wait above ended because of a
    // cancellation, so errorDat may be set to -1 on a transfer whose header
    // never arrived -- confirm whether that is intended.
    if(!data->dt->setPosition_it(data->nb_ini))
    {
        data->errorDat = -1;
        data->mal      = true;
        data->cancela  = true;
    }

    while(!data->cancela && data->dt_p < data->total_p)
    {
        data->porcen = data->dt_p*100./data->total_p;

        if(data->nb_buf[ibuf] > 0)
        {
            // Number of bytes stored in the slot.
            __int64 nbp = data->nb_buf[ibuf];

            // Write the slot to the data object.
            pbuf = &data->buf[ibuf*data->sizeBuf];
            if(!data->dt->set_it(pbuf, nbp))
            {
                data->errorDat = -2;
                data->mal      = true;
                data->cancela  = true;
                continue;
            }

            // Update indices; zeroing the length marks the slot as free.
            data->dt_p++;
            data->nb_buf[ibuf] = 0;
            ibuf = (ibuf+1)%data->n_buf;
            continue;
        }
        Sleep(0);   // slot still empty: yield until the socket worker fills it
    }
    data->porcen = 100;
}

//**********************************************************************************
// Socket worker entry point: receives or sends depending on the mode.
void ThSockDataTransfer::run()
{
    if(data->mode == DataTransfer::MODE_RECEIVE)
        runReceive();
    else
        runSend();
}

//**********************************************************************************
// Send mode: sends the header, then every full slot, then waits for the peer
// to echo the header back as acknowledgement.
// errorSoc: 1 = header send failed, 2 = data send failed, 3 = bad/missing ack.
void ThSockDataTransfer::runSend()
{
    char *pbuf;
    ibuf = 0;

    // Send the header.
    Head_DataTransfer cab;
    cab.version  = VERSION_HEAD_DATA_TRANSFER;
    cab.nb_total = data->nb_total;
    cab.nb_ini   = data->nb_ini;
    if(!data->sc->envia(reinterpret_cast<BYTE*>(&cab), static_cast<int>(sizeof(cab))))
    {
        data->errorSoc = 1;
        data->mal      = true;
        data->cancela  = true;
    }

    while(!data->cancela && data->sc_p < data->total_p)
    {
        if(data->nb_buf[ibuf] > 0)
        {
            // Number of bytes stored in the slot.
            __int64 nbp = data->nb_buf[ibuf];

            // Send the slot.
            pbuf = &data->buf[ibuf*data->sizeBuf];
            if(!data->sc->envia(reinterpret_cast<BYTE*>(pbuf), static_cast<int>(nbp)))
            {
                data->errorSoc = 2;
                data->mal      = true;
                data->cancela  = true;
                continue;
            }

            // Update indices; zeroing the length marks the slot as free.
            data->sc_p++;
            data->nb_buf[ibuf] = 0;
            ibuf = (ibuf+1)%data->n_buf;
            continue;
        }
        Sleep(0);   // slot still empty: yield until the buffer worker fills it
    }

    // Everything was sent: wait for the acknowledgement header.
    if(!data->cancela && data->sc_p >= data->total_p)
    {
        if(!data->sc->recibe(reinterpret_cast<BYTE*>(&cab), static_cast<int>(sizeof(cab)))
            || cab.version  != VERSION_HEAD_DATA_TRANSFER
            || cab.nb_total != data->nb_total)
        {
            data->errorSoc = 3;
            data->mal      = true;
            data->cancela  = true;
        }
    }
}

//**********************************************************************************
// Receive mode: reads the header, publishes nb_ini/nb_total/total_p, receives
// every slot into the ring and finally echoes the header as acknowledgement.
// errorSoc: -1 = bad/missing header, -2 = data receive failed, -3 = ack send failed.
void ThSockDataTransfer::runReceive()
{
    char *pbuf;
    ibuf = 0;

    // Receive the header.
    Head_DataTransfer cab;
    // NOTE(review): the header is read with Receive() while the payload below
    // is read with recibe(). If Receive() is a raw socket call returning a
    // byte count (negative on error), `!Receive(...)` would not detect a
    // failure -- confirm against Csock_cl.
    if(!data->sc->Receive(reinterpret_cast<BYTE*>(&cab), static_cast<int>(sizeof(cab)))
        || cab.version != VERSION_HEAD_DATA_TRANSFER
        || cab.nb_total <= 0)
    {
        data->errorSoc = -1;
        data->mal      = true;
        data->cancela  = true;
    }
    else
    {
        data->nb_ini   = cab.nb_ini;
        data->nb_total = cab.nb_total;

        // Number of slots needed, rounded up.
        int pp = static_cast<int>(data->nb_total*1./data->sizeBuf);
        // Widen before multiplying so the product cannot overflow int.
        if(data->nb_total > (static_cast<__int64>(pp)*data->sizeBuf))
            pp++;
        // Publishing total_p releases the buffer worker waiting in runClear().
        data->total_p = pp;
    }

    while(!data->cancela && data->sc_p < data->total_p)
    {
        if(data->nb_buf[ibuf] <= 0)
        {
            // Number of bytes to receive: what is left, capped to one slot.
            // Widen before multiplying so the product cannot overflow int.
            __int64 nbp = data->nb_total
                        - static_cast<__int64>(data->sc_p)*data->sizeBuf;
            if(nbp > data->sizeBuf)
                nbp = data->sizeBuf;

            // Receive into the slot.
            pbuf = &data->buf[ibuf*data->sizeBuf];
            if(!data->sc->recibe(reinterpret_cast<BYTE*>(pbuf), static_cast<int>(nbp)))
            {
                // A negative error type is retried instead of aborting
                // (presumably a timeout or similar recoverable condition --
                // TODO confirm against Csock_cl).
                if(data->sc->er->tipo < 0)
                    continue;
                data->errorSoc = -2;
                data->mal      = true;
                data->cancela  = true;
                continue;
            }

            // Update indices; writing the length marks the slot as full.
            data->sc_p++;
            data->nb_buf[ibuf] = static_cast<int>(nbp);
            ibuf = (ibuf+1)%data->n_buf;
            continue;
        }
        Sleep(0);   // slot still full: yield until the buffer worker drains it
    }

    // Acknowledge by echoing the header back.
    cab.version  = VERSION_HEAD_DATA_TRANSFER;
    cab.nb_total = data->nb_total;
    cab.nb_ini   = data->nb_ini;
    if(!data->cancela
        && !data->sc->envia(reinterpret_cast<BYTE*>(&cab), static_cast<int>(sizeof(cab))))
    {
        data->errorSoc = -3;
        data->mal      = true;
        data->cancela  = true;
    }
}
//**********************************************************************************