View Javadoc

1   package de.mxro.async.map.sql.internal;
2   
3   import java.io.ByteArrayInputStream;
4   import java.io.ByteArrayOutputStream;
5   import java.io.IOException;
6   import java.io.InputStream;
7   import java.sql.PreparedStatement;
8   import java.sql.ResultSet;
9   import java.sql.SQLException;
10  import java.util.Collections;
11  import java.util.HashMap;
12  import java.util.HashSet;
13  import java.util.List;
14  import java.util.Map;
15  import java.util.Queue;
16  import java.util.Set;
17  import java.util.concurrent.ConcurrentLinkedQueue;
18  import java.util.concurrent.ExecutorService;
19  import java.util.concurrent.Executors;
20  
21  import one.utils.jre.OneUtilsJre;
22  import de.mxro.async.map.AsyncMap;
23  import de.mxro.async.map.operations.MapOperation;
24  import de.mxro.async.map.sql.SqlAsyncMapConfiguration;
25  import de.mxro.async.map.sql.SqlAsyncMapDependencies;
26  import de.mxro.concurrency.schedule.SingleInstanceQueueWorker;
27  import de.mxro.concurrency.wrappers.SimpleExecutor;
28  import de.mxro.concurrency.wrappers.SimpleExecutor.WhenExecutorShutDown;
29  import de.mxro.fn.Fn;
30  import de.mxro.serialization.jre.SerializationJre;
31  import delight.async.callbacks.SimpleCallback;
32  import delight.async.callbacks.ValueCallback;
33  
34  public class SqlAsyncMapImplementation<V> implements AsyncMap<String, V> {
35  
    /** Compile-time switch guarding the {@code System.out} trace output in this class. */
    private final static boolean ENABLE_DEBUG = false;

    /** Configuration; {@code conf.sql()} supplies the SQL templates and capability flags used below. */
    private final SqlAsyncMapConfiguration conf;

    /** Dependencies; {@code deps.getSerializer()} converts values to and from their stored byte form. */
    private final SqlAsyncMapDependencies deps;

    // internal helper
    /** Current JDBC connection; created on demand by assertConnection() and replaced by initConnection(). */
    private java.sql.Connection connection;
    /**
     * Values (or the DELETE_NODE marker) that have been accepted but not yet written, keyed by URI. The map
     * object also serves as the monitor guarding reads and writes in this class.
     */
    private final Map<String, Object> pendingInserts;
    /** URIs for which a database read is currently in progress. */
    private final Set<String> pendingGets;
    /** Executor on which commit() schedules its wait for pending requests. */
    private final ExecutorService commitThread;
    /** Worker that drains scheduled URIs and writes their pending values to the database. */
    private final WriteWorker writeWorker;

    /** Marker stored in pendingInserts (and compared by identity) to request deletion of a URI. */
    private final static Object DELETE_NODE = Fn.object();
50  
51      private class WriteWorker extends SingleInstanceQueueWorker<String> {
52  
53          @Override
54          protected void processItems(final List<String> items) {
55  
56              synchronized (pendingInserts) {
57  
58                  if (ENABLE_DEBUG) {
59                      System.out.println(this + ": Inserting [" + items.size() + "] elements.");
60                  }
61  
62                  for (final String item : items) {
63                      final String uri = item;
64  
65                      final Object data;
66  
67                      if (!pendingInserts.containsKey(uri)) {
68                          if (ENABLE_DEBUG) {
69                              System.out.println(this + ": Insert has been performed by previous operation [" + uri
70                                      + "].");
71                          }
72                          continue;
73                      }
74  
75                      data = pendingInserts.get(uri);
76  
77                      assertConnection();
78  
79                      try {
80                          writeToSqlDatabase(uri, data);
81                      } catch (final Throwable t) {
82                          // try reconnecting once if any errors occur
83                          // in order to deal with mysql automatic disconnect
84                          initConnection();
85                          try {
86                              writeToSqlDatabase(uri, data);
87                          } catch (final SQLException e) {
88                              throw new RuntimeException(e);
89                          }
90                      }
91                  }
92  
93                  try {
94                      connection.commit();
95                  } catch (final SQLException e) {
96                      throw new RuntimeException(e);
97                  }
98  
99                  if (ENABLE_DEBUG) {
100                     System.out.println("SqlConnection: Inserting [" + items.size() + "] elements completed.");
101                 }
102 
103                 for (final String item : items) {
104                     // assert pendingInserts.containsKey(item);
105                     // might have been done by previous op
106                     pendingInserts.remove(item);
107                 }
108 
109             }
110 
111         }
112 
113         private final void writeToSqlDatabase(final String uri, final Object data) throws SQLException {
114 
115             assert data != null : "Trying to write node <null> to database.\n" + "  Node: " + uri;
116 
117             if (data == DELETE_NODE) {
118                 performDelete(uri);
119                 return;
120             }
121 
122             if (conf.sql().supportsMerge()) {
123                 performMerge(uri, data);
124                 return;
125             }
126 
127             if (conf.sql().supportsInsertOrUpdate()) {
128                 performInsertOrUpdate(uri, data);
129                 return;
130             }
131 
132             performInsert(uri, data);
133 
134         }
135 
136         private void performInsert(final String uri, final Object data) throws SQLException {
137             final ByteArrayOutputStream os = new ByteArrayOutputStream();
138             deps.getSerializer().serialize(data, SerializationJre.createStreamDestination(os));
139             final byte[] bytes = os.toByteArray();
140 
141             try {
142                 if (performGet(uri) == null) {
143                     PreparedStatement insertStatement = null;
144                     try {
145                         insertStatement = connection.prepareStatement(conf.sql().getInsertTemplate());
146 
147                         insertStatement.setQueryTimeout(10);
148 
149                         insertStatement.setString(1, uri);
150                         insertStatement.setBinaryStream(2, new ByteArrayInputStream(bytes));
151 
152                         if (ENABLE_DEBUG) {
153                             System.out.println("SqlConnection: Inserting [" + uri + "].");
154                         }
155                         insertStatement.executeUpdate();
156                         // connection.commit();
157                     } finally {
158                         if (insertStatement != null) {
159                             insertStatement.close();
160                         }
161                     }
162                     return;
163                 } else {
164 
165                     PreparedStatement updateStatement = null;
166                     try {
167                         updateStatement = connection.prepareStatement(conf.sql().getUpdateTemplate());
168                         updateStatement.setQueryTimeout(10);
169 
170                         updateStatement.setBinaryStream(1, new ByteArrayInputStream(bytes));
171                         updateStatement.setString(2, uri);
172                         if (ENABLE_DEBUG) {
173                             System.out.println("SqlConnection: Updating [" + uri + "].");
174                         }
175                         updateStatement.executeUpdate();
176                         // connection.commit();
177                     } finally {
178                         if (updateStatement != null) {
179                             updateStatement.close();
180                         }
181                     }
182 
183                 }
184             } catch (final IOException e) {
185                 throw new RuntimeException(e);
186             }
187         }
188 
189         private void performMerge(final String uri, final Object data) throws SQLException {
190             final ByteArrayOutputStream os = new ByteArrayOutputStream();
191             deps.getSerializer().serialize(data, SerializationJre.createStreamDestination(os));
192             final byte[] bytes = os.toByteArray();
193 
194             PreparedStatement mergeStatement = null;
195             try {
196                 mergeStatement = connection.prepareStatement(conf.sql().getMergeTemplate());
197 
198                 mergeStatement.setQueryTimeout(10);
199 
200                 mergeStatement.setString(1, uri);
201                 mergeStatement.setBinaryStream(2, new ByteArrayInputStream(bytes));
202 
203                 if (ENABLE_DEBUG) {
204                     System.out.println("SqlConnection: Merging [" + uri + "].");
205                 }
206                 mergeStatement.executeUpdate();
207                 // connection.commit();
208             } finally {
209                 if (mergeStatement != null) {
210                     mergeStatement.close();
211                 }
212             }
213 
214         }
215 
216         private void performInsertOrUpdate(final String uri, final Object data) throws SQLException {
217             final ByteArrayOutputStream os = new ByteArrayOutputStream();
218             deps.getSerializer().serialize(data, SerializationJre.createStreamDestination(os));
219             final byte[] bytes = os.toByteArray();
220 
221             PreparedStatement insertStatement = null;
222             try {
223                 insertStatement = connection.prepareStatement(conf.sql().getInsertOrUpdateTemplate());
224                 insertStatement.setQueryTimeout(10);
225                 insertStatement.setString(1, uri);
226 
227                 // TODO this seems somehow non-optimal, probably the
228                 // byte data is sent to the database twice ...
229                 insertStatement.setBinaryStream(2, new ByteArrayInputStream(bytes));
230                 insertStatement.setBinaryStream(3, new ByteArrayInputStream(bytes));
231                 insertStatement.executeUpdate();
232                 if (ENABLE_DEBUG) {
233                     System.out.println("SqlConnection: Inserting [" + uri + "].");
234                 }
235 
236                 // connection.commit();
237             } finally {
238                 if (insertStatement != null) {
239                     insertStatement.close();
240                 }
241             }
242         }
243 
244         private void performDelete(final String uri) throws SQLException {
245             PreparedStatement deleteStatement = null;
246 
247             try {
248                 deleteStatement = connection.prepareStatement(conf.sql().getDeleteTemplate());
249                 deleteStatement.setQueryTimeout(10);
250 
251                 deleteStatement.setString(1, uri);
252                 deleteStatement.executeUpdate();
253                 if (ENABLE_DEBUG) {
254                     System.out.println("SqlConnection: Deleting [" + uri + "].");
255                 }
256 
257                 // connection.commit();
258             } finally {
259                 if (deleteStatement != null) {
260                     deleteStatement.close();
261                 }
262             }
263         }
264 
        /**
         * Creates the worker.
         *
         * @param executor
         *            executor handed to the {@code SingleInstanceQueueWorker} superclass
         * @param queue
         *            queue of URIs handed to the superclass
         */
        public WriteWorker(final SimpleExecutor executor, final Queue<String> queue) {
            super(executor, queue, OneUtilsJre.newJreConcurrency());
        }
268 
269     }
270 
    /**
     * Queues a URI with the write worker and asks the worker to start if required.
     *
     * @param uri
     *            key whose pending value should be written
     */
    private final void scheduleWrite(final String uri) {

        writeWorker.offer(uri);
        writeWorker.startIfRequired();

    }
277 
    /**
     * Records the value as pending for the given URI and schedules it to be written. The method returns
     * without waiting for the write worker; until the write has been processed, getNode() serves the value
     * from the pending map.
     *
     * @param uri
     *            key of the entry
     * @param node
     *            value to store
     */
    @Override
    public void putSync(final String uri, final V node) {

        synchronized (pendingInserts) {

            // System.out.println("Currently pending:  " +
            // pendingInserts.size());
            pendingInserts.put(uri, node);

        }
        // Scheduling happens outside the monitor.
        scheduleWrite(uri);
    }
290 
    /**
     * Holder pairing the {@link ResultSet} of a lookup with the {@link PreparedStatement} that produced it,
     * so the code reading the result can close the statement afterwards.
     */
    public static class SqlGetResources {
        ResultSet resultSet;
        PreparedStatement getStatement;

    }
296 
297     @SuppressWarnings("unchecked")
298     @Override
299     public V getSync(final String key) {
300 
301         // System.out.println("get " + key);
302         final V value = (V) getNode(key);
303         // System.out.println("got " + value);
304         // if
305         // (key.equals("mxrogm/mxrogm/xplr/.n/Manage/.n/Projects/.n/Appjangle_Apps/.n/Maybes/.n/Application_Deployment/Reading_It"))
306         // {
307         // throw new RuntimeException();
308         // }
309         return value;
310     }
311 
312     public Object getNode(final String uri) {
313 
314         synchronized (pendingInserts) {
315 
316             if (ENABLE_DEBUG) {
317                 System.out.println("SqlConnection: Retrieving [" + uri + "].");
318             }
319 
320             if (pendingInserts.containsKey(uri)) {
321 
322                 final Object node = pendingInserts.get(uri);
323 
324                 if (ENABLE_DEBUG) {
325                     System.out.println("SqlConnection: Was cached [" + uri + "] Value [" + node + "].");
326                 }
327 
328                 if (node == DELETE_NODE) {
329                     return null;
330                 }
331 
332                 return node;
333             }
334 
335             assert !pendingInserts.containsKey(uri);
336 
337             pendingGets.add(uri);
338 
339             try {
340 
341                 final Object performGet = performGet(uri);
342 
343                 assert pendingGets.contains(uri);
344                 pendingGets.remove(uri);
345 
346                 return performGet;
347 
348             } catch (final Exception e) {
349 
350                 pendingGets.remove(uri);
351                 throw new IllegalStateException("SQL connection cannot load node: " + uri, e);
352             }
353 
354         }
355     }
356 
357     private Object performGet(final String uri) throws SQLException, IOException {
358         assertConnection();
359 
360         SqlGetResources getResult = null;
361 
362         try {
363 
364             try {
365                 getResult = readFromSqlDatabase(uri);
366             } catch (final Throwable t) {
367                 // try reconnecting once if any error occurs
368                 initConnection();
369                 try {
370                     getResult = readFromSqlDatabase(uri);
371                 } catch (final SQLException e) {
372                     throw new RuntimeException(e);
373                 }
374             }
375 
376             if (!getResult.resultSet.next()) {
377 
378                 if (ENABLE_DEBUG) {
379                     System.out.println("SqlConnection: Not found [" + uri + "].");
380                 }
381 
382                 return null;
383             }
384 
385             final InputStream is = getResult.resultSet.getBinaryStream(2);
386 
387             final byte[] data = OneUtilsJre.toByteArray(is);
388             is.close();
389             getResult.resultSet.close();
390             assert data != null;
391 
392             final Object node = deps.getSerializer().deserialize(
393                     SerializationJre.createStreamSource(new ByteArrayInputStream(data)));
394             if (ENABLE_DEBUG) {
395                 System.out.println("SqlConnection: Retrieved [" + node + "].");
396             }
397             return node;
398 
399         } finally {
400             if (getResult != null) {
401                 getResult.getStatement.close();
402             }
403         }
404     }
405 
406     private final SqlGetResources readFromSqlDatabase(final String uri) throws SQLException {
407 
408         PreparedStatement getStatement = null;
409 
410         getStatement = connection.prepareStatement(conf.sql().getGetTemplate());
411 
412         getStatement.setQueryTimeout(10);
413 
414         getStatement.setString(1, uri);
415 
416         final ResultSet resultSet = getStatement.executeQuery();
417 
418         connection.commit();
419 
420         final SqlGetResources res = new SqlGetResources();
421         res.resultSet = resultSet;
422         res.getStatement = getStatement;
423 
424         return res;
425 
426     }
427 
    /**
     * Removes the entry for the key by delegating to {@link #deleteNode(String)}.
     *
     * @param key
     *            key of the entry to remove
     */
    @Override
    public void removeSync(final String key) {
        deleteNode(key);
    }
432 
    /**
     * Marks a URI for deletion by storing the DELETE_NODE marker as its pending value and schedules the
     * write. Until the delete has been processed, getNode() returns {@code null} for this URI.
     *
     * @param uri
     *            key of the entry to delete
     */
    public void deleteNode(final String uri) {

        synchronized (pendingInserts) {
            // new Exception("Schedule delete " + uri).printStackTrace();
            pendingInserts.put(uri, DELETE_NODE);
        }
        scheduleWrite(uri);

    }
442 
443     public synchronized void waitForAllPendingRequests(final SimpleCallback callback) {
444 
445         new Thread() {
446 
447             @Override
448             public void run() {
449                 if (ENABLE_DEBUG) {
450                     System.out.println("SqlConnection: Waiting for pending requests.\n" + "  Write worker running: ["
451                             + writeWorker.isRunning() + "]\n" + "  Pending inserts: [" + pendingInserts.size() + "]\n"
452                             + "  Pending gets: [" + pendingGets.size() + "]");
453                 }
454                 while (writeWorker.isRunning() || pendingInserts.size() > 0 || pendingGets.size() > 0) {
455                     try {
456                         Thread.sleep(10);
457                     } catch (final Exception e) {
458                         callback.onFailure(e);
459                     }
460                     Thread.yield();
461                 }
462 
463                 if (ENABLE_DEBUG) {
464                     System.out.println("SqlConnection: Waiting for pending requests completed.");
465                 }
466 
467                 callback.onSuccess();
468             }
469 
470         }.start();
471 
472     }
473 
    /**
     * Stores the value via {@link #putSync} and then calls {@link #commit}, which notifies the callback once
     * no requests are pending any more.
     *
     * @param key
     *            key of the entry
     * @param value
     *            value to store
     * @param callback
     *            passed on to {@code commit}
     */
    @Override
    public void put(final String key, final V value, final SimpleCallback callback) {
        putSync(key, value);

        this.commit(callback);
    }
480 
481     @Override
482     public void get(final String key, final ValueCallback<V> callback) {
483 
484         final V value = getSync(key);
485         callback.onSuccess(value);
486 
487     }
488 
    /**
     * Removes the entry via {@link #removeSync} and then calls {@link #commit}, which notifies the callback
     * once no requests are pending any more.
     *
     * @param key
     *            key of the entry to remove
     * @param callback
     *            passed on to {@code commit}
     */
    @Override
    public void remove(final String key, final SimpleCallback callback) {
        removeSync(key);

        this.commit(callback);

    }
496 
    /**
     * Intentionally a no-op: this implementation does nothing for any {@link MapOperation}.
     *
     * @param operation
     *            ignored
     */
    @Override
    public void performOperation(final MapOperation operation) {
        // nothing
    }
501 
502     @Override
503     public void stop(final SimpleCallback callback) {
504         this.commit(new SimpleCallback() {
505 
506             @Override
507             public void onFailure(final Throwable t) {
508                 callback.onFailure(t);
509             }
510 
511             @Override
512             public void onSuccess() {
513                 writeWorker.getThread().getExecutor().shutdown(new WhenExecutorShutDown() {
514 
515                     @Override
516                     public void thenDo() {
517                         try {
518 
519                             if (connection != null && !connection.isClosed()) {
520                                 try {
521                                     connection.commit();
522                                     connection.close();
523                                 } catch (final Throwable t) {
524                                     callback.onFailure(new Exception("Sql exception could not be closed.", t));
525                                     return;
526                                 }
527                             }
528 
529                         } catch (final Throwable t) {
530                             callback.onFailure(t);
531                             return;
532                         }
533 
534                         callback.onSuccess();
535                     }
536 
537                     @Override
538                     public void onFailure(final Throwable t) {
539                         callback.onFailure(t);
540                     }
541                 });
542 
543             }
544         });
545     }
546 
547     @Override
548     public void commit(final SimpleCallback callback) {
549         commitThread.submit(new Runnable() {
550 
551             @Override
552             public void run() {
553                 // writeWorker.startIfRequired();
554                 waitForAllPendingRequests(new SimpleCallback() {
555 
556                     @Override
557                     public void onSuccess() {
558                         callback.onSuccess();
559                     }
560 
561                     @Override
562                     public void onFailure(final Throwable message) {
563                         callback.onFailure(message);
564                     }
565                 });
566             }
567 
568         });
569     }
570 
571     protected void assertConnection() {
572         try {
573             if (connection == null || connection.isClosed()) {
574                 initConnection();
575             }
576         } catch (final SQLException e) {
577             throw new RuntimeException(e);
578         }
579     }
580 
    /**
     * Replaces the current connection with a new one obtained from {@code SqlConnectionFactory} for
     * {@code conf.sql()}. The previous connection, if any, is not closed here.
     */
    protected void initConnection() {
        connection = SqlConnectionFactory.createConnection(conf.sql());
    }
584 
    /**
     * Starts this map by making sure a database connection exists.
     *
     * @param callback
     *            receives success once the connection is available, or whatever was thrown while
     *            establishing it
     */
    @Override
    public void start(final SimpleCallback callback) {
        try {
            assertConnection();

        } catch (final Throwable t) {
            callback.onFailure(t);
            return;
        }
        // Called outside the try block, so an exception from onSuccess() is not routed to onFailure().
        callback.onSuccess();
    }
596 
597     public SqlAsyncMapImplementation(final SqlAsyncMapConfiguration conf, final SqlAsyncMapDependencies deps) {
598         super();
599 
600         this.conf = conf;
601         this.deps = deps;
602 
603         this.pendingInserts = Collections.synchronizedMap(new HashMap<String, Object>(100));
604         this.pendingGets = Collections.synchronizedSet(new HashSet<String>());
605 
606         this.writeWorker = new WriteWorker(OneUtilsJre.newJreConcurrency().newExecutor().newSingleThreadExecutor(this),
607                 new ConcurrentLinkedQueue<String>());
608 
609         this.commitThread = Executors.newFixedThreadPool(1);
610 
611     }
612 
613 }