Browse Source

More or less working acquire system

Author: jgg
Date: 1998-11-09 01:09:19 GMT
More or less working acquire system
debian/1.8.y
Arch Librarian 18 years ago
parent
commit
8267fe24f7
  1. 59
      apt-pkg/acquire-item.cc
  2. 11
      apt-pkg/acquire-item.h
  3. 4
      apt-pkg/acquire-method.cc
  4. 55
      apt-pkg/acquire-worker.cc
  5. 11
      apt-pkg/acquire-worker.h
  6. 64
      apt-pkg/acquire.cc
  7. 64
      apt-pkg/acquire.h

59
apt-pkg/acquire-item.cc

@ -1,6 +1,6 @@
// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
// $Id: acquire-item.cc,v 1.7 1998/11/05 07:21:35 jgg Exp $
// $Id: acquire-item.cc,v 1.8 1998/11/09 01:09:19 jgg Exp $
/* ######################################################################
Acquire Item - Item to acquire
@ -30,7 +30,8 @@
// Acquire::Item::Item - Constructor /*{{{*/
// ---------------------------------------------------------------------
/* */
pkgAcquire::Item::Item(pkgAcquire *Owner) : Owner(Owner), QueueCounter(0)
pkgAcquire::Item::Item(pkgAcquire *Owner) : Owner(Owner), FileSize(0),
Complete(false), QueueCounter(0)
{
Owner->Add(this);
Status = StatIdle;
@ -59,6 +60,16 @@ void pkgAcquire::Item::Failed(string Message)
}
}
/*}}}*/
// Acquire::Item::Start - Item has begun to download /*{{{*/
// ---------------------------------------------------------------------
/* */
void pkgAcquire::Item::Start(string Message,unsigned long Size)
{
Status = StatFetching;
if (FileSize == 0 && Complete == false)
FileSize = Size;
}
/*}}}*/
// Acquire::Item::Done - Item downloaded OK /*{{{*/
// ---------------------------------------------------------------------
/* */
@ -98,8 +109,19 @@ pkgAcqIndex::pkgAcqIndex(pkgAcquire *Owner,const pkgSourceList::Item *Location)
DestFile = _config->FindDir("Dir::State::lists") + "partial/";
DestFile += URItoFileName(Location->PackagesURI());
QueueURI(Location->PackagesURI() + ".gz",Location->PackagesInfo());
// Create the item
Desc.URI = Location->PackagesURI() + ".gz";
Desc.Description = Location->PackagesInfo();
Desc.Owner = this;
// Set the short description to the archive component
if (Location->Dist[Location->Dist.size() - 1] == '/')
Desc.ShortDesc = Location->Dist;
else
Desc.ShortDesc = Location->Dist + '/' + Location->Section;
QueueURI(Desc);
// Create the Release fetch class
new pkgAcqIndexRel(Owner,Location);
@ -149,6 +171,7 @@ void pkgAcqIndex::Done(string Message,unsigned long Size,string MD5)
}
Erase = false;
Complete = true;
// Handle the unzipd case
string FileName = LookupTag(Message,"Alt-Filename");
@ -159,8 +182,10 @@ void pkgAcqIndex::Done(string Message,unsigned long Size,string MD5)
return;
Decompression = true;
FileSize = 0;
DestFile += ".decomp";
QueueURI("copy:" + FileName,string());
Desc.URI = "copy:" + FileName;
QueueURI(Desc);
return;
}
@ -177,10 +202,13 @@ void pkgAcqIndex::Done(string Message,unsigned long Size,string MD5)
if (FileName == DestFile)
Erase = true;
else
FileSize = 0;
Decompression = true;
DestFile += ".decomp";
QueueURI("gzip:" + FileName,string());
Desc.URI = "gzip:" + FileName,Location->PackagesInfo();
QueueURI(Desc);
}
/*}}}*/
@ -194,7 +222,18 @@ pkgAcqIndexRel::pkgAcqIndexRel(pkgAcquire *Owner,
DestFile = _config->FindDir("Dir::State::lists") + "partial/";
DestFile += URItoFileName(Location->ReleaseURI());
QueueURI(Location->ReleaseURI(),Location->ReleaseInfo());
// Create the item
Desc.URI = Location->ReleaseURI();
Desc.Description = Location->ReleaseInfo();
Desc.Owner = this;
// Set the short description to the archive component
if (Location->Dist[Location->Dist.size() - 1] == '/')
Desc.ShortDesc = Location->Dist;
else
Desc.ShortDesc = Location->Dist + '/' + Location->Section;
QueueURI(Desc);
}
/*}}}*/
// AcqIndexRel::Custom600Headers - Insert custom request headers /*{{{*/
@ -229,6 +268,8 @@ void pkgAcqIndexRel::Done(string Message,unsigned long Size,string MD5)
return;
}
Complete = true;
// The files timestamp matches
if (StringToBool(LookupTag(Message,"IMS-Hit"),false) == true)
return;
@ -236,7 +277,9 @@ void pkgAcqIndexRel::Done(string Message,unsigned long Size,string MD5)
// We have to copy it into place
if (FileName != DestFile)
{
QueueURI("copy:" + FileName,string());
FileSize = 0;
Desc.URI = "copy:" + FileName;
QueueURI(Desc);
return;
}

11
apt-pkg/acquire-item.h

@ -1,6 +1,6 @@
// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
// $Id: acquire-item.h,v 1.5 1998/11/05 07:21:36 jgg Exp $
// $Id: acquire-item.h,v 1.6 1998/11/09 01:09:21 jgg Exp $
/* ######################################################################
Acquire Item - Item to acquire
@ -30,8 +30,8 @@ class pkgAcquire::Item
protected:
pkgAcquire *Owner;
inline void QueueURI(string URI,string Description)
{Owner->Enqueue(this,URI,Description);};
inline void QueueURI(ItemDesc &Item)
{Owner->Enqueue(Item);};
void Rename(string From,string To);
@ -40,6 +40,8 @@ class pkgAcquire::Item
// State of the item
enum {StatIdle, StatFetching, StatDone, StatError} Status;
string ErrorText;
unsigned long FileSize;
bool Complete;
// Number of queues we are inserted into
unsigned int QueueCounter;
@ -49,6 +51,7 @@ class pkgAcquire::Item
virtual void Failed(string Message);
virtual void Done(string Message,unsigned long Size,string Md5Hash);
virtual void Start(string Message,unsigned long Size);
virtual string Custom600Headers() {return string();};
@ -64,6 +67,7 @@ class pkgAcqIndex : public pkgAcquire::Item
const pkgSourceList::Item *Location;
bool Decompression;
bool Erase;
pkgAcquire::ItemDesc Desc;
public:
@ -79,6 +83,7 @@ class pkgAcqIndexRel : public pkgAcquire::Item
protected:
const pkgSourceList::Item *Location;
pkgAcquire::ItemDesc Desc;
public:

4
apt-pkg/acquire-method.cc

@ -1,6 +1,6 @@
// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
// $Id: acquire-method.cc,v 1.4 1998/11/05 07:21:38 jgg Exp $
// $Id: acquire-method.cc,v 1.5 1998/11/09 01:09:22 jgg Exp $
/* ######################################################################
Acquire Method
@ -314,7 +314,7 @@ void pkgAcqMethod::Status(const char *Format,...)
// sprintf the description
char S[1024];
unsigned int Len = snprintf(S,sizeof(S),"101 Status\nURI: %s\n"
unsigned int Len = snprintf(S,sizeof(S),"102 Status\nURI: %s\n"
"Message: ",CurrentURI.c_str());
vsnprintf(S+Len,sizeof(S)-Len,Format,args);

55
apt-pkg/acquire-worker.cc

@ -1,6 +1,6 @@
// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
// $Id: acquire-worker.cc,v 1.10 1998/11/05 07:21:39 jgg Exp $
// $Id: acquire-worker.cc,v 1.11 1998/11/09 01:09:23 jgg Exp $
/* ######################################################################
Acquire Worker
@ -22,6 +22,7 @@
#include <apt-pkg/fileutl.h>
#include <strutl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <signal.h>
#include <wait.h>
@ -30,7 +31,8 @@
// Worker::Worker - Constructor for Queue startup /*{{{*/
// ---------------------------------------------------------------------
/* */
pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf)
pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf,
pkgAcquireStatus *Log) : Log(Log)
{
OwnerQ = Q;
Config = Cnf;
@ -221,10 +223,15 @@ bool pkgAcquire::Worker::RunMessages()
_error->Error("Method gave invalid 200 URI Start message");
break;
}
CurrentItem = Itm;
CurrentSize = 0;
TotalSize = atoi(LookupTag(Message,"Size","0").c_str());
Itm->Owner->Start(Message,atoi(LookupTag(Message,"Size","0").c_str()));
if (Log != 0)
Log->Fetch(*Itm);
break;
}
@ -238,9 +245,21 @@ bool pkgAcquire::Worker::RunMessages()
}
pkgAcquire::Item *Owner = Itm->Owner;
pkgAcquire::ItemDesc Desc = *Itm;
OwnerQ->ItemDone(Itm);
Owner->Done(Message,atoi(LookupTag(Message,"Size","0").c_str()),
LookupTag(Message,"MD5-Hash"));
ItemDone();
// Log that we are done
if (Log != 0)
{
if (StringToBool(LookupTag(Message,"IMS-Hit"),false) == true ||
StringToBool(LookupTag(Message,"Alt-IMS-Hit"),false) == true)
Log->IMSHit(Desc);
else
Log->Done(Desc);
}
break;
}
@ -254,8 +273,14 @@ bool pkgAcquire::Worker::RunMessages()
}
pkgAcquire::Item *Owner = Itm->Owner;
pkgAcquire::ItemDesc Desc = *Itm;
OwnerQ->ItemDone(Itm);
Owner->Failed(Message);
ItemDone();
if (Log != 0)
Log->Fail(Desc);
break;
}
@ -419,3 +444,29 @@ bool pkgAcquire::Worker::MethodFailure()
return false;
}
/*}}}*/
// Worker::Pulse - Called periodically /*{{{*/
// ---------------------------------------------------------------------
/* */
void pkgAcquire::Worker::Pulse()
{
if (CurrentItem == 0)
return;
struct stat Buf;
if (stat(CurrentItem->Owner->DestFile.c_str(),&Buf) != 0)
return;
CurrentSize = Buf.st_size;
}
/*}}}*/
// Worker::ItemDone - Called when the current item is finished /*{{{*/
// ---------------------------------------------------------------------
/* */
void pkgAcquire::Worker::ItemDone()
{
CurrentItem = 0;
CurrentSize = 0;
TotalSize = 0;
Status = string();
}
/*}}}*/

11
apt-pkg/acquire-worker.h

@ -1,6 +1,6 @@
// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
// $Id: acquire-worker.h,v 1.6 1998/10/30 07:53:36 jgg Exp $
// $Id: acquire-worker.h,v 1.7 1998/11/09 01:09:24 jgg Exp $
/* ######################################################################
Acquire Worker - Worker process manager
@ -33,6 +33,7 @@ class pkgAcquire::Worker
// The access association
Queue *OwnerQ;
pkgAcquireStatus *Log;
MethodConfig *Config;
string Access;
@ -62,6 +63,7 @@ class pkgAcquire::Worker
bool SendConfiguration();
bool MethodFailure();
void ItemDone();
public:
@ -70,12 +72,13 @@ class pkgAcquire::Worker
string Status;
unsigned long CurrentSize;
unsigned long TotalSize;
// Load the method and do the startup
bool QueueItem(pkgAcquire::Queue::QItem *Item);
bool Start();
bool Start();
void Pulse();
Worker(Queue *OwnerQ,MethodConfig *Config);
Worker(Queue *OwnerQ,MethodConfig *Config,pkgAcquireStatus *Log);
Worker(MethodConfig *Config);
~Worker();
};

64
apt-pkg/acquire.cc

@ -1,6 +1,6 @@
// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
// $Id: acquire.cc,v 1.9 1998/11/06 02:52:20 jgg Exp $
// $Id: acquire.cc,v 1.10 1998/11/09 01:09:25 jgg Exp $
/* ######################################################################
Acquire - File Acquiration
@ -22,12 +22,14 @@
#include <apt-pkg/configuration.h>
#include <apt-pkg/error.h>
#include <strutl.h>
#include <sys/time.h>
/*}}}*/
// Acquire::pkgAcquire - Constructor /*{{{*/
// ---------------------------------------------------------------------
/* We grab some runtime state from the configuration space */
pkgAcquire::pkgAcquire()
pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
{
Queues = 0;
Configs = 0;
@ -85,7 +87,7 @@ void pkgAcquire::Remove(Item *Itm)
{
if (*I == Itm)
Items.erase(I);
}
}
}
/*}}}*/
// Acquire::Add - Add a worker /*{{{*/
@ -124,10 +126,10 @@ void pkgAcquire::Remove(Worker *Work)
it is construction which creates a queue (based on the current queue
mode) and puts the item in that queue. If the system is running then
the queue might be started. */
void pkgAcquire::Enqueue(Item *Itm,string URI,string Description)
void pkgAcquire::Enqueue(ItemDesc &Item)
{
// Determine which queue to put the item in
string Name = QueueName(URI);
string Name = QueueName(Item.URI);
if (Name.empty() == true)
return;
@ -144,18 +146,18 @@ void pkgAcquire::Enqueue(Item *Itm,string URI,string Description)
I->Startup();
}
Itm->Status = Item::StatIdle;
Item.Owner->Status = Item::StatIdle;
// Queue it into the named queue
I->Enqueue(Itm,URI,Description);
I->Enqueue(Item);
ToFetch++;
// Some trace stuff
if (Debug == true)
{
clog << "Fetching " << URI << endl;
clog << " to " << Itm->DestFile << endl;
clog << " Queue is: " << QueueName(URI) << endl;
clog << "Fetching " << Item.URI << endl;
clog << " to " << Item.Owner->DestFile << endl;
clog << " Queue is: " << QueueName(Item.URI) << endl;
}
}
/*}}}*/
@ -275,6 +277,9 @@ bool pkgAcquire::Run()
I->Startup();
// Run till all things have been acquired
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 500000;
while (ToFetch > 0)
{
fd_set RFds;
@ -284,15 +289,26 @@ bool pkgAcquire::Run()
FD_ZERO(&WFds);
SetFds(Highest,&RFds,&WFds);
if (select(Highest+1,&RFds,&WFds,0,0) <= 0)
int Res = select(Highest+1,&RFds,&WFds,0,&tv);
if (Res < 0)
{
Running = false;
return _error->Errno("select","Select has failed");
_error->Errno("select","Select has failed");
break;
}
RunFds(&RFds,&WFds);
if (_error->PendingError() == true)
break;
// Timeout, notify the log class
if (Res == 0 || (Log != 0 && Log->Update == true))
{
tv.tv_usec = 500000;
for (Worker *I = Workers; I != 0; I = I->NextAcquire)
I->Pulse();
if (Log != 0)
Log->Pulse(this);
}
}
// Shut down the acquire bits
@ -313,6 +329,14 @@ void pkgAcquire::Bump()
I->Bump();
}
/*}}}*/
// Acquire::WorkerStep - Step to the next worker /*{{{*/
// ---------------------------------------------------------------------
/* Not inlined to advoid including acquire-worker.h */
pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
{
return I->NextAcquire;
};
/*}}}*/
// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
// ---------------------------------------------------------------------
@ -356,19 +380,15 @@ pkgAcquire::Queue::~Queue()
// Queue::Enqueue - Queue an item to the queue /*{{{*/
// ---------------------------------------------------------------------
/* */
void pkgAcquire::Queue::Enqueue(Item *Owner,string URI,string Description)
void pkgAcquire::Queue::Enqueue(ItemDesc &Item)
{
// Create a new item
QItem *I = new QItem;
QItem *I = new QItem;
I->Next = Items;
Items = I;
*I = Item;
// Fill it in
Items->Owner = Owner;
Items->URI = URI;
Items->Description = Description;
Owner->QueueCounter++;
Item.Owner->QueueCounter++;
if (Items->Next == 0)
Cycle();
}
@ -410,7 +430,7 @@ bool pkgAcquire::Queue::Startup()
if (Cnf == 0)
return false;
Workers = new Worker(this,Cnf);
Workers = new Worker(this,Cnf,Owner->Log);
Owner->Add(Workers);
if (Workers->Start() == false)
return false;

64
apt-pkg/acquire.h

@ -1,6 +1,6 @@
// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
// $Id: acquire.h,v 1.8 1998/11/05 07:21:41 jgg Exp $
// $Id: acquire.h,v 1.9 1998/11/09 01:09:26 jgg Exp $
/* ######################################################################
Acquire - File Acquiration
@ -41,6 +41,7 @@
#include <unistd.h>
class pkgAcquireStatus;
class pkgAcquire
{
public:
@ -49,6 +50,7 @@ class pkgAcquire
class Queue;
class Worker;
struct MethodConfig;
struct ItemDesc;
friend Item;
friend Queue;
@ -61,8 +63,9 @@ class pkgAcquire
Queue *Queues;
Worker *Workers;
MethodConfig *Configs;
pkgAcquireStatus *Log;
unsigned long ToFetch;
// Configurable parameters for the schedular
enum {QueueHost,QueueAccess} QueueMode;
bool Debug;
@ -73,7 +76,7 @@ class pkgAcquire
void Add(Worker *Work);
void Remove(Worker *Work);
void Enqueue(Item *Item,string URI,string Description);
void Enqueue(ItemDesc &Item);
void Dequeue(Item *Item);
string QueueName(string URI);
@ -88,11 +91,26 @@ class pkgAcquire
MethodConfig *GetConfig(string Access);
bool Run();
// Simple iteration mechanism
inline Worker *WorkersBegin() {return Workers;};
Worker *WorkerStep(Worker *I);
inline Item **ItemsBegin() {return Items.begin();};
inline Item **ItemsEnd() {return Items.end();};
pkgAcquire();
pkgAcquire(pkgAcquireStatus *Log = 0);
~pkgAcquire();
};
// Description of an Item+URI
struct pkgAcquire::ItemDesc
{
string URI;
string Description;
string ShortDesc;
Item *Owner;
};
// List of possible items queued for download.
class pkgAcquire::Queue
{
@ -102,15 +120,19 @@ class pkgAcquire::Queue
protected:
// Queued item
struct QItem
struct QItem : pkgAcquire::ItemDesc
{
QItem *Next;
string URI;
string Description;
Item *Owner;
QItem *Next;
pkgAcquire::Worker *Worker;
};
void operator =(pkgAcquire::ItemDesc const &I)
{
URI = I.URI;
Description = I.Description;
ShortDesc = I.ShortDesc;
Owner = I.Owner;
};
};
// Name of the queue
string Name;
@ -123,11 +145,12 @@ class pkgAcquire::Queue
public:
// Put an item into this queue
void Enqueue(Item *Owner,string URI,string Description);
void Enqueue(ItemDesc &Item);
bool Dequeue(Item *Owner);
// Find a Queued item
QItem *FindItem(string URI,pkgAcquire::Worker *Owner);
bool ItemStart(QItem *Itm,unsigned long Size);
bool ItemDone(QItem *Itm);
bool Startup();
@ -155,4 +178,21 @@ struct pkgAcquire::MethodConfig
MethodConfig();
};
class pkgAcquireStatus
{
public:
bool Update;
// Each of these is called by the workers when an event occures
virtual void IMSHit(pkgAcquire::ItemDesc &Itm) {};
virtual void Fetch(pkgAcquire::ItemDesc &Itm) {};
virtual void Done(pkgAcquire::ItemDesc &Itm) {};
virtual void Fail(pkgAcquire::ItemDesc &Itm) {};
virtual void Pulse(pkgAcquire *Owner) {};
pkgAcquireStatus() : Update(false) {};
virtual ~pkgAcquireStatus() {};
};
#endif

Loading…
Cancel
Save