Live Sequence Protocol for Java
Implementation of LSP in Java language
 Todos Classes Namespaces Arquivos Funções Variáveis
LspConnection.java
Vá para a documentação deste arquivo.
1 package lsp;
2 
3 import java.net.InetSocketAddress;
4 import java.net.SocketAddress;
5 import java.util.concurrent.atomic.AtomicInteger;
6 
13  private final short id;
14  private final long sockId;
15  private final ConnectionTriggers triggers;
16 
17  private volatile boolean closed;
18  private volatile boolean markClosed;
19  private volatile short seqNum;
20  private volatile long receivedTime;
21  private volatile short receivedSeqNum;
22  private final AtomicInteger sendMissing;
23  private final Object lock = new Object();
24 
25  private volatile InternalPack sentMessage;
26  private final SocketAddress sockAddr;
27  private final Thread statusThread;
28 
41  LspConnection(short id, long sockId, SocketAddress sockAddr, LspParams params, ConnectionTriggers triggers) {
42  if (sockAddr == null || params == null)
43  throw new NullPointerException("Nenhum parâmetro pode ser nulo");
44 
45  this.id = id;
46  this.sockId = sockId;
47  this.sockAddr = sockAddr;
48  this.triggers = triggers;
49  this.closed = false;
50  this.seqNum = 0;
51  this.receivedTime = -1;
52  this.receivedSeqNum = -1;
53  this.sendMissing = new AtomicInteger(0);
54 
55  this.statusThread = new Thread(new StatusChecker(params));
56  this.statusThread.setDaemon(true);
57  this.statusThread.start();
58  }
59 
70  LspConnection(short id, SocketAddress sockAddr, LspParams params, ConnectionTriggers triggers) {
71  this(id, uniqueSockId(sockAddr), sockAddr, params, triggers);
72  }
73 
74  short getId() {
75  return this.id;
76  }
77 
83  static long uniqueSockId(SocketAddress sockAddr) {
84  final InetSocketAddress addr = (InetSocketAddress) sockAddr;
85  final int ip = addr.getAddress().hashCode();
86  final int port = addr.getPort();
87  return (ip & 0xffff_ffffL) << 16 | (short) port;
88  }
89 
95  long getSockId() {
96  return this.sockId;
97  }
98 
99  SocketAddress getSockAddr() {
100  return this.sockAddr;
101  }
102 
107  void incSendMissing() {
108  this.sendMissing.incrementAndGet();
109  }
110 
116  return this.sendMissing.intValue();
117  }
118 
121  return this.sentMessage;
122  }
123 
131  synchronized (lock) {
132  InternalPack p = new InternalPack(this, ++seqNum, pack.getPayload());
133  if (this.sentMessage == null) {
134  this.sentMessage = p;
135  return p;
136  }
137  }
138 
139  return null;
140  }
141 
143  void ack(short seqNum) {
144  // Atualiza o momento de recebimento
145  received();
146 
147  synchronized (lock) {
148  // Marca dados como recebidos, se o número de sequência é igual ao atual
149  if (this.seqNum == seqNum) {
150  this.sentMessage = null;
151  }
152 
153  // Diminuição da quantidade de mensagens faltando entregar.
154  this.sendMissing.decrementAndGet();
155  }
156  }
157 
164  short receivedSeqNum() {
165  return receivedSeqNum;
166  }
167 
172  void received() {
173  this.receivedTime = System.currentTimeMillis();
174  }
175 
180  void received(short seqNum) {
181  // Atualiza o momento de recebimento
182  received();
183  // Altera o número de sequência atual
184  this.receivedSeqNum = seqNum;
185  }
186 
187  boolean isInterrupted() {
188  return this.closed;
189  }
190 
191  boolean isClosed() {
192  return this.closed || this.markClosed;
193  }
194 
195  void close() {
196  close(true);
197  }
198 
199  void close(boolean interrupt) {
200  if (interrupt) {
201  this.statusThread.interrupt();
202  this.closed = true;
203  } else {
204  this.markClosed = true;
205  }
206  }
207 
212  private final class StatusChecker implements Runnable {
213  private final LspParams params;
214 
215  private StatusChecker(LspParams params) {
216  this.params = params;
217  }
218 
219  @Override
220  public void run() {
221  // Obtém o horário da última mensagem recebida em milisegundos
222  long lastTime = receivedTime;
223 
224  // Obtém parâmetros da conexão
225  int limit = params.getEpochLimit();
226  final int epoch = params.getEpoch();
227 
228  // Monitora a conexão continuamente até que o limite de épocas seja
229  // atingido ou a conexão seja fechada
230  while (!closed && limit-- > 0) {
231  // ..ou quando a conexão está no estado de encerramento, até que
232  // não haja mais mensagens para enviar.
233  if (markClosed && sendMissing.get() <= 0) {
234  break;
235  }
236 
237  try {
238  Thread.sleep(epoch);
239  } catch (InterruptedException e) {
240  return;
241  }
242 
243  // Dispara as ações da época
244  triggers.doEpochActions();
245 
246  // Reinicia contagem de épocas se houve mensagens recebidas
247  // desde a última época
248  final long time = receivedTime;
249  if (time != lastTime) {
250  lastTime = time;
251  limit = params.getEpochLimit();
252  }
253  }
254 
255  // Encerra formalmente a conexão
256  triggers.doCloseConnection();
257  }
258  }
259 }
InternalPack sent(Pack pack)
Informa o payload da última mensagem enviada.
LspConnection(short id, long sockId, SocketAddress sockAddr, LspParams params, ConnectionTriggers triggers)
Constrói um objeto LspConnection.
static long uniqueSockId(SocketAddress sockAddr)
Número único gerado a partir de um endereço IP e uma porta.
Representa uma conexão LSP.
void ack(short seqNum)
Informa que o ACK do número de sequência informado foi recebido.
SocketAddress getSockAddr()
InternalPack sent()
Obtém a última mensagem de dados enviada (aguardando ACK)
long getSockId()
Número único gerado a partir de um endereço IP e uma porta.
void received(short seqNum)
Informa o número de sequência em que uma mensagem DATA foi recebida.
byte[] getPayload()
Definition: Pack.java:20
void incSendMissing()
Aumenta em um o número de mensagens na fila, mas faltam enviar.
void received()
Informa que houve uma mensagem recebida por essa conexão.
int getSendMissing()
Número de mensagens na fila, mas faltam enviar.
LspConnection(short id, SocketAddress sockAddr, LspParams params, ConnectionTriggers triggers)
Constrói um objeto LspConnection.
short receivedSeqNum()
Número de sequência da última mensagem DATA recebida por essa conexão.
void close(boolean interrupt)