1. 观察者模式简介
观察者模式描述了一对多的关系,让多个观察者监测到主题,当主题发生改变的时候,能够通知到观察者,让其能更新自己。这么说挺抽象的,我举一些实际应用的例子,如果你有一些开发经验的话,这些应用你肯定也用过。比如redis的发布-订阅功能、Java Swing编程里的源-监听器。
观察者模式中有两个概念比较重要,主题(Subject)又叫被观察者,观察者(Observer)。下图截选自百度百科
- Observer - 观察者,其中定义好update方法
- Subjedt - 抽象的主题,他是个抽象类,其中维护了Observers
- 新增Observer
- 删除Observer
- 通知Observers的update方法
- ConcreteSubject - 具体的主题
- ConcreateObserver - 具体的观察者
代码实现
1 2 3 4 5 6
| package observer;
public interface Observer {
void update(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package observer;
import java.util.Enumeration; import java.util.Vector;
public abstract class Subject {
private Vector observerVector = new Vector();
public void registerOberver(Observer observer) { observerVector.add(observer); }
public void detach(Observer observer) { observerVector.removeElement(observer); }
public void notifyObservers() {
Enumeration elements = observers(); while (elements.hasMoreElements()){ ((Observer) elements.nextElement()).update(); } }
public Enumeration observers() { return ((Vector) observerVector.clone()).elements(); }
}
|
1 2 3 4 5 6 7 8
| package observer;
public class ConcreteObserver implements Observer {
public void update() { System.out.println("I am notified."); } }
|
1 2 3 4 5 6 7 8 9 10 11 12
| package observer;
public class ConcreteSubject extends Subject{ private String state;
public void change(String newState){ state = newState; this.notifyObservers(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| package observer;
public class Client {
public static void main(String[] args) {
ConcreteSubject subject = new ConcreteSubject(); Observer observer = new ConcreteObserver(); subject.registerOberver(observer);
subject.change("123"); subject.change("456");
} }
|
2. Java中的观察者模式
Java提供了实现观察者模式的方法,其中两个核心类,Observable和Observer
- Observer - 观察者,对应我们写的接口类Observer
- Observable - 主题,对应我们写的抽象类Subject
下面以问题:公鸡见到太阳升起,就打鸣为例,进行建模
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13
| package observer.java.example1;
import java.util.Observable; public class Sun extends Observable {
public void rise(){ System.out.println("太阳升起"); this.setChanged(); this.notifyObservers(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12
| package observer.java.example1;
import java.util.Observable; import java.util.Observer;
public class Cock implements Observer {
public void update(Observable o, Object arg) { System.out.println("公鸡打鸣"); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| package observer.java.example1;
public class Client {
public static void main(String[] args) { Sun sun = new Sun(); Cock cock = new Cock(); sun.addObserver(cock);
sun.rise(); } }
|
3. 实际应用
最近写多线程代码是遇到了这样的问题,线程在用JDBC链接HiveServer2时因server不稳定,常常出现Timeout问题,联系运营发现,服务会在某时间段内重启,可能会有服务波动。
针对此问题,简单的思路就是在指定时间段内,发现timeout就重启线程。这里有两个类,观察者Restartor,线程JdbcQuerier。
- Restartor - 观察者,主要负责观察线程运行状态,并完成重启线程操作
- JdbcQuerier - 被观察者,该类继承Runnable,在发生异常时,改变状态。
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package observer.java.example2;
import java.util.Observable; import java.util.Observer; import java.util.Random;
public class Restartor implements Observer {
public void update(Observable o, Object arg) { System.out.println("重启线程"); if(condition1()) { System.out.println("检测到当前时间为9-10点,重启线程"); new Thread(((JdbcQuerier) o)).start(); }else{ o = null; System.out.println("检测到当前时间不是9-10点,不重启线程"); } }
public boolean condition1(){ int i = new Random().nextInt(); if(i % 2 == 0){ return true; } return false; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| package observer.java.example2;
import java.sql.SQLException; import java.util.Observable; import java.util.Random; import java.util.concurrent.TimeoutException;
public class JdbcQuerier extends Observable implements Runnable {
public void run() { System.out.println(Thread.currentThread().getName() + ":" + "线程运行"); try{ queryData(); }catch (TimeoutException e1){ System.out.println(Thread.currentThread().getName() + ":" + "检测到Timeout异常"); this.setChanged(); this.notifyObservers(); }catch (SQLException e2){ System.out.println(Thread.currentThread().getName() + ":" + "检测到SQLException异常"); }finally { System.out.println(Thread.currentThread().getName() + ":" + "线程终止"); } } private void queryData() throws SQLException, TimeoutException { try{ System.out.println(Thread.currentThread().getName() + ":" + "获取连接"); System.out.println(Thread.currentThread().getName() + ":" + "select * from tableName"); int i = new Random().nextInt(); if(i % 2 == 0) throw new TimeoutException(); else throw new SQLException(); }catch (SQLException e1){ throw e1; }catch (TimeoutException e2){ throw e2; }finally { System.out.println(Thread.currentThread().getName() + ":" + "关闭连接"); } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package observer.java.example2;
public class Client {
public static void main(String[] args) throws InterruptedException { JdbcQuerier jdbcQuerier = new JdbcQuerier(); Restartor restartor = new Restartor(); jdbcQuerier.addObserver(restartor);
Thread t1 = new Thread(jdbcQuerier);
t1.start(); while(true){ Thread.sleep(1000L); }
} }
|
方便学习提供源码