3 import java.io.IOException;
4 import java.net.DatagramPacket;
5 import java.net.DatagramSocket;
6 import java.net.SocketAddress;
7 import java.nio.ByteBuffer;
8 import java.util.concurrent.ArrayBlockingQueue;
9 import java.util.concurrent.BlockingDeque;
10 import java.util.concurrent.BlockingQueue;
11 import java.util.concurrent.Callable;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.LinkedBlockingDeque;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
// Message-type code for data packets; passed as msgType to dgramSend when sending a payload.
28 protected static final byte
DATA = 1;
// Message-type code for acknowledgement packets; sent with an empty payload.
29 protected static final byte
ACK = 2;
// Shared empty payload used for packets that carry only a header (ACK and CONNECT sends).
31 private static final byte[] PAYLOAD_NIL =
new byte[0];
// Size in bytes of the receive buffer allocated for each incoming datagram.
33 private static final short LEN_PACKAGE = 1024;
// Header size in bytes: dgramSend writes three shorts (msgType, connId, seqNum) before the payload.
34 private static final byte LEN_HEADER = 6;
// Largest payload accepted; longer payloads are rejected with IllegalArgumentException.
35 private static final short LEN_PAYLOAD = LEN_PACKAGE - LEN_HEADER;
// Default queue capacity forwarded to the two-argument constructor via this(port, QUEUE_ZISE).
// NOTE(review): the name looks like a misspelling of QUEUE_SIZE; it is referenced elsewhere, so rename file-wide only.
38 private static final byte QUEUE_ZISE = 50;
// Bounded queue (capacity queueSize) of received packets; filled with offer() and drained with a 1-second poll.
41 private final BlockingQueue<InternalPack> inputQueue;
// Bounded deque (capacity queueSize) of packets waiting to be sent; packets can be pushed back with offerFirst().
42 private final BlockingDeque<Pack> outputQueue;
// Monitor that dgramSend synchronizes on around its send section.
45 private final Object sendLock =
new Object();
// Connection attempt currently in progress, or null when none; assigned under connectLock and read without it (volatile).
48 private volatile ConnectTask connectTask;
// Monitor guarding the check-and-set of connectTask.
49 private final Object connectLock =
new Object();
// UDP socket created on the port given to the constructor; used to receive datagrams.
52 private final DatagramSocket socket;
// Local port reported by the socket after construction (socket.getLocalPort()).
53 private final int port;
// Daemon thread running InputTask; started in the constructor.
56 private final Thread inputThread;
// Daemon thread running OutputTask; started in the constructor.
57 private final Thread outputThread;
66 LspSocket(
int port,
int queueSize)
throws IOException {
68 this.socket =
new DatagramSocket(port);
69 this.port = this.socket.getLocalPort();
70 this.inputQueue =
new LinkedBlockingQueue<>(queueSize);
71 this.outputQueue =
new LinkedBlockingDeque<>(queueSize);
74 this.inputThread =
new Thread(
new InputTask());
75 this.inputThread.setDaemon(
true);
76 this.inputThread.start();
79 this.outputThread =
new Thread(
new OutputTask());
80 this.outputThread.setDaemon(
true);
81 this.outputThread.start();
91 this(port, QUEUE_ZISE);
106 inputThread.interrupt();
107 outputThread.interrupt();
120 final ConnectTask task =
new ConnectTask(sockAddr, params);
124 synchronized (connectLock) {
126 if (this.connectTask == null) {
127 this.connectTask = task;
133 Thread.sleep(params.getEpoch());
139 final ExecutorService exec = Executors.newSingleThreadExecutor();
140 final Future<Short>
id = exec.submit(task);
144 final short connId = id.get();
145 return new LspConnection(connId, sockAddr, params, triggers);
149 catch (ExecutionException e) {
150 if (e.getCause() instanceof TimeoutException) {
151 throw (TimeoutException) e.getCause().fillInStackTrace();
153 throw new RuntimeException(e);
161 this.connectTask = null;
164 }
catch (InterruptedException e) {}
175 private void dgramReceive(
final DatagramPacket pack)
throws IOException {
176 this.socket.receive(pack);
177 final ByteBuffer buf = ByteBuffer.wrap(pack.getData(), 0,
178 pack.getLength()).asReadOnlyBuffer();
179 final short msgType = buf.getShort();
207 if (conn != null && !conn.
isClosed()) {
208 short seqNum = buf.getShort();
214 if (inputQueue.offer(pack)) {
216 conn.received(seqNum);
229 final short connId = buf.getShort();
234 final short seqNum = buf.getShort();
242 final ConnectTask task = connectTask;
243 if (task != null && connId > 0 && buf.getShort() == 0
244 && sockAddr.equals(task.sockAddr)) {
250 private void dgramSend(
final SocketAddress sockAddr,
final short msgType,
251 final short connId,
final short seqNum,
final byte[]
payload) {
252 ByteBuffer buf = ByteBuffer.allocate(LEN_HEADER + payload.length);
253 buf.putShort(msgType).putShort(connId).putShort(seqNum).put(payload);
255 DatagramPacket packet =
new DatagramPacket(buf.array(), buf.capacity());
256 packet.setSocketAddress(sockAddr);
258 synchronized (sendLock) {
261 }
catch (IOException e) {
266 private void dgramSend(
final short msgType,
final LspConnection conn,
267 final short seqNum,
final byte[] payload) {
268 dgramSend(conn.getSockAddr(), msgType, conn.getId(), seqNum, payload);
272 if (payload.length > LEN_PAYLOAD) {
273 throw new IllegalArgumentException(
"Payload não pode ser maior que " + LEN_PAYLOAD);
276 dgramSend(
DATA, conn, seqNum, payload);
284 dgramSend(
ACK, conn, seqNum, PAYLOAD_NIL);
292 static final byte[]
payload(
final ByteBuffer buf) {
293 byte[] bs =
new byte[buf.remaining()];
294 for (
int i = 0; i < bs.length; i++) {
314 if (conn != null && conn.
getId() == connId
315 && sockAddr.equals(conn.getSockAddr())) {
326 InternalPack nextPack = inputQueue.poll(1, TimeUnit.SECONDS);
327 if (nextPack != null) {
330 }
catch (InterruptedException e) {
341 throw new IllegalArgumentException(
"Payload não pode ser maior que " + LEN_PAYLOAD);
344 if (!outputQueue.offer(p)) {
345 throw new IllegalStateException(
"Fila de saída cheia");
353 private final class ConnectTask
implements Callable<Short> {
354 private final SocketAddress sockAddr;
355 private final BlockingQueue<Short> result;
358 ConnectTask(SocketAddress sockAddr,
LspParams params) {
359 this.sockAddr = sockAddr;
360 this.params = params;
361 this.result =
new ArrayBlockingQueue<>(1);
365 public Short call() throws TimeoutException {
367 int limit = params.getEpochLimit();
370 dgramSend(sockAddr,
CONNECT, (
short) 0, (
short) 0, PAYLOAD_NIL);
371 Short
id = result.poll(params.getEpoch(), TimeUnit.MILLISECONDS);
375 }
catch (InterruptedException e) {
382 throw new TimeoutException(
"Servidor " + sockAddr +
" não responde");
385 void ack(
short connId) {
386 result.offer(connId);
390 private final class InputTask
implements Runnable {
394 byte[] bs =
new byte[LEN_PACKAGE];
395 DatagramPacket pack =
new DatagramPacket(bs, bs.length);
401 }
catch (IOException e) {
408 private final class OutputTask
implements Runnable {
415 }
catch (InterruptedException e) {
421 private void sendNextData() throws InterruptedException {
423 final Pack p = outputQueue.poll(1, TimeUnit.SECONDS);
435 InternalPack sent = conn.sent(p);
443 synchronized (outputQueue) {
444 Pack first = outputQueue.poll();
445 outputQueue.offerFirst(p);
447 outputQueue.offerFirst(first);
LspConnection getConnection()
Serviço de entrada e saída de pacotes.
final void dgramSendData(final InternalPack p)
InternalPack receive()
Recebe um pacote da fila de entrada.
final void dgramSendData(final LspConnection conn, final short seqNum, final byte[] payload)
final void dgramSendAck(final InternalPack p)
LspSocket(int port, int queueSize)
Inicia um LspSocket.
Representa uma conexão LSP.
void dgramReceiveData(final SocketAddress sockAddr, final ByteBuffer buf)
Tratamento de um pacote do tipo DATA recebido.
static final byte[] payload(final ByteBuffer buf)
Helper para obter um array de bytes com o resto do ByteBuffer.
abstract LspConnection usedConnection(short connId)
Obtém o objeto LspConnection em uso.
void dgramReceiveConnect(final SocketAddress sockAddr, final ByteBuffer buf)
Tratamento de um pacote do tipo CONNECT recebido.
final LspConnection connect(SocketAddress sockAddr, LspParams params, ConnectionTriggers triggers)
Tenta estabelecer conexão com um servidor LSP, reenviando solicitação a cada época, até completar o limite da época.
static final byte CONNECT
abstract boolean isActive()
Indica até quando o serviço será executado.
void send(Pack p)
Insere um pacote na fila de saída.
final void dgramSendAck(final LspConnection conn, final short seqNum)
LspSocket(int port)
Inicia um LspSocket.
void dgramReceiveAck(final SocketAddress sockAddr, final ByteBuffer buf)
Tratamento de um pacote do tipo ACK recebido.