CloudServices/Sagrada/Queuey

= Overview =


The '''Message Queue''' project is part of [[Services/Sagrada|Project Sagrada]],
providing a service for applications to queue messages for clients.


= Terminology =


Cassandra
    Apache Cassandra, the default storage back-end for the queue
CL
    Consistency Level
DC
    Data-center
MQ
    message-queue
qdo
    Worker library for queuey
queuey
    The message queue web application Python package that provides the
    RESTful API for the message queue.
 
= Engineers =


* Ben Bangert
* Hanno Schlichting


= Design Background =


An initial search was done of available message queue products to determine if
any of them would be appropriate for Sagrada. Generally, most MQs do not
assume message persistence is of great importance, don't have a good RESTful
API, and/or aren't built with the assumption that millions of queues will be
used at once. Having a configurable MQ that can easily have the various message
delivery guarantees and availability options toggled is quite important, so
Queuey is unique in that deployment and configuration drastically alter the
actual MQ one works with.


For scalability purposes, since some messages may be retained for periods of
time per Notifications requirements and millions of queues will be required,
the initial choice for the backend is Cassandra. As Cassandra also provides
fast reads with internal caching (like memcached), using it in single-node mode
for Socorro should also work well.


When used for Notifications, each user will have a single queue per Notification
…
a more level distribution rather than a single deep queue.


Under the Socorro use-case, it is most likely desired that massive amounts of
messages may be intended for the same queue, which would normally result in a
single extremely deep queue. Deep queues do not scale well horizontally,
…
to ensure one consumer per partition.


Queuey goes with the second option, and only incurs the lock during consumer
addition/removal. Queuey also does not track the state or last message read in
the queue/partitions; it is the consumer's responsibility to track how far it
has read and processed successfully. There
…
has successfully processed in a queue/partition.


= Architecture =

'''Queue Web Application'''

    queuey (Python web application providing RESTful API)

'''Queue Storage Backend'''
…
     * Consumption and rebalancing is done entirely client-side


Queuey is composed of two core parts:
 
* A web application (handles the RESTful API)
* Storage back-end (used by the web application to store queues/messages)
 
The storage back-end is pluggable, and the current default storage back-end is
Cassandra.
 
Unlike other MQ products, Queuey does not hand out messages amongst all clients
reading from the same queue. Every client is responsible for tracking how far
it has read into the queue. If only a single client should see a message, then
the queue should be partitioned, and clients should decide who reads which
partition of the queue.
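As an illustration only, a reading client might track its own position like
the following sketch; the endpoint shape, the 'since' parameter, and the JSON
field names are assumptions for illustration, not the confirmed queuey API:

 import requests  # third-party HTTP client
 
 BASE = "http://queuey.example.com"  # hypothetical deployment URL
 
 def handle(body):
     # Application-defined message processing.
     print("processing:", body)
 
 def poll(queue_name, last_seen=0.0):
     """Read new messages and return the updated read position."""
     # 'since' and the response fields below are illustrative guesses.
     resp = requests.get("%s/queue/%s" % (BASE, queue_name),
                         params={"since": last_seen})
     resp.raise_for_status()
     for message in resp.json().get("messages", []):
         handle(message["body"])
         last_seen = max(last_seen, message["timestamp"])
     # The caller persists this position; queuey itself does not track it.
     return last_seen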
 
Different performance and message guarantee characteristics can be configured
by changing the deployment strategy and the Cassandra replication and
read/write options. Multiple Queuey deployments will therefore be necessary,
and Apps should use the deployment with the operational characteristics
desired.
 
Since messages are stored and read by timestamp from Cassandra, and Cassandra
only has eventual consistency, there is an enforced delay in how soon a message
is available for reading to ensure a consistent picture of the queue. This
helps ensure that written messages will show up when reading the queue to avoid
'losing' messages by reading past where they appeared in the queue.
 
For more on what kind of probabilities are involved in various replication
factors and differing read/write CLs, see:

* [http://www.eecs.berkeley.edu/~pbailis/projects/pbs/ PBS: Probabilistically Bounded Staleness]
* [http://www.datastax.com/dev/blog/your-ideal-performance-consistency-tradeoff Your Ideal Performance: Consistency Tradeoff]
 
In the event that the clients are deleting messages in their queue as they've
been read, the delay is unimportant. Enforcing a proper delay is only required
when clients read but never delete messages (and thus track how far into the
queue they've read based on timestamp).
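For clients tracking their position by timestamp, the enforced delay simply
means never reading right up to the present moment. A minimal sketch, assuming
a configured delay of five seconds (the actual value is deployment-specific):

 import time
 
 VISIBILITY_DELAY = 5.0  # assumed enforced delay, in seconds
 
 def safe_read_horizon(now=None):
     """Latest timestamp a client should read up to.
 
     Messages newer than this may still be propagating between
     Cassandra replicas and could otherwise be skipped for good.
     """
     now = time.time() if now is None else now
     return now - VISIBILITY_DELAY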
 
When using queues that are to be consumed, they must be declared up-front as
''partitioned'' queues. The number of partitions should also be specified, and
new messages will be randomly partitioned. If messages should be processed in
order, they can be inserted into a single partition to enforce ordering. All
messages that are randomly partitioned should be considered loosely ordered.
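A sketch of how an application might declare a partitioned queue and pin
related messages to one partition to keep them ordered; the URLs, payload
fields, and queue name are hypothetical:

 import requests
 
 BASE = "http://queuey.example.com"  # hypothetical deployment URL
 
 # Declare the queue with a fixed partition count up-front
 # (endpoint and field names are illustrative assumptions).
 requests.post("%s/queue" % BASE,
               data={"queue_name": "crash-reports", "partitions": 8})
 
 # Messages that must stay ordered go to one explicit partition;
 # anything else can be left to random partitioning.
 for step in ("step 1", "step 2"):
     requests.post("%s/queue/crash-reports" % BASE,
                   data={"body": step, "partition": 3})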
 
== Queue Workers ==
 
A worker library called Qdo handles coordinating and processing messages off
queues in a job processing setup. The Qdo library utilizes Apache Zookeeper for
worker and queue coordination.
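This page does not spell out Qdo's locking scheme; purely as an illustration
of the general technique, a worker could take a ZooKeeper lock per partition,
here via the kazoo client library with a hypothetical node layout:

 from kazoo.client import KazooClient
 
 def process_partition(queue, partition):
     # Application-defined consumption loop for one partition.
     print("consuming", queue, partition)
 
 zk = KazooClient(hosts="zk1.example.com:2181")  # hypothetical host
 zk.start()
 
 # One lock node per queue partition ensures a single consumer;
 # this path layout is an assumption, not necessarily Qdo's.
 lock = zk.Lock("/qdo/crash-reports/partition-3", identifier="worker-1")
 with lock:  # blocks until this worker owns the partition
     process_partition("crash-reports", 3)
 zk.stop()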
 
== Example Deployment Configurations ==
 
=== Multiple DC Reliability ===
 
This setup is for messages that are critical to deliver; performance is
substantially slower, as writes will not return until a quorum of storage
nodes in each data-center acknowledges the message was written.
 
Depending on how urgent it is for the clients to see the message, the read
CL can be between ONE and LOCAL_QUORUM. Higher read availability
will be achieved by using ONE though, with only a slight delay before a
written message will be seen regardless of the data-center.
 
Using a read CL of ONE allows for an entire data-center to become unavailable
while messages are still delivered; however, no new messages can be written,
as all data-centers must have quorums available to write messages. Changing
the write CL to a lower value adversely affects how clients may read the queue,
as they may need to track what has been read to ensure they don't miss messages
that are still propagating.
 
'''Queuey'''
* Deploy to webheads in each data-center

'''Storage back-end'''
* Select Cassandra
* Deploy Cassandra machines (3+) in each data-center
* Set write CL to EACH_QUORUM
* Set read CL between ONE and LOCAL_QUORUM
* Set delay as appropriate for the read/write CLs
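With a Cassandra back-end accessed from Python through a client such as
pycassa, the consistency levels above map onto per-column-family settings.
A minimal sketch; the keyspace and column family names are assumptions:

 from pycassa.pool import ConnectionPool
 from pycassa.columnfamily import ColumnFamily
 from pycassa.cassandra.ttypes import ConsistencyLevel
 
 # Hypothetical keyspace/column family names for the message store.
 pool = ConnectionPool("MessageStore",
                       ["dc1-cass1:9160", "dc2-cass1:9160"])
 messages = ColumnFamily(
     pool, "Messages",
     write_consistency_level=ConsistencyLevel.EACH_QUORUM,  # every DC acks
     read_consistency_level=ConsistencyLevel.LOCAL_QUORUM)  # local DC only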
 
=== High Throughput, Occasional Unavailability of Messages, Single DC ===
 
In this setup, Queuey behaves like Apache Kafka. The queuey service runs on the
same nodes as Cassandra, and each node runs Cassandra as a single node. This
provides no data durability beyond the disks, so if message persistence is
important then the drives should be RAIDed. If a node goes down, the messages
on that node are unavailable until the node returns.
 
Applications writing messages should hit a load balancer that has the queuey
nodes behind it, so writes will be unaffected by individual machine outages and
message producers will not need configuration updates to add/remove queuey
nodes.
 
Clients reading queues will have to know the individual hostnames of the queuey
nodes and track how far they've read into the queue per-host, as well as be
able to communicate directly with the queuey nodes (bypassing the load
balancer).
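Reading in this topology means polling every queuey host and keeping a
separate read position per host; a minimal sketch reusing the hypothetical
endpoint shape from the earlier example:

 import requests
 
 QUEUEY_HOSTS = ["queuey1.example.com", "queuey2.example.com"]  # assumed
 
 def poll_all(queue_name, positions):
     """positions maps hostname -> last timestamp read on that host."""
     for host in QUEUEY_HOSTS:
         resp = requests.get("http://%s/queue/%s" % (host, queue_name),
                             params={"since": positions.get(host, 0.0)})
         if resp.status_code != 200:
             continue  # node down: its messages wait until it returns
         for message in resp.json().get("messages", []):
             # ... process message["body"] here ...
             positions[host] = max(positions.get(host, 0.0),
                                   message["timestamp"])
     return positions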
 
'''Queuey'''
* Deploy to desired nodes

'''Storage back-end'''
* Select Cassandra
* Deploy as a single-node instance on every queuey node
* Set read/write CL to ONE, as there is only one node
* No delay needed
 
=== Good Throughput, Available Messages, Single DC ===
 
This is a good generic setup that provides message durability in the event a
few nodes are lost. Webheads run queuey, and all connect to the same Cassandra
cluster.
 
Message guarantees can be tweaked by altering the read/write CLs and the
enforced message availability delay. For example, to raise availability the
read/write CL can be set to ONE and a delay can be set that will provide a
fairly high 99% chance of seeing a message within 50ms of writing it (assuming
no nodes go down).

See http://www.eecs.berkeley.edu/~pbailis/projects/pbs/#demo for more details
on tweaking the delay and CLs to achieve the desired message delivery
guarantees.
 
'''Queuey'''
* Deploy to webheads

'''Storage back-end'''
* Select Cassandra
* Deploy a Cassandra cluster (3+ machines)
* Set write CL between ONE and QUORUM
* Set read CL between ONE and QUORUM
* Set delay appropriate for the read/write CLs
 
= Initial User Requirements =
 
== Notifications ==
 
The [[Services/Notifications|Notifications project]] needs Queuey for storing
messages on behalf of users that want to receive them. Each user gets their own
queue.
 
If a user needs to get notifications for general topics, the Notifications
application will create a queue and clients will poll multiple queues.
 
The first version could be considered a '''Message Store''' rather than a
''queue'' as it supports a much richer set of query semantics and does not let
public consumers remove messages. Messages can be removed via the entire queue
being deleted by the App or by expiring.
 
Requirements:
 
* Service App can create queues
* Service App can add messages to queue
* Messages on queues expire
* Clients may read any queue they are aware of
 
 
== Socorro ==
 
The second version allows authenticated applications to queue messages and for
clients to consume them. This model allows for a worker model where jobs are
added to a queue that multiple clients may be watching, and each message
will be given to an individual client.
 
Optionally, the client can 'reserve' a message to indicate it would like it,
and if the client does not verify that it has successfully handled the message
it will be put back in the queue or to a fail/retry queue if desired.
 
Requirements:
 
* Service App can create queues, which can be partitioned
* Service App can add messages to queue, and specify partition to retain ordering
* Clients can ask for information about how many partitions a queue has
* Clients may read any queue, and its partitions that they are aware of
 
= API =


Applications allowed to use the '''Message Queue'''
…
be sent as an HTTP header named 'ApplicationKey'.
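For example, an application holding a key might authenticate like this; the
key value, endpoint, and payload are placeholders:

 import requests
 
 APP_KEY = "0f3d..."  # placeholder application key
 
 resp = requests.post("http://queuey.example.com/queue",
                      headers={"ApplicationKey": APP_KEY},
                      data={"queue_name": "notifications-user-42"})
 resp.raise_for_status()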


== Internal Apps ==

These methods are authenticated by IP, and are intended for use by Services Applications.
…
     Raises an error if the queue does not exist.


== Clients ==

'''GET /queue/{queue_name}'''
…
         }
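The full request and response description is elided above; purely as a hedged
illustration, a read might look like the following, where the query parameters
and response shape are assumptions:

 import requests
 
 resp = requests.get(
     "http://queuey.example.com/queue/notifications-user-42",
     params={"since": 1334000000.0, "limit": 100})
 print(resp.json())  # assumed to contain a 'messages' list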


= Use Cases =

== Notifications ==

The new version of [[Services/Notifications/Push|Notifications]] will use the
…
* Messages are encoded as JSON


== Socorro ==

Socorro is Mozilla's crash reporting service, receiving in excess of 8 thousand
…