You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

568 lines
15 KiB

  1. // -*- mode: cpp; mode: fold -*-
  2. // Description /*{{{*/
  3. // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
  4. /* ######################################################################
  5. Acquire Worker
  6. The worker process can start up either as a configuration prober
  7. or as a queue runner. As a configuration prober it only reads the
  8. configuration message and does not send any configuration back.
  9. ##################################################################### */
  10. /*}}}*/
  11. // Include Files /*{{{*/
  12. #ifdef __GNUG__
  13. #pragma implementation "apt-pkg/acquire-worker.h"
  14. #endif
  15. #include <apt-pkg/acquire-worker.h>
  16. #include <apt-pkg/acquire-item.h>
  17. #include <apt-pkg/configuration.h>
  18. #include <apt-pkg/error.h>
  19. #include <apt-pkg/fileutl.h>
  20. #include <apt-pkg/strutl.h>
  21. #include <apti18n.h>
  22. #include <iostream>
  23. #include <sstream>
  24. #include <fstream>
  25. #include <sys/stat.h>
  26. #include <unistd.h>
  27. #include <fcntl.h>
  28. #include <signal.h>
  29. #include <stdio.h>
  30. #include <errno.h>
  31. /*}}}*/
  32. using namespace std;
  33. // Worker::Worker - Constructor for Queue startup /*{{{*/
  34. // ---------------------------------------------------------------------
  35. /* */
  36. pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf,
  37. pkgAcquireStatus *Log) : Log(Log)
  38. {
  39. OwnerQ = Q;
  40. Config = Cnf;
  41. Access = Cnf->Access;
  42. CurrentItem = 0;
  43. TotalSize = 0;
  44. CurrentSize = 0;
  45. Construct();
  46. }
  47. /*}}}*/
  48. // Worker::Worker - Constructor for method config startup /*{{{*/
  49. // ---------------------------------------------------------------------
  50. /* */
  51. pkgAcquire::Worker::Worker(MethodConfig *Cnf)
  52. {
  53. OwnerQ = 0;
  54. Config = Cnf;
  55. Access = Cnf->Access;
  56. CurrentItem = 0;
  57. TotalSize = 0;
  58. CurrentSize = 0;
  59. Construct();
  60. }
  61. /*}}}*/
  62. // Worker::Construct - Constructor helper /*{{{*/
  63. // ---------------------------------------------------------------------
  64. /* */
  65. void pkgAcquire::Worker::Construct()
  66. {
  67. NextQueue = 0;
  68. NextAcquire = 0;
  69. Process = -1;
  70. InFd = -1;
  71. OutFd = -1;
  72. OutReady = false;
  73. InReady = false;
  74. Debug = _config->FindB("Debug::pkgAcquire::Worker",false);
  75. }
  76. /*}}}*/
  77. // Worker::~Worker - Destructor /*{{{*/
  78. // ---------------------------------------------------------------------
  79. /* */
  80. pkgAcquire::Worker::~Worker()
  81. {
  82. close(InFd);
  83. close(OutFd);
  84. if (Process > 0)
  85. {
  86. /* Closing of stdin is the signal to exit and die when the process
  87. indicates it needs cleanup */
  88. if (Config->NeedsCleanup == false)
  89. kill(Process,SIGINT);
  90. ExecWait(Process,Access.c_str(),true);
  91. }
  92. }
  93. /*}}}*/
  94. // Worker::Start - Start the worker process /*{{{*/
  95. // ---------------------------------------------------------------------
  96. /* This forks the method and inits the communication channel */
  97. bool pkgAcquire::Worker::Start()
  98. {
  99. // Get the method path
  100. string Method = _config->FindDir("Dir::Bin::Methods") + Access;
  101. if (FileExists(Method) == false)
  102. return _error->Error(_("The method driver %s could not be found."),Method.c_str());
  103. if (Debug == true)
  104. clog << "Starting method '" << Method << '\'' << endl;
  105. // Create the pipes
  106. int Pipes[4] = {-1,-1,-1,-1};
  107. if (pipe(Pipes) != 0 || pipe(Pipes+2) != 0)
  108. {
  109. _error->Errno("pipe","Failed to create IPC pipe to subprocess");
  110. for (int I = 0; I != 4; I++)
  111. close(Pipes[I]);
  112. return false;
  113. }
  114. for (int I = 0; I != 4; I++)
  115. SetCloseExec(Pipes[I],true);
  116. // Fork off the process
  117. Process = ExecFork();
  118. if (Process == 0)
  119. {
  120. // Setup the FDs
  121. dup2(Pipes[1],STDOUT_FILENO);
  122. dup2(Pipes[2],STDIN_FILENO);
  123. SetCloseExec(STDOUT_FILENO,false);
  124. SetCloseExec(STDIN_FILENO,false);
  125. SetCloseExec(STDERR_FILENO,false);
  126. const char *Args[2];
  127. Args[0] = Method.c_str();
  128. Args[1] = 0;
  129. execv(Args[0],(char **)Args);
  130. cerr << "Failed to exec method " << Args[0] << endl;
  131. _exit(100);
  132. }
  133. // Fix up our FDs
  134. InFd = Pipes[0];
  135. OutFd = Pipes[3];
  136. SetNonBlock(Pipes[0],true);
  137. SetNonBlock(Pipes[3],true);
  138. close(Pipes[1]);
  139. close(Pipes[2]);
  140. OutReady = false;
  141. InReady = true;
  142. // Read the configuration data
  143. if (WaitFd(InFd) == false ||
  144. ReadMessages() == false)
  145. return _error->Error(_("Method %s did not start correctly"),Method.c_str());
  146. RunMessages();
  147. if (OwnerQ != 0)
  148. SendConfiguration();
  149. return true;
  150. }
  151. /*}}}*/
  152. // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
  153. // ---------------------------------------------------------------------
  154. /* */
  155. bool pkgAcquire::Worker::ReadMessages()
  156. {
  157. if (::ReadMessages(InFd,MessageQueue) == false)
  158. return MethodFailure();
  159. return true;
  160. }
  161. /*}}}*/
  162. // Worker::RunMessage - Empty the message queue /*{{{*/
  163. // ---------------------------------------------------------------------
  164. /* This takes the messages from the message queue and runs them through
  165. the parsers in order. */
  166. bool pkgAcquire::Worker::RunMessages()
  167. {
  168. while (MessageQueue.empty() == false)
  169. {
  170. string Message = MessageQueue.front();
  171. MessageQueue.erase(MessageQueue.begin());
  172. if (Debug == true)
  173. clog << " <- " << Access << ':' << QuoteString(Message,"\n") << endl;
  174. // Fetch the message number
  175. char *End;
  176. int Number = strtol(Message.c_str(),&End,10);
  177. if (End == Message.c_str())
  178. return _error->Error("Invalid message from method %s: %s",Access.c_str(),Message.c_str());
  179. string URI = LookupTag(Message,"URI");
  180. pkgAcquire::Queue::QItem *Itm = 0;
  181. if (URI.empty() == false)
  182. Itm = OwnerQ->FindItem(URI,this);
  183. // Determine the message number and dispatch
  184. switch (Number)
  185. {
  186. // 100 Capabilities
  187. case 100:
  188. if (Capabilities(Message) == false)
  189. return _error->Error("Unable to process Capabilities message from %s",Access.c_str());
  190. break;
  191. // 101 Log
  192. case 101:
  193. if (Debug == true)
  194. clog << " <- (log) " << LookupTag(Message,"Message") << endl;
  195. break;
  196. // 102 Status
  197. case 102:
  198. Status = LookupTag(Message,"Message");
  199. break;
  200. // 200 URI Start
  201. case 200:
  202. {
  203. if (Itm == 0)
  204. {
  205. _error->Error("Method gave invalid 200 URI Start message");
  206. break;
  207. }
  208. CurrentItem = Itm;
  209. CurrentSize = 0;
  210. TotalSize = atoi(LookupTag(Message,"Size","0").c_str());
  211. ResumePoint = atoi(LookupTag(Message,"Resume-Point","0").c_str());
  212. Itm->Owner->Start(Message,atoi(LookupTag(Message,"Size","0").c_str()));
  213. // Display update before completion
  214. if (Log != 0 && Log->MorePulses == true)
  215. Log->Pulse(Itm->Owner->GetOwner());
  216. if (Log != 0)
  217. Log->Fetch(*Itm);
  218. break;
  219. }
  220. // 201 URI Done
  221. case 201:
  222. {
  223. if (Itm == 0)
  224. {
  225. _error->Error("Method gave invalid 201 URI Done message");
  226. break;
  227. }
  228. pkgAcquire::Item *Owner = Itm->Owner;
  229. pkgAcquire::ItemDesc Desc = *Itm;
  230. // Display update before completion
  231. if (Log != 0 && Log->MorePulses == true)
  232. Log->Pulse(Owner->GetOwner());
  233. OwnerQ->ItemDone(Itm);
  234. if (TotalSize != 0 &&
  235. (unsigned)atoi(LookupTag(Message,"Size","0").c_str()) != TotalSize)
  236. _error->Warning("Bizarre Error - File size is not what the server reported %s %lu",
  237. LookupTag(Message,"Size","0").c_str(),TotalSize);
  238. Owner->Done(Message,atoi(LookupTag(Message,"Size","0").c_str()),
  239. LookupTag(Message,"MD5-Hash"),Config);
  240. ItemDone();
  241. // Log that we are done
  242. if (Log != 0)
  243. {
  244. if (StringToBool(LookupTag(Message,"IMS-Hit"),false) == true ||
  245. StringToBool(LookupTag(Message,"Alt-IMS-Hit"),false) == true)
  246. {
  247. /* Hide 'hits' for local only sources - we also manage to
  248. hide gets */
  249. if (Config->LocalOnly == false)
  250. Log->IMSHit(Desc);
  251. }
  252. else
  253. Log->Done(Desc);
  254. }
  255. break;
  256. }
  257. // 400 URI Failure
  258. case 400:
  259. {
  260. if (Itm == 0)
  261. {
  262. _error->Error("Method gave invalid 400 URI Failure message");
  263. break;
  264. }
  265. // Display update before completion
  266. if (Log != 0 && Log->MorePulses == true)
  267. Log->Pulse(Itm->Owner->GetOwner());
  268. pkgAcquire::Item *Owner = Itm->Owner;
  269. pkgAcquire::ItemDesc Desc = *Itm;
  270. OwnerQ->ItemDone(Itm);
  271. Owner->Failed(Message,Config);
  272. ItemDone();
  273. if (Log != 0)
  274. Log->Fail(Desc);
  275. break;
  276. }
  277. // 401 General Failure
  278. case 401:
  279. _error->Error("Method %s General failure: %s",Access.c_str(),LookupTag(Message,"Message").c_str());
  280. break;
  281. // 403 Media Change
  282. case 403:
  283. MediaChange(Message);
  284. break;
  285. }
  286. }
  287. return true;
  288. }
  289. /*}}}*/
  290. // Worker::Capabilities - 100 Capabilities handler /*{{{*/
  291. // ---------------------------------------------------------------------
  292. /* This parses the capabilities message and dumps it into the configuration
  293. structure. */
  294. bool pkgAcquire::Worker::Capabilities(string Message)
  295. {
  296. if (Config == 0)
  297. return true;
  298. Config->Version = LookupTag(Message,"Version");
  299. Config->SingleInstance = StringToBool(LookupTag(Message,"Single-Instance"),false);
  300. Config->Pipeline = StringToBool(LookupTag(Message,"Pipeline"),false);
  301. Config->SendConfig = StringToBool(LookupTag(Message,"Send-Config"),false);
  302. Config->LocalOnly = StringToBool(LookupTag(Message,"Local-Only"),false);
  303. Config->NeedsCleanup = StringToBool(LookupTag(Message,"Needs-Cleanup"),false);
  304. Config->Removable = StringToBool(LookupTag(Message,"Removable"),false);
  305. // Some debug text
  306. if (Debug == true)
  307. {
  308. clog << "Configured access method " << Config->Access << endl;
  309. clog << "Version:" << Config->Version <<
  310. " SingleInstance:" << Config->SingleInstance <<
  311. " Pipeline:" << Config->Pipeline <<
  312. " SendConfig:" << Config->SendConfig <<
  313. " LocalOnly: " << Config->LocalOnly <<
  314. " NeedsCleanup: " << Config->NeedsCleanup <<
  315. " Removable: " << Config->Removable << endl;
  316. }
  317. return true;
  318. }
  319. /*}}}*/
  320. // Worker::MediaChange - Request a media change /*{{{*/
  321. // ---------------------------------------------------------------------
  322. /* */
  323. bool pkgAcquire::Worker::MediaChange(string Message)
  324. {
  325. int status_fd = _config->FindI("APT::Status-Fd",-1);
  326. if(status_fd > 0)
  327. {
  328. string Media = LookupTag(Message,"Media");
  329. string Drive = LookupTag(Message,"Drive");
  330. ostringstream msg,status;
  331. ioprintf(msg,_("Please insert the disc labeled: "
  332. "'%s' "
  333. "in the drive '%s' and press enter."),
  334. Media.c_str(),Drive.c_str());
  335. status << "media-change: " // message
  336. << Media << ":" // media
  337. << Drive << ":" // drive
  338. << msg.str() // l10n message
  339. << endl;
  340. write(status_fd, status.str().c_str(), status.str().size());
  341. }
  342. if (Log == 0 || Log->MediaChange(LookupTag(Message,"Media"),
  343. LookupTag(Message,"Drive")) == false)
  344. {
  345. char S[300];
  346. snprintf(S,sizeof(S),"603 Media Changed\nFailed: true\n\n");
  347. if (Debug == true)
  348. clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
  349. OutQueue += S;
  350. OutReady = true;
  351. return true;
  352. }
  353. char S[300];
  354. snprintf(S,sizeof(S),"603 Media Changed\n\n");
  355. if (Debug == true)
  356. clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
  357. OutQueue += S;
  358. OutReady = true;
  359. return true;
  360. }
  361. /*}}}*/
  362. // Worker::SendConfiguration - Send the config to the method /*{{{*/
  363. // ---------------------------------------------------------------------
  364. /* */
  365. bool pkgAcquire::Worker::SendConfiguration()
  366. {
  367. if (Config->SendConfig == false)
  368. return true;
  369. if (OutFd == -1)
  370. return false;
  371. string Message = "601 Configuration\n";
  372. Message.reserve(2000);
  373. /* Write out all of the configuration directives by walking the
  374. configuration tree */
  375. const Configuration::Item *Top = _config->Tree(0);
  376. for (; Top != 0;)
  377. {
  378. if (Top->Value.empty() == false)
  379. {
  380. string Line = "Config-Item: " + QuoteString(Top->FullTag(),"=\"\n") + "=";
  381. Line += QuoteString(Top->Value,"\n") + '\n';
  382. Message += Line;
  383. }
  384. if (Top->Child != 0)
  385. {
  386. Top = Top->Child;
  387. continue;
  388. }
  389. while (Top != 0 && Top->Next == 0)
  390. Top = Top->Parent;
  391. if (Top != 0)
  392. Top = Top->Next;
  393. }
  394. Message += '\n';
  395. if (Debug == true)
  396. clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
  397. OutQueue += Message;
  398. OutReady = true;
  399. return true;
  400. }
  401. /*}}}*/
  402. // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
  403. // ---------------------------------------------------------------------
  404. /* Send a URI Acquire message to the method */
  405. bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem *Item)
  406. {
  407. if (OutFd == -1)
  408. return false;
  409. string Message = "600 URI Acquire\n";
  410. Message.reserve(300);
  411. Message += "URI: " + Item->URI;
  412. Message += "\nFilename: " + Item->Owner->DestFile;
  413. Message += Item->Owner->Custom600Headers();
  414. Message += "\n\n";
  415. if (Debug == true)
  416. clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
  417. OutQueue += Message;
  418. OutReady = true;
  419. return true;
  420. }
  421. /*}}}*/
  422. // Worker::OutFdRead - Out bound FD is ready /*{{{*/
  423. // ---------------------------------------------------------------------
  424. /* */
  425. bool pkgAcquire::Worker::OutFdReady()
  426. {
  427. int Res;
  428. do
  429. {
  430. Res = write(OutFd,OutQueue.c_str(),OutQueue.length());
  431. }
  432. while (Res < 0 && errno == EINTR);
  433. if (Res <= 0)
  434. return MethodFailure();
  435. // Hmm.. this should never happen.
  436. if (Res < 0)
  437. return true;
  438. OutQueue.erase(0,Res);
  439. if (OutQueue.empty() == true)
  440. OutReady = false;
  441. return true;
  442. }
  443. /*}}}*/
  444. // Worker::InFdRead - In bound FD is ready /*{{{*/
  445. // ---------------------------------------------------------------------
  446. /* */
  447. bool pkgAcquire::Worker::InFdReady()
  448. {
  449. if (ReadMessages() == false)
  450. return false;
  451. RunMessages();
  452. return true;
  453. }
  454. /*}}}*/
  455. // Worker::MethodFailure - Called when the method fails /*{{{*/
  456. // ---------------------------------------------------------------------
  457. /* This is called when the method is belived to have failed, probably because
  458. read returned -1. */
  459. bool pkgAcquire::Worker::MethodFailure()
  460. {
  461. _error->Error("Method %s has died unexpectedly!",Access.c_str());
  462. ExecWait(Process,Access.c_str(),true);
  463. Process = -1;
  464. close(InFd);
  465. close(OutFd);
  466. InFd = -1;
  467. OutFd = -1;
  468. OutReady = false;
  469. InReady = false;
  470. OutQueue = string();
  471. MessageQueue.erase(MessageQueue.begin(),MessageQueue.end());
  472. return false;
  473. }
  474. /*}}}*/
  475. // Worker::Pulse - Called periodically /*{{{*/
  476. // ---------------------------------------------------------------------
  477. /* */
  478. void pkgAcquire::Worker::Pulse()
  479. {
  480. if (CurrentItem == 0)
  481. return;
  482. struct stat Buf;
  483. if (stat(CurrentItem->Owner->DestFile.c_str(),&Buf) != 0)
  484. return;
  485. CurrentSize = Buf.st_size;
  486. // Hmm? Should not happen...
  487. if (CurrentSize > TotalSize && TotalSize != 0)
  488. TotalSize = CurrentSize;
  489. }
  490. /*}}}*/
  491. // Worker::ItemDone - Called when the current item is finished /*{{{*/
  492. // ---------------------------------------------------------------------
  493. /* */
  494. void pkgAcquire::Worker::ItemDone()
  495. {
  496. CurrentItem = 0;
  497. CurrentSize = 0;
  498. TotalSize = 0;
  499. Status = string();
  500. }
  501. /*}}}*/