Active是主动的意思,因此ActiveObject就是主动对象的意思。所谓主动一般指有自己特有的线程,举例来说,java.lang.Thread类的实例就是一种主动对象。

不过,在Active Object模式中出厂的主动对象可不仅仅有自己特有的线程,它同时还具备可以从外部接收和处理异步消息并根据需要返回处理结果的特征。

Active Object模式中的主动对象会通过自己特有的线程在合适的时机处理从外部接收到的异步消息。

在Active Object中,组成主动对象与许多自然人组成法人类似,即使是java语言这样没有异步消息的编程语言,也可以使用Active Object模式组成实际上能够处理异步消息的主动对象。

二、示例程序类和接口一览表 

类名 说明
Main.java 测试示例程序的类
MakerClientThread.java 发出“生成字符串”请求的线程
DisplayClientThread.java 发出“显示字符串”请求的线程
ActiveObject.java 定义“主动对象”的接口(API)的接口
ActiveObjectFactory.java 创建“主动对象”的类
Proxy.java 将方法调用转换为MethodRequest对象的类(实现了ActiveObject的接口)
SchedulerThread.java 调用execute方法处理 MethodRequest对象的类
ActivationQueue.java 按顺序保存MethodRequest对象的类
MethodRequest.java 表示请求的抽象类
MakeStringRequest.java makeString方法(生成字符串)对应的类,MethodRequest类的子类
DisplayStringRequest.java displayString方法(显示字符串)对应的类,MethodRequest类的子类
Result.java 表示执行结果的抽象类
FutureResult.java 在Future模式中表示执行结果的类
RealResult.java 表示实际的执行结果的类
Servant.java 执行实际处理的类(实现了ActiveObject接口)

 

 

 

 

 

 

 

 

 

 

 

三、示例程序的类图

多线程Active Objects设计模式

 

 

四、示例程序时序图

多线程Active Objects设计模式

 

 

五、代码演示

ActiveObject接口

package com.dwz.concurrency2.chapter19;
/**
 *    接受异步消息的主动对象,类似 System.gc();
 */
public interface ActiveObject {
    Result makeString(int count, char fillChar);
    
    void displayString(String text);
}

Servant

package com.dwz.concurrency2.chapter19;

class Servant implements ActiveObject {

    @Override
    public void displayString(String text) {
        try {
            System.out.println("Display:" + text);
            Thread.sleep(10);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    @Override
    public Result makeString(int count, char fillChar) {
        char[] buf = new char[count];
        for(int i = 0; i < count; i++) {
            buf[i] = fillChar;
            try {
                Thread.sleep(10);
            } catch (Exception e) {
            }
        }
        return new RealResult(new String(buf));
    }

}

ActivationQueue

package com.dwz.concurrency2.chapter19;

import java.util.LinkedList;

public class ActivationQueue {
    private final static int MAX_METHOD_REQUEST_QUEUE_SIZE = 100;
    
    private final LinkedList<MethodRequest> methodQueue;

    public ActivationQueue() {
        this.methodQueue = new LinkedList<>();
    }
    
    public synchronized void put(MethodRequest request) {
        while (methodQueue.size() >= MAX_METHOD_REQUEST_QUEUE_SIZE) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        this.methodQueue.addLast(request);
        this.notifyAll();
    }
    
    public synchronized MethodRequest take() {
        while (methodQueue.isEmpty()) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        MethodRequest methodrequest = methodQueue.removeFirst();
        this.notifyAll();
        return methodrequest;
    }
}

MethodRequest

package com.dwz.concurrency2.chapter19;
/**
 *    对应ActiveObjects的每一个方法,将每个方法转换成一个对象
 */
public abstract class MethodRequest {
    protected final Servant servant;
    
    protected final FutureResult futureresult;
    
    public MethodRequest(Servant servant, FutureResult futureresult) {
        this.servant = servant;
        this.futureresult = futureresult;
    }

    public abstract void execute();
}

MakeStringRequest

package com.dwz.concurrency2.chapter19;
/**
 *    {@link ActiveObject#makeString(int, char)}
 */
public class MakeStringRequest extends MethodRequest {
    private final int count;
    private final char fillChar;
    
    public MakeStringRequest(Servant servant, FutureResult futureresult, int count, char fillChar) {
        super(servant, futureresult);
        this.count = count;
        this.fillChar = fillChar;
    }

    @Override
    public void execute() {
        Result result = servant.makeString(count, fillChar);
        futureresult.setResult(result);
    }
    
}

DisplayStringRequest

package com.dwz.concurrency2.chapter19;

public class DisplayStringRequest extends MethodRequest {
    private final String text;

    public DisplayStringRequest(Servant servant, final String text) {
        super(servant, null);
        this.text = text;
    }

    @Override
    public void execute() {
        this.servant.displayString(text);
    }

}

Result接口

package com.dwz.concurrency2.chapter19;

public interface Result {
    Object getResultValue();
}

RealResult

package com.dwz.concurrency2.chapter19;

public class RealResult implements Result {
    private final Object resultValue;
    
    public RealResult(Object resultValue) {
        this.resultValue = resultValue;
    }

    @Override
    public Object getResultValue() {
        return this.resultValue;
    }

}

FutureResult

package com.dwz.concurrency2.chapter19;

public class FutureResult implements Result {
    private Result result;
    private boolean ready = false;
    
    public synchronized void setResult(Result result) {
        this.result = result;
        this.ready = true;
        this.notifyAll();
    }
    
    @Override
    public synchronized Object getResultValue() {
        while (!ready) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return this.result.getResultValue();
    }
    
}

ActiveObjectProxy

package com.dwz.concurrency2.chapter19;

class ActiveObjectProxy implements ActiveObject {
    private final SchedulerThread schedulerThread;
    
    private final Servant servant;
    
    public ActiveObjectProxy(SchedulerThread schedulerThread, Servant servant) {
        this.schedulerThread = schedulerThread;
        this.servant = servant;
    }

    @Override
    public Result makeString(int count, char fillChar) {
        FutureResult future = new FutureResult();
        schedulerThread.invoke(new MakeStringRequest(servant, future, count, fillChar));
        return future;
    }

    @Override
    public void displayString(String text) {
        schedulerThread.invoke(new DisplayStringRequest(servant, text));
    }
    
}

ActiveObjectFactory

package com.dwz.concurrency2.chapter19;

public final class ActiveObjectFactory {

    private ActiveObjectFactory() {
        
    }
    
    public static ActiveObject createActiveObject() {
        Servant servant = new Servant();
        ActivationQueue queue = new ActivationQueue();
        SchedulerThread schedulerThread = new SchedulerThread(queue);
        ActiveObjectProxy proxy = new ActiveObjectProxy(schedulerThread, servant);
        schedulerThread.start();
        return proxy;
    }
}

SchedulerThread

package com.dwz.concurrency2.chapter19;

public class SchedulerThread extends Thread {
    private final ActivationQueue activationQueue;

    public SchedulerThread(ActivationQueue activationQueue) {
        this.activationQueue = activationQueue;
    }
    
    public void invoke(MethodRequest request) {
        this.activationQueue.put(request);
    }
    
    @Override
    public void run() {
        while (true) {
            this.activationQueue.take().execute();
        }
    }
}

MakerClientThread

package com.dwz.concurrency2.chapter19;

public class MakerClientThread extends Thread {
    private final ActiveObject activeObject;
    private final char fillChar;
    
    public MakerClientThread(ActiveObject activeObject, String name) {
        super(name);
        this.activeObject = activeObject;
        this.fillChar = name.charAt(0);
    }
    
    @Override
    public void run() {
        try {
            for (int i = 0; true; i++) {
                Result result = activeObject.makeString(i + 1, fillChar);
                Thread.sleep(20);
                String value = (String)result.getResultValue();
                System.out.println(Thread.currentThread().getName() + ": value=" + value);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
}

DisplayClientThread

package com.dwz.concurrency2.chapter19;

public class DisplayClientThread extends Thread {
    private final ActiveObject activeObject;

    public DisplayClientThread(String name, ActiveObject activeObject) {
        super(name);
        this.activeObject = activeObject;
    }
    
    @Override
    public void run() {
        try {
            for(int i = 0; true; i++) {
                String text = Thread.currentThread().getName() + "=>" + i;
                activeObject.displayString(text);
                Thread.sleep(200);
            }
        } catch (Exception e) {
        }
    }
}

main

package com.dwz.concurrency2.chapter19;

public class Test {
    public static void main(String[] args) {
        ActiveObject activeObject = ActiveObjectFactory.createActiveObject();
        
        new MakerClientThread(activeObject, "Alex").start();
        new MakerClientThread(activeObject, "Bobby").start();
        
        new DisplayClientThread("Chris", activeObject).start();
    }
}

参考文章:

https://blog.csdn.net/smartdt/article/details/79363022

https://blog.csdn.net/cuichaox/article/details/1414305