Initial experimental extension type registry support.

For extension objectType and verb identifiers, this implements an
incremental discovery/resolution mechanism that will attempt to
discover type value metadata via HTTP GET calls to the Type Value id.
This commit is contained in:
James M Snell 2014-04-24 13:59:02 -07:00
parent eb6a561e5c
commit 2a67769422
24 changed files with 1344 additions and 81 deletions

View File

@ -31,7 +31,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import com.google.common.base.Function;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.ibm.common.activitystreams.internal.Adapter; import com.ibm.common.activitystreams.internal.Adapter;
@ -154,11 +153,6 @@ public final class IO {
return this; return this;
} }
public Builder typeValueResolver(Function<TypeValue,TypeValue> resolver) {
inner.typeValueResolver(resolver);
return this;
}
/** /**
* Turn pretty print on or off * Turn pretty print on or off
* @param on boolean * @param on boolean

View File

@ -22,7 +22,6 @@
package com.ibm.common.activitystreams; package com.ibm.common.activitystreams;
import java.io.Serializable; import java.io.Serializable;
import com.ibm.common.activitystreams.util.AbstractWritable; import com.ibm.common.activitystreams.util.AbstractWritable;
/** /**

View File

@ -51,7 +51,6 @@ import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInterval; import org.joda.time.ReadableInterval;
import org.joda.time.ReadablePeriod; import org.joda.time.ReadablePeriod;
import com.google.common.base.Function;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
@ -71,7 +70,6 @@ import com.ibm.common.activitystreams.LinkValue;
import com.ibm.common.activitystreams.NLV; import com.ibm.common.activitystreams.NLV;
import com.ibm.common.activitystreams.TypeValue; import com.ibm.common.activitystreams.TypeValue;
import com.ibm.common.activitystreams.Writable; import com.ibm.common.activitystreams.Writable;
import com.ibm.common.activitystreams.util.TypeValueResolver;
/** /**
* @author james * @author james
@ -99,13 +97,6 @@ public final class GsonWrapper {
private Schema schema = null; // default private Schema schema = null; // default
private ImmutableList.Builder<AdapterEntry<?>> adapters = private ImmutableList.Builder<AdapterEntry<?>> adapters =
ImmutableList.builder(); ImmutableList.builder();
private Function<TypeValue,TypeValue> typeValueResolver =
TypeValueResolver.DEFAULT_INSTANCE;
public Builder typeValueResolver(Function<TypeValue,TypeValue> resolver) {
this.typeValueResolver = resolver;
return this;
}
/** /**
* Method charset. * Method charset.
@ -250,7 +241,7 @@ public final class GsonWrapper {
Schema schema, Schema schema,
ASObjectAdapter base) { ASObjectAdapter base) {
return new GsonBuilder() return new GsonBuilder()
.registerTypeHierarchyAdapter(TypeValue.class, new TypeValueAdapter(schema, builder.typeValueResolver)) .registerTypeHierarchyAdapter(TypeValue.class, new TypeValueAdapter(schema))
.registerTypeHierarchyAdapter(LinkValue.class, new LinkValueAdapter(schema)) .registerTypeHierarchyAdapter(LinkValue.class, new LinkValueAdapter(schema))
.registerTypeHierarchyAdapter(NLV.class, NLV) .registerTypeHierarchyAdapter(NLV.class, NLV)
.registerTypeHierarchyAdapter(Iterable.class, ITERABLE) .registerTypeHierarchyAdapter(Iterable.class, ITERABLE)

View File

@ -26,7 +26,6 @@ import static com.ibm.common.activitystreams.Makers.type;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import com.google.common.base.Function;
import com.google.gson.JsonDeserializationContext; import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonElement; import com.google.gson.JsonElement;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
@ -45,17 +44,14 @@ final class TypeValueAdapter
extends Adapter<TypeValue> { extends Adapter<TypeValue> {
private final Schema schema; private final Schema schema;
private final Function<TypeValue,TypeValue> resolver;
/** /**
* Constructor for TypeValueAdapter. * Constructor for TypeValueAdapter.
* @param schema Schema * @param schema Schema
*/ */
public TypeValueAdapter( public TypeValueAdapter(
Schema schema, Schema schema) {
Function<TypeValue,TypeValue> resolver) {
this.schema = schema; this.schema = schema;
this.resolver = resolver;
} }
/** /**
@ -97,7 +93,7 @@ final class TypeValueAdapter
JsonPrimitive prim = JsonPrimitive prim =
el.getAsJsonPrimitive(); el.getAsJsonPrimitive();
checkArgument(prim.isString()); checkArgument(prim.isString());
return resolver.apply(type(prim.getAsString())); return type(prim.getAsString());
} else { } else {
JsonObject obj = el.getAsJsonObject(); JsonObject obj = el.getAsJsonObject();
if (obj.has("objectType")) { if (obj.has("objectType")) {
@ -107,17 +103,17 @@ final class TypeValueAdapter
TypeValue.class); TypeValue.class);
Model pMap = Model pMap =
schema.forObjectType(tv.id()); schema.forObjectType(tv.id());
return resolver.apply( return
context.<ASObject>deserialize( context.<ASObject>deserialize(
el, el,
pMap.type() != null ? pMap.type() != null ?
pMap.type() : pMap.type() :
ASObject.class)); ASObject.class);
} else { } else {
return resolver.apply( return
context.<ASObject>deserialize( context.<ASObject>deserialize(
el, el,
ASObject.class)); ASObject.class);
} }
} }
} }

View File

@ -1,52 +0,0 @@
/**
* Copyright 2013 OpenSocial Foundation
* Copyright 2013 International Business Machines Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Utility library for working with Activity Streams Actions
* Requires underscorejs.
*
* @author James M Snell (jasnell@us.ibm.com)
*/
package com.ibm.common.activitystreams.util;
import com.google.common.base.Function;
import com.ibm.common.activitystreams.TypeValue;
/**
* A TypeValue resolver is used to optionally replace TypeValue
* instances. Typically, this would be used to exchange simple
* string TypeValue's with their object equivalents (if one is
* available).
*
* The replacement can be performed during parsing by setting a
* TypeValueResolver on the IO.Builder. This should be done
* carefully, however, as the resolver could negatively impact
* parsing performance depending on how it is implemented.
*
* @author james
*/
public interface TypeValueResolver
extends Function<TypeValue,TypeValue> {
public static final TypeValueResolver DEFAULT_INSTANCE =
new DefaultTypeValueResolver();
public static final class DefaultTypeValueResolver
implements TypeValueResolver {
public TypeValue apply(TypeValue tv) {
return tv;
}
}
}

View File

@ -48,6 +48,7 @@
<module>geo</module> <module>geo</module>
<module>assembly</module> <module>assembly</module>
<module>legacy</module> <module>legacy</module>
<module>typext</module>
</modules> </modules>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>

1
typext/EXPERIMENTAL Normal file
View File

@ -0,0 +1 @@
This module is currently experimental

153
typext/pom.xml Normal file
View File

@ -0,0 +1,153 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.ibm.common</groupId>
<artifactId>activitystreams</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>activitystreams-ext</artifactId>
<name>Activity Streams 2.0 - Type Extension Support</name>
<licenses>
<license>
<name>Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
</licenses>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<configuration>
<encoding>UTF-8</encoding>
<docencoding>UTF-8</docencoding>
<charset>UTF-8</charset>
<additionalparam>-XDignore.symbol.file</additionalparam>
<show>public</show>
<links>
<link>http://www.joda.org/joda-time/apidocs</link>
<link>http://docs.guava-libraries.googlecode.com/git-history/v16.0.1/javadoc/</link>
</links>
</configuration>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>2.3.1</version>
<configuration>
<archive>
<manifestFile>${project.build.outputDirectory}/META-INF/MANIFEST.MF</manifestFile>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>2.3.7</version>
<extensions>true</extensions>
<executions>
<execution>
<id>bundle-manifest</id>
<phase>process-classes</phase>
<goals>
<goal>manifest</goal>
</goals>
</execution>
</executions>
<configuration>
<instructions>
<Export-Package>
com.ibm.common.activitystreams.ext.*,
com.ibm.common.activitystreams.registry.*
</Export-Package>
<Import-Package>
com.ibm.common.activitystreams.*,
com.google.gson.*,
com.google.common.*,
org.joda.time.*,
org.apache.http.*,
</Import-Package>
</instructions>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.2</version>
<configuration>
<descriptors>
<descriptor>assembly.xml</descriptor>
</descriptors>
</configuration>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<versionRange>[2.3.7,)</versionRange>
<goals>
<goal>manifest</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore></ignore>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<dependencies>
<dependency>
<groupId>com.ibm.common</groupId>
<artifactId>activitystreams-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient-cache</artifactId>
<version>4.3.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcomponents-client</artifactId>
<version>4.3.3</version>
<type>pom</type>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,74 @@
/**
* Copyright 2013 OpenSocial Foundation
* Copyright 2013 International Business Machines Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Utility library for working with Activity Streams Actions
* Requires underscorejs.
*
* @author James M Snell (jasnell@us.ibm.com)
*/
package com.ibm.common.activitystreams.ext;
import java.lang.reflect.Type;
import com.google.common.collect.ImmutableSet;
import com.ibm.common.activitystreams.ASObject.AbstractBuilder;
import com.ibm.common.activitystreams.internal.ASObjectAdapter;
import com.ibm.common.activitystreams.internal.Model;
import com.ibm.common.activitystreams.internal.Schema;
public class ExtAdapter
extends ASObjectAdapter {
protected ExtAdapter(Schema schema) {
super(schema);
}
private static final ImmutableSet<? extends Type> knownTypes =
ImmutableSet.of(Verb.class,ObjectType.class);
@Override
protected boolean knowsType(Type type) {
if (super.knowsType(type))
return true;
return knownTypes.contains(type);
}
@Override
protected AbstractBuilder<?, ?> builderFor(Type type) {
if (super.knowsType(type))
return super.builderFor(type);
if (type == Verb.class) {
return ExtMakers.verb();
} else if (type == ObjectType.class) {
return ExtMakers.objectType();
} else return null;
}
@Override
protected Model modelFor(Type type) {
if (super.knowsType(type))
return super.modelFor(type);
if (type == Verb.class) {
return schema().forObjectClassOrType(
Verb.Builder.class,
"verb");
} else if (type == ObjectType.class) {
return schema().forObjectClassOrType(
ObjectType.Builder.class,
"objectType");
} else return null;
}
}

View File

@ -0,0 +1,23 @@
package com.ibm.common.activitystreams.ext;
public final class ExtMakers {
private ExtMakers() {}
public static Verb.Builder verb() {
return new Verb.Builder();
}
public static Verb.Builder verb(String id) {
return verb().id(id);
}
public static ObjectType.Builder objectType() {
return new ObjectType.Builder();
}
public static ObjectType.Builder objectType(String id) {
return objectType().id(id);
}
}

View File

@ -0,0 +1,36 @@
package com.ibm.common.activitystreams.ext;
import com.ibm.common.activitystreams.IO;
import com.ibm.common.activitystreams.internal.Model;
import com.ibm.common.activitystreams.internal.Schema;
import com.ibm.common.activitystreams.internal.Schema.Builder;
import com.ibm.common.activitystreams.util.Module;
public class ExtModule implements Module {
public static Module instance = new ExtModule();
public static final Model verb =
Schema.object.template()
.type(Verb.class, Verb.Builder.class)
.get();
public static final Model objectType =
Schema.object.template()
.type(ObjectType.class, ObjectType.Builder.class)
.get();
public void apply(Builder builder) {
builder.map("verb", verb);
builder.map("objectType", objectType);
}
public void apply(
IO.Builder builder,
Schema schema) {
ExtAdapter base = new ExtAdapter(schema);
builder.adapter(Verb.class, base);
builder.adapter(ObjectType.class, base);
}
}

View File

@ -0,0 +1,24 @@
package com.ibm.common.activitystreams.ext;
import com.ibm.common.activitystreams.ASObject;
public final class ObjectType
extends ASObject {
public static final class Builder
extends ASObject.AbstractBuilder<ObjectType, Builder> {
Builder() {
objectType("objectType");
}
public ObjectType get() {
return new ObjectType(this);
}
}
private ObjectType(Builder builder) {
super(builder);
}
}

View File

@ -0,0 +1,23 @@
package com.ibm.common.activitystreams.ext;
import java.util.concurrent.Future;
import static com.ibm.common.activitystreams.Makers.type;
import com.ibm.common.activitystreams.TypeValue;
import com.ibm.common.activitystreams.registry.TypeValueRegistry;
public class Test {
public static void main(String... args) throws Exception {
TypeValueRegistry reg =
TypeValueRegistry
.makeDefaultSilent();
Future<TypeValue> tv =
reg.resolve(type("urn:example:foo"));
System.out.println(tv.get().valueType());
}
}

View File

@ -0,0 +1,25 @@
package com.ibm.common.activitystreams.ext;
import com.ibm.common.activitystreams.ASObject;
public final class Verb
extends ASObject {
public static final class Builder
extends ASObject.AbstractBuilder<Verb, Builder> {
Builder() {
objectType("verb");
}
public Verb get() {
return new Verb(this);
}
}
private Verb(Builder builder) {
super(builder);
}
}

View File

@ -0,0 +1,119 @@
package com.ibm.common.activitystreams.registry;
import static com.google.common.base.Throwables.propagate;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Supplier;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.ibm.common.activitystreams.Makers;
import com.ibm.common.activitystreams.TypeValue;
import com.ibm.common.activitystreams.ValueType;
public abstract class CachingResolutionStrategy
implements ResolutionStrategy {
@SuppressWarnings("unchecked")
public static abstract class AbstractBuilder
<C extends CachingResolutionStrategy, B extends AbstractBuilder<C,B>>
implements Supplier<C> {
private boolean silentfail = false;
private final CacheBuilder<Object,Object> cache =
CacheBuilder.newBuilder()
.expireAfterAccess(10, TimeUnit.MINUTES)
.expireAfterWrite(10, TimeUnit.MINUTES)
.maximumSize(100)
.initialCapacity(50);
public B silentfail() {
this.silentfail = true;
return (B)this;
}
public B customizeCache(Receiver<CacheBuilder<Object,Object>> receiver) {
if (receiver != null)
receiver.receive(cache);
return (B)this;
}
}
private final LoadingCache<TypeValue,TypeValue> cache;
private final boolean silentfail;
protected LoadingCache<TypeValue,TypeValue> cache() {
return cache;
}
CachingResolutionStrategy(AbstractBuilder<?,?> builder) {
this.cache = initCache(builder);
this.silentfail = builder.silentfail;
}
protected boolean silentfail() {
return silentfail;
}
private LoadingCache<TypeValue,TypeValue> initCache(AbstractBuilder<?,?> builder) {
return builder.cache.build(loader());
}
public Callable<TypeValue> resolverFor(TypeValue tv) {
return new Resolver(tv);
}
protected abstract CacheLoader<TypeValue,TypeValue> loader();
public final class Resolver
implements Callable<TypeValue> {
private final TypeValue input;
Resolver(TypeValue input) {
this.input = input;
}
public TypeValue call() throws Exception {
try {
if (input == null) return null;
switch(input.valueType()) {
case OBJECT:
return input;
case SIMPLE:
return cache.get(input);
default:
throw new IllegalArgumentException();
}
} catch (Throwable t) {
if (silentfail())
return input;
else throw propagate(t);
}
}
}
public Receiver<TypeValue> preloader() {
return new CachePreloader(cache());
}
private static final class CachePreloader
implements Receiver<TypeValue> {
private final LoadingCache<TypeValue,TypeValue> cache;
CachePreloader(LoadingCache<TypeValue,TypeValue> cache) {
this.cache = cache;
}
public void receive(TypeValue t) {
if (t.valueType() == ValueType.OBJECT && t.id() != null)
cache.put(Makers.type(t.id()),t);
}
}
}

View File

@ -0,0 +1,111 @@
package com.ibm.common.activitystreams.registry;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.net.URL;
import java.util.Enumeration;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.ibm.common.activitystreams.ASObject;
import com.ibm.common.activitystreams.Collection;
import com.ibm.common.activitystreams.IO;
import com.ibm.common.activitystreams.TypeValue;
public final class ClasspathPreloader
implements PreloadStrategy {
public static final class Builder
implements Supplier<ClasspathPreloader> {
private ClassLoader loader =
Thread.currentThread().getContextClassLoader();
private boolean avoidDuplicates = false;
public Builder avoidDuplicates() {
this.avoidDuplicates = true;
return this;
}
public Builder classLoader(ClassLoader loader) {
this.loader = loader != null ?
loader : Thread.currentThread().getContextClassLoader();
return this;
}
public ClasspathPreloader get() {
return new ClasspathPreloader(this);
}
}
private final ClassLoader loader;
private final boolean avoidDuplicates;
private ClasspathPreloader(Builder builder) {
this.loader = builder.loader;
this.avoidDuplicates = builder.avoidDuplicates;
}
public void load(IO io, Receiver<TypeValue> receiver) {
final BloomFilter<CharSequence> filter =
avoidDuplicates ?
BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 1000) : null;
try {
for (InputStream in : streams.apply(loader.getResources("typeValues.bin"))) {
try {
ObjectInputStream oin = new ObjectInputStream(in);
Collection col = (Collection) oin.readObject();
load(col, receiver, filter);
} catch (Throwable t) {}
}
for (InputStream in : streams.apply(loader.getResources("typeValues.json"))) {
try {
load(io.readAsCollection(in), receiver, filter);
} catch (Throwable t) {}
}
} catch (Throwable t) {
throw Throwables.propagate(t);
}
}
private void load(
Collection col,
Receiver<TypeValue> receiver,
BloomFilter<CharSequence> filter) {
if (col != null && receiver != null)
for (ASObject obj : col.items())
if (obj.id() != null && (filter == null || filter.put(obj.id()))) {
try {
receiver.receive(obj);
} catch (Throwable t) {}
}
}
private static Function<Enumeration<URL>,Iterable<InputStream>> streams =
new Function<Enumeration<URL>,Iterable<InputStream>> () {
public Iterable<InputStream> apply(Enumeration<URL> input) {
ImmutableList.Builder<InputStream> list =
ImmutableList.builder();
while(input.hasMoreElements()) {
try {
list.add(input.nextElement().openStream());
} catch (Throwable t) {}
}
return list.build();
}
};
public static final PreloadStrategy instance =
new Builder().get();
}

View File

@ -0,0 +1,201 @@
package com.ibm.common.activitystreams.registry;
import static com.google.common.util.concurrent.MoreExecutors.platformThreadFactory;
import static com.google.common.base.Throwables.propagate;
import java.util.Set;
import java.util.concurrent.Callable;
import com.google.common.base.Objects;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.ImmutableSet;
import com.ibm.common.activitystreams.ASObject;
import com.ibm.common.activitystreams.Collection;
import com.ibm.common.activitystreams.Makers;
import com.ibm.common.activitystreams.TypeValue;
public final class DefaultResolutionStrategy
extends CachingResolutionStrategy {
public static Builder make() {
return new Builder();
}
public static DefaultResolutionStrategy makeDefault() {
return make().get();
}
public static final class Builder
extends CachingResolutionStrategy.AbstractBuilder<DefaultResolutionStrategy, Builder> {
private boolean proactive = false;
private final HttpFetch.Builder fetcherBuilder =
new HttpFetch.Builder();
private final ImmutableSet.Builder<String> proactiveTypes =
ImmutableSet.<String>builder()
.add("verb")
.add("objectType");
public Builder customizeFetcher(Receiver<HttpFetch.Builder> receiver) {
if (receiver != null)
receiver.receive(fetcherBuilder);
return this;
}
/**
* Tells the loader to proactively cache additional typevalue
* identifiers that happen to be discovered when attempting to
* resolve a given typevalue.
* @return
*/
public Builder proactiveCaching() {
this.proactive = true;
return this;
}
/**
* Specifies additional objectType identifiers to watch for when
* proactiveCaching is enabled.
* @param typeValueId
* @return
*/
public Builder typeValueObjectType(String typeValueId) {
proactiveTypes.add(typeValueId);
return this;
}
public DefaultResolutionStrategy get() {
return new DefaultResolutionStrategy(this);
}
}
private final boolean proactiveCaching;
private final ImmutableSet<String> proactiveTypes;
private final HttpFetch fetcher;
private DefaultResolutionStrategy(Builder builder) {
super(builder);
this.proactiveCaching = builder.proactive;
this.proactiveTypes = builder.proactiveTypes.build();
this.fetcher = initFetcher(builder);
ensureAlwaysShutdown(this);
}
private HttpFetch initFetcher(Builder builder) {
return builder.fetcherBuilder.get();
}
@Override
protected CacheLoader<TypeValue, TypeValue> loader() {
return new DefaultCacheLoader();
}
private final class DefaultCacheLoader
extends CacheLoader<TypeValue,TypeValue> {
@Override
public TypeValue load(TypeValue key) throws Exception {
try {
if (key == null)
throw new IllegalArgumentException();
switch(key.valueType()) {
case OBJECT:
return key; // type is already resolved
case SIMPLE:
String id = key.id();
ASObject obj = fetcher.fetch(id); // attempt to fetch an object
ImmutableSet.Builder<TypeValue> additional =
ImmutableSet.builder();
if (obj instanceof Collection) {
Collection col = (Collection) obj;
ASObject matching =
processItems(
col.items(),
id,
proactiveCaching,
proactiveTypes,
additional);
if (matching != null)
return matching;
} else if (obj.has("items")) {
Iterable<ASObject> items =
obj.<Iterable<ASObject>>get("items");
ASObject matching =
processItems(
items,
id,
proactiveCaching,
proactiveTypes,
additional);
if (matching != null)
return matching;
} else if (Objects.equal(id, obj.id())) {
return obj;
}
default:
break;
}
} catch (Throwable t) {
if (silentfail())
return key;
else propagate(t);
}
throw new UncacheableResponseException();
}
}
private ASObject processItems(
Iterable<ASObject> items,
String lookingFor,
boolean proactive,
Set<String> proactiveTypes,
ImmutableSet.Builder<TypeValue> additional) {
ASObject matching = null;
for (final ASObject obj : items) {
if (Objects.equal(lookingFor, obj.id())) {
matching = obj;
if (!proactive) break;
} else if (proactive) {
TypeValue objectType = obj.objectType();
String id = obj.id();
if (objectType != null && id != null && !Objects.equal(lookingFor,id)) {
String otid = objectType.id();
if (proactiveTypes.contains(otid)) {
try {
cache().get(
Makers.type(id),
new Callable<TypeValue>() {
public TypeValue call() throws Exception {
return obj;
}
});
} catch (Throwable t) {}
}
}
}
}
return matching;
}
private static void ensureAlwaysShutdown(
final ResolutionStrategy strategy) {
Thread shutdownThread =
platformThreadFactory()
.newThread(new Runnable() {
public void run() {
try {
strategy.shutdown();
} catch (Throwable t) {}
}
});
Runtime.getRuntime()
.addShutdownHook(shutdownThread);
}
public void shutdown() {
try {
fetcher.shutdown();
} catch (Throwable t) {}
}
}

View File

@ -0,0 +1,239 @@
package com.ibm.common.activitystreams.registry;
import static com.google.common.base.Throwables.propagate;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HttpContext;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.ibm.common.activitystreams.ASObject;
import com.ibm.common.activitystreams.IO;
import com.ibm.common.activitystreams.ext.ExtModule;
public final class HttpFetch
extends CacheLoader<String,ASObject> {
static final class Builder
implements Supplier<HttpFetch> {
private final RegistryBuilder<ConnectionSocketFactory> csfr =
RegistryBuilder.<ConnectionSocketFactory>create();
private ConnectionConfig defaultConnectionConfig;
private SocketConfig defaultSocketConfig;
private final ImmutableMap.Builder<HttpHost,ConnectionConfig> connectionConfigs =
ImmutableMap.builder();
private final ImmutableMap.Builder<HttpHost,SocketConfig> socketConfigs =
ImmutableMap.builder();
private int maxConnectionsPerRoute = 2;
private int maxConnections = 20;
private final ImmutableSet.Builder<Header> defaultHeaders =
ImmutableSet.builder();
private String userAgent = null;
private IO io = null;
private HttpClientBuilder builder =
HttpClients.custom();
private final CacheBuilder<Object,Object> cache =
CacheBuilder.newBuilder()
.expireAfterAccess(10, TimeUnit.MINUTES)
.expireAfterWrite(10, TimeUnit.MINUTES)
.maximumSize(50)
.initialCapacity(50);
private HttpClientConnectionManager manager;
public Builder customizeCache(
Receiver<CacheBuilder<Object,Object>> receiver) {
if (receiver != null)
receiver.receive(cache);
return this;
}
public Builder customizeClientBuilder(
Receiver<HttpClientBuilder> receiver) {
receiver.receive(builder);
return this;
}
public Builder io(IO io) {
this.io = io;
return this;
}
public Builder useragent(String ua) {
this.userAgent = ua;
return this;
}
public Builder defaultHeader(String name, String value) {
defaultHeaders.add(new BasicHeader(name,value));
return this;
}
public Builder maxConnectionsPerRoute(int max) {
this.maxConnectionsPerRoute = max;
return this;
}
public Builder maxConnections(int max) {
this.maxConnections = max;
return this;
}
public Builder connectionConfig(ConnectionConfig config) {
defaultConnectionConfig = config;
return this;
}
public Builder connectionConfig(HttpHost host, ConnectionConfig config) {
connectionConfigs.put(host,config);
return this;
}
public Builder socketConfig(SocketConfig config) {
defaultSocketConfig = config;
return this;
}
public Builder socketConfig(HttpHost host, SocketConfig config) {
socketConfigs.put(host, config);
return this;
}
public Builder registerConnectionSocketFactory(
String id,
ConnectionSocketFactory csf) {
csfr.register(id, csf);
return this;
}
public Builder manager(HttpClientConnectionManager manager) {
this.manager = manager;
return this;
}
public HttpFetch get() {
return new HttpFetch(this);
}
}
private final LoadingCache<String,ASObject> cache;
private final HttpClientConnectionManager manager;
private final IO io;
private final CloseableHttpClient client;
HttpFetch(Builder builder) {
this.cache = initCache(builder);
this.manager = initManager(builder);
this.io = initIO(builder);
this.client = initClient(builder);
}
public void shutdown() {
try {
manager.shutdown();
} catch (Throwable t) {}
}
private IO initIO(Builder builder) {
if (builder.io != null)
return builder.io;
return IO.makeDefault(ExtModule.instance);
}
private CloseableHttpClient initClient(Builder builder) {
HttpClientBuilder b = builder.builder;
b.setConnectionManager(manager);
ImmutableSet<Header> headers =
builder.defaultHeaders.build();
if (!headers.isEmpty())
b.setDefaultHeaders(headers);
if (builder.userAgent != null)
b.setUserAgent(builder.userAgent);
return b.build();
}
private HttpClientConnectionManager initManager(Builder builder) {
if (builder.manager != null)
return builder.manager;
PoolingHttpClientConnectionManager pm =
new PoolingHttpClientConnectionManager(builder.csfr.build());
for (Map.Entry<HttpHost,ConnectionConfig> entry : builder.connectionConfigs.build().entrySet())
pm.setConnectionConfig(entry.getKey(), entry.getValue());
if (builder.defaultConnectionConfig != null)
pm.setDefaultConnectionConfig(builder.defaultConnectionConfig);
for (Map.Entry<HttpHost,SocketConfig> entry : builder.socketConfigs.build().entrySet())
pm.setSocketConfig(entry.getKey(), entry.getValue());
if (builder.defaultSocketConfig != null)
pm.setDefaultSocketConfig(builder.defaultSocketConfig);
pm.setDefaultMaxPerRoute(builder.maxConnectionsPerRoute);
pm.setMaxTotal(builder.maxConnections);
return pm;
}
private LoadingCache<String,ASObject> initCache(Builder builder) {
return builder.cache.build(this);
}
public ASObject fetch(String uri) {
try {
return cache.get(uri);
} catch (Throwable t) {
throw propagate(t);
}
}
@Override
public ASObject load(String key) throws Exception {
HttpContext context = new HttpClientContext();
HttpGet get = new HttpGet(key);
HttpResponse resp = client.execute(get, context);
StatusLine status = resp.getStatusLine();
int code = status.getStatusCode();
if (code >= 200 && code < 300) {
HttpEntity entity = resp.getEntity();
if (entity != null) {
// attempt parse
Optional<ASObject> parsed =
parse(entity.getContent());
if (parsed.isPresent())
return parsed.get();
}
}
throw new UncacheableResponseException();
}
private Optional<ASObject> parse(InputStream in) {
try {
return Optional.<ASObject>of(io.read(in));
} catch (Throwable t) {
return Optional.<ASObject>absent();
}
}
}

View File

@ -0,0 +1,25 @@
package com.ibm.common.activitystreams.registry;
import java.util.concurrent.Callable;
import com.ibm.common.activitystreams.TypeValue;
class NonOpResolutionStrategy
implements ResolutionStrategy {
public Callable<TypeValue> resolverFor(final TypeValue tv) {
return new Callable<TypeValue>() {
public TypeValue call() throws Exception {
return tv;
}
};
}
public Receiver<TypeValue> preloader() {
return new Receiver<TypeValue>() {
public void receive(TypeValue t) {}
};
}
public void shutdown() {}
}

View File

@ -0,0 +1,10 @@
package com.ibm.common.activitystreams.registry;
import com.ibm.common.activitystreams.IO;
import com.ibm.common.activitystreams.TypeValue;
public interface PreloadStrategy {
void load(IO io, Receiver<TypeValue> receiver);
}

View File

@ -0,0 +1,7 @@
package com.ibm.common.activitystreams.registry;
public interface Receiver<T> {
void receive(T t);
}

View File

@ -0,0 +1,24 @@
package com.ibm.common.activitystreams.registry;
import java.util.concurrent.Callable;
import com.ibm.common.activitystreams.TypeValue;
/**
* Implements a strategy for resolving TypeValue identifiers.
*/
public interface ResolutionStrategy {
Receiver<TypeValue> preloader();
/**
* Returns a Callable that implements TypeValue resolution
* for the given TypeValue.
*/
Callable<TypeValue> resolverFor(TypeValue tv);
void shutdown();
public static final ResolutionStrategy nonop =
new NonOpResolutionStrategy();
}

View File

@ -0,0 +1,234 @@
package com.ibm.common.activitystreams.registry;
import static com.google.common.base.Throwables.propagate;
import static com.google.common.util.concurrent.Futures.addCallback;
import static com.google.common.util.concurrent.MoreExecutors.getExitingExecutorService;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static java.util.concurrent.Executors.newFixedThreadPool;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.Monitor;
import com.ibm.common.activitystreams.IO;
import com.ibm.common.activitystreams.TypeValue;
import com.ibm.common.activitystreams.ext.ExtModule;
/**
* Maintains a registry of resolved TypeValues. If a given TypeValue
* is not currently known, an attempt will be made to resolve the
* value based on the ResolutionStrategy.
*/
public final class TypeValueRegistry
implements Function<TypeValue,Future<TypeValue>> {
public static Builder make () {
return new Builder();
}
public static TypeValueRegistry makeDefault() {
return make().get();
}
public static TypeValueRegistry makeDefaultSilent() {
return make()
.resolver(DefaultResolutionStrategy.make().silentfail().get())
.get();
}
public static TypeValueRegistry makeDefault(final IO io) {
return make()
.io(io)
.resolver(
DefaultResolutionStrategy
.make()
.customizeFetcher(
new Receiver<HttpFetch.Builder>() {
public void receive(HttpFetch.Builder builder) {
builder.io(io);
}
})
.get())
.get();
}
public static TypeValueRegistry makeDefaultSilent(final IO io) {
return make()
.io(io)
.resolver(
DefaultResolutionStrategy.make()
.silentfail()
.customizeFetcher(
new Receiver<HttpFetch.Builder>() {
public void receive(HttpFetch.Builder builder) {
builder.io(io);
}})
.get())
.get();
}
public static enum Status {
LOADING,
READY,
ERROR
}
public static final class Builder
implements Supplier<TypeValueRegistry> {
private ExecutorService executor;
private PreloadStrategy preloader =
ClasspathPreloader.instance;
private ResolutionStrategy strategy =
DefaultResolutionStrategy.makeDefault();
private IO io;
public Builder io(IO io) {
this.io = io;
return this;
}
public Builder executor(ExecutorService executor) {
this.executor = executor;
return this;
}
public Builder preloader(PreloadStrategy strategy) {
this.preloader = strategy != null ?
strategy : ClasspathPreloader.instance;
return this;
}
public Builder resolver(ResolutionStrategy strategy) {
this.strategy = strategy != null ?
strategy : ResolutionStrategy.nonop;
return this;
}
public TypeValueRegistry get() {
return new TypeValueRegistry(this);
}
}
private final ResolutionStrategy strategy;
private final ListeningExecutorService executor;
private Status readyStatus = Status.LOADING;
private Throwable loadError = null;
private final ListenableFuture<?> loader;
private final IO io;
private final Monitor monitor = new Monitor();
private final Monitor.Guard ready =
new Monitor.Guard(monitor) {
@Override
public boolean isSatisfied() {
return readyStatus != Status.LOADING;
}
};
private TypeValueRegistry(Builder builder) {
this.strategy = builder.strategy;
this.io = initIO(builder);
this.executor = initExecutor(builder);
this.loader = preload(builder);
}
private IO initIO(Builder builder) {
if (builder.io != null)
return builder.io;
return IO.makeDefault(ExtModule.instance);
}
private ListenableFuture<?> preload(Builder builder) {
final PreloadStrategy strategy = builder.preloader;
final Receiver<TypeValue> receiver = this.strategy.preloader();
ListenableFuture<?> future =
executor.submit(new Runnable() {
public void run() {
strategy.load(io,receiver);
}
});
addCallback(
future,
new FutureCallback<Object>() {
public void onSuccess(Object result) {
readyStatus = Status.READY;
}
public void onFailure(Throwable t) {
readyStatus = Status.ERROR;
loadError = t;
}
});
return future;
}
public Status readyStatus() {
return readyStatus;
}
public Throwable loadError() {
return loadError;
}
public void waitForPreloader()
throws InterruptedException,
ExecutionException {
loader.get();
}
public void waitForPreloader(long duration, TimeUnit unit)
throws InterruptedException,
ExecutionException,
TimeoutException {
loader.get(duration,unit);
}
private ListeningExecutorService initExecutor(
Builder builder) {
if (builder.executor != null)
return listeningDecorator(builder.executor);
return listeningDecorator(
getExitingExecutorService(
(ThreadPoolExecutor)newFixedThreadPool(1)));
}
public Future<TypeValue> resolve(TypeValue tv) {
try {
monitor.enterWhen(ready);
return executor.submit(strategy.resolverFor(tv));
} catch (Throwable t) {
throw propagate(t);
} finally {
monitor.leave();
}
}
public Future<TypeValue> resolve(
TypeValue tv,
long timeout,
TimeUnit unit) {
try {
if (monitor.enterWhen(ready, timeout, unit)) {
return executor.submit(strategy.resolverFor(tv));
} else throw new IllegalStateException();
} catch (Throwable t) {
throw propagate(t);
} finally {
monitor.leave();
}
}
public Future<TypeValue> apply(TypeValue input) {
return resolve(input);
}
}

View File

@ -0,0 +1,5 @@
package com.ibm.common.activitystreams.registry;
public final class UncacheableResponseException extends RuntimeException {
private static final long serialVersionUID = -8868045836116771952L;
}