Browse Source

stop handling items in doomed transactions

With the previous commit we track the state of transactions, so we can
now use our knowledge to avoid processing data for a transaction which
was already closed (via an abort in this case).

This is needed as multiple independent processes are interacting in the
process, so there isn't a simple immediate full-engine stop and it would
also be bad to teach each and every item how to check if its manager
has failed subordinate and what to do in that case.

In the pdiff case, which deals (potentially) with many items during its
lifetime e.g. a hashsum mismatch in another file can abort the
transaction the file we try to patch via pdiff belongs to. This causes
some of the items (which are already done) to be aborted with it, but
items still in the process of acquisition continue in the processing and
will later try to use all the items together failing in strange ways as
cleanup already happened.

The chosen solution is to dry up the communication channels instead by
ignoring new requests for data acquisition, canceling requests which are
not assigned to a queue and not calling Done/Failed on items anymore.
This means that e.g. already started or pending (e.g. pipelined)
downloads aren't stopped and continue as normal for now, but they remain
in partial/ and aren't processed further so the next update command will
pick them up and put them to good use while the current process fails
updating (for this transaction group) in an orderly fashion.

Closes: 817240
Thanks: Barr Detwix & Vincent Lefevre for log files
tags/debian/1.2.11
David Kalnischkies 5 years ago
parent
commit
38f8704e41
3 changed files with 102 additions and 53 deletions
  1. +8
    -0
      apt-pkg/acquire-item.cc
  2. +63
    -52
      apt-pkg/acquire-worker.cc
  3. +31
    -1
      test/integration/test-pdiff-usage

+ 8
- 0
apt-pkg/acquire-item.cc View File

@@ -281,6 +281,12 @@ bool pkgAcquire::Item::QueueURI(pkgAcquire::ItemDesc &Item)
for a hashsum mismatch to happen which helps nobody) */
bool pkgAcqTransactionItem::QueueURI(pkgAcquire::ItemDesc &Item)
{
if (TransactionManager->State != TransactionStarted)
{
if (_config->FindB("Debug::Acquire::Transaction", false))
std::clog << "Skip " << Target.URI << " as transaction was already dealt with!" << std::endl;
return false;
}
std::string const FinalFile = GetFinalFilename();
if (TransactionManager != NULL && TransactionManager->IMSHit == true &&
FileExists(FinalFile) == true)
@@ -866,6 +872,8 @@ void pkgAcqMetaBase::AbortTransaction()
for (std::vector<pkgAcqTransactionItem*>::iterator I = Transaction.begin();
I != Transaction.end(); ++I)
{
if ((*I)->Status != pkgAcquire::Item::StatFetching)
Owner->Dequeue(*I);
(*I)->TransactionState(TransactionAbort);
}
Transaction.clear();


+ 63
- 52
apt-pkg/acquire-worker.cc View File

@@ -172,6 +172,25 @@ bool pkgAcquire::Worker::ReadMessages()
// ---------------------------------------------------------------------
/* This takes the messages from the message queue and runs them through
the parsers in order. */
enum class APT_HIDDEN MessageType {
CAPABILITIES = 100,
LOG = 101,
STATUS = 102,
REDIRECT = 103,
WARNING = 104,
URI_START = 200,
URI_DONE = 201,
URI_FAILURE = 400,
GENERAL_FAILURE = 401,
MEDIA_CHANGE = 403
};
static bool isDoomedItem(pkgAcquire::Item const * const Itm)
{
auto const TransItm = dynamic_cast<pkgAcqTransactionItem const * const>(Itm);
if (TransItm == nullptr)
return false;
return TransItm->TransactionManager->State != pkgAcqTransactionItem::TransactionStarted;
}
bool pkgAcquire::Worker::RunMessages()
{
while (MessageQueue.empty() == false)
@@ -184,7 +203,7 @@ bool pkgAcquire::Worker::RunMessages()

// Fetch the message number
char *End;
int Number = strtol(Message.c_str(),&End,10);
MessageType const Number = static_cast<MessageType>(strtoul(Message.c_str(),&End,10));
if (End == Message.c_str())
return _error->Error("Invalid message from method %s: %s",Access.c_str(),Message.c_str());

@@ -210,27 +229,23 @@ bool pkgAcquire::Worker::RunMessages()
// Determine the message number and dispatch
switch (Number)
{
// 100 Capabilities
case 100:
case MessageType::CAPABILITIES:
if (Capabilities(Message) == false)
return _error->Error("Unable to process Capabilities message from %s",Access.c_str());
break;

// 101 Log
case 101:
case MessageType::LOG:
if (Debug == true)
clog << " <- (log) " << LookupTag(Message,"Message") << endl;
break;

// 102 Status
case 102:
case MessageType::STATUS:
Status = LookupTag(Message,"Message");
break;

// 103 Redirect
case 103:
case MessageType::REDIRECT:
{
if (Itm == 0)
if (Itm == nullptr)
{
_error->Error("Method gave invalid 103 Redirect message");
break;
@@ -248,11 +263,13 @@ bool pkgAcquire::Worker::RunMessages()
// and then put it in the main queue again
std::vector<Item*> const ItmOwners = Itm->Owners;
OwnerQ->ItemDone(Itm);
Itm = NULL;
for (pkgAcquire::Queue::QItem::owner_iterator O = ItmOwners.begin(); O != ItmOwners.end(); ++O)
Itm = nullptr;
for (auto const &Owner: ItmOwners)
{
pkgAcquire::Item *Owner = *O;
pkgAcquire::ItemDesc &desc = Owner->GetItemDesc();
if (Log != nullptr)
Log->Done(desc);

// if we change site, treat it as a mirror change
if (URI::SiteOnly(NewURI) != URI::SiteOnly(desc.URI))
{
@@ -270,22 +287,19 @@ bool pkgAcquire::Worker::RunMessages()
}
}
desc.URI = NewURI;
OwnerQ->Owner->Enqueue(desc);

if (Log != 0)
Log->Done(desc);
if (isDoomedItem(Owner) == false)
OwnerQ->Owner->Enqueue(desc);
}
break;
}
// 104 Warning
case 104:
case MessageType::WARNING:
_error->Warning("%s: %s", Itm->Owner->DescURI().c_str(), LookupTag(Message,"Message").c_str());
break;

// 200 URI Start
case 200:
case MessageType::URI_START:
{
if (Itm == 0)
if (Itm == nullptr)
{
_error->Error("Method gave invalid 200 URI Start message");
break;
@@ -295,26 +309,24 @@ bool pkgAcquire::Worker::RunMessages()
CurrentSize = 0;
TotalSize = strtoull(LookupTag(Message,"Size","0").c_str(), NULL, 10);
ResumePoint = strtoull(LookupTag(Message,"Resume-Point","0").c_str(), NULL, 10);
for (pkgAcquire::Queue::QItem::owner_iterator O = Itm->Owners.begin(); O != Itm->Owners.end(); ++O)
for (auto const Owner: Itm->Owners)
{
(*O)->Start(Message, TotalSize);

Owner->Start(Message, TotalSize);
// Display update before completion
if (Log != 0)
if (Log != nullptr)
{
if (Log->MorePulses == true)
Log->Pulse((*O)->GetOwner());
Log->Fetch((*O)->GetItemDesc());
Log->Pulse(Owner->GetOwner());
Log->Fetch(Owner->GetItemDesc());
}
}

break;
}

// 201 URI Done
case 201:
case MessageType::URI_DONE:
{
if (Itm == 0)
if (Itm == nullptr)
{
_error->Error("Method gave invalid 201 URI Done message");
break;
@@ -363,9 +375,8 @@ bool pkgAcquire::Worker::RunMessages()

bool const isIMSHit = StringToBool(LookupTag(Message,"IMS-Hit"),false) ||
StringToBool(LookupTag(Message,"Alt-IMS-Hit"),false);
for (pkgAcquire::Queue::QItem::owner_iterator O = ItmOwners.begin(); O != ItmOwners.end(); ++O)
for (auto const Owner: ItmOwners)
{
pkgAcquire::Item * const Owner = *O;
HashStringList const ExpectedHashes = Owner->GetExpectedHashes();
if(_config->FindB("Debug::pkgAcquire::Auth", false) == true)
{
@@ -412,10 +423,12 @@ bool pkgAcquire::Worker::RunMessages()
else // hashsum mismatch
Owner->Status = pkgAcquire::Item::StatAuthError;


if (consideredOkay == true)
{
Owner->Done(Message, ReceivedHashes, Config);
if (Log != 0)
if (isDoomedItem(Owner) == false)
Owner->Done(Message, ReceivedHashes, Config);
if (Log != nullptr)
{
if (isIMSHit)
Log->IMSHit(Owner->GetItemDesc());
@@ -425,8 +438,9 @@ bool pkgAcquire::Worker::RunMessages()
}
else
{
Owner->Failed(Message,Config);
if (Log != 0)
if (isDoomedItem(Owner) == false)
Owner->Failed(Message,Config);
if (Log != nullptr)
Log->Fail(Owner->GetItemDesc());
}
}
@@ -434,10 +448,9 @@ bool pkgAcquire::Worker::RunMessages()
break;
}

// 400 URI Failure
case 400:
case MessageType::URI_FAILURE:
{
if (Itm == 0)
if (Itm == nullptr)
{
std::string const msg = LookupTag(Message,"Message");
_error->Error("Method gave invalid 400 URI Failure message: %s", msg.c_str());
@@ -447,13 +460,13 @@ bool pkgAcquire::Worker::RunMessages()
PrepareFiles("400::URIFailure", Itm);

// Display update before completion
if (Log != 0 && Log->MorePulses == true)
if (Log != nullptr && Log->MorePulses == true)
for (pkgAcquire::Queue::QItem::owner_iterator O = Itm->Owners.begin(); O != Itm->Owners.end(); ++O)
Log->Pulse((*O)->GetOwner());

std::vector<Item*> const ItmOwners = Itm->Owners;
OwnerQ->ItemDone(Itm);
Itm = NULL;
Itm = nullptr;

bool errTransient;
{
@@ -463,27 +476,25 @@ bool pkgAcquire::Worker::RunMessages()
errTransient = std::find(std::begin(reasons), std::end(reasons), failReason) != std::end(reasons);
}

for (pkgAcquire::Queue::QItem::owner_iterator O = ItmOwners.begin(); O != ItmOwners.end(); ++O)
for (auto const Owner: ItmOwners)
{
if (errTransient)
(*O)->Status = pkgAcquire::Item::StatTransientNetworkError;
(*O)->Failed(Message,Config);
if (Log != 0)
Log->Fail((*O)->GetItemDesc());
Owner->Status = pkgAcquire::Item::StatTransientNetworkError;
if (isDoomedItem(Owner) == false)
Owner->Failed(Message,Config);
if (Log != nullptr)
Log->Fail(Owner->GetItemDesc());
}
ItemDone();

break;
}

// 401 General Failure
case 401:
case MessageType::GENERAL_FAILURE:
_error->Error("Method %s General failure: %s",Access.c_str(),LookupTag(Message,"Message").c_str());
break;

// 403 Media Change
case 403:
case MessageType::MEDIA_CHANGE:
MediaChange(Message);
break;
}


+ 31
- 1
test/integration/test-pdiff-usage View File

@@ -10,9 +10,24 @@ LOWCOSTEXT='lz4'

buildaptarchive
setupflataptarchive
changetowebserver -o aptwebserver::support::modified-since=false
changetowebserver

cat >rootdir/etc/apt/apt.conf.d/contents.conf <<EOF
Acquire::IndexTargets::deb::Contents {
MetaKey "\$(COMPONENT)/Contents-\$(ARCHITECTURE)";
ShortDescription "Contents";
Description "\$(RELEASE)/\$(COMPONENT) \$(ARCHITECTURE) Contents";
MetaKey "\$(COMPONENT)/Contents-\$(ARCHITECTURE)";
flatMetaKey "Contents-\$(ARCHITECTURE)";
flatDescription "\$(RELEASE) \$(ARCHITECTURE) Contents";
};
EOF

PKGFILE="${TESTDIR}/$(echo "$(basename $0)" | sed 's#^test-#Packages-#')"
echo 'contents for stuff' > aptarchive/Contents-i386
compressfile aptarchive/Contents-i386
echo 'hacked' > aptarchive/hacked-i386
compressfile aptarchive/hacked-i386

wasmergeused() {
testsuccess apt update "$@"
@@ -192,6 +207,21 @@ SHA256-Download:
testsuccessequal "$(cat Packages-future)
" aptcache show apt newstuff futurestuff

# we reuse the archive state of the previous test here
msgmsg "Testcase: pdiff handling is stopped if transaction fails $*"
rm -rf rootdir/var/lib/apt/lists
cp -a rootdir/var/lib/apt/lists-bak rootdir/var/lib/apt/lists
cp Packages-future aptarchive/Packages
rm -f rootdir/var/lib/apt/lists/*_Contents-*
webserverconfig 'aptwebserver::overwrite::.*Contents-.*::filename' '/hacked-i386.gz'
testfailure apt update "$@" -o Debug::pkgAcquire::Worker=1 -o Debug::Acquire::http=1
webserverconfig 'aptwebserver::overwrite::.*Contents-.*::filename' '/Contents-i386.gz'
cp rootdir/tmp/testfailure.output patchdownload.output
testfailure grep 'rred:600' patchdownload.output
testnopackage newstuff futurestuff
testsuccessequal "$(cat "${PKGFILE}")
" aptcache show apt oldstuff

# we reuse the archive state of the previous test here
msgmsg "Testcase: downloading a patch fails, but successful fallback: $*"
rm -rf rootdir/var/lib/apt/lists


Loading…
Cancel
Save