public class ChunkedPipelineConnection extends ConnectionBase
intended to replace existing pipeline per further tests. This pipeline:
Design:
We're basically delegating output throttling concerns to the OS and the TCP/IP layer using blocking write semantics. The TCP layer will write MTU sized packets, regardless of actual user data, so clearly the more we pack per packet, the higher will be the throughput of the connector.
| Modifier and Type | Class and Description |
|---|---|
(package private) static class |
ChunkedPipelineConnection.Concurrent2LockQueue<E>
This was pretty much based on Maged M.
|
(package private) static class |
ChunkedPipelineConnection.PendingCPRequest
ChunkedPipeline specific of
PendingRequest. |
(package private) static class |
ChunkedPipelineConnection.ProtocolHelper
A not so KISSy reproduction of necessary
Protocol support
from ProtocolBase. |
class |
ChunkedPipelineConnection.ResponseHandler
Provides the response processing logic as a
Runnable. |
Connection.Event, Connection.Factory, Connection.Flag, Connection.Listener, Connection.Modality, Connection.Property, Connection.Socket, Connection.State| Modifier and Type | Field and Description |
|---|---|
(package private) static int |
CHUNK_BUFF_SIZE
chunk buffer size
|
(package private) static int |
CHUNK_Q_SIZE
Chunk Queue size (slots)
|
private byte[] |
chunkbuff
chunk buffer
|
private ChunkedPipelineConnection.PendingCPRequest[] |
chunkqueue
Chunk Queue of requests in Chunk buffer
|
private java.util.concurrent.CountDownLatch |
connectionEstablished
counted down on notifyConnect
|
private int |
ctl_word
chunk [hi:idx | lo:off] control long word
|
private java.util.concurrent.atomic.AtomicBoolean |
isActive
used by the Pipeline to indicate its state.
|
(package private) static int |
MIN_REQ_SIZE
minimum request size in bytes -- using PING e.g.
|
private static int |
MTU_FACTOR
MTU multiples to use as upper bound of the size of the chunk buffer
|
(package private) static int |
MTU_SIZE
Assuming TCP MTU of 1500 - ~tcp header overhead rounded to nearest power of 8
|
private boolean |
pendingQuit
flag (default false) indicates if a pending QUIT command is being processed.
|
(package private) java.util.concurrent.BlockingQueue<ChunkedPipelineConnection.PendingCPRequest[]> |
pendingResponseQueue |
private java.util.concurrent.locks.Lock |
requestlock
synchronization object used to serialize request queuing
|
private ChunkedPipelineConnection.ResponseHandler |
respHandler |
private java.lang.Thread |
respHandlerThread |
spec, thrdProtocol| Constructor and Description |
|---|
ChunkedPipelineConnection(ConnectionSpec spec) |
| Modifier and Type | Method and Description |
|---|---|
protected void |
initializeComponents()
Extension point: child classes may override for additional components:
|
protected java.io.InputStream |
newInputStream(java.io.InputStream socketInputStream)
Just make sure its a
FastBufferedInputStream. |
protected Protocol |
newProtocolHandler()
Pipeline must use a concurrent protocol handler.
|
protected void |
notifyConnected()
Extension point -- callback on this method when
ConnectionBase has connected to server. |
protected void |
notifyDisconnected()
Extension point -- callback on this method when
ConnectionBase has disconnected from server. |
(package private) void |
onResponseHandlerError(ClientRuntimeException cre,
PendingRequest request) |
java.util.concurrent.Future<Response> |
queueRequest(Command cmd,
byte[]... args)
This is a true asynchronous method.
|
addListener, connect, disconnect, getInputStream, getOutputStream, getProtocolHandler, getSpec, initializeAsyncConnection, initializeOnConnect, initializeSocketStreams, initializeSyncConnection, isConnected, newOutputStream, notifyFaulted, notifyListeners, notifyShuttingDown, onConnectionFault, reconnect, removeListener, serviceRequest, shutdown, toStringprivate ChunkedPipelineConnection.ResponseHandler respHandler
private java.lang.Thread respHandlerThread
java.util.concurrent.BlockingQueue<ChunkedPipelineConnection.PendingCPRequest[]> pendingResponseQueue
private java.util.concurrent.locks.Lock requestlock
private boolean pendingQuit
private java.util.concurrent.atomic.AtomicBoolean isActive
private java.util.concurrent.CountDownLatch connectionEstablished
private static final int MTU_FACTOR
static final int MTU_SIZE
static final int CHUNK_BUFF_SIZE
static final int MIN_REQ_SIZE
static final int CHUNK_Q_SIZE
private byte[] chunkbuff
private int ctl_word
private ChunkedPipelineConnection.PendingCPRequest[] chunkqueue
public ChunkedPipelineConnection(ConnectionSpec spec) throws ClientRuntimeException
spec - ClientRuntimeExceptionprotected void initializeComponents()
ConnectionBase
In the extended class:
protected void initializeComponents() {
super.initializeComponents();
// my components here ...
//
}
initializeComponents in class ConnectionBaseprotected void notifyConnected()
ConnectionBaseConnectionBase has connected to server.
It is important to note that the extension must call super.notifyConnected if reliable service (using
heartbeats) is required!.notifyConnected in class ConnectionBaseprotected void notifyDisconnected()
ConnectionBaseConnectionBase has disconnected from server.
It is important to note that the extension must call super.notifyDisconnected if reliable service (using
heartbeats) is required!.notifyDisconnected in class ConnectionBaseprotected Protocol newProtocolHandler()
newProtocolHandler in class ConnectionBaseConnectionBase.newProtocolHandler()protected final java.io.InputStream newInputStream(java.io.InputStream socketInputStream)
throws java.lang.IllegalArgumentException
FastBufferedInputStream.newInputStream in class ConnectionBasejava.lang.IllegalArgumentExceptionpublic final java.util.concurrent.Future<Response> queueRequest(Command cmd, byte[]... args) throws ClientRuntimeException, ProviderException
Command.CONN_FLUSH.
Other item of note is that once a QUIT request has been queued, no further requests are accepted and a ClientRuntimeException is thrown.
queueRequest in interface ConnectionqueueRequest in class ConnectionBaseFuture Response.ClientRuntimeExceptionProviderExceptionConnectionBase.queueRequest(org.jredis.protocol.Command, byte[][])void onResponseHandlerError(ClientRuntimeException cre, PendingRequest request)