View Javadoc
1   package emissary.output.roller.journal;
2   
3   import jakarta.annotation.Nullable;
4   import org.slf4j.Logger;
5   import org.slf4j.LoggerFactory;
6   
7   import java.io.Closeable;
8   import java.io.IOException;
9   import java.nio.ByteBuffer;
10  import java.nio.channels.FileChannel;
11  import java.nio.file.DirectoryStream;
12  import java.nio.file.Files;
13  import java.nio.file.NoSuchFileException;
14  import java.nio.file.Path;
15  import java.nio.file.Paths;
16  import java.nio.file.StandardOpenOption;
17  import java.util.ArrayList;
18  import java.util.Arrays;
19  import java.util.Collection;
20  import java.util.concurrent.locks.ReentrantLock;
21  
22  import static emissary.output.roller.journal.Journal.CURRENT_VERSION;
23  import static emissary.output.roller.journal.Journal.ENTRY_LENGTH;
24  import static emissary.output.roller.journal.Journal.EXT;
25  import static emissary.output.roller.journal.Journal.MAGIC;
26  import static emissary.output.roller.journal.Journal.NINE;
27  import static emissary.output.roller.journal.Journal.SEP;
28  
29  /**
30   * Encapsulates logic to read/deserialize a BG Journal file.
31   */
32  public class JournalReader implements Closeable {
33  
34      private static final Logger logger = LoggerFactory.getLogger(JournalReader.class);
35  
36      // full path to journal file
37      final Path journalPath;
38      // channel position where entries begin
39      private long begin;
40      // initial sequence value
41      private long startsequence;
42      // current sequence value
43      private long sequence;
44      // persisted journal
45      @Nullable
46      FileChannel journalChannel;
47      // not final to release on close
48      @Nullable
49      private ByteBuffer b = ByteBuffer.allocateDirect(ENTRY_LENGTH);
50  
51      private final Journal journal;
52  
53      private final ReentrantLock lock = new ReentrantLock();
54  
55      public JournalReader(final Path journalPath) throws IOException {
56          this.journalPath = journalPath;
57          journal = new Journal(journalPath);
58          checkJournal();
59          loadEntries();
60      }
61  
62      private void checkJournal() throws IOException {
63          if (Files.exists(journalPath) && Files.size(journalPath) > 0L) {
64              readHeader();
65          } else {
66              throw new NoSuchFileException("File does not exist " + journalPath);
67          }
68      }
69  
70      private void loadEntries() throws IOException {
71          journalChannel.position(begin);
72          sequence = startsequence;
73  
74          while (journalChannel.position() < journalChannel.size()) {
75              int read = read(ENTRY_LENGTH);
76              if (read != ENTRY_LENGTH) {
77                  logger.warn("Incomplete journal read for {}. Expected {} but read {}. Sequence start {}, last successful sequence {} ({}th)",
78                          journal.getKey(), ENTRY_LENGTH, read, startsequence, sequence, sequence - startsequence);
79                  break;
80              }
81              long nextSeq = getSequence();
82              if (nextSeq != ++sequence) {
83                  logger.warn("Incorrect sequence value returned. Expected {} Received {}. Exiting", sequence, nextSeq);
84                  break;
85              }
86              JournalEntry e = JournalEntry.deserialize(b);
87              journal.entries.add(e);
88          }
89      }
90  
91      public Journal getJournal() throws IOException {
92          return journal;
93      }
94  
95      // read long followed by null sep. return -1 if any issues
96      private long getSequence() {
97          long seq = b.getLong();
98          if (b.get() != SEP) {
99              return -1;
100         }
101         return seq;
102     }
103 
104     private void checkMagic() throws IOException {
105         read(MAGIC.length);
106         byte[] mgic = new byte[MAGIC.length];
107         b.get(mgic);
108         if (!Arrays.equals(mgic, MAGIC)) {
109             throw new IllegalStateException("Not a Journal file. Invalid Magic");
110         }
111     }
112 
113     private byte readVersion() throws IOException {
114         read(1);
115         return b.get();
116     }
117 
118     private void readHeader() throws IOException {
119         this.journalChannel = FileChannel.open(journalPath, StandardOpenOption.CREATE, StandardOpenOption.READ);
120         checkMagic();
121         journal.setVersion(readVersion());
122 
123         if (journal.version != CURRENT_VERSION) {
124             // should not happen now, but future versions may delegate to a legacy reader
125             throw new IllegalStateException("Incorrect Version Detected " + journal.version);
126         }
127         journal.setKey(readString());
128         // starting sequence number.
129         read(NINE);
130         sequence = getSequence();
131         if (sequence == -1) {
132             throw new IllegalStateException("Invalid sequence read " + sequence);
133         }
134         startsequence = sequence;
135         begin = journalChannel.position();
136     }
137 
138     private String readString() throws IOException {
139         int read = read(4);
140         int len = b.getInt();
141         if (read < 0 || len < 1) {
142             throw new IllegalStateException("Negative value returned. Possible corrupt file");
143         }
144         byte[] keyBytes = new byte[len];
145         read(len);
146         b.get(keyBytes);
147         return new String(keyBytes);
148     }
149 
150     // attempts to read limit bytes from buffer
151     private int read(int limit) throws IOException {
152         if (limit == 0) {
153             return 0;
154         }
155         b.clear();
156         b.limit(limit);
157         int total = 0;
158         while (total < limit) {
159             int read = journalChannel.read(b);
160             if (read == -1) {
161                 logger.debug("EOF when trying to read {} bytes", limit);
162                 if (total == 0) {
163                     total = -1;
164                 }
165                 break;
166             }
167             total += read;
168         }
169         b.flip();
170         return total;
171     }
172 
173     @Override
174     public void close() throws IOException {
175         lock.lock();
176         try {
177             if (journalChannel != null) {
178                 journalChannel.close();
179             }
180             journalChannel = null;
181             b = null;
182         } finally {
183             lock.unlock();
184         }
185     }
186 
187     @SuppressWarnings("PreferredInterfaceType")
188     public static Collection<Path> getJournalPaths(Path dir) throws IOException {
189         ArrayList<Path> paths = new ArrayList<>();
190         try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*" + EXT)) {
191             for (Path path : stream) {
192                 paths.add(path);
193             }
194         }
195         return paths;
196     }
197 
198     /**
199      * Prints contents of a Journal to stdout.
200      */
201     @SuppressWarnings("SystemOut")
202     public static void main(String[] args) throws Exception {
203         String path = args[0];
204         try (JournalReader jr = new JournalReader(Paths.get(path))) {
205             Journal j = jr.getJournal();
206             System.out.println("Journal File: " + path);
207             System.out.println("Journal Version: " + j.getKey());
208             for (JournalEntry je : j.getEntries()) {
209                 System.out.println(je.toString());
210             }
211         }
212     }
213 }