public abstract class DataChannelAdapter extends CODAStateMachineAdapter implements DataChannel
This class defines an object that can send and receive banks of data in the CODA evio format. It refers to a particular connection (e.g., an ET open or cMsg connection id).
Modifier and Type | Class and Description |
---|---|
protected static class |
DataChannelAdapter.ThreadState
Keep track of output channel thread's state.
|
Modifier and Type | Field and Description |
---|---|
protected long[] |
availableSequences
Maximum index of available ring items.
|
protected java.lang.String[] |
bAddrList
List of broadcast addresses corresponding to ipAddrList.
|
protected double |
buffersPerSec
Number of buffers per second of data to write over the output channel.
|
protected java.nio.ByteOrder |
byteOrder
Byte order of output data.
|
protected CODAStateIF |
channelState
Channel state.
|
protected DataTransport |
dataTransport
Object used by Emu to create this channel.
|
protected Emu |
emu
EMU object that created this channel.
|
protected EmuEventNotify |
endCallback
Object used by Emu to be notified of END event arrival.
|
protected java.util.concurrent.atomic.AtomicReference<java.lang.String> |
errorMsg
Channel error message.
|
protected int |
eventsPerBuffer
Number of events to place into one data buffer for the output channel.
|
protected boolean |
gotEndCmd
Got END command from Run Control.
|
protected boolean |
gotResetCmd
Got RESET command from Run Control.
|
protected int |
id
Channel id (corresponds to sourceId of ROCs for CODA event building).
|
protected boolean |
ignoreDataErrors
Any error reading/parsing data can be ignored if true.
|
protected boolean |
input
Is this channel an input (true) or output (false) channel?
|
protected int |
inputRingItemCount
Number of items in input ring buffer.
|
protected java.lang.String[] |
ipAddrList
List of destination IP addresses.
|
protected boolean |
isFifo
Is this channel an input or output fifo channel? Convenience variable.
|
protected Logger |
logger
Logger associated with this EMU (convenience member).
|
protected EmuModule |
module
Module to which this channel belongs.
|
protected java.lang.String |
name
Channel name.
|
protected long |
nanoSecPerBuf
Number of nanoseconds between each buffer being sent.
|
protected long |
nextEvent
Number of the module's buildable event produced
which this channel will output next (starting at 0).
|
protected long[] |
nextSequences
Index of next ring item.
|
protected int |
outputChannelCount
Total number of module's output channels.
|
protected int[] |
outputChannelFill
Number of entries (filled slots) in each ring buffer of this output channel.
|
protected int |
outputIndex
This output channel's order in relation to the other output channels
for module, starting at 0.
|
protected int |
outputModuleInternalRingCount
Number of items in output channel's internal ByteBuffer supplies.
|
protected int |
outputRingCount
Total number of module's event-building threads and therefore output ring buffers.
|
protected int |
outputRingItemCount
Number of items in output ring buffers.
|
protected boolean |
pause
Do we pause the dataThread?
|
protected int |
prescale
For ET output channel of ER, select a prescaled number of events to be sent.
|
protected EmuEventNotify |
prestartCallback
Object used by Emu to be notified of PRESTART event arrival.
|
protected int |
recordId
Record id (corresponds to evio events flowing through data channel).
|
protected boolean |
regulateBufferRate
Use both buffersPerSec and eventsPerBuffer to regulate the data flow out of this channel.
|
protected com.lmax.disruptor.RingBuffer<RingItem> |
ringBufferIn
Ring buffer - one per input channel.
|
protected com.lmax.disruptor.RingBuffer<RingItem>[] |
ringBuffersOut
Array holding all ring buffers for output.
|
protected int |
ringIndex
Ring that the next event will show up on.
|
protected com.lmax.disruptor.SequenceBarrier[] |
sequenceBarriers
One barrier for each output ring.
|
protected com.lmax.disruptor.Sequence[] |
sequences
One sequence for each output ring.
|
protected boolean |
singleEventOut
If we're a PEB or SEB and want to send 1 evio event per et-buffer/cmsg-message,
then this is true.
|
protected int |
totalRingCapacity
Total number of slots in all output channel ring buffers.
|
Constructor and Description |
---|
DataChannelAdapter(java.lang.String name,
DataTransport transport,
java.util.Map<java.lang.String,java.lang.String> attributeMap,
boolean input,
Emu emu,
EmuModule module,
int outputIndex)
Constructor to create a new DataChannel instance.
|
Modifier and Type | Method and Description |
---|---|
DataTransport |
getDataTransport()
Get the DataTransport object used to create this data channel.
|
EmuEventNotify |
getEndCallback()
This method gets the callback object previously registered by the caller
used to notify upon the arrival of an END event.
|
java.lang.String |
getError()
Get any available error information.
|
int |
getID()
Get the CODA ID number of the CODA component connected to this
data channel.
|
int |
getInputLevel()
Get the relative fill level (0-100) of the ring of this input channel.
|
EmuModule |
getModule()
Get the module which created this channel.
|
protected RingItem |
getNextOutputRingItem(int ringIndex)
Gets the next ring buffer item placed there by the last module.
|
long |
getNextSequence(int ringIndex) |
int |
getOutputLevel()
Get the relative fill level (0-100) of all the rings of this output channel.
|
int |
getOutputRingCount()
Get the total number of data-holding output ring buffers.
|
int |
getPrescale()
Get the prescale value associated with this channel.
|
EmuEventNotify |
getPrestartCallback()
This method gets the callback object previously registered by the caller
used to notify upon the arrival of a PRESTART event.
|
int |
getRecordId()
Get the record ID number of the latest event through this channel.
|
com.lmax.disruptor.RingBuffer<RingItem> |
getRingBufferIn()
Get the one and only input ring buffer of this data channel.
|
com.lmax.disruptor.RingBuffer<RingItem>[] |
getRingBuffersOut()
Get the array of output ring buffers.
|
abstract TransportType |
getTransportType()
Get the type of transport this channel implements.
|
protected void |
gotoNextRingItem(int ringIndex)
It prepares to get the next ring item after
getNextOutputRingItem(int)
is called. |
boolean |
isInput()
Get whether this channel is an input channel (true),
or it is an output channel (false).
|
java.lang.String |
name()
Get the name of this data channel.
|
void |
prestart()
This method implements the PRESTART transition of the CODA run control state machine.
|
void |
registerEndCallback(EmuEventNotify callback)
This method allows for setting an object used to notify the caller when an END event
has arrived (or any other occurrence for that matter).
|
void |
registerPrestartCallback(EmuEventNotify callback)
This method allows for setting an object used to notify the caller when a PRESTART event
has arrived (or any other occurrence for that matter).
|
void |
regulateOutputBufferRate(int eventsPerBuffer,
double buffersPerSec)
Turn on regulation of the flow of data through an output channel.
|
protected void |
releaseCurrentAndGoToNextOutputRingItem(int ringIndex)
Releases the item obtained by calling
getNextOutputRingItem(int),
so that it may be reused for writing into by the last module. |
protected void |
releaseOutputRingItem(int ringIndex)
Releases the item obtained by calling
getNextOutputRingItem(int),
so that it may be reused for writing into by the last module. |
protected void |
sequentialReleaseOutputRingItem(byte[] ringIndexes,
long[] seqs,
int len)
Releases the items obtained by calling
getNextOutputRingItem(int),
so that they may be reused for writing into by the last module. |
void |
setDestinationBaList(java.lang.String[] baList)
Set the output channel's list of destination broadcast addresses of emu TCP server
when using emu sockets to communicate - each corresponding to the same element of the IpList.
|
void |
setDestinationIpList(java.lang.String[] ipList)
Set the output channel's list of possible destination IP addresses of emu TCP server
when using emu sockets to communicate.
|
protected int |
setNextEventAndRing()
Set the index of the next buildable event to get from the module
and the ring it will appear on.
|
void |
setRecordId(int recordId)
Set the record ID number of the latest event through this channel.
|
(package private) void |
setupInputRingBuffers()
Setup the input channel ring buffers.
|
(package private) void |
setupOutputRingBuffers()
Setup the output channel ring buffers.
|
CODAStateIF |
state()
Get the state of this object.
|
Methods inherited from class CODAStateMachineAdapter: download, end, go, pause, reset
protected int id
protected int recordId
protected int prescale
protected boolean singleEventOut
protected CODAStateIF channelState
protected java.util.concurrent.atomic.AtomicReference<java.lang.String> errorMsg
protected final java.lang.String name
protected boolean ignoreDataErrors
protected final boolean isFifo
protected final boolean input
protected final Emu emu
protected final Logger logger
protected java.nio.ByteOrder byteOrder
protected EmuEventNotify endCallback
protected EmuEventNotify prestartCallback
protected final EmuModule module
protected final DataTransport dataTransport
protected volatile boolean pause
protected volatile boolean gotEndCmd
protected volatile boolean gotResetCmd
protected java.lang.String[] ipAddrList
protected java.lang.String[] bAddrList
protected boolean regulateBufferRate
protected double buffersPerSec
protected long nanoSecPerBuf
protected int eventsPerBuffer
protected int outputChannelCount
protected int outputRingCount
protected int outputIndex
protected int ringIndex
protected long nextEvent
protected com.lmax.disruptor.RingBuffer<RingItem> ringBufferIn
protected int inputRingItemCount
protected com.lmax.disruptor.RingBuffer<RingItem>[] ringBuffersOut
protected int outputRingItemCount
protected int outputModuleInternalRingCount
protected com.lmax.disruptor.SequenceBarrier[] sequenceBarriers
protected com.lmax.disruptor.Sequence[] sequences
protected long[] nextSequences
protected long[] availableSequences
protected int[] outputChannelFill
protected int totalRingCapacity
public DataChannelAdapter(java.lang.String name, DataTransport transport, java.util.Map<java.lang.String,java.lang.String> attributeMap, boolean input, Emu emu, EmuModule module, int outputIndex)
name
- the name of this channel
transport
- the DataTransport object that this channel belongs to
attributeMap
- the hashmap of config file attributes for this channel
input
- true if this is an input data channel, otherwise false
emu
- emu this channel belongs to
module
- module this channel belongs to
outputIndex
- order in which module's events will be sent to this
output channel (0 for first output channel, 1 for next, etc.).
void setupOutputRingBuffers()
void setupInputRingBuffers()
public void prestart() throws CmdExecException
prestart
in interface CODAStateMachine
prestart
in class CODAStateMachineAdapter
CmdExecException
- if error during command execution.
protected int setNextEventAndRing()
NOTE: only called if outputRingCount > 1.
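The bookkeeping behind setNextEventAndRing() can be pictured as a round-robin distribution of buildable events. The sketch below is a guess at the idea, not the EMU source. It assumes that each output channel takes every outputChannelCount-th event, starting at its own outputIndex, and that event n is produced on ring n % outputRingCount. The class RoundRobinCursor is hypothetical; only the field names come from this page.

```java
// Hypothetical stand-in, NOT the EMU implementation. Field names mirror this page.
class RoundRobinCursor {
    final int outputChannelCount; // total number of the module's output channels
    final int outputRingCount;    // number of event-building threads (one ring each)
    long nextEvent;               // next buildable event this channel will output
    int ringIndex;                // ring on which that event is expected to appear

    RoundRobinCursor(int outputIndex, int outputChannelCount, int outputRingCount) {
        this.outputChannelCount = outputChannelCount;
        this.outputRingCount = outputRingCount;
        // Assumption: the channel with outputIndex k starts with event k.
        this.nextEvent = outputIndex;
        this.ringIndex = (int) (nextEvent % outputRingCount);
    }

    // Assumed behaviour: skip the events that belong to the other channels,
    // then work out which ring the new event will land on.
    int setNextEventAndRing() {
        nextEvent += outputChannelCount;
        ringIndex = (int) (nextEvent % outputRingCount);
        return ringIndex;
    }
}
```

Under these assumptions, with two output channels and three rings, the channel with outputIndex 1 would handle events 1, 3, 5, 7 on rings 1, 0, 2, 1.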
public abstract TransportType getTransportType()
getTransportType
in interface DataChannel
public EmuModule getModule()
getModule
in interface DataChannel
public int getID()
getID
in interface DataChannel
public int getRecordId()
getRecordId
in interface DataChannel
public void setRecordId(int recordId)
setRecordId
in interface DataChannel
recordId
- record ID number of the latest event through channel.
public CODAStateIF state()
state
in interface StatedObject
public java.lang.String getError()
getError
in interface StatedObject
public java.lang.String name()
name
in interface DataChannel
public boolean isInput()
isInput
in interface DataChannel
true if input channel, else false
public DataTransport getDataTransport()
getDataTransport
in interface DataChannel
public int getOutputRingCount()
getOutputRingCount
in interface DataChannel
public com.lmax.disruptor.RingBuffer<RingItem> getRingBufferIn()
getRingBufferIn
in interface DataChannel
public com.lmax.disruptor.RingBuffer<RingItem>[] getRingBuffersOut()
getRingBuffersOut
in interface DataChannel
public void registerEndCallback(EmuEventNotify callback)
registerEndCallback
in interface CODAStateMachine
registerEndCallback
in class CODAStateMachineAdapter
callback
- object used for notifying caller.
public EmuEventNotify getEndCallback()
getEndCallback
in interface CODAStateMachine
getEndCallback
in class CODAStateMachineAdapter
public void registerPrestartCallback(EmuEventNotify callback)
registerPrestartCallback
in interface CODAStateMachine
registerPrestartCallback
in class CODAStateMachineAdapter
callback
- object used for notifying caller.
public EmuEventNotify getPrestartCallback()
getPrestartCallback
in interface CODAStateMachine
getPrestartCallback
in class CODAStateMachineAdapter
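The register/get callback pairs above follow a common pattern: the caller hands the channel an object, and the channel signals that object when the END or PRESTART event shows up. The sketch below illustrates the pattern only. Notifier and Channel are hypothetical stand-ins, since the real EmuEventNotify interface is not described on this page, and onPrestartEvent() is an invented hook for the moment the event arrives.

```java
// Hypothetical stand-ins; EmuEventNotify's actual methods are not shown on this page.
class CallbackSketch {
    // Minimal notifier: one thread waits, another signals.
    static class Notifier {
        private final java.util.concurrent.CountDownLatch latch =
                new java.util.concurrent.CountDownLatch(1);

        // Called by the channel when the event has arrived.
        void eventArrived() { latch.countDown(); }

        // Non-blocking check for the caller.
        boolean hasArrived() { return latch.getCount() == 0; }

        // Blocking wait with timeout; returns true if the event arrived in time.
        boolean waitFor(long millis) throws InterruptedException {
            return latch.await(millis, java.util.concurrent.TimeUnit.MILLISECONDS);
        }
    }

    // Channel stand-in that stores the callback and fires it on PRESTART.
    static class Channel {
        private Notifier prestartCallback;

        void registerPrestartCallback(Notifier callback) { prestartCallback = callback; }

        Notifier getPrestartCallback() { return prestartCallback; }

        // Invented hook: invoked where the channel detects a PRESTART event.
        void onPrestartEvent() {
            if (prestartCallback != null) prestartCallback.eventArrived();
        }
    }
}
```

A latch is used here because the signal is one-shot; a new Notifier would be registered for each run transition.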
public long getNextSequence(int ringIndex)
public int getInputLevel()
getInputLevel
in interface DataChannel
public int getOutputLevel()
getOutputLevel
in interface DataChannel
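A relative fill level of 0-100 is presumably the percentage of occupied ring slots. The helper below is a minimal sketch under that assumption. FillLevel and its method names are hypothetical, while outputChannelFill and totalRingCapacity are the fields listed on this page.

```java
// Hypothetical helper, not part of the EMU API.
class FillLevel {
    // Percentage (0-100) of occupied slots in a single ring.
    static int ringLevel(long filledSlots, long capacity) {
        if (capacity <= 0) return 0; // no ring set up yet: report empty
        return (int) (100L * filledSlots / capacity);
    }

    // Percentage over all rings of an output channel:
    // the sum of the per-ring fills divided by the total capacity.
    static int channelLevel(int[] outputChannelFill, int totalRingCapacity) {
        long filled = 0;
        for (int f : outputChannelFill) filled += f;
        return ringLevel(filled, totalRingCapacity);
    }
}
```

Integer division truncates, so three rings of 1024 slots holding 256, 0, and 768 items would report 33 rather than 33.3.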
public void setDestinationIpList(java.lang.String[] ipList)
setDestinationIpList
in interface DataChannel
ipList
- list of possible destination IP addresses if connecting to server.
public void setDestinationBaList(java.lang.String[] baList)
setDestinationBaList
in interface DataChannel
baList
- list of destination broadcast addresses each corresponding
to the same element of the IpList.
public int getPrescale()
getPrescale
in interface DataChannel
public void regulateOutputBufferRate(int eventsPerBuffer, double buffersPerSec)
regulateOutputBufferRate
in interface DataChannel
eventsPerBuffer
- events per output 4 MB buffer.
buffersPerSec
- desired number of buffers to be sent per second.
protected RingItem getNextOutputRingItem(int ringIndex) throws java.lang.InterruptedException, EmuException
Gets the next ring buffer item placed there by the last module.
Call releaseOutputRingItem(int) AFTER the returned item is used or nothing will work right.
ringIndex
- ring buffer to take item from
java.lang.InterruptedException
- if thread interrupted.
EmuException
- problem with the ring buffer.
protected void releaseCurrentAndGoToNextOutputRingItem(int ringIndex)
Releases the item obtained by calling getNextOutputRingItem(int), so that it may be reused for writing into by the last module.
It also prepares to get the next ring item when that method is called.
Must NOT be used in conjunction with releaseOutputRingItem(int) and gotoNextRingItem(int).
ringIndex
- ring buffer to release item to
protected void releaseOutputRingItem(int ringIndex)
Releases the item obtained by calling getNextOutputRingItem(int), so that it may be reused for writing into by the last module.
Must NOT be used in conjunction with releaseCurrentAndGoToNextOutputRingItem(int) and must be called after gotoNextRingItem(int).
ringIndex
- ring buffer to release item to
protected void gotoNextRingItem(int ringIndex)
Prepares to get the next ring item after getNextOutputRingItem(int) is called.
Must NOT be used in conjunction with releaseCurrentAndGoToNextOutputRingItem(int) and must be called before releaseOutputRingItem(int).
ringIndex
- ring buffer to release item to
protected void sequentialReleaseOutputRingItem(byte[] ringIndexes, long[] seqs, int len)
Releases the items obtained by calling getNextOutputRingItem(int), so that they may be reused for writing into by the last module.
Must NOT be used in conjunction with releaseCurrentAndGoToNextOutputRingItem(int) or releaseOutputRingItem(int) and must be called after gotoNextRingItem(int).
This method ensures that sequences are released in order and is thread-safe. Only works if each ring item is released individually.
ringIndexes
- array of ring buffers to release item to
seqs
- array of sequences to release.
len
- number of array items to release
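The calling order required by getNextOutputRingItem(int), gotoNextRingItem(int), releaseOutputRingItem(int), and releaseCurrentAndGoToNextOutputRingItem(int) is easier to see on a toy ring. The sketch below is not the EMU code and does not use the Disruptor. OutputRingSketch is a hypothetical, single-ring, single-threaded stand-in whose methods drop the ringIndex argument and return null or false where the real methods would block or throw. The sequence arithmetic is an assumption that is merely consistent with the rules stated above.

```java
// Hypothetical single-ring model of the get / goto / release contract.
class OutputRingSketch {
    private final String[] slots;   // stand-in for RingItem payloads
    private long published = -1;    // highest sequence written by the module
    private long nextSequence = 0;  // sequence this channel reads next
    private long released = -1;     // highest sequence handed back for reuse

    OutputRingSketch(int size) { slots = new String[size]; }

    // Producer side (the "last module"): refuses to overwrite unreleased slots.
    boolean publish(String item) {
        if (published - released >= slots.length) return false; // ring is full
        published++;
        slots[(int) (published % slots.length)] = item;
        return true;
    }

    // Returns the current item without consuming it, or null if none is available.
    String getNextOutputRingItem() {
        if (nextSequence > published) return null;
        return slots[(int) (nextSequence % slots.length)];
    }

    // Two-step variant, step 1: advance the read position.
    void gotoNextRingItem() { nextSequence++; }

    // Two-step variant, step 2: hand back everything before the read position.
    void releaseOutputRingItem() { released = nextSequence - 1; }

    // One-step variant: release the current item and advance together.
    void releaseCurrentAndGoToNextOutputRingItem() {
        released = nextSequence;
        nextSequence++;
    }
}
```

In this model, calling releaseOutputRingItem() before gotoNextRingItem() releases nothing new, and mixing the one-step and two-step variants on the same item advances the read position twice, which is one way to read the "Must NOT be used in conjunction" warnings.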