ITPub博客

首页 > 应用开发 > IT综合 > 从一个ConnectionPool的实现看design pattern的运用 (七) (转)

从一个ConnectionPool的实现看design pattern的运用 (七) (转)

原创 IT综合 作者:worldblog 时间:2007-12-12 14:27:15 0 删除 编辑
从一个ConnectionPool的实现看design pattern的运用 (七) (转)[@more@]

从一个ConnectionPool的实现看design pattern的运用 (七)


这里是bonmot对这个Connection pool的一个意见:

pooled Connection可能由于一个client忘记关闭,而导致整个pool阻塞。所以,应该对pooled Connection进行监控,对于超时的或其他invaild状态的pooled connection强制回收。

下面,让我们来分析这个需求。

首先,这样一个监视进程的逻辑可能是什么样的呢?
如果我们对超时的定义是:该连接从被分配出去到现在已经太久了。那么,我们需要在该连接对象上记录它被分配出去的时间。然后,在一个后台运行的daemon线程中定期检查这些正在使用的Connection.
而如果我们的超时又包括了对该connection使用的频繁程度,比如说:该连接已经有两个小时没人动过了,(这里,“动过”又需要定义。是只要某个成员函数调用了就算被“动过”了吗?还是所有从该连接生成的对象,如Statement, ResultSet等等都算?)那我们就要重载我们该兴趣的方法,纪录该方法被调用的时间。

其次,一般来说,监视已分配连接和管理空闲连接之间到底有多大耦合呢?能否对它们解耦呢?经过分析,我感觉,答案是:不能。监视已分配连接的算法理论上有可能需要知道空闲连接的一些信息,而反之也是一样。而且,更讨厌的是,它们之间所需要的信息量无法估计,也就是说,对一些特定的算法,它们可能是完全的紧耦合。如果按这样分析,这种ConnectionPool可能还得要求实现者直接实现ConnectionPool, 就象我们第三章里使用的方法,只能偶尔使用一些utility类,象PooledConnection之类。
不过,虽然我们不能完全把监视算法和分配算法分开。但事实上很多监视算法,分配算法确实是互不相关的。比如象我们刚才分析的需求。所以我们也许可以写一个框架,简化对这些互不相关的算法的实现。虽然对完全紧耦合的情况我们无能为力,但对多数普通的情况,我们还是可以有些作为的。而且,这样一个框架并不影响对复杂的紧耦合情况的特殊实现。


好吧,现在就让我们着手构建这个框架。我们的目标是:定义一个Monitor的接口,负责监视所有分配出去的连接。然后,把一个Monitor对象,一个ConnectionPooling对象组合成一个ConnectionPool.

算法决定数据结构,首先是需要纪录的时间信息:
public interface Momento{
  Java.util.Date getTimestamp();
}
其次,我们的监视类需要知道怎样强行回收一个Connection:
public interface ResourceProxy{
  Momento getBirthMomento();
  void release();
  boolean isReleased();
}
注意,这里,我们的ResourceProxy并不与Connection直接相关。这样,任何的资源,只要实现了这个接口,都可以被我们的监视类所监视。

然后是监视类的接口:
public interface ResourceProxyMonitor{
  public void addResourceProxy(ResourceProxy proxy);
}
这个接口在connection被返回出ConnectionPool之前被调用,把分配的Connection注册给监视类。

下面是监视类的实现:
public class SimpleResourceProxyMonitor implements ResourceProxyMonitor{
 public synchronized void addResourceProxy(ResourceProxy proxy){
 list.add(proxy);
 }
 private final java.util.List list = new java.util.LinkedList();
 private final HeartBeatEngine hb= HeartBeatEngine.instance();
 private final void releaseProxy(ResourceProxy proxy){proxy.release();}
 public final Runnable getMonitor(final long interval, final long ttl){
 return hb.getThreadProc(interval, new HeartBeat(){
 public boolean beat(){
 final java.util.Date now = new java.util.Date();
 synchronized(SimpleResourceProxyMonitor.this){
 for(java.util.Iterator it
 =list.iterator();it.hasNext();){
 final ResourceProxy proxy =
 (ResourceProxy)it.next();
 final java.util.Date then =
 proxy.getMomento().getTimestamp();
 if(now.getTime()-then.getTime()>=ttl){
 releaseProxy(proxy);
 }
 if(proxy.isReleased()){
 it.remove();
 }
 }
 }
 return true;
 } 
 });
 } 
 public final synchronized void clear(){
 turnoffMonitors();
 for(java.util.Iterator it=list.iterator();it.hasNext();){
  final ResourceProxy proxy = (ResourceProxy)it.next();
 releaseProxy(proxy); 
 } 
 list.clear();
 }
 public final synchronized void empty(){
 turnoffMonitors();
 list.clear();
 }
 public final void turnonMonitors(){
 hb.turnon();
 }
 public final void turnoffMonitors(){
 hb.turnoff();
 }
 private SimpleResourceProxyMonitor(){}
 public static SimpleResourceProxyMonitor instance(){
 return new SimpleResourceProxyMonitor();
 }
}

以及两个辅助接口和类:HeartBeat和HeartBeatEngine, 负责daemon线程的睡与醒。
public interface HeartBeat{
 public boolean beat();
 file://return true if continue, false otherwise;
}

public class HeartBeatEngine{
 public Runnable getThreadProc(final long interval, final HeartBeat r){
 return new Runnable(){
 public void run(){
 for(;isOn();){
 try{
 Thread.sleep(interval);
 }catch(InterruptedException e){}
 synchronized(HeartBeatEngine.this){
 if(!isOn())return; 
 }
 if(!r.beat())return;
 }
 }
 };
 }
 private boolean down = false; 
 private HeartBeatEngine(boolean d){
 this.down = d;
 }
 public final synchronized void turnon(){
 down = false;
 }
 public final synchronized void turnoff(){
 down = true;
 }
 private final boolean isOn(){
 return !down;
 }
 public static HeartBeatEngine instance(){
 return new HeartBeatEngine(false);
 } 
}

这里,getMonitor()仅仅返回一个Runnable, 而不是直接启动Thread。这样做更加灵活。使用这个monitor类的客户可以自由地使用这个Runnable, 比如说,使用一个线程池。

 

然后,我们需要一个proxy类来记录连接被分配的时间:

public class PooledConnectionProxy implements ResourceProxy{
 public final Momento getMomento(){return momento;}
 public void release(){
 try{
 conn.close();
 }catch(sqlException e){
 System.err.println(e.getMessage());
 }
 } 
 file://the conn.close() method has to be synchronized
 public boolean isReleased(){
 try{
 return conn.isClosed();
 }catch(SQLException e){return false;} 
 }
 private final Momento momento;
 private final Connection conn;
 private PooledConnectionProxy(Momento momento, Connection conn){
 this.momento = momento;
 this.conn = conn;
 }
 public static ResourceProxy instance(Momento momento, Connection conn){
 return new PooledConnectionProxy(momento, conn);
 }
}

好了,现在我们可以把它们组装在一起,做出一个ConnectionPool来。
还记得我们的ConnectionPooling2Pool吗?它负责封装ConnectionPooling并对每一个连接进行封装。当时我们把封装逻辑写入了ConnectionPooling2Pool, 因为封装逻辑只有一种。
但现在,我们有了另一种封装逻辑。所以, refactor.
了解我的人应该知道,我是不会用template method pattern, 继承ConnectionPooling2Pool然后重载wrapup函数的。用组合!

ConnectionPooler是一个代表封装Connection的接口:
public interface ConnectionPooler{
 public Connection pool(Connection conn, ConnectionHome home)
 throws SQLException;
}
ConnectionPooling2Pool将使用ConnectionPooler进行封装。
public class ConnectionPooling2Pool implements ConnectionPool{
  public final Connection getConnection()
  throws test.res.ResourceNotAvailableException, SQLException{
   return wrapup(pooling.getConnection());
  }
  public final Connection getConnection(long timeout)
  throws test.res.ResourceTimeOutException, SQLException{
   return wrapup(pooling.getConnection(timeout));
  }
  private final Connection wrapup(Connection conn)
  throws SQLException{
   return pl.pool(conn, pooling);
  } 
  public final void clear(){
   pooling.clear();
  }
  private final ConnectionPooling pooling;
  private final ConnectionPooler pl;
  private ConnectionPooling2Pool(ConnectionPooling pooling, ConnectionPooler pl){
   this.pooling = pooling;
   this.pl = pl;
  }
  public static ConnectionPool bridge(ConnectionPooling pooling, ConnectionPooler pl){
   return new ConnectionPooling2Pool(pooling, pl);
  }
}

原来的封装逻辑被实现为:
public class SimpleConnectionPooler implements ConnectionPooler{
 public Connection pool(Connection conn, ConnectionHome home)
 throws SQLException{
 return PooledConnection.decorate(conn, home);
 }
 private SimpleConnectionPooler(){}
 private static final ConnectionPooler singleton = new SimpleConnectionPooler();
 public static ConnectionPooler instance(){return singleton;}
}

我们新的封装逻辑为:
public class MonitoredConnectionPooler implements ConnectionPooler{
 public Connection pool(Connection conn, ConnectionHome home)
 throws SQLException{
   final Connection pooled = PooledConnection.decorate(conn, home);
   monitor.addResourceProxy(
   PooledConnectionProxy.instance(factory.newMomento(), pooled)
   );
   return pooled;
   }
 private final MomentoFactory factory;
 private final ResourceProxyMonitor monitor;
 private MonitoredConnectionPooler(ResourceProxyMonitor mon,
 MomentoFactory factory){
 this.monitor = mon;
 this.factory = factory;
 }
 public static ConnectionPooler instance(ResourceProxyMonitor mon,
 MomentoFactory factory){
 return new MonitoredConnectionPooler(mon, factory);
 }
}
最终的组合代码为:
public class TestConnectionPool{
 public static void test(String driver, String url, String user, String pwd)
 throws java.sql.SQLException, test.res.ResourceNotAvailableException, test.res.ResourceTimeOutException, ClassNotFoundException{
 final ConnectionPool pool = ConnectionPooling2Pool.bridge(
 ConnectionPoolingImpl.instance(
 ConnectionFactoryImpl.instance(
 driver, url, user, pwd),
 1000),
 SimpleConnectionPooler.instance()
 );
 final SimpleResourceProxyMonitor mon =
 SimpleResourceProxyMonitor .instance();

 final ConnectionPool pool2 = ConnectionPooling2Pool.bridge(
 ConnectionPoolingImpl.instance(
 ConnectionFactoryImpl.instance(
 driver, url, user, pwd),
 1000),
 MonitoredConnectionPooler.instance(
 mon, SimpleMomentoFactory.instance())
 );
 final Runnable monproc = mon.getMonitor(1000L, 1000000L);
 new Thread(monproc).start();
 }
}

 

对connection的使用频繁程度的监视,因为算法所要求的数据结构会有所不同,所以会需要自己的一套ResourceProxy, ResourceProxyMonitor接口以及对Connection甚至其它Connection生成对象的进行同步处理和记录存取时间的封装。但实现的机理是相似的。


来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/10752043/viewspace-992147/,如需转载,请注明出处,否则将追究法律责任。

请登录后发表评论 登录
全部评论
  • 博文量
    6241
  • 访问量
    2411060