package org.jgroups.protocols;
import org.jgroups.*;
import org.jgroups.annotations.Experimental;
import org.jgroups.annotations.Property;
import org.jgroups.util.Promise;
import org.jgroups.util.UUID;
import org.jgroups.util.Util;
import java.io.*;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* Simple discovery protocol which uses files on shared storage such as an SMB share, NFS mount or S3. Each member
* writes its local address information (e.g. the mapping between its UUID and its physical addresses) to a file of
* its own; the content of the files is read and added to our transport's UUID-PhysicalAddress cache.
* The design is at doc/design/FILE_PING.txt
* @author Bela Ban
*/
@Experimental
public class FILE_PING extends Discovery {
protected static final String SUFFIX=".node";
/* ----------------------------------------- Properties -------------------------------------------------- */
@Property(description="The absolute path of the shared file")
protected String location=File.separator + "tmp" + File.separator + "jgroups";
@Property(description="Interval (in milliseconds) at which the own Address is written. 0 disables it.")
protected long interval=60000;
/* --------------------------------------------- Fields ------------------------------------------------------ */
protected File root_dir=null;
protected FilenameFilter filter;
private Future> writer_future;
/**
 * Initializes the protocol: runs the superclass initialization and then makes sure that the directory
 * configured via {@link #location} exists (see {@link #createRootDir()}).
 * @throws Exception if the superclass initialization fails
 * @throws IllegalArgumentException if {@link #location} is not a directory or cannot be accessed
 */
public void init() throws Exception {
    super.init();
    createRootDir();
}
/**
 * Starts the protocol and, unless disabled via an {@link #interval} of 0 (or less), schedules the
 * {@link WriterTask} which periodically re-writes our own address information.
 * @throws Exception if the superclass fails to start
 */
public void start() throws Exception {
    super.start();
    if(interval <= 0)
        return; // periodic writing is disabled
    writer_future=timer.scheduleWithFixedDelay(new WriterTask(), interval, interval, TimeUnit.MILLISECONDS);
}
/**
 * Cancels the periodic writer task (if one was scheduled by {@link #start()}) and then stops the superclass.
 */
public void stop() {
    final boolean writer_scheduled=writer_future != null;
    if(writer_scheduled) {
        // false: a run which is currently in progress is not interrupted
        writer_future.cancel(false);
        writer_future=null;
    }
    super.stop();
}
/**
 * Always returns true.
 * NOTE(review): presumably this tells the discovery framework that members are looked up at runtime rather than
 * taken from a static list - confirm against the Discovery superclass.
 * @return true
 */
public boolean isDynamic() {
    return true;
}
/**
 * Reads the entries of all members stored under the cluster's directory and sends a GET_MBRS_REQ to each of their
 * physical addresses (except our own). When no entry is found, the promise (if any) is completed with null
 * instead. In both cases our own entry is written to the cluster's directory at the end.
 * @param cluster_name the name of the cluster; also the name of the directory (below the root directory) which
 *                     holds the member files
 * @param promise completed with null when no member file exists; may be null
 * @param view_id copied into the header of every request which is sent
 * @throws Exception declared by the signature; failures to send a request are logged per destination rather
 *                   than thrown
 */
public void sendGetMembersRequest(String cluster_name, Promise promise, ViewId view_id) throws Exception{
    List<PingData> existing_mbrs=readAll(cluster_name);
    PhysicalAddress physical_addr=(PhysicalAddress)down(new Event(Event.GET_PHYSICAL_ADDRESS, local_addr));
    List<PhysicalAddress> physical_addrs=Arrays.asList(physical_addr);
    PingData data=new PingData(local_addr, null, false, UUID.get(local_addr), physical_addrs);

    // If we don't find any files, return immediately
    if(existing_mbrs.isEmpty()) {
        if(promise != null) {
            promise.setResult(null);
        }
    }
    else {
        // Send a GET_MBRS_REQ message to the members listed in the files
        for(PingData tmp: existing_mbrs) {
            Collection<PhysicalAddress> dests=tmp != null? tmp.getPhysicalAddrs() : null;
            if(dests == null)
                continue;
            for(final PhysicalAddress dest: dests) {
                // skip unusable entries and don't send a request to ourselves
                if(dest == null || dest.equals(physical_addr))
                    continue;
                PingHeader hdr=new PingHeader(PingHeader.GET_MBRS_REQ, data, cluster_name);
                hdr.view_id=view_id;
                final Message msg=new Message(dest);
                msg.setFlag(Message.OOB);
                msg.putHeader(this.id, hdr);
                if(log.isTraceEnabled())
                    log.trace("[FIND_INITIAL_MBRS] sending PING request to " + msg.getDest());
                // the actual send is handed to the timer; a failure only affects this one destination
                timer.execute(new Runnable() {
                    public void run() {
                        try {
                            down_prot.down(new Event(Event.MSG, msg));
                        }
                        catch(Exception ex){
                            if(log.isErrorEnabled())
                                log.error("failed sending discovery request to " + dest, ex);
                        }
                    }
                });
            }
        }
    }

    // Write my own data to file
    writeToFile(data, cluster_name);
}
/**
 * Lets the superclass handle the event first; for a VIEW_CHANGE event the new view is subsequently passed to
 * {@link #handleView(View)}.
 * @param evt the event travelling down the stack
 * @return whatever the superclass returned for the event
 */
public Object down(Event evt) {
    final Object result=super.down(evt);
    if(evt.getType() != Event.VIEW_CHANGE)
        return result;
    handleView((View)evt.getArg());
    return result;
}
/**
 * Resolves {@link #location} into {@link #root_dir}, creating the directory (including missing parents) when it
 * does not exist yet, and installs the {@link #filter} which accepts file names ending with {@link #SUFFIX}.
 * @throws IllegalArgumentException if the location exists but is not a directory, or if it still does not exist
 *                                  after the attempt to create it
 */
protected void createRootDir() {
    root_dir=new File(location);
    final boolean already_present=root_dir.exists();
    if(already_present && !root_dir.isDirectory())
        throw new IllegalArgumentException("location " + root_dir.getPath() + " is not a directory");
    if(!already_present)
        root_dir.mkdirs();

    // the result of mkdirs() is not inspected; success is verified by checking for existence
    if(!root_dir.exists())
        throw new IllegalArgumentException("location " + root_dir.getPath() + " could not be accessed");

    filter=new FilenameFilter() {
        public boolean accept(File parent, String file_name) {
            return file_name.endsWith(SUFFIX);
        }
    };
}
/**
 * Removes all files which are not from the current members. Only done by the coordinator, i.e. when the first
 * member of the view is our own address; all other members do nothing.
 * @param view the new view
 */
protected void handleView(View view) {
    Collection<Address> mbrs=view.getMembers();
    boolean is_coordinator=!mbrs.isEmpty() && mbrs.iterator().next().equals(local_addr);
    if(!is_coordinator)
        return;

    List<PingData> data=readAll(group_addr);
    for(PingData entry: data) {
        Address addr=entry.getAddress();
        if(addr != null && !mbrs.contains(addr)) {
            remove(group_addr, addr);
        }
    }
}
/**
 * Deletes the file of the given member from the directory of the given cluster. Does nothing when one of the
 * arguments is null or when the cluster's directory does not exist.
 * @param clustername the name of the cluster (and of its directory below the root directory)
 * @param addr the address of the member whose file is to be deleted
 */
protected void remove(String clustername, Address addr) {
    if(clustername == null || addr == null)
        return;

    File cluster_dir=new File(root_dir, clustername);
    if(!cluster_dir.exists())
        return;

    if(log.isDebugEnabled()) {
        log.debug("remove : "+clustername);
    }

    // same naming scheme as used when the file was written: long UUID form, or toString() for other addresses
    deleteFile(new File(cluster_dir, addressAsString(addr) + SUFFIX));
}
/**
 * Reads the information of all members from the directory of the given cluster (created if it does not exist).
 * A file may be in the middle of being written by its owner and therefore be temporarily unreadable, so each
 * file is tried up to 3 times, pausing 100 ms between attempts. A file which cannot be read after the last attempt
 * is deleted. If the calling thread is interrupted while pausing, the interrupt status is restored and the
 * entries read so far are returned; the file being retried is left untouched in this case.
 * @param clustername the name of the cluster (and of its directory below the root directory)
 * @return the entries which could be read; possibly empty, never null
 */
protected synchronized List<PingData> readAll(String clustername) {
    List<PingData> retval=new ArrayList<PingData>();
    File dir=new File(root_dir, clustername);
    if(!dir.exists())
        dir.mkdir();

    log.info("reading all : "+clustername);
    File[] files=dir.listFiles(filter);
    if(files == null) // listFiles() returns null when dir is not a directory or an I/O error occurred
        return retval;

    final int max_attempts=3;
    final long retry_pause=100; // ms

    for(File file: files) {
        PingData data=null;
        for(int attempt=1; attempt <= max_attempts; attempt++) {
            if(file.exists())
                data=readFile(file);
            // done on success; after the last attempt there is nothing left to wait for
            if(data != null || attempt == max_attempts)
                break;
            try {
                Thread.sleep(retry_pause);
            }
            catch(InterruptedException ie) {
                // keep the interrupt visible to the caller and don't judge (or delete) a file we gave up on early
                Thread.currentThread().interrupt();
                return retval;
            }
        }

        if(data == null) {
            log.warn("failed parsing content in " + file.getName() + ": removing it from " + clustername);
            deleteFile(file);
        }
        else {
            retval.add(data);
            if(log.isTraceEnabled())
                log.trace("new data/node " + clustername);
        }
    }
    return retval;
}
/**
 * Reads a single member entry from the given file.
 * @param file the file to read
 * @return the entry, or null if the file could not be opened or parsed (the failure is logged as a warning)
 */
protected synchronized PingData readFile(File file) {
    DataInputStream input=null;
    try {
        input=new DataInputStream(new FileInputStream(file));
        PingData parsed=new PingData();
        parsed.readFrom(input);
        return parsed;
    }
    catch(Exception e) {
        log.warn("Failed to read file : "+file.getAbsolutePath(), e);
        return null;
    }
    finally {
        Util.close(input);
    }
}
/**
 * Writes the given entry to our own file (named after the local address, with {@link #SUFFIX}) in the directory
 * of the given cluster, creating that directory if needed. The data is first written to a temporary file, because
 * writing can be slow, and the temporary file is then copied over the destination and deleted. Note that this is a
 * copy rather than a rename, so readers may briefly observe an incomplete destination file. Failures are logged,
 * not thrown. Files which exist at the end are registered for deletion on JVM exit.
 * @param data the entry to write; nothing is written when null
 * @param clustername the name of the cluster (and of its directory below the root directory)
 */
protected synchronized void writeToFile(PingData data, String clustername) {
    File dir=new File(root_dir, clustername);
    if(!dir.exists())
        dir.mkdir();

    if(data == null)
        return;

    // first write all data to a temporary file; a failure has already been logged by writeToTempFile()
    File tmpFile=writeToTempFile(dir, data);
    if(tmpFile == null)
        return;

    File destination=new File(dir, addressAsString(local_addr) + SUFFIX);
    try {
        copyFile(tmpFile, destination);
        // the streams are closed at this point, so the temporary file can be deleted
        deleteFile(tmpFile);
        if(log.isTraceEnabled()) {
            log.trace("Moved: "+tmpFile.getName()+"->"+destination.getName());
        }
    }
    catch(IOException ioe) {
        log.error("attempt to move failed at " + clustername + " : "+tmpFile.getName()+"->"+destination.getName(), ioe);
    }
    finally {
        if(destination.exists()) {
            destination.deleteOnExit();
        }
        if(tmpFile.exists()) {
            tmpFile.deleteOnExit();
        }
    }
}

/**
 * Copies the entire content of source to target (overwriting target). Both streams are closed in all cases.
 * @throws IOException if a file cannot be opened, the transfer fails or stops before all bytes were copied
 */
private static void copyFile(File source, File target) throws IOException {
    FileInputStream in=null;
    FileOutputStream out=null;
    try {
        in=new FileInputStream(source);
        out=new FileOutputStream(target);
        FileChannel src_ch=in.getChannel();
        FileChannel dest_ch=out.getChannel();
        long size=src_ch.size();
        long position=0;
        // transferTo() may transfer fewer bytes than requested, so repeat until everything has been copied
        while(position < size) {
            long transferred=src_ch.transferTo(position, size - position, dest_ch);
            if(transferred <= 0)
                throw new IOException("copied only " + position + " of " + size + " bytes of " + source.getName());
            position+=transferred;
        }
        // close explicitly on the success path so that an error reported on close is not swallowed
        out.close();
    }
    finally {
        // closing a stream also closes its channel; closing an already closed stream has no effect
        Util.close(in);
        Util.close(out);
    }
}
protected class WriterTask implements Runnable {
public void run() {
PhysicalAddress physical_addr=(PhysicalAddress)down(new Event(Event.GET_PHYSICAL_ADDRESS, local_addr));
List physical_addrs=Arrays.asList(physical_addr);
PingData data=new PingData(local_addr, null, false, UUID.get(local_addr), physical_addrs);
log.info("writer task");
writeToFile(data, group_addr);
log.trace("writer task finished");
}
}
/**
 * Returns the string form of an address as used for file names.
 * @param address the address, may be null
 * @return the empty string for null, the long string form for a UUID, otherwise the result of toString()
 */
protected static String addressAsString(Address address) {
    if(address == null)
        return "";
    return address instanceof UUID? ((UUID)address).toStringLong() : address.toString();
}
/**
 * Attempts to delete the provided file. The attempt and its result are logged at trace level, an exception
 * thrown by the deletion is logged as an error.
 * @param file the file to delete; may be null, in which case nothing is done
 * @return true if the file was deleted or if file is null (nothing to delete); false if the deletion failed or
 *         threw an exception
 */
private boolean deleteFile(File file) {
    if(file == null)
        return true; // nothing to delete; must be checked before the file is dereferenced for logging

    String path=file.getAbsolutePath();
    if(log.isTraceEnabled()) {
        log.trace("Attempting to delete file : "+path);
    }

    boolean result;
    try {
        result=file.delete();
    }
    catch(Exception e) {
        // e.g. a SecurityException: the file was not deleted, so don't report success
        result=false;
        log.error("Failed to delete file: "+path, e);
    }

    if(log.isTraceEnabled()) {
        log.trace("Delete file result: "+path +" : "+result);
    }
    return result;
}
/**
 * Writes the data to a temporary file.
 * The file is stored in the same directory as the other cluster files but is given the .tmp suffix, so it is
 * not picked up when the member files are read.
 * @param dir The cluster file dir
 * @param data the data to write
 * @return the temporary file, or null if writing failed (the failure is logged and the partial file is deleted)
 */
private File writeToTempFile(File dir, PingData data) {
    DataOutputStream out=null;
    File file=new File(dir, addressAsString(local_addr) + ".tmp");
    boolean written=false;
    try {
        out=new DataOutputStream(new FileOutputStream(file));
        data.writeTo(out);
        written=true;
    }
    catch(Exception e) {
        log.error("Failed to write temporary file: "+file.getAbsolutePath(), e);
    }
    finally {
        // always close before the file is deleted or handed to the caller
        Util.close(out);
    }

    if(!written) {
        deleteFile(file);
        return null;
    }
    if(log.isTraceEnabled()) {
        log.trace("Stored temporary file: "+file.getAbsolutePath());
    }
    return file;
}
}