int xudp_queueparcel(struct XUDPCB *cb, u_char type);
1 int xudp_queueparcel(struct XUDPCB *cb, u_char type) 2 { 3 4 u_short wDataWaiting; 5 char *buffer; 6 struct XUDPParcel *pParcel; 7 Timestamp tto; 8 9 /** Read in the number of bytes in the parcel waiting on the data */ 10 /** socket */ 11 12 streamRead(cb->localclient->fdControl, (char *)&wDataWaiting, 13 sizeof(u_short)); 14 15 /* Read in time to obsolence of parcel */ 16 17 streamRead(cb->localclient->fdControl, (char *) &tto, 18 sizeof(Timestamp)); 19 20 if ( !(buffer=malloc(wDataWaiting))) 21 xudpError("xudp_queuevideo: Allocating RAM"); 22 23 /** Read in the parcel's data */ 24 25 streamRead(cb->localclient->fdVideo, buffer, wDataWaiting); 26 27 /** Set it up and queue it on the queue of parcels waiting to be */ 28 /** fragmented and sent */ 29 30 pParcel=makeParcel(); 31 pParcel->length=wDataWaiting; 32 pParcel->options=type; 33 pParcel->tto=tto; 34 pParcel->buffer=buffer; 35 pParcel->pos=buffer; 36 addParcel(cb->pOutHead, pParcel); 37 38 return 0; 39 }
int xudp_dequeueparcel(struct XUDPCB *cb);
1 int xudp_dequeueparcel(struct XUDPCB *cb) 2 { 3 struct XUDPParcel *pParcel, *pParcelTmp; 4 int err; 5 6 /** Traverse list of assembled parcels received from the network */ 7 8 pParcel=cb->pInHead->next; 9 while (pParcel!=NULL) 10 { 11 pParcelTmp=pParcel->next; 12 13 /** Hits the earliest parcel first! */ 14 /** Write the parcel's data to the appropriate socket stream */ 15 16 err=write(cb->localclient->fdVideo, pParcel->buffer, 17 pParcel->length); 18 19 /** Send the appropriate command and size over the control */ 20 /** socket connection */ 21 22 xudpSendFunction(cb->localclient, F_RECVVIDEO, 23 (char *)&pParcel->length); 24 25 26 /** Remove the parcel from the queue, if it was sent ok. */ 27 /** If it wasn't, return immediately */ 28 29 if (err==pParcel->length) 30 deleteParcel(pParcel); 31 else return -1; 32 pParcel=pParcelTmp; 33 } 34 return 0; 35 }
int xudp_makecommandparcel(struct XUDPCB *cb);
1 int xudp_makecommandparcel(struct XUDPCB *cb) 2 { 3 struct XUDPParcel *pParcel; 4 5 /** Set up a parcel for a RELIABLE, COMMAND payload */ 6 7 pParcel=makeParcel(); 8 pParcel->length=0; 9 pParcel->options=COMMAND; 10 pParcel->tto=RELIABLE; 11 pParcel->buffer=NULL; 12 pParcel->pos=NULL; 13 14 /** Queue parcel on the list of parcels waiting to be fragmented */ 15 /** and sent on the network */ 16 17 addParcel(cb->pOutHead, pParcel); 18 cb->push=TRUE; 19 20 return 0; 21 }
int xudp_fragmentparcel(struct XUDPCB *cb);
1 int xudp_fragmentparcel(struct XUDPCB *cb) 2 { 3 static Sequence sequence=0; 4 struct XUDPParcel *pParcel, *pParcelTmp; 5 struct XUDPPacket *pHead, *pPacket; 6 struct XUDPCmd *pCmd, *pCmdTmp; 7 u_char *pPayload; 8 u_short cmds=0; 9 u_short temp; 10 u_short length=0; 11 u_short datalen=0; 12 13 pHead=cb->pXmitQueue; 14 pPacket=NULL; 15 16 pParcel=cb->pOutHead->next; 17 pCmd=cb->pCmdQueue->next; 18 19 /** We're only going to fragment parcels to create packets while */ 20 /** there is room available on the network */ 21 /*** PARCEL LEVEL: Traverses the list of parcels waiting to be */ 22 /*** fragmented */ 23 24 while ((pParcel!=NULL) && 25 (cb->packetsInPipe < (min(cb->winRemote, cb->winCongestion)) 26 || cb->push)) 27 { 28 pParcelTmp=pParcel->next; 29 30 /** Test again to make sure we still have room on the net. */ 31 /** This may create a packet with data from this parcel, with */ 32 /** data from commands that are queued up to be sent (even a */ 33 /** zero-length payload with only commands is okay). */ 34 /*** PACKET LEVEL: Creates a new packet with either commands, */ 35 /*** payload or both */ 36 37 while (((pParcel->length) || (pCmd!=NULL)) && 38 ((cb->packetsInPipe < min(cb->winRemote, cb->winCongestion)) || 39 cb->push)) 40 { 41 cmds=0; 42 length=0; 43 datalen=0; 44 45 pPacket=makePacket(); 46 addPacket(pHead, pPacket); /* Tack this new packet onto the */ 47 /* Xmit queue */ 48 49 pPayload=(pPacket->payload)+HEADERSIZE; /* Skip over the space */ 50 /* where the header goes */ 51 52 if (pParcel->buffer==pParcel->pos) sequence=0; 53 /* if we're at the start of a parcel, reset */ 54 /* packet sequence */ 55 56 /* Go through the list of queued up commands, and add up all */ 57 /* the commands and lengths of arguments */ 58 59 pCmdTmp=pCmd; 60 temp=0; 61 while (pCmdTmp!=NULL) 62 { 63 temp+=1+pCmdTmp->length; 64 pCmdTmp=pCmdTmp->next; 65 } 66 /** "temp" now holds the total length of all of the */ 67 /** commands in the command queue */ 68 69 /* Sees if this is going to be the last packet in the parcel, */ 70 /* if so, queues up a EOP command for it */ 71 /** Commands actually take precedence to data here, so all */ 72 /** commands that are queued up when xudp_fragmentparcel */ 73 /** is called get sent before any data */ 74 75 if ((temp+HEADERSIZE+pParcel->length+1)<=cb->mps) 76 { 77 pCmdTmp=makeCmd(); 78 pCmdTmp->cmd=C_EOP; 79 pCmdTmp->length=0; 80 addCmd(cb->pCmdQueue, pCmdTmp); 81 pCmd=cb->pCmdQueue->next; 82 } 83 84 /** COMMAND LEVEL: Traverse list of commands waiting to be */ 85 /** sent, preparing and de-queueing them */ 86 /** Add commands to packet until we've reached the end */ 87 /** of the list or we've reached the maximum allowable */ 88 /** commands in a packet. If the packet is a COMMAND */ 89 /** packet, then there isn't a limit. */ 90 91 while ((pCmd!=NULL) && ((cmds<=MAXCMD) || 92 (pParcel->options==COMMAND))) 93 { 94 pCmdTmp=pCmd; 95 96 /** Make sure we have space left in the packet! */ 97 98 if (length <= (cb->mps - HEADERSIZE - pCmd->length)) 99 { 100 *(pPayload++)=pCmd->cmd; 101 if (pCmd->length>0) 102 memcpy(pPayload, pCmd->argbuffer, pCmd->length); 103 pPayload+=pCmd->length; 104 length+=(pCmd->length)+1; 105 cmds++; 106 pCmd=pCmd->next; 107 deleteCmd(pCmdTmp); /* Delete old commands */ 108 } 109 else pCmd=pCmd->next; 110 } 111 112 /** Finish setting up the packet headers and stuff */ 113 /*** "temp" is now the minimum of the maximum remaining */ 114 /*** payload capacity of this packet and the remaining */ 115 /*** bytes in the current parcel */ 116 117 temp=min((cb->mps - HEADERSIZE - length), pParcel->length); 118 length+=temp; 119 datalen+=temp; 120 121 /** Copy the payload out of the parcel into the packet */ 122 123 if (temp) memcpy(pPayload, pParcel->pos, temp); 124 125 pPacket->header.sequence=sequence; 126 pPacket->header.parcel=pParcel->parcel; 127 pPacket->header.length=datalen; /* Header length field only */ 128 /* represents the data */ 129 /* payload (no commands) */ 130 pPacket->header.window=cb->winAdvertised; 131 pPacket->header.timestamp=makeTimestamp(); 132 pPacket->header.options=pParcel->options; 133 pPacket->header.tto=pParcel->tto; 134 pPacket->header.cmds=cmds; 135 pPacket->length=length+HEADERSIZE; /* Packet length field */ 136 /* represents both */ 137 /* header and payload */ 138 139 /** Pop data off the top of the parcel's payload */ 140 141 pParcel->length-=datalen; 142 pParcel->pos+=datalen; 143 144 if (++sequence==65535) sequence=0; /* Wrap sequence numbers */ 145 146 cb->packetsInPipe++; 147 cb->push=FALSE; 148 149 #ifdef DEBUG2 150 show_status(cb); 151 #endif 152 } 153 154 if (pParcel->length==0) deleteParcel(pParcel); 155 156 pParcel=pParcelTmp; 157 } 158 159 return 0; 160 }
{int xudp_reassembleparcel(struct XUDPCB *cb);
1 int xudp_reassembleparcel(struct XUDPCB *cb) 2 { 3 struct XUDPParcel *pParcel; 4 struct XUDPFragParcel *pFragParcel, *pFragParcelTmp; 5 struct XUDPPacket *pPacket, *pPacketTmp; 6 char *buffer; 7 8 /** Traverse list of fragmented parcels that have been received */ 9 /** from the network */ 10 11 pFragParcel=cb->pRecvWaitQueue->next; 12 while(pFragParcel!=NULL) 13 { 14 pFragParcelTmp=pFragParcel->next; 15 16 /** Only consider the ones that are complete and have sent an */ 17 /** acknowledgement to the sender */ 18 19 if (pFragParcel->ack) 20 { 21 /** Only attempt to reassemble something if its not a command */ 22 /** parcel! */ 23 24 if (pFragParcel->headPacket->header.length>0) 25 { 26 /** Create a new raw parcel to reassemble into */ 27 28 pParcel=makeParcel(); 29 pParcel->options=pFragParcel->options; 30 pParcel->tto=pFragParcel->tto; 31 pParcel->parcel=pFragParcel->parcel; 32 pParcel->length=0; 33 34 /** Traverse list of packets that form our parcel to */ 35 /** determine the size of buffer to allocate for the raw */ 36 /** parcel */ 37 38 pPacket=pFragParcel->headPacket; 39 while (pPacket!=NULL) 40 { 41 pParcel->length+=pPacket->header.length; 42 pPacket=pPacket->next; 43 } 44 45 if ( !(pParcel->buffer=malloc(pParcel->length)) ) 46 xudpError("xudp_reassembleparcel: Allocating RAM"); 47 buffer=pParcel->buffer; 48 pParcel->pos=buffer; 49 50 /** Traverse the list of packets that form our parcel */ 51 /** and tack their payloads onto our parcel's buffer */ 52 53 pPacket=pFragParcel->headPacket; 54 while (pPacket!=NULL) 55 { 56 pPacketTmp=pPacket->next; 57 memcpy(buffer, pPacket->payload, pPacket->header.length); 58 buffer+=pPacket->header.length; 59 deletePacket(pPacket); 60 pPacket=pPacketTmp; 61 } 62 63 /** Queue up our reassembled RAW parcel on the list of */ 64 /** parcels waiting to be read by the application */ 65 66 addParcel(cb->pInHead, pParcel); 67 } 68 69 /** Record that this parcel number has been used */ 70 cb->usedparcels[pFragParcel->parcel]=TRUE; 71 72 /** Wipe the fragmented version */ 73 deleteFragParcel(pFragParcel); 74 75 #ifdef DEBUG2 76 show_status(cb); 77 #endif 78 } 79 pFragParcel=pFragParcelTmp; 80 } 81 82 return 0; 83 }
int xudp_recvackparcels(struct XUDPCB *cb);
1 int xudp_recvackparcels(struct XUDPCB *cb) 2 { 3 struct XUDPCmd *pCmd, *pCmdTmp; 4 struct XUDPFragParcel *pFragParcel, *pFragParcelTmp; 5 struct XUDPPacket *pPacket, *pPacketTmp; 6 Sequence parcel; 7 Boolean eflag=FALSE, anyacks=FALSE; 8 Timestamp ts; 9 10 pCmd=cb->pCmdInQueue->next; 11 while (pCmd!=NULL) 12 { 13 pCmdTmp=pCmd->next; 14 if (pCmd->cmd==C_ACKPARCEL) 15 { 16 memcpy(&parcel, pCmd->argbuffer, sizeof(Sequence)); 17 parcel=ntohs(parcel); 18 eflag=FALSE; 19 pFragParcel=cb->pAckWaitQueue->next; 20 while ((pFragParcel!=NULL)&&!eflag) 21 { 22 pFragParcelTmp=pFragParcel->next; 23 if (pFragParcel->parcel==parcel) 24 { 25 eflag=TRUE; 26 pPacket=pFragParcel->headPacket; 27 while (pPacket!=NULL) 28 { 29 pPacketTmp=pPacket->next; 30 if (!pPacket->ack) 31 { 32 anyacks=TRUE; 33 if (cb->packetsInPipe>0) cb->packetsInPipe--; 34 ts=makeTimestamp(); 35 cb->timeElapsed=ts; 36 cb->bytesACKed=cb->bytesACKed+pPacket->length; 37 cb->rtt=ts-pPacket->header.timestamp; 38 xudp_rttmath(cb); 39 } 40 41 deletePacket(pPacket); 42 pPacket=pPacketTmp; 43 } 44 deleteFragParcel(pFragParcel); 45 } 46 pFragParcel=pFragParcelTmp; 47 } 48 deleteCmd(pCmd); 49 #ifdef DEBUG2 50 show_status(cb); 51 #endif 52 } 53 54 pCmd=pCmdTmp; 55 } 56 if (anyacks) xudp_windowchange(cb); 57 58 return 0; 59 }
int xudp_ackparcels(struct XUDPCB *cb);
1 int xudp_ackparcels(struct XUDPCB *cb) 2 { 3 struct XUDPCmd *pCmd; 4 struct XUDPFragParcel *pFragParcel; 5 struct XUDPPacket *pPacket; 6 Sequence sequence; 7 Boolean eflag=FALSE; 8 9 /** Traverse list of received fragmented parcels */ 10 11 pFragParcel=cb->pRecvWaitQueue->next; 12 while (pFragParcel!=NULL) 13 { 14 /** Continue if an End-Of-Parcel marker has been received (parcel's */ 15 /** end marker is less than 65535) and parcel hasn't already been */ 16 /** acknowledged. */ 17 18 if ((pFragParcel->end<65535) && (!pFragParcel->ack)) 19 { 20 sequence=0; 21 22 /** Traverse list of packets that form our fragmented parcel, */ 23 /** exiting by setting "eflag" if we find an hole in the */ 24 /** parcel, or by reaching the end of the fragments */ 25 /** successfully */ 26 27 pPacket=pFragParcel->headPacket; 28 eflag=FALSE; 29 while (!eflag&&(pPacket!=NULL)&&(sequence!=pFragParcel->end)) 30 { 31 if (pPacket->header.sequence==sequence) 32 { 33 sequence++; 34 pPacket=pPacket->next; 35 } 36 else eflag=TRUE; 37 } 38 39 /** Do this if we actually have the entire parcel */ 40 41 if (!eflag && (pPacket!=NULL) && (sequence==pFragParcel->end)) 42 { 43 if (pPacket->header.sequence==sequence) 44 { 45 pCmd=makeCmd(); 46 47 /** Acknowledge as either a regular parcel or a */ 48 /** command only parcel */ 49 50 if (pPacket->header.length==0) 51 pCmd->cmd=C_ACKCMDPCL; 52 else pCmd->cmd=C_ACKPARCEL; 53 54 /** Queue up an ACKPARCEL for this parcel and mark it */ 55 /** as such. */ 56 57 pCmd->length=sizeof(Sequence); 58 sequence=pFragParcel->parcel; 59 sequence=htons(sequence); 60 memcpy(pCmd->argbuffer, &sequence, sizeof(Sequence)); 61 addCmd(cb->pCmdQueue, pCmd); 62 pFragParcel->ack=TRUE; 63 64 /** Traverse through the packets that form this parcel */ 65 /** and adjust the remote packetsInPipe size to reflect */ 66 /** the ones that haven't been individually ACKed. */ 67 68 pPacket=pFragParcel->headPacket; 69 while (pPacket!=NULL) 70 { 71 if (!(pPacket->ack)) 72 if (cb->packetsInPipeRmt>0) 73 cb->packetsInPipeRmt--; 74 pPacket=pPacket->next; 75 } 76 } 77 } 78 #ifdef DEBUG2 79 show_status(cb); 80 #endif 81 } 82 83 pFragParcel=pFragParcel->next; 84 } 85 86 return 0; 87 }
int xudp_endparcel(struct XUDPCB *cb);
1 int xudp_endparcel(struct XUDPCB *cb) 2 { 3 struct XUDPFragParcel *pFragParcel; 4 struct XUDPCmd *pCmd, *pCmdTmp; 5 Sequence sequence; 6 Boolean eflag; 7 8 9 /*** Scan through the incoming commands queue for an End Of Parcel. */ 10 /*** If we find it, locate the appropriate parcel in the Receive */ 11 /*** Wait Queue and mark its ending packet. */ 12 13 /** Traverse the list of commands that have been dequeued from */ 14 /** packets received from the network */ 15 16 pCmd=cb->pCmdInQueue->next; 17 while (pCmd!=NULL) 18 { 19 pCmdTmp=pCmd->next; 20 21 /** Go until we find an End Of Parcel command */ 22 23 if (pCmd->cmd==C_EOP) 24 { 25 memcpy(&sequence, pCmd->argbuffer, sizeof(Sequence)); 26 27 /** Traverse the list of parcels that are in the process of */ 28 /** receiving packets from the network */ 29 30 eflag=FALSE; 31 pFragParcel=cb->pRecvWaitQueue->next; 32 while ((pFragParcel!=NULL)&&!eflag) 33 { 34 /** If we find a parcel that matches the one we have an */ 35 /** EOP command for, or we reach the end, exit the loop. */ 36 37 if (pFragParcel->parcel!=sequence) 38 pFragParcel=pFragParcel->next; 39 else eflag=TRUE; 40 } 41 42 /** Set the end packet sequence number on the parcel */ 43 44 if (pFragParcel!=NULL) 45 memcpy(&pFragParcel->end, 46 pCmd->argbuffer+sizeof(Sequence), 47 sizeof(Sequence)); 48 49 /** De-queue the command, as it was either used or refers */ 50 /** to an erroneous parcel that we don't have any data for */ 51 52 deleteCmd(pCmd); 53 } 54 pCmd=pCmdTmp; 55 } 56 return 0; 57 }
int xudp_removeobsolete(struct XUDPCB *cb);
1 int xudp_removeobsolete(struct XUDPCB *cb) 2 { 3 struct XUDPParcel *pParcel, *pParcelTmp; 4 struct XUDPFragParcel *pFragParcel, *pFragParcelTmp; 5 struct XUDPPacket *pPacket; 6 Timestamp ts; 7 int count; 8 9 /** Get the current time */ 10 ts=makeTimestamp(); 11 12 /** Add 1/2 the recent average RTT to it (the delivery time) */ 13 ts+=(int)(0.5*cb->rttavg); 14 15 /** Traverse through the outgoing raw parcel queue */ 16 17 pParcel=cb->pOutHead->next; 18 while (pParcel!=NULL) 19 { 20 pParcelTmp=pParcel->next; 21 22 /** Test if we've exceeded the "sell by" time on this parcel, if */ 23 /** we have, delete it. */ 24 25 if (pParcel->tto<=(ts-pParcel->timestamp)) 26 deleteParcel(pParcel); 27 28 pParcel=pParcelTmp; 29 } 30 31 /** Traverse the Outgoing ACK wait queue */ 32 33 pFragParcel=cb->pAckWaitQueue->next; 34 while (pFragParcel!=NULL) 35 { 36 pFragParcelTmp=pFragParcel->next; 37 38 /** Test to see if we've exceeded the "sell by" time on this */ 39 /** parcel */ 40 41 if (pFragParcel->tto<=(ts-pFragParcel->timestamp)) 42 { 43 /** Traverse the list of packets that form the fragmented */ 44 /** parcel */ 45 46 count=0; 47 pPacket=pFragParcel->headPacket; 48 while (pPacket!=NULL) 49 { 50 /** Keep the accounting straight by incrementing a */ 51 /** counter variable every time we find a packet that */ 52 /** hasn't been acknowledged. (i.e. we still think */ 53 /** its in the network */ 54 55 if (!(pPacket->ack)) count++; 56 57 pPacket=pPacket->next; 58 } 59 60 /** Delete the parcel */ 61 62 deleteFragParcel(pFragParcel); 63 64 /** Realize that these packets can't be in the network */ 65 /** anymore, and that they have lived out their */ 66 /** usefullness. Remove them from the accounting stuff */ 67 68 cb->packetsInPipe-=count; 69 } 70 pFragParcel=pFragParcelTmp; 71 } 72 73 /** Traverse the incoming queue of received fragmented parcels */ 74 75 pFragParcel=cb->pRecvWaitQueue->next; 76 while (pFragParcel!=NULL) 77 { 78 pFragParcelTmp=pFragParcel->next; 79 80 /** Test to see if we've exceeded the "sell by" time on this */ 81 /** parcel, if we have, delete it now. */ 82 83 if (pFragParcel->tto<=(ts-pFragParcel->timestamp)) 84 deleteFragParcel(pFragParcel); 85 86 pFragParcel=pFragParcelTmp; 87 } 88 89 return 0; 90 }