1 package emissary.output.roller.journal;
2
3 import jakarta.annotation.Nullable;
4 import org.slf4j.Logger;
5 import org.slf4j.LoggerFactory;
6
7 import java.io.Closeable;
8 import java.io.IOException;
9 import java.nio.ByteBuffer;
10 import java.nio.channels.FileChannel;
11 import java.nio.file.DirectoryStream;
12 import java.nio.file.Files;
13 import java.nio.file.NoSuchFileException;
14 import java.nio.file.Path;
15 import java.nio.file.Paths;
16 import java.nio.file.StandardOpenOption;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.Collection;
20 import java.util.concurrent.locks.ReentrantLock;
21
22 import static emissary.output.roller.journal.Journal.CURRENT_VERSION;
23 import static emissary.output.roller.journal.Journal.ENTRY_LENGTH;
24 import static emissary.output.roller.journal.Journal.EXT;
25 import static emissary.output.roller.journal.Journal.MAGIC;
26 import static emissary.output.roller.journal.Journal.NINE;
27 import static emissary.output.roller.journal.Journal.SEP;
28
29
30
31
32 public class JournalReader implements Closeable {
33
34 private static final Logger logger = LoggerFactory.getLogger(JournalReader.class);
35
36
37 final Path journalPath;
38
39 private long begin;
40
41 private long startsequence;
42
43 private long sequence;
44
45 @Nullable
46 FileChannel journalChannel;
47
48 @Nullable
49 private ByteBuffer b = ByteBuffer.allocateDirect(ENTRY_LENGTH);
50
51 private final Journal journal;
52
53 private final ReentrantLock lock = new ReentrantLock();
54
55 public JournalReader(final Path journalPath) throws IOException {
56 this.journalPath = journalPath;
57 journal = new Journal(journalPath);
58 checkJournal();
59 loadEntries();
60 }
61
62 private void checkJournal() throws IOException {
63 if (Files.exists(journalPath) && Files.size(journalPath) > 0L) {
64 readHeader();
65 } else {
66 throw new NoSuchFileException("File does not exist " + journalPath);
67 }
68 }
69
70 private void loadEntries() throws IOException {
71 journalChannel.position(begin);
72 sequence = startsequence;
73
74 while (journalChannel.position() < journalChannel.size()) {
75 int read = read(ENTRY_LENGTH);
76 if (read != ENTRY_LENGTH) {
77 logger.warn("Incomplete journal read for {}. Expected {} but read {}. Sequence start {}, last successful sequence {} ({}th)",
78 journal.getKey(), ENTRY_LENGTH, read, startsequence, sequence, sequence - startsequence);
79 break;
80 }
81 long nextSeq = getSequence();
82 if (nextSeq != ++sequence) {
83 logger.warn("Incorrect sequence value returned. Expected {} Received {}. Exiting", sequence, nextSeq);
84 break;
85 }
86 JournalEntry e = JournalEntry.deserialize(b);
87 journal.entries.add(e);
88 }
89 }
90
91 public Journal getJournal() throws IOException {
92 return journal;
93 }
94
95
96 private long getSequence() {
97 long seq = b.getLong();
98 if (b.get() != SEP) {
99 return -1;
100 }
101 return seq;
102 }
103
104 private void checkMagic() throws IOException {
105 read(MAGIC.length);
106 byte[] mgic = new byte[MAGIC.length];
107 b.get(mgic);
108 if (!Arrays.equals(mgic, MAGIC)) {
109 throw new IllegalStateException("Not a Journal file. Invalid Magic");
110 }
111 }
112
113 private byte readVersion() throws IOException {
114 read(1);
115 return b.get();
116 }
117
118 private void readHeader() throws IOException {
119 this.journalChannel = FileChannel.open(journalPath, StandardOpenOption.CREATE, StandardOpenOption.READ);
120 checkMagic();
121 journal.setVersion(readVersion());
122
123 if (journal.version != CURRENT_VERSION) {
124
125 throw new IllegalStateException("Incorrect Version Detected " + journal.version);
126 }
127 journal.setKey(readString());
128
129 read(NINE);
130 sequence = getSequence();
131 if (sequence == -1) {
132 throw new IllegalStateException("Invalid sequence read " + sequence);
133 }
134 startsequence = sequence;
135 begin = journalChannel.position();
136 }
137
138 private String readString() throws IOException {
139 int read = read(4);
140 int len = b.getInt();
141 if (read < 0 || len < 1) {
142 throw new IllegalStateException("Negative value returned. Possible corrupt file");
143 }
144 byte[] keyBytes = new byte[len];
145 read(len);
146 b.get(keyBytes);
147 return new String(keyBytes);
148 }
149
150
151 private int read(int limit) throws IOException {
152 if (limit == 0) {
153 return 0;
154 }
155 b.clear();
156 b.limit(limit);
157 int total = 0;
158 while (total < limit) {
159 int read = journalChannel.read(b);
160 if (read == -1) {
161 logger.debug("EOF when trying to read {} bytes", limit);
162 if (total == 0) {
163 total = -1;
164 }
165 break;
166 }
167 total += read;
168 }
169 b.flip();
170 return total;
171 }
172
173 @Override
174 public void close() throws IOException {
175 lock.lock();
176 try {
177 if (journalChannel != null) {
178 journalChannel.close();
179 }
180 journalChannel = null;
181 b = null;
182 } finally {
183 lock.unlock();
184 }
185 }
186
187 @SuppressWarnings("PreferredInterfaceType")
188 public static Collection<Path> getJournalPaths(Path dir) throws IOException {
189 ArrayList<Path> paths = new ArrayList<>();
190 try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*" + EXT)) {
191 for (Path path : stream) {
192 paths.add(path);
193 }
194 }
195 return paths;
196 }
197
198
199
200
201 @SuppressWarnings("SystemOut")
202 public static void main(String[] args) throws Exception {
203 String path = args[0];
204 try (JournalReader jr = new JournalReader(Paths.get(path))) {
205 Journal j = jr.getJournal();
206 System.out.println("Journal File: " + path);
207 System.out.println("Journal Version: " + j.getKey());
208 for (JournalEntry je : j.getEntries()) {
209 System.out.println(je.toString());
210 }
211 }
212 }
213 }