false, 'PRI', null, null, true),
new ColumnDef('data', 'blob', null, false),
new ColumnDef('prioritise', 'tinyint', 1, false),
+ new ColumnDef('attempts', 'integer', null, false),
new ColumnDef('created', 'datetime', null, false),
new ColumnDef('claimed', 'datetime')));
public $id; // int primary_key not_null auto_increment\r
public $data; // blob not_null\r
public $prioritise; // tinyint(1) not_null\r
+ public $attempts; // int not_null\r
public $created; // datetime() not_null\r
public $claimed; // datetime()\r
\r
return null;\r
}\r
\r
+ /**\r
+ * Increment the attempts count\r
+ *\r
+ * @return void\r
+ * @throws Exception\r
+ */\r
+ public function incAttempts() {\r
+ $orig = clone($this);\r
+ $this->attempts++;\r
+ $result = $this->update($orig);\r
+\r
+ if (!$result) {\r
+ throw Exception(sprintf(_m("Could not increment attempts count for %d"), $this->id));\r
+ }\r
+ }\r
+\r
/**\r
* Release a claimed item.\r
*/\r
$this->messageWaiting = false;
return;
}
- $data = unserialize($wm->data);
- if (!$this->send_raw_message($data)) {
- $this->plugin->enqueue_outgoing_raw(
- array(
- 'type' => 'message',
- 'prioritise' => $data['prioritise'],
- 'data' => $data['data']
- )
- );
+ $data = unserialize($wm->data);
+ $wm->incAttempts();
+
+ if ($this->send_raw_message($data)) {
+ $wm->delete();
+ } else {
+ if ($wm->attempts <= common_config('queue', 'max_retries')) {
+ // Try again next idle
+ $wm->releaseClaim();
+ } else {
+ // Exceeded the maximum number of retries
+ $wm->delete();
+ }
}
-
- $wm->delete();
}
}
}
$wm->data = serialize($data);
$wm->prioritise = $data['prioritise'];
+ $wm->attempts = 0;
$wm->created = common_sql_now();
$result = $wm->insert();