= Overview =
The '''Message Queue''' project is part of [[Services/Sagrada|Project Sagrada]], providing a service for applications to queue messages for clients.

= Terminology =
; Cassandra : Apache Cassandra, the default storage back-end for the queue
; CL : Consistency Level
; DC : Data-center
; MQ : Message queue
; qdo : Worker library for queuey
; queuey : The message queue web application: the Python package that provides the RESTful API for the message queue

= Engineers =
* Ben Bangert
* Hanno Schlichting

= Design Background =
An initial search was done of available message queue products to determine if
any of them would be appropriate for Sagrada. Generally, most MQs do not
assume message persistence is of great importance, don't have a good RESTful
API, and/or aren't built with the assumption that millions of queues will be
used at once. Having a configurable MQ whose message delivery guarantees and
availability options can easily be toggled is quite important, so Queuey is
unique in that deployment and configuration drastically alter the actual MQ
one works with.

For scalability purposes, since some messages may be retained for periods of
time per Notifications requirements and millions of queues will be required,
the initial choice for the back-end is Cassandra. As Cassandra also provides
fast reads with internal caching (like memcached), using it in single-node
mode for Socorro should also work well.

When used for Notifications, each user will have a single queue per Notification
a more level distribution rather than a single deep queue.

Under the Socorro use-case, massive amounts of messages will most likely be
intended for the same queue, which would normally result in a single extremely
deep queue. Deep queues do not scale well horizontally,
to ensure one consumer per partition.

Queuey goes with the second option, and only incurs the lock during consumer
addition/removal. The MessageQueue also does not track the state or last
message read in the queue/partitions; it is the consumer's responsibility to
track how far it has read and processed successfully. There
has successfully processed in a queue/partition.

= Architecture =
'''Queue Web Application'''
: queuey (Python web application providing the RESTful API)
'''Queue Storage Backend'''
* Consumption and rebalancing are done entirely client-side

Queuey is composed of two core parts:
* A web application (handles the RESTful API)
* A storage back-end (used by the web application to store queues/messages)

The storage back-end is pluggable, and the current default is Cassandra.

Unlike other MQ products, Queuey does not hand out messages amongst all
clients reading from the same queue. Every client is responsible for tracking
how far it has read into the queue. If only a single client should see a given
message, the queue should be partitioned, and clients should decide who reads
which partition of the queue.
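
As a minimal sketch of this client-side tracking in Python: the base URL, the
<code>since</code> query parameter, and the response layout below are
assumptions for illustration; only the <code>GET /queue/{queue_name}</code>
endpoint and the 'ApplicationKey' header appear in the API section of this
page.

<source lang="python">
import requests

BASE = "https://queuey.example.com"  # hypothetical deployment URL

def handle(message):
    """Application-defined processing for one message."""
    print(message)

def poll_queue(queue_name, last_seen=0.0):
    """Read new messages and return the updated read position.

    Queuey does not remember what this client has read; the caller must
    persist 'last_seen' itself between polls.
    """
    resp = requests.get(
        "%s/queue/%s" % (BASE, queue_name),
        params={"since": last_seen},          # assumed parameter name
        headers={"ApplicationKey": "app-key-here"},
    )
    resp.raise_for_status()
    for msg in resp.json().get("messages", []):
        handle(msg)
        # Advance our own position past this message's timestamp.
        last_seen = max(last_seen, msg["timestamp"])
    return last_seen
</source>
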
Different performance and message guarantee characteristics can be configured
by changing the deployment strategy and the Cassandra replication and
read/write options. Therefore multiple Queuey deployments will be necessary,
and Apps should use the deployment with the desired operational
characteristics.

Since messages are stored and read by timestamp from Cassandra, and Cassandra
only has eventual consistency, there is an enforced delay in how soon a
message is available for reading, to ensure a consistent picture of the queue.
This helps ensure that written messages show up when the queue is read,
avoiding 'losing' messages by reading past the point where they appeared in
the queue.

For more on the probabilities involved with various replication factors and
differing read/write CLs, see:
* [http://www.eecs.berkeley.edu/~pbailis/projects/pbs/ PBS: Probabilistically Bounded Staleness]
* [http://www.datastax.com/dev/blog/your-ideal-performance-consistency-tradeoff Your Ideal Performance: Consistency Tradeoff]

In the event that clients delete messages from their queue as they read them,
the delay is unimportant. Enforcing a proper delay is only required when
clients read but never delete messages (and thus track how far into the queue
they've read based on timestamp).
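
A minimal sketch of how such an enforced delay works, assuming a hypothetical
half-second delay; the actual value would be tuned to the deployment's
replication factor and CLs:

<source lang="python">
import time

ENFORCED_DELAY = 0.5  # seconds; a hypothetical value for illustration

def safe_read_horizon():
    """Timestamp before which the queue is considered consistent.

    Messages written within the last ENFORCED_DELAY seconds may not yet
    have propagated to every replica, so reads stop short of them rather
    than risk skipping a message that appears 'in the past' later.
    """
    return time.time() - ENFORCED_DELAY
</source>
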
Queues that are to be consumed must be declared up-front as ''partitioned''
queues. The number of partitions should also be specified, and new messages
will be randomly partitioned. If messages should be processed in order, they
can be inserted into a single partition to enforce ordering. All messages that
are randomly partitioned should be considered loosely ordered.
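
A sketch of what declaring and writing to a partitioned queue might look like
over the RESTful API. The queue name, the <code>partitions</code> field, and
the POST routes are assumptions for illustration; this page does not document
the exact endpoints.

<source lang="python">
import requests

BASE = "https://queuey.example.com"  # hypothetical deployment URL
HEADERS = {"ApplicationKey": "app-key-here"}

# Declare the queue up-front with a fixed number of partitions
# (field name 'partitions' is an assumption).
requests.post("%s/queue/jobs" % BASE, headers=HEADERS,
              data={"partitions": 8})

# Loose ordering: omit the partition and let the server pick randomly.
requests.post("%s/queue/jobs/message" % BASE, headers=HEADERS,
              data={"body": '{"task": "resize"}'})

# Strict ordering: always write to the same partition.
requests.post("%s/queue/jobs/1/message" % BASE, headers=HEADERS,
              data={"body": '{"task": "step-1"}'})
</source>
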
== Queue Workers ==
A worker library called Qdo handles coordinating and processing messages off
queues in a job-processing setup. The Qdo library utilizes Apache Zookeeper
for worker and queue coordination.
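
As a rough sketch of the kind of Zookeeper coordination this implies (not
Qdo's actual implementation), here is a one-consumer-per-partition lock using
the kazoo client library; the znode path, host, and worker id are
hypothetical.

<source lang="python">
from kazoo.client import KazooClient

def process_partition(queue_name, partition):
    """Application-defined: drain and process one partition."""

zk = KazooClient(hosts="zk1.example.com:2181")  # hypothetical ensemble
zk.start()

# One lock znode per queue partition; the holder is that partition's
# sole consumer until it releases the lock (or its session dies).
lock = zk.Lock("/queuey/jobs/partition-1", identifier="worker-7")
with lock:
    process_partition("jobs", 1)

zk.stop()
</source>
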
== Example Deployment Configurations ==
=== Multiple DC Reliability ===
This setup is for messages that are critical to deliver. Performance is
substantially slower, as writes will not return until a quorum of storage
nodes in each data-center acknowledges the message was written.

Depending on how urgent it is for the clients to see the message, the read CL
can be set between ONE and LOCAL_QUORUM. Higher read availability will be
achieved by using ONE, though, with only a slight delay before a written
message is seen, regardless of the data-center.

Using a read CL of ONE allows an entire data-center to become unavailable
while messages are still delivered; however, no new messages can be written,
as all data-centers must have quorums available to write messages. Changing
the write CL to a lower value adversely affects how clients may read the
queue, as they may need to track what has been read to ensure they don't miss
messages that are still propagating.

'''Queuey'''
* Deploy to webheads in each data-center
'''Storage back-end'''
* Select Cassandra
* Deploy Cassandra machines (3+) in each data-center
* Set write CL to EACH_QUORUM
* Set read CL between ONE and LOCAL_QUORUM
* Set delay as appropriate for the read/write CLs
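
To make the CL settings concrete, here is a sketch at the driver level using
the pycassa Python client; the keyspace, column family, and server names are
hypothetical, and this is not Queuey's actual configuration mechanism.

<source lang="python">
from pycassa.pool import ConnectionPool
from pycassa.columnfamily import ColumnFamily
from pycassa.cassandra.ttypes import ConsistencyLevel

# Hypothetical keyspace and node; Queuey's real settings live in its
# deployment configuration, not in application code like this.
pool = ConnectionPool("MessageQueue", server_list=["dc1-cass1:9160"])

messages = ColumnFamily(
    pool, "Messages",
    # Writes wait for a quorum in *each* data-center: slow but durable.
    write_consistency_level=ConsistencyLevel.EACH_QUORUM,
    # Reads from ONE replica stay available even if a remote DC is down.
    read_consistency_level=ConsistencyLevel.ONE,
)
</source>
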
=== High Throughput, Occasional Unavailability of Messages, Single DC ===
In this setup, Queuey behaves like Apache Kafka. The queuey service runs on
the same nodes as Cassandra, and each node runs Cassandra as a single node.
This provides no data durability beyond the disks, so if message persistence
is important, then the drives should be RAIDed. If a node goes down, the
messages on that node are unavailable until the node returns.

Applications writing messages should hit a load balancer that has the queuey
nodes behind it, so writes will be unaffected by individual machine outages
and message producers will not need configuration updates to add/remove queuey
nodes.

Clients reading queues will have to know the individual hostnames of the
queuey nodes and track how far they've read into the queue per-host, as well
as be able to communicate directly with the queuey nodes (bypassing the load
balancer).
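
A sketch of the per-host bookkeeping this requires; the hostnames, the
<code>since</code> parameter, and the response layout are assumptions, as in
the earlier examples.

<source lang="python">
import requests

# Hypothetical queuey node hostnames, addressed directly (no balancer).
NODES = ["queuey1.example.com", "queuey2.example.com"]
positions = dict.fromkeys(NODES, 0.0)  # one read position per host

def handle(message):
    """Application-defined processing for one message."""
    print(message)

def poll_all(queue_name):
    for node in NODES:
        try:
            resp = requests.get(
                "https://%s/queue/%s" % (node, queue_name),
                params={"since": positions[node]},  # assumed parameter
                headers={"ApplicationKey": "app-key-here"},
                timeout=5,
            )
            resp.raise_for_status()
        except requests.RequestException:
            continue  # node down: its messages wait until it returns
        for msg in resp.json().get("messages", []):
            handle(msg)
            positions[node] = max(positions[node], msg["timestamp"])
</source>
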
'''Queuey'''
* Deploy to desired nodes
'''Storage back-end'''
* Select Cassandra
* Deploy as a single-node instance on every queuey node
* Set read/write CL to ONE, as there is only one node
* No delay needed

=== Good Throughput, Available Messages, Single DC ===
This is a good generic setup that provides message durability in the event
that a few nodes are lost. Webheads run queuey, and all connect to the same
Cassandra cluster.

Message guarantees can be tweaked by altering the read/write CLs and the
enforced message availability delay. For example, to raise availability, the
read/write CL can be set to ONE and a delay can be chosen that provides a
fairly high 99% chance of seeing a message within 50ms of writing it (assuming
no nodes go down).

See http://www.eecs.berkeley.edu/~pbailis/projects/pbs/#demo for more details
on tweaking the delay and CLs to achieve the desired message delivery
guarantees.

'''Queuey'''
* Deploy to webheads
'''Storage back-end'''
* Select Cassandra
* Deploy a Cassandra cluster (3+ machines)
* Set write CL between ONE and QUORUM
* Set read CL between ONE and QUORUM
* Set delay appropriate for the read/write CLs

= Initial User Requirements =
== Notifications ==
The [[Services/Notifications|Notifications project]] needs Queuey for storing
messages on behalf of users that want to receive them. Each user gets their
own queue.

If a user needs to get notifications for general topics, the Notifications
application will create a queue and clients will poll multiple queues.

The first version could be considered a '''Message Store''' rather than a
''queue'', as it supports a much richer set of query semantics and does not
let public consumers remove messages. Messages can be removed only by the App
deleting the entire queue, or by expiring.

Requirements:
* Service App can create queues
* Service App can add messages to a queue
* Messages on queues expire
* Clients may read any queue they are aware of

== Socorro ==
The second version allows authenticated applications to queue messages and
for clients to consume them. This model allows for a worker setup where jobs
are added to a queue that multiple clients may be watching, and each message
will be given to an individual client.

Optionally, a client can 'reserve' a message to indicate it would like it; if
the client does not then verify that it has successfully handled the message,
the message will be put back in the queue, or moved to a fail/retry queue if
desired.
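
A sketch of the reserve/verify flow described above; the <code>/reserve</code>
and <code>/ack</code> routes and the message layout are hypothetical, since
this page does not document the reservation API.

<source lang="python">
import requests

BASE = "https://queuey.example.com"  # hypothetical deployment URL
HEADERS = {"ApplicationKey": "app-key-here"}

def handle(message):
    """Application-defined processing; raise on failure."""

def work_one(queue_name):
    # Reserve a message so no other watching client receives it.
    resp = requests.post("%s/queue/%s/reserve" % (BASE, queue_name),
                         headers=HEADERS)
    resp.raise_for_status()
    msg = resp.json()
    handle(msg)
    # Only after successful handling do we acknowledge; otherwise the
    # message returns to the queue (or a fail/retry queue).
    requests.post("%s/queue/%s/ack/%s" % (BASE, queue_name, msg["id"]),
                  headers=HEADERS)
</source>
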
Requirements:
* Service App can create queues, which can be partitioned
* Service App can add messages to a queue, and specify the partition to retain ordering
* Clients can ask for information about how many partitions a queue has
* Clients may read any queue, and any of its partitions, that they are aware of

= API =
Applications allowed to use the '''Message Queue'''
be sent as an HTTP header named 'ApplicationKey'.

== Internal Apps ==
These methods are authenticated by IP, and are intended for use by Services
Applications.
Raises an error if the queue does not exist.

== Clients ==
'''GET /queue/{queue_name}'''
}

= Use Cases =
== Notifications ==
The new version of [[Services/Notifications/Push|Notifications]] will use the
* Messages are encoded as JSON

== Socorro ==
Socorro is Mozilla's crash reporting service, receiving in excess of 8 thousand