public class RecordSupply
extends java.lang.Object
It is a supply of RecordRingItems in which a single producer does a get()
,
fills the record with data, and finally does a publish(RecordRingItem)
to let consumers know the data is ready.
This class is setup to handle 2 types of consumers.
The first type is a thread which compresses a record's data.
The number of such consumers is set in the constructor.
Each of these will call getToCompress(int)
to get a record
and eventually call releaseCompressor(RecordRingItem)
to indicate it is
finished compressing and the record is available for writing to disk.
The second type of consumer is a single thread which writes all compressed
records to a file. This will call getToWrite()
to get a record
and eventually call releaseWriter(RecordRingItem)
to indicate it is
finished writing and the record is available for being filled with new data.
Due to the multithreaded nature of writing files using this class, a mechanism for reporting errors that occur in the writing and compressing threads is provided. Also, and probably more importantly, one can call errorAlert() to notify any compression or write threads that an error has occurred. That way these threads can clean up and exit.
It transparently makes sure that all records are written in the proper order.
This is a graphical representation of how our ring buffer is set up.
(1) The producer who calls get() will get a ring item allowing a record to be
filled. That same user does a publish() when done with the record.
(2) The consumer who calls getToCompress() will get that ring item and will
compress its data. There may be any number of compression threads
as long as # threads <= # of ring items!!!.
That same user does a releaseCompressor() when done with the record.
(3) The consumer who calls getToWrite() will get that ring item and will
write its data to a file or another buffer. There may be only 1
such thread. This same user does a releaseWriter() when done with the record.
||
|| writeBarrier
^gt; ||
/ _____
Write thread / | \
---> /1 _|_ 2\ <---- Compression Threads 1-M
================ |__/ \__| |
|6 | | 3| V
^ |__|___|__| ==========================
| \ 5 | 4 / compressBarrier
Producer-> \__|__/
Constructor and Description |
---|
RecordSupply()
Constructor.
|
RecordSupply(int ringSize,
java.nio.ByteOrder order,
int threadCount,
int maxEventCount,
int maxBufferSize,
CompressionType compressionType)
Constructor.
|
Modifier and Type | Method and Description |
---|---|
void |
errorAlert()
Method to have sequence barriers throw a Disruptor's AlertException.
|
RecordRingItem |
get()
Get the next available record item from the ring buffer.
|
java.lang.String |
getError()
If there is an error, this contains the error message.
|
long |
getFillLevel()
Get the percentage of data-filled but unwritten records in ring.
|
long |
getLastSequence()
Get the sequence of last ring buffer item published (seq starts at 0).
|
int |
getMaxRingBytes()
Get the max number of bytes the records in this supply can hold all together.
|
java.nio.ByteOrder |
getOrder()
Get the byte order of all records in this supply.
|
int |
getRingSize()
Get the number of records in this supply.
|
RecordRingItem |
getToCompress(int threadNumber)
Get the next available record item from the ring buffer
in order to compress the data already in it.
|
RecordRingItem |
getToWrite()
Get the next available record item from the ring buffer
in order to write data into it.
|
boolean |
haveError()
Has an error occurred in writing or compressing data?
|
void |
haveError(boolean err)
Set whether an error occurred in writing or compressing data.
|
boolean |
isDiskFull()
Has the writing of a RecordRingItem to disk has been stopped
due to the disk partition being full?
|
void |
publish(RecordRingItem item)
Tell consumers that the record item is ready for consumption.
|
void |
release(int threadNum,
long sequenceNum)
Release claim on ring items up to sequenceNum for the given compressor thread.
|
void |
releaseCompressor(RecordRingItem item)
A compressing thread releases its claim on the given ring buffer item
so it becomes available for use by writing thread behind the write barrier.
|
boolean |
releaseWriter(RecordRingItem item)
A writer thread releases its claim on the given ring buffer item
so it becomes available for reuse by the producer.
|
boolean |
releaseWriterSequential(RecordRingItem item)
A writer thread releases its claim on the given ring buffer item
so it becomes available for reuse by the producer.
|
void |
setDiskFull(boolean full)
Set whether the writing of a RecordRingItem to disk has been stopped
due to the disk partition being full.
|
void |
setError(java.lang.String err)
Set the error message.
|
public RecordSupply()
public RecordSupply(int ringSize, java.nio.ByteOrder order, int threadCount, int maxEventCount, int maxBufferSize, CompressionType compressionType) throws java.lang.IllegalArgumentException
ringSize
- number of RecordRingItem objects in ring buffer.order
- byte order of RecordOutputStream in each RecordRingItem object.threadCount
- number of threads simultaneously doing compression.
Must be <= ringSize.maxEventCount
- max number of events each record can hold.
Value <= O means use default (1M).maxBufferSize
- max number of uncompressed data bytes each record can hold.
Value of < 8MB results in default of 8MB.compressionType
- type of data compression to do.java.lang.IllegalArgumentException
- if args < 1, ringSize not power of 2,
threadCount > ringSize.public void errorAlert()
getToCompress(int)
and
getToWrite()
. Do this in case of a write, compress, or some other error.
This allows any threads waiting on these 2 methods to wake up, clean up,
and exit.public int getMaxRingBytes()
public int getRingSize()
public java.nio.ByteOrder getOrder()
public long getFillLevel()
public long getLastSequence()
public RecordRingItem get()
public void publish(RecordRingItem item)
get()
.item
- record item available for consumers' use.public RecordRingItem getToCompress(int threadNumber) throws com.lmax.disruptor.AlertException, java.lang.InterruptedException
threadNumber
- number of thread (0,1, ...) used to compress.
This number cannot exceed (compressionThreadCount - 1).com.lmax.disruptor.AlertException
- if errorAlert()
called.java.lang.InterruptedException
- if thread interrupted.public RecordRingItem getToWrite() throws com.lmax.disruptor.AlertException, java.lang.InterruptedException
com.lmax.disruptor.AlertException
- if errorAlert()
called.java.lang.InterruptedException
- if thread interrupted.public void releaseCompressor(RecordRingItem item)
To be used in conjunction with getToCompress(int)
.
item
- item in ring buffer to release for reuse.public boolean releaseWriterSequential(RecordRingItem item)
getToWrite()
.
Care must be taken to ensure thread-safety.
This method may only be called if the writing is done IN THE SAME THREAD
as the calling of this method so that items are released in sequence
as ensured by the caller.
Otherwise use releaseWriter(RecordRingItem)
.
item
- item in ring buffer to release for reuse.public boolean releaseWriter(RecordRingItem item)
getToWrite()
.Care must be taken to ensure thread-safety. The following can happen if no precautions are taken. In the case of EventWriterUnsync, writing to a file involves 2, simultaneous, asynchronous writes to a file - both in separate threads to the thread which calls the "write" method. If the writing of the later item finishes first, it releases it's item and sequence which, unfortunately, also releases the previous item's sequence (which is still being written). When the first write is complete, it also releases its item. However item.getSequenceObj() will return null (causing NullPointerException) because it was already released thereby allowing it to be reused and reset called on it.
In order to prevent such a scenario, releaseWriter ensures that items are only released in sequence.
If the same item is released more than once, bad things will happen.
Thus the caller must take steps to prevent it. To avoid problems,
call RecordRingItem.setAlreadyReleased(boolean)
and set to true if
item is released but will still be used in some manner.
item
- item in ring buffer to release for reuse.public void release(int threadNum, long sequenceNum)
threadNum
- compressor thread numbersequenceNum
- sequence to releasepublic boolean haveError()
true
if an error occurred in writing or compressing data.public void haveError(boolean err)
err
- true
if an error occurred in writing or compressing data.public java.lang.String getError()
public void setError(java.lang.String err)
err
- error message.public boolean isDiskFull()
public void setDiskFull(boolean full)
full
- true if he writing of a RecordRingItem to disk has been stopped
due to the disk partition being full.