View Javadoc

1   package de.mxro.async.map.sql.internal;
2   
3   import java.io.ByteArrayInputStream;
4   import java.io.ByteArrayOutputStream;
5   import java.io.IOException;
6   import java.io.InputStream;
7   import java.sql.PreparedStatement;
8   import java.sql.ResultSet;
9   import java.sql.SQLException;
10  import java.util.Collections;
11  import java.util.HashMap;
12  import java.util.HashSet;
13  import java.util.List;
14  import java.util.Map;
15  import java.util.Queue;
16  import java.util.Set;
17  import java.util.concurrent.ConcurrentLinkedQueue;
18  import java.util.concurrent.ExecutorService;
19  import java.util.concurrent.Executors;
20  
21  import mx.gwtutils.concurrent.SingleInstanceQueueWorker;
22  import one.utils.jre.OneUtilsJre;
23  import de.mxro.async.callbacks.SimpleCallback;
24  import de.mxro.async.callbacks.ValueCallback;
25  import de.mxro.async.map.AsyncMap;
26  import de.mxro.async.map.operations.MapOperation;
27  import de.mxro.async.map.sql.SqlAsyncMapConfiguration;
28  import de.mxro.async.map.sql.SqlAsyncMapDependencies;
29  import de.mxro.concurrency.Executor;
30  import de.mxro.concurrency.Executor.WhenExecutorShutDown;
31  import de.mxro.fn.Fn;
32  import de.mxro.serialization.jre.SerializationJre;
33  
34  public class SqlAsyncMapImplementation<V> implements AsyncMap<String, V> {
35  
36  	private final static boolean ENABLE_DEBUG = false;
37  
38  	private final SqlAsyncMapConfiguration conf;
39  	
40  	private final SqlAsyncMapDependencies deps;
41  	
42  	// internal helper
43  	private java.sql.Connection connection;
44  	private final Map<String, Object> pendingInserts;
45  	private final Set<String> pendingGets;
46  	private final ExecutorService commitThread;
47  	private final WriteWorker writeWorker;
48  
49  	private final static Object DELETE_NODE = Fn.object();
50  	
51  
52  	private class WriteWorker extends SingleInstanceQueueWorker<String> {
53  
54  		@Override
55  		protected void processItems(final List<String> items) {
56  
57  			synchronized (pendingInserts) {
58  
59  				if (ENABLE_DEBUG) {
60  					System.out.println("SqlConnection: Inserting ["
61  							+ items.size() + "] elements.");
62  				}
63  
64  				for (final String item : items) {
65  					final String uri = item;
66  
67  					final Object data;
68  
69  					if (!pendingInserts.containsKey(uri)) {
70  						if (ENABLE_DEBUG) {
71  							System.out
72  									.println("SqlConnection: Insert has been performed by previous operation ["
73  											+ uri + "].");
74  						}
75  						continue;
76  					}
77  
78  					data = pendingInserts.get(uri);
79  
80  					assertConnection();
81  
82  					try {
83  						writeToSqlDatabase(uri, data);
84  					} catch (final Throwable t) {
85  						// try reconnecting once if any errors occur
86  						// in order to deal with mysql automatic disconnect
87  						initConnection();
88  						try {
89  							writeToSqlDatabase(uri, data);
90  						} catch (final SQLException e) {
91  							throw new RuntimeException(e);
92  						}
93  					}
94  				}
95  
96  				try {
97  					connection.commit();
98  				} catch (final SQLException e) {
99  					throw new RuntimeException(e);
100 				}
101 
102 				if (ENABLE_DEBUG) {
103 					System.out.println("SqlConnection: Inserting ["
104 							+ items.size() + "] elements completed.");
105 				}
106 
107 				for (final String item : items) {
108 					// assert pendingInserts.containsKey(item);
109 					// might have been done by previous op
110 					pendingInserts.remove(item);
111 				}
112 
113 			}
114 
115 		}
116 
117 		private final void writeToSqlDatabase(final String uri,
118 				final Object data) throws SQLException {
119 
120 			assert data != null : "Trying to write node <null> to database.\n"
121 					+ "  Node: " + uri;
122 
123 			if (data == DELETE_NODE) {
124 				performDelete(uri);
125 				return;
126 			}
127 
128 			if (conf.sql().supportsMerge()) {
129 				performMerge(uri, data);
130 				return;
131 			}
132 
133 			if (conf.sql().supportsInsertOrUpdate()) {
134 				performInsertOrUpdate(uri, data);
135 				return;
136 			}
137 
138 			performInsert(uri, data);
139 
140 		}
141 
142 		private void performInsert(final String uri, final Object data)
143 				throws SQLException {
144 			final ByteArrayOutputStream os = new ByteArrayOutputStream();
145 			deps.getSerializer().serialize(data, SerializationJre.createStreamDestination(os));
146 			final byte[] bytes = os.toByteArray();
147 
148 			try {
149 				if (performGet(uri) == null) {
150 					PreparedStatement insertStatement = null;
151 					try {
152 						insertStatement = connection.prepareStatement(conf
153 								.sql().getInsertTemplate());
154 
155 						insertStatement.setQueryTimeout(10);
156 
157 						insertStatement.setString(1, uri);
158 						insertStatement.setBinaryStream(2,
159 								new ByteArrayInputStream(bytes));
160 
161 						if (ENABLE_DEBUG) {
162 							System.out.println("SqlConnection: Inserting ["
163 									+ uri + "].");
164 						}
165 						insertStatement.executeUpdate();
166 						// connection.commit();
167 					} finally {
168 						if (insertStatement != null) {
169 							insertStatement.close();
170 						}
171 					}
172 					return;
173 				} else {
174 
175 					PreparedStatement updateStatement = null;
176 					try {
177 						updateStatement = connection.prepareStatement(conf
178 								.sql().getUpdateTemplate());
179 						updateStatement.setQueryTimeout(10);
180 
181 						updateStatement.setBinaryStream(1,
182 								new ByteArrayInputStream(bytes));
183 						updateStatement.setString(2, uri);
184 						if (ENABLE_DEBUG) {
185 							System.out.println("SqlConnection: Updating ["
186 									+ uri + "].");
187 						}
188 						updateStatement.executeUpdate();
189 						// connection.commit();
190 					} finally {
191 						if (updateStatement != null) {
192 							updateStatement.close();
193 						}
194 					}
195 
196 				}
197 			} catch (final IOException e) {
198 				throw new RuntimeException(e);
199 			}
200 		}
201 
202 		private void performMerge(final String uri, final Object data)
203 				throws SQLException {
204 			final ByteArrayOutputStream os = new ByteArrayOutputStream();
205 			deps.getSerializer().serialize(data, SerializationJre.createStreamDestination(os));
206 			final byte[] bytes = os.toByteArray();
207 
208 			PreparedStatement mergeStatement = null;
209 			try {
210 				mergeStatement = connection.prepareStatement(conf.sql()
211 						.getMergeTemplate());
212 
213 				mergeStatement.setQueryTimeout(10);
214 
215 				mergeStatement.setString(1, uri);
216 				mergeStatement.setBinaryStream(2, new ByteArrayInputStream(
217 						bytes));
218 
219 				if (ENABLE_DEBUG) {
220 					System.out.println("SqlConnection: Merging [" + uri + "].");
221 				}
222 				mergeStatement.executeUpdate();
223 				// connection.commit();
224 			} finally {
225 				if (mergeStatement != null) {
226 					mergeStatement.close();
227 				}
228 			}
229 
230 		}
231 
232 		private void performInsertOrUpdate(final String uri,
233 				final Object data) throws SQLException {
234 			final ByteArrayOutputStream os = new ByteArrayOutputStream();
235 			deps.getSerializer().serialize(data, SerializationJre.createStreamDestination(os));
236 			final byte[] bytes = os.toByteArray();
237 
238 			PreparedStatement insertStatement = null;
239 			try {
240 				insertStatement = connection.prepareStatement(conf.sql()
241 						.getInsertOrUpdateTemplate());
242 				insertStatement.setQueryTimeout(10);
243 				insertStatement
244 						.setString(1, uri);
245 
246 				// TODO this seems somehow non-optimal, probably the
247 				// byte data is sent to the database twice ...
248 				insertStatement.setBinaryStream(2, new ByteArrayInputStream(
249 						bytes));
250 				insertStatement.setBinaryStream(3, new ByteArrayInputStream(
251 						bytes));
252 				insertStatement.executeUpdate();
253 				if (ENABLE_DEBUG) {
254 					System.out.println("SqlConnection: Inserting [" + uri
255 							+ "].");
256 				}
257 
258 				// connection.commit();
259 			} finally {
260 				if (insertStatement != null) {
261 					insertStatement.close();
262 				}
263 			}
264 		}
265 
266 		private void performDelete(final String uri) throws SQLException {
267 			PreparedStatement deleteStatement = null;
268 
269 			try {
270 				deleteStatement = connection.prepareStatement(conf.sql()
271 						.getDeleteTemplate());
272 				deleteStatement.setQueryTimeout(10);
273 
274 				deleteStatement
275 						.setString(1, uri);
276 				deleteStatement.executeUpdate();
277 				if (ENABLE_DEBUG) {
278 					System.out
279 							.println("SqlConnection: Deleting [" + uri + "].");
280 				}
281 
282 				// connection.commit();
283 			} finally {
284 				if (deleteStatement != null) {
285 					deleteStatement.close();
286 				}
287 			}
288 		}
289 
290 		public WriteWorker(final Executor executor, final Queue<String> queue) {
291 			super(executor, queue, OneUtilsJre.newJreConcurrency());
292 		}
293 
294 	}
295 
296 	private final void scheduleWrite(final String uri) {
297 
298 		writeWorker.offer(uri);
299 		writeWorker.startIfRequired();
300 
301 	}
302 
303 	@Override
304 	public void putSync(final String uri, final V node) {
305 
306 		synchronized (pendingInserts) {
307 
308 			// System.out.println("Currently pending:  " +
309 			// pendingInserts.size());
310 			pendingInserts.put(uri, node);
311 
312 		}
313 		scheduleWrite(uri);
314 	}
315 
316 	public static class SqlGetResources {
317 		ResultSet resultSet;
318 		PreparedStatement getStatement;
319 
320 	}
321 
322 	@SuppressWarnings("unchecked")
323 	@Override
324 	public V getSync(String key) {
325 		
326 		return (V) getNode(key);
327 	}
328 	
329 	public Object getNode(final String uri) {
330 
331 		synchronized (pendingInserts) {
332 
333 			if (ENABLE_DEBUG) {
334 				System.out.println("SqlConnection: Retrieving [" + uri + "].");
335 			}
336 
337 			if (pendingInserts.containsKey(uri)) {
338 
339 				final Object node = pendingInserts.get(uri);
340 
341 				if (ENABLE_DEBUG) {
342 					System.out.println("SqlConnection: Was cached [" + uri
343 							+ "] Value [" + node + "].");
344 				}
345 
346 				if (node == DELETE_NODE) {
347 					return null;
348 				}
349 
350 				return node;
351 			}
352 
353 			assert !pendingInserts.containsKey(uri);
354 
355 			pendingGets.add(uri);
356 
357 			try {
358 
359 				final Object performGet = performGet(uri);
360 
361 				assert pendingGets.contains(uri);
362 				pendingGets.remove(uri);
363 
364 				return performGet;
365 
366 			} catch (final Exception e) {
367 
368 				pendingGets.remove(uri);
369 				throw new IllegalStateException(
370 						"SQL connection cannot load node: " + uri, e);
371 			}
372 
373 		}
374 	}
375 
376 	private Object performGet(final String uri) throws SQLException,
377 			IOException {
378 		assertConnection();
379 
380 		SqlGetResources getResult = null;
381 
382 		try {
383 
384 			try {
385 				getResult = readFromSqlDatabase(uri);
386 			} catch (final Throwable t) {
387 				// try reconnecting once if any error occurs
388 				initConnection();
389 				try {
390 					getResult = readFromSqlDatabase(uri);
391 				} catch (final SQLException e) {
392 					throw new RuntimeException(e);
393 				}
394 			}
395 
396 			if (!getResult.resultSet.next()) {
397 
398 				if (ENABLE_DEBUG) {
399 					System.out.println("SqlConnection: Not found [" + uri
400 							+ "].");
401 				}
402 
403 				return null;
404 			}
405 
406 			final InputStream is = getResult.resultSet.getBinaryStream(2);
407 
408 			final byte[] data = OneUtilsJre.toByteArray(is);
409 			is.close();
410 			getResult.resultSet.close();
411 			assert data != null;
412 
413 			final Object node = deps.getSerializer().deserialize(SerializationJre.createStreamSource(new ByteArrayInputStream(data)));
414 			if (ENABLE_DEBUG) {
415 				System.out.println("SqlConnection: Retrieved [" + node + "].");
416 			}
417 			return node;
418 
419 		} finally {
420 			if (getResult != null) {
421 				getResult.getStatement.close();
422 			}
423 		}
424 	}
425 
426 	private final SqlGetResources readFromSqlDatabase(final String uri)
427 			throws SQLException {
428 		
429 		PreparedStatement getStatement = null;
430 
431 		getStatement = connection.prepareStatement(conf.sql().getGetTemplate());
432 
433 		getStatement.setQueryTimeout(10);
434 
435 		getStatement.setString(1, uri);
436 
437 		final ResultSet resultSet = getStatement.executeQuery();
438 
439 		connection.commit();
440 
441 		final SqlGetResources res = new SqlGetResources();
442 		res.resultSet = resultSet;
443 		res.getStatement = getStatement;
444 
445 		return res;
446 
447 	}
448 
449 	
450 	
451 	@Override
452 	public void removeSync(String key) {
453 		deleteNode(key);
454 	}
455 
456 	public void deleteNode(final String uri) {
457 
458 		synchronized (pendingInserts) {
459 			// new Exception("Schedule delete " + uri).printStackTrace();
460 			pendingInserts.put(uri, DELETE_NODE);
461 		}
462 		scheduleWrite(uri);
463 
464 	}
465 
466 	public synchronized void waitForAllPendingRequests(
467 			final SimpleCallback callback) {
468 
469 		new Thread() {
470 
471 			@Override
472 			public void run() {
473 				if (ENABLE_DEBUG) {
474 					System.out
475 							.println("SqlConnection: Waiting for pending requests.\n"
476 									+ "  Write worker running: ["
477 									+ writeWorker.isRunning()
478 									+ "]\n"
479 									+ "  Pending inserts: ["
480 									+ pendingInserts.size()
481 									+ "]\n"
482 									+ "  Pending gets: ["
483 									+ pendingGets.size()
484 									+ "]");
485 				}
486 				while (writeWorker.isRunning() || pendingInserts.size() > 0
487 						|| pendingGets.size() > 0) {
488 					try {
489 						Thread.sleep(10);
490 					} catch (final Exception e) {
491 						callback.onFailure(e);
492 					}
493 					Thread.yield();
494 				}
495 
496 				if (ENABLE_DEBUG) {
497 					System.out
498 							.println("SqlConnection: Waiting for pending requests completed.");
499 				}
500 
501 				callback.onSuccess();
502 			}
503 
504 		}.start();
505 
506 	}
507 
508 	
509 	
510 	
511 	
512 	@Override
513 	public void put(String key, V value, SimpleCallback callback) {
514 		putSync(key, value);
515 		
516 		this.commit(callback);
517 	}
518 
519 	@Override
520 	public void get(final String key, final ValueCallback<V> callback) {
521 		final V value = getSync(key);
522 		callback.onSuccess(value);
523 		
524 	}
525 
526 	@Override
527 	public void remove(String key, SimpleCallback callback) {
528 		removeSync(key);
529 		
530 		this.commit(callback);
531 		
532 	}
533 
534 	@Override
535 	public void performOperation(MapOperation operation) {
536 		// nothing
537 	}
538 
539 	@Override
540 	public void stop(final SimpleCallback callback) {
541 		this.commit(new SimpleCallback() {
542 			
543 			@Override
544 			public void onFailure(Throwable t) {
545 				callback.onFailure(t);
546 			}
547 			
548 			@Override
549 			public void onSuccess() {
550 				writeWorker.getThread().getExecutor()
551 				.shutdown(new WhenExecutorShutDown() {
552 
553 					@Override
554 					public void thenDo() {
555 						try {
556 
557 							if (connection != null
558 									&& !connection.isClosed()) {
559 								try {
560 									connection.commit();
561 									connection.close();
562 								} catch (final Throwable t) {
563 									callback
564 											.onFailure(new Exception(
565 													"Sql exception could not be closed.",
566 													t));
567 									return;
568 								}
569 							}
570 
571 						} catch (final Throwable t) {
572 							callback.onFailure(t);
573 							return;
574 						}
575 
576 						callback.onSuccess();
577 					}
578 
579 					@Override
580 					public void onFailure(final Throwable t) {
581 						callback.onFailure(t);
582 					}
583 				});
584 				
585 			}
586 		});
587 	}
588 
589 	
590 	
591 
592 	@Override
593 	public void commit(final SimpleCallback callback) {
594 		commitThread.submit(new Runnable() {
595 
596 			@Override
597 			public void run() {
598 				// writeWorker.startIfRequired();
599 				waitForAllPendingRequests(new SimpleCallback() {
600 
601 					@Override
602 					public void onSuccess() {
603 						callback.onSuccess();
604 					}
605 
606 					@Override
607 					public void onFailure(final Throwable message) {
608 						callback.onFailure(message);
609 					}
610 				});
611 			}
612 
613 		});
614 	}
615 
616 	
617 
618 	protected void assertConnection() {
619 		try {
620 			if (connection == null || connection.isClosed()) {
621 				initConnection();
622 			}
623 		} catch (final SQLException e) {
624 			throw new RuntimeException(e);
625 		}
626 	}
627 
628 	protected void initConnection() {
629 		connection = SqlConnectionFactory.createConnection(conf.sql());
630 	}
631 
632 	
633 
634 	@Override
635 	public void start(SimpleCallback callback) {
636 		try  {
637 			assertConnection();
638 			
639 		} catch (Throwable t) {
640 			callback.onFailure(t);
641 			return;
642 		}
643 		callback.onSuccess();
644 	}
645 
646 	public SqlAsyncMapImplementation(final SqlAsyncMapConfiguration conf, SqlAsyncMapDependencies deps) {
647 		super();
648 
649 		this.conf = conf;
650 		this.deps = deps;
651 
652 		this.pendingInserts = Collections
653 				.synchronizedMap(new HashMap<String, Object>(100));
654 		this.pendingGets = Collections.synchronizedSet(new HashSet<String>());
655 
656 		this.writeWorker = new WriteWorker(OneUtilsJre.newJreConcurrency()
657 				.newExecutor().newSingleThreadExecutor(this),
658 				new ConcurrentLinkedQueue<String>());
659 
660 		this.commitThread = Executors.newFixedThreadPool(1);
661 
662 	}
663 
664 }