Live Sequence Protocol for Java
Implementation of LSP in Java language
 Todos Classes Namespaces Arquivos Funções Variáveis
LspSocket.java
Vá para a documentação deste arquivo.
1 package lsp;
2 
3 import java.io.IOException;
4 import java.net.DatagramPacket;
5 import java.net.DatagramSocket;
6 import java.net.SocketAddress;
7 import java.nio.ByteBuffer;
8 import java.util.concurrent.ArrayBlockingQueue;
9 import java.util.concurrent.BlockingDeque;
10 import java.util.concurrent.BlockingQueue;
11 import java.util.concurrent.Callable;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.LinkedBlockingDeque;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
20 
26 abstract class LspSocket {
27  protected static final byte CONNECT = 0;
28  protected static final byte DATA = 1;
29  protected static final byte ACK = 2;
30 
31  private static final byte[] PAYLOAD_NIL = new byte[0];
32 
33  private static final short LEN_PACKAGE = 1024;
34  private static final byte LEN_HEADER = 6;
35  private static final short LEN_PAYLOAD = LEN_PACKAGE - LEN_HEADER;
36 
38  private static final byte QUEUE_ZISE = 50;
39 
40  /* Filas de entrada e saída */
41  private final BlockingQueue<InternalPack> inputQueue;
42  private final BlockingDeque<Pack> outputQueue;
43 
44  /* Lock para garantir que apenas uma thread envie pacotes */
45  private final Object sendLock = new Object();
46 
47  /* Usado no pedido de conexão a um servidor LSP partindo desse socket */
48  private volatile ConnectTask connectTask;
49  private final Object connectLock = new Object();
50 
51  /* Socket de comunicação em uso */
52  private final DatagramSocket socket;
53  private final int port;
54 
55  /* Threads processando entradas e saídas */
56  private final Thread inputThread;
57  private final Thread outputThread;
58 
66  LspSocket(int port, int queueSize) throws IOException {
67  // Cria o socket e as filas
68  this.socket = new DatagramSocket(port);
69  this.port = this.socket.getLocalPort();
70  this.inputQueue = new LinkedBlockingQueue<>(queueSize);
71  this.outputQueue = new LinkedBlockingDeque<>(queueSize);
72 
73  // Inicializa thread de entradas
74  this.inputThread = new Thread(new InputTask());
75  this.inputThread.setDaemon(true);
76  this.inputThread.start();
77 
78  // Inicializa thread de saídas
79  this.outputThread = new Thread(new OutputTask());
80  this.outputThread.setDaemon(true);
81  this.outputThread.start();
82  }
83 
90  LspSocket(int port) throws IOException {
91  this(port, QUEUE_ZISE);
92  }
93 
100  abstract boolean isActive();
101 
102  final void close() {
103  socket.close();
104 
105  // Para todas as threads
106  inputThread.interrupt();
107  outputThread.interrupt();
108 
109  // Limpeza de memória
110  inputQueue.clear();
111  outputQueue.clear();
112  }
113 
118  final LspConnection connect(SocketAddress sockAddr, LspParams params, ConnectionTriggers triggers) throws TimeoutException {
119  // Novo processo de conexão
120  final ConnectTask task = new ConnectTask(sockAddr, params);
121 
122  try {
123  while (isActive()) {
124  synchronized (connectLock) {
125  // Se não há um processo de conexão em curso, registra um
126  if (this.connectTask == null) {
127  this.connectTask = task;
128  }
129 
130  // Havendo um processo de conexão em curso, aguarda uma
131  // época e tenta se conectar novamente
132  else {
133  Thread.sleep(params.getEpoch());
134  continue;
135  }
136  }
137 
138  // Executa o processo de conexão
139  final ExecutorService exec = Executors.newSingleThreadExecutor();
140  final Future<Short> id = exec.submit(task);
141 
142  // Se o processo concluir corretamente, uma nova conexão será gerada
143  try {
144  final short connId = id.get();
145  return new LspConnection(connId, sockAddr, params, triggers);
146  }
147 
148  // Se uma exceção foi lançada, então relança-a contextualmente
149  catch (ExecutionException e) {
150  if (e.getCause() instanceof TimeoutException) {
151  throw (TimeoutException) e.getCause().fillInStackTrace();
152  } else {
153  throw new RuntimeException(e);
154  }
155  }
156 
157  // Garante a liberação da thread de requisição de conexão e
158  // registra que o processo de conexão se encerrou
159  finally {
160  exec.shutdown();
161  this.connectTask = null;
162  }
163  }
164  } catch (InterruptedException e) {}
165 
166  return null;
167  }
168 
175  private void dgramReceive(final DatagramPacket pack) throws IOException {
176  this.socket.receive(pack);
177  final ByteBuffer buf = ByteBuffer.wrap(pack.getData(), 0,
178  pack.getLength()).asReadOnlyBuffer();
179  final short msgType = buf.getShort();
180 
181  switch (msgType) {
182  case CONNECT:
183  dgramReceiveConnect(pack.getSocketAddress(), buf.slice());
184  break;
185  case DATA:
186  dgramReceiveData(pack.getSocketAddress(), buf.slice());
187  break;
188  case ACK:
189  dgramReceiveAck(pack.getSocketAddress(), buf.slice());
190  break;
191  }
192  }
193 
199  void dgramReceiveConnect(final SocketAddress sockAddr, final ByteBuffer buf) {
200  }
201 
203  void dgramReceiveData(final SocketAddress sockAddr, final ByteBuffer buf) {
204  LspConnection conn = usedConnection(sockAddr, buf.getShort());
205 
206  // Só continua se a conexão é válida e não estiver fechada
207  if (conn != null && !conn.isClosed()) {
208  short seqNum = buf.getShort();
209  byte[] payload = payload(buf);
210  InternalPack pack = new InternalPack(conn, seqNum, payload);
211 
212  // Se a mensagem foi enfileirada, envia o ACK e informa o número
213  // de sequência à conexão (usado nos disparos da época).
214  if (inputQueue.offer(pack)) {
215  dgramSendAck(pack);
216  conn.received(seqNum);
217  }
218 
219  // Caso contrário, mesmo que a mensagem não possa ser lida,
220  // atualiza o momento da última mensagem recebida
221  else {
222  conn.received();
223  }
224  }
225  }
226 
228  void dgramReceiveAck(final SocketAddress sockAddr, final ByteBuffer buf) {
229  final short connId = buf.getShort();
230  final LspConnection conn = usedConnection(sockAddr, connId);
231 
232  // Se o connId é válido, reconhece a mensagem
233  if (conn != null) {
234  final short seqNum = buf.getShort();
235  conn.ack(seqNum);
236  }
237 
238  // Senão verifica se há uma tentativa de conexão em curso. Caso
239  // positivo, verifica também se id não é 0, número de sequência é 0,
240  // conferindo antes se o ACK vem do socket remoto correto
241  else {
242  final ConnectTask task = connectTask;
243  if (task != null && connId > 0 && buf.getShort() == 0
244  && sockAddr.equals(task.sockAddr)) {
245  task.ack(connId);
246  }
247  }
248  }
249 
250  private void dgramSend(final SocketAddress sockAddr, final short msgType,
251  final short connId, final short seqNum, final byte[] payload) {
252  ByteBuffer buf = ByteBuffer.allocate(LEN_HEADER + payload.length);
253  buf.putShort(msgType).putShort(connId).putShort(seqNum).put(payload);
254 
255  DatagramPacket packet = new DatagramPacket(buf.array(), buf.capacity());
256  packet.setSocketAddress(sockAddr);
257  try {
258  synchronized (sendLock) {
259  socket.send(packet);
260  }
261  } catch (IOException e) {
262  e.printStackTrace();
263  }
264  }
265 
266  private void dgramSend(final short msgType, final LspConnection conn,
267  final short seqNum, final byte[] payload) {
268  dgramSend(conn.getSockAddr(), msgType, conn.getId(), seqNum, payload);
269  }
270 
271  final void dgramSendData(final LspConnection conn, final short seqNum, final byte[] payload) {
272  if (payload.length > LEN_PAYLOAD) {
273  throw new IllegalArgumentException("Payload não pode ser maior que " + LEN_PAYLOAD);
274  }
275 
276  dgramSend(DATA, conn, seqNum, payload);
277  }
278 
279  final void dgramSendData(final InternalPack p) {
280  dgramSendData(p.getConnection(), p.getSeqNum(), p.getPayload());
281  }
282 
283  final void dgramSendAck(final LspConnection conn, final short seqNum) {
284  dgramSend(ACK, conn, seqNum, PAYLOAD_NIL);
285  }
286 
287  final void dgramSendAck(final InternalPack p) {
288  dgramSendAck(p.getConnection(), p.getSeqNum());
289  }
290 
292  static final byte[] payload(final ByteBuffer buf) {
293  byte[] bs = new byte[buf.remaining()];
294  for (int i = 0; i < bs.length; i++) {
295  bs[i] = buf.get();
296  }
297 
298  return bs;
299  }
300 
307  abstract LspConnection usedConnection(short connId);
308 
309  private LspConnection usedConnection(final SocketAddress sockAddr, final short connId) {
310  final LspConnection conn = usedConnection(connId);
311 
312  // Descarta o pacote se não há conexão aberta com o remetente ou se
313  // o id recebido não corresponde ao id registrado com a conexão.
314  if (conn != null && conn.getId() == connId
315  && sockAddr.equals(conn.getSockAddr())) {
316  return conn;
317  }
318 
319  return null;
320  }
321 
324  while (isActive()) {
325  try {
326  InternalPack nextPack = inputQueue.poll(1, TimeUnit.SECONDS);
327  if (nextPack != null) {
328  return nextPack;
329  }
330  } catch (InterruptedException e) {
331  break;
332  }
333  }
334 
335  return null;
336  }
337 
339  public void send(Pack p) {
340  if (p.getPayload().length > LEN_PAYLOAD) {
341  throw new IllegalArgumentException("Payload não pode ser maior que " + LEN_PAYLOAD);
342  }
343 
344  if (!outputQueue.offer(p)) {
345  throw new IllegalStateException("Fila de saída cheia");
346  }
347  }
348 
349  int getPort() {
350  return this.port;
351  }
352 
353  private final class ConnectTask implements Callable<Short> {
354  private final SocketAddress sockAddr;
355  private final BlockingQueue<Short> result;
356  private final LspParams params;
357 
358  ConnectTask(SocketAddress sockAddr, LspParams params) {
359  this.sockAddr = sockAddr;
360  this.params = params;
361  this.result = new ArrayBlockingQueue<>(1);
362  }
363 
364  @Override
365  public Short call() throws TimeoutException {
366  // Envia e aguarda, durante o tempo das épocas, o ACK da conexão
367  int limit = params.getEpochLimit();
368  while (isActive() && limit-- > 0) {
369  try {
370  dgramSend(sockAddr, CONNECT, (short) 0, (short) 0, PAYLOAD_NIL);
371  Short id = result.poll(params.getEpoch(), TimeUnit.MILLISECONDS);
372  if (id != null) {
373  return id;
374  }
375  } catch (InterruptedException e) {
376  return null;
377  }
378  }
379 
380  // Como já passou o tempo definido nos params, então considera
381  // que o servidor não está disponível
382  throw new TimeoutException("Servidor " + sockAddr + " não responde");
383  }
384 
385  void ack(short connId) {
386  result.offer(connId);
387  }
388  }
389 
390  private final class InputTask implements Runnable {
391  @Override
392  public void run() {
393  // Configuração do pacote de entrada
394  byte[] bs = new byte[LEN_PACKAGE];
395  DatagramPacket pack = new DatagramPacket(bs, bs.length);
396 
397  // Recebe pacotes até o servidor ser encerrado
398  while (isActive()) {
399  try {
400  dgramReceive(pack);
401  } catch (IOException e) {
402  return;
403  }
404  }
405  }
406  }
407 
408  private final class OutputTask implements Runnable {
409  @Override
410  public void run() {
411  // Envia pacotes até o servidor ser encerrado
412  while (isActive()) {
413  try {
414  sendNextData();
415  } catch (InterruptedException e) {
416  return;
417  }
418  }
419  }
420 
421  private void sendNextData() throws InterruptedException {
422  // Obtém o próximo pacote de dados da fila, se houver.
423  final Pack p = outputQueue.poll(1, TimeUnit.SECONDS);
424  if (p == null) {
425  return;
426  }
427 
428  // Se o id de conexão do pacote é inválido, encerra
429  LspConnection conn = usedConnection(p.getConnId());
430  if (conn == null) {
431  return;
432  }
433 
434  // Tenta associar o pacote à conexão. Se sucesso, envia esse pacote
435  InternalPack sent = conn.sent(p);
436  if (sent != null) {
437  dgramSendData(sent);
438  return;
439  }
440 
441  // Se não foi possível associar à conexão (já havia outro pacote em
442  // espera de um ACK) então devolve o pacote à fila (segunda posição)
443  synchronized (outputQueue) {
444  Pack first = outputQueue.poll();
445  outputQueue.offerFirst(p);
446  if (first != null) {
447  outputQueue.offerFirst(first);
448  }
449  }
450  }
451  }
452 }
LspConnection getConnection()
Serviço de entrada e saída de pacotes.
Definition: LspSocket.java:26
final void dgramSendData(final InternalPack p)
Definition: LspSocket.java:279
InternalPack receive()
Recebe um pacote da fila de entrada.
Definition: LspSocket.java:323
final void dgramSendData(final LspConnection conn, final short seqNum, final byte[] payload)
Definition: LspSocket.java:271
static final byte DATA
Definition: LspSocket.java:28
final void dgramSendAck(final InternalPack p)
Definition: LspSocket.java:287
LspSocket(int port, int queueSize)
Inicia um LspSocket.
Definition: LspSocket.java:66
Representa uma conexão LSP.
void dgramReceiveData(final SocketAddress sockAddr, final ByteBuffer buf)
Tratamento de um pacote do tipo DATA recebido.
Definition: LspSocket.java:203
static final byte[] payload(final ByteBuffer buf)
Helper para obter um array de bytes com o resto do ByteBuffer.
Definition: LspSocket.java:292
abstract LspConnection usedConnection(short connId)
Obtém o objeto LspConnection em uso.
void dgramReceiveConnect(final SocketAddress sockAddr, final ByteBuffer buf)
Tratamento de um pacote do tipo CONNECT recebido.
Definition: LspSocket.java:199
final LspConnection connect(SocketAddress sockAddr, LspParams params, ConnectionTriggers triggers)
Tenta estabelecer conexão com um servidor LSP, reenviando solicitação a cada época, até completar o limite da época.
Definition: LspSocket.java:118
static final byte ACK
Definition: LspSocket.java:29
static final byte CONNECT
Definition: LspSocket.java:27
abstract boolean isActive()
Indica até quando o serviço será executado.
byte[] getPayload()
Definition: Pack.java:20
final void close()
Definition: LspSocket.java:102
void send(Pack p)
Insere um pacote na fila de saída.
Definition: LspSocket.java:339
final void dgramSendAck(final LspConnection conn, final short seqNum)
Definition: LspSocket.java:283
LspSocket(int port)
Inicia um LspSocket.
Definition: LspSocket.java:90
void dgramReceiveAck(final SocketAddress sockAddr, final ByteBuffer buf)
Tratamento de um pacote do tipo ACK recebido.
Definition: LspSocket.java:228