import java.util.List ; import java.util.LinkedList ; import java.util.NoSuchElementException ; import java.io.File ; import java.io.FilenameFilter ; import java.io.InputStream ; import java.io.FileInputStream ; import java.io.DataInputStream ; import java.io.OutputStream ; import java.io.FileOutputStream ; import java.io.DataOutputStream ; import java.io.IOException ; import java.io.EOFException ; public interface MessageQueue { public void push(byte[] msg) throws IOException ; public byte[] pop() throws NoSuchElementException, IOException ; } class StringQueue { private MessageQueue mq ; private String charset ; public StringQueue(MessageQueue mq, String charset) { this.mq = mq ; this.charset = charset ; } public StringQueue(MessageQueue mq) { this(mq, "UTF-8") ; } public void push(String str) throws IOException { mq.push(str.getBytes(charset)) ; } public String pop() throws NoSuchElementException, IOException { return new String(mq.pop(), charset) ; } } class TransientMessageQueue implements MessageQueue { private List lst ; public TransientMessageQueue() { lst = new LinkedList() ; } public synchronized void push(byte[] msg) { lst.add(msg) ; } public synchronized byte[] pop() throws NoSuchElementException { if (lst.isEmpty()) throw new NoSuchElementException() ; return lst.remove(0) ; } } class MartinMessageQueue implements MessageQueue { private static final String TEMP_PREFIX = "msq-" ; private static final FilenameFilter EXCLUDE_TEMP = new FilenameFilter() { public boolean accept(File dir, String name) { return !name.startsWith(TEMP_PREFIX) ; } } ; private File dir ; public MartinMessageQueue(File dir) throws IOException { if (!dir.isDirectory()) { if (!dir.mkdirs()) throw new IOException("could not create directory: " + dir) ; } this.dir = dir ; } public void push(byte[] msg) throws IOException { File tmp = File.createTempFile(TEMP_PREFIX, null, dir) ; writeFully(msg, tmp) ; File f = new File(dir, Long.toString(System.currentTimeMillis())) ; boolean renameOk = tmp.renameTo(f) ; if (!renameOk) throw new IOException("could not create queue entry: " + f) ; } private void writeFully(byte[] msg, File f) throws IOException { OutputStream out = new FileOutputStream(f) ; out.write(msg) ; out.close() ; } public byte[] pop() throws NoSuchElementException, IOException { String[] filenames = dir.list(EXCLUDE_TEMP) ; if (filenames.length == 0) throw new NoSuchElementException() ; String firstFilename = "SENTINEL" ; for (String filename: filenames) { if (filename.compareTo(firstFilename) < 0) firstFilename = filename ; } File f = new File(dir, firstFilename) ; byte[] msg = readFully(f) ; boolean deletedOk = f.delete() ; if (!deletedOk) throw new IOException("could not delete queue entry: " + f) ; return msg ; } private byte[] readFully(File f) throws IOException { int len = (int)f.length() ; byte[] buf = new byte[len] ; InputStream in = new FileInputStream(f) ; int got = 0 ; while (got < len) { int read = in.read(buf, got, (len - got)) ; if (read == -1) throw new EOFException("expected: " + len + ", read: " + got) ; got += read ; } return buf ; } } class TomMessageQueue implements MessageQueue { private static final int LOG_SIZE = 100 ; private File dir ; private DataOutputStream out ; private int outCount ; private int written ; private DataInputStream in ; private int inCount ; public TomMessageQueue(File dir) throws IOException { if (!dir.isDirectory()) { if (!dir.mkdirs()) throw new IOException("could not create directory: " + dir) ; } this.dir = dir ; out = new DataOutputStream(new FileOutputStream(Integer.toString(0))) ; in = new DataInputStream(new FileInputStream(Integer.toString(0))) ; // how do we do recovery? how do we know how far we got in the oldest file? } public void push(byte[] msg) throws IOException { out.writeInt(msg.length) ; out.write(msg) ; out.flush() ; // is that sufficient? written += (msg.length + 4) ; if (written > LOG_SIZE) { // System.err.println("rolling output") ; out.writeInt(-1) ; out.close() ; ++outCount ; out = new DataOutputStream(new FileOutputStream(Integer.toString(outCount))) ; written = 0 ; } } public byte[] pop() throws NoSuchElementException, IOException { int length ; try { length = in.readInt() ; if (length == -1) { // System.err.println("rolling input") ; in.close() ; new File(Integer.toString(inCount)).delete() ; ++inCount ; in = new DataInputStream(new FileInputStream(Integer.toString(inCount))) ; length = in.readInt() ; } } catch (EOFException e) { throw new NoSuchElementException() ; } byte[] msg = new byte[length] ; in.readFully(msg) ; return msg ; } } class SymlinkMessageQueue implements MessageQueue { private File dir ; public SymlinkMessageQueue(File dir) throws IOException { if (!dir.isDirectory()) { if (!dir.mkdirs()) throw new IOException("could not create directory: " + dir) ; } this.dir = dir ; } public void push(byte[] msg) throws IOException { File f = new File(dir, Long.toString(System.currentTimeMillis())) ; Symlink.writelink(f, msg) ; } public byte[] pop() throws NoSuchElementException, IOException { String[] filenames = dir.list() ; if (filenames.length == 0) throw new NoSuchElementException() ; String firstFilename = "SENTINEL" ; for (String filename: filenames) { if (filename.compareTo(firstFilename) < 0) firstFilename = filename ; } File f = new File(dir, firstFilename) ; byte[] msg = Symlink.readlink(f) ; boolean deletedOk = f.delete() ; if (!deletedOk) throw new IOException("could not delete queue entry: " + f) ; return msg ; } } class MessageQueueTest { public static void main(String[] args) throws IOException { // MessageQueue mq = new TransientMessageQueue() ; // MessageQueue mq = new MartinMessageQueue(new File("mmq")) ; // MessageQueue mq = new TomMessageQueue(new File("tmq")) ; final MessageQueue mq = new SymlinkMessageQueue(new File("lmq")) ; final StringQueue q = new StringQueue(mq) ; new Thread(new Runnable() { public void run() { try { for (int i = 0 ; i < 1000 ; ++i) { q.push(Integer.toString(i)) ; // System.err.println("pushed " + i) ; Thread.sleep((long)Math.max(((Math.random() * 10) + 5), 1)) ; } } catch (IOException e) { throw new RuntimeException(e) ; } catch (InterruptedException e) { throw new RuntimeException(e) ; } } }).start() ; new Thread(new Runnable() { public void run() { try { long t0 = System.currentTimeMillis() ; for (int i = 0 ; i < 1000 ; ++i) { while (true) { try { Thread.sleep((long)Math.max(((Math.random() * 10) + 4), 1)) ; int j = Integer.parseInt(q.pop()) ; // System.err.println("popped " + j) ; if (i != j) throw new RuntimeException("expected: " + i + ", got: " + j) ; break ; } catch (NoSuchElementException e) { // System.err.println("retrying pop") ; } } } long t1 = System.currentTimeMillis() ; System.err.println(mq.getClass().getName() + ": " + (t1 - t0)) ; } catch (IOException e) { throw new RuntimeException(e) ; } catch (InterruptedException e) { throw new RuntimeException(e) ; } } }).start() ; } }