utiles_v2017/DataTransfer.cpp

336 lines
7.3 KiB
C++

#include "StdAfx.h"
#include "DataTransfer.h"
#include "Csock_cl.h"
//**********************************************************************************
// Builds an idle transfer object: no socket or data object attached, no
// buffer geometry, receive mode selected, and both worker-thread members
// pointed back at this instance.
DataTransfer::DataTransfer(void)
{
    // Transfer geometry.
    nb_ini   = 0;
    n_buf    = 0;
    sizeBuf  = 0;
    nb_total = 0;

    // Endpoints; setData() supplies them.
    sc = NULL;
    dt = NULL;

    // Block counters.
    total_p = 0;
    sc_p    = 0;
    dt_p    = 0;

    // Status fields. start() resets them as well, but getStatus() returns
    // porcen and can be called before any transfer was started, so they must
    // hold defined values from construction on.
    porcen   = 0;
    mal      = false;
    errorSoc = 0;
    errorDat = 0;

    cancela = false;
    mode    = MODE_RECEIVE;

    // Each worker reaches the shared state through its `data` pointer.
    thbuf.data = this;
    thsoc.data = this;
}
//**********************************************************************************
// Raises the cancel flag (the worker loops test it on every iteration) and
// then joins both worker-thread members before the object is destroyed.
DataTransfer::~DataTransfer(void)
{
    this->cancela = true;

    this->thbuf.join();
    this->thsoc.join();
}
//**********************************************************************************
bool DataTransfer::start()
{
if(currando() || !sc || !dt || nb_buf.n<=0 || buf.n<=0 || sizeBuf<=0)
return false;
errorSoc = 0;
errorDat =0;
nb_total =0;
total_p =0;
sc_p =0;
dt_p =0;
porcen =0;
mal = false;
cancela = false;
if(mode == MODE_SEND)
{
nb_total =dt->getNB_it();
if(!dt->setPosition_it(nb_ini) || nb_total<=0)
return false;
nb_total = nb_total-nb_ini;
total_p = (int)(nb_total*1./sizeBuf);
if(nb_total>(total_p*sizeBuf))
total_p++;
}
if(!thbuf.lanza("DataTransferBuf"))
return false;
if(!thsoc.lanza("DataTransferSoc"))
{
cancela = true;
thbuf.join();
cancela = false;
return false;
}
return true;
}
//**********************************************************************************
// True while either worker thread reports that it is running.
bool DataTransfer::currando()
{
    if (thsoc.isRunning())
        return true;
    return thbuf.isRunning();
}
//**********************************************************************************
// Returns the current value of the cancel flag. The flag is set by cancel(),
// by the destructor, and by the workers themselves when they hit an error.
bool DataTransfer::isCanceled()
{
    const bool requested = cancela;
    return requested;
}
//**********************************************************************************
bool DataTransfer::setMode( int mod, int nbbuf /*= (1024*5)*/,int nbuf/*=2*/ )
{
mode = mod;
nb_buf.n =0;
buf.n=0;
if(!(nb_buf+=nbuf)|| !(buf+=(nbbuf*nbuf)))
return false;
nb_buf.n=nbuf;
sizeBuf = nbbuf;
buf.n =nbbuf*nbuf;
n_buf = nbuf;
memset(nb_buf.ptr,0,sizeof(int)*nbuf);
return true;
}
//**********************************************************************************
// Stores the socket and the data object used by the transfer.
// Both pointers must be non-null; otherwise nothing is stored and false is
// returned.
bool DataTransfer::setData( Csock_cl* soc, IdataTransfer*datat )
{
    const bool haveBoth = (soc != NULL) && (datat != NULL);
    if (haveBoth)
    {
        sc = soc;
        dt = datat;
    }
    return haveBoth;
}
double DataTransfer::getStatus()
{
return porcen;
}
void DataTransfer::cancel()
{
cancela = true;
}
//**********************************************************************************
// Thread body of the buffer worker: drains buffers when receiving, fills
// them in any other mode.
void ThBufDataTransfer::run()
{
    const bool receiving = (data->mode == DataTransfer::MODE_RECEIVE);
    if (!receiving)
    {
        runFill();
        return;
    }
    runClear();
}
//**********************************************************************************
void ThBufDataTransfer::runFill()
{
char *pbuf;
data->porcen =0;
ibuf =0;
while(!data->cancela && data->dt_p<data->total_p)
{
data->porcen = data->dt_p*100./data->total_p;
if(data->nb_buf[ibuf]<=0)
{
//decide numero de bites a cargar-------
__int64 nbp = (data->nb_total-data->sizeBuf*(data->dt_p));
if(nbp>data->sizeBuf)
nbp = data->sizeBuf;
//rellena buffer--------------
pbuf = &data->buf[ibuf*data->sizeBuf];
__int64 nb = data->dt->getNext_it(pbuf,nbp );
if(nb!=nbp)
{
data->errorDat =1;
data->mal = true;
data->cancela = true;
continue;
}
//actualiza indices
data->dt_p++;
data->nb_buf[ibuf] = (int)nb;
ibuf = (ibuf+1)%data->n_buf;
continue;
}
Sleep(0);
}
data->porcen = 100;
}
//**********************************************************************************
void ThBufDataTransfer::runClear()
{
char *pbuf;
ibuf =0;
data->porcen = 0;
//inicia-------------------------
//espera a que se recivan datos
while(!data->cancela && data->total_p<=0)
{
Sleep(1);
}
if(!data->dt->setPosition_it(data->nb_ini))
{
data->errorDat =-1;
data->mal = true;
data->cancela = true;
}
while(!data->cancela && data->dt_p<data->total_p)
{
data->porcen = data->dt_p*100./data->total_p;
if(data->nb_buf[ibuf]>0)
{
//decide numero de bites a cargar-------
__int64 nbp = data->nb_buf[ibuf];
//rellena buffer--------------
pbuf = &data->buf[ibuf*data->sizeBuf];
if(!data->dt->set_it(pbuf,nbp ))
{
data->errorDat =-2;
data->mal = true;
data->cancela = true;
continue;
}
//actualiza indices
data->dt_p++;
data->nb_buf[ibuf] = 0;
ibuf = (ibuf+1)%data->n_buf;
continue;
}
Sleep(0);
}
data->porcen =100;
}
//**********************************************************************************
// Thread body of the socket worker: receives in MODE_RECEIVE, sends in any
// other mode.
void ThSockDataTransfer::run()
{
    const bool receiving = (data->mode == DataTransfer::MODE_RECEIVE);
    if (!receiving)
    {
        runSend();
        return;
    }
    runReceive();
}
//**********************************************************************************
void ThSockDataTransfer::runSend()
{
char *pbuf;
//inicia-----------------------
ibuf =0;
//envia cavecera
Head_DataTransfer cab;
cab.version = VERSION_HEAD_DATA_TRANSFER;
cab.nb_total = data->nb_total;
cab.nb_ini = data->nb_ini;
if(!data->sc->envia((BYTE*)&cab,(int) sizeof(cab)))
{
data->errorSoc =1;
data->mal = true;
data->cancela = true;
}
while(!data->cancela && data->sc_p<data->total_p)
{
if(data->nb_buf[ibuf]>0)
{
//decide numero de bites a cargar-------
__int64 nbp =data->nb_buf[ibuf];
if(nbp<data->sizeBuf)
nbp =nbp;
//rellena buffer--------------
pbuf = &data->buf[ibuf*data->sizeBuf];
if(!data->sc->envia((BYTE*)pbuf,(int) nbp))
{
data->errorSoc =2;
data->mal = true;
data->cancela = true;
continue;
}
data->sc_p++;
data->nb_buf[ibuf] = 0;
ibuf = (ibuf+1)%data->n_buf;
continue;
}
Sleep(0);
}
if(!data->cancela && data->sc_p>=data->total_p)
{
if(!data->sc->recibe((BYTE*)&cab,(int) sizeof(cab)) || cab.version != VERSION_HEAD_DATA_TRANSFER || cab.nb_total != data->nb_total)
{
data->errorSoc =3;
data->mal = true;
data->cancela = true;
}
}
}
//**********************************************************************************
void ThSockDataTransfer::runReceive()
{
char *pbuf;
//inicia-----------------------
ibuf =0;
//envia cavecera
Head_DataTransfer cab;
int nv=0;
bool res =false;
if(!data->sc->Receive((BYTE*)&cab,(int) sizeof(cab)) || cab.version != VERSION_HEAD_DATA_TRANSFER || cab.nb_total <=0)
{
data->errorSoc =-1;
data->mal = true;
data->cancela = true;
}
else
{
data->nb_ini = cab.nb_ini;
data->nb_total = cab.nb_total;
int pp =(int)(data->nb_total*1./data->sizeBuf);
if(data->nb_total>(pp*data->sizeBuf))
pp++;
data->total_p = pp;
}
while(!data->cancela && data->sc_p<data->total_p)
{
if(data->nb_buf[ibuf]<=0)
{
//decide numero de bites a cargar-------
__int64 nbp =data->nb_total-((data->sc_p)*data->sizeBuf);
if(nbp>data->sizeBuf)
nbp=data->sizeBuf;
else
nbp =nbp;
//rellena buffer--------------
pbuf = &data->buf[ibuf*data->sizeBuf];
if(!data->sc->recibe((BYTE*)pbuf,(int) nbp))
{
if(data->sc->er->tipo<0)
continue;
data->errorSoc =-2;
data->mal = true;
data->cancela = true;
continue;
}
data->sc_p++;
data->nb_buf[ibuf] = (int)nbp;
ibuf = (ibuf+1)%data->n_buf;
continue;
}
Sleep(0);
}
cab.version = VERSION_HEAD_DATA_TRANSFER;
cab.nb_total = data->nb_total;
cab.nb_ini = data->nb_ini;
if(!data->cancela && !data->sc->envia((BYTE*)&cab,(int) sizeof(cab)))
{
data->errorSoc =-3;
data->mal = true;
data->cancela = true;
}
}
//**********************************************************************************